-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathqueuer.go
More file actions
709 lines (637 loc) · 21.1 KB
/
queuer.go
File metadata and controls
709 lines (637 loc) · 21.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
package queuer
import (
"context"
"database/sql"
"encoding/json"
"log"
"log/slog"
"os"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/siherrmann/queuer/core"
"github.com/siherrmann/queuer/database"
"github.com/siherrmann/queuer/helper"
"github.com/siherrmann/queuer/model"
)
// Queuer represents the main queuing system.
// It manages job scheduling, execution, and error handling.
// It provides methods to start, stop, and manage jobs and workers.
// It also handles database connections and listeners for job events.
//
// Instances are created with NewQueuer, NewQueuerWithDB or NewStaticQueuer.
// The struct contains a sync.Map and a sync.RWMutex and must therefore not be
// copied; always pass it around as *Queuer.
type Queuer struct {
// DB is the underlying database handle. Stop closes it.
DB *sql.DB
// JobPollInterval is the period of the job poll ticker (see pollJobTicker).
// The constructors initialise it to 1 minute.
JobPollInterval time.Duration
// WorkerPollInterval is the period of the worker heartbeat ticker
// (see heartbeatTicker). The constructors initialise it to 30 seconds.
WorkerPollInterval time.Duration
// RetentionArchive is not set by any constructor in this file.
// NOTE(review): presumably the retention time of archived jobs - confirm
// against the code that reads it.
RetentionArchive time.Duration
// Context
// ctx and cancel are stored by Start / StartWithoutWorker; Stop calls cancel.
ctx context.Context
cancel context.CancelFunc
// Runners
// activeRunners is looked up by job RID and holds *core.Runner values
// (see listen and Stop, which cancel and delete entries).
activeRunners sync.Map
// Logger
log *slog.Logger
// Worker
// worker is the database representation of this queuer's worker. It is nil
// for a queuer created with NewStaticQueuer. Access is guarded by workerMu.
worker *model.Worker
workerMu sync.RWMutex
// DBs
// dbConfig is the configuration used to open DB and the DB listeners.
dbConfig *helper.DatabaseConfiguration
dbJob database.JobDBHandlerFunctions
dbWorker database.WorkerDBHandlerFunctions
dbMaster database.MasterDBHandlerFunctions
// Job DB listeners
// jobDbListener listens on the "job" channel, jobArchiveDbListener on the
// "job_archive" channel. Both stay nil until Start / StartWithoutWorker
// creates them.
jobDbListener *database.QueuerListener
jobArchiveDbListener *database.QueuerListener
// Job listeners
// In-process listeners that are notified about job inserts, updates and
// deletes by the DB notification handlers.
jobInsertListener *core.Listener[*model.Job]
jobUpdateListener *core.Listener[*model.Job]
jobDeleteListener *core.Listener[*model.Job]
// Available functions
// Both maps are created empty by the constructors and are keyed by string.
// NOTE(review): presumably keyed by task / function name - confirm against
// the code that fills them.
tasks map[string]*model.Task
nextIntervalFuncs map[string]model.NextIntervalFunc
}
// NewQueuer creates a new Queuer instance with the given worker name and max concurrency.
// It is a convenience wrapper around NewQueuerWithDB that passes no explicit
// database configuration (so the configuration is taken from the environment)
// and reads the encryption key from the QUEUER_ENCRYPTION_KEY environment
// variable. An unset variable results in an empty key being passed on.
func NewQueuer(name string, maxConcurrency int, options ...*model.OnError) *Queuer {
	encryptionKey := os.Getenv("QUEUER_ENCRYPTION_KEY")

	// A nil configuration tells NewQueuerWithDB to load it from the environment.
	var envDBConfig *helper.DatabaseConfiguration

	return NewQueuerWithDB(name, maxConcurrency, encryptionKey, envDBConfig, options...)
}
// NewQueuerWithDB creates a new Queuer instance with the given name and max concurrency.
// It opens the database connection, creates the job, worker and master db
// handlers and inserts a new worker for this queuer.
// If options are provided, only the first one is used to create the worker.
//
// If dbConfig is nil, the configuration is loaded from environment variables:
//   - QUEUER_DB_HOST (required)
//   - QUEUER_DB_PORT (required)
//   - QUEUER_DB_DATABASE (required)
//   - QUEUER_DB_USERNAME (required)
//   - QUEUER_DB_PASSWORD (required)
//   - QUEUER_DB_SCHEMA (required)
//   - QUEUER_DB_SSLMODE (optional, defaults to "require")
//
// NOTE(review): encryptionKey is accepted but not referenced anywhere in this
// function body; confirm where (or whether) it is supposed to be applied.
//
// Any error during initialization is logged and raised as a panic (log.Panicf).
// On success it returns a pointer to the newly created Queuer, with
// JobPollInterval set to 1 minute and WorkerPollInterval set to 30 seconds.
func NewQueuerWithDB(name string, maxConcurrency int, encryptionKey string, dbConfig *helper.DatabaseConfiguration, options ...*model.OnError) *Queuer {
	// Logger
	handlerOpts := helper.PrettyHandlerOptions{
		SlogOpts: slog.HandlerOptions{Level: slog.LevelInfo},
	}
	logger := slog.New(helper.NewPrettyHandler(os.Stdout, handlerOpts))

	// Database: fall back to the environment when no configuration was given.
	if dbConfig == nil {
		envConfig, cfgErr := helper.NewDatabaseConfiguration()
		if cfgErr != nil {
			log.Panicf("error creating database configuration: %s", cfgErr.Error())
		}
		dbConfig = envConfig
	}
	dbCon := helper.NewDatabase("queuer", dbConfig, logger)

	// DB handlers
	jobHandler, err := database.NewJobDBHandler(dbCon, dbConfig)
	if err != nil {
		log.Panicf("error creating job db handler: %s", err.Error())
	}
	workerHandler, err := database.NewWorkerDBHandler(dbCon, dbConfig.WithTableDrop)
	if err != nil {
		log.Panicf("error creating worker db handler: %s", err.Error())
	}
	masterHandler, err := database.NewMasterDBHandler(dbCon, dbConfig.WithTableDrop)
	if err != nil {
		log.Panicf("error creating master db handler: %s", err.Error())
	}

	// Build the worker model, with the first option if any was passed.
	var workerModel *model.Worker
	if len(options) == 0 {
		workerModel, err = model.NewWorker(name, maxConcurrency)
		if err != nil {
			log.Panicf("error creating new worker: %s", err.Error())
		}
	} else {
		workerModel, err = model.NewWorkerWithOptions(name, maxConcurrency, options[0])
		if err != nil {
			log.Panicf("error creating new worker with options: %s", err.Error())
		}
	}

	// Persist the worker; the returned value carries the RID used from here on.
	insertedWorker, err := workerHandler.InsertWorker(workerModel)
	if err != nil {
		log.Panicf("error inserting worker: %s", err.Error())
	}

	logger.Info(
		"Queuer with worker created",
		slog.String("worker_name", workerModel.Name),
		slog.String("worker_rid", insertedWorker.RID.String()),
	)

	queuer := &Queuer{
		DB:                 dbCon.Instance,
		JobPollInterval:    1 * time.Minute,
		WorkerPollInterval: 30 * time.Second,
		log:                logger,
		worker:             insertedWorker,
		dbConfig:           dbConfig,
		dbJob:              jobHandler,
		dbWorker:           workerHandler,
		dbMaster:           masterHandler,
		tasks:              map[string]*model.Task{},
		nextIntervalFuncs:  map[string]model.NextIntervalFunc{},
	}
	return queuer
}
// NewStaticQueuer creates a new Queuer instance without a worker.
// It opens the database connection and creates the job, worker and master db
// handlers, but does not insert a worker; the worker of the returned queuer
// is nil.
//
// logLevel sets the level of the queuer's logger. If dbConfig is nil, the
// database configuration is loaded from the environment.
//
// Any error during initialization is logged and raised as a panic (log.Panicf).
// The constructor itself neither listens to the db nor runs jobs; the instance
// is primarily meant for static operations like adding jobs or getting job
// status.
func NewStaticQueuer(logLevel slog.Leveler, dbConfig *helper.DatabaseConfiguration) *Queuer {
	// Logger
	handlerOpts := helper.PrettyHandlerOptions{
		SlogOpts: slog.HandlerOptions{Level: logLevel},
	}
	logger := slog.New(helper.NewPrettyHandler(os.Stdout, handlerOpts))

	// Database: fall back to the environment when no configuration was given.
	if dbConfig == nil {
		envConfig, cfgErr := helper.NewDatabaseConfiguration()
		if cfgErr != nil {
			log.Panicf("error creating database configuration: %s", cfgErr.Error())
		}
		dbConfig = envConfig
	}
	dbCon := helper.NewDatabase("queuer", dbConfig, logger)

	// DB handlers
	jobHandler, err := database.NewJobDBHandler(dbCon, dbConfig)
	if err != nil {
		log.Panicf("error creating job db handler: %s", err.Error())
	}
	workerHandler, err := database.NewWorkerDBHandler(dbCon, dbConfig.WithTableDrop)
	if err != nil {
		log.Panicf("error creating worker db handler: %s", err.Error())
	}
	masterHandler, err := database.NewMasterDBHandler(dbCon, dbConfig.WithTableDrop)
	if err != nil {
		log.Panicf("error creating master db handler: %s", err.Error())
	}

	queuer := &Queuer{
		DB:                 dbCon.Instance,
		JobPollInterval:    1 * time.Minute,
		WorkerPollInterval: 30 * time.Second,
		log:                logger,
		dbConfig:           dbConfig,
		dbJob:              jobHandler,
		dbWorker:           workerHandler,
		dbMaster:           masterHandler,
		tasks:              map[string]*model.Task{},
		nextIntervalFuncs:  map[string]model.NextIntervalFunc{},
	}
	return queuer
}
// Start starts the queuer by initializing the job listeners and starting the
// heartbeat and job poll tickers.
// If masterSettings are provided (only the first non-nil entry is used), it
// also starts the master poll ticker; SetDefault is called on the settings
// before they are used.
//
// Start panics if ctx is nil or context.TODO(), if cancel is nil, if the
// queuer has no worker (for example when it was created with NewStaticQueuer),
// or if a listener cannot be created or the worker status cannot be updated.
//
// Detailed steps include:
//  1. Create job and job archive database listeners.
//  2. Create broadcasters and listeners for job insert, update, and delete events.
//  3. Mark the worker as running in the database.
//  4. In a background goroutine: start listening for job events, then start the
//     heartbeat ticker, the job poll ticker and (optionally) the master poll
//     ticker. When ctx is done, the goroutine calls Stop.
//  5. Wait until the background goroutine reports ready. If that does not
//     happen within 5 seconds an error is logged and Start returns anyway.
func (q *Queuer) Start(ctx context.Context, cancel context.CancelFunc, masterSettings ...*model.MasterSettings) {
	if ctx == nil || ctx == context.TODO() || cancel == nil {
		panic("ctx and cancel must be set")
	}

	// Fail fast with a clear message instead of a nil pointer dereference
	// further down (which would also happen while holding workerMu and after
	// the DB listeners have already been opened).
	q.workerMu.RLock()
	hasWorker := q.worker != nil
	q.workerMu.RUnlock()
	if !hasWorker {
		panic("queuer has no worker, Start requires a queuer created with a worker")
	}

	q.ctx = ctx
	q.cancel = cancel

	// DB listeners
	var err error
	q.jobDbListener, err = database.NewQueuerDBListener(q.dbConfig, "job")
	if err != nil {
		log.Panicf("error creating job insert listener: %s", err.Error())
	}
	q.log.Info("Added listener", slog.String("channel", "job"))

	q.jobArchiveDbListener, err = database.NewQueuerDBListener(q.dbConfig, "job_archive")
	if err != nil {
		log.Panicf("error creating job archive listener: %s", err.Error())
	}
	q.log.Info("Added listener", slog.String("channel", "job_archive"))

	// Broadcasters for job inserts, updates and deletes
	broadcasterJobInsert := core.NewBroadcaster[*model.Job]("job.INSERT")
	q.jobInsertListener, err = core.NewListener(broadcasterJobInsert)
	if err != nil {
		log.Panicf("error creating job insert listener: %s", err.Error())
	}
	broadcasterJobUpdate := core.NewBroadcaster[*model.Job]("job.UPDATE")
	q.jobUpdateListener, err = core.NewListener(broadcasterJobUpdate)
	if err != nil {
		log.Panicf("error creating job update listener: %s", err.Error())
	}
	broadcasterJobDelete := core.NewBroadcaster[*model.Job]("job.DELETE")
	q.jobDeleteListener, err = core.NewListener(broadcasterJobDelete)
	if err != nil {
		log.Panicf("error creating job delete listener: %s", err.Error())
	}

	// Update worker to running
	q.workerMu.Lock()
	q.worker.Status = model.WorkerStatusRunning
	q.worker, err = q.dbWorker.UpdateWorker(q.worker)
	q.workerMu.Unlock()
	if err != nil {
		log.Panicf("error updating worker status to running: %s", err.Error())
	}

	// Start pollers. All errors inside the goroutine are kept local to it so
	// that it never writes to variables of the enclosing function.
	ready := make(chan struct{})
	go func() {
		q.listen(ctx, cancel)

		// A ticker error is only reported while ctx is still alive; in either
		// case ready stays open on the early return and the select below
		// times out.
		if err := q.heartbeatTicker(ctx); err != nil && ctx.Err() == nil {
			q.log.Error("Error starting heartbeat ticker", slog.String("error", err.Error()))
			return
		}

		if err := q.pollJobTicker(ctx); err != nil && ctx.Err() == nil {
			q.log.Error("Error starting job poll ticker", slog.String("error", err.Error()))
			return
		}

		if len(masterSettings) > 0 && masterSettings[0] != nil {
			masterSettings[0].SetDefault()
			if err := q.pollMasterTicker(ctx, masterSettings[0]); err != nil && ctx.Err() == nil {
				q.log.Error("Error starting master poll ticker", slog.String("error", err.Error()))
				return
			}
		}

		close(ready)

		// Block until the context ends, then shut everything down.
		<-ctx.Done()
		if err := q.Stop(); err != nil {
			q.log.Error("Error stopping queuer", slog.String("error", err.Error()))
		}
	}()

	select {
	case <-ready:
		q.log.Info("Queuer started")
		return
	case <-time.After(5 * time.Second):
		q.log.Error("Queuer failed to start within 5 seconds")
	}
}
// StartWithoutWorker starts the queuer without running any jobs.
// It creates the in-process job insert/update/delete listeners and, unless
// withoutListeners is true, the database listeners for the "job" and
// "job_archive" channels, whose notifications are forwarded to the in-process
// listeners. No heartbeat ticker and no job poll ticker are started.
//
// If masterSettings are provided (only the first non-nil entry is used), the
// master poll ticker is started as well; as in Start, SetDefault is called on
// the settings before they are used.
//
// This is useful if you want to run a queuer instance in a separate service
// without a worker, for example to handle listening to job events and
// providing a central frontend.
//
// It panics if a listener cannot be created. If the background goroutine does
// not report ready within 5 seconds an error is logged and the function
// returns anyway. When ctx is done, the background goroutine calls Stop.
func (q *Queuer) StartWithoutWorker(ctx context.Context, cancel context.CancelFunc, withoutListeners bool, masterSettings ...*model.MasterSettings) {
	q.ctx = ctx
	q.cancel = cancel

	// Job DB listeners
	var err error
	if !withoutListeners {
		q.jobDbListener, err = database.NewQueuerDBListener(q.dbConfig, "job")
		if err != nil {
			log.Panicf("error creating job insert listener: %s", err.Error())
		}
		q.log.Info("Added listener", slog.String("channel", "job"))

		q.jobArchiveDbListener, err = database.NewQueuerDBListener(q.dbConfig, "job_archive")
		if err != nil {
			log.Panicf("error creating job archive listener: %s", err.Error())
		}
		q.log.Info("Added listener", slog.String("channel", "job_archive"))
	}

	// Broadcasters for job inserts, updates and deletes
	broadcasterJobInsert := core.NewBroadcaster[*model.Job]("job.INSERT")
	q.jobInsertListener, err = core.NewListener(broadcasterJobInsert)
	if err != nil {
		log.Panicf("error creating job insert listener: %s", err.Error())
	}
	broadcasterJobUpdate := core.NewBroadcaster[*model.Job]("job.UPDATE")
	q.jobUpdateListener, err = core.NewListener(broadcasterJobUpdate)
	if err != nil {
		log.Panicf("error creating job update listener: %s", err.Error())
	}
	broadcasterJobDelete := core.NewBroadcaster[*model.Job]("job.DELETE")
	q.jobDeleteListener, err = core.NewListener(broadcasterJobDelete)
	if err != nil {
		log.Panicf("error creating job delete listener: %s", err.Error())
	}

	// Start job listeners. All errors inside the goroutine are kept local to
	// it so that it never writes to variables of the enclosing function.
	ready := make(chan struct{})
	go func() {
		if !withoutListeners {
			q.listenWithoutRunning(ctx, cancel)
		}

		if len(masterSettings) > 0 && masterSettings[0] != nil {
			// Same normalisation as in Start, so zero-valued settings do not
			// reach the ticker unchanged.
			masterSettings[0].SetDefault()
			if err := q.pollMasterTicker(ctx, masterSettings[0]); err != nil && ctx.Err() == nil {
				q.log.Error("Error starting master poll ticker", slog.String("error", err.Error()))
				return
			}
		}

		close(ready)

		// Block until the context ends, then shut everything down.
		<-ctx.Done()
		if err := q.Stop(); err != nil {
			q.log.Error("Error stopping queuer", slog.String("error", err.Error()))
		}
	}()

	select {
	case <-ready:
		q.log.Info("Queuer without worker started")
		return
	case <-time.After(5 * time.Second):
		q.log.Error("Queuer failed to start within 5 seconds")
	}
}
// Stop stops the queuer by closing the job listeners, cancelling all queued and
// running jobs, and cancelling the context to stop the queuer.
//
// The steps are, in order:
//  1. Close the job and job archive db listeners (if they were created). A
//     close error whose text contains "Listener has been closed" is ignored.
//  2. If the queuer has a worker, set its status to stopped in the database.
//  3. If that update yielded a worker RID, cancel all jobs of that worker via
//     CancelAllJobsByWorker.
//  4. Cancel and forget every runner in activeRunners.
//  5. Call the cancel function stored by Start / StartWithoutWorker, if any.
//  6. Sleep 100ms to give background goroutines time to finish, then close
//     the database connection. A close error is logged, not returned.
//
// It returns an error if closing a listener, updating the worker or cancelling
// the jobs fails; in that case the remaining steps are not executed.
//
// NOTE(review): the q.DB == nil guard only triggers for a queuer without a
// database handle; nothing in this function resets q.DB after closing it, so
// a second call runs all steps again.
func (q *Queuer) Stop() error {
	// Check if already stopped
	if q.DB == nil {
		return nil // Already stopped
	}

	// Close db listeners
	if q.jobDbListener != nil {
		err := q.jobDbListener.Listener.Close()
		if err != nil && !strings.Contains(err.Error(), "Listener has been closed") {
			return helper.NewError("closing job insert listener", err)
		}
	}
	if q.jobArchiveDbListener != nil {
		err := q.jobArchiveDbListener.Listener.Close()
		if err != nil && !strings.Contains(err.Error(), "Listener has been closed") {
			return helper.NewError("closing job archive listener", err)
		}
	}

	// Update worker status to stopped
	var err error
	var workerRID uuid.UUID
	q.workerMu.Lock()
	if q.worker != nil {
		q.worker.Status = model.WorkerStatusStopped
		q.worker, err = q.dbWorker.UpdateWorker(q.worker)
		if err == nil && q.worker != nil {
			workerRID = q.worker.RID
		}
	}
	q.workerMu.Unlock()
	if err != nil {
		return helper.NewError("updating worker status to stopped", err)
	}

	// Cancel all queued and running jobs (only if we have a valid worker RID)
	if workerRID != uuid.Nil {
		err = q.CancelAllJobsByWorker(workerRID, 100)
		if err != nil {
			return helper.NewError("cancelling all jobs by worker", err)
		}
	}

	// Cancel the in-process runners. The checked assertion keeps an unexpected
	// value in the map from panicking the shutdown; the entry is dropped either way.
	q.activeRunners.Range(func(key, value any) bool {
		if runner, ok := value.(*core.Runner); ok {
			runner.Cancel()
		}
		q.activeRunners.Delete(key)
		return true
	})

	// Cancel the context to stop the queuer. Guard on the function that is
	// actually called: ctx may be set while cancel is nil.
	if q.cancel != nil {
		q.cancel()
	}

	// Wait a moment for background goroutines to finish gracefully
	time.Sleep(100 * time.Millisecond)

	// Close database connection
	if q.DB != nil {
		q.log.Info("Closing database connection")
		err = q.DB.Close()
		if err != nil {
			q.log.Error("error closing database connection", slog.String("error", err.Error()))
		}
	}

	q.log.Info("Queuer stopped")
	return nil
}
// Internal
// listen starts the two database listeners in background goroutines and wires
// their notifications to the in-process listeners and the job runner.
//
// Notifications on the job channel with status queued or scheduled are
// forwarded to the insert listener and trigger runJobInitial; every other
// status is forwarded to the update listener.
// Notifications on the job archive channel with status cancelled cancel the
// matching active runner (if any); every other status is forwarded to the
// delete listener.
//
// The function returns once both goroutines have been scheduled. The ready
// channels are closed right before Listen is called, so returning does not
// prove that Listen itself is already receiving.
func (q *Queuer) listen(ctx context.Context, cancel context.CancelFunc) {
	handleJob := func(data string) {
		notification := &model.JobFromNotification{}
		if err := json.Unmarshal([]byte(data), notification); err != nil {
			q.log.Error("Error unmarshalling job data", slog.String("error", err.Error()))
			return
		}

		isNew := notification.Status == model.JobStatusQueued || notification.Status == model.JobStatusScheduled
		if !isNew {
			q.jobUpdateListener.Notify(notification.ToJob())
			return
		}

		q.jobInsertListener.Notify(notification.ToJob())
		if err := q.runJobInitial(); err != nil {
			q.log.Error("Error running job", slog.String("error", err.Error()))
		}
	}

	handleArchive := func(data string) {
		notification := &model.JobFromNotification{}
		if err := json.Unmarshal([]byte(data), notification); err != nil {
			q.log.Error("Error unmarshalling job data", slog.String("error", err.Error()))
			return
		}

		if notification.Status != model.JobStatusCancelled {
			q.jobDeleteListener.Notify(notification.ToJob())
			return
		}

		// Cancelled job: stop its runner if this queuer is executing it.
		active, found := q.activeRunners.Load(notification.RID)
		if !found {
			return
		}
		q.log.Info("Canceling running job", slog.String("job_id", notification.RID.String()))
		active.(*core.Runner).Cancel()
		q.activeRunners.Delete(notification.RID)
	}

	jobScheduled := make(chan struct{})
	archiveScheduled := make(chan struct{})

	go func() {
		close(jobScheduled)
		q.jobDbListener.Listen(ctx, cancel, handleJob)
	}()
	<-jobScheduled

	go func() {
		close(archiveScheduled)
		q.jobArchiveDbListener.Listen(ctx, cancel, handleArchive)
	}()
	<-archiveScheduled
}
// listenWithoutRunning starts the two database listeners in background
// goroutines and only forwards their notifications to the in-process
// listeners; it never runs or cancels jobs.
//
// Job channel notifications with status queued or scheduled go to the insert
// listener, all others to the update listener. Every job archive notification
// goes to the delete listener.
//
// The function returns once both goroutines have been scheduled. The ready
// channels are closed right before Listen is called, so returning does not
// prove that Listen itself is already receiving.
func (q *Queuer) listenWithoutRunning(ctx context.Context, cancel context.CancelFunc) {
	forwardJob := func(data string) {
		notification := &model.JobFromNotification{}
		if err := json.Unmarshal([]byte(data), notification); err != nil {
			q.log.Error("Error unmarshalling job data", slog.String("error", err.Error()))
			return
		}

		switch notification.Status {
		case model.JobStatusQueued, model.JobStatusScheduled:
			q.jobInsertListener.Notify(notification.ToJob())
		default:
			q.jobUpdateListener.Notify(notification.ToJob())
		}
	}

	forwardArchive := func(data string) {
		notification := &model.JobFromNotification{}
		if err := json.Unmarshal([]byte(data), notification); err != nil {
			q.log.Error("Error unmarshalling job data", slog.String("error", err.Error()))
			return
		}
		q.jobDeleteListener.Notify(notification.ToJob())
	}

	jobScheduled := make(chan struct{})
	archiveScheduled := make(chan struct{})

	go func() {
		close(jobScheduled)
		q.jobDbListener.Listen(ctx, cancel, forwardJob)
	}()
	<-jobScheduled

	go func() {
		close(archiveScheduled)
		q.jobArchiveDbListener.Listen(ctx, cancel, forwardArchive)
	}()
	<-archiveScheduled
}
// heartbeatTicker creates a ticker with the period WorkerPollInterval and
// starts it in its own goroutine with the given ctx.
//
// On every tick the worker row is read from the database and handled
// depending on its status:
//   - stopped: the queuer is stopped via Stop.
//   - stopping: on the first tick the worker's max concurrency is set to 0;
//     once no runner is active anymore the worker is marked as stopped and
//     the queuer is stopped via Stop.
//   - anything else: the in-memory worker is written back to the database.
//
// Unless the tick returned early, the in-memory worker is replaced by the
// worker returned from the database.
//
// It returns an error only if the ticker cannot be created.
func (q *Queuer) heartbeatTicker(ctx context.Context) error {
ticker, err := core.NewTicker(
q.WorkerPollInterval,
func() {
q.log.Debug("Sending worker heartbeat...")
// Take a snapshot of the worker pointer; the lock is not held during the
// database calls below.
q.workerMu.RLock()
worker := q.worker
q.workerMu.RUnlock()
// Nothing to do for a queuer without a worker.
if worker == nil {
return
}
workerFromDb, err := q.dbWorker.SelectWorker(worker.RID)
if err != nil {
q.log.Error("Error selecting worker for heartbeat", slog.String("error", err.Error()))
return
}
// The status stored in the database decides what happens; it can differ
// from the in-memory status of the snapshot.
switch workerFromDb.Status {
case model.WorkerStatusStopped:
q.log.Info("Stopping worker...", slog.String("worker_status", string(workerFromDb.Status)))
err = q.Stop()
if err != nil {
q.log.Error("Error stopping queuer", slog.String("error", err.Error()))
}
// Return without storing workerFromDb in q.worker.
return
case model.WorkerStatusStopping:
if workerFromDb.MaxConcurrency != 0 {
// First phase of the graceful stop: persist a max concurrency of 0.
q.log.Info("Gracefully stopping worker...", slog.String("worker_status", string(workerFromDb.Status)))
workerFromDb.MaxConcurrency = 0
workerFromDb, err = q.dbWorker.UpdateWorker(workerFromDb)
if err != nil {
q.log.Error("Error updating worker concurrency", slog.String("error", err.Error()))
return
}
} else if helper.LenSyncMap(&q.activeRunners) == 0 {
// Second phase: max concurrency is already 0 and no runner is left.
q.log.Info("All running jobs finished, stopping worker...", slog.String("worker_status", string(workerFromDb.Status)))
workerFromDb.Status = model.WorkerStatusStopped
workerFromDb, err = q.dbWorker.UpdateWorker(workerFromDb)
if err != nil {
q.log.Error("Error updating worker status to stopped", slog.String("error", err.Error()))
return
}
err = q.Stop()
if err != nil {
q.log.Error("Error stopping queuer", slog.String("error", err.Error()))
}
return
}
// Max concurrency is 0 but runners are still active: fall through and
// only refresh q.worker below.
default:
// Regular heartbeat: write the in-memory snapshot (not workerFromDb)
// back to the database.
workerFromDb, err = q.dbWorker.UpdateWorker(worker)
if err != nil {
q.log.Error("Error updating worker heartbeat", slog.String("error", err.Error()))
return
}
}
// Publish the worker as returned by the database.
q.workerMu.Lock()
q.worker = workerFromDb
q.workerMu.Unlock()
},
)
if err != nil {
return helper.NewError("creating heartbeat ticker", err)
}
q.log.Info("Starting worker heartbeat ticker...")
// NOTE(review): presumably ticker.Go returns once ctx is done - confirm in core.Ticker.
go ticker.Go(ctx)
return nil
}
// pollJobTicker creates a ticker with the period JobPollInterval and starts it
// in its own goroutine with the given ctx. On every tick runJobInitial is
// called; an error from it is logged and does not stop the ticker callback
// from being invoked again.
//
// It returns an error only if the ticker cannot be created.
func (q *Queuer) pollJobTicker(ctx context.Context) error {
	pollOnce := func() {
		q.log.Info("Polling jobs...")
		if runErr := q.runJobInitial(); runErr != nil {
			q.log.Error("Error running job", slog.String("error", runErr.Error()))
		}
	}

	jobTicker, err := core.NewTicker(q.JobPollInterval, pollOnce)
	if err != nil {
		return helper.NewError("creating ticker", err)
	}

	q.log.Info("Starting job poll ticker...")
	go jobTicker.Go(ctx)

	return nil
}
// pollMasterTicker creates a ticker with the period
// masterSettings.MasterPollInterval and starts it in its own goroutine.
//
// On every tick the queuer's worker is passed to UpdateMaster. If that call
// returns a master, the master ticker is started with the outer ctx and, when
// that succeeded, polling is stopped by cancelling the inner context the poll
// ticker runs on.
//
// A queuer without a worker skips the poll with a warning instead of
// dereferencing a nil worker.
//
// It returns an error only if the ticker cannot be created; the inner context
// is released in that case.
func (q *Queuer) pollMasterTicker(ctx context.Context, masterSettings *model.MasterSettings) error {
	// The poll ticker runs on its own cancellable context so that it can be
	// stopped without touching the queuer's context.
	ctxInner, cancel := context.WithCancel(ctx)

	ticker, err := core.NewTicker(
		masterSettings.MasterPollInterval,
		func() {
			q.log.Info("Polling master...")

			q.workerMu.RLock()
			worker := q.worker
			if worker == nil {
				q.workerMu.RUnlock()
				q.log.Warn("Skipping master poll, queuer has no worker")
				return
			}
			workerRID := worker.RID
			q.workerMu.RUnlock()

			master, err := q.dbMaster.UpdateMaster(worker, masterSettings)
			if err != nil {
				q.log.Error("Error updating master", slog.String("error", err.Error()))
			}
			if master == nil {
				return
			}

			q.log.Debug("New master", slog.String("worker_id", workerRID.String()))
			// The master ticker must outlive the poll ticker, so it gets the
			// outer ctx and not ctxInner.
			if err := q.masterTicker(ctx, master, masterSettings); err != nil {
				q.log.Error("Error starting master ticker", slog.String("error", err.Error()))
				return
			}
			cancel()
		},
	)
	if err != nil {
		// Release the inner context; nothing will ever run on it.
		cancel()
		return helper.NewError("creating ticker", err)
	}

	q.log.Info("Starting master poll ticker...")
	go ticker.Go(ctxInner)

	return nil
}