Sunday, November 26, 2023

Connect and traverse On AWS Neptune using GoLang

 

This is an example for using the gremlingo library to connect to the AWS Neptune graph database.

The documentation for the library is very poor, so I though adding this example might be required.

Notice that to run it, you need to run the GO binary in the same VPC as the AWS Neptune.


In the example we connect to the database, add some nodes, and traverse the added nodes.


package main

import (
"fmt"
gremlingo "github.com/apache/tinkerpop/gremlin-go/v3/driver"
)

func main() {
// Creating the connection to the server.
connection, err := gremlingo.NewDriverRemoteConnection("wss://db-neptune-2.cluster-cby3ysf1nfyw.us-east-1.neptune.amazonaws.com:8182/gremlin",
func(settings *gremlingo.DriverRemoteConnectionSettings) {
settings.TraversalSource = "g"
})
if err != nil {
panic(err)
}
defer connection.Close()

useGraphDb(connection)
}

func useGraphDb(connection *gremlingo.DriverRemoteConnection) {
traversalSource := gremlingo.Traversal_().WithRemote(connection)

alice := addVertex(traversalSource, "buyer", "alice")
bob := addVertex(traversalSource, "buyer", "bob")
charlie := addVertex(traversalSource, "buyer", "charlie")
bread := addVertex(traversalSource, "product", "bread")
butter := addVertex(traversalSource, "product", "butter")

_, err := traversalSource.AddE("buy").From(alice).To(bread).Next()
if err != nil {
panic(err)
}
_, err = traversalSource.AddE("buy").From(bob).To(bread).Next()
if err != nil {
panic(err)
}
_, err = traversalSource.AddE("buy").From(charlie).To(butter).Next()
if err != nil {
panic(err)
}

printList(traversalSource.V())
printList(traversalSource.E())
printList(traversalSource.E().Property("type", "buy").V())
}

func printList(traversalSource *gremlingo.GraphTraversal) {
results, err := traversalSource.Limit(100).ToList()
if err != nil {
panic(err)
}
fmt.Printf("the list has %v items\n", len(results))

for _, r := range results {
fmt.Printf("%+v\n", r.String())
}
}

func addVertex(
traversalSource *gremlingo.GraphTraversalSource,
vertexType string,
vertexName string,
) *gremlingo.Vertex {
result, err := traversalSource.AddV(vertexType).Property("name", vertexName).Next()
if err != nil {
panic(err)
}
vertex, err := result.GetVertex()
if err != nil {
panic(err)
}
return vertex
}



Monday, November 20, 2023

Using OpenAI in Go


 


In this post we will review how to use OpenAI API in Go. We will download an HTML file, and let OpenAI analyze it.


Let's start with the basic, we need to send HTTP requests both for downloading the HTML file, and for accessing OpenAI API, hence the function to handle HTTP requests is:


func sendHttpRequest(
method string,
fullUrl string,
headers map[string]string,
body interface{},
) string {
httpClient := http.DefaultClient
var bodyReader io.Reader
if body != nil {
bodyBytes, err := json.Marshal(body)
if err != nil {
panic(err)
}
bodyReader = bytes.NewReader(bodyBytes)
}

httpRequest, err := http.NewRequest(method, fullUrl, bodyReader)
for key, value := range headers {
httpRequest.Header.Set(key, value)
}
httpResponse, err := httpClient.Do(httpRequest)
if err != nil {
panic(err)
}
bodyBytes, err := ioutil.ReadAll(httpResponse.Body)
if err != nil {
panic(err)
}

bodyString := string(bodyBytes)

if httpResponse.StatusCode != 200 {
panic(bodyString)
}
return bodyString
}


Let's analyze a specific HTML file:


analyzeHtml("https://assets.bounceexchange.com/assets/bounce/local_storage_frame17.min.html")



and the analyze function is:


func analyzeHtml(
fullUrl string,
) {
htmlData := sendHttpRequest("GET", fullUrl, nil, nil)

openAiListModels()

guide := "provide the following in JSON format:\n" +
"1. length, int, the characters amount in the HTML\n" +
"2. scripts, boolean, does the HTML include javascripts"
openAiCompletions(&completionRequest{
Model: "gpt-3.5-turbo-1106",
MaxTokens: 100,
Messages: []*completionMessage{
{
Role: "system",
Content: guide,
},
{
Role: "user",
Content: htmlData,
},
},
Temperature: 0,
})
}


We download the HTML file, and then use OpenAI API to analyze it.

As a side note, we can list the models available for us:

func openAiListModels() {
headers := map[string]string{
"Authorization": "Bearer " + Config.OpenAiKey,
}
result := sendHttpRequest("GET", "https://api.openai.com/v1/models", headers, nil)
print(result)
}


The structure of the completion request is:

type completionMessage struct {
Role string `json:"role"`
Content string `json:"content"`
}

type completionRequest struct {
Model string `json:"model"`
Messages []*completionMessage `json:"messages"`
Temperature float32 `json:"temperature"`
MaxTokens int `json:"max_tokens"`
}



and an example output is:


{
"id": "chatcmpl-8MyaJSYyAu4CVheOnHeQaA4Z7ThRQ",
"object": "chat.completion",
"created": 1700486795,
"model": "gpt-3.5-turbo-1106",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "```json\n{\n \"length\": 2283,\n \"scripts\": true\n}\n```"
},
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": 705,
"completion_tokens": 20,
"total_tokens": 725
},
"system_fingerprint": "fp_eeff13170a"
}


This output can now be parsed by another code, and act automatically as required.



Sunday, November 12, 2023

AWS Kinesis vs. AWS SQS


 


In previous post we've demonstrated Using AWS Kinesis in Go, and Using AWS SQS in Go. In this post we'll review when should we use each of this technologies.

Before going into details, there is one rule of thumb that let you know which technology to use. This might not be always correct, and we should check the details below for the final decision, but this is a very good start point. 

Use AWS SQS if the following is required:

  • High rate and transparent dynamic scalability of messages processing
  • Ordering is not important
Use AWS Kinesis if the following is required:
  • Reading of the same message by several consumers
  • Replay of messages 
  • Ordering is important


Let's dive into the details. We will review each of the characteristics of the technologies, and compare where it better to use one over the other.


Scale

AWS SQS automatically transparently handles dynamic load changes. This means the application automatically adjusts to load changes. This is a huge advantage for AWS SQS.

AWS Kinesis can handle high load but the AWS Kinesis shards and the application needs to be preconfigured with the expected load. Change of the shards amount requires both change of application behavior and a resharding operation, which is definitely not a fun game.


Multiple Consumers

AWS SQS supports message level operations. We can read a message, and then we should delete it within a preconfigured time limit. Once a message is read and deleted it is no longer available to other consumers. If we require multiple consumers reading the same messages, we cannot do it by AWS SQS itself, but we can use AWS SNS in combination with AWS SQS to create a fanout pattern. This fanout is preconfigured, and dynamic changes to the fanout configuration require both application changes and SQS+SNS changes.

AWS Kinesis works on stream level, and the messages are kept in the stream until the end of the retention period. This enable us using multiple consumers reading the same message, each using its own iterator. More than that, the consumers can be created on the fly without any preconfiguration on the AWS Kinesis side.


Ordering

AWS SQS does not guarantee message ordering. We can how ever use an AWS SQS FIFO queue that guarantees order as well as exactly-once processing. This however comes with a cost both in price and in performance.

AWS Kinesis does guarantee message ordering, but only on the shard level. This is usually fine, as implementation with AWS Kinesis tends to create consumer for each application logical entity on the shard level.


Message Size

AWS SQS supports messages up to 256KiB.
AWS Kinesis supports messages up to 1MiB.


Throughput

AWS SQS supports nearly unlimited number of transactions per second (!!!).

AWS Kinesis supports for each shard:
  • write 1MB/1000 transactions per second
  • read 2MB/2000 transactions per second

AWS Services Integration

AWS SQS messages can be configured to be processed by AWS Lambda.

AWS Kinesis messages can be configured to be processes by many AWS services such as AWS S3, AWS EkasticSearch, AWS Lambda, and more.


Pricing

In general first look on the costs seems to be similar, but the details here matter especially if we plan to process huge amount of data. The costs should be compared based of the amount of shards we might need for AWS Kinesis. There is not guideline here (sorry...), we will need to plan the entire solution to get an estimation of the cost.




Sunday, November 5, 2023

Using AWS Kinesis in Go


 

In this post we will review a simple Go implementation of AWS kinesis producer and consumer.


The main function starts the producer and the consumer, and then waits forever.


import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"sync"
"time"
)

func main() {
go producer()
go consumer()

waitGroup := sync.WaitGroup{}
waitGroup.Add(1)
waitGroup.Wait()
}


The producer writes a record to AWS kinesis stream every second.


func producer() {
awSession := session.Must(session.NewSession())
kinesisClient := kinesis.New(awSession)
for i := 0; ; i++ {
input := kinesis.PutRecordInput{
Data: []byte(fmt.Sprintf("message data %v", i)),
PartitionKey: aws.String(fmt.Sprintf("key%v", i)),
StreamName: aws.String("s1"),
}
output, err := kinesisClient.PutRecord(&input)
if err != nil {
panic(err)
}

fmt.Printf("produced record to shard %v\n", *output.ShardId)
time.Sleep(time.Second)
}
}


The consumer is more complex. First we need to list the stream's shards, and start a consumer for each of the shards.


func consumer() {
awSession := session.Must(session.NewSession())
kinesisClient := kinesis.New(awSession)

input := kinesis.DescribeStreamInput{
StreamName: aws.String("s1"),
}
output, err := kinesisClient.DescribeStream(&input)
if err != nil {
panic(err)
}

fmt.Printf("%v shards loacted\n", len(output.StreamDescription.Shards))

for i := 0; i < len(output.StreamDescription.Shards); i++ {
shardId := output.StreamDescription.Shards[i].ShardId
go consumerShard(kinesisClient, shardId)
}
}


Each shard consumer has a small trick. First we use iterator to get all the new records. Once we got a record, we modify the iterator to read all records after the last received record.


func consumerShard(
kinesisClient *kinesis.Kinesis,
shardId *string,
) {
iteratorInput := kinesis.GetShardIteratorInput{
ShardId: shardId,
ShardIteratorType: aws.String("LATEST"),
StreamName: aws.String("s1"),
}
shardIteratorOutput, err := kinesisClient.GetShardIterator(&iteratorInput)
if err != nil {
panic(err)
}

var lastRecord *string
for {
recordsInput := kinesis.GetRecordsInput{
ShardIterator: shardIteratorOutput.ShardIterator,
}
records, err := kinesisClient.GetRecords(&recordsInput)
if err != nil {
panic(err)
}

fmt.Printf("shard %v got %v records\n", *shardId, len(records.Records))

for i := 0; i < len(records.Records); i++ {
record := records.Records[i]
fmt.Printf("shard %v data: %v\n", *shardId, string(record.Data))
lastRecord = record.SequenceNumber
}

time.Sleep(5 * time.Second)

if lastRecord != nil {
iteratorInput.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER")
iteratorInput.StartingSequenceNumber = lastRecord
shardIteratorOutput, err = kinesisClient.GetShardIterator(&iteratorInput)
if err != nil {
panic(err)
}
}
}
}


An example output for running this is below.


produced record to shard shardId-000000000000
4 shards loacted
shard shardId-000000000000 got 0 records
shard shardId-000000000003 got 0 records
shard shardId-000000000002 got 0 records
shard shardId-000000000001 got 0 records
produced record to shard shardId-000000000003
produced record to shard shardId-000000000001
produced record to shard shardId-000000000000
produced record to shard shardId-000000000003
shard shardId-000000000000 got 1 records
shard shardId-000000000000 data: message data 3
shard shardId-000000000003 got 2 records
shard shardId-000000000003 data: message data 1
shard shardId-000000000003 data: message data 4
shard shardId-000000000001 got 1 records
shard shardId-000000000001 data: message data 2
shard shardId-000000000002 got 0 records
produced record to shard shardId-000000000000
produced record to shard shardId-000000000001
produced record to shard shardId-000000000003
produced record to shard shardId-000000000001
produced record to shard shardId-000000000000
shard shardId-000000000000 got 2 records
shard shardId-000000000000 data: message data 5
shard shardId-000000000000 data: message data 9
shard shardId-000000000003 got 1 records
shard shardId-000000000003 data: message data 7
shard shardId-000000000002 got 0 records
shard shardId-000000000001 got 2 records
shard shardId-000000000001 data: message data 6
shard shardId-000000000001 data: message data 8
produced record to shard shardId-000000000003
produced record to shard shardId-000000000003
produced record to shard shardId-000000000003
produced record to shard shardId-000000000003
shard shardId-000000000000 got 0 records
shard shardId-000000000003 got 4 records
shard shardId-000000000003 data: message data 10
shard shardId-000000000003 data: message data 11
shard shardId-000000000003 data: message data 12
shard shardId-000000000003 data: message data 13
shard shardId-000000000002 got 0 records
shard shardId-000000000001 got 0 records
produced record to shard shardId-000000000002
produced record to shard shardId-000000000001
produced record to shard shardId-000000000003
produced record to shard shardId-000000000002
produced record to shard shardId-000000000000
shard shardId-000000000000 got 1 records
shard shardId-000000000000 data: message data 18
shard shardId-000000000002 got 2 records
shard shardId-000000000002 data: message data 14
shard shardId-000000000002 data: message data 17
shard shardId-000000000003 got 1 records
shard shardId-000000000003 data: message data 16
shard shardId-000000000001 got 1 records
shard shardId-000000000001 data: message data 15
produced record to shard shardId-000000000001
produced record to shard shardId-000000000000


This is just a starting point for using AWS kinesis, as they are many details that affect the stability of a production grade solution. Still, this is a good starting point.