From 1e8c775bb61a3e9097de6549fc0defbd36c60ce5 Mon Sep 17 00:00:00 2001 From: trunglt251292 Date: Wed, 16 Feb 2022 17:47:57 +0700 Subject: [PATCH] [Update] Add task timeout --- config.go | 4 ++++ instance.go | 3 +++ task.go | 10 ++++++++++ 3 files changed, 17 insertions(+) diff --git a/config.go b/config.go index 9aa4f88..60ce5fe 100644 --- a/config.go +++ b/config.go @@ -1,5 +1,7 @@ package queue +import "time" + // Config ... type Config struct { // For message queue @@ -10,6 +12,8 @@ type Config struct { // https://github.com/hibiken/asynq/wiki/Queue-Priority Concurrency int Priority ConfigPriority + + TaskTimeout time.Duration } // ConfigRedis ... diff --git a/instance.go b/instance.go index 5462e6b..d8b93ad 100644 --- a/instance.go +++ b/instance.go @@ -12,6 +12,8 @@ type Instance struct { Client *asynq.Client Server *asynq.ServeMux Scheduler *asynq.Scheduler + + Configs Config } var instance Instance @@ -29,6 +31,7 @@ func NewInstance(cfg Config) Instance { instance.Server = initServer(redisConn, cfg) instance.Scheduler = initScheduler(redisConn) instance.Client = initClient(redisConn) + instance.Configs = cfg // Return instance return instance diff --git a/task.go b/task.go index b5b93bd..03c15cc 100644 --- a/task.go +++ b/task.go @@ -2,6 +2,11 @@ package queue import ( "github.com/hibiken/asynq" + "time" +) + +const ( + NoTimeout time.Duration = 0 ) // RunTask ... @@ -22,6 +27,11 @@ func (i Instance) RunTask(typename string, payload []byte, priority string, retr } options = append(options, asynq.MaxRetry(retryTimes)) + // Task timeout + if i.Configs.TaskTimeout != 0 { + options = append(options, asynq.Timeout(i.Configs.TaskTimeout)) + } + // Enqueue task return i.Client.Enqueue(task, options...) }