From b3519790e2a758ad37d006e04c806449e640f0a5 Mon Sep 17 00:00:00 2001 From: Nam Huynh Date: Sun, 4 Dec 2022 22:10:05 +0700 Subject: [PATCH] assign stream name to config --- config.go | 3 +++ jetstream_consumer.go | 12 ++++++------ jetstream_pubsub.go | 25 ++++++++++++------------- jetstream_queue.go | 4 ++-- jetstream_stream.go | 26 +++++++++++++------------- natsio.go | 5 ++++- 6 files changed, 40 insertions(+), 35 deletions(-) diff --git a/config.go b/config.go index 898b767..1da9939 100644 --- a/config.go +++ b/config.go @@ -18,6 +18,9 @@ type Config struct { // RequestTimeout RequestTimeout time.Duration + + // Stream name + StreamName string } // TLSConfig ... diff --git a/jetstream_consumer.go b/jetstream_consumer.go index c8bf7df..b49fff4 100644 --- a/jetstream_consumer.go +++ b/jetstream_consumer.go @@ -8,27 +8,27 @@ import ( ) // GetConsumerInfo ... -func (js JetStream) GetConsumerInfo(stream, name string) (*nats.ConsumerInfo, error) { - return js.instance.ConsumerInfo(stream, name) +func (js JetStream) GetConsumerInfo(name string) (*nats.ConsumerInfo, error) { + return js.instance.ConsumerInfo(js.streamName, name) } // AddConsumer ... -func (js JetStream) AddConsumer(stream, name, filterSubject string) error { +func (js JetStream) AddConsumer(name, filterSubject string) error { // Get consumer first, return if existed - consumer, _ := js.GetConsumerInfo(stream, name) + consumer, _ := js.GetConsumerInfo(name) if consumer != nil { return nil } // Add - _, err := js.instance.AddConsumer(stream, &nats.ConsumerConfig{ + _, err := js.instance.AddConsumer(js.streamName, &nats.ConsumerConfig{ Durable: name, AckPolicy: nats.AckExplicitPolicy, FilterSubject: filterSubject, }) if err != nil { - msg := fmt.Sprintf("[NATS JETSTREAM] - add consumer %s for stream #%s error: %s", name, stream, err.Error()) + msg := fmt.Sprintf("[NATS JETSTREAM] - add consumer %s for stream #%s error: %s", name, js.streamName, err.Error()) return errors.New(msg) } return nil diff --git a/jetstream_pubsub.go b/jetstream_pubsub.go index dc4b142..ba27220 100644 --- a/jetstream_pubsub.go +++ b/jetstream_pubsub.go @@ -8,8 +8,8 @@ import ( ) // Publish ... -func (js JetStream) Publish(stream, subject string, payload []byte) error { - channel := combineStreamAndSubjectName(stream, subject) +func (js JetStream) Publish(subject string, payload []byte) error { + channel := combineStreamAndSubjectName(js.streamName, subject) _, err := js.instance.PublishAsync(channel, payload) if err != nil { @@ -20,8 +20,8 @@ func (js JetStream) Publish(stream, subject string, payload []byte) error { } // Subscribe ... -func (js JetStream) Subscribe(stream, subject string, cb nats.MsgHandler) (*nats.Subscription, error) { - channel := combineStreamAndSubjectName(stream, subject) +func (js JetStream) Subscribe(subject string, cb nats.MsgHandler) (*nats.Subscription, error) { + channel := combineStreamAndSubjectName(js.streamName, subject) sub, err := js.instance.Subscribe(channel, cb) if err != nil { @@ -39,18 +39,17 @@ func (js JetStream) Subscribe(stream, subject string, cb nats.MsgHandler) (*nats // // sub, err := js.PullSubscribe("A_SUBJECT", "A_SUBJECT", "A_CONSUMER") // -// for { -// messages, err := sub.Fetch(10) -// // process each messages -// } -// -func (js JetStream) PullSubscribe(stream, subject, consumer string) (*nats.Subscription, error) { - channel := combineStreamAndSubjectName(stream, subject) +// for { +// messages, err := sub.Fetch(10) +// // process each messages +// } +func (js JetStream) PullSubscribe(subject, consumer string) (*nats.Subscription, error) { + channel := combineStreamAndSubjectName(js.streamName, subject) // Check if consumer existed - con, err := js.GetConsumerInfo(stream, consumer) + con, err := js.GetConsumerInfo(consumer) if con == nil || err != nil { - msg := fmt.Sprintf("[NATS JETSTREAM] - pull subscribe consumer %s not existed in stream %s", consumer, stream) + msg := fmt.Sprintf("[NATS JETSTREAM] - pull subscribe consumer %s not existed in stream %s", consumer, js.streamName) return nil, errors.New(msg) } diff --git a/jetstream_queue.go b/jetstream_queue.go index 10db4d7..3db5689 100644 --- a/jetstream_queue.go +++ b/jetstream_queue.go @@ -8,8 +8,8 @@ import ( ) // QueueSubscribe ... -func (js JetStream) QueueSubscribe(stream, subject, queueName string, cb nats.MsgHandler) error { - channel := combineStreamAndSubjectName(stream, subject) +func (js JetStream) QueueSubscribe(subject, queueName string, cb nats.MsgHandler) error { + channel := combineStreamAndSubjectName(js.streamName, subject) _, err := js.instance.QueueSubscribe(channel, queueName, cb) if err != nil { diff --git a/jetstream_stream.go b/jetstream_stream.go index e4469ee..8ee6b67 100644 --- a/jetstream_stream.go +++ b/jetstream_stream.go @@ -21,21 +21,21 @@ func generateStreamConfig(stream string, subjects []string) *nats.StreamConfig { } // GetStreamInfo ... -func (js JetStream) GetStreamInfo(name string) (*nats.StreamInfo, error) { - return js.instance.StreamInfo(name) +func (js JetStream) GetStreamInfo() (*nats.StreamInfo, error) { + return js.instance.StreamInfo(js.streamName) } // AddStream add new stream, with default config // Due to subject must have a unique name, subject name will be combined with stream name // E.g: stream name is "DEMO", subject name is "Subject-1", so final name in NATS will be: DEMO.Subject-1 -func (js JetStream) AddStream(name string, subjects []string) error { +func (js JetStream) AddStream(subjects []string) error { // Get info about the stream - stream, _ := js.GetStreamInfo(name) + stream, _ := js.GetStreamInfo() // If stream not found, create new if stream == nil { - subjectNames := generateSubjectNames(name, subjects) - _, err := js.instance.AddStream(generateStreamConfig(name, subjectNames)) + subjectNames := generateSubjectNames(js.streamName, subjects) + _, err := js.instance.AddStream(generateStreamConfig(js.streamName, subjectNames)) if err != nil { msg := fmt.Sprintf("[NATS JETSTREAM] - add stream error: %s", err.Error()) return errors.New(msg) @@ -46,8 +46,8 @@ func (js JetStream) AddStream(name string, subjects []string) error { } // DeleteStream ... -func (js JetStream) DeleteStream(name string) error { - if err := js.instance.DeleteStream(name); err != nil { +func (js JetStream) DeleteStream() error { + if err := js.instance.DeleteStream(js.streamName); err != nil { msg := fmt.Sprintf("[NATS JETSTREAM] - delete stream error: %s", err.Error()) return errors.New(msg) } @@ -55,19 +55,19 @@ func (js JetStream) DeleteStream(name string) error { } // AddStreamSubjects ... -func (js JetStream) AddStreamSubjects(name string, subjects []string) error { +func (js JetStream) AddStreamSubjects(subjects []string) error { // Get info about the stream - stream, _ := js.GetStreamInfo(name) + stream, _ := js.GetStreamInfo() if stream == nil { - msg := fmt.Sprintf("[NATS JETSTREAM] - error when adding stream %s subjects: stream not found", name) + msg := fmt.Sprintf("[NATS JETSTREAM] - error when adding stream %s subjects: stream not found", js.streamName) return errors.New(msg) } // Merge current and new subjects - subjectNames := generateSubjectNames(name, subjects) + subjectNames := generateSubjectNames(js.streamName, subjects) newSubjects := mergeAndUniqueArrayStrings(subjectNames, stream.Config.Subjects) - _, err := js.instance.UpdateStream(generateStreamConfig(name, newSubjects)) + _, err := js.instance.UpdateStream(generateStreamConfig(js.streamName, newSubjects)) if err != nil { msg := fmt.Sprintf("[NATS JETSTREAM] - add stream error: %s", err.Error()) return errors.New(msg) diff --git a/natsio.go b/natsio.go index 0c2bca7..f2375b1 100644 --- a/natsio.go +++ b/natsio.go @@ -3,6 +3,7 @@ package natsio import ( "errors" "fmt" + "github.com/nats-io/nats.go" ) @@ -14,7 +15,8 @@ type Server struct { // JetStream ... type JetStream struct { - instance nats.JetStreamContext + instance nats.JetStreamContext + streamName string } var ( @@ -61,6 +63,7 @@ func Connect(cfg Config) error { return errors.New(msg) } natsJetStream.instance = js + natsJetStream.streamName = cfg.StreamName return nil }