Warehouse nats #13
|
@ -1,5 +1,7 @@
|
|||
package natsio
|
||||
|
||||
import "time"
|
||||
|
||||
// Config ...
|
||||
type Config struct {
|
||||
// Connect url
|
||||
|
@ -13,6 +15,9 @@ type Config struct {
|
|||
|
||||
// TLS config
|
||||
TLS *TLSConfig
|
||||
|
||||
// RequestTimeout
|
||||
RequestTimeout time.Duration
|
||||
}
|
||||
|
||||
// TLSConfig ...
|
||||
|
|
|
@ -3,8 +3,6 @@ package natsio
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/logrusorgru/aurora"
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
@ -12,6 +10,7 @@ import (
|
|||
// Server ...
|
||||
type Server struct {
|
||||
instance *nats.Conn
|
||||
Config Config
|
||||
}
|
||||
|
||||
// JetStream ...
|
||||
|
@ -33,8 +32,6 @@ func Connect(cfg Config) error {
|
|||
// Connect options
|
||||
opts := make([]nats.Option, 0)
|
||||
|
||||
opts = append(opts, nats.Timeout(1*time.Minute))
|
||||
|
||||
// Has authentication
|
||||
if cfg.User != "" {
|
||||
opts = append(opts, nats.UserInfo(cfg.User, cfg.Password))
|
||||
|
@ -56,6 +53,7 @@ func Connect(cfg Config) error {
|
|||
|
||||
// Set client
|
||||
natsServer.instance = nc
|
||||
natsServer.Config = cfg
|
||||
|
||||
// Create jet stream context
|
||||
js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
|
||||
|
|
|
@ -13,7 +13,11 @@ const requestTimeout = 10 * time.Second
|
|||
|
||||
// Request ...
|
||||
func (sv Server) Request(subject string, payload []byte) (*nats.Msg, error) {
|
||||
return sv.instance.Request(subject, payload, requestTimeout)
|
||||
timeout := requestTimeout
|
||||
if sv.Config.RequestTimeout > 0 {
|
||||
timeout = sv.Config.RequestTimeout
|
||||
}
|
||||
return sv.instance.Request(subject, payload, timeout)
|
||||
}
|
||||
|
||||
// Reply ...
|
||||
|
|
Loading…
Reference in New Issue