
Monday, March 31, 2025

Wrong Job Interview

 


Lately I've heard about a question asked in a job interview:

You are given a shuffled list of 2*N+1 numbers that contains N pairs of numbers and one number that does not have a pair. Find the non-paired number using only 2 integer variables.

Scroll down only when you want to know the answer...




















The solution is to use one integer variable as an index to scan the list, and the second variable as XOR-based storage, so the algorithm is:


x = 0
for i in range(len(list)):

    x = x XOR list[i]


The paired numbers XOR themselves to zero, and only the non-paired number remains in x.

This works because:

v XOR v = 0

and

v XOR 0 = v

Since XOR is also commutative and associative, the pairs cancel out no matter how the list is shuffled.
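
Here is the same algorithm as a minimal, runnable Go sketch (the function name and the sample values are mine, for illustration):


package main

import "fmt"

// findUnpaired scans the list with the index i and accumulates the XOR
// of all the elements in x; the paired values cancel each other to zero.
func findUnpaired(numbers []int) int {
    x := 0
    for i := 0; i < len(numbers); i++ {
        x ^= numbers[i]
    }
    return x
}

func main() {
    fmt.Println(findUnpaired([]int{4, 7, 1, 7, 4})) // prints 1
}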



Now, while this is a nice question with a nice trick, the real question is: what is the benefit of asking it in a job interview?

What do you learn if the interviewed person managed to find the answer?
What do you learn if that person failed to find it?

Nothing.


In an interview we should pursue 3 main goals:

  1. Get a feeling for the kind of person. Would you have a beer with that person?
  2. Test the person's knowledge of a specific field or programming language.
  3. See how this person copes with complex and changing problems.

The XOR question does not contribute to any of these goals; it only tells you whether the person happened to think of the trick. So it is really testing whether that person is lucky.

Listed below are interview-related posts I've previously published, which you might find useful.

Monday, March 24, 2025

Auto Update of Argo Deployment

 



As part of our CI/CD, I need to update a deployment on Argo CD and then run system tests on it. Instead of doing this manually, I've written a small Go program that handles the version replacement, the sync after the update, and waiting for the sync to complete. The code is below (the project constant holding the Argo CD application name is defined elsewhere); feel free to copy it or take inspiration from it.




type Test struct {
    automationbase.AutomationBase
    webClient *web.Client
    token     string
}

func TestValidation(_ *testing.T) {
    t := Test{
        AutomationBase: *automationbase.ProduceAutomationBase(),
        webClient:      web.CreateClient(0),
    }

    t.AutomationWorker = t.check
    t.RunAutomation()
}

func (t *Test) check() {
    t.login()

    summary := t.getSummary()
    updatedParameters := t.updateVersionInSummary(summary)
    t.setSummary(updatedParameters)

    t.sync()

    // poll until the application reports fully synced and healthy
    for {
        time.Sleep(5 * time.Second)

        summary = t.getSummary()
        if t.isSynced(summary) {
            t.Log("sync done")
            return
        }
    }
}

func (t *Test) getEnvSecure(
    key string,
) string {
    value := os.Getenv(key)
    if value == "" {
        kiterr.RaiseIfError(fmt.Errorf("%v environment variable is empty", key))
    }
    return value
}

// login gets a session token from the Argo CD API
func (t *Test) login() {
    password := t.getEnvSecure("PIB_PASSWORD")
    body := map[string]string{
        "username": "admin",
        "password": password,
    }
    response := t.sendRequestToArgo("POST", "/api/v1/session", body)
    responseMap := t.interfaceJsonFromString(response)
    token := responseMap["token"]
    t.token = token.(string)
}

// sync triggers a sync of the application to the updated revision
func (t *Test) sync() string {
    version := t.getEnvSecure("PIB_VERSION")
    fullVersion := fmt.Sprintf("%v-dev-%v", project, version)
    data := fmt.Sprintf(`{"revision":"%v","prune":false,"dryRun":false,"strategy":{"hook":{"force":false}},"resources":null,"syncOptions":{"items":["CreateNamespace=true"]}}`, fullVersion)
    bodyJson := t.interfaceJsonFromString(data)
    return t.sendRequestToArgo("POST", "/api/v1/applications/"+project+"/sync", bodyJson)
}

func (t *Test) getSummary() string {
    return t.sendRequestToArgo("GET", "/api/v1/applications/"+project, nil)
}

func (t *Test) setSummary(
    parameters map[string]interface{},
) {
    t.sendRequestToArgo("PUT", "/api/v1/applications/"+project, parameters)
}

// updateVersionInSummary replaces the target revision and the helm
// values in the application summary with the new version
func (t *Test) updateVersionInSummary(
    summary string,
) map[string]interface{} {
    parametersMap := t.interfaceJsonFromString(summary)
    spec := t.interfaceJsonFromMap(parametersMap, "spec")
    source := t.interfaceJsonFromMap(spec, "source")
    helm := t.interfaceJsonFromMap(source, "helm")
    version := t.getEnvSecure("PIB_VERSION")
    fullVersion := fmt.Sprintf("%v-dev-%v", project, version)
    source["targetRevision"] = fullVersion
    helm["values"] = fmt.Sprintf("global:\n image:\n version: :dev-%v\n\n", version)
    return parametersMap
}

func (t *Test) sendRequestToArgo(
    method string,
    path string,
    body interface{},
) string {
    var requestHeaders *web.SectionHeaders
    if t.token != "" {
        cookie := fmt.Sprintf("argocd.token=%v", t.token)
        requestHeaders = web.ProduceSectionHeaders()
        requestHeaders.SetHeader("Cookie", cookie)
    }

    t.Log("sending %v %v with body:\n%v", method, path, body)

    fullPath := fmt.Sprintf("http://pib8.cloud-ng.net:31390%v", path)
    var response string
    t.webClient.SendRequestWithHeaders(
        method,
        fullPath,
        body,
        requestHeaders,
        &response,
    )

    if len(response) > 0 {
        jsonData := t.interfaceJsonFromString(response)
        t.Log("response is:\n%v", kitjson.ObjectToStringIndented(jsonData))
    }

    // don't rush Argo
    time.Sleep(time.Second)

    return response
}

func (t *Test) interfaceJsonFromMap(
    input map[string]interface{},
    key string,
) map[string]interface{} {
    value := input[key]
    if value == nil {
        kiterr.RaiseIfError(fmt.Errorf("key not found: %v", key))
    }
    valueMap, ok := value.(map[string]interface{})
    if !ok {
        kiterr.RaiseIfError(fmt.Errorf("convert key %v failed for value:\n%v", key, kitjson.ObjectToStringIndented(value)))
    }

    return valueMap
}

func (t *Test) interfaceJsonArrayFromMap(
    input map[string]interface{},
    key string,
) []interface{} {
    value := input[key]
    if value == nil {
        kiterr.RaiseIfError(fmt.Errorf("key not found: %v", key))
    }
    array, ok := value.([]interface{})
    if !ok {
        kiterr.RaiseIfError(fmt.Errorf("convert key %v failed", key))
    }

    return array
}

func (t *Test) interfaceJsonFromString(
    data string,
) map[string]interface{} {
    var jsonMap map[string]interface{}
    err := json.Unmarshal([]byte(data), &jsonMap)
    if err != nil {
        kiterr.RaiseIfError(fmt.Errorf("unmarshalling failed: %v", data))
    }
    return jsonMap
}

func (t *Test) isSynced(
    summary string,
) bool {
    parametersMap := t.interfaceJsonFromString(summary)
    status := t.interfaceJsonFromMap(parametersMap, "status")
    if !t.isSyncOperationsDone(status) {
        return false
    }

    if !t.isResourcesSyncDone(status) {
        return false
    }

    return true
}

func (t *Test) isResourcesSyncDone(status map[string]interface{}) bool {
    resources := t.interfaceJsonArrayFromMap(status, "resources")
    for _, resource := range resources {
        resourceMap, ok := resource.(map[string]interface{})
        if !ok {
            kiterr.RaiseIfError(fmt.Errorf("convert failed"))
        }
        kind := resourceMap["kind"]
        if kind == "Job" || kind == "Role" || kind == "RoleBinding" {
            // these kinds are never reported as synced
            continue
        }
        resourceStatus := resourceMap["status"]
        if resourceStatus != nil && resourceStatus != "Synced" {
            t.Log("pending sync for:\n%v", kitjson.ObjectToStringIndented(resourceMap))
            return false
        }

        if kind == "Deployment" || kind == "StatefulSet" {
            health := t.interfaceJsonFromMap(resourceMap, "health")
            healthStatus := health["status"]
            if healthStatus != "Healthy" {
                t.Log("pending sync for:\n%v", kitjson.ObjectToStringIndented(resourceMap))
                return false
            }
        }
    }
    return true
}

func (t *Test) isSyncOperationsDone(status map[string]interface{}) bool {
    operationState := t.interfaceJsonFromMap(status, "operationState")
    phase := operationState["phase"]

    if phase == "Succeeded" {
        return true
    }

    t.Log("sync state %v", phase)
    return false
}
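
To run this from a CI step, only the two environment variables the code reads need to be set. A sketch (the placeholder values are mine):

PIB_PASSWORD=<admin-password> PIB_VERSION=<build-version> go test -run TestValidation .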

Monday, March 17, 2025

Basic Must Have Training For New Software Engineer



 


In this post we will review the items a new software engineer arriving at a new workplace should learn. I don't pretend that I can set up a list that would match every workplace, but I do believe this matches 80% of the jobs.


Listed below are the items that the newcomer should find short courses to learn about. The minimum time investment for each subject is also specified, as well as an example of a short free course.


Saturday, March 1, 2025

Kafka Batch Consume Using confluent-kafka-go


 


In this post we will show a performance test for a Kafka batch consumer.

First, let's review a basic wrapper for the confluent-kafka-go library.


package kafka

import (
    "fmt"
    kafkaApi "github.com/confluentinc/confluent-kafka-go/kafka"
    "sync"
)

type Producer struct {
    kafkaTopic  string
    producer    *kafkaApi.Producer
    errorsCount int
    mutex       sync.Mutex
}

type Consumer struct {
    consumer    *kafkaApi.Consumer
    lastMessage *kafkaApi.Message
}

func CreateKafkaProducer(
    kafkaBroker string,
    kafkaTopic string,
) *Producer {
    config := make(kafkaApi.ConfigMap)
    config["bootstrap.servers"] = kafkaBroker
    producer, err := kafkaApi.NewProducer(&config)
    if err != nil {
        panic(err)
    }

    // drain the events channel so the producer does not block on
    // unread delivery reports
    go func() {
        for range producer.Events() {
        }
    }()

    return &Producer{
        kafkaTopic: kafkaTopic,
        producer:   producer,
    }
}

// ProduceMessage enqueues the message asynchronously and returns the
// enqueue error, leaving the handling decision to the caller
func (p *Producer) ProduceMessage(
    key string,
    messageData []byte,
) error {
    message := kafkaApi.Message{
        Key:   []byte(key),
        Value: messageData,
        TopicPartition: kafkaApi.TopicPartition{
            Topic:     &p.kafkaTopic,
            Partition: kafkaApi.PartitionAny,
        },
    }

    return p.producer.Produce(&message, nil)
}

func (p *Producer) Close() {
    p.producer.Close()
}

func CreateKafkaConsumer(
    kafkaBroker string,
    kafkaTopic string,
    consumerGroup string,
) *Consumer {
    config := make(kafkaApi.ConfigMap)
    config["bootstrap.servers"] = kafkaBroker
    config["group.id"] = consumerGroup
    //config["fetch.max.bytes"] = 50 * 1024 * 1024
    //config["max.partition.fetch.bytes"] = 50 * 1024 * 1024
    //config["auto.offset.reset"] = "earliest"
    //config["api.version.request"] = false
    //config["debug"] = "all"
    consumer, err := kafkaApi.NewConsumer(&config)
    if err != nil {
        panic(err)
    }

    go func() {
        for event := range consumer.Events() {
            fmt.Printf("kafka consumer event: %v\n", event)
        }
    }()

    err = consumer.Subscribe(kafkaTopic, nil)
    if err != nil {
        panic(err)
    }
    return &Consumer{
        consumer: consumer,
    }
}

func (c *Consumer) ReadMessage() []byte {
    // a timeout of -1 blocks until a message arrives
    msg, err := c.consumer.ReadMessage(-1)
    if err != nil {
        panic(err)
    }
    c.lastMessage = msg
    return msg.Value
}

func (c *Consumer) CommitLastMessage() {
    _, err := c.consumer.CommitMessage(c.lastMessage)
    if err != nil {
        panic(err)
    }
}

// CommitBulk commits the offset right after the last read message,
// covering all the messages consumed since the previous commit
func (c *Consumer) CommitBulk() {
    lastPartition := c.lastMessage.TopicPartition
    partitions := []kafkaApi.TopicPartition{
        {
            Topic:     lastPartition.Topic,
            Partition: lastPartition.Partition,
            Offset:    lastPartition.Offset + 1,
        },
    }
    _, err := c.consumer.CommitOffsets(partitions)
    if err != nil {
        panic(err)
    }
}


To test this we run a consumer and a producer in parallel:



const broker = "localhost:9092"
const topic = "my-topic"

type Data struct {
    MyId int64
    A00  string
    A01  string
    A02  string
    A03  string
    A04  string
    A05  string
    A06  string
    A07  string
    A08  string
    A09  string
}

type Test struct {
    stubs.Stubs
    id int64
}

func TestValidation(t *testing.T) {
    test := Test{
        Stubs: stubs.ProduceStubs(t),
    }
    defer test.TestCleanup()

    test.check()
}

func (t *Test) check() {
    go t.runProducer()
    t.runConsumer()
}

func (t *Test) runConsumer() {
    consumer := kafka.CreateKafkaConsumer(broker, topic, "my-group")
    consumeLogger := progress.ProduceProgress(0, "consume")
    consumeLogger.OnlyDelta = true
    bulk := 0
    startTime := time.Now()
    lastLog := time.Now()
    var totalConsume int64
    for {
        bytes := consumer.ReadMessage()
        totalConsume++
        var data Data
        err := json.Unmarshal(bytes, &data)
        kiterr.RaiseIfError(err)
        consumeLogger.Increment()
        bulk++
        // commit once per 1000 messages instead of per message
        if bulk > 1000 {
            bulk = 0
            consumer.CommitBulk()
            if time.Since(lastLog) > time.Second*10 {
                lastLog = time.Now()
                passed := time.Since(startTime)
                perSecond := totalConsume / int64(passed.Seconds())
                t.Log("average consume %v messages/sec", perSecond)
            }
        }
    }
}

func (t *Test) runProducer() {
    producer := kafka.CreateKafkaProducer(broker, topic)
    produceLogger := progress.ProduceProgress(0, "produce")
    produceLogger.OnlyDelta = true
    data := Data{
        A00: kitstring.GetRandomString(100),
        A01: kitstring.GetRandomString(100),
        A02: kitstring.GetRandomString(100),
        A03: kitstring.GetRandomString(100),
        A04: kitstring.GetRandomString(100),
        A05: kitstring.GetRandomString(100),
        A06: kitstring.GetRandomString(100),
        A07: kitstring.GetRandomString(100),
        A08: kitstring.GetRandomString(100),
        A09: kitstring.GetRandomString(100),
    }
    bytes := kitjson.ObjectToBytes(data)
    for {
        err := producer.ProduceMessage("", bytes)
        if err != nil {
            // back off when the producer cannot accept more messages
            t.Log("ignoring error: %v", err)
            time.Sleep(5 * time.Second)
        }
        produceLogger.Increment()
    }
}


Kafka itself can be run as a Docker container:

docker run -d --name=kafka -p 9092:9092 apache/kafka
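
If the topic does not exist yet (or you want to control the partition count), it can be created from inside the container; a sketch, assuming the tools path used by the apache/kafka image:

docker exec kafka /opt/kafka/bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092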


The consumer reaches a rate of 120K messages/second on a single core, with offsets committed once per 1000 messages rather than after every message.