From ae3e595b51d70e23ed17c405eef8440ef9f46ebf Mon Sep 17 00:00:00 2001 From: Nam Huynh Date: Mon, 18 Oct 2021 14:35:19 +0700 Subject: [PATCH] update --- client.go | 20 ---------- instance.go | 107 +++++++++++++++++++++++++++++++++++++++++++++++++++ scheduler.go | 36 ----------------- worker.go | 62 ----------------------------- 4 files changed, 107 insertions(+), 118 deletions(-) delete mode 100644 client.go create mode 100644 instance.go delete mode 100644 scheduler.go delete mode 100644 worker.go diff --git a/client.go b/client.go deleted file mode 100644 index bc108b1..0000000 --- a/client.go +++ /dev/null @@ -1,20 +0,0 @@ -package queue - -import "github.com/hibiken/asynq" - -// NewClient ... -func NewClient(cfg Config) *asynq.Client { - // Init redis connection - redisConn := asynq.RedisClientOpt{ - Addr: cfg.Redis.URL, - Password: cfg.Redis.Password, - DB: 0, - } - - // Init client - if client := asynq.NewClient(redisConn); client == nil { - panic("error when initializing queue CLIENT") - } else { - return client - } -} diff --git a/instance.go b/instance.go new file mode 100644 index 0000000..c82af94 --- /dev/null +++ b/instance.go @@ -0,0 +1,107 @@ +package queue + +import ( + "fmt" + "time" + + "github.com/hibiken/asynq" +) + +// Instance ... +type Instance struct { + Client *asynq.Client + Server *asynq.ServeMux + Scheduler *asynq.Scheduler +} + +// NewInstance ... +func NewInstance(cfg Config) Instance { + // Init redis connection + redisConn := asynq.RedisClientOpt{ + Addr: cfg.Redis.URL, + Password: cfg.Redis.Password, + DB: 0, + } + + // Init instance + instance := Instance{} + instance.Server = initServer(redisConn, cfg) + instance.Scheduler = initScheduler(redisConn) + instance.Client = initClient(redisConn) + + // Return instance + return instance +} + +func initServer(redisConn asynq.RedisClientOpt, cfg Config) *asynq.ServeMux { + // Set default for concurrency + if cfg.Concurrency == 0 { + cfg.Concurrency = 100 + } + + // Set default for priority + if cfg.Priority.Critical == 0 || cfg.Priority.Default == 0 || cfg.Priority.Low == 0 { + cfg.Priority.Critical = 6 + cfg.Priority.Default = 3 + cfg.Priority.Low = 1 + cfg.Priority.StrictMode = false + } + + // Init server + server := asynq.NewServer(redisConn, asynq.Config{ + Concurrency: cfg.Concurrency, + Queues: map[string]int{ + priorityCritical: cfg.Priority.Critical, + priorityDefault: cfg.Priority.Default, + priorityLow: cfg.Priority.Low, + }, + StrictPriority: cfg.Priority.StrictMode, + + // TODO: + // This is default option, retry after 10s, will add to config later + RetryDelayFunc: func(n int, e error, t *asynq.Task) time.Duration { + return 10 * time.Second + }, + }) + + // Init mux server + mux := asynq.NewServeMux() + + // Run server + go func() { + if err := server.Run(mux); err != nil { + msg := fmt.Sprintf("error when initializing queue SERVER: %s", err.Error()) + panic(msg) + } + }() + + return mux +} + +func initScheduler(redisConn asynq.RedisClientOpt) *asynq.Scheduler { + // Always run at HCM timezone + l, _ := time.LoadLocation("Asia/Ho_Chi_Minh") + + // Init scheduler + scheduler := asynq.NewScheduler(redisConn, &asynq.SchedulerOpts{ + Location: l, + }) + + // Run scheduler + go func() { + if err := scheduler.Run(); err != nil { + msg := fmt.Sprintf("error when initializing queue SCHEDULER: %s", err.Error()) + panic(msg) + } + }() + + return scheduler +} + +func initClient(redisConn asynq.RedisClientOpt) *asynq.Client { + client := asynq.NewClient(redisConn) + if client == nil { + panic("error when initializing queue CLIENT") + } + return client +} diff --git a/scheduler.go b/scheduler.go deleted file mode 100644 index 396ba52..0000000 --- a/scheduler.go +++ /dev/null @@ -1,36 +0,0 @@ -package queue - -import ( - "fmt" - "time" - - "github.com/hibiken/asynq" -) - -// NewScheduler ... -func NewScheduler(cfg Config) *asynq.Scheduler { - // Init redis connection - redisConn := asynq.RedisClientOpt{ - Addr: cfg.Redis.URL, - Password: cfg.Redis.Password, - DB: 0, - } - - // Always run at HCM timezone - l, _ := time.LoadLocation("Asia/Ho_Chi_Minh") - // Init scheduler - scheduler := asynq.NewScheduler(redisConn, &asynq.SchedulerOpts{ - Location: l, - }) - - // Run scheduler - go func() { - if err := scheduler.Run(); err != nil { - msg := fmt.Sprintf("error when initializing queue SCHEDULER: %s", err.Error()) - panic(msg) - } - }() - - // Return - return scheduler -} diff --git a/worker.go b/worker.go deleted file mode 100644 index 39d7092..0000000 --- a/worker.go +++ /dev/null @@ -1,62 +0,0 @@ -package queue - -import ( - "fmt" - "time" - - "github.com/hibiken/asynq" -) - -// NewWorker ... -func NewWorker(cfg Config) *asynq.ServeMux { - // Init redis connection - redisConn := asynq.RedisClientOpt{ - Addr: cfg.Redis.URL, - Password: cfg.Redis.Password, - DB: 0, - } - - // Set default for concurrency - if cfg.Concurrency == 0 { - cfg.Concurrency = 100 - } - - // Set default for priority - if cfg.Priority.Critical == 0 || cfg.Priority.Default == 0 || cfg.Priority.Low == 0 { - cfg.Priority.Critical = 6 - cfg.Priority.Default = 3 - cfg.Priority.Low = 1 - cfg.Priority.StrictMode = false - } - - // Init worker - worker := asynq.NewServer(redisConn, asynq.Config{ - Concurrency: cfg.Concurrency, - Queues: map[string]int{ - priorityCritical: cfg.Priority.Critical, - priorityDefault: cfg.Priority.Default, - priorityLow: cfg.Priority.Low, - }, - StrictPriority: cfg.Priority.StrictMode, - - // TODO: - // This is default option, retry after 10s, will add to config later - RetryDelayFunc: func(n int, e error, t *asynq.Task) time.Duration { - return 10 * time.Second - }, - }) - - // Init mux server - mux := asynq.NewServeMux() - - // Run server - go func() { - if err := worker.Run(mux); err != nil { - msg := fmt.Sprintf("error when initializing queue WORKER: %s", err.Error()) - panic(msg) - } - }() - - // Return - return mux -}