73 lines
1.8 KiB
Go
73 lines
1.8 KiB
Go
|
package natsio
|
||
|
|
||
|
import (
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
|
||
|
"github.com/nats-io/nats.go"
|
||
|
)
|
||
|
|
||
|
func generateStreamConfig(stream string, subjects []string) *nats.StreamConfig {
|
||
|
cfg := nats.StreamConfig{
|
||
|
Name: stream,
|
||
|
Subjects: subjects,
|
||
|
Retention: nats.WorkQueuePolicy,
|
||
|
MaxConsumers: -1,
|
||
|
MaxMsgSize: -1,
|
||
|
MaxMsgs: -1,
|
||
|
NoAck: false,
|
||
|
}
|
||
|
return &cfg
|
||
|
}
|
||
|
|
||
|
// GetStreamInfo ...
|
||
|
func (js JetStream) GetStreamInfo() (*nats.StreamInfo, error) {
|
||
|
return js.instance.StreamInfo(js.streamName)
|
||
|
}
|
||
|
|
||
|
// addStream add new stream, with default config
|
||
|
func (js JetStream) addStream() {
|
||
|
// get info about the stream
|
||
|
stream, _ := js.GetStreamInfo()
|
||
|
|
||
|
// if stream not found, create new
|
||
|
if stream == nil {
|
||
|
_, err := js.instance.AddStream(generateStreamConfig(js.streamName, []string{}))
|
||
|
if err != nil {
|
||
|
fmt.Printf("[natsio.JetStream] add stream %s error: %s \n", js.streamName, err.Error())
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// DeleteStream ...
|
||
|
func (js JetStream) DeleteStream() error {
|
||
|
if err := js.instance.DeleteStream(js.streamName); err != nil {
|
||
|
msg := fmt.Sprintf("[natsio.JetStream] delete stream %s error: %s", js.streamName, err.Error())
|
||
|
return errors.New(msg)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// AddSubjects ...
|
||
|
func (js JetStream) AddSubjects(subjects []string) error {
|
||
|
// get stream info
|
||
|
stream, _ := js.GetStreamInfo()
|
||
|
if stream == nil {
|
||
|
msg := fmt.Sprintf("[natsio.JetStream] error when adding stream #%s subjects: stream not found", js.streamName)
|
||
|
return errors.New(msg)
|
||
|
}
|
||
|
|
||
|
// merge current and new subjects
|
||
|
newSubjects := mergeAndUniqueArrayStrings(subjects, stream.Config.Subjects)
|
||
|
|
||
|
// update
|
||
|
_, err := js.instance.UpdateStream(generateStreamConfig(js.streamName, newSubjects))
|
||
|
if err != nil {
|
||
|
msg := fmt.Sprintf("[natsio.JetStream] add subject to stream #%s error: %s", js.streamName, err.Error())
|
||
|
return errors.New(msg)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|