From ad6c1c31c986819f1119628ac23fe2602a56edf7 Mon Sep 17 00:00:00 2001 From: Nam Huynh Date: Fri, 8 Oct 2021 12:00:03 +0700 Subject: [PATCH] add func add stream subjects --- go.mod | 1 + go.sum | 12 ++++++++++++ stream.go | 34 +++++++++++++++++++++++++++++++++- utils.go | 12 ++++++++++++ 4 files changed, 58 insertions(+), 1 deletion(-) create mode 100644 utils.go diff --git a/go.mod b/go.mod index 90d17b1..ef3b649 100644 --- a/go.mod +++ b/go.mod @@ -7,5 +7,6 @@ require ( github.com/logrusorgru/aurora v2.0.3+incompatible github.com/nats-io/nats-server/v2 v2.6.1 // indirect github.com/nats-io/nats.go v1.13.0 + github.com/thoas/go-funk v0.9.1 google.golang.org/protobuf v1.27.1 // indirect ) diff --git a/go.sum b/go.sum index cc322f2..a4b26ff 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= @@ -32,6 +34,13 @@ github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/thoas/go-funk v0.9.1 h1:O549iLZqPpTUQ10ykd26sZhzD+rmR5pWhuElrhbC20M= +github.com/thoas/go-funk v0.9.1/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= @@ -62,3 +71,6 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/stream.go b/stream.go index 95a0fb3..9809ca0 100644 --- a/stream.go +++ b/stream.go @@ -15,7 +15,7 @@ func GetStreamInfo(name string) (*nats.StreamInfo, error) { // AddStream add new stream, with default config func AddStream(name string, subjects []string) error { // Get info about the stream - stream, _ := natsJS.StreamInfo(name) + stream, _ := GetStreamInfo(name) // If stream not found, create new if stream == nil { _, err := natsJS.AddStream(&nats.StreamConfig{ @@ -31,3 +31,35 @@ func AddStream(name string, subjects []string) error { return nil } + +// DeleteStream ... +func DeleteStream(name string) error { + if err := natsJS.DeleteStream(name); err != nil { + msg := fmt.Sprintf("delete stream error: %s", err.Error()) + return errors.New(msg) + } + return nil +} + +// AddStreamSubjects ... +func AddStreamSubjects(name string, subjects []string) error { + // Get info about the stream + stream, _ := GetStreamInfo(name) + if stream == nil { + msg := fmt.Sprintf("error when adding stream %s subjects: stream not found", name) + return errors.New(msg) + } + + // Merge current and new subjects + newSubjects := mergeAndUniqueArrayStrings(subjects, stream.Config.Subjects) + + _, err := natsJS.UpdateStream(&nats.StreamConfig{ + Name: name, + Subjects: newSubjects, + }) + if err != nil { + msg := fmt.Sprintf("add stream error: %s", err.Error()) + return errors.New(msg) + } + return nil +} diff --git a/utils.go b/utils.go new file mode 100644 index 0000000..59942bc --- /dev/null +++ b/utils.go @@ -0,0 +1,12 @@ +package natsio + +import "github.com/thoas/go-funk" + +// mergeAndUniqueArrayStrings ... +func mergeAndUniqueArrayStrings(arr1, arr2 []string) []string { + var result = make([]string, 0) + result = append(result, arr1...) + result = append(result, arr2...) + result = funk.UniqString(result) + return result +}