queue/task.go

83 lines
2.1 KiB
Go
Raw Permalink Normal View History

2021-10-18 07:16:42 +00:00
package queue
import (
2022-02-16 10:47:57 +00:00
"time"
"github.com/hibiken/asynq"
2022-02-16 10:47:57 +00:00
)
// NoTimeout is the zero duration.
//
// RunTask and Enqueue only attach an asynq.Timeout option when
// Config.TaskTimeout is greater than zero, so a TaskTimeout equal to
// NoTimeout leaves the timeout option unset.
// NOTE(review): presumably intended as a value for Config.TaskTimeout — confirm
// against the callers that build Config.
const NoTimeout time.Duration = 0
// RunTask ...
2021-11-30 16:02:50 +00:00
func (i Instance) RunTask(typename string, payload []byte, priority string, retryTimes int) (*asynq.TaskInfo, error) {
2021-10-18 07:16:42 +00:00
// Create task and options
task := asynq.NewTask(typename, payload)
options := make([]asynq.Option, 0)
2021-11-30 16:02:50 +00:00
// Priority
if priority != PriorityCritical && priority != PriorityDefault && priority != PriorityLow {
priority = PriorityDefault
}
priority = i.Config.QueuePrefix + priority
2021-11-30 16:08:19 +00:00
options = append(options, asynq.Queue(priority))
2021-11-30 16:02:50 +00:00
2021-10-18 07:16:42 +00:00
// Retry times
if retryTimes < 0 {
retryTimes = 0
}
options = append(options, asynq.MaxRetry(retryTimes))
2022-02-16 10:47:57 +00:00
// Task timeout
2022-02-16 10:56:59 +00:00
if i.Config.TaskTimeout > 0 {
2022-02-16 10:56:22 +00:00
options = append(options, asynq.Timeout(i.Config.TaskTimeout))
2022-02-16 10:47:57 +00:00
}
2021-10-18 07:16:42 +00:00
// Enqueue task
2021-11-30 15:21:26 +00:00
return i.Client.Enqueue(task, options...)
2021-10-18 07:16:42 +00:00
}
// Enqueue builds an asynq task from typename and payload and submits it
// through the instance's client.
//
// The target queue is Config.QueuePrefix followed by priority; any priority
// other than PriorityCritical, PriorityDefault or PriorityLow is replaced by
// PriorityDefault. A negative retryTimes is treated as 0. The timeout and
// delay options are attached only when Config.TaskTimeout and processIn,
// respectively, are greater than zero. The result of Client.Enqueue is
// returned as-is.
func (i Instance) Enqueue(typename string, payload []byte, priority string, retryTimes int, processIn time.Duration) (*asynq.TaskInfo, error) {
	// Resolve the queue name, falling back to the default priority for
	// anything that is not one of the known levels.
	queue := PriorityDefault
	switch priority {
	case PriorityCritical, PriorityDefault, PriorityLow:
		queue = priority
	}
	queue = i.Config.QueuePrefix + queue

	// Clamp the retry budget at zero.
	maxRetry := retryTimes
	if maxRetry < 0 {
		maxRetry = 0
	}

	// Mandatory options first, optional ones appended in a fixed order.
	opts := []asynq.Option{
		asynq.Queue(queue),
		asynq.MaxRetry(maxRetry),
	}
	if timeout := i.Config.TaskTimeout; timeout > 0 {
		opts = append(opts, asynq.Timeout(timeout))
	}
	if processIn > 0 {
		opts = append(opts, asynq.ProcessIn(processIn))
	}

	return i.Client.Enqueue(asynq.NewTask(typename, payload), opts...)
}
2021-10-18 07:16:42 +00:00
// ScheduledTask create new task and run at specific time
// cronSpec follow cron expression
// https://www.freeformatter.com/cron-expression-generator-quartz.html
2021-11-30 15:21:26 +00:00
func (i Instance) ScheduledTask(typename string, payload []byte, cronSpec string) (string, error) {
2021-10-18 07:16:42 +00:00
// Create task and options
task := asynq.NewTask(typename, payload)
// TODO: Support options later
// options := make([]asynq.Option, 0)
2021-11-30 15:21:26 +00:00
return i.Scheduler.Register(cronSpec, task)
2021-10-18 07:16:42 +00:00
}