This commit is contained in:
namhq1989 2022-12-15 17:34:47 +07:00
parent cdbe3b80e2
commit 685d324788
8 changed files with 84 additions and 85 deletions

View File

@ -24,6 +24,9 @@ type Config struct {
// Stream name
StreamName string
// Debug
Debug bool
}
func (c Config) validate() error {
@ -31,6 +34,10 @@ func (c Config) validate() error {
return errors.New("connect URL is required")
}
if c.StreamName == "" {
return errors.New("stream name is required")
}
return nil
}

View File

@ -8,27 +8,27 @@ import (
)
// GetConsumerInfo ...
func (js JetStream) GetConsumerInfo(name string) (*nats.ConsumerInfo, error) {
return js.instance.ConsumerInfo(js.streamName, name)
func (js JetStream) GetConsumerInfo(consumerName string) (*nats.ConsumerInfo, error) {
return js.instance.ConsumerInfo(js.streamName, consumerName)
}
// AddConsumer ...
func (js JetStream) AddConsumer(name, filterSubject string) error {
// Get consumer first, return if existed
consumer, _ := js.GetConsumerInfo(name)
func (js JetStream) AddConsumer(consumerName, filterSubject string) error {
// get consumer first, return if existed
consumer, _ := js.GetConsumerInfo(consumerName)
if consumer != nil {
return nil
}
// Add
// add
_, err := js.instance.AddConsumer(js.streamName, &nats.ConsumerConfig{
Durable: name,
Durable: consumerName,
AckPolicy: nats.AckExplicitPolicy,
FilterSubject: filterSubject,
})
if err != nil {
msg := fmt.Sprintf("[NATS JETSTREAM] - add consumer %s for stream #%s error: %s", name, js.streamName, err.Error())
msg := fmt.Sprintf("[natsio.JetStream] add consumer %s to stream #%s error: %s", consumerName, js.streamName, err.Error())
return errors.New(msg)
}
return nil

View File

@ -3,62 +3,57 @@ package natsio
import (
"errors"
"fmt"
"github.com/nats-io/nats.go"
)
// Publish ...
func (js JetStream) Publish(subject string, payload []byte) error {
channel := combineStreamAndSubjectName(js.streamName, subject)
_, err := js.instance.PublishAsync(channel, payload)
_, err := js.instance.PublishAsync(subject, payload)
if err != nil {
msg := fmt.Sprintf("[NATS JETSTREAM] - publish message to subject #%s error: %s", channel, err.Error())
msg := fmt.Sprintf("[natsio.JetStream] publish message to subject #%s error: %s", subject, err.Error())
return errors.New(msg)
}
if js.debug {
fmt.Printf("[natsio.JetStream] published a message to subject #%s \n", subject)
}
return nil
}
// Subscribe ...
func (js JetStream) Subscribe(subject string, cb nats.MsgHandler) (*nats.Subscription, error) {
channel := combineStreamAndSubjectName(js.streamName, subject)
sub, err := js.instance.Subscribe(channel, cb)
sub, err := js.instance.Subscribe(subject, cb)
if err != nil {
msg := fmt.Sprintf("[NATS JETSTREAM] - subscribe subject %s error: %s", channel, err.Error())
msg := fmt.Sprintf("[natsio.JetStream] subscribe subject %s error: %s", subject, err.Error())
return nil, errors.New(msg)
}
if js.debug {
fmt.Printf("[natsio.JetStream] subscribe new message from subject #%s \n", subject)
}
return sub, nil
}
// PullSubscribe ...
//
// Example:
//
// js := natsio.GetJetStream()
//
// sub, err := js.PullSubscribe("A_SUBJECT", "A_SUBJECT", "A_CONSUMER")
//
// for {
// messages, err := sub.Fetch(10)
// // process each messages
// }
func (js JetStream) PullSubscribe(subject, consumer string) (*nats.Subscription, error) {
channel := combineStreamAndSubjectName(js.streamName, subject)
// Check if consumer existed
// check if consumer existed
con, err := js.GetConsumerInfo(consumer)
if con == nil || err != nil {
msg := fmt.Sprintf("[NATS JETSTREAM] - pull subscribe consumer %s not existed in stream %s", consumer, js.streamName)
msg := fmt.Sprintf("[natsio.JetStream] pull subscribe consumer %s not existed in stream %s", consumer, js.streamName)
return nil, errors.New(msg)
}
// Pull
sub, err := js.instance.PullSubscribe(channel, consumer)
// pull
sub, err := js.instance.PullSubscribe(subject, consumer)
if err != nil {
msg := fmt.Sprintf("[NATS JETSTREAM] - pull subscribe subject #%s - consumer #%s error: %s", channel, consumer, err.Error())
msg := fmt.Sprintf("[natsio.JetStream] pull subscribe subject #%s - consumer #%s error: %s", subject, consumer, err.Error())
return nil, errors.New(msg)
}
if js.debug {
fmt.Printf("[natsio.JetStream] pull subscribe new message from subject #%s \n", subject)
}
return sub, nil
}

View File

@ -9,12 +9,15 @@ import (
// QueueSubscribe ...
func (js JetStream) QueueSubscribe(subject, queueName string, cb nats.MsgHandler) error {
channel := combineStreamAndSubjectName(js.streamName, subject)
_, err := js.instance.QueueSubscribe(channel, queueName, cb)
_, err := js.instance.QueueSubscribe(subject, queueName, cb)
if err != nil {
msg := fmt.Sprintf("[NATS JETSTREAM] - queue subscribe with subject #%s error: %s", channel, err.Error())
msg := fmt.Sprintf("[natsio.JetStream] queue subscribe with subject #%s error: %s", subject, err.Error())
return errors.New(msg)
}
if js.debug {
fmt.Printf("[natsio.JetStream] queue subscribe new message from subject #%s \n", subject)
}
return nil
}

View File

@ -25,52 +25,48 @@ func (js JetStream) GetStreamInfo() (*nats.StreamInfo, error) {
return js.instance.StreamInfo(js.streamName)
}
// AddStream add new stream, with default config
// Due to subject must have a unique name, subject name will be combined with stream name
// E.g: stream name is "DEMO", subject name is "Subject-1", so final name in NATS will be: DEMO.Subject-1
func (js JetStream) AddStream(subjects []string) error {
// Get info about the stream
// addStream add new stream, with default config
func (js JetStream) addStream() {
// get info about the stream
stream, _ := js.GetStreamInfo()
// If stream not found, create new
// if stream not found, create new
if stream == nil {
subjectNames := generateSubjectNames(js.streamName, subjects)
_, err := js.instance.AddStream(generateStreamConfig(js.streamName, subjectNames))
_, err := js.instance.AddStream(generateStreamConfig(js.streamName, []string{}))
if err != nil {
msg := fmt.Sprintf("[NATS JETSTREAM] - add stream error: %s", err.Error())
return errors.New(msg)
fmt.Printf("[natsio.JetStream] add stream %s error: %s \n", js.streamName, err.Error())
}
}
return nil
}
// DeleteStream ...
func (js JetStream) DeleteStream() error {
if err := js.instance.DeleteStream(js.streamName); err != nil {
msg := fmt.Sprintf("[NATS JETSTREAM] - delete stream error: %s", err.Error())
msg := fmt.Sprintf("[natsio.JetStream] delete stream %s error: %s", js.streamName, err.Error())
return errors.New(msg)
}
return nil
}
// AddStreamSubjects ...
func (js JetStream) AddStreamSubjects(subjects []string) error {
// Get info about the stream
// AddSubjects ...
func (js JetStream) AddSubjects(subjects []string) error {
// get stream info
stream, _ := js.GetStreamInfo()
if stream == nil {
msg := fmt.Sprintf("[NATS JETSTREAM] - error when adding stream %s subjects: stream not found", js.streamName)
msg := fmt.Sprintf("[natsio.JetStream] error when adding stream #%s subjects: stream not found", js.streamName)
return errors.New(msg)
}
// Merge current and new subjects
subjectNames := generateSubjectNames(js.streamName, subjects)
newSubjects := mergeAndUniqueArrayStrings(subjectNames, stream.Config.Subjects)
// merge current and new subjects
newSubjects := mergeAndUniqueArrayStrings(subjects, stream.Config.Subjects)
// update
_, err := js.instance.UpdateStream(generateStreamConfig(js.streamName, newSubjects))
if err != nil {
msg := fmt.Sprintf("[NATS JETSTREAM] - add stream error: %s", err.Error())
msg := fmt.Sprintf("[natsio.JetStream] add subject to stream #%s error: %s", js.streamName, err.Error())
return errors.New(msg)
}
return nil
}

View File

@ -10,11 +10,13 @@ import (
// Server ...
type Server struct {
instance *nats.Conn
debug bool
}
// JetStream ...
type JetStream struct {
instance nats.JetStreamContext
debug bool
streamName string
}
@ -53,6 +55,7 @@ func Connect(cfg Config) (*Server, *JetStream, error) {
// set client
natsServer.instance = nc
natsServer.debug = cfg.Debug
// create jet stream context
js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
@ -61,6 +64,7 @@ func Connect(cfg Config) (*Server, *JetStream, error) {
return nil, nil, errors.New(msg)
}
natsJetStream.instance = js
natsJetStream.debug = cfg.Debug
natsJetStream.streamName = cfg.StreamName
// assign client type

View File

@ -3,7 +3,6 @@ package natsio
import (
"errors"
"fmt"
"log"
"time"
"github.com/nats-io/nats.go"
@ -13,21 +12,24 @@ import (
const requestTimeout = 10 * time.Second
// Request ...
func (sv Server) Request(subject string, payload []byte) (*nats.Msg, error) {
func (s Server) Request(subject string, payload []byte) (*nats.Msg, error) {
timeout := requestTimeout
if globalConfig.RequestTimeout > 0 {
timeout = globalConfig.RequestTimeout
}
msg, err := sv.instance.Request(subject, payload, timeout)
msg, err := s.instance.Request(subject, payload, timeout)
if errors.Is(err, nats.ErrNoResponders) {
log.Printf("[NATS SERVER]: request - no responders for subject: %s", subject)
fmt.Printf("[natsio.Server] no responders for subject: %s \n", subject)
} else if s.debug {
fmt.Printf("[natsio.Server] send request to subject #%s successfully \n", subject)
}
return msg, err
}
// RequestWithBindData ...
func (sv Server) RequestWithBindData(subject string, payload []byte, result interface{}) error {
msg, err := sv.Request(subject, payload)
func (s Server) RequestWithBindData(subject string, payload []byte, result interface{}) error {
msg, err := s.Request(subject, payload)
if msg == nil || err != nil {
return err
}
@ -37,26 +39,28 @@ func (sv Server) RequestWithBindData(subject string, payload []byte, result inte
}
// Reply ...
func (sv Server) Reply(msg *nats.Msg, payload []byte) error {
return sv.instance.Publish(msg.Reply, payload)
func (s Server) Reply(msg *nats.Msg, payload []byte) error {
return s.instance.Publish(msg.Reply, payload)
}
// Subscribe ...
func (sv Server) Subscribe(subject string, cb nats.MsgHandler) (*nats.Subscription, error) {
sub, err := sv.instance.Subscribe(subject, cb)
func (s Server) Subscribe(subject string, cb nats.MsgHandler) (*nats.Subscription, error) {
sub, err := s.instance.Subscribe(subject, cb)
if err != nil {
msg := fmt.Sprintf("[NATS SERVER] - subscribe subject %s error: %s", subject, err.Error())
msg := fmt.Sprintf("[natsio.Server] subscribe subject %s error: %s", subject, err.Error())
return nil, errors.New(msg)
}
return sub, nil
}
// QueueSubscribe ...
func (sv Server) QueueSubscribe(subject, queue string, cb nats.MsgHandler) (*nats.Subscription, error) {
sub, err := sv.instance.QueueSubscribe(subject, queue, cb)
func (s Server) QueueSubscribe(subject, queue string, cb nats.MsgHandler) (*nats.Subscription, error) {
sub, err := s.instance.QueueSubscribe(subject, queue, cb)
if err != nil {
msg := fmt.Sprintf("[NATS SERVER] - queue subscribe subject %s, queue %s error: %s", subject, queue, err.Error())
msg := fmt.Sprintf("[natsio.Server] queue subscribe subject %s, queue %s error: %s", subject, queue, err.Error())
return nil, errors.New(msg)
}
return sub, nil
}

View File

@ -16,16 +16,6 @@ func mergeAndUniqueArrayStrings(arr1, arr2 []string) []string {
return result
}
// generateSubjectNames ...
func generateSubjectNames(streamName string, subjects []string) []string {
var result = make([]string, 0)
for _, subject := range subjects {
name := combineStreamAndSubjectName(streamName, subject)
result = append(result, name)
}
return result
}
func combineStreamAndSubjectName(stream, subject string) string {
return fmt.Sprintf("%s.%s", stream, subject)
}