This commit is contained in:
Nam Huynh 2021-10-18 14:35:19 +07:00
parent 98802f49fa
commit ae3e595b51
4 changed files with 107 additions and 118 deletions

View File

@ -1,20 +0,0 @@
package queue
import "github.com/hibiken/asynq"
// NewClient ...
func NewClient(cfg Config) *asynq.Client {
// Init redis connection
redisConn := asynq.RedisClientOpt{
Addr: cfg.Redis.URL,
Password: cfg.Redis.Password,
DB: 0,
}
// Init client
if client := asynq.NewClient(redisConn); client == nil {
panic("error when initializing queue CLIENT")
} else {
return client
}
}

107
instance.go Normal file
View File

@ -0,0 +1,107 @@
package queue
import (
"fmt"
"time"
"github.com/hibiken/asynq"
)
// Instance ...
type Instance struct {
Client *asynq.Client
Server *asynq.ServeMux
Scheduler *asynq.Scheduler
}
// NewInstance ...
func NewInstance(cfg Config) Instance {
// Init redis connection
redisConn := asynq.RedisClientOpt{
Addr: cfg.Redis.URL,
Password: cfg.Redis.Password,
DB: 0,
}
// Init instance
instance := Instance{}
instance.Server = initServer(redisConn, cfg)
instance.Scheduler = initScheduler(redisConn)
instance.Client = initClient(redisConn)
// Return instance
return instance
}
func initServer(redisConn asynq.RedisClientOpt, cfg Config) *asynq.ServeMux {
// Set default for concurrency
if cfg.Concurrency == 0 {
cfg.Concurrency = 100
}
// Set default for priority
if cfg.Priority.Critical == 0 || cfg.Priority.Default == 0 || cfg.Priority.Low == 0 {
cfg.Priority.Critical = 6
cfg.Priority.Default = 3
cfg.Priority.Low = 1
cfg.Priority.StrictMode = false
}
// Init server
server := asynq.NewServer(redisConn, asynq.Config{
Concurrency: cfg.Concurrency,
Queues: map[string]int{
priorityCritical: cfg.Priority.Critical,
priorityDefault: cfg.Priority.Default,
priorityLow: cfg.Priority.Low,
},
StrictPriority: cfg.Priority.StrictMode,
// TODO:
// This is default option, retry after 10s, will add to config later
RetryDelayFunc: func(n int, e error, t *asynq.Task) time.Duration {
return 10 * time.Second
},
})
// Init mux server
mux := asynq.NewServeMux()
// Run server
go func() {
if err := server.Run(mux); err != nil {
msg := fmt.Sprintf("error when initializing queue SERVER: %s", err.Error())
panic(msg)
}
}()
return mux
}
func initScheduler(redisConn asynq.RedisClientOpt) *asynq.Scheduler {
// Always run at HCM timezone
l, _ := time.LoadLocation("Asia/Ho_Chi_Minh")
// Init scheduler
scheduler := asynq.NewScheduler(redisConn, &asynq.SchedulerOpts{
Location: l,
})
// Run scheduler
go func() {
if err := scheduler.Run(); err != nil {
msg := fmt.Sprintf("error when initializing queue SCHEDULER: %s", err.Error())
panic(msg)
}
}()
return scheduler
}
func initClient(redisConn asynq.RedisClientOpt) *asynq.Client {
client := asynq.NewClient(redisConn)
if client == nil {
panic("error when initializing queue CLIENT")
}
return client
}

View File

@ -1,36 +0,0 @@
package queue
import (
"fmt"
"time"
"github.com/hibiken/asynq"
)
// NewScheduler ...
func NewScheduler(cfg Config) *asynq.Scheduler {
// Init redis connection
redisConn := asynq.RedisClientOpt{
Addr: cfg.Redis.URL,
Password: cfg.Redis.Password,
DB: 0,
}
// Always run at HCM timezone
l, _ := time.LoadLocation("Asia/Ho_Chi_Minh")
// Init scheduler
scheduler := asynq.NewScheduler(redisConn, &asynq.SchedulerOpts{
Location: l,
})
// Run scheduler
go func() {
if err := scheduler.Run(); err != nil {
msg := fmt.Sprintf("error when initializing queue SCHEDULER: %s", err.Error())
panic(msg)
}
}()
// Return
return scheduler
}

View File

@ -1,62 +0,0 @@
package queue
import (
"fmt"
"time"
"github.com/hibiken/asynq"
)
// NewWorker ...
func NewWorker(cfg Config) *asynq.ServeMux {
// Init redis connection
redisConn := asynq.RedisClientOpt{
Addr: cfg.Redis.URL,
Password: cfg.Redis.Password,
DB: 0,
}
// Set default for concurrency
if cfg.Concurrency == 0 {
cfg.Concurrency = 100
}
// Set default for priority
if cfg.Priority.Critical == 0 || cfg.Priority.Default == 0 || cfg.Priority.Low == 0 {
cfg.Priority.Critical = 6
cfg.Priority.Default = 3
cfg.Priority.Low = 1
cfg.Priority.StrictMode = false
}
// Init worker
worker := asynq.NewServer(redisConn, asynq.Config{
Concurrency: cfg.Concurrency,
Queues: map[string]int{
priorityCritical: cfg.Priority.Critical,
priorityDefault: cfg.Priority.Default,
priorityLow: cfg.Priority.Low,
},
StrictPriority: cfg.Priority.StrictMode,
// TODO:
// This is default option, retry after 10s, will add to config later
RetryDelayFunc: func(n int, e error, t *asynq.Task) time.Duration {
return 10 * time.Second
},
})
// Init mux server
mux := asynq.NewServeMux()
// Run server
go func() {
if err := worker.Run(mux); err != nil {
msg := fmt.Sprintf("error when initializing queue WORKER: %s", err.Error())
panic(msg)
}
}()
// Return
return mux
}