From 1e8c775bb61a3e9097de6549fc0defbd36c60ce5 Mon Sep 17 00:00:00 2001 From: trunglt251292 Date: Wed, 16 Feb 2022 17:47:57 +0700 Subject: [PATCH 1/3] [Update] Add task timeout --- config.go | 4 ++++ instance.go | 3 +++ task.go | 10 ++++++++++ 3 files changed, 17 insertions(+) diff --git a/config.go b/config.go index 9aa4f88..60ce5fe 100644 --- a/config.go +++ b/config.go @@ -1,5 +1,7 @@ package queue +import "time" + // Config ... type Config struct { // For message queue @@ -10,6 +12,8 @@ type Config struct { // https://github.com/hibiken/asynq/wiki/Queue-Priority Concurrency int Priority ConfigPriority + + TaskTimeout time.Duration } // ConfigRedis ... diff --git a/instance.go b/instance.go index 5462e6b..d8b93ad 100644 --- a/instance.go +++ b/instance.go @@ -12,6 +12,8 @@ type Instance struct { Client *asynq.Client Server *asynq.ServeMux Scheduler *asynq.Scheduler + + Configs Config } var instance Instance @@ -29,6 +31,7 @@ func NewInstance(cfg Config) Instance { instance.Server = initServer(redisConn, cfg) instance.Scheduler = initScheduler(redisConn) instance.Client = initClient(redisConn) + instance.Configs = cfg // Return instance return instance diff --git a/task.go b/task.go index b5b93bd..03c15cc 100644 --- a/task.go +++ b/task.go @@ -2,6 +2,11 @@ package queue import ( "github.com/hibiken/asynq" + "time" +) + +const ( + NoTimeout time.Duration = 0 ) // RunTask ... @@ -22,6 +27,11 @@ func (i Instance) RunTask(typename string, payload []byte, priority string, retr } options = append(options, asynq.MaxRetry(retryTimes)) + // Task timeout + if i.Configs.TaskTimeout != 0 { + options = append(options, asynq.Timeout(i.Configs.TaskTimeout)) + } + // Enqueue task return i.Client.Enqueue(task, options...) } -- 2.34.1 From 7c1ecfa89c3105ead6621620dc3b996de8cc2057 Mon Sep 17 00:00:00 2001 From: trunglt251292 Date: Wed, 16 Feb 2022 17:56:22 +0700 Subject: [PATCH 2/3] [Update] Change name config in struct --- instance.go | 4 ++-- task.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/instance.go b/instance.go index d8b93ad..08104f0 100644 --- a/instance.go +++ b/instance.go @@ -13,7 +13,7 @@ type Instance struct { Server *asynq.ServeMux Scheduler *asynq.Scheduler - Configs Config + Config Config } var instance Instance @@ -31,7 +31,7 @@ func NewInstance(cfg Config) Instance { instance.Server = initServer(redisConn, cfg) instance.Scheduler = initScheduler(redisConn) instance.Client = initClient(redisConn) - instance.Configs = cfg + instance.Config = cfg // Return instance return instance diff --git a/task.go b/task.go index 03c15cc..966bf66 100644 --- a/task.go +++ b/task.go @@ -28,8 +28,8 @@ func (i Instance) RunTask(typename string, payload []byte, priority string, retr options = append(options, asynq.MaxRetry(retryTimes)) // Task timeout - if i.Configs.TaskTimeout != 0 { - options = append(options, asynq.Timeout(i.Configs.TaskTimeout)) + if i.Config.TaskTimeout != 0 { + options = append(options, asynq.Timeout(i.Config.TaskTimeout)) } // Enqueue task -- 2.34.1 From fd4283c531816b9f4b4515998958b65ed49784ae Mon Sep 17 00:00:00 2001 From: trunglt251292 Date: Wed, 16 Feb 2022 17:56:59 +0700 Subject: [PATCH 3/3] [Update] Check valid time duration --- task.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/task.go b/task.go index 966bf66..541820c 100644 --- a/task.go +++ b/task.go @@ -28,7 +28,7 @@ func (i Instance) RunTask(typename string, payload []byte, priority string, retr options = append(options, asynq.MaxRetry(retryTimes)) // Task timeout - if i.Config.TaskTimeout != 0 { + if i.Config.TaskTimeout > 0 { options = append(options, asynq.Timeout(i.Config.TaskTimeout)) } -- 2.34.1