88 lines
1.6 KiB
Go
88 lines
1.6 KiB
Go
package natsio
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
|
|
"github.com/logrusorgru/aurora"
|
|
"github.com/nats-io/nats.go"
|
|
)
|
|
|
|
// Server ...
|
|
type Server struct {
|
|
instance *nats.Conn
|
|
}
|
|
|
|
// JetStream ...
|
|
type JetStream struct {
|
|
instance nats.JetStreamContext
|
|
}
|
|
|
|
var (
|
|
natsServer Server
|
|
natsJetStream JetStream
|
|
|
|
// FIXME: delete this
|
|
jsPublic nats.JetStreamContext
|
|
)
|
|
|
|
// Connect ...
|
|
func Connect(cfg Config) error {
|
|
if cfg.URL == "" {
|
|
return errors.New("connect URL is required")
|
|
}
|
|
|
|
// Connect options
|
|
opts := make([]nats.Option, 0)
|
|
|
|
// Has authentication
|
|
if cfg.User != "" {
|
|
opts = append(opts, nats.UserInfo(cfg.User, cfg.Password))
|
|
}
|
|
|
|
// If it has TLS
|
|
if cfg.TLS != nil {
|
|
opts = append(opts, nats.ClientCert(cfg.TLS.CertFilePath, cfg.TLS.KeyFilePath))
|
|
opts = append(opts, nats.RootCAs(cfg.TLS.RootCAFilePath))
|
|
}
|
|
|
|
nc, err := nats.Connect(cfg.URL, opts...)
|
|
if err != nil {
|
|
msg := fmt.Sprintf("error when connecting to NATS: %s", err.Error())
|
|
return errors.New(msg)
|
|
}
|
|
|
|
fmt.Println(aurora.Green("*** CONNECTED TO NATS: " + cfg.URL))
|
|
|
|
// Set client
|
|
natsServer.instance = nc
|
|
|
|
// Create jet stream context
|
|
js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
|
|
if err != nil {
|
|
msg := fmt.Sprintf("error when create NATS JetStream: %s", err.Error())
|
|
return errors.New(msg)
|
|
}
|
|
natsJetStream.instance = js
|
|
|
|
// FIXME: delete this
|
|
jsPublic = js
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetServer ...
|
|
func GetServer() Server {
|
|
return natsServer
|
|
}
|
|
|
|
// GetJetStream ...
|
|
func GetJetStream() JetStream {
|
|
return natsJetStream
|
|
}
|
|
|
|
// GetJSPublic ...
|
|
func GetJSPublic() nats.JetStreamContext {
|
|
return jsPublic
|
|
}
|