Wednesday, March 11, 2020

Go Scheduler

What if you want a scheduler in a Go application?
At first glance, Go seems to offer a built-in solution in its tickers, as shown on the Go by Example page.

But this solution is naive.
It does nothing to prevent parallel runs of the task: when each tick launches the task in its own goroutine and a run takes longer than the schedule interval, the next invocation starts while the previous one is still running.

For example:
Task X is configured to run every 10 seconds.
But due to system stress, each run takes 15 seconds.
The result would be as follows:

00:00:00 - The application starts
00:00:10 - Task X starts in goroutine #1. We have 1 running task
00:00:20 - Task X starts in goroutine #2. We have 2 running tasks
00:00:25 - Task X completes in goroutine #1. We have 1 running task
00:00:30 - Task X starts in goroutine #3. We have 2 running tasks

In real life, tasks that run in parallel slow each other down, so more than just 2 runs might accumulate.
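
To make the overlap concrete, here is a minimal sketch of the naive approach, assuming every tick launches the task in its own goroutine. The helper name maxOverlap and the scaled-down timings (20ms interval, 50ms task) are illustrative and not part of the original example.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// maxOverlap launches a run on every tick, each in its own goroutine, and
// returns the highest number of runs that were in flight at the same time.
func maxOverlap(interval, taskDuration time.Duration, ticks int) int {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	var (
		mu            sync.Mutex
		running, peak int
		wg            sync.WaitGroup
	)

	for i := 0; i < ticks; i++ {
		<-ticker.C
		wg.Add(1)
		go func() {
			defer wg.Done()

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			time.Sleep(taskDuration) // stands in for the real work

			mu.Lock()
			running--
			mu.Unlock()
		}()
	}

	wg.Wait() // all runs are done, so peak can be read safely
	return peak
}

func main() {
	// The task takes longer than the interval, so runs pile up.
	peak := maxOverlap(20*time.Millisecond, 50*time.Millisecond, 5)
	fmt.Println("peak concurrent runs:", peak)
}
```

Nothing in the ticker loop knows whether the previous run has finished, which is why the peak goes above 1 as soon as a run outlasts the interval.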

Example for Scheduler Usage

The solution is to use a scheduler that starts a new run only after the previous one has completed.
A simple usage of such a scheduler is:

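package main

import (
   "fmt"
   "math/rand"
   "time"

   "scheduler" // placeholder: the real import path of the scheduler package is not given here
)

func main() {
   s := scheduler.Create(myScheduledItem, time.Second)
   s.Start()

   time.Sleep(10 * time.Second)
   s.Stop()
}

func myScheduledItem() {
   format := "2006-01-02 15:04:05.000"
   sleepTime := time.Duration(rand.Intn(2000)) * time.Millisecond
   fmt.Printf("start running at %v, sleeping %v\n", time.Now().UTC().Format(format), sleepTime)
   time.Sleep(sleepTime) // simulate work that takes up to 2 seconds
   fmt.Printf("end running at %v\n===\n", time.Now().UTC().Format(format))
}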

This usage schedules a task to run every second, but each run lasts up to 2 seconds.

The Scheduler Library

The scheduler must avoid parallel invocations of this task.
The scheduler library handles this:

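package scheduler

import (
   "sync"
   "time"
)

// Worker is the task that the scheduler runs.
type Worker func()

type Scheduler struct {
   interval    time.Duration
   waitGroup   sync.WaitGroup
   worker      Worker
   stopChannel chan int
}

func Create(worker Worker, interval time.Duration) *Scheduler {
   return &Scheduler{
      interval:    interval,
      worker:      worker,
      stopChannel: make(chan int, 1),
   }
}

// Start runs the scheduling loop in the background.
func (s *Scheduler) Start() {
   // The listing was truncated; this use of waitGroup is a reconstruction.
   s.waitGroup.Add(1)
   go s.schedule()
}

// Stop signals the loop to exit and waits until it has done so.
func (s *Scheduler) Stop() {
   s.stopChannel <- 0
   s.waitGroup.Wait()
}

func (s *Scheduler) schedule() {
   defer s.waitGroup.Done()
   timer := time.NewTimer(time.Nanosecond) // fire almost immediately for the first run
   for {
      select {
      case <-timer.C:
         s.runWorker(timer)
      case <-s.stopChannel:
         timer.Stop()
         return
      }
   }
}

// runWorker runs the task to completion, then arms the timer for the next run.
func (s *Scheduler) runWorker(timer *time.Timer) {
   startTime := time.Now()
   s.worker()
   passedTime := time.Since(startTime)
   waitTime := s.interval - passedTime
   if waitTime < 0 {
      waitTime = time.Nanosecond // the run overran the interval: start again right away
   }
   timer.Reset(waitTime)
}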

We can see that the scheduler starts the next run only after the current run is complete.
It aims to start each run 1 second after the previous run started, but if a run takes longer than that, the next run starts as soon as it finishes. An example of the output is below.

start running at 2020-03-12 05:17:17.684, sleeping 81ms
end running at 2020-03-12 05:17:17.765
start running at 2020-03-12 05:17:18.684, sleeping 1.887s
end running at 2020-03-12 05:17:20.571
start running at 2020-03-12 05:17:20.571, sleeping 1.847s
end running at 2020-03-12 05:17:22.418
start running at 2020-03-12 05:17:22.418, sleeping 59ms
end running at 2020-03-12 05:17:22.478
start running at 2020-03-12 05:17:23.419, sleeping 81ms
end running at 2020-03-12 05:17:23.500
start running at 2020-03-12 05:17:24.419, sleeping 1.318s
end running at 2020-03-12 05:17:25.737
start running at 2020-03-12 05:17:25.737, sleeping 425ms
end running at 2020-03-12 05:17:26.162
start running at 2020-03-12 05:17:26.737, sleeping 540ms
end running at 2020-03-12 05:17:27.277
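
The start times in the output above follow from simple arithmetic, which can be sketched as a pure function. The names nextWait and startOffsets are hypothetical helpers written for this illustration, and the sketch ignores scheduling overhead (it also uses a zero wait where the library listing uses one nanosecond).

```go
package main

import (
	"fmt"
	"time"
)

// nextWait returns how long to wait after a run that took elapsed, so that
// runs start one interval apart whenever the task is fast enough.
func nextWait(interval, elapsed time.Duration) time.Duration {
	if wait := interval - elapsed; wait > 0 {
		return wait
	}
	return 0 // the run overran the interval: start the next one right away
}

// startOffsets returns when each run would start, relative to the first run,
// given how long each run takes.
func startOffsets(interval time.Duration, durations []time.Duration) []time.Duration {
	offsets := make([]time.Duration, 0, len(durations))
	var start time.Duration
	for _, d := range durations {
		offsets = append(offsets, start)
		start += d + nextWait(interval, d)
	}
	return offsets
}

func main() {
	const ms = time.Millisecond
	// The first five sleep times from the output above.
	durations := []time.Duration{81 * ms, 1887 * ms, 1847 * ms, 59 * ms, 81 * ms}
	for i, off := range startOffsets(time.Second, durations) {
		fmt.Printf("run %d starts at +%v\n", i+1, off)
	}
}
```

A short run is followed by a wait that tops it up to the full interval, while a long run is followed by no wait at all, which is the pattern visible in the timestamps.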

Final Notes

We have shown a simple scheduler that avoids parallel invocations of a task.
It is only a simple example, and can be improved further by allowing tasks to run in parallel while limiting how many run at once.
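
One possible shape for that improvement is sketched below, assuming a buffered channel is used as a counting semaphore and that a tick is simply skipped when every slot is busy. The function runBounded and its parameters are illustrative and are not part of the scheduler library shown in this post.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runBounded launches a run on every tick but never lets more than limit runs
// be in flight; a tick that arrives while all slots are busy is skipped.
// It returns the peak number of concurrent runs and the number of skipped ticks.
func runBounded(interval, taskDuration time.Duration, ticks, limit int) (peak, skipped int) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	slots := make(chan struct{}, limit) // buffered channel used as a semaphore
	var (
		mu      sync.Mutex
		running int
		wg      sync.WaitGroup
	)

	for i := 0; i < ticks; i++ {
		<-ticker.C
		select {
		case slots <- struct{}{}: // a slot is free: take it
		default:
			skipped++ // every slot is busy: skip this tick
			continue
		}

		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-slots }() // release the slot when the run ends

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			time.Sleep(taskDuration) // stands in for the real work

			mu.Lock()
			running--
			mu.Unlock()
		}()
	}

	wg.Wait()
	return peak, skipped
}

func main() {
	peak, skipped := runBounded(10*time.Millisecond, 50*time.Millisecond, 8, 2)
	fmt.Println("peak concurrent runs:", peak, "skipped ticks:", skipped)
}
```

Skipping a tick is only one policy. Queueing the run until a slot frees up would be another, at the cost of letting delayed runs accumulate again.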
