From 8c1ee783f36e7c53da3a1127af0ce813478e2971 Mon Sep 17 00:00:00 2001 From: Nam Huynh Date: Fri, 18 Mar 2022 11:19:58 +0700 Subject: [PATCH] add check consumer --- jetstream_consumer.go | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 jetstream_consumer.go diff --git a/jetstream_consumer.go b/jetstream_consumer.go new file mode 100644 index 0000000..da38249 --- /dev/null +++ b/jetstream_consumer.go @@ -0,0 +1,31 @@ +package natsio + +import ( + "errors" + "fmt" + + "github.com/nats-io/nats.go" +) + +// GetConsumerInfo ... +func (js JetStream) GetConsumerInfo(stream, name string) (*nats.ConsumerInfo, error) { + return js.instance.ConsumerInfo(stream, name) +} + +// AddConsumer ... +func (js JetStream) AddConsumer(stream, subject, name string) error { + channel := combineStreamAndSubjectName(stream, subject) + + // Add + _, err := js.instance.AddConsumer(stream, &nats.ConsumerConfig{ + Durable: name, + AckPolicy: nats.AckExplicitPolicy, + FilterSubject: channel, + }) + + if err != nil { + msg := fmt.Sprintf("[NATS JETSTREAM] - add consumer %s for stream #%s error: %s", name, stream, err.Error()) + return errors.New(msg) + } + return nil +}