Wednesday, March 11, 2020

Go Scheduler





What if you want to have a scheduler in a Go application?
At first glance, Go seems to provide a built-in solution: tickers (time.Ticker), as shown on the Go by Example page.

But this solution is naive.
If each tick launches the task in its own goroutine, nothing prevents parallel runs of the task: when a run is delayed beyond the schedule interval, the next invocation starts while the previous one is still running.

For example:
Task X is configured to run every 10 seconds.
But due to system stress, it runs for 15 seconds.
The result would be as follows:

00:00:00 - The application starts
00:00:10 - Task X is starting in goroutine #1. We have 1 running task
00:00:20 - Task X is starting in goroutine #2. We have 2 running tasks
00:00:25 - Task X is complete in goroutine #1. We have 1 running task
00:00:30 - Task X is starting in goroutine #3. We have 2 running tasks

In real life, parallel invocations compete for the same resources and slow each other down. Each run then takes even longer, so more than just 2 tasks may accumulate.
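The timeline above follows from simple arithmetic: a run started at time s is in flight during [s, s + duration), so in the steady state up to ceil(duration / interval) runs overlap. The sketch below reproduces the Task X timeline; `runningAt` is a hypothetical helper, and times are whole seconds. It also shows that if stress stretches each run from 15 to 35 seconds, four runs overlap instead of two.

```go
package main

import "fmt"

// runningAt (hypothetical) counts how many runs are in flight at time t when a
// naive ticker starts a run at interval, 2*interval, ... and every run lasts
// duration. A run started at s is in flight during [s, s+duration).
func runningAt(t, interval, duration int) int {
	count := 0
	for s := interval; s <= t; s += interval {
		if t < s+duration {
			count++
		}
	}
	return count
}

func main() {
	// The Task X timeline: a run every 10s, each run lasting 15s.
	for _, t := range []int{10, 20, 25, 30} {
		fmt.Printf("t=%ds running=%d\n", t, runningAt(t, 10, 15))
	}
	// If stress stretches each run to 35s, more runs pile up.
	fmt.Println("with 35s runs at t=60s:", runningAt(60, 10, 35))
}
```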

Example of Scheduler Usage


The solution is to use a scheduler that starts a new run of the task only after the previous run has completed.
A simple usage of such a scheduler looks like this:


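package main

import (
   "fmt"
   "math/rand"
   "scheduler-demo/scheduler"
   "time"
)

func main() {
   // Run myScheduledItem every second, never two runs at once.
   s := scheduler.Create(myScheduledItem, time.Second)
   s.Start()

   // Let the scheduler work for 10 seconds, then stop it.
   // Stop waits for the current run, if any, to finish.
   time.Sleep(10 * time.Second)
   s.Stop()
}

// myScheduledItem simulates a task that takes a random 0-2 seconds,
// so some runs are longer than the 1-second interval.
func myScheduledItem() {
   format := "2006-01-02 15:04:05.000"
   sleepTime := time.Duration(rand.Intn(2000)) * time.Millisecond
   fmt.Printf("start running at %v, sleeping %v\n", time.Now().UTC().Format(format), sleepTime)
   time.Sleep(sleepTime)
   fmt.Printf("end running at %v\n===\n", time.Now().UTC().Format(format))
}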


This example schedules a task to run every second, while each run sleeps for a random duration of up to 2 seconds, so some runs take longer than the interval.


The Scheduler Library


The scheduler should avoid parallel invocations of this task.
This is handled in the scheduler library:


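package scheduler

import (
   "sync"
   "time"
)

// Worker is the task the scheduler runs.
type Worker func()

// Scheduler runs a Worker repeatedly, never starting a new run before the
// previous one has returned. interval is the target time between the starts
// of two consecutive runs; waitGroup lets Stop wait for the scheduling loop.
type Scheduler struct {
   interval    time.Duration
   waitGroup   sync.WaitGroup
   worker      Worker
   stopChannel chan int
}

// Create returns a scheduler that runs worker every interval.
func Create(worker Worker, interval time.Duration) *Scheduler {
   return &Scheduler{
      worker:      worker,
      stopChannel: make(chan int, 1),
      interval:    interval,
   }
}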

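// Start launches the scheduling loop in its own goroutine.
func (s *Scheduler) Start() {
   s.waitGroup.Add(1)
   go s.schedule()
}

// Stop signals the loop to exit and blocks until it has returned.
// If a run is in progress, Stop waits for it to finish first.
func (s *Scheduler) Stop() {
   s.stopChannel <- 0
   s.waitGroup.Wait()
}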

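// schedule is the scheduling loop. The first run starts almost immediately;
// after that, runWorker re-arms the timer for the next run.
func (s *Scheduler) schedule() {
   defer s.waitGroup.Done()
   timer := time.NewTimer(time.Nanosecond)
   defer timer.Stop() // release the timer when the loop exits
   for {
      select {
      case <-timer.C:
         // The worker runs synchronously here, so no other run can start
         // (and a stop request is not seen) until it returns.
         s.runWorker(timer)

      case <-s.stopChannel:
         return
      }
   }
}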

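// runWorker runs the task once, then re-arms the timer so the next run starts
// one interval after this run started, or right away if the run took longer.
func (s *Scheduler) runWorker(timer *time.Timer) {
   startTime := time.Now()
   s.worker()
   passedTime := time.Since(startTime)
   waitTime := s.interval - passedTime
   if waitTime < 0 {
      waitTime = time.Nanosecond
   }
   // The timer has already fired and its channel was drained by the select
   // in schedule, so Reset is safe here.
   timer.Reset(waitTime)
}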


We can see that the scheduler starts the next task only after the current task is complete.
It aims to start each run one interval (here, 1 second) after the previous run started; when a run takes longer than that, the next run starts as soon as the previous one completes. An example of the run output is below.


start running at 2020-03-12 05:17:17.684, sleeping 81ms
end running at 2020-03-12 05:17:17.765
===
start running at 2020-03-12 05:17:18.684, sleeping 1.887s
end running at 2020-03-12 05:17:20.571
===
start running at 2020-03-12 05:17:20.571, sleeping 1.847s
end running at 2020-03-12 05:17:22.418
===
start running at 2020-03-12 05:17:22.418, sleeping 59ms
end running at 2020-03-12 05:17:22.478
===
start running at 2020-03-12 05:17:23.419, sleeping 81ms
end running at 2020-03-12 05:17:23.500
===
start running at 2020-03-12 05:17:24.419, sleeping 1.318s
end running at 2020-03-12 05:17:25.737
===
start running at 2020-03-12 05:17:25.737, sleeping 425ms
end running at 2020-03-12 05:17:26.162
===
start running at 2020-03-12 05:17:26.737, sleeping 540ms
end running at 2020-03-12 05:17:27.277
===


Final Notes


We have shown a simple scheduler, which avoids parallel invocations of tasks.
The scheduler is only a simple example. It could be further improved by allowing parallel runs of a task while limiting how many run at the same time.




