diff --git a/config.go b/config.go index 1da9939..d084b3c 100644 --- a/config.go +++ b/config.go @@ -1,6 +1,9 @@ package natsio -import "time" +import ( + "errors" + "time" +) // Config ... type Config struct { @@ -21,6 +24,24 @@ type Config struct { // Stream name StreamName string + + // Server + Server string + + // Service + Service string +} + +func (c Config) validate() error { + if c.URL == "" { + return errors.New("connect URL is required") + } + + if c.Server == "" || c.Service == "" { + return errors.New("server and service name is required") + } + + return nil } // TLSConfig ... diff --git a/natsio.go b/natsio.go index f2375b1..169a675 100644 --- a/natsio.go +++ b/natsio.go @@ -10,7 +10,6 @@ import ( // Server ... type Server struct { instance *nats.Conn - Config Config } // JetStream ... @@ -22,23 +21,25 @@ type JetStream struct { var ( natsServer Server natsJetStream JetStream + globalConfig Config ) // Connect ... func Connect(cfg Config) error { - if cfg.URL == "" { - return errors.New("connect URL is required") + // validate + if err := cfg.validate(); err != nil { + return err } - // Connect options + // connect options opts := make([]nats.Option, 0) - // Has authentication + // has authentication if cfg.User != "" { opts = append(opts, nats.UserInfo(cfg.User, cfg.Password)) } - // If it has TLS + // if it has TLS if cfg.TLS != nil { opts = append(opts, nats.ClientCert(cfg.TLS.CertFilePath, cfg.TLS.KeyFilePath)) opts = append(opts, nats.RootCAs(cfg.TLS.RootCAFilePath)) @@ -50,13 +51,10 @@ func Connect(cfg Config) error { return errors.New(msg) } - fmt.Printf("⚡️[natsio]: connected to %s \n", cfg.URL) - - // Set client + // set client natsServer.instance = nc - natsServer.Config = cfg - // Create jet stream context + // create jet stream context js, err := nc.JetStream(nats.PublishAsyncMaxPending(256)) if err != nil { msg := fmt.Sprintf("error when create NATS JetStream: %s", err.Error()) @@ -65,6 +63,11 @@ func Connect(cfg Config) error { natsJetStream.instance = js natsJetStream.streamName = cfg.StreamName + // assign client type + globalConfig = cfg + + fmt.Printf("⚡️[natsio]: connected to %s \n", cfg.URL) + return nil } diff --git a/server_reqres.go b/server_reqres.go index 335b4a4..b155b16 100644 --- a/server_reqres.go +++ b/server_reqres.go @@ -15,8 +15,8 @@ const requestTimeout = 10 * time.Second // Request ... func (sv Server) Request(subject string, payload []byte) (*nats.Msg, error) { timeout := requestTimeout - if sv.Config.RequestTimeout > 0 { - timeout = sv.Config.RequestTimeout + if globalConfig.RequestTimeout > 0 { + timeout = globalConfig.RequestTimeout } msg, err := sv.instance.Request(subject, payload, timeout) if errors.Is(err, nats.ErrNoResponders) { diff --git a/utils.go b/utils.go index 8ffd670..9a8ab42 100644 --- a/utils.go +++ b/utils.go @@ -28,3 +28,8 @@ func generateSubjectNames(streamName string, subjects []string) []string { func combineStreamAndSubjectName(stream, subject string) string { return fmt.Sprintf("%s.%s", stream, subject) } + +// GenerateSubjectByClientType ... +func GenerateSubjectByClientType(subject string) string { + return fmt.Sprintf("%s.%s.%s.%s", globalConfig.StreamName, globalConfig.Server, globalConfig.Service, subject) +}