init-schedule #10
			
				
			
		
		
		
	| 
						 | 
				
			
			@ -41,6 +41,8 @@ func NewClient(config Config) (*Client, error) {
 | 
			
		|||
		return nil, fmt.Errorf("redis connect failed: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Init schedule
 | 
			
		||||
	initSchedule()
 | 
			
		||||
	return client, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										5
									
								
								go.mod
								
								
								
								
							
							
						
						
									
										5
									
								
								go.mod
								
								
								
								
							| 
						 | 
				
			
			@ -4,11 +4,12 @@ go 1.17
 | 
			
		|||
 | 
			
		||||
require (
 | 
			
		||||
	github.com/Selly-Modules/natsio v0.0.0-20220321031929-3fe4271f1bbc
 | 
			
		||||
	github.com/Selly-Modules/redisdb v1.0.0
 | 
			
		||||
	github.com/Selly-Modules/redisdb v1.0.1-0.20220325035325-562d8c6f642c
 | 
			
		||||
	github.com/robfig/cron/v3 v3.0.1
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
require (
 | 
			
		||||
	github.com/cespare/xxhash/v2 v2.1.1 // indirect
 | 
			
		||||
	github.com/cespare/xxhash/v2 v2.1.2 // indirect
 | 
			
		||||
	github.com/davecgh/go-spew v1.1.1 // indirect
 | 
			
		||||
	github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
 | 
			
		||||
	github.com/go-redis/redis/v8 v8.11.2 // indirect
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										6
									
								
								go.sum
								
								
								
								
							
							
						
						
									
										6
									
								
								go.sum
								
								
								
								
							| 
						 | 
				
			
			@ -2,8 +2,12 @@ github.com/Selly-Modules/natsio v0.0.0-20220321031929-3fe4271f1bbc h1:PyhRe7aEE8
 | 
			
		|||
github.com/Selly-Modules/natsio v0.0.0-20220321031929-3fe4271f1bbc/go.mod h1:NG55g9ip18nvN5tfP6PcSEKec10/lOeIOZC8HqBVNlQ=
 | 
			
		||||
github.com/Selly-Modules/redisdb v1.0.0 h1:sFz8NSHIR3HdBaYzk+aASqDMtQsD1sZenbKaJ8C7MCQ=
 | 
			
		||||
github.com/Selly-Modules/redisdb v1.0.0/go.mod h1:J2GWyoHN5b8RfZXUJmGnEFw7Z4UIogUM4Ry76DstktQ=
 | 
			
		||||
github.com/Selly-Modules/redisdb v1.0.1-0.20220325035325-562d8c6f642c h1:GP+wjV8yUWN0sWFHNKqsPM+oW8RhyQGhkjgLrgQvvMk=
 | 
			
		||||
github.com/Selly-Modules/redisdb v1.0.1-0.20220325035325-562d8c6f642c/go.mod h1:J2GWyoHN5b8RfZXUJmGnEFw7Z4UIogUM4Ry76DstktQ=
 | 
			
		||||
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
 | 
			
		||||
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 | 
			
		||||
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
 | 
			
		||||
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 | 
			
		||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 | 
			
		||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 | 
			
		||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 | 
			
		||||
| 
						 | 
				
			
			@ -64,6 +68,8 @@ github.com/onsi/gomega v1.10.5 h1:7n6FEkpFmfCoo2t+YYqXH0evK+a9ICQz0xcAy9dYcaQ=
 | 
			
		|||
github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48=
 | 
			
		||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 | 
			
		||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 | 
			
		||||
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
 | 
			
		||||
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
 | 
			
		||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 | 
			
		||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
 | 
			
		||||
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,58 @@
 | 
			
		|||
package appier
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"log"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/robfig/cron/v3"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Job ...
 | 
			
		||||
type Job struct {
 | 
			
		||||
	Spec string
 | 
			
		||||
	Name string
 | 
			
		||||
	Cmd  func()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Scheduler ...
 | 
			
		||||
type Scheduler struct {
 | 
			
		||||
	cron *cron.Cron
 | 
			
		||||
	jobs []*Job
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// New ...
 | 
			
		||||
func newSchedule(jobs ...*Job) *Scheduler {
 | 
			
		||||
	l, _ := time.LoadLocation("Asia/Ho_Chi_Minh")
 | 
			
		||||
	c := cron.New(cron.WithLocation(l))
 | 
			
		||||
 | 
			
		||||
	return &Scheduler{
 | 
			
		||||
		cron: c,
 | 
			
		||||
		jobs: jobs,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// startSchedule ...
 | 
			
		||||
func (s *Scheduler) startSchedule() {
 | 
			
		||||
	for _, job := range s.jobs {
 | 
			
		||||
		if _, err := s.cron.AddFunc(job.Spec, job.Cmd); err != nil {
 | 
			
		||||
			log.Fatalf("Add job err: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		fmt.Printf("Job %s is started: %s\n", job.Name, job.Spec)
 | 
			
		||||
	}
 | 
			
		||||
	s.cron.Start()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// initSchedule ..
 | 
			
		||||
func initSchedule() {
 | 
			
		||||
	jobs := []*Job{
 | 
			
		||||
		{
 | 
			
		||||
			Spec: "*/30 * * * *",
 | 
			
		||||
			Name: "Start job sync data to service appier",
 | 
			
		||||
			Cmd:  SyncToService{}.syncData,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	s := newSchedule(jobs...)
 | 
			
		||||
	s.startSchedule()
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,71 @@
 | 
			
		|||
package appier
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"fmt"
 | 
			
		||||
 | 
			
		||||
	"github.com/Selly-Modules/redisdb"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// SyncToService ...
 | 
			
		||||
type SyncToService struct{}
 | 
			
		||||
 | 
			
		||||
const limitGetKeyRedis int64 = 20
 | 
			
		||||
 | 
			
		||||
var listTypeSync = []string{
 | 
			
		||||
	RedisSyncProduct,
 | 
			
		||||
	RedisSyncSupplier,
 | 
			
		||||
	RedisSyncInventory,
 | 
			
		||||
	RedisSyncBrand,
 | 
			
		||||
	RedisSyncCategory,
 | 
			
		||||
	RedisSyncSubCategory,
 | 
			
		||||
	RedisSyncProperty,
 | 
			
		||||
	RedisSyncPropertyValue,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// syncData ...
 | 
			
		||||
func (s SyncToService) syncData() {
 | 
			
		||||
	for _, item := range listTypeSync {
 | 
			
		||||
		keyProductPattern := getRedisPrefixPattern(item)
 | 
			
		||||
		handleSyncDataToServiceAppier(keyProductPattern, item)
 | 
			
		||||
	}
 | 
			
		||||
	fmt.Println("Sync data to service done!")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// handleSyncDataToServiceAppier ...
 | 
			
		||||
func handleSyncDataToServiceAppier(pattern string, typeData string) {
 | 
			
		||||
	for {
 | 
			
		||||
		keys, values := redisdb.GetWithPrefixPattern(pattern, limitGetKeyRedis)
 | 
			
		||||
		if len(keys) == 0 {
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		payload := toBytes(values)
 | 
			
		||||
		// Convert data
 | 
			
		||||
		switch typeData {
 | 
			
		||||
		case RedisSyncProduct:
 | 
			
		||||
			Pull{}.ProductUpsert(payload)
 | 
			
		||||
		case RedisSyncSKU:
 | 
			
		||||
			Pull{}.SKUUpsert(payload)
 | 
			
		||||
		case RedisSyncCategory:
 | 
			
		||||
			Pull{}.CategoryUpsert(payload)
 | 
			
		||||
		case RedisSyncSubCategory:
 | 
			
		||||
			Pull{}.SubCategoryUpsert(payload)
 | 
			
		||||
		case RedisSyncInventory:
 | 
			
		||||
			Pull{}.InventoryUpsert(payload)
 | 
			
		||||
		case RedisSyncBrand:
 | 
			
		||||
			Pull{}.BrandUpsert(payload)
 | 
			
		||||
		case RedisSyncSupplier:
 | 
			
		||||
			Pull{}.SupplierUpsert(payload)
 | 
			
		||||
		case RedisSyncProperty:
 | 
			
		||||
			Pull{}.PropertyUpsert(payload)
 | 
			
		||||
		case RedisSyncPropertyValue:
 | 
			
		||||
			Pull{}.PropertyValueUpsert(payload)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// Del keys
 | 
			
		||||
		for _, key := range keys {
 | 
			
		||||
			redisdb.DelKey(context.Background(), key)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										5
									
								
								util.go
								
								
								
								
							
							
						
						
									
										5
									
								
								util.go
								
								
								
								
							| 
						 | 
				
			
			@ -15,3 +15,8 @@ func toBytes(data interface{}) []byte {
 | 
			
		|||
func getRedisKey(prefix string, targetID string) string {
 | 
			
		||||
	return fmt.Sprintf("%s%s_%s", RedisSyncAppierPrefix, prefix, targetID)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// getRedisPrefixPattern ...
 | 
			
		||||
func getRedisPrefixPattern(prefix string) string {
 | 
			
		||||
	return fmt.Sprintf("%s%s_*", RedisSyncAppierPrefix, prefix)
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue