diff --git a/constants.go b/constants.go index 81055aa..3239e26 100644 --- a/constants.go +++ b/constants.go @@ -1,13 +1,23 @@ package elasticsearch const ( - SubjectSyncData = "elasticsearch/sync_data" - SubjectSearch = "elasticsearch/search" - SubjectUpdateDocument = "elasticsearch/update_document" - SubjectCreateIndex = "elasticsearch/create_index" - SubjectDeleteDocument = "elasticsearch/delete_document" + SubjectRequestProductUpsert = "selly.request.product.upsert" + SubjectPullProductUpsert = "selly.pull.product.upsert" + SubjectRequestProductSearch = "selly.request.product.search" + + SubjectRequestOrderUpsert = "selly.request.order.upsert" + SubjectPullOrderUpsert = "selly.pull.order.upsert" + SubjectRequestOrderSearch = "selly.request.order.search" + + SubjectRequestUserUpsert = "selly.request.user.upsert" + SubjectPullUserUpsert = "selly.pull.user.upsert" + SubjectRequestUserSearch = "selly.request.user.search" + + SubjectRequestKeywordUpsert = "selly.request.keyword.upsert" + SubjectPullKeywordUpsert = "selly.pull.keyword.upsert" + SubjectRequestKeywordSearch = "selly.request.keyword.search" ) const ( - JetStreamSearchService = "JetStreamSearchService" + JetStreamSearchService = "Service_Search" ) diff --git a/elasticsearch.go b/elasticsearch.go index 1c3d6dc..2fcf915 100644 --- a/elasticsearch.go +++ b/elasticsearch.go @@ -13,8 +13,16 @@ type Client struct { Config Config natsServer natsio.Server natsJetStream natsio.JetStream + Queue Queue + Pull Pull + Request Request + Push Push } +var ( + client *Client +) + // NewClient // Init client elasticsearch func NewClient(config Config) (*Client, error) { @@ -28,66 +36,30 @@ func NewClient(config Config) (*Client, error) { return nil, fmt.Errorf("nats connect failed: %v", err) } - c := &Client{ + client = &Client{ Config: config, natsServer: natsio.GetServer(), natsJetStream: natsio.GetJetStream(), + Queue: Queue{}, + Pull: Pull{}, + Request: Request{}, + Push: Push{}, } - return c, nil + return client, nil } -// SyncData -// Sync data to services ES -func (c *Client) SyncData(data SyncData) (bool, error) { +// requestNats +// publish message to nats and waiting response +func requestNats(subject string, data []byte) (*Response, error) { var ( req = RequestBody{ - ApiKey: c.Config.ApiKey, - Body: toBytes(data), + ApiKey: client.Config.ApiKey, + Body: data, } res *Response ) - msg, err := c.natsServer.Request(SubjectSyncData, toBytes(req)) - if err != nil { - return false, err - } - if err = json.Unmarshal(msg.Data, &res); err != nil { - return false, err - } - if res.Message != "" { - return false, errors.New(res.Message) - } - return res.Success, nil -} - -// SyncDataWithJetStream -// Sync data to services ES with JetStream -func (c *Client) SyncDataWithJetStream(data SyncData) (bool, error) { - var ( - req = RequestBody{ - ApiKey: c.Config.ApiKey, - Body: toBytes(data), - } - ) - err := c.natsJetStream.Publish(JetStreamSearchService, SubjectSyncData, toBytes(req)) - if err != nil { - return false, err - } - - return true, nil -} - -// Search -// Request search to service es -func (c *Client) Search(query ESQuery) (*Response, error) { - var ( - req = RequestBody{ - ApiKey: c.Config.ApiKey, - Body: toBytes(query), - } - res *Response - ) - msg, err := c.natsServer.Request(SubjectSearch, toBytes(req)) + msg, err := client.natsServer.Request(subject, toBytes(req)) if err != nil { return nil, err } @@ -100,61 +72,23 @@ func (c *Client) Search(query ESQuery) (*Response, error) { return res, nil } -// UpdateDocument -// Insert or update document to ES -func (c *Client) UpdateDocument(query UpdateDataPayload) (bool, error) { +// publishWithJetStream +// Sync data to services ES with JetStream +func publishWithJetStream(streamName, subject string, data []byte) (bool, error) { var ( req = RequestBody{ - ApiKey: c.Config.ApiKey, - Body: toBytes(query), + ApiKey: client.Config.ApiKey, + Body: data, } ) - err := c.natsJetStream.Publish(JetStreamSearchService, SubjectUpdateDocument, toBytes(req)) + err := client.natsJetStream.Publish(streamName, subject, toBytes(req)) if err != nil { return false, err } - return true, nil -} -// DeleteDocument -// Delete document to ES -func (c *Client) DeleteDocument(payload DeleteDataPayload) (bool, error) { - var ( - req = RequestBody{ - ApiKey: c.Config.ApiKey, - Body: toBytes(payload), - } - ) - err := c.natsJetStream.Publish(JetStreamSearchService, SubjectUpdateDocument, toBytes(req)) - if err != nil { - return false, err - } return true, nil } -// CreateIndex -// Create index to ES -func (c *Client) CreateIndex(name string) (bool, error) { - var ( - req = RequestBody{ - ApiKey: c.Config.ApiKey, - Body: toBytes(name), - } - res *Response - ) - msg, err := c.natsServer.Request(SubjectCreateIndex, toBytes(req)) - if err != nil { - return false, err - } - if err = json.Unmarshal(msg.Data, &res); err != nil { - return false, err - } - if res.Message != "" { - return false, errors.New(res.Message) - } - return res.Success, nil -} - func toBytes(data interface{}) []byte { b, _ := json.Marshal(data) return b diff --git a/go.mod b/go.mod index 555267f..f612c46 100644 --- a/go.mod +++ b/go.mod @@ -2,13 +2,13 @@ module github.com/Selly-Modules/elasticsearch go 1.17 -require github.com/Selly-Modules/natsio v0.0.0-20211202032952-04a8b182fb92 +require github.com/Selly-Modules/natsio v0.0.0-20220318042849-ee34e47598b7 require ( github.com/logrusorgru/aurora v2.0.3+incompatible // indirect github.com/nats-io/nats.go v1.13.0 // indirect github.com/nats-io/nkeys v0.3.0 // indirect github.com/nats-io/nuid v1.0.1 // indirect - github.com/thoas/go-funk v0.9.1 // indirect - golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e // indirect + github.com/thoas/go-funk v0.9.2 // indirect + golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect ) diff --git a/go.sum b/go.sum index beec991..c180bf0 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/Selly-Modules/natsio v0.0.0-20211202032952-04a8b182fb92 h1:tZMu1uKbo2l5G7aYRkCtyjQl4Gog4X98mYT/QZC94+8= -github.com/Selly-Modules/natsio v0.0.0-20211202032952-04a8b182fb92/go.mod h1:NG55g9ip18nvN5tfP6PcSEKec10/lOeIOZC8HqBVNlQ= +github.com/Selly-Modules/natsio v0.0.0-20220318042849-ee34e47598b7 h1:t4kLJt+Q4U4m79Ms9S6zjmIfTXSyrzxzVwLU6nGrQOo= +github.com/Selly-Modules/natsio v0.0.0-20220318042849-ee34e47598b7/go.mod h1:NG55g9ip18nvN5tfP6PcSEKec10/lOeIOZC8HqBVNlQ= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= @@ -41,24 +41,29 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/thoas/go-funk v0.9.1 h1:O549iLZqPpTUQ10ykd26sZhzD+rmR5pWhuElrhbC20M= github.com/thoas/go-funk v0.9.1/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= +github.com/thoas/go-funk v0.9.2 h1:oKlNYv0AY5nyf9g+/GhMgS/UO2ces0QRdPKwkhY3VCk= +github.com/thoas/go-funk v0.9.2/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= -golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd h1:XcWmESyNjXJMLahc3mqVQJcgSTDxFxhETVlfk9uGc38= +golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/pull.go b/pull.go new file mode 100644 index 0000000..9ec9170 --- /dev/null +++ b/pull.go @@ -0,0 +1,25 @@ +package elasticsearch + +// Pull ... +type Pull struct { +} + +// ProductUpsert ... +func (Pull) ProductUpsert(payload Payload) (bool, error) { + return publishWithJetStream(JetStreamSearchService, SubjectPullProductUpsert, toBytes(payload)) +} + +// UserUpsert ... +func (Pull) UserUpsert(payload Payload) (bool, error) { + return publishWithJetStream(JetStreamSearchService, SubjectPullUserUpsert, toBytes(payload)) +} + +// OrderUpsert ... +func (Pull) OrderUpsert(payload Payload) (bool, error) { + return publishWithJetStream(JetStreamSearchService, SubjectPullOrderUpsert, toBytes(payload)) +} + +// KeywordUpsert ... +func (Pull) KeywordUpsert(payload Payload) (bool, error) { + return publishWithJetStream(JetStreamSearchService, SubjectPullKeywordUpsert, toBytes(payload)) +} diff --git a/push.go b/push.go new file mode 100644 index 0000000..7ebf746 --- /dev/null +++ b/push.go @@ -0,0 +1,5 @@ +package elasticsearch + +// Push ... +type Push struct { +} diff --git a/queue.go b/queue.go new file mode 100644 index 0000000..bc1675a --- /dev/null +++ b/queue.go @@ -0,0 +1,5 @@ +package elasticsearch + +// Queue ... +type Queue struct { +} diff --git a/request.go b/request.go new file mode 100644 index 0000000..743cf1a --- /dev/null +++ b/request.go @@ -0,0 +1,45 @@ +package elasticsearch + +// Request ... +type Request struct { +} + +// ProductSearch ... +func (Request) ProductSearch(query ESQuery) (*Response, error) { + return requestNats(SubjectRequestProductSearch, toBytes(query)) +} + +// ProductUpsert ... +func (Request) ProductUpsert(payload Payload) (*Response, error) { + return requestNats(SubjectRequestProductUpsert, toBytes(payload)) +} + +// UserSearch ... +func (Request) UserSearch(query ESQuery) (*Response, error) { + return requestNats(SubjectRequestUserSearch, toBytes(query)) +} + +// UserUpsert ... +func (Request) UserUpsert(payload Payload) (*Response, error) { + return requestNats(SubjectRequestUserUpsert, toBytes(payload)) +} + +// OrderSearch ... +func (Request) OrderSearch(query ESQuery) (*Response, error) { + return requestNats(SubjectRequestOrderSearch, toBytes(query)) +} + +// OrderUpsert ... +func (Request) OrderUpsert(payload Payload) (*Response, error) { + return requestNats(SubjectRequestOrderUpsert, toBytes(payload)) +} + +// KeywordSearch ... +func (Request) KeywordSearch(query ESQuery) (*Response, error) { + return requestNats(SubjectRequestKeywordSearch, toBytes(query)) +} + +// KeywordUpsert ... +func (Request) KeywordUpsert(payload Payload) (*Response, error) { + return requestNats(SubjectRequestKeywordUpsert, toBytes(payload)) +} diff --git a/struct.go b/struct.go index 602f8a8..c4e5ddc 100644 --- a/struct.go +++ b/struct.go @@ -11,28 +11,21 @@ type RequestBody struct { // Response // response to service es type Response struct { - Success bool `json:"success"` - Data []string `json:"data,omitempty"` - Total int64 `json:"total,omitempty"` - Page int64 `json:"page,omitempty"` - Limit int64 `json:"limit,omitempty"` - Message string `json:"message"` + Success bool `json:"success"` + Data []byte `json:"data,omitempty"` + Total int64 `json:"total,omitempty"` + Page int64 `json:"page,omitempty"` + Limit int64 `json:"limit,omitempty"` + Message string `json:"message"` } -// SyncData -// Payload for sync data to service es -type SyncData struct { +// Payload ... +// payload for sync data to service es +type Payload struct { Index string Data []byte } -// UpdateDataPayload -// Payload for insert or update document -type UpdateDataPayload struct { - Index string - Body []byte -} - // DeleteDataPayload // Payload for delete document type DeleteDataPayload struct { @@ -60,6 +53,8 @@ type ESQuery struct { Type string ServiceDelivery string SourceDelivery string + Brands []string + NoBrand string Banned string ListUser []string ListNotUser []string