From 35714c82927f0c79be39b6d204c7dc36f52c3408 Mon Sep 17 00:00:00 2001 From: Nam Huynh Date: Fri, 8 Oct 2021 14:41:58 +0700 Subject: [PATCH] fix rule of stream and subject name --- pubsub.go | 16 ++++++++++------ stream.go | 10 +++++++--- utils.go | 20 +++++++++++++++++++- 3 files changed, 36 insertions(+), 10 deletions(-) diff --git a/pubsub.go b/pubsub.go index dea03b0..c130250 100644 --- a/pubsub.go +++ b/pubsub.go @@ -9,21 +9,25 @@ import ( ) // Publish ... -func Publish(subject string, data interface{}) error { +func Publish(stream, subject string, data interface{}) error { + channel := combineStreamAndSubjectName(stream, subject) + b, _ := json.Marshal(data) - _, err := natsJS.PublishAsync(subject, b) + _, err := natsJS.PublishAsync(channel, b) if err != nil { - msg := fmt.Sprintf("publish message error: %s", err.Error()) + msg := fmt.Sprintf("publish message to subject %s error: %s", channel, err.Error()) return errors.New(msg) } return nil } // Subscribe ... -func Subscribe(subject string, cb nats.MsgHandler) error { - _, err := natsJS.Subscribe(subject, cb) +func Subscribe(stream, subject string, cb nats.MsgHandler) error { + channel := combineStreamAndSubjectName(stream, subject) + + _, err := natsJS.Subscribe(channel, cb) if err != nil { - msg := fmt.Sprintf("subscribe subject %s error: %s", subject, err.Error()) + msg := fmt.Sprintf("subscribe subject %s error: %s", channel, err.Error()) return errors.New(msg) } return nil diff --git a/stream.go b/stream.go index 9809ca0..8a3b2fb 100644 --- a/stream.go +++ b/stream.go @@ -13,15 +13,18 @@ func GetStreamInfo(name string) (*nats.StreamInfo, error) { } // AddStream add new stream, with default config +// Due to subject must have a unique name, subject name will be combined with stream name +// E.g: stream name is "DEMO", subject name is "Subject-1", so final name in NATS will be: DEMO.Subject-1 func AddStream(name string, subjects []string) error { // Get info about the stream stream, _ := GetStreamInfo(name) + // If stream not found, create new if stream == nil { + subjectNames := generateSubjectNames(name, subjects) _, err := natsJS.AddStream(&nats.StreamConfig{ Name: name, - Subjects: subjects, - Storage: nats.FileStorage, + Subjects: subjectNames, }) if err != nil { msg := fmt.Sprintf("add stream error: %s", err.Error()) @@ -51,7 +54,8 @@ func AddStreamSubjects(name string, subjects []string) error { } // Merge current and new subjects - newSubjects := mergeAndUniqueArrayStrings(subjects, stream.Config.Subjects) + subjectNames := generateSubjectNames(name, subjects) + newSubjects := mergeAndUniqueArrayStrings(subjectNames, stream.Config.Subjects) _, err := natsJS.UpdateStream(&nats.StreamConfig{ Name: name, diff --git a/utils.go b/utils.go index 59942bc..8ffd670 100644 --- a/utils.go +++ b/utils.go @@ -1,6 +1,10 @@ package natsio -import "github.com/thoas/go-funk" +import ( + "fmt" + + "github.com/thoas/go-funk" +) // mergeAndUniqueArrayStrings ... func mergeAndUniqueArrayStrings(arr1, arr2 []string) []string { @@ -10,3 +14,17 @@ func mergeAndUniqueArrayStrings(arr1, arr2 []string) []string { result = funk.UniqString(result) return result } + +// generateSubjectNames ... +func generateSubjectNames(streamName string, subjects []string) []string { + var result = make([]string, 0) + for _, subject := range subjects { + name := combineStreamAndSubjectName(streamName, subject) + result = append(result, name) + } + return result +} + +func combineStreamAndSubjectName(stream, subject string) string { + return fmt.Sprintf("%s.%s", stream, subject) +}