natsio/jetstream_stream.go

73 lines
1.8 KiB
Go
Raw Permalink Normal View History

2021-10-08 04:23:59 +00:00
package natsio
import (
"errors"
"fmt"
"github.com/nats-io/nats.go"
)
// generateStreamConfig builds the stream configuration this package uses for
// the stream with the given name and subject list.
//
// The returned config uses work-queue retention and sets the consumer-count,
// message-size and message-count limits to -1. NoAck is left false.
func generateStreamConfig(stream string, subjects []string) *nats.StreamConfig {
	return &nats.StreamConfig{
		Name:         stream,
		Subjects:     subjects,
		Retention:    nats.WorkQueuePolicy,
		MaxConsumers: -1,
		MaxMsgSize:   -1,
		MaxMsgs:      -1,
		NoAck:        false,
	}
}
2021-10-08 04:40:59 +00:00
// GetStreamInfo ...
2022-12-04 15:10:05 +00:00
func (js JetStream) GetStreamInfo() (*nats.StreamInfo, error) {
return js.instance.StreamInfo(js.streamName)
2021-10-08 04:40:59 +00:00
}
2022-12-15 10:34:47 +00:00
// addStream add new stream, with default config
func (js JetStream) addStream() {
// get info about the stream
2022-12-04 15:10:05 +00:00
stream, _ := js.GetStreamInfo()
2021-10-08 07:41:58 +00:00
2022-12-15 10:34:47 +00:00
// if stream not found, create new
2021-10-08 04:23:59 +00:00
if stream == nil {
2022-12-15 10:34:47 +00:00
_, err := js.instance.AddStream(generateStreamConfig(js.streamName, []string{}))
2021-10-08 04:23:59 +00:00
if err != nil {
2022-12-15 10:34:47 +00:00
fmt.Printf("[natsio.JetStream] add stream %s error: %s \n", js.streamName, err.Error())
2021-10-08 04:23:59 +00:00
}
}
}
2021-10-08 05:00:03 +00:00
// DeleteStream ...
2022-12-04 15:10:05 +00:00
func (js JetStream) DeleteStream() error {
if err := js.instance.DeleteStream(js.streamName); err != nil {
2022-12-15 10:34:47 +00:00
msg := fmt.Sprintf("[natsio.JetStream] delete stream %s error: %s", js.streamName, err.Error())
2021-10-08 05:00:03 +00:00
return errors.New(msg)
}
2022-12-15 10:34:47 +00:00
2021-10-08 05:00:03 +00:00
return nil
}
2022-12-15 10:34:47 +00:00
// AddSubjects ...
func (js JetStream) AddSubjects(subjects []string) error {
// get stream info
2022-12-04 15:10:05 +00:00
stream, _ := js.GetStreamInfo()
2021-10-08 05:00:03 +00:00
if stream == nil {
2022-12-15 10:34:47 +00:00
msg := fmt.Sprintf("[natsio.JetStream] error when adding stream #%s subjects: stream not found", js.streamName)
2021-10-08 05:00:03 +00:00
return errors.New(msg)
}
2022-12-15 10:34:47 +00:00
// merge current and new subjects
newSubjects := mergeAndUniqueArrayStrings(subjects, stream.Config.Subjects)
2021-10-08 05:00:03 +00:00
2022-12-15 10:34:47 +00:00
// update
2022-12-04 15:10:05 +00:00
_, err := js.instance.UpdateStream(generateStreamConfig(js.streamName, newSubjects))
2021-10-08 05:00:03 +00:00
if err != nil {
2022-12-15 10:34:47 +00:00
msg := fmt.Sprintf("[natsio.JetStream] add subject to stream #%s error: %s", js.streamName, err.Error())
2021-10-08 05:00:03 +00:00
return errors.New(msg)
}
2022-12-15 10:34:47 +00:00
2021-10-08 05:00:03 +00:00
return nil
}