Warehouse nats #13

Merged
trunglt251292 merged 7 commits from warehouse-nats into develop 2022-08-30 07:47:52 +00:00
2 changed files with 9 additions and 6 deletions
Showing only changes of commit 095f3779db - Show all commits

View File

@ -14,10 +14,17 @@ func (js JetStream) GetConsumerInfo(stream, name string) (*nats.ConsumerInfo, er
// AddConsumer ... // AddConsumer ...
func (js JetStream) AddConsumer(stream, subject, name string) error { func (js JetStream) AddConsumer(stream, subject, name string) error {
// Get consumer first, return if existed
consumer, err := js.GetConsumerInfo(stream, name)
if consumer != nil {
return nil
}
// Generate channel name
channel := combineStreamAndSubjectName(stream, subject) channel := combineStreamAndSubjectName(stream, subject)
// Add // Add
_, err := js.instance.AddConsumer(stream, &nats.ConsumerConfig{ _, err = js.instance.AddConsumer(stream, &nats.ConsumerConfig{
Durable: name, Durable: name,
AckPolicy: nats.AckExplicitPolicy, AckPolicy: nats.AckExplicitPolicy,
FilterSubject: channel, FilterSubject: channel,

View File

@ -22,11 +22,7 @@ func (sv Server) Request(subject string, payload []byte) (*nats.Msg, error) {
// Reply ... // Reply ...
func (sv Server) Reply(msg *nats.Msg, payload []byte) error { func (sv Server) Reply(msg *nats.Msg, payload []byte) error {
err := sv.instance.Publish(msg.Reply, payload) return sv.instance.Publish(msg.Reply, payload)
// Ack message
msg.Ack()
return err
} }
// Subscribe ... // Subscribe ...