From 688b8d97549fbfdd23890e6ca729985526a124b1 Mon Sep 17 00:00:00 2001 From: Nam Huynh Date: Fri, 18 Mar 2022 10:06:11 +0700 Subject: [PATCH 01/12] add pull subscribe --- jetstream_pubsub.go | 25 ++++++++++++++++++++++++- jetstream_queue.go | 2 +- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/jetstream_pubsub.go b/jetstream_pubsub.go index 069f551..bc2f584 100644 --- a/jetstream_pubsub.go +++ b/jetstream_pubsub.go @@ -13,7 +13,7 @@ func (js JetStream) Publish(stream, subject string, payload []byte) error { _, err := js.instance.PublishAsync(channel, payload) if err != nil { - msg := fmt.Sprintf("[NATS JETSTREAM] - publish message to subject %s error: %s", channel, err.Error()) + msg := fmt.Sprintf("[NATS JETSTREAM] - publish message to subject #%s error: %s", channel, err.Error()) return errors.New(msg) } return nil @@ -30,3 +30,26 @@ func (js JetStream) Subscribe(stream, subject string, cb nats.MsgHandler) (*nats } return sub, nil } + +// PullSubscribe ... +// +// Example: +// +// sub, err := natsio.PullSubscribe("A_STREAM", "A_OBJECT") +// +// for { +// messages, err := sub.Fetch(10) +// // process each messages +// } +// +func (js JetStream) PullSubscribe(stream, subject string) (*nats.Subscription, error) { + channel := combineStreamAndSubjectName(stream, subject) + + sub, err := js.instance.PullSubscribe(channel, subject) + if err != nil { + msg := fmt.Sprintf("[NATS JETSTREAM] - pull subscribe subject #%s error: %s", channel, err.Error()) + return nil, errors.New(msg) + } + + return sub, nil +} diff --git a/jetstream_queue.go b/jetstream_queue.go index 08ff2f0..10db4d7 100644 --- a/jetstream_queue.go +++ b/jetstream_queue.go @@ -13,7 +13,7 @@ func (js JetStream) QueueSubscribe(stream, subject, queueName string, cb nats.Ms _, err := js.instance.QueueSubscribe(channel, queueName, cb) if err != nil { - msg := fmt.Sprintf("[NATS JETSTREAM] - queue subscribe with subject %s error: %s", channel, err.Error()) + msg := fmt.Sprintf("[NATS JETSTREAM] - queue subscribe with subject #%s error: %s", channel, err.Error()) return errors.New(msg) } return nil -- 2.34.1 From 240cc725e8dde7bfb54b311fa61cf365b8a26859 Mon Sep 17 00:00:00 2001 From: Nam Huynh Date: Fri, 18 Mar 2022 10:23:22 +0700 Subject: [PATCH 02/12] fix pullsub channel name --- jetstream_pubsub.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jetstream_pubsub.go b/jetstream_pubsub.go index bc2f584..daad7cd 100644 --- a/jetstream_pubsub.go +++ b/jetstream_pubsub.go @@ -45,7 +45,7 @@ func (js JetStream) Subscribe(stream, subject string, cb nats.MsgHandler) (*nats func (js JetStream) PullSubscribe(stream, subject string) (*nats.Subscription, error) { channel := combineStreamAndSubjectName(stream, subject) - sub, err := js.instance.PullSubscribe(channel, subject) + sub, err := js.instance.PullSubscribe(stream, subject) if err != nil { msg := fmt.Sprintf("[NATS JETSTREAM] - pull subscribe subject #%s error: %s", channel, err.Error()) return nil, errors.New(msg) -- 2.34.1 From 52d1c4839f6ee0fdc892f81a243659898ab17657 Mon Sep 17 00:00:00 2001 From: Nam Huynh Date: Fri, 18 Mar 2022 10:27:40 +0700 Subject: [PATCH 03/12] fix pullsub channel name --- jetstream_pubsub.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/jetstream_pubsub.go b/jetstream_pubsub.go index daad7cd..311854e 100644 --- a/jetstream_pubsub.go +++ b/jetstream_pubsub.go @@ -35,19 +35,19 @@ func (js JetStream) Subscribe(stream, subject string, cb nats.MsgHandler) (*nats // // Example: // -// sub, err := natsio.PullSubscribe("A_STREAM", "A_OBJECT") +// js := natsio.GetJetStream() +// +// sub, err := js.PullSubscribe("A_SUBJECT", "A_DURABLE") // // for { // messages, err := sub.Fetch(10) // // process each messages // } // -func (js JetStream) PullSubscribe(stream, subject string) (*nats.Subscription, error) { - channel := combineStreamAndSubjectName(stream, subject) - - sub, err := js.instance.PullSubscribe(stream, subject) +func (js JetStream) PullSubscribe(subject, durable string) (*nats.Subscription, error) { + sub, err := js.instance.PullSubscribe(subject, durable) if err != nil { - msg := fmt.Sprintf("[NATS JETSTREAM] - pull subscribe subject #%s error: %s", channel, err.Error()) + msg := fmt.Sprintf("[NATS JETSTREAM] - pull subscribe subject #%s - durable #%s error: %s", subject, durable, err.Error()) return nil, errors.New(msg) } -- 2.34.1 From f6d6b5afee8ed28e368429556a5a8ae903add5dd Mon Sep 17 00:00:00 2001 From: Nam Huynh Date: Fri, 18 Mar 2022 10:49:07 +0700 Subject: [PATCH 04/12] add check consumer --- jetstream_pubsub.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/jetstream_pubsub.go b/jetstream_pubsub.go index 311854e..066db87 100644 --- a/jetstream_pubsub.go +++ b/jetstream_pubsub.go @@ -44,7 +44,12 @@ func (js JetStream) Subscribe(stream, subject string, cb nats.MsgHandler) (*nats // // process each messages // } // -func (js JetStream) PullSubscribe(subject, durable string) (*nats.Subscription, error) { +func (js JetStream) PullSubscribe(stream, subject, durable, consumer string) (*nats.Subscription, error) { + // Check if consumer existed + con, err := js.instance.ConsumerInfo(stream, consumer) + fmt.Println("con", con) + fmt.Println("err", err) + sub, err := js.instance.PullSubscribe(subject, durable) if err != nil { msg := fmt.Sprintf("[NATS JETSTREAM] - pull subscribe subject #%s - durable #%s error: %s", subject, durable, err.Error()) -- 2.34.1 From 941e4c5ca3098f857b5ec9d3f7b2bbbf1ba4b08c Mon Sep 17 00:00:00 2001 From: Nam Huynh Date: Fri, 18 Mar 2022 10:55:13 +0700 Subject: [PATCH 05/12] add check consumer --- jetstream_pubsub.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/jetstream_pubsub.go b/jetstream_pubsub.go index 066db87..c4c1aaa 100644 --- a/jetstream_pubsub.go +++ b/jetstream_pubsub.go @@ -49,6 +49,16 @@ func (js JetStream) PullSubscribe(stream, subject, durable, consumer string) (*n con, err := js.instance.ConsumerInfo(stream, consumer) fmt.Println("con", con) fmt.Println("err", err) + if con == nil { + info, err := js.instance.AddConsumer(stream, &nats.ConsumerConfig{ + Durable: durable, + }) + if err != nil { + fmt.Println("CONSUMER INFO", info) + } else { + fmt.Println("ADD CONSUMER ERROR", err) + } + } sub, err := js.instance.PullSubscribe(subject, durable) if err != nil { -- 2.34.1 From fe6e8c24719c7383a1be8c80f5be5d09f14ed42b Mon Sep 17 00:00:00 2001 From: Nam Huynh Date: Fri, 18 Mar 2022 10:58:34 +0700 Subject: [PATCH 06/12] add check consumer --- jetstream_pubsub.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jetstream_pubsub.go b/jetstream_pubsub.go index c4c1aaa..2586eb8 100644 --- a/jetstream_pubsub.go +++ b/jetstream_pubsub.go @@ -53,7 +53,7 @@ func (js JetStream) PullSubscribe(stream, subject, durable, consumer string) (*n info, err := js.instance.AddConsumer(stream, &nats.ConsumerConfig{ Durable: durable, }) - if err != nil { + if err == nil { fmt.Println("CONSUMER INFO", info) } else { fmt.Println("ADD CONSUMER ERROR", err) -- 2.34.1 From 11bc0b9552e4b0fd19f72284c7ed01c5ff3ef590 Mon Sep 17 00:00:00 2001 From: Nam Huynh Date: Fri, 18 Mar 2022 11:00:26 +0700 Subject: [PATCH 07/12] add check consumer --- jetstream_pubsub.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/jetstream_pubsub.go b/jetstream_pubsub.go index 2586eb8..28bc81f 100644 --- a/jetstream_pubsub.go +++ b/jetstream_pubsub.go @@ -51,7 +51,8 @@ func (js JetStream) PullSubscribe(stream, subject, durable, consumer string) (*n fmt.Println("err", err) if con == nil { info, err := js.instance.AddConsumer(stream, &nats.ConsumerConfig{ - Durable: durable, + Durable: durable, + AckPolicy: nats.AckExplicitPolicy, }) if err == nil { fmt.Println("CONSUMER INFO", info) -- 2.34.1 From cfbda097a03b385f7ef67c846b6c7da8ee2ca552 Mon Sep 17 00:00:00 2001 From: Nam Huynh Date: Fri, 18 Mar 2022 11:03:15 +0700 Subject: [PATCH 08/12] add check consumer --- jetstream_pubsub.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/jetstream_pubsub.go b/jetstream_pubsub.go index 28bc81f..806ac7c 100644 --- a/jetstream_pubsub.go +++ b/jetstream_pubsub.go @@ -45,14 +45,17 @@ func (js JetStream) Subscribe(stream, subject string, cb nats.MsgHandler) (*nats // } // func (js JetStream) PullSubscribe(stream, subject, durable, consumer string) (*nats.Subscription, error) { + channel := combineStreamAndSubjectName(stream, subject) + // Check if consumer existed con, err := js.instance.ConsumerInfo(stream, consumer) fmt.Println("con", con) fmt.Println("err", err) if con == nil { info, err := js.instance.AddConsumer(stream, &nats.ConsumerConfig{ - Durable: durable, - AckPolicy: nats.AckExplicitPolicy, + Durable: durable, + AckPolicy: nats.AckExplicitPolicy, + FilterSubject: channel, }) if err == nil { fmt.Println("CONSUMER INFO", info) @@ -63,7 +66,7 @@ func (js JetStream) PullSubscribe(stream, subject, durable, consumer string) (*n sub, err := js.instance.PullSubscribe(subject, durable) if err != nil { - msg := fmt.Sprintf("[NATS JETSTREAM] - pull subscribe subject #%s - durable #%s error: %s", subject, durable, err.Error()) + msg := fmt.Sprintf("[NATS JETSTREAM] - pull subscribe subject #%s - durable #%s error: %s", channel, durable, err.Error()) return nil, errors.New(msg) } -- 2.34.1 From f83159126665175c887df0a05bacc3c449233efa Mon Sep 17 00:00:00 2001 From: Nam Huynh Date: Fri, 18 Mar 2022 11:07:46 +0700 Subject: [PATCH 09/12] add check consumer --- jetstream_pubsub.go | 2 +- natsio.go | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/jetstream_pubsub.go b/jetstream_pubsub.go index 806ac7c..e71e6b3 100644 --- a/jetstream_pubsub.go +++ b/jetstream_pubsub.go @@ -55,7 +55,7 @@ func (js JetStream) PullSubscribe(stream, subject, durable, consumer string) (*n info, err := js.instance.AddConsumer(stream, &nats.ConsumerConfig{ Durable: durable, AckPolicy: nats.AckExplicitPolicy, - FilterSubject: channel, + FilterSubject: subject, }) if err == nil { fmt.Println("CONSUMER INFO", info) diff --git a/natsio.go b/natsio.go index 33bdbb2..23a3903 100644 --- a/natsio.go +++ b/natsio.go @@ -21,6 +21,9 @@ type JetStream struct { var ( natsServer Server natsJetStream JetStream + + // FIXME: delete this + jsPublic nats.JetStreamContext ) // Connect ... @@ -62,6 +65,9 @@ func Connect(cfg Config) error { } natsJetStream.instance = js + // FIXME: delete this + jsPublic = js + return nil } @@ -74,3 +80,8 @@ func GetServer() Server { func GetJetStream() JetStream { return natsJetStream } + +// GetJSPublic ... +func GetJSPublic() nats.JetStreamContext { + return jsPublic +} -- 2.34.1 From 095e12a241e5f4eebf99d0df9c080612e67ea15f Mon Sep 17 00:00:00 2001 From: Nam Huynh Date: Fri, 18 Mar 2022 11:19:01 +0700 Subject: [PATCH 10/12] add check consumer --- jetstream_pubsub.go | 27 +++++++++------------------ 1 file changed, 9 insertions(+), 18 deletions(-) diff --git a/jetstream_pubsub.go b/jetstream_pubsub.go index e71e6b3..dc4b142 100644 --- a/jetstream_pubsub.go +++ b/jetstream_pubsub.go @@ -37,36 +37,27 @@ func (js JetStream) Subscribe(stream, subject string, cb nats.MsgHandler) (*nats // // js := natsio.GetJetStream() // -// sub, err := js.PullSubscribe("A_SUBJECT", "A_DURABLE") +// sub, err := js.PullSubscribe("A_SUBJECT", "A_SUBJECT", "A_CONSUMER") // // for { // messages, err := sub.Fetch(10) // // process each messages // } // -func (js JetStream) PullSubscribe(stream, subject, durable, consumer string) (*nats.Subscription, error) { +func (js JetStream) PullSubscribe(stream, subject, consumer string) (*nats.Subscription, error) { channel := combineStreamAndSubjectName(stream, subject) // Check if consumer existed - con, err := js.instance.ConsumerInfo(stream, consumer) - fmt.Println("con", con) - fmt.Println("err", err) - if con == nil { - info, err := js.instance.AddConsumer(stream, &nats.ConsumerConfig{ - Durable: durable, - AckPolicy: nats.AckExplicitPolicy, - FilterSubject: subject, - }) - if err == nil { - fmt.Println("CONSUMER INFO", info) - } else { - fmt.Println("ADD CONSUMER ERROR", err) - } + con, err := js.GetConsumerInfo(stream, consumer) + if con == nil || err != nil { + msg := fmt.Sprintf("[NATS JETSTREAM] - pull subscribe consumer %s not existed in stream %s", consumer, stream) + return nil, errors.New(msg) } - sub, err := js.instance.PullSubscribe(subject, durable) + // Pull + sub, err := js.instance.PullSubscribe(channel, consumer) if err != nil { - msg := fmt.Sprintf("[NATS JETSTREAM] - pull subscribe subject #%s - durable #%s error: %s", channel, durable, err.Error()) + msg := fmt.Sprintf("[NATS JETSTREAM] - pull subscribe subject #%s - consumer #%s error: %s", channel, consumer, err.Error()) return nil, errors.New(msg) } -- 2.34.1 From 8c1ee783f36e7c53da3a1127af0ce813478e2971 Mon Sep 17 00:00:00 2001 From: Nam Huynh Date: Fri, 18 Mar 2022 11:19:58 +0700 Subject: [PATCH 11/12] add check consumer --- jetstream_consumer.go | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 jetstream_consumer.go diff --git a/jetstream_consumer.go b/jetstream_consumer.go new file mode 100644 index 0000000..da38249 --- /dev/null +++ b/jetstream_consumer.go @@ -0,0 +1,31 @@ +package natsio + +import ( + "errors" + "fmt" + + "github.com/nats-io/nats.go" +) + +// GetConsumerInfo ... +func (js JetStream) GetConsumerInfo(stream, name string) (*nats.ConsumerInfo, error) { + return js.instance.ConsumerInfo(stream, name) +} + +// AddConsumer ... +func (js JetStream) AddConsumer(stream, subject, name string) error { + channel := combineStreamAndSubjectName(stream, subject) + + // Add + _, err := js.instance.AddConsumer(stream, &nats.ConsumerConfig{ + Durable: name, + AckPolicy: nats.AckExplicitPolicy, + FilterSubject: channel, + }) + + if err != nil { + msg := fmt.Sprintf("[NATS JETSTREAM] - add consumer %s for stream #%s error: %s", name, stream, err.Error()) + return errors.New(msg) + } + return nil +} -- 2.34.1 From 76996c81422bb3c3f22cc06dc19fb4a37d7613d1 Mon Sep 17 00:00:00 2001 From: Nam Huynh Date: Fri, 18 Mar 2022 11:24:12 +0700 Subject: [PATCH 12/12] done add pull sub --- natsio.go | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/natsio.go b/natsio.go index 23a3903..33bdbb2 100644 --- a/natsio.go +++ b/natsio.go @@ -21,9 +21,6 @@ type JetStream struct { var ( natsServer Server natsJetStream JetStream - - // FIXME: delete this - jsPublic nats.JetStreamContext ) // Connect ... @@ -65,9 +62,6 @@ func Connect(cfg Config) error { } natsJetStream.instance = js - // FIXME: delete this - jsPublic = js - return nil } @@ -80,8 +74,3 @@ func GetServer() Server { func GetJetStream() JetStream { return natsJetStream } - -// GetJSPublic ... -func GetJSPublic() nats.JetStreamContext { - return jsPublic -} -- 2.34.1