From 3fe4271f1bbc0b878c345d95d236b420bacd5228 Mon Sep 17 00:00:00 2001 From: Nam Huynh Date: Mon, 21 Mar 2022 10:19:29 +0700 Subject: [PATCH] fix update stream subjects change retention policy --- jetstream_stream.go | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/jetstream_stream.go b/jetstream_stream.go index 1f05b8c..e4469ee 100644 --- a/jetstream_stream.go +++ b/jetstream_stream.go @@ -7,6 +7,19 @@ import ( "github.com/nats-io/nats.go" ) +func generateStreamConfig(stream string, subjects []string) *nats.StreamConfig { + cfg := nats.StreamConfig{ + Name: stream, + Subjects: subjects, + Retention: nats.WorkQueuePolicy, + MaxConsumers: -1, + MaxMsgSize: -1, + MaxMsgs: -1, + NoAck: false, + } + return &cfg +} + // GetStreamInfo ... func (js JetStream) GetStreamInfo(name string) (*nats.StreamInfo, error) { return js.instance.StreamInfo(name) @@ -22,15 +35,7 @@ func (js JetStream) AddStream(name string, subjects []string) error { // If stream not found, create new if stream == nil { subjectNames := generateSubjectNames(name, subjects) - _, err := js.instance.AddStream(&nats.StreamConfig{ - Name: name, - Subjects: subjectNames, - Retention: nats.WorkQueuePolicy, - MaxConsumers: -1, - MaxMsgSize: -1, - MaxMsgs: -1, - NoAck: false, - }) + _, err := js.instance.AddStream(generateStreamConfig(name, subjectNames)) if err != nil { msg := fmt.Sprintf("[NATS JETSTREAM] - add stream error: %s", err.Error()) return errors.New(msg) @@ -62,10 +67,7 @@ func (js JetStream) AddStreamSubjects(name string, subjects []string) error { subjectNames := generateSubjectNames(name, subjects) newSubjects := mergeAndUniqueArrayStrings(subjectNames, stream.Config.Subjects) - _, err := js.instance.UpdateStream(&nats.StreamConfig{ - Name: name, - Subjects: newSubjects, - }) + _, err := js.instance.UpdateStream(generateStreamConfig(name, newSubjects)) if err != nil { msg := fmt.Sprintf("[NATS JETSTREAM] - add stream error: %s", err.Error()) return errors.New(msg)