From cfbda097a03b385f7ef67c846b6c7da8ee2ca552 Mon Sep 17 00:00:00 2001 From: Nam Huynh Date: Fri, 18 Mar 2022 11:03:15 +0700 Subject: [PATCH] add check consumer --- jetstream_pubsub.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/jetstream_pubsub.go b/jetstream_pubsub.go index 28bc81f..806ac7c 100644 --- a/jetstream_pubsub.go +++ b/jetstream_pubsub.go @@ -45,14 +45,17 @@ func (js JetStream) Subscribe(stream, subject string, cb nats.MsgHandler) (*nats // } // func (js JetStream) PullSubscribe(stream, subject, durable, consumer string) (*nats.Subscription, error) { + channel := combineStreamAndSubjectName(stream, subject) + // Check if consumer existed con, err := js.instance.ConsumerInfo(stream, consumer) fmt.Println("con", con) fmt.Println("err", err) if con == nil { info, err := js.instance.AddConsumer(stream, &nats.ConsumerConfig{ - Durable: durable, - AckPolicy: nats.AckExplicitPolicy, + Durable: durable, + AckPolicy: nats.AckExplicitPolicy, + FilterSubject: channel, }) if err == nil { fmt.Println("CONSUMER INFO", info) @@ -63,7 +66,7 @@ func (js JetStream) PullSubscribe(stream, subject, durable, consumer string) (*n sub, err := js.instance.PullSubscribe(subject, durable) if err != nil { - msg := fmt.Sprintf("[NATS JETSTREAM] - pull subscribe subject #%s - durable #%s error: %s", subject, durable, err.Error()) + msg := fmt.Sprintf("[NATS JETSTREAM] - pull subscribe subject #%s - durable #%s error: %s", channel, durable, err.Error()) return nil, errors.New(msg) }