diff --git a/jetstream_pubsub.go b/jetstream_pubsub.go index 069f551..bc2f584 100644 --- a/jetstream_pubsub.go +++ b/jetstream_pubsub.go @@ -13,7 +13,7 @@ func (js JetStream) Publish(stream, subject string, payload []byte) error { _, err := js.instance.PublishAsync(channel, payload) if err != nil { - msg := fmt.Sprintf("[NATS JETSTREAM] - publish message to subject %s error: %s", channel, err.Error()) + msg := fmt.Sprintf("[NATS JETSTREAM] - publish message to subject #%s error: %s", channel, err.Error()) return errors.New(msg) } return nil @@ -30,3 +30,26 @@ func (js JetStream) Subscribe(stream, subject string, cb nats.MsgHandler) (*nats } return sub, nil } + +// PullSubscribe ... +// +// Example: +// +// sub, err := natsio.PullSubscribe("A_STREAM", "A_OBJECT") +// +// for { +// messages, err := sub.Fetch(10) +// // process each messages +// } +// +func (js JetStream) PullSubscribe(stream, subject string) (*nats.Subscription, error) { + channel := combineStreamAndSubjectName(stream, subject) + + sub, err := js.instance.PullSubscribe(channel, subject) + if err != nil { + msg := fmt.Sprintf("[NATS JETSTREAM] - pull subscribe subject #%s error: %s", channel, err.Error()) + return nil, errors.New(msg) + } + + return sub, nil +} diff --git a/jetstream_queue.go b/jetstream_queue.go index 08ff2f0..10db4d7 100644 --- a/jetstream_queue.go +++ b/jetstream_queue.go @@ -13,7 +13,7 @@ func (js JetStream) QueueSubscribe(stream, subject, queueName string, cb nats.Ms _, err := js.instance.QueueSubscribe(channel, queueName, cb) if err != nil { - msg := fmt.Sprintf("[NATS JETSTREAM] - queue subscribe with subject %s error: %s", channel, err.Error()) + msg := fmt.Sprintf("[NATS JETSTREAM] - queue subscribe with subject #%s error: %s", channel, err.Error()) return errors.New(msg) } return nil