check #2
|
@ -7,6 +7,19 @@ import (
|
|||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
func generateStreamConfig(stream string, subjects []string) *nats.StreamConfig {
|
||||
cfg := nats.StreamConfig{
|
||||
Name: stream,
|
||||
Subjects: subjects,
|
||||
Retention: nats.WorkQueuePolicy,
|
||||
MaxConsumers: -1,
|
||||
MaxMsgSize: -1,
|
||||
MaxMsgs: -1,
|
||||
NoAck: false,
|
||||
}
|
||||
return &cfg
|
||||
}
|
||||
|
||||
// GetStreamInfo ...
|
||||
func (js JetStream) GetStreamInfo(name string) (*nats.StreamInfo, error) {
|
||||
return js.instance.StreamInfo(name)
|
||||
|
@ -22,15 +35,7 @@ func (js JetStream) AddStream(name string, subjects []string) error {
|
|||
// If stream not found, create new
|
||||
if stream == nil {
|
||||
subjectNames := generateSubjectNames(name, subjects)
|
||||
_, err := js.instance.AddStream(&nats.StreamConfig{
|
||||
Name: name,
|
||||
Subjects: subjectNames,
|
||||
Retention: nats.WorkQueuePolicy,
|
||||
MaxConsumers: -1,
|
||||
MaxMsgSize: -1,
|
||||
MaxMsgs: -1,
|
||||
NoAck: false,
|
||||
})
|
||||
_, err := js.instance.AddStream(generateStreamConfig(name, subjectNames))
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("[NATS JETSTREAM] - add stream error: %s", err.Error())
|
||||
return errors.New(msg)
|
||||
|
@ -62,10 +67,7 @@ func (js JetStream) AddStreamSubjects(name string, subjects []string) error {
|
|||
subjectNames := generateSubjectNames(name, subjects)
|
||||
newSubjects := mergeAndUniqueArrayStrings(subjectNames, stream.Config.Subjects)
|
||||
|
||||
_, err := js.instance.UpdateStream(&nats.StreamConfig{
|
||||
Name: name,
|
||||
Subjects: newSubjects,
|
||||
})
|
||||
_, err := js.instance.UpdateStream(generateStreamConfig(name, newSubjects))
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("[NATS JETSTREAM] - add stream error: %s", err.Error())
|
||||
return errors.New(msg)
|
||||
|
|
Loading…
Reference in New Issue