package natsio

import (
	"errors"
	"fmt"

	jsoniter "github.com/json-iterator/go"
	"github.com/nats-io/nats.go"
)

// Server holds a NATS connection together with the configuration that was
// used to establish it.
type Server struct {
	instance *nats.Conn
	cfg      Config
}

// JetStream holds a JetStream context created from a NATS connection, the
// configuration it was created with, and the name of the stream it targets.
type JetStream struct {
	instance   nats.JetStreamContext
	cfg        Config
	streamName string
}

var (
	// natsServer and natsJetStream are package-level instances populated by
	// Connect; Connect returns pointers to these same values.
	natsServer    Server
	natsJetStream JetStream

	// json is a drop-in alternative to the builtin encoding/json package.
	json = jsoniter.ConfigCompatibleWithStandardLibrary
)

// Connect validates cfg, opens a connection to the NATS server at cfg.URL,
// creates a JetStream context on that connection and registers the stream
// via addStream.
//
// Connection options are derived from cfg:
//   - user/password authentication is enabled when cfg.User is non-empty;
//   - a client certificate and root CA are configured when cfg.TLS is non-nil.
//
// On success it returns pointers to the package-level Server and JetStream
// values. Note that these are shared: every successful call overwrites the
// previously stored connection without closing it.
//
// On failure it returns (nil, nil, err). The package-level state is left
// untouched, and a connection that was opened but could not be turned into a
// JetStream context is closed before returning.
func Connect(cfg Config) (*Server, *JetStream, error) {
	if err := cfg.validate(); err != nil {
		return nil, nil, err
	}

	// A nil slice is fine here: append and the variadic call both accept it.
	var opts []nats.Option

	// Authentication is optional and keyed off the user name.
	if cfg.User != "" {
		opts = append(opts, nats.UserInfo(cfg.User, cfg.Password))
	}

	// TLS is optional; when present supply both the client certificate and
	// the root CA.
	if cfg.TLS != nil {
		opts = append(opts,
			nats.ClientCert(cfg.TLS.CertFilePath, cfg.TLS.KeyFilePath),
			nats.RootCAs(cfg.TLS.RootCAFilePath),
		)
	}

	nc, err := nats.Connect(cfg.URL, opts...)
	if err != nil {
		return nil, nil, errors.New("error when connecting to NATS: " + err.Error())
	}

	// Create the JetStream context before publishing anything to the
	// package-level state, so a failure here does not leave a half-initialised
	// singleton behind.
	js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
	if err != nil {
		// The connection is already open at this point; release it, since the
		// caller never receives a handle it could use to close it.
		nc.Close()
		return nil, nil, errors.New("error when create NATS JetStream: " + err.Error())
	}

	natsServer.instance = nc
	natsServer.cfg = cfg

	natsJetStream.instance = js
	natsJetStream.cfg = cfg
	natsJetStream.streamName = cfg.StreamName

	// NOTE(review): addStream is defined elsewhere in the package; if it
	// returns an error, that result is discarded here — confirm and handle.
	natsJetStream.addStream()

	fmt.Printf("⚡️[natsio]: connected to %s \n", cfg.URL)

	return &natsServer, &natsJetStream, nil
}