minor bugs fixed
This commit is contained in:
parent
685d324788
commit
7512b669b1
|
@ -14,7 +14,7 @@ func (js JetStream) Publish(subject string, payload []byte) error {
|
||||||
return errors.New(msg)
|
return errors.New(msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
if js.debug {
|
if js.cfg.Debug {
|
||||||
fmt.Printf("[natsio.JetStream] published a message to subject #%s \n", subject)
|
fmt.Printf("[natsio.JetStream] published a message to subject #%s \n", subject)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -29,7 +29,7 @@ func (js JetStream) Subscribe(subject string, cb nats.MsgHandler) (*nats.Subscri
|
||||||
return nil, errors.New(msg)
|
return nil, errors.New(msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
if js.debug {
|
if js.cfg.Debug {
|
||||||
fmt.Printf("[natsio.JetStream] subscribe new message from subject #%s \n", subject)
|
fmt.Printf("[natsio.JetStream] subscribe new message from subject #%s \n", subject)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -51,7 +51,7 @@ func (js JetStream) PullSubscribe(subject, consumer string) (*nats.Subscription,
|
||||||
return nil, errors.New(msg)
|
return nil, errors.New(msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
if js.debug {
|
if js.cfg.Debug {
|
||||||
fmt.Printf("[natsio.JetStream] pull subscribe new message from subject #%s \n", subject)
|
fmt.Printf("[natsio.JetStream] pull subscribe new message from subject #%s \n", subject)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -15,7 +15,7 @@ func (js JetStream) QueueSubscribe(subject, queueName string, cb nats.MsgHandler
|
||||||
return errors.New(msg)
|
return errors.New(msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
if js.debug {
|
if js.cfg.Debug {
|
||||||
fmt.Printf("[natsio.JetStream] queue subscribe new message from subject #%s \n", subject)
|
fmt.Printf("[natsio.JetStream] queue subscribe new message from subject #%s \n", subject)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
13
natsio.go
13
natsio.go
|
@ -10,20 +10,19 @@ import (
|
||||||
// Server ...
|
// Server ...
|
||||||
type Server struct {
|
type Server struct {
|
||||||
instance *nats.Conn
|
instance *nats.Conn
|
||||||
debug bool
|
cfg Config
|
||||||
}
|
}
|
||||||
|
|
||||||
// JetStream ...
|
// JetStream ...
|
||||||
type JetStream struct {
|
type JetStream struct {
|
||||||
instance nats.JetStreamContext
|
instance nats.JetStreamContext
|
||||||
debug bool
|
cfg Config
|
||||||
streamName string
|
streamName string
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
natsServer Server
|
natsServer Server
|
||||||
natsJetStream JetStream
|
natsJetStream JetStream
|
||||||
globalConfig Config
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Connect ...
|
// Connect ...
|
||||||
|
@ -55,7 +54,7 @@ func Connect(cfg Config) (*Server, *JetStream, error) {
|
||||||
|
|
||||||
// set client
|
// set client
|
||||||
natsServer.instance = nc
|
natsServer.instance = nc
|
||||||
natsServer.debug = cfg.Debug
|
natsServer.cfg = cfg
|
||||||
|
|
||||||
// create jet stream context
|
// create jet stream context
|
||||||
js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
|
js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
|
||||||
|
@ -64,11 +63,11 @@ func Connect(cfg Config) (*Server, *JetStream, error) {
|
||||||
return nil, nil, errors.New(msg)
|
return nil, nil, errors.New(msg)
|
||||||
}
|
}
|
||||||
natsJetStream.instance = js
|
natsJetStream.instance = js
|
||||||
natsJetStream.debug = cfg.Debug
|
natsJetStream.cfg = cfg
|
||||||
natsJetStream.streamName = cfg.StreamName
|
natsJetStream.streamName = cfg.StreamName
|
||||||
|
|
||||||
// assign client type
|
// add stream
|
||||||
globalConfig = cfg
|
natsJetStream.addStream()
|
||||||
|
|
||||||
fmt.Printf("⚡️[natsio]: connected to %s \n", cfg.URL)
|
fmt.Printf("⚡️[natsio]: connected to %s \n", cfg.URL)
|
||||||
|
|
||||||
|
|
|
@ -14,13 +14,13 @@ const requestTimeout = 10 * time.Second
|
||||||
// Request ...
|
// Request ...
|
||||||
func (s Server) Request(subject string, payload []byte) (*nats.Msg, error) {
|
func (s Server) Request(subject string, payload []byte) (*nats.Msg, error) {
|
||||||
timeout := requestTimeout
|
timeout := requestTimeout
|
||||||
if globalConfig.RequestTimeout > 0 {
|
if s.cfg.RequestTimeout > 0 {
|
||||||
timeout = globalConfig.RequestTimeout
|
timeout = s.cfg.RequestTimeout
|
||||||
}
|
}
|
||||||
msg, err := s.instance.Request(subject, payload, timeout)
|
msg, err := s.instance.Request(subject, payload, timeout)
|
||||||
if errors.Is(err, nats.ErrNoResponders) {
|
if errors.Is(err, nats.ErrNoResponders) {
|
||||||
fmt.Printf("[natsio.Server] no responders for subject: %s \n", subject)
|
fmt.Printf("[natsio.Server] no responders for subject: %s \n", subject)
|
||||||
} else if s.debug {
|
} else if s.cfg.Debug {
|
||||||
fmt.Printf("[natsio.Server] send request to subject #%s successfully \n", subject)
|
fmt.Printf("[natsio.Server] send request to subject #%s successfully \n", subject)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
10
utils.go
10
utils.go
|
@ -21,15 +21,13 @@ func combineStreamAndSubjectName(stream, subject string) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
// GenerateJetStreamSubject ...
|
// GenerateJetStreamSubject ...
|
||||||
// GenerateJetStreamSubject("admin", "help_center", "get_ticket_by_id")
|
func GenerateJetStreamSubject(stream, server, service, subject string) string {
|
||||||
func GenerateJetStreamSubject(server, service, subject string) string {
|
return fmt.Sprintf("%s.jetstream.%s.%s.%s", stream, server, service, subject)
|
||||||
return fmt.Sprintf("%s.jetstream.%s.%s.%s", globalConfig.StreamName, server, service, subject)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GenerateReqrepSubject ...
|
// GenerateReqrepSubject ...
|
||||||
// GenerateReqrepSubject("admin", "help_center", "get_ticket_by_id")
|
func GenerateReqrepSubject(stream, server, service, subject string) string {
|
||||||
func GenerateReqrepSubject(server, service, subject string) string {
|
return fmt.Sprintf("%s.reqrep.%s.%s.%s", stream, server, service, subject)
|
||||||
return fmt.Sprintf("%s.reqrep.%s.%s.%s", globalConfig.StreamName, server, service, subject)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// InterfaceToBytes ...
|
// InterfaceToBytes ...
|
||||||
|
|
Loading…
Reference in New Issue