fix rule of stream and subject name

This commit is contained in:
Nam Huynh 2021-10-08 14:41:58 +07:00
parent ad6c1c31c9
commit 35714c8292
3 changed files with 36 additions and 10 deletions

View File

@ -9,21 +9,25 @@ import (
) )
// Publish ... // Publish ...
func Publish(subject string, data interface{}) error { func Publish(stream, subject string, data interface{}) error {
channel := combineStreamAndSubjectName(stream, subject)
b, _ := json.Marshal(data) b, _ := json.Marshal(data)
_, err := natsJS.PublishAsync(subject, b) _, err := natsJS.PublishAsync(channel, b)
if err != nil { if err != nil {
msg := fmt.Sprintf("publish message error: %s", err.Error()) msg := fmt.Sprintf("publish message to subject %s error: %s", channel, err.Error())
return errors.New(msg) return errors.New(msg)
} }
return nil return nil
} }
// Subscribe ... // Subscribe ...
func Subscribe(subject string, cb nats.MsgHandler) error { func Subscribe(stream, subject string, cb nats.MsgHandler) error {
_, err := natsJS.Subscribe(subject, cb) channel := combineStreamAndSubjectName(stream, subject)
_, err := natsJS.Subscribe(channel, cb)
if err != nil { if err != nil {
msg := fmt.Sprintf("subscribe subject %s error: %s", subject, err.Error()) msg := fmt.Sprintf("subscribe subject %s error: %s", channel, err.Error())
return errors.New(msg) return errors.New(msg)
} }
return nil return nil

View File

@ -13,15 +13,18 @@ func GetStreamInfo(name string) (*nats.StreamInfo, error) {
} }
// AddStream add new stream, with default config // AddStream add new stream, with default config
// Due to subject must have a unique name, subject name will be combined with stream name
// E.g: stream name is "DEMO", subject name is "Subject-1", so final name in NATS will be: DEMO.Subject-1
func AddStream(name string, subjects []string) error { func AddStream(name string, subjects []string) error {
// Get info about the stream // Get info about the stream
stream, _ := GetStreamInfo(name) stream, _ := GetStreamInfo(name)
// If stream not found, create new // If stream not found, create new
if stream == nil { if stream == nil {
subjectNames := generateSubjectNames(name, subjects)
_, err := natsJS.AddStream(&nats.StreamConfig{ _, err := natsJS.AddStream(&nats.StreamConfig{
Name: name, Name: name,
Subjects: subjects, Subjects: subjectNames,
Storage: nats.FileStorage,
}) })
if err != nil { if err != nil {
msg := fmt.Sprintf("add stream error: %s", err.Error()) msg := fmt.Sprintf("add stream error: %s", err.Error())
@ -51,7 +54,8 @@ func AddStreamSubjects(name string, subjects []string) error {
} }
// Merge current and new subjects // Merge current and new subjects
newSubjects := mergeAndUniqueArrayStrings(subjects, stream.Config.Subjects) subjectNames := generateSubjectNames(name, subjects)
newSubjects := mergeAndUniqueArrayStrings(subjectNames, stream.Config.Subjects)
_, err := natsJS.UpdateStream(&nats.StreamConfig{ _, err := natsJS.UpdateStream(&nats.StreamConfig{
Name: name, Name: name,

View File

@ -1,6 +1,10 @@
package natsio package natsio
import "github.com/thoas/go-funk" import (
"fmt"
"github.com/thoas/go-funk"
)
// mergeAndUniqueArrayStrings ... // mergeAndUniqueArrayStrings ...
func mergeAndUniqueArrayStrings(arr1, arr2 []string) []string { func mergeAndUniqueArrayStrings(arr1, arr2 []string) []string {
@ -10,3 +14,17 @@ func mergeAndUniqueArrayStrings(arr1, arr2 []string) []string {
result = funk.UniqString(result) result = funk.UniqString(result)
return result return result
} }
// generateSubjectNames ...
func generateSubjectNames(streamName string, subjects []string) []string {
var result = make([]string, 0)
for _, subject := range subjects {
name := combineStreamAndSubjectName(streamName, subject)
result = append(result, name)
}
return result
}
func combineStreamAndSubjectName(stream, subject string) string {
return fmt.Sprintf("%s.%s", stream, subject)
}