From 685d3247884ce319d62c7cda99d8171579e460f3 Mon Sep 17 00:00:00 2001 From: namhq1989 Date: Thu, 15 Dec 2022 17:34:47 +0700 Subject: [PATCH] refactor --- config.go | 7 ++++++ jetstream_consumer.go | 16 +++++++------- jetstream_pubsub.go | 51 +++++++++++++++++++------------------------ jetstream_queue.go | 11 ++++++---- jetstream_stream.go | 38 +++++++++++++++----------------- natsio.go | 4 ++++ server_reqres.go | 32 +++++++++++++++------------ utils.go | 10 --------- 8 files changed, 84 insertions(+), 85 deletions(-) diff --git a/config.go b/config.go index 26ce58e..fe86d20 100644 --- a/config.go +++ b/config.go @@ -24,6 +24,9 @@ type Config struct { // Stream name StreamName string + + // Debug + Debug bool } func (c Config) validate() error { @@ -31,6 +34,10 @@ func (c Config) validate() error { return errors.New("connect URL is required") } + if c.StreamName == "" { + return errors.New("stream name is required") + } + return nil } diff --git a/jetstream_consumer.go b/jetstream_consumer.go index b49fff4..97651f4 100644 --- a/jetstream_consumer.go +++ b/jetstream_consumer.go @@ -8,27 +8,27 @@ import ( ) // GetConsumerInfo ... -func (js JetStream) GetConsumerInfo(name string) (*nats.ConsumerInfo, error) { - return js.instance.ConsumerInfo(js.streamName, name) +func (js JetStream) GetConsumerInfo(consumerName string) (*nats.ConsumerInfo, error) { + return js.instance.ConsumerInfo(js.streamName, consumerName) } // AddConsumer ... -func (js JetStream) AddConsumer(name, filterSubject string) error { - // Get consumer first, return if existed - consumer, _ := js.GetConsumerInfo(name) +func (js JetStream) AddConsumer(consumerName, filterSubject string) error { + // get consumer first, return if existed + consumer, _ := js.GetConsumerInfo(consumerName) if consumer != nil { return nil } - // Add + // add _, err := js.instance.AddConsumer(js.streamName, &nats.ConsumerConfig{ - Durable: name, + Durable: consumerName, AckPolicy: nats.AckExplicitPolicy, FilterSubject: filterSubject, }) if err != nil { - msg := fmt.Sprintf("[NATS JETSTREAM] - add consumer %s for stream #%s error: %s", name, js.streamName, err.Error()) + msg := fmt.Sprintf("[natsio.JetStream] add consumer %s to stream #%s error: %s", consumerName, js.streamName, err.Error()) return errors.New(msg) } return nil diff --git a/jetstream_pubsub.go b/jetstream_pubsub.go index ba27220..c0c2e9c 100644 --- a/jetstream_pubsub.go +++ b/jetstream_pubsub.go @@ -3,62 +3,57 @@ package natsio import ( "errors" "fmt" - "github.com/nats-io/nats.go" ) // Publish ... func (js JetStream) Publish(subject string, payload []byte) error { - channel := combineStreamAndSubjectName(js.streamName, subject) - - _, err := js.instance.PublishAsync(channel, payload) + _, err := js.instance.PublishAsync(subject, payload) if err != nil { - msg := fmt.Sprintf("[NATS JETSTREAM] - publish message to subject #%s error: %s", channel, err.Error()) + msg := fmt.Sprintf("[natsio.JetStream] publish message to subject #%s error: %s", subject, err.Error()) return errors.New(msg) } + + if js.debug { + fmt.Printf("[natsio.JetStream] published a message to subject #%s \n", subject) + } + return nil } // Subscribe ... func (js JetStream) Subscribe(subject string, cb nats.MsgHandler) (*nats.Subscription, error) { - channel := combineStreamAndSubjectName(js.streamName, subject) - - sub, err := js.instance.Subscribe(channel, cb) + sub, err := js.instance.Subscribe(subject, cb) if err != nil { - msg := fmt.Sprintf("[NATS JETSTREAM] - subscribe subject %s error: %s", channel, err.Error()) + msg := fmt.Sprintf("[natsio.JetStream] subscribe subject %s error: %s", subject, err.Error()) return nil, errors.New(msg) } + + if js.debug { + fmt.Printf("[natsio.JetStream] subscribe new message from subject #%s \n", subject) + } + return sub, nil } -// PullSubscribe ... -// -// Example: -// -// js := natsio.GetJetStream() -// -// sub, err := js.PullSubscribe("A_SUBJECT", "A_SUBJECT", "A_CONSUMER") -// -// for { -// messages, err := sub.Fetch(10) -// // process each messages -// } func (js JetStream) PullSubscribe(subject, consumer string) (*nats.Subscription, error) { - channel := combineStreamAndSubjectName(js.streamName, subject) - - // Check if consumer existed + // check if consumer existed con, err := js.GetConsumerInfo(consumer) if con == nil || err != nil { - msg := fmt.Sprintf("[NATS JETSTREAM] - pull subscribe consumer %s not existed in stream %s", consumer, js.streamName) + msg := fmt.Sprintf("[natsio.JetStream] pull subscribe consumer %s not existed in stream %s", consumer, js.streamName) return nil, errors.New(msg) } - // Pull - sub, err := js.instance.PullSubscribe(channel, consumer) + // pull + sub, err := js.instance.PullSubscribe(subject, consumer) if err != nil { - msg := fmt.Sprintf("[NATS JETSTREAM] - pull subscribe subject #%s - consumer #%s error: %s", channel, consumer, err.Error()) + msg := fmt.Sprintf("[natsio.JetStream] pull subscribe subject #%s - consumer #%s error: %s", subject, consumer, err.Error()) return nil, errors.New(msg) } + if js.debug { + fmt.Printf("[natsio.JetStream] pull subscribe new message from subject #%s \n", subject) + } + return sub, nil } diff --git a/jetstream_queue.go b/jetstream_queue.go index 3db5689..abcd581 100644 --- a/jetstream_queue.go +++ b/jetstream_queue.go @@ -9,12 +9,15 @@ import ( // QueueSubscribe ... func (js JetStream) QueueSubscribe(subject, queueName string, cb nats.MsgHandler) error { - channel := combineStreamAndSubjectName(js.streamName, subject) - - _, err := js.instance.QueueSubscribe(channel, queueName, cb) + _, err := js.instance.QueueSubscribe(subject, queueName, cb) if err != nil { - msg := fmt.Sprintf("[NATS JETSTREAM] - queue subscribe with subject #%s error: %s", channel, err.Error()) + msg := fmt.Sprintf("[natsio.JetStream] queue subscribe with subject #%s error: %s", subject, err.Error()) return errors.New(msg) } + + if js.debug { + fmt.Printf("[natsio.JetStream] queue subscribe new message from subject #%s \n", subject) + } + return nil } diff --git a/jetstream_stream.go b/jetstream_stream.go index 8ee6b67..def0646 100644 --- a/jetstream_stream.go +++ b/jetstream_stream.go @@ -25,52 +25,48 @@ func (js JetStream) GetStreamInfo() (*nats.StreamInfo, error) { return js.instance.StreamInfo(js.streamName) } -// AddStream add new stream, with default config -// Due to subject must have a unique name, subject name will be combined with stream name -// E.g: stream name is "DEMO", subject name is "Subject-1", so final name in NATS will be: DEMO.Subject-1 -func (js JetStream) AddStream(subjects []string) error { - // Get info about the stream +// addStream add new stream, with default config +func (js JetStream) addStream() { + // get info about the stream stream, _ := js.GetStreamInfo() - // If stream not found, create new + // if stream not found, create new if stream == nil { - subjectNames := generateSubjectNames(js.streamName, subjects) - _, err := js.instance.AddStream(generateStreamConfig(js.streamName, subjectNames)) + _, err := js.instance.AddStream(generateStreamConfig(js.streamName, []string{})) if err != nil { - msg := fmt.Sprintf("[NATS JETSTREAM] - add stream error: %s", err.Error()) - return errors.New(msg) + fmt.Printf("[natsio.JetStream] add stream %s error: %s \n", js.streamName, err.Error()) } } - - return nil } // DeleteStream ... func (js JetStream) DeleteStream() error { if err := js.instance.DeleteStream(js.streamName); err != nil { - msg := fmt.Sprintf("[NATS JETSTREAM] - delete stream error: %s", err.Error()) + msg := fmt.Sprintf("[natsio.JetStream] delete stream %s error: %s", js.streamName, err.Error()) return errors.New(msg) } + return nil } -// AddStreamSubjects ... -func (js JetStream) AddStreamSubjects(subjects []string) error { - // Get info about the stream +// AddSubjects ... +func (js JetStream) AddSubjects(subjects []string) error { + // get stream info stream, _ := js.GetStreamInfo() if stream == nil { - msg := fmt.Sprintf("[NATS JETSTREAM] - error when adding stream %s subjects: stream not found", js.streamName) + msg := fmt.Sprintf("[natsio.JetStream] error when adding stream #%s subjects: stream not found", js.streamName) return errors.New(msg) } - // Merge current and new subjects - subjectNames := generateSubjectNames(js.streamName, subjects) - newSubjects := mergeAndUniqueArrayStrings(subjectNames, stream.Config.Subjects) + // merge current and new subjects + newSubjects := mergeAndUniqueArrayStrings(subjects, stream.Config.Subjects) + // update _, err := js.instance.UpdateStream(generateStreamConfig(js.streamName, newSubjects)) if err != nil { - msg := fmt.Sprintf("[NATS JETSTREAM] - add stream error: %s", err.Error()) + msg := fmt.Sprintf("[natsio.JetStream] add subject to stream #%s error: %s", js.streamName, err.Error()) return errors.New(msg) } + return nil } diff --git a/natsio.go b/natsio.go index 3183d5e..c379e80 100644 --- a/natsio.go +++ b/natsio.go @@ -10,11 +10,13 @@ import ( // Server ... type Server struct { instance *nats.Conn + debug bool } // JetStream ... type JetStream struct { instance nats.JetStreamContext + debug bool streamName string } @@ -53,6 +55,7 @@ func Connect(cfg Config) (*Server, *JetStream, error) { // set client natsServer.instance = nc + natsServer.debug = cfg.Debug // create jet stream context js, err := nc.JetStream(nats.PublishAsyncMaxPending(256)) @@ -61,6 +64,7 @@ func Connect(cfg Config) (*Server, *JetStream, error) { return nil, nil, errors.New(msg) } natsJetStream.instance = js + natsJetStream.debug = cfg.Debug natsJetStream.streamName = cfg.StreamName // assign client type diff --git a/server_reqres.go b/server_reqres.go index ff1b975..8cfb871 100644 --- a/server_reqres.go +++ b/server_reqres.go @@ -3,7 +3,6 @@ package natsio import ( "errors" "fmt" - "log" "time" "github.com/nats-io/nats.go" @@ -13,21 +12,24 @@ import ( const requestTimeout = 10 * time.Second // Request ... -func (sv Server) Request(subject string, payload []byte) (*nats.Msg, error) { +func (s Server) Request(subject string, payload []byte) (*nats.Msg, error) { timeout := requestTimeout if globalConfig.RequestTimeout > 0 { timeout = globalConfig.RequestTimeout } - msg, err := sv.instance.Request(subject, payload, timeout) + msg, err := s.instance.Request(subject, payload, timeout) if errors.Is(err, nats.ErrNoResponders) { - log.Printf("[NATS SERVER]: request - no responders for subject: %s", subject) + fmt.Printf("[natsio.Server] no responders for subject: %s \n", subject) + } else if s.debug { + fmt.Printf("[natsio.Server] send request to subject #%s successfully \n", subject) } + return msg, err } // RequestWithBindData ... -func (sv Server) RequestWithBindData(subject string, payload []byte, result interface{}) error { - msg, err := sv.Request(subject, payload) +func (s Server) RequestWithBindData(subject string, payload []byte, result interface{}) error { + msg, err := s.Request(subject, payload) if msg == nil || err != nil { return err } @@ -37,26 +39,28 @@ func (sv Server) RequestWithBindData(subject string, payload []byte, result inte } // Reply ... -func (sv Server) Reply(msg *nats.Msg, payload []byte) error { - return sv.instance.Publish(msg.Reply, payload) +func (s Server) Reply(msg *nats.Msg, payload []byte) error { + return s.instance.Publish(msg.Reply, payload) } // Subscribe ... -func (sv Server) Subscribe(subject string, cb nats.MsgHandler) (*nats.Subscription, error) { - sub, err := sv.instance.Subscribe(subject, cb) +func (s Server) Subscribe(subject string, cb nats.MsgHandler) (*nats.Subscription, error) { + sub, err := s.instance.Subscribe(subject, cb) if err != nil { - msg := fmt.Sprintf("[NATS SERVER] - subscribe subject %s error: %s", subject, err.Error()) + msg := fmt.Sprintf("[natsio.Server] subscribe subject %s error: %s", subject, err.Error()) return nil, errors.New(msg) } + return sub, nil } // QueueSubscribe ... -func (sv Server) QueueSubscribe(subject, queue string, cb nats.MsgHandler) (*nats.Subscription, error) { - sub, err := sv.instance.QueueSubscribe(subject, queue, cb) +func (s Server) QueueSubscribe(subject, queue string, cb nats.MsgHandler) (*nats.Subscription, error) { + sub, err := s.instance.QueueSubscribe(subject, queue, cb) if err != nil { - msg := fmt.Sprintf("[NATS SERVER] - queue subscribe subject %s, queue %s error: %s", subject, queue, err.Error()) + msg := fmt.Sprintf("[natsio.Server] queue subscribe subject %s, queue %s error: %s", subject, queue, err.Error()) return nil, errors.New(msg) } + return sub, nil } diff --git a/utils.go b/utils.go index fc25b06..229ed1e 100644 --- a/utils.go +++ b/utils.go @@ -16,16 +16,6 @@ func mergeAndUniqueArrayStrings(arr1, arr2 []string) []string { return result } -// generateSubjectNames ... -func generateSubjectNames(streamName string, subjects []string) []string { - var result = make([]string, 0) - for _, subject := range subjects { - name := combineStreamAndSubjectName(streamName, subject) - result = append(result, name) - } - return result -} - func combineStreamAndSubjectName(stream, subject string) string { return fmt.Sprintf("%s.%s", stream, subject) }