queue/instance.go

129 lines
2.8 KiB
Go
Raw Permalink Normal View History

2021-10-18 07:35:19 +00:00
package queue
import (
"fmt"
"time"
"github.com/hibiken/asynq"
)
// Instance ...
type Instance struct {
Client *asynq.Client
Server *asynq.ServeMux
Scheduler *asynq.Scheduler
2022-11-18 10:04:42 +00:00
Inspector *asynq.Inspector
2022-02-16 10:47:57 +00:00
2022-02-16 10:56:22 +00:00
Config Config
2021-10-18 07:35:19 +00:00
}
2021-11-30 15:21:26 +00:00
// instance is the package-level Instance that NewInstance fills in and
// GetInstance returns.
var instance Instance
2021-10-18 07:35:19 +00:00
// NewInstance ...
func NewInstance(cfg Config) Instance {
// Init redis connection
redisConn := asynq.RedisClientOpt{
Addr: cfg.Redis.URL,
Password: cfg.Redis.Password,
DB: 0,
}
// Init instance
instance.Server = initServer(redisConn, cfg)
instance.Scheduler = initScheduler(redisConn)
instance.Client = initClient(redisConn)
2022-11-18 10:04:42 +00:00
instance.Inspector = initInspector(redisConn)
2022-02-16 10:56:22 +00:00
instance.Config = cfg
2021-10-18 07:35:19 +00:00
// Return instance
return instance
}
func initServer(redisConn asynq.RedisClientOpt, cfg Config) *asynq.ServeMux {
// Set default for concurrency
if cfg.Concurrency == 0 {
cfg.Concurrency = 100
}
// Set default for priority
if cfg.Priority.Critical == 0 || cfg.Priority.Default == 0 || cfg.Priority.Low == 0 {
cfg.Priority.Critical = 6
cfg.Priority.Default = 3
cfg.Priority.Low = 1
cfg.Priority.StrictMode = false
}
2022-08-10 10:17:34 +00:00
retryDelayFunc := cfg.RetryDelayFunc
if retryDelayFunc == nil {
// Default delay in 10s
retryDelayFunc = func(n int, e error, t *asynq.Task) time.Duration {
return 10 * time.Second
}
}
2021-10-18 07:35:19 +00:00
// Init server
server := asynq.NewServer(redisConn, asynq.Config{
Concurrency: cfg.Concurrency,
Queues: map[string]int{
cfg.QueuePrefix + PriorityCritical: cfg.Priority.Critical,
cfg.QueuePrefix + PriorityDefault: cfg.Priority.Default,
cfg.QueuePrefix + PriorityLow: cfg.Priority.Low,
2021-10-18 07:35:19 +00:00
},
StrictPriority: cfg.Priority.StrictMode,
2022-08-10 10:17:34 +00:00
RetryDelayFunc: retryDelayFunc,
2021-10-18 07:35:19 +00:00
})
// Init mux server
mux := asynq.NewServeMux()
2022-11-18 10:04:42 +00:00
if len(cfg.ServerMiddlewares) > 0 {
mux.Use(cfg.ServerMiddlewares...)
}
2021-10-18 07:35:19 +00:00
// Run server
go func() {
if err := server.Run(mux); err != nil {
msg := fmt.Sprintf("error when initializing queue SERVER: %s", err.Error())
panic(msg)
}
}()
return mux
}
// initScheduler creates an asynq scheduler that evaluates schedules in the
// Ho Chi Minh City time zone, starts it in a background goroutine, and
// returns it.
//
// If the "Asia/Ho_Chi_Minh" zone cannot be loaded (for example when the host
// has no time zone database), a fixed UTC+7 zone is used instead, so the
// scheduler is never handed a nil location.
//
// The goroutine panics if scheduler.Run returns an error.
func initScheduler(redisConn asynq.RedisClientOpt) *asynq.Scheduler {
	// Always run at HCM timezone
	loc, err := time.LoadLocation("Asia/Ho_Chi_Minh")
	if err != nil {
		// Previously this error was discarded, leaving the location nil.
		// Vietnam currently observes UTC+7 with no daylight saving time,
		// so a fixed offset stands in for the named zone.
		loc = time.FixedZone("Asia/Ho_Chi_Minh", 7*60*60)
	}

	// Init scheduler
	scheduler := asynq.NewScheduler(redisConn, &asynq.SchedulerOpts{
		Location: loc,
	})

	// Run scheduler
	go func() {
		if err := scheduler.Run(); err != nil {
			msg := fmt.Sprintf("error when initializing queue SCHEDULER: %s", err.Error())
			panic(msg)
		}
	}()

	return scheduler
}
// initClient creates an asynq client for redisConn and returns it.
// It panics if asynq.NewClient returns nil.
func initClient(redisConn asynq.RedisClientOpt) *asynq.Client {
	if c := asynq.NewClient(redisConn); c != nil {
		return c
	}
	panic("error when initializing queue CLIENT")
}
2022-03-21 08:57:31 +00:00
2022-11-18 10:04:42 +00:00
// initInspector creates an asynq inspector for redisConn and returns it.
func initInspector(redisConn asynq.RedisClientOpt) *asynq.Inspector {
	inspector := asynq.NewInspector(redisConn)
	return inspector
}
2022-03-21 08:57:31 +00:00
// GetInstance ...
func GetInstance() Instance {
2022-08-10 10:17:34 +00:00
return instance
2022-03-21 08:57:31 +00:00
}