2021-10-08 04:23:59 +00:00
|
|
|
package natsio
|
|
|
|
|
|
|
|
import (
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
|
|
|
|
"github.com/nats-io/nats.go"
|
|
|
|
)
|
|
|
|
|
2022-03-21 03:12:47 +00:00
|
|
|
func generateStreamConfig(stream string, subjects []string) *nats.StreamConfig {
|
|
|
|
cfg := nats.StreamConfig{
|
|
|
|
Name: stream,
|
|
|
|
Subjects: subjects,
|
|
|
|
Retention: nats.WorkQueuePolicy,
|
|
|
|
MaxConsumers: -1,
|
|
|
|
MaxMsgSize: -1,
|
|
|
|
MaxMsgs: -1,
|
|
|
|
NoAck: false,
|
|
|
|
}
|
|
|
|
return &cfg
|
|
|
|
}
|
|
|
|
|
2021-10-08 04:40:59 +00:00
|
|
|
// GetStreamInfo ...
|
2021-12-02 03:29:52 +00:00
|
|
|
func (js JetStream) GetStreamInfo(name string) (*nats.StreamInfo, error) {
|
|
|
|
return js.instance.StreamInfo(name)
|
2021-10-08 04:40:59 +00:00
|
|
|
}
|
|
|
|
|
2021-10-08 04:23:59 +00:00
|
|
|
// AddStream add new stream, with default config
|
2021-10-08 07:41:58 +00:00
|
|
|
// Due to subject must have a unique name, subject name will be combined with stream name
|
|
|
|
// E.g: stream name is "DEMO", subject name is "Subject-1", so final name in NATS will be: DEMO.Subject-1
|
2021-12-02 03:29:52 +00:00
|
|
|
func (js JetStream) AddStream(name string, subjects []string) error {
|
2021-10-08 04:23:59 +00:00
|
|
|
// Get info about the stream
|
2021-12-02 03:29:52 +00:00
|
|
|
stream, _ := js.GetStreamInfo(name)
|
2021-10-08 07:41:58 +00:00
|
|
|
|
2021-10-08 04:23:59 +00:00
|
|
|
// If stream not found, create new
|
|
|
|
if stream == nil {
|
2021-10-08 07:41:58 +00:00
|
|
|
subjectNames := generateSubjectNames(name, subjects)
|
2022-03-21 03:12:47 +00:00
|
|
|
_, err := js.instance.AddStream(generateStreamConfig(name, subjectNames))
|
2021-10-08 04:23:59 +00:00
|
|
|
if err != nil {
|
2021-12-02 03:29:52 +00:00
|
|
|
msg := fmt.Sprintf("[NATS JETSTREAM] - add stream error: %s", err.Error())
|
2021-10-08 04:23:59 +00:00
|
|
|
return errors.New(msg)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
2021-10-08 05:00:03 +00:00
|
|
|
|
|
|
|
// DeleteStream ...
|
2021-12-02 03:29:52 +00:00
|
|
|
func (js JetStream) DeleteStream(name string) error {
|
|
|
|
if err := js.instance.DeleteStream(name); err != nil {
|
|
|
|
msg := fmt.Sprintf("[NATS JETSTREAM] - delete stream error: %s", err.Error())
|
2021-10-08 05:00:03 +00:00
|
|
|
return errors.New(msg)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// AddStreamSubjects ...
|
2021-12-02 03:29:52 +00:00
|
|
|
func (js JetStream) AddStreamSubjects(name string, subjects []string) error {
|
2021-10-08 05:00:03 +00:00
|
|
|
// Get info about the stream
|
2021-12-02 03:29:52 +00:00
|
|
|
stream, _ := js.GetStreamInfo(name)
|
2021-10-08 05:00:03 +00:00
|
|
|
if stream == nil {
|
2021-12-02 03:29:52 +00:00
|
|
|
msg := fmt.Sprintf("[NATS JETSTREAM] - error when adding stream %s subjects: stream not found", name)
|
2021-10-08 05:00:03 +00:00
|
|
|
return errors.New(msg)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Merge current and new subjects
|
2021-10-08 07:41:58 +00:00
|
|
|
subjectNames := generateSubjectNames(name, subjects)
|
|
|
|
newSubjects := mergeAndUniqueArrayStrings(subjectNames, stream.Config.Subjects)
|
2021-10-08 05:00:03 +00:00
|
|
|
|
2022-03-21 03:12:47 +00:00
|
|
|
_, err := js.instance.UpdateStream(generateStreamConfig(name, newSubjects))
|
2021-10-08 05:00:03 +00:00
|
|
|
if err != nil {
|
2021-12-02 03:29:52 +00:00
|
|
|
msg := fmt.Sprintf("[NATS JETSTREAM] - add stream error: %s", err.Error())
|
2021-10-08 05:00:03 +00:00
|
|
|
return errors.New(msg)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|