package natsio

import (
	"errors"
	"fmt"

	"github.com/nats-io/nats.go"
)

// generateStreamConfig builds the stream configuration used by this package
// for the given stream name and subject list.
//
// The stream uses work-queue retention, and the consumer, message-size and
// message-count limits are all set to -1 (the JetStream convention for
// "no limit").
func generateStreamConfig(stream string, subjects []string) *nats.StreamConfig {
	return &nats.StreamConfig{
		Name:         stream,
		Subjects:     subjects,
		Retention:    nats.WorkQueuePolicy,
		MaxConsumers: -1,
		MaxMsgSize:   -1,
		MaxMsgs:      -1,
		NoAck:        false,
	}
}

// GetStreamInfo returns the stream info reported by the underlying JetStream
// context for the stream named js.streamName, together with any error that
// lookup produced.
func (js JetStream) GetStreamInfo() (*nats.StreamInfo, error) {
	return js.instance.StreamInfo(js.streamName)
}

// addStream creates the stream with the default configuration (and an empty
// subject list) when no stream info can be obtained for js.streamName.
//
// It is best-effort: a failure to create the stream is printed to stdout and
// not returned to the caller.
func (js JetStream) addStream() {
	// The lookup error is deliberately discarded: any failure to obtain the
	// stream info is treated as "stream does not exist yet" and creation is
	// attempted. If the real cause was something else, AddStream is expected
	// to fail as well and that failure is reported below.
	stream, _ := js.GetStreamInfo()
	if stream != nil {
		// Stream already exists, nothing to do.
		return
	}

	if _, err := js.instance.AddStream(generateStreamConfig(js.streamName, []string{})); err != nil {
		fmt.Printf("[natsio.JetStream] add stream %s error: %s \n", js.streamName, err.Error())
	}
}

// DeleteStream removes the stream named js.streamName.
//
// On failure it returns an error that wraps the underlying cause, so callers
// can inspect it with errors.Is / errors.As.
func (js JetStream) DeleteStream() error {
	if err := js.instance.DeleteStream(js.streamName); err != nil {
		return fmt.Errorf("[natsio.JetStream] delete stream %s error: %w", js.streamName, err)
	}
	return nil
}

// AddSubjects merges subjects into the stream's current subject list and
// updates the stream with the combined list.
//
// It returns an error when the stream info cannot be fetched (the lookup
// error is wrapped), when no stream info is available, or when the update
// itself fails (the update error is wrapped).
func (js JetStream) AddSubjects(subjects []string) error {
	stream, err := js.GetStreamInfo()
	if err != nil {
		// Surface the real cause instead of reporting every lookup failure
		// as "stream not found".
		return fmt.Errorf("[natsio.JetStream] error when adding stream #%s subjects: %w", js.streamName, err)
	}
	if stream == nil {
		return errors.New("[natsio.JetStream] error when adding stream #" + js.streamName + " subjects: stream not found")
	}

	// Merge the requested subjects with those already configured on the
	// stream; the update below replaces the stream's subject list with the
	// merged result.
	newSubjects := mergeAndUniqueArrayStrings(subjects, stream.Config.Subjects)

	if _, err := js.instance.UpdateStream(generateStreamConfig(js.streamName, newSubjects)); err != nil {
		return fmt.Errorf("[natsio.JetStream] add subject to stream #%s error: %w", js.streamName, err)
	}
	return nil
}