queue/worker.go

63 lines
1.3 KiB
Go
Raw Normal View History

2021-10-18 07:16:42 +00:00
package queue
import (
"fmt"
2021-10-18 07:20:20 +00:00
"time"
2021-10-18 07:16:42 +00:00
"github.com/hibiken/asynq"
)
// NewWorker ...
func NewWorker(cfg Config) *asynq.ServeMux {
// Init redis connection
redisConn := asynq.RedisClientOpt{
Addr: cfg.Redis.URL,
Password: cfg.Redis.Password,
DB: 0,
}
// Set default for concurrency
if cfg.Concurrency == 0 {
cfg.Concurrency = 100
}
// Set default for priority
if cfg.Priority.Critical == 0 || cfg.Priority.Default == 0 || cfg.Priority.Low == 0 {
cfg.Priority.Critical = 6
cfg.Priority.Default = 3
cfg.Priority.Low = 1
cfg.Priority.StrictMode = false
}
// Init worker
worker := asynq.NewServer(redisConn, asynq.Config{
Concurrency: cfg.Concurrency,
Queues: map[string]int{
priorityCritical: cfg.Priority.Critical,
priorityDefault: cfg.Priority.Default,
priorityLow: cfg.Priority.Low,
},
StrictPriority: cfg.Priority.StrictMode,
2021-10-18 07:20:20 +00:00
// TODO:
// This is default option, retry after 10s, will add to config later
RetryDelayFunc: func(n int, e error, t *asynq.Task) time.Duration {
return 10 * time.Second
},
2021-10-18 07:16:42 +00:00
})
// Init mux server
mux := asynq.NewServeMux()
// Run server
go func() {
if err := worker.Run(mux); err != nil {
msg := fmt.Sprintf("error when initializing queue WORKER: %s", err.Error())
panic(msg)
}
}()
// Return
return mux
}