What if you want to have a scheduler in a Go application?
At first glance it seems that there is a built-in solution provided by Go tickers, as shown on the Go by Example page.
But this solution is naive.
It does not prevent parallel runs of the task: if the scheduled task takes longer than the schedule interval, another invocation of the task will start and run in parallel.
For example:
Task X is configured to run every 10 seconds.
But due to system stress, it runs for 15 seconds.
The result would be as follows:
00:00:00 - The application starts
00:00:10 - Task X starts in goroutine #1. We have 1 running task
00:00:20 - Task X starts in goroutine #2. We have 2 running tasks
00:00:25 - Task X completes in goroutine #1. We have 1 running task
00:00:30 - Task X starts in goroutine #3. We have 2 running tasks
In real life, parallel invocations of the task would slow each other down, and hence might cause an accumulation of more than just 2 running tasks.
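For reference, a naive ticker-based version of the scenario above might look like the following sketch. The taskX function and the goroutine-per-tick dispatch are assumptions used for illustration; this is exactly the pattern that lets invocations pile up:

package main

import (
	"fmt"
	"time"
)

// taskX simulates a task that takes longer than its 10-second interval.
func taskX() {
	fmt.Println("task X starting at", time.Now().UTC().Format("15:04:05"))
	time.Sleep(15 * time.Second)
	fmt.Println("task X complete at", time.Now().UTC().Format("15:04:05"))
}

func main() {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		// A new goroutine is started on every tick, whether or not the
		// previous invocation has finished, so runs overlap exactly as in
		// the timeline above.
		go taskX()
	}
}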
Example for Scheduler Usage
The solution is to use a scheduler that starts a new task only after the previous one has completed.
A simple usage of such a scheduler is:
package main

import (
	"fmt"
	"math/rand"
	"scheduler-demo/scheduler"
	"time"
)

func main() {
	// Create a scheduler that runs myScheduledItem every second,
	// let it run for 10 seconds, and then stop it.
	s := scheduler.Create(myScheduledItem, time.Second)
	s.Start()
	time.Sleep(10 * time.Second)
	s.Stop()
}

// myScheduledItem simulates a task with a random duration of up to
// 2 seconds - sometimes longer than the schedule interval.
func myScheduledItem() {
	format := "2006-01-02 15:04:05.000"
	sleepTime := time.Duration(rand.Intn(2000)) * time.Millisecond
	fmt.Printf("start running at %v, sleeping %v\n", time.Now().UTC().Format(format), sleepTime)
	time.Sleep(sleepTime)
	fmt.Printf("end running at %v\n===\n", time.Now().UTC().Format(format))
}
This usage schedules a task to run every second, while each invocation of the task lasts up to 2 seconds.
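As a side note, the import path scheduler-demo/scheduler assumes that the usage code above sits in a module named scheduler-demo, with the scheduler library placed in a scheduler sub-folder. A minimal go.mod for such a layout (the Go version here is just an assumption) would be:

module scheduler-demo

go 1.14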
The Scheduler Library
This is handled in the scheduler library:
package scheduler

import (
	"sync"
	"time"
)

// Worker is the task function to be scheduled.
type Worker func()

// Scheduler runs a Worker periodically, without overlapping invocations.
type Scheduler struct {
	interval    time.Duration
	waitGroup   sync.WaitGroup
	worker      Worker
	stopChannel chan int
}

func Create(worker Worker, interval time.Duration) *Scheduler {
	return &Scheduler{
		worker:      worker,
		stopChannel: make(chan int, 1),
		interval:    interval,
	}
}

// Start launches the scheduling loop in its own goroutine.
func (s *Scheduler) Start() {
	s.waitGroup.Add(1)
	go s.schedule()
}

// Stop signals the scheduling loop to exit and waits until it does.
func (s *Scheduler) Stop() {
	s.stopChannel <- 0
	s.waitGroup.Wait()
}

func (s *Scheduler) schedule() {
	// The first run starts (almost) immediately; later runs are
	// scheduled by runWorker resetting the timer.
	timer := time.NewTimer(time.Nanosecond)
	for {
		select {
		case <-timer.C:
			s.runWorker(timer)
		case <-s.stopChannel:
			s.waitGroup.Done()
			return
		}
	}
}

func (s *Scheduler) runWorker(timer *time.Timer) {
	startTime := time.Now()
	s.worker()
	// Aim the next start at one interval after the previous start time;
	// if the worker ran longer than the interval, start again right away.
	passedTime := time.Now().Sub(startTime)
	waitTime := s.interval - passedTime
	if waitTime < 0 {
		waitTime = time.Nanosecond
	}
	timer.Reset(waitTime)
}
We can see that the scheduler starts the next task only after the current task is complete.
It tries to keep the starts aligned to the configured interval (one second in this example) measured from the previous start time; if the task runs longer than the interval, the next run starts immediately after it completes, so the gap between starts grows. An example of run output is below. Notice, for instance, that the run starting at 05:17:18.684 sleeps 1.887s and the next run starts immediately at 05:17:20.571, while the short run starting at 05:17:22.418 is followed by a run at 05:17:23.419, roughly one second after the previous start.
start running at 2020-03-12 05:17:17.684, sleeping 81ms
end running at 2020-03-12 05:17:17.765
===
start running at 2020-03-12 05:17:18.684, sleeping 1.887s
end running at 2020-03-12 05:17:20.571
===
start running at 2020-03-12 05:17:20.571, sleeping 1.847s
end running at 2020-03-12 05:17:22.418
===
start running at 2020-03-12 05:17:22.418, sleeping 59ms
end running at 2020-03-12 05:17:22.478
===
start running at 2020-03-12 05:17:23.419, sleeping 81ms
end running at 2020-03-12 05:17:23.500
===
start running at 2020-03-12 05:17:24.419, sleeping 1.318s
end running at 2020-03-12 05:17:25.737
===
start running at 2020-03-12 05:17:25.737, sleeping 425ms
end running at 2020-03-12 05:17:26.162
===
start running at 2020-03-12 05:17:26.737, sleeping 540ms
end running at 2020-03-12 05:17:27.277
===
Final Notes
We have shown a simple scheduler, which avoids parallel invocations of tasks.
The scheduler is only a simple example, and can be further improved, for instance by allowing parallel tasks while limiting the number of tasks that run concurrently.
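As one possible direction, here is a hedged sketch of such a bounded variant, reusing the Worker type from the scheduler package above. The BoundedScheduler name, the maxParallel parameter, and the buffered-channel semaphore are assumptions, not part of the library shown earlier:

package scheduler

import "time"

// BoundedScheduler is a hypothetical variant that allows up to maxParallel
// invocations of the worker to run at the same time.
type BoundedScheduler struct {
	interval time.Duration
	worker   Worker
	sem      chan struct{} // buffered channel used as a semaphore
	stop     chan struct{}
}

func CreateBounded(worker Worker, interval time.Duration, maxParallel int) *BoundedScheduler {
	return &BoundedScheduler{
		interval: interval,
		worker:   worker,
		sem:      make(chan struct{}, maxParallel),
		stop:     make(chan struct{}),
	}
}

func (b *BoundedScheduler) Start() {
	go b.schedule()
}

func (b *BoundedScheduler) Stop() {
	close(b.stop)
}

func (b *BoundedScheduler) schedule() {
	ticker := time.NewTicker(b.interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// Acquire a semaphore slot; this blocks the loop when
			// maxParallel workers are already running, so extra ticks are
			// skipped instead of piling up as parallel invocations.
			b.sem <- struct{}{}
			go func() {
				defer func() { <-b.sem }()
				b.worker()
			}()
		case <-b.stop:
			return
		}
	}
}

A real implementation would also need to wait for in-flight workers in Stop, similar to the sync.WaitGroup used in the scheduler library above.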