From ee34e47598b7fceca7921960d5228bb04451bfb1 Mon Sep 17 00:00:00 2001 From: Nam Huynh Date: Fri, 18 Mar 2022 11:28:49 +0700 Subject: [PATCH] add pull sub and consumer * add pull subscribe * fix pullsub channel name * fix pullsub channel name * add check consumer * add check consumer * add check consumer * add check consumer * add check consumer * add check consumer * add check consumer * add check consumer * done add pull sub --- jetstream_consumer.go | 31 +++++++++++++++++++++++++++++++ jetstream_pubsub.go | 35 ++++++++++++++++++++++++++++++++++- jetstream_queue.go | 2 +- 3 files changed, 66 insertions(+), 2 deletions(-) create mode 100644 jetstream_consumer.go diff --git a/jetstream_consumer.go b/jetstream_consumer.go new file mode 100644 index 0000000..da38249 --- /dev/null +++ b/jetstream_consumer.go @@ -0,0 +1,31 @@ +package natsio + +import ( + "errors" + "fmt" + + "github.com/nats-io/nats.go" +) + +// GetConsumerInfo ... +func (js JetStream) GetConsumerInfo(stream, name string) (*nats.ConsumerInfo, error) { + return js.instance.ConsumerInfo(stream, name) +} + +// AddConsumer ... +func (js JetStream) AddConsumer(stream, subject, name string) error { + channel := combineStreamAndSubjectName(stream, subject) + + // Add + _, err := js.instance.AddConsumer(stream, &nats.ConsumerConfig{ + Durable: name, + AckPolicy: nats.AckExplicitPolicy, + FilterSubject: channel, + }) + + if err != nil { + msg := fmt.Sprintf("[NATS JETSTREAM] - add consumer %s for stream #%s error: %s", name, stream, err.Error()) + return errors.New(msg) + } + return nil +} diff --git a/jetstream_pubsub.go b/jetstream_pubsub.go index 069f551..dc4b142 100644 --- a/jetstream_pubsub.go +++ b/jetstream_pubsub.go @@ -13,7 +13,7 @@ func (js JetStream) Publish(stream, subject string, payload []byte) error { _, err := js.instance.PublishAsync(channel, payload) if err != nil { - msg := fmt.Sprintf("[NATS JETSTREAM] - publish message to subject %s error: %s", channel, err.Error()) + msg := fmt.Sprintf("[NATS JETSTREAM] - publish message to subject #%s error: %s", channel, err.Error()) return errors.New(msg) } return nil @@ -30,3 +30,36 @@ func (js JetStream) Subscribe(stream, subject string, cb nats.MsgHandler) (*nats } return sub, nil } + +// PullSubscribe ... +// +// Example: +// +// js := natsio.GetJetStream() +// +// sub, err := js.PullSubscribe("A_SUBJECT", "A_SUBJECT", "A_CONSUMER") +// +// for { +// messages, err := sub.Fetch(10) +// // process each messages +// } +// +func (js JetStream) PullSubscribe(stream, subject, consumer string) (*nats.Subscription, error) { + channel := combineStreamAndSubjectName(stream, subject) + + // Check if consumer existed + con, err := js.GetConsumerInfo(stream, consumer) + if con == nil || err != nil { + msg := fmt.Sprintf("[NATS JETSTREAM] - pull subscribe consumer %s not existed in stream %s", consumer, stream) + return nil, errors.New(msg) + } + + // Pull + sub, err := js.instance.PullSubscribe(channel, consumer) + if err != nil { + msg := fmt.Sprintf("[NATS JETSTREAM] - pull subscribe subject #%s - consumer #%s error: %s", channel, consumer, err.Error()) + return nil, errors.New(msg) + } + + return sub, nil +} diff --git a/jetstream_queue.go b/jetstream_queue.go index 08ff2f0..10db4d7 100644 --- a/jetstream_queue.go +++ b/jetstream_queue.go @@ -13,7 +13,7 @@ func (js JetStream) QueueSubscribe(stream, subject, queueName string, cb nats.Ms _, err := js.instance.QueueSubscribe(channel, queueName, cb) if err != nil { - msg := fmt.Sprintf("[NATS JETSTREAM] - queue subscribe with subject %s error: %s", channel, err.Error()) + msg := fmt.Sprintf("[NATS JETSTREAM] - queue subscribe with subject #%s error: %s", channel, err.Error()) return errors.New(msg) } return nil