package natsio import ( "errors" "fmt" "time" "github.com/nats-io/nats.go" ) // Default timeout 10s const requestTimeout = 10 * time.Second // Request ... func (s Server) Request(subject string, payload []byte) (*nats.Msg, error) { timeout := requestTimeout if s.cfg.RequestTimeout > 0 { timeout = s.cfg.RequestTimeout } msg, err := s.instance.Request(subject, payload, timeout) if errors.Is(err, nats.ErrNoResponders) { fmt.Printf("[natsio.Server] no responders for subject: %s \n", subject) } else if s.cfg.Debug { fmt.Printf("[natsio.Server] send request to subject #%s successfully \n", subject) } return msg, err } // RequestWithBindData ... func (s Server) RequestWithBindData(subject string, payload []byte, result interface{}) error { msg, err := s.Request(subject, payload) if msg == nil || err != nil { return err } // map return BytesToInterface(msg.Data, result) } // Reply ... func (s Server) Reply(msg *nats.Msg, payload []byte) error { return s.instance.Publish(msg.Reply, payload) } // Subscribe ... func (s Server) Subscribe(subject string, cb nats.MsgHandler) (*nats.Subscription, error) { sub, err := s.instance.Subscribe(subject, cb) if err != nil { msg := fmt.Sprintf("[natsio.Server] subscribe subject %s error: %s", subject, err.Error()) return nil, errors.New(msg) } if s.cfg.Debug { fmt.Printf("[natsio.Server] subscribe to subject #%s \n", subject) } return sub, nil } // QueueSubscribe ... func (s Server) QueueSubscribe(subject, queue string, cb nats.MsgHandler) (*nats.Subscription, error) { sub, err := s.instance.QueueSubscribe(subject, queue, cb) if err != nil { msg := fmt.Sprintf("[natsio.Server] queue subscribe subject %s, queue %s error: %s", subject, queue, err.Error()) return nil, errors.New(msg) } if s.cfg.Debug { fmt.Printf("[natsio.Server] queue subscribe to subject #%s \n", subject) } return sub, nil }