natsio/natsio.go

80 lines
1.6 KiB
Go
Raw Normal View History

2021-10-08 04:23:59 +00:00
package natsio
import (
"errors"
"fmt"
2022-12-04 15:10:05 +00:00
jsoniter "github.com/json-iterator/go"
2021-10-08 04:23:59 +00:00
"github.com/nats-io/nats.go"
)
2021-12-02 03:29:52 +00:00
// Server ...
type Server struct {
instance *nats.Conn
2022-12-15 11:04:03 +00:00
cfg Config
2021-12-02 03:29:52 +00:00
}
// JetStream ...
type JetStream struct {
2022-12-04 15:10:05 +00:00
instance nats.JetStreamContext
2022-12-15 11:04:03 +00:00
cfg Config
2022-12-04 15:10:05 +00:00
streamName string
2021-12-02 03:29:52 +00:00
}
2021-10-08 04:23:59 +00:00
var (
2021-12-02 03:29:52 +00:00
natsServer Server
natsJetStream JetStream
// builtin json alternative
json = jsoniter.ConfigCompatibleWithStandardLibrary
2021-10-08 04:23:59 +00:00
)
// Connect ...
2022-12-05 03:19:37 +00:00
func Connect(cfg Config) (*Server, *JetStream, error) {
2022-12-04 15:21:30 +00:00
// validate
if err := cfg.validate(); err != nil {
2022-12-05 03:19:37 +00:00
return nil, nil, err
2021-10-08 04:23:59 +00:00
}
2022-12-04 15:21:30 +00:00
// connect options
2021-10-08 04:23:59 +00:00
opts := make([]nats.Option, 0)
2022-12-04 15:21:30 +00:00
// has authentication
2021-10-08 04:23:59 +00:00
if cfg.User != "" {
opts = append(opts, nats.UserInfo(cfg.User, cfg.Password))
}
2022-12-04 15:21:30 +00:00
// if it has TLS
2021-10-08 04:23:59 +00:00
if cfg.TLS != nil {
opts = append(opts, nats.ClientCert(cfg.TLS.CertFilePath, cfg.TLS.KeyFilePath))
opts = append(opts, nats.RootCAs(cfg.TLS.RootCAFilePath))
}
nc, err := nats.Connect(cfg.URL, opts...)
if err != nil {
msg := fmt.Sprintf("error when connecting to NATS: %s", err.Error())
2022-12-05 03:19:37 +00:00
return nil, nil, errors.New(msg)
2021-10-08 04:23:59 +00:00
}
2022-12-04 15:21:30 +00:00
// set client
2021-12-02 03:29:52 +00:00
natsServer.instance = nc
2022-12-15 11:04:03 +00:00
natsServer.cfg = cfg
2021-10-08 04:23:59 +00:00
2022-12-04 15:21:30 +00:00
// create jet stream context
2021-12-02 03:29:52 +00:00
js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
2021-10-08 04:23:59 +00:00
if err != nil {
msg := fmt.Sprintf("error when create NATS JetStream: %s", err.Error())
2022-12-05 03:19:37 +00:00
return nil, nil, errors.New(msg)
2021-10-08 04:23:59 +00:00
}
2021-12-02 03:29:52 +00:00
natsJetStream.instance = js
2022-12-15 11:04:03 +00:00
natsJetStream.cfg = cfg
2022-12-04 15:10:05 +00:00
natsJetStream.streamName = cfg.StreamName
2021-10-08 04:23:59 +00:00
2022-12-15 11:04:03 +00:00
// add stream
natsJetStream.addStream()
2022-12-04 15:21:30 +00:00
fmt.Printf("⚡️[natsio]: connected to %s \n", cfg.URL)
2022-12-05 03:19:37 +00:00
return &natsServer, &natsJetStream, nil
2021-10-08 04:23:59 +00:00
}