add server and service to config
This commit is contained in:
parent
b3519790e2
commit
01c89a341c
23
config.go
23
config.go
|
@ -1,6 +1,9 @@
|
||||||
package natsio
|
package natsio
|
||||||
|
|
||||||
import "time"
|
import (
|
||||||
|
"errors"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
// Config ...
|
// Config ...
|
||||||
type Config struct {
|
type Config struct {
|
||||||
|
@ -21,6 +24,24 @@ type Config struct {
|
||||||
|
|
||||||
// Stream name
|
// Stream name
|
||||||
StreamName string
|
StreamName string
|
||||||
|
|
||||||
|
// Server
|
||||||
|
Server string
|
||||||
|
|
||||||
|
// Service
|
||||||
|
Service string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c Config) validate() error {
|
||||||
|
if c.URL == "" {
|
||||||
|
return errors.New("connect URL is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.Server == "" || c.Service == "" {
|
||||||
|
return errors.New("server and service name is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// TLSConfig ...
|
// TLSConfig ...
|
||||||
|
|
25
natsio.go
25
natsio.go
|
@ -10,7 +10,6 @@ import (
|
||||||
// Server ...
|
// Server ...
|
||||||
type Server struct {
|
type Server struct {
|
||||||
instance *nats.Conn
|
instance *nats.Conn
|
||||||
Config Config
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// JetStream ...
|
// JetStream ...
|
||||||
|
@ -22,23 +21,25 @@ type JetStream struct {
|
||||||
var (
|
var (
|
||||||
natsServer Server
|
natsServer Server
|
||||||
natsJetStream JetStream
|
natsJetStream JetStream
|
||||||
|
globalConfig Config
|
||||||
)
|
)
|
||||||
|
|
||||||
// Connect ...
|
// Connect ...
|
||||||
func Connect(cfg Config) error {
|
func Connect(cfg Config) error {
|
||||||
if cfg.URL == "" {
|
// validate
|
||||||
return errors.New("connect URL is required")
|
if err := cfg.validate(); err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Connect options
|
// connect options
|
||||||
opts := make([]nats.Option, 0)
|
opts := make([]nats.Option, 0)
|
||||||
|
|
||||||
// Has authentication
|
// has authentication
|
||||||
if cfg.User != "" {
|
if cfg.User != "" {
|
||||||
opts = append(opts, nats.UserInfo(cfg.User, cfg.Password))
|
opts = append(opts, nats.UserInfo(cfg.User, cfg.Password))
|
||||||
}
|
}
|
||||||
|
|
||||||
// If it has TLS
|
// if it has TLS
|
||||||
if cfg.TLS != nil {
|
if cfg.TLS != nil {
|
||||||
opts = append(opts, nats.ClientCert(cfg.TLS.CertFilePath, cfg.TLS.KeyFilePath))
|
opts = append(opts, nats.ClientCert(cfg.TLS.CertFilePath, cfg.TLS.KeyFilePath))
|
||||||
opts = append(opts, nats.RootCAs(cfg.TLS.RootCAFilePath))
|
opts = append(opts, nats.RootCAs(cfg.TLS.RootCAFilePath))
|
||||||
|
@ -50,13 +51,10 @@ func Connect(cfg Config) error {
|
||||||
return errors.New(msg)
|
return errors.New(msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("⚡️[natsio]: connected to %s \n", cfg.URL)
|
// set client
|
||||||
|
|
||||||
// Set client
|
|
||||||
natsServer.instance = nc
|
natsServer.instance = nc
|
||||||
natsServer.Config = cfg
|
|
||||||
|
|
||||||
// Create jet stream context
|
// create jet stream context
|
||||||
js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
|
js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
msg := fmt.Sprintf("error when create NATS JetStream: %s", err.Error())
|
msg := fmt.Sprintf("error when create NATS JetStream: %s", err.Error())
|
||||||
|
@ -65,6 +63,11 @@ func Connect(cfg Config) error {
|
||||||
natsJetStream.instance = js
|
natsJetStream.instance = js
|
||||||
natsJetStream.streamName = cfg.StreamName
|
natsJetStream.streamName = cfg.StreamName
|
||||||
|
|
||||||
|
// assign client type
|
||||||
|
globalConfig = cfg
|
||||||
|
|
||||||
|
fmt.Printf("⚡️[natsio]: connected to %s \n", cfg.URL)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -15,8 +15,8 @@ const requestTimeout = 10 * time.Second
|
||||||
// Request ...
|
// Request ...
|
||||||
func (sv Server) Request(subject string, payload []byte) (*nats.Msg, error) {
|
func (sv Server) Request(subject string, payload []byte) (*nats.Msg, error) {
|
||||||
timeout := requestTimeout
|
timeout := requestTimeout
|
||||||
if sv.Config.RequestTimeout > 0 {
|
if globalConfig.RequestTimeout > 0 {
|
||||||
timeout = sv.Config.RequestTimeout
|
timeout = globalConfig.RequestTimeout
|
||||||
}
|
}
|
||||||
msg, err := sv.instance.Request(subject, payload, timeout)
|
msg, err := sv.instance.Request(subject, payload, timeout)
|
||||||
if errors.Is(err, nats.ErrNoResponders) {
|
if errors.Is(err, nats.ErrNoResponders) {
|
||||||
|
|
5
utils.go
5
utils.go
|
@ -28,3 +28,8 @@ func generateSubjectNames(streamName string, subjects []string) []string {
|
||||||
func combineStreamAndSubjectName(stream, subject string) string {
|
func combineStreamAndSubjectName(stream, subject string) string {
|
||||||
return fmt.Sprintf("%s.%s", stream, subject)
|
return fmt.Sprintf("%s.%s", stream, subject)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GenerateSubjectByClientType ...
|
||||||
|
func GenerateSubjectByClientType(subject string) string {
|
||||||
|
return fmt.Sprintf("%s.%s.%s.%s", globalConfig.StreamName, globalConfig.Server, globalConfig.Service, subject)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue