fix update stream subjects change retention policy

This commit is contained in:
Nam Huynh 2022-03-21 10:19:29 +07:00 committed by GitHub
parent ee34e47598
commit 3fe4271f1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 15 additions and 13 deletions

View File

@ -7,6 +7,19 @@ import (
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
) )
func generateStreamConfig(stream string, subjects []string) *nats.StreamConfig {
cfg := nats.StreamConfig{
Name: stream,
Subjects: subjects,
Retention: nats.WorkQueuePolicy,
MaxConsumers: -1,
MaxMsgSize: -1,
MaxMsgs: -1,
NoAck: false,
}
return &cfg
}
// GetStreamInfo ... // GetStreamInfo ...
func (js JetStream) GetStreamInfo(name string) (*nats.StreamInfo, error) { func (js JetStream) GetStreamInfo(name string) (*nats.StreamInfo, error) {
return js.instance.StreamInfo(name) return js.instance.StreamInfo(name)
@ -22,15 +35,7 @@ func (js JetStream) AddStream(name string, subjects []string) error {
// If stream not found, create new // If stream not found, create new
if stream == nil { if stream == nil {
subjectNames := generateSubjectNames(name, subjects) subjectNames := generateSubjectNames(name, subjects)
_, err := js.instance.AddStream(&nats.StreamConfig{ _, err := js.instance.AddStream(generateStreamConfig(name, subjectNames))
Name: name,
Subjects: subjectNames,
Retention: nats.WorkQueuePolicy,
MaxConsumers: -1,
MaxMsgSize: -1,
MaxMsgs: -1,
NoAck: false,
})
if err != nil { if err != nil {
msg := fmt.Sprintf("[NATS JETSTREAM] - add stream error: %s", err.Error()) msg := fmt.Sprintf("[NATS JETSTREAM] - add stream error: %s", err.Error())
return errors.New(msg) return errors.New(msg)
@ -62,10 +67,7 @@ func (js JetStream) AddStreamSubjects(name string, subjects []string) error {
subjectNames := generateSubjectNames(name, subjects) subjectNames := generateSubjectNames(name, subjects)
newSubjects := mergeAndUniqueArrayStrings(subjectNames, stream.Config.Subjects) newSubjects := mergeAndUniqueArrayStrings(subjectNames, stream.Config.Subjects)
_, err := js.instance.UpdateStream(&nats.StreamConfig{ _, err := js.instance.UpdateStream(generateStreamConfig(name, newSubjects))
Name: name,
Subjects: newSubjects,
})
if err != nil { if err != nil {
msg := fmt.Sprintf("[NATS JETSTREAM] - add stream error: %s", err.Error()) msg := fmt.Sprintf("[NATS JETSTREAM] - add stream error: %s", err.Error())
return errors.New(msg) return errors.New(msg)