From 04a8b182fb92104238b6131f921ed13f41b557a9 Mon Sep 17 00:00:00 2001 From: Nam Huynh Date: Thu, 2 Dec 2021 10:29:52 +0700 Subject: [PATCH] refactor --- jetstream_pubsub.go | 32 +++++++++++++++++++++++ jetstream_queue.go | 20 +++++++++++++++ stream.go => jetstream_stream.go | 28 ++++++++++---------- natsio.go | 30 ++++++++++++++-------- pubsub.go | 44 -------------------------------- queue.go | 20 --------------- request.go | 15 ----------- server_reqres.go | 36 ++++++++++++++++++++++++++ 8 files changed, 122 insertions(+), 103 deletions(-) create mode 100644 jetstream_pubsub.go create mode 100644 jetstream_queue.go rename stream.go => jetstream_stream.go (56%) delete mode 100644 pubsub.go delete mode 100644 queue.go delete mode 100644 request.go create mode 100644 server_reqres.go diff --git a/jetstream_pubsub.go b/jetstream_pubsub.go new file mode 100644 index 0000000..069f551 --- /dev/null +++ b/jetstream_pubsub.go @@ -0,0 +1,32 @@ +package natsio + +import ( + "errors" + "fmt" + + "github.com/nats-io/nats.go" +) + +// Publish ... +func (js JetStream) Publish(stream, subject string, payload []byte) error { + channel := combineStreamAndSubjectName(stream, subject) + + _, err := js.instance.PublishAsync(channel, payload) + if err != nil { + msg := fmt.Sprintf("[NATS JETSTREAM] - publish message to subject %s error: %s", channel, err.Error()) + return errors.New(msg) + } + return nil +} + +// Subscribe ... +func (js JetStream) Subscribe(stream, subject string, cb nats.MsgHandler) (*nats.Subscription, error) { + channel := combineStreamAndSubjectName(stream, subject) + + sub, err := js.instance.Subscribe(channel, cb) + if err != nil { + msg := fmt.Sprintf("[NATS JETSTREAM] - subscribe subject %s error: %s", channel, err.Error()) + return nil, errors.New(msg) + } + return sub, nil +} diff --git a/jetstream_queue.go b/jetstream_queue.go new file mode 100644 index 0000000..08ff2f0 --- /dev/null +++ b/jetstream_queue.go @@ -0,0 +1,20 @@ +package natsio + +import ( + "errors" + "fmt" + + "github.com/nats-io/nats.go" +) + +// QueueSubscribe ... +func (js JetStream) QueueSubscribe(stream, subject, queueName string, cb nats.MsgHandler) error { + channel := combineStreamAndSubjectName(stream, subject) + + _, err := js.instance.QueueSubscribe(channel, queueName, cb) + if err != nil { + msg := fmt.Sprintf("[NATS JETSTREAM] - queue subscribe with subject %s error: %s", channel, err.Error()) + return errors.New(msg) + } + return nil +} diff --git a/stream.go b/jetstream_stream.go similarity index 56% rename from stream.go rename to jetstream_stream.go index e5ee1df..1f05b8c 100644 --- a/stream.go +++ b/jetstream_stream.go @@ -8,21 +8,21 @@ import ( ) // GetStreamInfo ... -func GetStreamInfo(name string) (*nats.StreamInfo, error) { - return natsJS.StreamInfo(name) +func (js JetStream) GetStreamInfo(name string) (*nats.StreamInfo, error) { + return js.instance.StreamInfo(name) } // AddStream add new stream, with default config // Due to subject must have a unique name, subject name will be combined with stream name // E.g: stream name is "DEMO", subject name is "Subject-1", so final name in NATS will be: DEMO.Subject-1 -func AddStream(name string, subjects []string) error { +func (js JetStream) AddStream(name string, subjects []string) error { // Get info about the stream - stream, _ := GetStreamInfo(name) + stream, _ := js.GetStreamInfo(name) // If stream not found, create new if stream == nil { subjectNames := generateSubjectNames(name, subjects) - _, err := natsJS.AddStream(&nats.StreamConfig{ + _, err := js.instance.AddStream(&nats.StreamConfig{ Name: name, Subjects: subjectNames, Retention: nats.WorkQueuePolicy, @@ -32,7 +32,7 @@ func AddStream(name string, subjects []string) error { NoAck: false, }) if err != nil { - msg := fmt.Sprintf("add stream error: %s", err.Error()) + msg := fmt.Sprintf("[NATS JETSTREAM] - add stream error: %s", err.Error()) return errors.New(msg) } } @@ -41,20 +41,20 @@ func AddStream(name string, subjects []string) error { } // DeleteStream ... -func DeleteStream(name string) error { - if err := natsJS.DeleteStream(name); err != nil { - msg := fmt.Sprintf("delete stream error: %s", err.Error()) +func (js JetStream) DeleteStream(name string) error { + if err := js.instance.DeleteStream(name); err != nil { + msg := fmt.Sprintf("[NATS JETSTREAM] - delete stream error: %s", err.Error()) return errors.New(msg) } return nil } // AddStreamSubjects ... -func AddStreamSubjects(name string, subjects []string) error { +func (js JetStream) AddStreamSubjects(name string, subjects []string) error { // Get info about the stream - stream, _ := GetStreamInfo(name) + stream, _ := js.GetStreamInfo(name) if stream == nil { - msg := fmt.Sprintf("error when adding stream %s subjects: stream not found", name) + msg := fmt.Sprintf("[NATS JETSTREAM] - error when adding stream %s subjects: stream not found", name) return errors.New(msg) } @@ -62,12 +62,12 @@ func AddStreamSubjects(name string, subjects []string) error { subjectNames := generateSubjectNames(name, subjects) newSubjects := mergeAndUniqueArrayStrings(subjectNames, stream.Config.Subjects) - _, err := natsJS.UpdateStream(&nats.StreamConfig{ + _, err := js.instance.UpdateStream(&nats.StreamConfig{ Name: name, Subjects: newSubjects, }) if err != nil { - msg := fmt.Sprintf("add stream error: %s", err.Error()) + msg := fmt.Sprintf("[NATS JETSTREAM] - add stream error: %s", err.Error()) return errors.New(msg) } return nil diff --git a/natsio.go b/natsio.go index f3f7845..33bdbb2 100644 --- a/natsio.go +++ b/natsio.go @@ -8,9 +8,19 @@ import ( "github.com/nats-io/nats.go" ) +// Server ... +type Server struct { + instance *nats.Conn +} + +// JetStream ... +type JetStream struct { + instance nats.JetStreamContext +} + var ( - natsClient *nats.Conn - natsJS nats.JetStreamContext + natsServer Server + natsJetStream JetStream ) // Connect ... @@ -42,25 +52,25 @@ func Connect(cfg Config) error { fmt.Println(aurora.Green("*** CONNECTED TO NATS: " + cfg.URL)) // Set client - natsClient = nc + natsServer.instance = nc // Create jet stream context - js, err := natsClient.JetStream(nats.PublishAsyncMaxPending(256)) + js, err := nc.JetStream(nats.PublishAsyncMaxPending(256)) if err != nil { msg := fmt.Sprintf("error when create NATS JetStream: %s", err.Error()) return errors.New(msg) } - natsJS = js + natsJetStream.instance = js return nil } -// GetClient ... -func GetClient() *nats.Conn { - return natsClient +// GetServer ... +func GetServer() Server { + return natsServer } // GetJetStream ... -func GetJetStream() nats.JetStreamContext { - return natsJS +func GetJetStream() JetStream { + return natsJetStream } diff --git a/pubsub.go b/pubsub.go deleted file mode 100644 index b6bae11..0000000 --- a/pubsub.go +++ /dev/null @@ -1,44 +0,0 @@ -package natsio - -import ( - "errors" - "fmt" - - "github.com/nats-io/nats.go" -) - -// Publish ... -func Publish(stream, subject string, payload []byte) error { - channel := combineStreamAndSubjectName(stream, subject) - - _, err := natsJS.PublishAsync(channel, payload) - if err != nil { - msg := fmt.Sprintf("publish message to subject %s error: %s", channel, err.Error()) - return errors.New(msg) - } - return nil -} - -// PublishWithResponse ... -func PublishWithResponse(stream, subject string, payload []byte) (*nats.Msg, error) { - channel := combineStreamAndSubjectName(stream, subject) - - resp, err := natsJS.PublishAsync(channel, payload) - if err != nil { - msg := fmt.Sprintf("publish message to subject %s error: %s", channel, err.Error()) - return nil, errors.New(msg) - } - return resp.Msg(), nil -} - -// Subscribe ... -func Subscribe(stream, subject string, cb nats.MsgHandler) (*nats.Subscription, error) { - channel := combineStreamAndSubjectName(stream, subject) - - sub, err := natsJS.Subscribe(channel, cb) - if err != nil { - msg := fmt.Sprintf("subscribe subject %s error: %s", channel, err.Error()) - return nil, errors.New(msg) - } - return sub, nil -} diff --git a/queue.go b/queue.go deleted file mode 100644 index 6364efc..0000000 --- a/queue.go +++ /dev/null @@ -1,20 +0,0 @@ -package natsio - -import ( - "errors" - "fmt" - - "github.com/nats-io/nats.go" -) - -// QueueSubscribe ... -func QueueSubscribe(stream, subject, queueName string, cb nats.MsgHandler) error { - channel := combineStreamAndSubjectName(stream, subject) - - _, err := natsJS.QueueSubscribe(channel, queueName, cb) - if err != nil { - msg := fmt.Sprintf("queue subscribe with subject %s error: %s", channel, err.Error()) - return errors.New(msg) - } - return nil -} diff --git a/request.go b/request.go deleted file mode 100644 index 99bd1ff..0000000 --- a/request.go +++ /dev/null @@ -1,15 +0,0 @@ -package natsio - -import ( - "time" - - "github.com/nats-io/nats.go" -) - -// Default timeout 10s -const requestTimeout = 10 * time.Second - -// Request ... -func Request(subject string, payload []byte) (*nats.Msg, error) { - return natsClient.Request(subject, payload, requestTimeout) -} diff --git a/server_reqres.go b/server_reqres.go new file mode 100644 index 0000000..303c954 --- /dev/null +++ b/server_reqres.go @@ -0,0 +1,36 @@ +package natsio + +import ( + "errors" + "fmt" + "time" + + "github.com/nats-io/nats.go" +) + +// Default timeout 10s +const requestTimeout = 10 * time.Second + +// Request ... +func (sv Server) Request(subject string, payload []byte) (*nats.Msg, error) { + return sv.instance.Request(subject, payload, requestTimeout) +} + +// Reply ... +func (sv Server) Reply(msg *nats.Msg, payload []byte) error { + err := sv.instance.Publish(msg.Reply, payload) + + // Ack message + msg.Ack() + return err +} + +// Subscribe ... +func (sv Server) Subscribe(subject string, cb nats.MsgHandler) (*nats.Subscription, error) { + sub, err := sv.instance.Subscribe(subject, cb) + if err != nil { + msg := fmt.Sprintf("[NATS SERVER] - subscribe subject %s error: %s", subject, err.Error()) + return nil, errors.New(msg) + } + return sub, nil +}