diff --git a/constants.go b/constants.go index 81055aa..f997694 100644 --- a/constants.go +++ b/constants.go @@ -1,13 +1,23 @@ package elasticsearch const ( - SubjectSyncData = "elasticsearch/sync_data" - SubjectSearch = "elasticsearch/search" - SubjectUpdateDocument = "elasticsearch/update_document" - SubjectCreateIndex = "elasticsearch/create_index" - SubjectDeleteDocument = "elasticsearch/delete_document" + SubjectRequestProductUpsert = "elasticsearch/selly.request.product.upsert" + SubjectQueueProductUpsert = "elasticsearch/selly.queue.product.upsert" + SubjectPullProductUpsert = "elasticsearch/selly.pull.product.upsert" + SubjectRequestProductSearch = "elasticsearch/selly.request.product.search" + + SubjectRequestOrderUpsert = "elasticsearch/selly.request.order.upsert" + SubjectQueueOrderUpsert = "elasticsearch/selly.queue.order.upsert" + SubjectPullOrderUpsert = "elasticsearch/selly.pull.order.upsert" + SubjectRequestOrderSearch = "elasticsearch/selly.request.order.search" + + SubjectRequestUserUpsert = "elasticsearch/selly.request.order.upsert" + SubjectQueueUserUpsert = "elasticsearch/selly.queue.order.upsert" + SubjectPushUserUpsert = "elasticsearch/selly.push.order.upsert" + SubjectPullUserUpsert = "elasticsearch/selly.pull.order.upsert" + SubjectRequestUserSearch = "elasticsearch/selly.request.user.search" ) const ( - JetStreamSearchService = "JetStreamSearchService" + JetStreamSearchService = "Service_Search" ) diff --git a/elasticsearch.go b/elasticsearch.go index 1c3d6dc..c9e73d2 100644 --- a/elasticsearch.go +++ b/elasticsearch.go @@ -13,8 +13,16 @@ type Client struct { Config Config natsServer natsio.Server natsJetStream natsio.JetStream + Queue Queue + Pull Pull + Request Request + Push Push } +var ( + client *Client +) + // NewClient // Init client elasticsearch func NewClient(config Config) (*Client, error) { @@ -28,66 +36,34 @@ func NewClient(config Config) (*Client, error) { return nil, fmt.Errorf("nats connect failed: %v", err) } - c := &Client{ + client = &Client{ Config: config, natsServer: natsio.GetServer(), natsJetStream: natsio.GetJetStream(), + Queue: Queue{}, + Pull: Pull{}, + Request: Request{}, + Push: Push{}, } - return c, nil + return client, nil } -// SyncData -// Sync data to services ES -func (c *Client) SyncData(data SyncData) (bool, error) { +func GetClient() *Client { + return client +} + +// RequestNats +// publish message to nats and waiting response +func (c *Client) RequestNats(subject string, data []byte) (*Response, error) { var ( req = RequestBody{ ApiKey: c.Config.ApiKey, - Body: toBytes(data), + Body: data, } res *Response ) - msg, err := c.natsServer.Request(SubjectSyncData, toBytes(req)) - if err != nil { - return false, err - } - if err = json.Unmarshal(msg.Data, &res); err != nil { - return false, err - } - if res.Message != "" { - return false, errors.New(res.Message) - } - return res.Success, nil -} - -// SyncDataWithJetStream -// Sync data to services ES with JetStream -func (c *Client) SyncDataWithJetStream(data SyncData) (bool, error) { - var ( - req = RequestBody{ - ApiKey: c.Config.ApiKey, - Body: toBytes(data), - } - ) - err := c.natsJetStream.Publish(JetStreamSearchService, SubjectSyncData, toBytes(req)) - if err != nil { - return false, err - } - - return true, nil -} - -// Search -// Request search to service es -func (c *Client) Search(query ESQuery) (*Response, error) { - var ( - req = RequestBody{ - ApiKey: c.Config.ApiKey, - Body: toBytes(query), - } - res *Response - ) - msg, err := c.natsServer.Request(SubjectSearch, toBytes(req)) + msg, err := c.natsServer.Request(subject, toBytes(req)) if err != nil { return nil, err } @@ -100,61 +76,23 @@ func (c *Client) Search(query ESQuery) (*Response, error) { return res, nil } -// UpdateDocument -// Insert or update document to ES -func (c *Client) UpdateDocument(query UpdateDataPayload) (bool, error) { +// PublishWithJetStream +// Sync data to services ES with JetStream +func (c *Client) PublishWithJetStream(streamName, subject string, data []byte) (bool, error) { var ( req = RequestBody{ ApiKey: c.Config.ApiKey, - Body: toBytes(query), + Body: data, } ) - err := c.natsJetStream.Publish(JetStreamSearchService, SubjectUpdateDocument, toBytes(req)) + err := c.natsJetStream.Publish(streamName, subject, toBytes(req)) if err != nil { return false, err } - return true, nil -} -// DeleteDocument -// Delete document to ES -func (c *Client) DeleteDocument(payload DeleteDataPayload) (bool, error) { - var ( - req = RequestBody{ - ApiKey: c.Config.ApiKey, - Body: toBytes(payload), - } - ) - err := c.natsJetStream.Publish(JetStreamSearchService, SubjectUpdateDocument, toBytes(req)) - if err != nil { - return false, err - } return true, nil } -// CreateIndex -// Create index to ES -func (c *Client) CreateIndex(name string) (bool, error) { - var ( - req = RequestBody{ - ApiKey: c.Config.ApiKey, - Body: toBytes(name), - } - res *Response - ) - msg, err := c.natsServer.Request(SubjectCreateIndex, toBytes(req)) - if err != nil { - return false, err - } - if err = json.Unmarshal(msg.Data, &res); err != nil { - return false, err - } - if res.Message != "" { - return false, errors.New(res.Message) - } - return res.Success, nil -} - func toBytes(data interface{}) []byte { b, _ := json.Marshal(data) return b diff --git a/go.mod b/go.mod index 555267f..f612c46 100644 --- a/go.mod +++ b/go.mod @@ -2,13 +2,13 @@ module github.com/Selly-Modules/elasticsearch go 1.17 -require github.com/Selly-Modules/natsio v0.0.0-20211202032952-04a8b182fb92 +require github.com/Selly-Modules/natsio v0.0.0-20220318042849-ee34e47598b7 require ( github.com/logrusorgru/aurora v2.0.3+incompatible // indirect github.com/nats-io/nats.go v1.13.0 // indirect github.com/nats-io/nkeys v0.3.0 // indirect github.com/nats-io/nuid v1.0.1 // indirect - github.com/thoas/go-funk v0.9.1 // indirect - golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e // indirect + github.com/thoas/go-funk v0.9.2 // indirect + golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect ) diff --git a/go.sum b/go.sum index beec991..7580e6f 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/Selly-Modules/natsio v0.0.0-20211202032952-04a8b182fb92 h1:tZMu1uKbo2l5G7aYRkCtyjQl4Gog4X98mYT/QZC94+8= github.com/Selly-Modules/natsio v0.0.0-20211202032952-04a8b182fb92/go.mod h1:NG55g9ip18nvN5tfP6PcSEKec10/lOeIOZC8HqBVNlQ= +github.com/Selly-Modules/natsio v0.0.0-20220318042849-ee34e47598b7 h1:t4kLJt+Q4U4m79Ms9S6zjmIfTXSyrzxzVwLU6nGrQOo= +github.com/Selly-Modules/natsio v0.0.0-20220318042849-ee34e47598b7/go.mod h1:NG55g9ip18nvN5tfP6PcSEKec10/lOeIOZC8HqBVNlQ= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= @@ -43,22 +45,29 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/thoas/go-funk v0.9.1 h1:O549iLZqPpTUQ10ykd26sZhzD+rmR5pWhuElrhbC20M= github.com/thoas/go-funk v0.9.1/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= +github.com/thoas/go-funk v0.9.2 h1:oKlNYv0AY5nyf9g+/GhMgS/UO2ces0QRdPKwkhY3VCk= +github.com/thoas/go-funk v0.9.2/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd h1:XcWmESyNjXJMLahc3mqVQJcgSTDxFxhETVlfk9uGc38= +golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/pull.go b/pull.go new file mode 100644 index 0000000..0d3a1b9 --- /dev/null +++ b/pull.go @@ -0,0 +1,20 @@ +package elasticsearch + +// Pull ... +type Pull struct { +} + +// ProductUpsert ... +func (r Pull) ProductUpsert(query UpdateDataPayload) (bool, error) { + return GetClient().PublishWithJetStream(JetStreamSearchService, SubjectPullProductUpsert, toBytes(query)) +} + +// UserUpsert ... +func (r Pull) UserUpsert(query UpdateDataPayload) (bool, error) { + return GetClient().PublishWithJetStream(JetStreamSearchService, SubjectPullUserUpsert, toBytes(query)) +} + +// OrderUpsert ... +func (r Pull) OrderUpsert(query UpdateDataPayload) (bool, error) { + return GetClient().PublishWithJetStream(JetStreamSearchService, SubjectPullOrderUpsert, toBytes(query)) +} diff --git a/push.go b/push.go new file mode 100644 index 0000000..7ebf746 --- /dev/null +++ b/push.go @@ -0,0 +1,5 @@ +package elasticsearch + +// Push ... +type Push struct { +} diff --git a/queue.go b/queue.go new file mode 100644 index 0000000..0c3fbbd --- /dev/null +++ b/queue.go @@ -0,0 +1,20 @@ +package elasticsearch + +// Queue ... +type Queue struct { +} + +// ProductUpsert ... +func (r Queue) ProductUpsert(query UpdateDataPayload) (bool, error) { + return GetClient().PublishWithJetStream(JetStreamSearchService, SubjectQueueProductUpsert, toBytes(query)) +} + +// UserUpsert ... +func (r Queue) UserUpsert(query UpdateDataPayload) (bool, error) { + return GetClient().PublishWithJetStream(JetStreamSearchService, SubjectQueueUserUpsert, toBytes(query)) +} + +// OrderUpsert ... +func (r Queue) OrderUpsert(query UpdateDataPayload) (bool, error) { + return GetClient().PublishWithJetStream(JetStreamSearchService, SubjectQueueOrderUpsert, toBytes(query)) +} diff --git a/request.go b/request.go new file mode 100644 index 0000000..597969f --- /dev/null +++ b/request.go @@ -0,0 +1,35 @@ +package elasticsearch + +// Request ... +type Request struct { +} + +// ProductSearch ... +func (r Request) ProductSearch(query ESQuery) (*Response, error) { + return GetClient().RequestNats(SubjectRequestProductSearch, toBytes(query)) +} + +// ProductUpsert ... +func (r Request) ProductUpsert(query UpdateDataPayload) (*Response, error) { + return GetClient().RequestNats(SubjectRequestProductUpsert, toBytes(query)) +} + +// UserSearch ... +func (r Request) UserSearch(query ESQuery) (*Response, error) { + return GetClient().RequestNats(SubjectRequestUserSearch, toBytes(query)) +} + +// UserUpsert ... +func (r Request) UserUpsert(query UpdateDataPayload) (*Response, error) { + return GetClient().RequestNats(SubjectRequestUserUpsert, toBytes(query)) +} + +// OrderSearch ... +func (r Request) OrderSearch(query ESQuery) (*Response, error) { + return GetClient().RequestNats(SubjectRequestOrderSearch, toBytes(query)) +} + +// OrderUpsert ... +func (r Request) OrderUpsert(query UpdateDataPayload) (*Response, error) { + return GetClient().RequestNats(SubjectRequestOrderUpsert, toBytes(query)) +}