diff --git a/jetstream_pubsub.go b/jetstream_pubsub.go index c0c2e9c..6569abc 100644 --- a/jetstream_pubsub.go +++ b/jetstream_pubsub.go @@ -14,7 +14,7 @@ func (js JetStream) Publish(subject string, payload []byte) error { return errors.New(msg) } - if js.debug { + if js.cfg.Debug { fmt.Printf("[natsio.JetStream] published a message to subject #%s \n", subject) } @@ -29,7 +29,7 @@ func (js JetStream) Subscribe(subject string, cb nats.MsgHandler) (*nats.Subscri return nil, errors.New(msg) } - if js.debug { + if js.cfg.Debug { fmt.Printf("[natsio.JetStream] subscribe new message from subject #%s \n", subject) } @@ -51,7 +51,7 @@ func (js JetStream) PullSubscribe(subject, consumer string) (*nats.Subscription, return nil, errors.New(msg) } - if js.debug { + if js.cfg.Debug { fmt.Printf("[natsio.JetStream] pull subscribe new message from subject #%s \n", subject) } diff --git a/jetstream_queue.go b/jetstream_queue.go index abcd581..1708378 100644 --- a/jetstream_queue.go +++ b/jetstream_queue.go @@ -15,7 +15,7 @@ func (js JetStream) QueueSubscribe(subject, queueName string, cb nats.MsgHandler return errors.New(msg) } - if js.debug { + if js.cfg.Debug { fmt.Printf("[natsio.JetStream] queue subscribe new message from subject #%s \n", subject) } diff --git a/natsio.go b/natsio.go index c379e80..17df5e0 100644 --- a/natsio.go +++ b/natsio.go @@ -10,20 +10,19 @@ import ( // Server ... type Server struct { instance *nats.Conn - debug bool + cfg Config } // JetStream ... type JetStream struct { instance nats.JetStreamContext - debug bool + cfg Config streamName string } var ( natsServer Server natsJetStream JetStream - globalConfig Config ) // Connect ... @@ -55,7 +54,7 @@ func Connect(cfg Config) (*Server, *JetStream, error) { // set client natsServer.instance = nc - natsServer.debug = cfg.Debug + natsServer.cfg = cfg // create jet stream context js, err := nc.JetStream(nats.PublishAsyncMaxPending(256)) @@ -64,11 +63,11 @@ func Connect(cfg Config) (*Server, *JetStream, error) { return nil, nil, errors.New(msg) } natsJetStream.instance = js - natsJetStream.debug = cfg.Debug + natsJetStream.cfg = cfg natsJetStream.streamName = cfg.StreamName - // assign client type - globalConfig = cfg + // add stream + natsJetStream.addStream() fmt.Printf("⚡️[natsio]: connected to %s \n", cfg.URL) diff --git a/server_reqres.go b/server_reqres.go index 8cfb871..72c2325 100644 --- a/server_reqres.go +++ b/server_reqres.go @@ -14,13 +14,13 @@ const requestTimeout = 10 * time.Second // Request ... func (s Server) Request(subject string, payload []byte) (*nats.Msg, error) { timeout := requestTimeout - if globalConfig.RequestTimeout > 0 { - timeout = globalConfig.RequestTimeout + if s.cfg.RequestTimeout > 0 { + timeout = s.cfg.RequestTimeout } msg, err := s.instance.Request(subject, payload, timeout) if errors.Is(err, nats.ErrNoResponders) { fmt.Printf("[natsio.Server] no responders for subject: %s \n", subject) - } else if s.debug { + } else if s.cfg.Debug { fmt.Printf("[natsio.Server] send request to subject #%s successfully \n", subject) } diff --git a/utils.go b/utils.go index 229ed1e..f04c363 100644 --- a/utils.go +++ b/utils.go @@ -21,15 +21,13 @@ func combineStreamAndSubjectName(stream, subject string) string { } // GenerateJetStreamSubject ... -// GenerateJetStreamSubject("admin", "help_center", "get_ticket_by_id") -func GenerateJetStreamSubject(server, service, subject string) string { - return fmt.Sprintf("%s.jetstream.%s.%s.%s", globalConfig.StreamName, server, service, subject) +func GenerateJetStreamSubject(stream, server, service, subject string) string { + return fmt.Sprintf("%s.jetstream.%s.%s.%s", stream, server, service, subject) } // GenerateReqrepSubject ... -// GenerateReqrepSubject("admin", "help_center", "get_ticket_by_id") -func GenerateReqrepSubject(server, service, subject string) string { - return fmt.Sprintf("%s.reqrep.%s.%s.%s", globalConfig.StreamName, server, service, subject) +func GenerateReqrepSubject(stream, server, service, subject string) string { + return fmt.Sprintf("%s.reqrep.%s.%s.%s", stream, server, service, subject) } // InterfaceToBytes ...