queue/instance.go

129 lines
2.8 KiB
Go

package queue
import (
"fmt"
"time"
"github.com/hibiken/asynq"
)
// Instance ...
type Instance struct {
Client *asynq.Client
Server *asynq.ServeMux
Scheduler *asynq.Scheduler
Inspector *asynq.Inspector
Config Config
}
var instance Instance
// NewInstance ...
func NewInstance(cfg Config) Instance {
// Init redis connection
redisConn := asynq.RedisClientOpt{
Addr: cfg.Redis.URL,
Password: cfg.Redis.Password,
DB: 0,
}
// Init instance
instance.Server = initServer(redisConn, cfg)
instance.Scheduler = initScheduler(redisConn)
instance.Client = initClient(redisConn)
instance.Inspector = initInspector(redisConn)
instance.Config = cfg
// Return instance
return instance
}
func initServer(redisConn asynq.RedisClientOpt, cfg Config) *asynq.ServeMux {
// Set default for concurrency
if cfg.Concurrency == 0 {
cfg.Concurrency = 100
}
// Set default for priority
if cfg.Priority.Critical == 0 || cfg.Priority.Default == 0 || cfg.Priority.Low == 0 {
cfg.Priority.Critical = 6
cfg.Priority.Default = 3
cfg.Priority.Low = 1
cfg.Priority.StrictMode = false
}
retryDelayFunc := cfg.RetryDelayFunc
if retryDelayFunc == nil {
// Default delay in 10s
retryDelayFunc = func(n int, e error, t *asynq.Task) time.Duration {
return 10 * time.Second
}
}
// Init server
server := asynq.NewServer(redisConn, asynq.Config{
Concurrency: cfg.Concurrency,
Queues: map[string]int{
cfg.QueuePrefix + PriorityCritical: cfg.Priority.Critical,
cfg.QueuePrefix + PriorityDefault: cfg.Priority.Default,
cfg.QueuePrefix + PriorityLow: cfg.Priority.Low,
},
StrictPriority: cfg.Priority.StrictMode,
RetryDelayFunc: retryDelayFunc,
})
// Init mux server
mux := asynq.NewServeMux()
if len(cfg.ServerMiddlewares) > 0 {
mux.Use(cfg.ServerMiddlewares...)
}
// Run server
go func() {
if err := server.Run(mux); err != nil {
msg := fmt.Sprintf("error when initializing queue SERVER: %s", err.Error())
panic(msg)
}
}()
return mux
}
func initScheduler(redisConn asynq.RedisClientOpt) *asynq.Scheduler {
// Always run at HCM timezone
l, _ := time.LoadLocation("Asia/Ho_Chi_Minh")
// Init scheduler
scheduler := asynq.NewScheduler(redisConn, &asynq.SchedulerOpts{
Location: l,
})
// Run scheduler
go func() {
if err := scheduler.Run(); err != nil {
msg := fmt.Sprintf("error when initializing queue SCHEDULER: %s", err.Error())
panic(msg)
}
}()
return scheduler
}
func initClient(redisConn asynq.RedisClientOpt) *asynq.Client {
client := asynq.NewClient(redisConn)
if client == nil {
panic("error when initializing queue CLIENT")
}
return client
}
func initInspector(redisConn asynq.RedisClientOpt) *asynq.Inspector {
return asynq.NewInspector(redisConn)
}
// GetInstance ...
func GetInstance() Instance {
return instance
}