
Wednesday, November 27, 2024

Redpanda Connect Introduction

Redpanda Connect, previously known as Benthos, is a streaming pipeline that reads messages from and writes messages to many connectors. It can transform the messages along the way using built-in processors. The goal of this framework is to let us connect and convert data streams without writing custom code, using only a configuration file and, at most, minimal dedicated processing code.


To install rpk, the Redpanda CLI that provides the connect subcommand, use the following:

curl -LO https://github.com/redpanda-data/redpanda/releases/latest/download/rpk-linux-amd64.zip
unzip rpk-linux-amd64.zip
sudo mv rpk /usr/local/bin/
rm rpk-linux-amd64.zip


Create a file named connect.yaml:


input:
  stdin: {}

pipeline:
  processors:
    - mapping: root = content().uppercase()

output:
  stdout: {}


And run it:

rpk connect run connect.yaml


Now any input written to STDIN is converted to upper case and written to STDOUT.


While Redpanda Connect can handle plain string messages, most of its abilities are built for handling JSON messages. For example, consider the following connect.yaml file:


input:
  generate:
    interval: 1s
    count: 0
    mapping: |
      let first_name = fake("first_name")
      let last_name = fake("last_name")

      root.id = counter()
      root.name = ($first_name + " " + $last_name)
      root.timestamp = now()

pipeline:
  processors:
    - sleep:
        duration: 100ms
    - group_by:
        - check: this.id % 2 == 0
          processors:
            - mapping: |
                root.original_doc = this
                root.encoded = this.name.hash("sha256").encode("base64")

output:
  stdout: {}


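It generates output similar to the following (the names and timestamps are random). Messages with an even id go through the group_by branch and are wrapped, while messages with an odd id pass through unchanged: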

{"id":1,"name":"Ivah Mohr","timestamp":"2024-11-27T16:36:37.542410543+02:00"}
{"encoded":"oaaBKF/7oz0N6j6VlSZs14u8FD2dAwPSBAoJvIIMpWI=","original_doc":{"id":2,"name":"Darrion Miller","timestamp":"2024-11-27T16:36:38.541429372+02:00"}}
{"id":3,"name":"Maddison Paucek","timestamp":"2024-11-27T16:36:39.542047832+02:00"}
{"encoded":"+e6EyBEILJYStdV+DH6cohEVvfW04VTo1q2YLXp4ft8=","original_doc":{"id":4,"name":"Eleazar Sporer","timestamp":"2024-11-27T16:36:40.542410537+02:00"}}
{"id":5,"name":"Junius Renner","timestamp":"2024-11-27T16:36:41.542298415+02:00"}
{"encoded":"A4o97ySPf9yWFMXcutPBiI6a6Fd19ofqBmK/7s84ZZ4=","original_doc":{"id":6,"name":"Carlie Osinski","timestamp":"2024-11-27T16:36:42.542277315+02:00"}}
{"id":7,"name":"Raphaelle Reichel","timestamp":"2024-11-27T16:36:43.542382662+02:00"}
{"encoded":"U+pLsYjRaWH87nUFdZmRLJwvGrIfUOCYeUrazKGoEHA=","original_doc":{"id":8,"name":"Elmira Douglas","timestamp":"2024-11-27T16:36:44.542245761+02:00"}}
{"id":9,"name":"Ambrose Hudson","timestamp":"2024-11-27T16:36:45.542307678+02:00"}
{"encoded":"J2NxrIaC1cvHtqIduSOx85TlyFLQrT48QaYw8iR9To0=","original_doc":{"id":10,"name":"Jedidiah Veum","timestamp":"2024-11-27T16:36:46.542276779+02:00"}}



Final Words

Yeah, that's cool: you save time coding and fixing bugs. This is nice if you need a simple, fast and reliable pipeline.
However, in real life you will usually need more than the built-in abilities, and then you have to extend Redpanda Connect with your own Go code. Is that better than just writing everything in Go? I think it depends on how many bugs you expect from the programmer.







Monday, November 18, 2024

Go ReadLine for Long Lines

In this post we present a method of reading lines of any length from a text in Go.


The naive method of reading lines in Go is using bufio.Scanner:

scanner := bufio.NewScanner(file)
for scanner.Scan() {
	fmt.Println(scanner.Text())
}


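This does not work for long lines: by default bufio.Scanner refuses tokens longer than bufio.MaxScanTokenSize (64 KiB), so Scan returns false and Err reports bufio.ErrTooLong. The limit can be raised with Scanner.Buffer, but a maximum must still be chosen in advance. Instead, we should use bufio.Reader. However, reassembling a long line from the fragments returned by ReadLine is cumbersome, and a KISS wrapper is missing from the Go standard library, hence I've created it myself.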


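package kitstring

import (
	"bufio"
	"io"
	"strings"
)

// LineReader returns the lines of a text one by one, regardless of their length.
type LineReader struct {
	reader *bufio.Reader
	isEof  bool
}

func ProduceLineReader(
	text string,
) *LineReader {
	reader := bufio.NewReader(strings.NewReader(text))

	return &LineReader{
		reader: reader,
		isEof:  false,
	}
}

// GetLine returns the next line without its line ending ("\n" or "\r\n").
// When no lines remain it returns "" and IsEof starts returning true.
func (r *LineReader) GetLine() string {
	var longLineBuffer *strings.Builder
	multiLines := false
	for {
		// ReadLine returns at most one buffer's worth of data per call;
		// isPrefix is true when the line did not fit and more fragments follow.
		line, isPrefix, err := r.reader.ReadLine()
		if err == io.EOF {
			r.isEof = true
			return ""
		}

		if isPrefix {
			multiLines = true
		}

		if !multiLines {
			// simple single line
			return string(line)
		}

		if longLineBuffer == nil {
			// create only if needed - better performance
			longLineBuffer = &strings.Builder{}
		}

		// line is only valid until the next read, so copy it into the builder now
		longLineBuffer.Write(line)
		if !isPrefix {
			// end of long line
			return longLineBuffer.String()
		}
	}
}

func (r *LineReader) IsEof() bool {
	return r.isEof
}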


An example of usage is:

reader := kitstring.ProduceLineReader(text)
for {
	line := reader.GetLine()
	if reader.IsEof() {
		break
	}
	t.Logf("read line: %v", line)
}



Monday, November 11, 2024

OpenAPI Schema In Go

In this post we present a utility that builds an OpenAPI file programmatically in Go. This is useful when an application detects a schema at runtime and must publish an OpenAPI file describing the detected schema.



package openapi

import (
	"strings"

	"gopkg.in/yaml.v3"
)

const applicationJson = "application/json"

// InLocation is where a parameter is passed.
type InLocation int

const (
	InLocationQuery InLocation = iota + 1
	InLocationHeader
	InLocationCookie
	InLocationPath
)

// SchemaType is the subset of OpenAPI data types supported by this utility.
type SchemaType int

const (
	SchemaTypeString SchemaType = iota + 1
	SchemaTypeInt
	SchemaTypeObject
	SchemaTypeArray
)

type Info struct {
	Title       string `yaml:"title,omitempty"`
	Description string `yaml:"description,omitempty"`
	Version     string `yaml:"version,omitempty"`
}

type Schema struct {
	Type       string             `yaml:"type,omitempty"`
	Enum       []string           `yaml:"enum,omitempty"`
	Properties map[string]*Schema `yaml:"properties,omitempty"`
	Items      *Schema            `yaml:"items,omitempty"`
}

type Parameter struct {
	In          string  `yaml:"in,omitempty"`
	Name        string  `yaml:"name,omitempty"`
	Description string  `yaml:"description,omitempty"`
	Required    bool    `yaml:"required,omitempty"`
	Schema      *Schema `yaml:"schema,omitempty"`
}

// Content maps a media type, such as application/json, to its schema.
type Content map[string]*Schema

type RequestBody struct {
	Description string   `yaml:"description,omitempty"`
	Required    bool     `yaml:"required,omitempty"`
	Content     *Content `yaml:"content,omitempty"`
}

type Response struct {
	Description string   `yaml:"description,omitempty"`
	Content     *Content `yaml:"content,omitempty"`
}

type Method struct {
	Summary     string               `yaml:"summary,omitempty"`
	Description string               `yaml:"description,omitempty"`
	Deprecated  bool                 `yaml:"deprecated,omitempty"` // a boolean in OpenAPI
	Parameters  []*Parameter         `yaml:"parameters,omitempty"`
	RequestBody *RequestBody         `yaml:"requestBody,omitempty"`
	Responses   map[string]*Response `yaml:"responses,omitempty"`
}

// Path maps a lower-case HTTP method to its definition.
type Path map[string]*Method

type OpenApi struct {
	OpenApi string           `yaml:"openapi,omitempty"`
	Info    *Info            `yaml:"info,omitempty"`
	Paths   map[string]*Path `yaml:"paths,omitempty"`
}

func produceSchema() *Schema {
	return &Schema{
		Properties: make(map[string]*Schema),
	}
}

func ProduceOpenApi() *OpenApi {
	return &OpenApi{
		OpenApi: "3.0.0",
		Paths:   make(map[string]*Path),
	}
}

func (o *OpenApi) CreateYamlBytes() []byte {
	bytes, err := yaml.Marshal(o)
	if err != nil {
		// marshalling these plain structs is not expected to fail
		panic(err)
	}
	return bytes
}

func (o *OpenApi) CreateYamlString() string {
	return string(o.CreateYamlBytes())
}

func (o *OpenApi) SetPath(
	path string,
) *Path {
	// reuse the path if it already exists
	if pathObject, exists := o.Paths[path]; exists {
		return pathObject
	}
	pathObject := make(Path)
	o.Paths[path] = &pathObject
	return &pathObject
}

func (p *Path) SetMethod(
	method string,
) *Method {
	method = strings.ToLower(method)

	pathObject := *p
	existingMethod := pathObject[method]
	if existingMethod != nil {
		return existingMethod
	}
	methodObject := Method{
		Responses: make(map[string]*Response),
	}
	pathObject[method] = &methodObject
	return &methodObject
}

func (m *Method) SetParameter(
	name string,
) *Parameter {
	for _, parameter := range m.Parameters {
		if parameter.Name == name {
			return parameter
		}
	}

	parameter := Parameter{
		Name: name,
	}
	m.Parameters = append(m.Parameters, &parameter)
	return &parameter
}

func (p *Parameter) SetInLocation(
	in InLocation,
) *Parameter {
	switch in {
	case InLocationQuery:
		p.In = "query"
	case InLocationCookie:
		p.In = "cookie"
	case InLocationHeader:
		p.In = "header"
	case InLocationPath:
		p.In = "path"
	}
	return p
}

func (p *Parameter) SetSchema(
	schemaType SchemaType,
) *Parameter {
	schema := p.Schema
	if schema == nil {
		schema = produceSchema()
		p.Schema = schema
	}

	schema.SetType(schemaType)
	return p
}

func (s *Schema) SetType(
	schemaType SchemaType,
) *Schema {
	switch schemaType {
	case SchemaTypeString:
		s.Type = "string"
	case SchemaTypeInt:
		s.Type = "integer"
	case SchemaTypeObject:
		s.Type = "object"
	case SchemaTypeArray:
		s.Type = "array"
	}
	return s
}

func (s *Schema) SetProperty(
	name string,
	schemaType SchemaType,
) *Schema {
	property := s.Properties[name]
	if property == nil {
		property = produceSchema()
		s.Properties[name] = property
	}

	property.SetType(schemaType)
	return property
}

func (s *Schema) SetPropertyArray(
	name string,
) *Schema {
	array := s.SetProperty(name, SchemaTypeArray)
	array.Items = produceSchema()
	return array.Items
}

func (m *Method) SetRequestContent(
	contentType string,
) *Schema {
	body := m.RequestBody
	if body == nil {
		body = &RequestBody{}
		m.RequestBody = body
	}

	content := body.Content
	if content == nil {
		content = &Content{}
		body.Content = content
	}

	contentObject := *content
	schema := contentObject[contentType]
	if schema == nil {
		schema = produceSchema()
		contentObject[contentType] = schema
	}

	return schema
}

func (m *Method) SetContentApplicationJson() *Schema {
	return m.SetRequestContent(applicationJson)
}

func (m *Method) SetResponseContent(
	responseCode string,
	contentType string,
) *Schema {
	response := m.Responses[responseCode]
	if response == nil {
		response = &Response{}
		m.Responses[responseCode] = response
	}

	content := response.Content
	if content == nil {
		content = &Content{}
		response.Content = content
	}

	contentObject := *content
	schema := contentObject[contentType]
	if schema == nil {
		schema = produceSchema()
		contentObject[contentType] = schema
	}

	return schema
}

func (m *Method) SetResponseSuccessContentApplicationJson() *Schema {
	return m.SetResponseContent("200", applicationJson)
}



The sample usage below creates two endpoints: list-items and add-item.


api := openapi.ProduceOpenApi()

method := api.SetPath("/api/list-items").SetMethod("GET")
method.Description = "list all store items"

method.SetParameter("store-id").
	SetInLocation(openapi.InLocationQuery).
	SetSchema(openapi.SchemaTypeInt)

listStoreSchema := method.SetResponseSuccessContentApplicationJson()
existingItemSchema := listStoreSchema.SetPropertyArray("items")
existingItemSchema.SetProperty("id", openapi.SchemaTypeInt)
existingItemSchema.SetProperty("name", openapi.SchemaTypeString)
existingItemSchema.SetProperty("price", openapi.SchemaTypeInt)

method = api.SetPath("/api/add-item").SetMethod("POST")
method.Description = "add item to store"
addItemSchema := method.SetContentApplicationJson()
addItemSchema.SetType(openapi.SchemaTypeObject)
newItemSchema := addItemSchema.SetProperty("item", openapi.SchemaTypeObject)
newItemSchema.SetProperty("name", openapi.SchemaTypeString)
newItemSchema.SetProperty("price", openapi.SchemaTypeInt)

t.Logf("Schema is:\n\n%v", api.CreateYamlString())


The resulting OpenAPI file is:


openapi: 3.0.0
paths:
    /api/add-item:
        post:
            description: add item to store
            requestBody:
                content:
                    application/json:
                        type: object
                        properties:
                            item:
                                type: object
                                properties:
                                    name:
                                        type: string
                                    price:
                                        type: integer
    /api/list-items:
        get:
            description: list all store items
            parameters:
                - in: query
                  name: store-id
                  schema:
                    type: integer
            responses:
                "200":
                    content:
                        application/json:
                            properties:
                                items:
                                    type: array
                                    items:
                                        properties:
                                            id:
                                                type: integer
                                            name:
                                                type: string
                                            price:
                                                type: integer



Tuesday, November 5, 2024

Streamlit

In this post we present a simple example of a Streamlit-based application. Streamlit is a framework for simple and fast GUI development, mostly for internal use. The nice thing is that it is very simple: the code is pure Python, without any JavaScript and without separate frontend and backend processes. Notice that the GUI is not meant for end users but mostly for internal developers, since its simplicity also makes it limited.



The following code manages the configuration of a long-running process and reruns the process only when the configuration changes. Some screenshots are below:








The Python code is below.


import random
import time
from datetime import datetime

import pandas as pd
import streamlit as st


def display_bars(amount):
    data = []
    for i in range(amount):
        data.append(['person {}'.format(i), random.randint(10, 50)])
    df = pd.DataFrame(data, columns=['Name', 'Age'])
    st.bar_chart(df, x="Name", y="Age")


# st.cache_data skips re-execution when called again with the same compute_time,
# so the long processing reruns only when the configuration changes
@st.cache_data
def run_long_processing(compute_time):
    progress = st.empty()

    with progress.container():
        st.text('starting with {} times'.format(compute_time))
        for i in range(compute_time):
            time.sleep(1)
            st.text('iteration {}'.format(i))
        time.sleep(1)
    progress.empty()

    st.text('task with {} times is complete'.format(compute_time))
    display_bars(compute_time)


# run_every reruns only this fragment every second, not the whole script
@st.fragment(run_every='1s')
def display_time():
    st.write(datetime.now())


def main():
    st.title('Demo Application')
    display_time()

    if st.button('clear cache'):
        st.cache_data.clear()

    tab1, tab2 = st.tabs(['config', 'results'])

    with tab1:
        st.session_state['compute_time'] = st.slider('select compute time', 1, 10, 3)
    with tab2:
        compute_time = st.session_state['compute_time']
        run_long_processing(compute_time)


main()