natsio/natsio.go

83 lines
1.5 KiB
Go

package natsio
import (
"errors"
"fmt"
"github.com/nats-io/nats.go"
)
// Server wraps the NATS connection used by this package.
type Server struct {
// instance is the underlying NATS connection; Connect stores it here.
instance *nats.Conn
}
// JetStream bundles a NATS JetStream context with the stream name taken
// from the Config passed to Connect.
type JetStream struct {
// instance is the JetStream context created from the NATS connection.
instance nats.JetStreamContext
// streamName is copied from Config.StreamName by Connect.
streamName string
}
// Package-level state written by Connect and read through the accessors
// in this file.
var (
// natsServer holds the connection established by Connect.
natsServer Server
// natsJetStream holds the JetStream context and stream name set by Connect.
natsJetStream JetStream
// globalConfig is the Config most recently passed to a successful Connect.
globalConfig Config
)
// Connect validates cfg, dials the NATS server at cfg.URL and creates a
// JetStream context on that connection. On success the connection, the
// JetStream context (together with cfg.StreamName) and cfg itself are stored
// in package-level state, and a confirmation line is printed to stdout.
//
// User/password authentication is applied when cfg.User is non-empty. A
// client certificate and root CA are applied when cfg.TLS is non-nil.
//
// If any step fails, an error is returned and the package-level state is left
// untouched; a connection that was already opened is closed so it does not
// leak.
func Connect(cfg Config) error {
	if err := cfg.validate(); err != nil {
		return err
	}

	// Collect the connection options; a nil slice is fine when none apply.
	var opts []nats.Option
	if cfg.User != "" {
		opts = append(opts, nats.UserInfo(cfg.User, cfg.Password))
	}
	if cfg.TLS != nil {
		opts = append(opts,
			nats.ClientCert(cfg.TLS.CertFilePath, cfg.TLS.KeyFilePath),
			nats.RootCAs(cfg.TLS.RootCAFilePath),
		)
	}

	nc, err := nats.Connect(cfg.URL, opts...)
	if err != nil {
		return errors.New("error when connecting to NATS: " + err.Error())
	}

	js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
	if err != nil {
		// Nothing else holds this connection yet, so release it here
		// instead of leaving it open behind a failed Connect.
		nc.Close()
		return errors.New("error when create NATS JetStream: " + err.Error())
	}

	// Publish the shared state only after every step has succeeded, so a
	// failed Connect never leaves it half-initialised.
	natsServer.instance = nc
	natsJetStream.instance = js
	natsJetStream.streamName = cfg.StreamName
	globalConfig = cfg

	fmt.Printf("⚡️[natsio]: connected to %s \n", cfg.URL)
	return nil
}
// GetServer returns a copy of the package-level Server value. The copy
// carries whatever connection Connect has stored; it is the zero Server if
// nothing has been stored yet.
func GetServer() Server {
	srv := natsServer
	return srv
}
// GetJetStream returns a copy of the package-level JetStream value, i.e. the
// JetStream context and stream name that Connect has stored; it is the zero
// JetStream if nothing has been stored yet.
func GetJetStream() JetStream {
	js := natsJetStream
	return js
}