This commit is contained in:
Nam Huynh 2021-12-02 10:29:52 +07:00
parent 6664e4ef98
commit 04a8b182fb
8 changed files with 122 additions and 103 deletions

32
jetstream_pubsub.go Normal file
View File

@ -0,0 +1,32 @@
package natsio
import (
"errors"
"fmt"
"github.com/nats-io/nats.go"
)
// Publish ...
func (js JetStream) Publish(stream, subject string, payload []byte) error {
channel := combineStreamAndSubjectName(stream, subject)
_, err := js.instance.PublishAsync(channel, payload)
if err != nil {
msg := fmt.Sprintf("[NATS JETSTREAM] - publish message to subject %s error: %s", channel, err.Error())
return errors.New(msg)
}
return nil
}
// Subscribe ...
func (js JetStream) Subscribe(stream, subject string, cb nats.MsgHandler) (*nats.Subscription, error) {
channel := combineStreamAndSubjectName(stream, subject)
sub, err := js.instance.Subscribe(channel, cb)
if err != nil {
msg := fmt.Sprintf("[NATS JETSTREAM] - subscribe subject %s error: %s", channel, err.Error())
return nil, errors.New(msg)
}
return sub, nil
}

20
jetstream_queue.go Normal file
View File

@ -0,0 +1,20 @@
package natsio
import (
"errors"
"fmt"
"github.com/nats-io/nats.go"
)
// QueueSubscribe ...
func (js JetStream) QueueSubscribe(stream, subject, queueName string, cb nats.MsgHandler) error {
channel := combineStreamAndSubjectName(stream, subject)
_, err := js.instance.QueueSubscribe(channel, queueName, cb)
if err != nil {
msg := fmt.Sprintf("[NATS JETSTREAM] - queue subscribe with subject %s error: %s", channel, err.Error())
return errors.New(msg)
}
return nil
}

View File

@ -8,21 +8,21 @@ import (
) )
// GetStreamInfo ... // GetStreamInfo ...
func GetStreamInfo(name string) (*nats.StreamInfo, error) { func (js JetStream) GetStreamInfo(name string) (*nats.StreamInfo, error) {
return natsJS.StreamInfo(name) return js.instance.StreamInfo(name)
} }
// AddStream add new stream, with default config // AddStream add new stream, with default config
// Due to subject must have a unique name, subject name will be combined with stream name // Due to subject must have a unique name, subject name will be combined with stream name
// E.g: stream name is "DEMO", subject name is "Subject-1", so final name in NATS will be: DEMO.Subject-1 // E.g: stream name is "DEMO", subject name is "Subject-1", so final name in NATS will be: DEMO.Subject-1
func AddStream(name string, subjects []string) error { func (js JetStream) AddStream(name string, subjects []string) error {
// Get info about the stream // Get info about the stream
stream, _ := GetStreamInfo(name) stream, _ := js.GetStreamInfo(name)
// If stream not found, create new // If stream not found, create new
if stream == nil { if stream == nil {
subjectNames := generateSubjectNames(name, subjects) subjectNames := generateSubjectNames(name, subjects)
_, err := natsJS.AddStream(&nats.StreamConfig{ _, err := js.instance.AddStream(&nats.StreamConfig{
Name: name, Name: name,
Subjects: subjectNames, Subjects: subjectNames,
Retention: nats.WorkQueuePolicy, Retention: nats.WorkQueuePolicy,
@ -32,7 +32,7 @@ func AddStream(name string, subjects []string) error {
NoAck: false, NoAck: false,
}) })
if err != nil { if err != nil {
msg := fmt.Sprintf("add stream error: %s", err.Error()) msg := fmt.Sprintf("[NATS JETSTREAM] - add stream error: %s", err.Error())
return errors.New(msg) return errors.New(msg)
} }
} }
@ -41,20 +41,20 @@ func AddStream(name string, subjects []string) error {
} }
// DeleteStream ... // DeleteStream ...
func DeleteStream(name string) error { func (js JetStream) DeleteStream(name string) error {
if err := natsJS.DeleteStream(name); err != nil { if err := js.instance.DeleteStream(name); err != nil {
msg := fmt.Sprintf("delete stream error: %s", err.Error()) msg := fmt.Sprintf("[NATS JETSTREAM] - delete stream error: %s", err.Error())
return errors.New(msg) return errors.New(msg)
} }
return nil return nil
} }
// AddStreamSubjects ... // AddStreamSubjects ...
func AddStreamSubjects(name string, subjects []string) error { func (js JetStream) AddStreamSubjects(name string, subjects []string) error {
// Get info about the stream // Get info about the stream
stream, _ := GetStreamInfo(name) stream, _ := js.GetStreamInfo(name)
if stream == nil { if stream == nil {
msg := fmt.Sprintf("error when adding stream %s subjects: stream not found", name) msg := fmt.Sprintf("[NATS JETSTREAM] - error when adding stream %s subjects: stream not found", name)
return errors.New(msg) return errors.New(msg)
} }
@ -62,12 +62,12 @@ func AddStreamSubjects(name string, subjects []string) error {
subjectNames := generateSubjectNames(name, subjects) subjectNames := generateSubjectNames(name, subjects)
newSubjects := mergeAndUniqueArrayStrings(subjectNames, stream.Config.Subjects) newSubjects := mergeAndUniqueArrayStrings(subjectNames, stream.Config.Subjects)
_, err := natsJS.UpdateStream(&nats.StreamConfig{ _, err := js.instance.UpdateStream(&nats.StreamConfig{
Name: name, Name: name,
Subjects: newSubjects, Subjects: newSubjects,
}) })
if err != nil { if err != nil {
msg := fmt.Sprintf("add stream error: %s", err.Error()) msg := fmt.Sprintf("[NATS JETSTREAM] - add stream error: %s", err.Error())
return errors.New(msg) return errors.New(msg)
} }
return nil return nil

View File

@ -8,9 +8,19 @@ import (
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
) )
// Server ...
type Server struct {
instance *nats.Conn
}
// JetStream ...
type JetStream struct {
instance nats.JetStreamContext
}
var ( var (
natsClient *nats.Conn natsServer Server
natsJS nats.JetStreamContext natsJetStream JetStream
) )
// Connect ... // Connect ...
@ -42,25 +52,25 @@ func Connect(cfg Config) error {
fmt.Println(aurora.Green("*** CONNECTED TO NATS: " + cfg.URL)) fmt.Println(aurora.Green("*** CONNECTED TO NATS: " + cfg.URL))
// Set client // Set client
natsClient = nc natsServer.instance = nc
// Create jet stream context // Create jet stream context
js, err := natsClient.JetStream(nats.PublishAsyncMaxPending(256)) js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
if err != nil { if err != nil {
msg := fmt.Sprintf("error when create NATS JetStream: %s", err.Error()) msg := fmt.Sprintf("error when create NATS JetStream: %s", err.Error())
return errors.New(msg) return errors.New(msg)
} }
natsJS = js natsJetStream.instance = js
return nil return nil
} }
// GetClient ... // GetServer ...
func GetClient() *nats.Conn { func GetServer() Server {
return natsClient return natsServer
} }
// GetJetStream ... // GetJetStream ...
func GetJetStream() nats.JetStreamContext { func GetJetStream() JetStream {
return natsJS return natsJetStream
} }

View File

@ -1,44 +0,0 @@
package natsio
import (
"errors"
"fmt"
"github.com/nats-io/nats.go"
)
// Publish ...
func Publish(stream, subject string, payload []byte) error {
channel := combineStreamAndSubjectName(stream, subject)
_, err := natsJS.PublishAsync(channel, payload)
if err != nil {
msg := fmt.Sprintf("publish message to subject %s error: %s", channel, err.Error())
return errors.New(msg)
}
return nil
}
// PublishWithResponse ...
func PublishWithResponse(stream, subject string, payload []byte) (*nats.Msg, error) {
channel := combineStreamAndSubjectName(stream, subject)
resp, err := natsJS.PublishAsync(channel, payload)
if err != nil {
msg := fmt.Sprintf("publish message to subject %s error: %s", channel, err.Error())
return nil, errors.New(msg)
}
return resp.Msg(), nil
}
// Subscribe ...
func Subscribe(stream, subject string, cb nats.MsgHandler) (*nats.Subscription, error) {
channel := combineStreamAndSubjectName(stream, subject)
sub, err := natsJS.Subscribe(channel, cb)
if err != nil {
msg := fmt.Sprintf("subscribe subject %s error: %s", channel, err.Error())
return nil, errors.New(msg)
}
return sub, nil
}

View File

@ -1,20 +0,0 @@
package natsio
import (
"errors"
"fmt"
"github.com/nats-io/nats.go"
)
// QueueSubscribe ...
func QueueSubscribe(stream, subject, queueName string, cb nats.MsgHandler) error {
channel := combineStreamAndSubjectName(stream, subject)
_, err := natsJS.QueueSubscribe(channel, queueName, cb)
if err != nil {
msg := fmt.Sprintf("queue subscribe with subject %s error: %s", channel, err.Error())
return errors.New(msg)
}
return nil
}

View File

@ -1,15 +0,0 @@
package natsio
import (
"time"
"github.com/nats-io/nats.go"
)
// Default timeout 10s
const requestTimeout = 10 * time.Second
// Request ...
func Request(subject string, payload []byte) (*nats.Msg, error) {
return natsClient.Request(subject, payload, requestTimeout)
}

36
server_reqres.go Normal file
View File

@ -0,0 +1,36 @@
package natsio
import (
"errors"
"fmt"
"time"
"github.com/nats-io/nats.go"
)
// Default timeout 10s
const requestTimeout = 10 * time.Second
// Request ...
func (sv Server) Request(subject string, payload []byte) (*nats.Msg, error) {
return sv.instance.Request(subject, payload, requestTimeout)
}
// Reply ...
func (sv Server) Reply(msg *nats.Msg, payload []byte) error {
err := sv.instance.Publish(msg.Reply, payload)
// Ack message
msg.Ack()
return err
}
// Subscribe ...
func (sv Server) Subscribe(subject string, cb nats.MsgHandler) (*nats.Subscription, error) {
sub, err := sv.instance.Subscribe(subject, cb)
if err != nil {
msg := fmt.Sprintf("[NATS SERVER] - subscribe subject %s error: %s", subject, err.Error())
return nil, errors.New(msg)
}
return sub, nil
}