natsio/stream.go

70 lines
1.7 KiB
Go
Raw Normal View History

2021-10-08 04:23:59 +00:00
package natsio
import (
"errors"
"fmt"
"github.com/nats-io/nats.go"
)
2021-10-08 04:40:59 +00:00
// GetStreamInfo looks up the stream called name through the package-level
// natsJS handle and returns the result of its StreamInfo call unchanged:
// the stream information on success, or the error reported by that call.
func GetStreamInfo(name string) (*nats.StreamInfo, error) {
	info, err := natsJS.StreamInfo(name)
	return info, err
}
2021-10-08 04:23:59 +00:00
// AddStream add new stream, with default config
2021-10-08 07:41:58 +00:00
// Due to subject must have a unique name, subject name will be combined with stream name
// E.g: stream name is "DEMO", subject name is "Subject-1", so final name in NATS will be: DEMO.Subject-1
2021-10-08 04:23:59 +00:00
func AddStream(name string, subjects []string) error {
// Get info about the stream
2021-10-08 05:00:03 +00:00
stream, _ := GetStreamInfo(name)
2021-10-08 07:41:58 +00:00
2021-10-08 04:23:59 +00:00
// If stream not found, create new
if stream == nil {
2021-10-08 07:41:58 +00:00
subjectNames := generateSubjectNames(name, subjects)
2021-10-08 04:33:32 +00:00
_, err := natsJS.AddStream(&nats.StreamConfig{
2021-10-08 04:23:59 +00:00
Name: name,
2021-10-08 07:41:58 +00:00
Subjects: subjectNames,
2021-10-08 04:23:59 +00:00
})
if err != nil {
msg := fmt.Sprintf("add stream error: %s", err.Error())
return errors.New(msg)
}
}
return nil
}
2021-10-08 05:00:03 +00:00
// DeleteStream removes the stream called name through the package-level
// natsJS handle.
//
// It returns nil on success. On failure the returned error carries the
// "delete stream error" prefix and wraps the underlying failure, so
// errors.Is and errors.As keep working.
func DeleteStream(name string) error {
	if err := natsJS.DeleteStream(name); err != nil {
		return fmt.Errorf("delete stream error: %w", err)
	}
	return nil
}
// AddStreamSubjects ...
func AddStreamSubjects(name string, subjects []string) error {
// Get info about the stream
stream, _ := GetStreamInfo(name)
if stream == nil {
msg := fmt.Sprintf("error when adding stream %s subjects: stream not found", name)
return errors.New(msg)
}
// Merge current and new subjects
2021-10-08 07:41:58 +00:00
subjectNames := generateSubjectNames(name, subjects)
newSubjects := mergeAndUniqueArrayStrings(subjectNames, stream.Config.Subjects)
2021-10-08 05:00:03 +00:00
_, err := natsJS.UpdateStream(&nats.StreamConfig{
Name: name,
Subjects: newSubjects,
})
if err != nil {
msg := fmt.Sprintf("add stream error: %s", err.Error())
return errors.New(msg)
}
return nil
}