From c180bd8489fd6a06c09bacca13a371bbd50d2199 Mon Sep 17 00:00:00 2001 From: QuanTT0110 Date: Thu, 27 Apr 2023 11:19:33 +0700 Subject: [PATCH 1/2] update source --- .idea/.gitignore | 8 +++++ .idea/modules.xml | 8 +++++ .idea/natsio.iml | 9 +++++ .idea/vcs.xml | 6 ++++ config.go | 54 +++++++++++++++++++++++++++++ constant.go | 13 +++++++ go.mod | 21 ++++++++++++ go.sum | 46 +++++++++++++++++++++++++ jetstream_consumer.go | 35 +++++++++++++++++++ jetstream_pubsub.go | 59 ++++++++++++++++++++++++++++++++ jetstream_queue.go | 23 +++++++++++++ jetstream_stream.go | 72 +++++++++++++++++++++++++++++++++++++++ model.go | 68 +++++++++++++++++++++++++++++++++++++ natsio.go | 79 +++++++++++++++++++++++++++++++++++++++++++ node.go | 28 +++++++++++++++ server_reqres.go | 74 ++++++++++++++++++++++++++++++++++++++++ stream_utilties.go | 14 ++++++++ utils.go | 59 ++++++++++++++++++++++++++++++++ 18 files changed, 676 insertions(+) create mode 100644 .idea/.gitignore create mode 100644 .idea/modules.xml create mode 100644 .idea/natsio.iml create mode 100644 .idea/vcs.xml create mode 100644 config.go create mode 100644 constant.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 jetstream_consumer.go create mode 100644 jetstream_pubsub.go create mode 100644 jetstream_queue.go create mode 100644 jetstream_stream.go create mode 100644 model.go create mode 100644 natsio.go create mode 100644 node.go create mode 100644 server_reqres.go create mode 100644 stream_utilties.go create mode 100644 utils.go diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..765526d --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/natsio.iml b/.idea/natsio.iml new file mode 100644 index 0000000..5e764c4 --- /dev/null +++ b/.idea/natsio.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/config.go b/config.go new file mode 100644 index 0000000..fe86d20 --- /dev/null +++ b/config.go @@ -0,0 +1,54 @@ +package natsio + +import ( + "errors" + "time" +) + +// Config ... +type Config struct { + // Connect url + URL string + + // Auth user + User string + + // Auth password + Password string + + // TLS config + TLS *TLSConfig + + // RequestTimeout + RequestTimeout time.Duration + + // Stream name + StreamName string + + // Debug + Debug bool +} + +func (c Config) validate() error { + if c.URL == "" { + return errors.New("connect URL is required") + } + + if c.StreamName == "" { + return errors.New("stream name is required") + } + + return nil +} + +// TLSConfig ... +type TLSConfig struct { + // Cert file + CertFilePath string + + // Key file + KeyFilePath string + + // Root CA + RootCAFilePath string +} diff --git a/constant.go b/constant.go new file mode 100644 index 0000000..21abcda --- /dev/null +++ b/constant.go @@ -0,0 +1,13 @@ +package natsio + +var AllStreams = struct { + ChangeStream string +}{ + ChangeStream: "change_stream", +} + +var AllServers = struct { + MongoCDC string +}{ + MongoCDC: "mongo_cdc", +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..e9b345e --- /dev/null +++ b/go.mod @@ -0,0 +1,21 @@ +module natsio + +go 1.18 + +require ( + github.com/json-iterator/go v1.1.12 + github.com/nats-io/nats.go v1.25.0 + github.com/thoas/go-funk v0.9.3 +) + +require ( + github.com/golang/protobuf v1.5.3 // indirect + github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/nats-io/nats-server/v2 v2.9.16 // indirect + github.com/nats-io/nkeys v0.4.4 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + golang.org/x/crypto v0.8.0 // indirect + golang.org/x/sys v0.7.0 // indirect + google.golang.org/protobuf v1.30.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..574af73 --- /dev/null +++ b/go.sum @@ -0,0 +1,46 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/compress v1.16.4 h1:91KN02FnsOYhuunwU4ssRe8lc2JosWmizWa91B5v1PU= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4= +github.com/nats-io/nats-server/v2 v2.9.16 h1:SuNe6AyCcVy0g5326wtyU8TdqYmcPqzTjhkHojAjprc= +github.com/nats-io/nats-server/v2 v2.9.16/go.mod h1:z1cc5Q+kqJkz9mLUdlcSsdYnId4pyImHjNgoh6zxSC0= +github.com/nats-io/nats.go v1.25.0 h1:t5/wCPGciR7X3Mu8QOi4jiJaXaWM8qtkLu4lzGZvYHE= +github.com/nats-io/nats.go v1.25.0/go.mod h1:D2WALIhz7V8M0pH8Scx8JZXlg6Oqz5VG+nQkK8nJdvg= +github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= +github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw= +github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= +golang.org/x/crypto v0.8.0 h1:pd9TJtTueMTVQXzk8E2XESSMQDj/U7OUu0PqJqPXQjQ= +golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE= +golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= +golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= +google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/jetstream_consumer.go b/jetstream_consumer.go new file mode 100644 index 0000000..97651f4 --- /dev/null +++ b/jetstream_consumer.go @@ -0,0 +1,35 @@ +package natsio + +import ( + "errors" + "fmt" + + "github.com/nats-io/nats.go" +) + +// GetConsumerInfo ... +func (js JetStream) GetConsumerInfo(consumerName string) (*nats.ConsumerInfo, error) { + return js.instance.ConsumerInfo(js.streamName, consumerName) +} + +// AddConsumer ... +func (js JetStream) AddConsumer(consumerName, filterSubject string) error { + // get consumer first, return if existed + consumer, _ := js.GetConsumerInfo(consumerName) + if consumer != nil { + return nil + } + + // add + _, err := js.instance.AddConsumer(js.streamName, &nats.ConsumerConfig{ + Durable: consumerName, + AckPolicy: nats.AckExplicitPolicy, + FilterSubject: filterSubject, + }) + + if err != nil { + msg := fmt.Sprintf("[natsio.JetStream] add consumer %s to stream #%s error: %s", consumerName, js.streamName, err.Error()) + return errors.New(msg) + } + return nil +} diff --git a/jetstream_pubsub.go b/jetstream_pubsub.go new file mode 100644 index 0000000..8cc1848 --- /dev/null +++ b/jetstream_pubsub.go @@ -0,0 +1,59 @@ +package natsio + +import ( + "errors" + "fmt" + "github.com/nats-io/nats.go" +) + +// Publish ... +func (js JetStream) Publish(subject string, payload []byte) error { + _, err := js.instance.PublishAsync(subject, payload) + if err != nil { + msg := fmt.Sprintf("[natsio.JetStream] publish message to subject #%s error: %s", subject, err.Error()) + return errors.New(msg) + } + + if js.cfg.Debug { + fmt.Printf("[natsio.JetStream] published a message to subject #%s \n", subject) + } + + return nil +} + +// Subscribe ... +func (js JetStream) Subscribe(subject string, cb nats.MsgHandler) (*nats.Subscription, error) { + sub, err := js.instance.Subscribe(subject, cb) + if err != nil { + msg := fmt.Sprintf("[natsio.JetStream] subscribe subject %s error: %s", subject, err.Error()) + return nil, errors.New(msg) + } + + if js.cfg.Debug { + fmt.Printf("[natsio.JetStream] subscribe to subject #%s \n", subject) + } + + return sub, nil +} + +func (js JetStream) PullSubscribe(subject, consumer string) (*nats.Subscription, error) { + // check if consumer existed + con, err := js.GetConsumerInfo(consumer) + if con == nil || err != nil { + msg := fmt.Sprintf("[natsio.JetStream] pull subscribe consumer %s not existed in stream %s", consumer, js.streamName) + return nil, errors.New(msg) + } + + // pull + sub, err := js.instance.PullSubscribe(subject, consumer) + if err != nil { + msg := fmt.Sprintf("[natsio.JetStream] pull subscribe subject #%s - consumer #%s error: %s", subject, consumer, err.Error()) + return nil, errors.New(msg) + } + + if js.cfg.Debug { + fmt.Printf("[natsio.JetStream] pull subscribe to subject #%s \n", subject) + } + + return sub, nil +} diff --git a/jetstream_queue.go b/jetstream_queue.go new file mode 100644 index 0000000..20941ed --- /dev/null +++ b/jetstream_queue.go @@ -0,0 +1,23 @@ +package natsio + +import ( + "errors" + "fmt" + + "github.com/nats-io/nats.go" +) + +// QueueSubscribe ... +func (js JetStream) QueueSubscribe(subject, queueName string, cb nats.MsgHandler) error { + _, err := js.instance.QueueSubscribe(subject, queueName, cb) + if err != nil { + msg := fmt.Sprintf("[natsio.JetStream] queue subscribe with subject #%s error: %s", subject, err.Error()) + return errors.New(msg) + } + + if js.cfg.Debug { + fmt.Printf("[natsio.JetStream] queue subscribe to subject #%s \n", subject) + } + + return nil +} diff --git a/jetstream_stream.go b/jetstream_stream.go new file mode 100644 index 0000000..def0646 --- /dev/null +++ b/jetstream_stream.go @@ -0,0 +1,72 @@ +package natsio + +import ( + "errors" + "fmt" + + "github.com/nats-io/nats.go" +) + +func generateStreamConfig(stream string, subjects []string) *nats.StreamConfig { + cfg := nats.StreamConfig{ + Name: stream, + Subjects: subjects, + Retention: nats.WorkQueuePolicy, + MaxConsumers: -1, + MaxMsgSize: -1, + MaxMsgs: -1, + NoAck: false, + } + return &cfg +} + +// GetStreamInfo ... +func (js JetStream) GetStreamInfo() (*nats.StreamInfo, error) { + return js.instance.StreamInfo(js.streamName) +} + +// addStream add new stream, with default config +func (js JetStream) addStream() { + // get info about the stream + stream, _ := js.GetStreamInfo() + + // if stream not found, create new + if stream == nil { + _, err := js.instance.AddStream(generateStreamConfig(js.streamName, []string{})) + if err != nil { + fmt.Printf("[natsio.JetStream] add stream %s error: %s \n", js.streamName, err.Error()) + } + } +} + +// DeleteStream ... +func (js JetStream) DeleteStream() error { + if err := js.instance.DeleteStream(js.streamName); err != nil { + msg := fmt.Sprintf("[natsio.JetStream] delete stream %s error: %s", js.streamName, err.Error()) + return errors.New(msg) + } + + return nil +} + +// AddSubjects ... +func (js JetStream) AddSubjects(subjects []string) error { + // get stream info + stream, _ := js.GetStreamInfo() + if stream == nil { + msg := fmt.Sprintf("[natsio.JetStream] error when adding stream #%s subjects: stream not found", js.streamName) + return errors.New(msg) + } + + // merge current and new subjects + newSubjects := mergeAndUniqueArrayStrings(subjects, stream.Config.Subjects) + + // update + _, err := js.instance.UpdateStream(generateStreamConfig(js.streamName, newSubjects)) + if err != nil { + msg := fmt.Sprintf("[natsio.JetStream] add subject to stream #%s error: %s", js.streamName, err.Error()) + return errors.New(msg) + } + + return nil +} diff --git a/model.go b/model.go new file mode 100644 index 0000000..c249cda --- /dev/null +++ b/model.go @@ -0,0 +1,68 @@ +package natsio + +import ( + "errors" +) + +// +// Common +// + +type IDAndName struct { + ID string `json:"_id"` + Name string `json:"name"` +} + +// +// USER +// + +type User struct { + ID string `json:"_id"` + Name string `json:"name"` + Type string `json:"type"` + Avatar *FilePhoto `json:"avatar"` +} + +func ParseToUser(data interface{}, result *User) (err error) { + b := InterfaceToBytes(data) + if len(b) > 0 { + err = json.Unmarshal(b, result) + } else { + err = errors.New("[natsio.ParseUser] cannot read data") + } + + return +} + +// +// FILE PHOTO +// + +type FilePhoto struct { + ID string `json:"_id"` + Name string `json:"name"` + Dimensions *FileDimensions `json:"dimensions"` +} + +type FileSize struct { + Width int `json:"width"` + Height int `json:"height"` + URL string `json:"url,omitempty"` +} + +type FileDimensions struct { + Small *FileSize `json:"sm"` + Medium *FileSize `json:"md"` +} + +func ParseToPhoto(data interface{}, result *FilePhoto) (err error) { + b := InterfaceToBytes(data) + if len(b) > 0 { + err = json.Unmarshal(b, result) + } else { + err = errors.New("[natsio.ParsePhoto] cannot read data") + } + + return +} diff --git a/natsio.go b/natsio.go new file mode 100644 index 0000000..d7351dd --- /dev/null +++ b/natsio.go @@ -0,0 +1,79 @@ +package natsio + +import ( + "errors" + "fmt" + + jsoniter "github.com/json-iterator/go" + "github.com/nats-io/nats.go" +) + +// Server ... +type Server struct { + instance *nats.Conn + cfg Config +} + +// JetStream ... +type JetStream struct { + instance nats.JetStreamContext + cfg Config + streamName string +} + +var ( + natsServer Server + natsJetStream JetStream + + // builtin json alternative + json = jsoniter.ConfigCompatibleWithStandardLibrary +) + +// Connect ... +func Connect(cfg Config) (*Server, *JetStream, error) { + // validate + if err := cfg.validate(); err != nil { + return nil, nil, err + } + + // connect options + opts := make([]nats.Option, 0) + + // has authentication + if cfg.User != "" { + opts = append(opts, nats.UserInfo(cfg.User, cfg.Password)) + } + + // if it has TLS + if cfg.TLS != nil { + opts = append(opts, nats.ClientCert(cfg.TLS.CertFilePath, cfg.TLS.KeyFilePath)) + opts = append(opts, nats.RootCAs(cfg.TLS.RootCAFilePath)) + } + + nc, err := nats.Connect(cfg.URL, opts...) + if err != nil { + msg := fmt.Sprintf("error when connecting to NATS: %s", err.Error()) + return nil, nil, errors.New(msg) + } + + // set client + natsServer.instance = nc + natsServer.cfg = cfg + + // create jet stream context + js, err := nc.JetStream(nats.PublishAsyncMaxPending(256)) + if err != nil { + msg := fmt.Sprintf("error when create NATS JetStream: %s", err.Error()) + return nil, nil, errors.New(msg) + } + natsJetStream.instance = js + natsJetStream.cfg = cfg + natsJetStream.streamName = cfg.StreamName + + // add stream + natsJetStream.addStream() + + fmt.Printf("⚡️[natsio]: connected to %s \n", cfg.URL) + + return &natsServer, &natsJetStream, nil +} diff --git a/node.go b/node.go new file mode 100644 index 0000000..48d2797 --- /dev/null +++ b/node.go @@ -0,0 +1,28 @@ +package natsio + +import ( + "fmt" + + "github.com/nats-io/nats.go" +) + +// Node include all necessary things of a client +type Node struct { + Sv Server + Js JetStream +} + +func (n Node) ServerQueueSubscribe(subjectName string, h nats.MsgHandler) { + queueName := GenerateQueueNameFromSubject(subjectName) + if _, err := n.Sv.QueueSubscribe(subjectName, queueName, h); err != nil { + fmt.Printf("[natsio.Node.ServerQueueSubscribe] error: %s \n", err.Error()) + } + +} + +func (n Node) JetStreamQueueSubscribe(subjectName string, h nats.MsgHandler) { + queueName := GenerateQueueNameFromSubject(subjectName) + if err := n.Js.QueueSubscribe(subjectName, queueName, h); err != nil { + fmt.Printf("[natsio.Node.ServerQueueSubscribe] error: %s \n", err.Error()) + } +} diff --git a/server_reqres.go b/server_reqres.go new file mode 100644 index 0000000..60d6450 --- /dev/null +++ b/server_reqres.go @@ -0,0 +1,74 @@ +package natsio + +import ( + "errors" + "fmt" + "time" + + "github.com/nats-io/nats.go" +) + +// Default timeout 10s +const requestTimeout = 10 * time.Second + +// Request ... +func (s Server) Request(subject string, payload []byte) (*nats.Msg, error) { + timeout := requestTimeout + if s.cfg.RequestTimeout > 0 { + timeout = s.cfg.RequestTimeout + } + msg, err := s.instance.Request(subject, payload, timeout) + if errors.Is(err, nats.ErrNoResponders) { + fmt.Printf("[natsio.Server] no responders for subject: %s \n", subject) + } else if s.cfg.Debug { + fmt.Printf("[natsio.Server] send request to subject #%s successfully \n", subject) + } + + return msg, err +} + +// RequestWithBindData ... +func (s Server) RequestWithBindData(subject string, payload []byte, result interface{}) error { + msg, err := s.Request(subject, payload) + if msg == nil || err != nil { + return err + } + + // map + return BytesToInterface(msg.Data, result) +} + +// Reply ... +func (s Server) Reply(msg *nats.Msg, payload []byte) error { + return s.instance.Publish(msg.Reply, payload) +} + +// Subscribe ... +func (s Server) Subscribe(subject string, cb nats.MsgHandler) (*nats.Subscription, error) { + sub, err := s.instance.Subscribe(subject, cb) + if err != nil { + msg := fmt.Sprintf("[natsio.Server] subscribe subject %s error: %s", subject, err.Error()) + return nil, errors.New(msg) + } + + if s.cfg.Debug { + fmt.Printf("[natsio.Server] subscribe to subject #%s \n", subject) + } + + return sub, nil +} + +// QueueSubscribe ... +func (s Server) QueueSubscribe(subject, queue string, cb nats.MsgHandler) (*nats.Subscription, error) { + sub, err := s.instance.QueueSubscribe(subject, queue, cb) + if err != nil { + msg := fmt.Sprintf("[natsio.Server] queue subscribe subject %s, queue %s error: %s", subject, queue, err.Error()) + return nil, errors.New(msg) + } + + if s.cfg.Debug { + fmt.Printf("[natsio.Server] queue subscribe to subject #%s \n", subject) + } + + return sub, nil +} diff --git a/stream_utilties.go b/stream_utilties.go new file mode 100644 index 0000000..0dda510 --- /dev/null +++ b/stream_utilties.go @@ -0,0 +1,14 @@ +package natsio + +type StreamUtilities struct { + Stream string + Server string +} + +func (s StreamUtilities) GenerateReqrepSubject(subject string) string { + return GenerateReqrepSubject(s.Stream, s.Server, subject) +} + +func (s StreamUtilities) GenerateJetStreamSubject(subject string) string { + return GenerateJetStreamSubject(s.Stream, s.Server, subject) +} diff --git a/utils.go b/utils.go new file mode 100644 index 0000000..01e0316 --- /dev/null +++ b/utils.go @@ -0,0 +1,59 @@ +package natsio + +import ( + "fmt" + "strings" + + "github.com/nats-io/nats.go" + "github.com/thoas/go-funk" +) + +// mergeAndUniqueArrayStrings ... +func mergeAndUniqueArrayStrings(arr1, arr2 []string) []string { + var result = make([]string, 0) + result = append(result, arr1...) + result = append(result, arr2...) + result = funk.UniqString(result) + return result +} + +func combineStreamAndSubjectName(stream, subject string) string { + return fmt.Sprintf("%s.%s", stream, subject) +} + +// GenerateJetStreamSubject ... +func GenerateJetStreamSubject(stream, server, subject string) string { + return fmt.Sprintf("%s.jetstream.%s.%s", stream, server, subject) +} + +// GenerateReqrepSubject ... +func GenerateReqrepSubject(stream, server, subject string) string { + return fmt.Sprintf("%s.reqrep.%s.%s", stream, server, subject) +} + +func GenerateQueueNameFromSubject(subject string) string { + return strings.ReplaceAll(subject, ".", "_") +} + +// InterfaceToBytes ... +func InterfaceToBytes(data interface{}) []byte { + b, err := json.Marshal(data) + if err != nil { + fmt.Printf("[natsio.InterfaceToBytes] error: %v with data: %v\n", err, data) + } + return b +} + +func BytesToInterface(b []byte, dest interface{}) error { + err := json.Unmarshal(b, &dest) + if err != nil { + fmt.Printf("[natsio.BytesToInterface] error: %v with data: %s\n", err, string(b)) + } + return err +} + +func MsgRespond(msg *nats.Msg, data interface{}) { + if err := msg.Respond(InterfaceToBytes(data)); err != nil { + fmt.Printf("[natsio.MsgRespond] error when response msg %s", msg.Reply) + } +} -- 2.34.1 From 48ccc330fd182b73271e54766525226b030bccc7 Mon Sep 17 00:00:00 2001 From: QuanTT0110 Date: Thu, 27 Apr 2023 11:57:20 +0700 Subject: [PATCH 2/2] update source --- .gitignore | 17 +++++++++++++++++ .idea/.gitignore | 8 -------- .idea/modules.xml | 8 -------- .idea/natsio.iml | 9 --------- .idea/vcs.xml | 6 ------ go.mod | 2 +- 6 files changed, 18 insertions(+), 32 deletions(-) create mode 100644 .gitignore delete mode 100644 .idea/.gitignore delete mode 100644 .idea/modules.xml delete mode 100644 .idea/natsio.iml delete mode 100644 .idea/vcs.xml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bbfd2cb --- /dev/null +++ b/.gitignore @@ -0,0 +1,17 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +.idea \ No newline at end of file diff --git a/.idea/.gitignore b/.idea/.gitignore deleted file mode 100644 index 13566b8..0000000 --- a/.idea/.gitignore +++ /dev/null @@ -1,8 +0,0 @@ -# Default ignored files -/shelf/ -/workspace.xml -# Editor-based HTTP Client requests -/httpRequests/ -# Datasource local storage ignored files -/dataSources/ -/dataSources.local.xml diff --git a/.idea/modules.xml b/.idea/modules.xml deleted file mode 100644 index 765526d..0000000 --- a/.idea/modules.xml +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - \ No newline at end of file diff --git a/.idea/natsio.iml b/.idea/natsio.iml deleted file mode 100644 index 5e764c4..0000000 --- a/.idea/natsio.iml +++ /dev/null @@ -1,9 +0,0 @@ - - - - - - - - - \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml deleted file mode 100644 index 35eb1dd..0000000 --- a/.idea/vcs.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/go.mod b/go.mod index e9b345e..e370e29 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module natsio +module git.selly.red/Cashbag-Modules/natsio go 1.18 -- 2.34.1