Wednesday, April 28, 2021

Reading from AWS Kinesis using GoLang




In this post we will review reading from AWS Kinesis using GoLang. 

AWS Kinesis is widely used across AWS services; for example, it can be used to read real-time logging records from CloudFront. See this blog for setting up real-time logging on CloudFront.


To access AWS services, we start with authentication to AWS. See this blog for the alternative authentication methods. Once authentication is configured, we can connect to the Kinesis data stream. Records are read from a specific shard; in this example, we select the first available shard.



package main

import (
    "context"
    "fmt"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/kinesis"
)

func consume() {
    // Load the AWS configuration (credentials and region).
    awsConfig, err := config.LoadDefaultConfig(context.Background(), config.WithRegion("us-east-1"))
    if err != nil {
        panic(err)
    }

    client := kinesis.NewFromConfig(awsConfig)

    // Describe the stream and select the first available shard.
    streamName := "my-kinesis-data-stream"
    describeInput := kinesis.DescribeStreamInput{
        StreamName: aws.String(streamName),
    }
    describeOutput, err := client.DescribeStream(context.Background(), &describeInput)
    if err != nil {
        panic(err)
    }
    shard := describeOutput.StreamDescription.Shards[0]


Reading from the stream is done in an infinite polling loop, using a shard iterator. In this example, we start reading from the oldest record in the shard, and keep waiting for additional new records.



    // Get an iterator that starts at the oldest record in the shard (TRIM_HORIZON).
    iteratorInput := kinesis.GetShardIteratorInput{
        ShardId:           shard.ShardId,
        ShardIteratorType: "TRIM_HORIZON",
        StreamName:        aws.String(streamName),
    }
    iteratorOutput, err := client.GetShardIterator(context.Background(), &iteratorInput)
    if err != nil {
        panic(err)
    }

    // Poll the shard forever, printing every record and advancing the iterator.
    iterator := *iteratorOutput.ShardIterator
    for {
        getInput := kinesis.GetRecordsInput{
            ShardIterator: &iterator,
        }
        getOutput, err := client.GetRecords(context.Background(), &getInput)
        if err != nil {
            panic(err)
        }
        for _, record := range getOutput.Records {
            data := string(record.Data)
            fmt.Printf("data: %v\n", data)
        }

        iterator = *getOutput.NextShardIterator
    }
}
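
Note that GetRecords is rate limited per shard, so a tight loop like the one above may start failing with throughput errors. Below is a minimal sketch of the same loop with a pause between polls; the pollShard helper and the one-second pause are my additions (they assume the client from the example above and an extra "time" import), not part of the original example.

// pollShard is a hypothetical helper: it keeps calling GetRecords with the
// given iterator, prints each record, and pauses between polls so the shard
// read limit is not exhausted.
func pollShard(client *kinesis.Client, iterator string) {
    for {
        getOutput, err := client.GetRecords(context.Background(), &kinesis.GetRecordsInput{
            ShardIterator: &iterator,
        })
        if err != nil {
            panic(err)
        }
        for _, record := range getOutput.Records {
            fmt.Printf("data: %v\n", string(record.Data))
        }
        iterator = *getOutput.NextShardIterator

        // Pause between polls; tune this to your latency requirements.
        time.Sleep(time.Second)
    }
}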






Tuesday, April 20, 2021

CloudFront Real-Time Logging using CloudFormation




In this post we will review how to use a CloudFormation template to configure Real-Time Logging to a Kinesis data stream.

First, we need to create the Kinesis data stream:



KinesisDataStream:
  Type: AWS::Kinesis::Stream
  Properties:
    Name: my-kinesis-data-stream
    RetentionPeriodHours: 24
    ShardCount: 1



Next, we configure an IAM role with permission to write to the Kinesis data stream:



RealTimeLoggingRole:
  Type: AWS::IAM::Role
  Properties:
    Tags:
      - Key: Name
        Value: my-real-time-logging-role
    Path: "/"
    AssumeRolePolicyDocument:
      Version: 2012-10-17
      Statement:
        - Effect: Allow
          Action: sts:AssumeRole
          Principal:
            Service: cloudfront.amazonaws.com
    Policies:
      - PolicyName: my-real-time-logging-policy
        PolicyDocument:
          Version: 2012-10-17
          Statement:
            - Effect: Allow
              Action:
                - kinesis:DescribeStreamSummary
                - kinesis:DescribeStream
                - kinesis:PutRecord
                - kinesis:PutRecords
              Resource:
                - !GetAtt KinesisDataStream.Arn



Now we can configure the real-time logging to use the IAM role and the Kinesis stream:



RealTimeLogging:
  Type: AWS::CloudFront::RealtimeLogConfig
  Properties:
    Name: my-real-time-logging
    SamplingRate: 100
    Fields:
      - timestamp
      - c-ip
      - cs-host
      - cs-uri-stem
      - cs-headers
    EndPoints:
      - StreamType: Kinesis
        KinesisStreamConfig:
          RoleArn: !GetAtt RealTimeLoggingRole.Arn
          StreamArn: !GetAtt KinesisDataStream.Arn
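
CloudFront delivers each real-time log record to the Kinesis stream as a single line of tab-delimited values, in the order of the Fields list above. Below is a minimal sketch of parsing such a record on the consumer side in Go; the parseLogRecord helper and the sample record are illustrative only, not part of the template.

package main

import (
    "fmt"
    "strings"
)

// parseLogRecord splits a CloudFront real-time log record into a map,
// assuming the five fields configured above, in that order.
func parseLogRecord(data []byte) map[string]string {
    fields := []string{"timestamp", "c-ip", "cs-host", "cs-uri-stem", "cs-headers"}
    values := strings.Split(strings.TrimRight(string(data), "\n"), "\t")

    record := map[string]string{}
    for i, name := range fields {
        if i < len(values) {
            record[name] = values[i]
        }
    }
    return record
}

func main() {
    // A made-up record matching the Fields list above.
    raw := []byte("1618915200.123\t203.0.113.7\texample.com\t/index.html\tHost:example.com\n")
    fmt.Println(parseLogRecord(raw))
}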



The last thing to do is to configure our CloudFront distribution to use this real-time logging:



CloudFrontDistribution:
  Type: AWS::CloudFront::Distribution
  Properties:
    DistributionConfig:
      DefaultCacheBehavior:
        RealtimeLogConfigArn: !Ref RealTimeLogging



Notice that the CloudFront distribution shown here is only partial. For a full example of creating a CloudFront distribution, see this post.


Tuesday, April 13, 2021

Throttle API calls in GO




In this post we will review how to throttle API calls to a library in GO.

We assume that the library performs some update upon a notification. In this case, the notification does not contain any data; it simply means: "Something changed, do your stuff".

Now, we want the update to occur no more than once a minute. It does not matter how many times the library is notified, only that it does its work no more than once a minute.

To do this, we use an updates channel and a goroutine. The Throttle struct includes the updates channel, as well as the throttle configuration.



package throttle

import (
    "time"
)

// Handler is the work to run, at most once per interval.
type Handler func() error

type Throttle struct {
    interval time.Duration
    handler  Handler
    updates  chan bool
    lastRun  time.Time
}

func CreateThrottle(
    interval time.Duration,
    handler Handler,
) *Throttle {
    return &Throttle{
        interval: interval,
        handler:  handler,
        updates:  make(chan bool, 100),
        lastRun:  time.Now(),
    }
}




Next, we add the throttle API calls:



// Start launches the goroutine that handles the notifications.
func (t *Throttle) Start() {
    go t.channelLoop()
}

// Notify signals that something changed and the handler should run.
func (t *Throttle) Notify() {
    t.updates <- true
}



Last, we implement the goroutine that handles the updates. Notice that if notifications arrive too frequently, we schedule a later run of the handler using a timer.



func (t *Throttle) channelLoop() {
    var nextTimeout *time.Duration

    for {
        if nextTimeout == nil {
            // No run is pending - wait for the next notification.
            <-t.updates
        } else {
            // A run is pending - wake up on a notification or when the timer fires.
            select {
            case <-t.updates:
            case <-time.After(*nextTimeout):
            }
        }

        passedTime := time.Since(t.lastRun)
        if passedTime < t.interval {
            // Too soon - remember how long to wait before the next run.
            timeout := t.interval - passedTime
            nextTimeout = &timeout
        } else {
            err := t.handler()
            if err != nil {
                panic(err)
            }
            t.lastRun = time.Now()
            nextTimeout = nil
        }
    }
}



That's it! 

The throttle API is ready. Let's see an example of usage:



throttling := CreateThrottle(time.Minute, myExecutor)
throttling.Start()
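
For completeness, here is a slightly fuller usage sketch. The myExecutor handler and the useThrottle function are hypothetical, just to show how the API is meant to be called; the sketch assumes it lives alongside the throttle code above and that fmt is imported.

// myExecutor is a hypothetical handler that does the actual work.
func myExecutor() error {
    fmt.Println("running the update")
    return nil
}

func useThrottle() {
    throttling := CreateThrottle(time.Minute, myExecutor)
    throttling.Start()

    // Many notifications arrive, but myExecutor runs at most once a minute.
    for i := 0; i < 10; i++ {
        throttling.Notify()
        time.Sleep(time.Second)
    }
}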












Wednesday, April 7, 2021

How To Fake Source IP XFF Header




Recently, on one of our test sites, I had to fake my source IP in order to test the GUI response to multiple source IPs. I had to work with a real browser, in my case Chrome.

The first thing I tried was the IPFuck Chrome extension, but it failed: Chrome was aware that it was sending an additional header, and the site blocked this behavior using the Access-Control-Allow-Headers option.


The solution in my case was to add an NGINX reverse proxy to handle adding the header. I set up a local NGINX to proxy the requests to their original target.

The NGINX run script uses Docker:



docker stop faker
docker rm faker
docker run --name faker --network host -v ${PWD}/empty:/docker-entrypoint.d -v ${PWD}/nginx.conf:/etc/nginx/nginx.conf nginx



The script's folder contains a folder named "empty", as well as an nginx.conf file:



user nginx;
worker_processes 1;

error_log /dev/stdout debug;
pid /var/run/nginx.pid;

events {
    worker_connections 1024;
}

http {
    include /etc/nginx/mime.types;
    default_type application/octet-stream;

    log_format main '$remote_addr - $remote_user [$time_local] "$request" '
                    '$status $body_bytes_sent "$http_referer" '
                    '"$http_user_agent" "$http_x_forwarded_for"';

    access_log /dev/stdout;

    sendfile on;
    keepalive_timeout 65;

    server {
        listen 8080;

        location / {
            resolver 10.221.1.47;
            proxy_pass http://$http_host$uri$is_args$args;
            proxy_set_header Host $host;
            proxy_set_header X-Forwarded-For 52.14.41.49;
        }
    }
}






To make the browser use the NGINX reverse proxy, I had to configure it to use the proxy localhost:8080.



Final Note

Notice that this works for HTTP sites.

HTTPS sites require additional configuration for SSL support.