From 66c1ef938754010886ff09d617e4430e03efdb7d Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Fri, 25 Mar 2022 14:03:17 +0700 Subject: [PATCH] init-schedule --- appier.go | 2 ++ go.mod | 5 ++-- go.sum | 6 ++++ schedule.go | 58 +++++++++++++++++++++++++++++++++++++ sync_to_service.go | 71 ++++++++++++++++++++++++++++++++++++++++++++++ util.go | 5 ++++ 6 files changed, 145 insertions(+), 2 deletions(-) create mode 100644 schedule.go create mode 100644 sync_to_service.go diff --git a/appier.go b/appier.go index f9820d3..519e141 100644 --- a/appier.go +++ b/appier.go @@ -41,6 +41,8 @@ func NewClient(config Config) (*Client, error) { return nil, fmt.Errorf("redis connect failed: %v", err) } + // Init schedule + initSchedule() return client, nil } diff --git a/go.mod b/go.mod index df359ec..d492967 100644 --- a/go.mod +++ b/go.mod @@ -4,11 +4,12 @@ go 1.17 require ( github.com/Selly-Modules/natsio v0.0.0-20220321031929-3fe4271f1bbc - github.com/Selly-Modules/redisdb v1.0.0 + github.com/Selly-Modules/redisdb v1.0.1-0.20220325035325-562d8c6f642c + github.com/robfig/cron/v3 v3.0.1 ) require ( - github.com/cespare/xxhash/v2 v2.1.1 // indirect + github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/go-redis/redis/v8 v8.11.2 // indirect diff --git a/go.sum b/go.sum index 2d382b4..d97cab5 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,12 @@ github.com/Selly-Modules/natsio v0.0.0-20220321031929-3fe4271f1bbc h1:PyhRe7aEE8 github.com/Selly-Modules/natsio v0.0.0-20220321031929-3fe4271f1bbc/go.mod h1:NG55g9ip18nvN5tfP6PcSEKec10/lOeIOZC8HqBVNlQ= github.com/Selly-Modules/redisdb v1.0.0 h1:sFz8NSHIR3HdBaYzk+aASqDMtQsD1sZenbKaJ8C7MCQ= github.com/Selly-Modules/redisdb v1.0.0/go.mod h1:J2GWyoHN5b8RfZXUJmGnEFw7Z4UIogUM4Ry76DstktQ= +github.com/Selly-Modules/redisdb v1.0.1-0.20220325035325-562d8c6f642c h1:GP+wjV8yUWN0sWFHNKqsPM+oW8RhyQGhkjgLrgQvvMk= +github.com/Selly-Modules/redisdb v1.0.1-0.20220325035325-562d8c6f642c/go.mod h1:J2GWyoHN5b8RfZXUJmGnEFw7Z4UIogUM4Ry76DstktQ= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -64,6 +68,8 @@ github.com/onsi/gomega v1.10.5 h1:7n6FEkpFmfCoo2t+YYqXH0evK+a9ICQz0xcAy9dYcaQ= github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= diff --git a/schedule.go b/schedule.go new file mode 100644 index 0000000..5f7e693 --- /dev/null +++ b/schedule.go @@ -0,0 +1,58 @@ +package appier + +import ( + "fmt" + "log" + "time" + + "github.com/robfig/cron/v3" +) + +// Job ... +type Job struct { + Spec string + Name string + Cmd func() +} + +// Scheduler ... +type Scheduler struct { + cron *cron.Cron + jobs []*Job +} + +// New ... +func newSchedule(jobs ...*Job) *Scheduler { + l, _ := time.LoadLocation("Asia/Ho_Chi_Minh") + c := cron.New(cron.WithLocation(l)) + + return &Scheduler{ + cron: c, + jobs: jobs, + } +} + +// startSchedule ... +func (s *Scheduler) startSchedule() { + for _, job := range s.jobs { + if _, err := s.cron.AddFunc(job.Spec, job.Cmd); err != nil { + log.Fatalf("Add job err: %v", err) + } + fmt.Printf("Job %s is started: %s\n", job.Name, job.Spec) + } + s.cron.Start() +} + +// initSchedule .. +func initSchedule() { + jobs := []*Job{ + { + Spec: "*/30 * * * *", + Name: "Start job sync data to service appier", + Cmd: SyncToService{}.syncData, + }, + } + + s := newSchedule(jobs...) + s.startSchedule() +} diff --git a/sync_to_service.go b/sync_to_service.go new file mode 100644 index 0000000..e243281 --- /dev/null +++ b/sync_to_service.go @@ -0,0 +1,71 @@ +package appier + +import ( + "context" + "fmt" + + "github.com/Selly-Modules/redisdb" +) + +// SyncToService ... +type SyncToService struct{} + +const limitGetKeyRedis int64 = 20 + +var listTypeSync = []string{ + RedisSyncProduct, + RedisSyncSupplier, + RedisSyncInventory, + RedisSyncBrand, + RedisSyncCategory, + RedisSyncSubCategory, + RedisSyncProperty, + RedisSyncPropertyValue, +} + +// syncData ... +func (s SyncToService) syncData() { + for _, item := range listTypeSync { + keyProductPattern := getRedisPrefixPattern(item) + handleSyncDataToServiceAppier(keyProductPattern, item) + } + fmt.Println("Sync data to service done!") +} + +// handleSyncDataToServiceAppier ... +func handleSyncDataToServiceAppier(pattern string, typeData string) { + for { + keys, values := redisdb.GetWithPrefixPattern(pattern, limitGetKeyRedis) + if len(keys) == 0 { + return + } + + payload := toBytes(values) + // Convert data + switch typeData { + case RedisSyncProduct: + Pull{}.ProductUpsert(payload) + case RedisSyncSKU: + Pull{}.SKUUpsert(payload) + case RedisSyncCategory: + Pull{}.CategoryUpsert(payload) + case RedisSyncSubCategory: + Pull{}.SubCategoryUpsert(payload) + case RedisSyncInventory: + Pull{}.InventoryUpsert(payload) + case RedisSyncBrand: + Pull{}.BrandUpsert(payload) + case RedisSyncSupplier: + Pull{}.SupplierUpsert(payload) + case RedisSyncProperty: + Pull{}.PropertyUpsert(payload) + case RedisSyncPropertyValue: + Pull{}.PropertyValueUpsert(payload) + } + + // Del keys + for _, key := range keys { + redisdb.DelKey(context.Background(), key) + } + } +} diff --git a/util.go b/util.go index 660a91d..3919375 100644 --- a/util.go +++ b/util.go @@ -15,3 +15,8 @@ func toBytes(data interface{}) []byte { func getRedisKey(prefix string, targetID string) string { return fmt.Sprintf("%s%s_%s", RedisSyncAppierPrefix, prefix, targetID) } + +// getRedisPrefixPattern ... +func getRedisPrefixPattern(prefix string) string { + return fmt.Sprintf("%s%s_*", RedisSyncAppierPrefix, prefix) +}