package natsio import ( "errors" "fmt" "github.com/nats-io/nats.go" ) // Server ... type Server struct { instance *nats.Conn } // JetStream ... type JetStream struct { instance nats.JetStreamContext streamName string } var ( natsServer Server natsJetStream JetStream globalConfig Config ) // Connect ... func Connect(cfg Config) error { // validate if err := cfg.validate(); err != nil { return err } // connect options opts := make([]nats.Option, 0) // has authentication if cfg.User != "" { opts = append(opts, nats.UserInfo(cfg.User, cfg.Password)) } // if it has TLS if cfg.TLS != nil { opts = append(opts, nats.ClientCert(cfg.TLS.CertFilePath, cfg.TLS.KeyFilePath)) opts = append(opts, nats.RootCAs(cfg.TLS.RootCAFilePath)) } nc, err := nats.Connect(cfg.URL, opts...) if err != nil { msg := fmt.Sprintf("error when connecting to NATS: %s", err.Error()) return errors.New(msg) } // set client natsServer.instance = nc // create jet stream context js, err := nc.JetStream(nats.PublishAsyncMaxPending(256)) if err != nil { msg := fmt.Sprintf("error when create NATS JetStream: %s", err.Error()) return errors.New(msg) } natsJetStream.instance = js natsJetStream.streamName = cfg.StreamName // assign client type globalConfig = cfg fmt.Printf("⚡️[natsio]: connected to %s \n", cfg.URL) return nil } // GetServer ... func GetServer() Server { return natsServer } // GetJetStream ... func GetJetStream() JetStream { return natsJetStream }