From 5e55d86e7eba644f3727e47965bb24d7e93b9146 Mon Sep 17 00:00:00 2001 From: Nam Huynh Date: Fri, 8 Oct 2021 11:23:59 +0700 Subject: [PATCH] init --- .gitignore | 2 ++ config.go | 28 ++++++++++++++++++++++++ go.mod | 11 ++++++++++ go.sum | 64 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ natsio.go | 61 +++++++++++++++++++++++++++++++++++++++++++++++++++ pubsub.go | 30 +++++++++++++++++++++++++ queue.go | 18 +++++++++++++++ stream.go | 33 ++++++++++++++++++++++++++++ 8 files changed, 247 insertions(+) create mode 100644 config.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 natsio.go create mode 100644 pubsub.go create mode 100644 queue.go create mode 100644 stream.go diff --git a/.gitignore b/.gitignore index 66fd13c..bbfd2cb 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,5 @@ # Dependency directories (remove the comment below to include it) # vendor/ + +.idea \ No newline at end of file diff --git a/config.go b/config.go new file mode 100644 index 0000000..295f751 --- /dev/null +++ b/config.go @@ -0,0 +1,28 @@ +package natsio + +// Config ... +type Config struct { + // Connect url + URL string + + // Auth user + User string + + // Auth password + Password string + + // TLS config + TLS *TLSConfig +} + +// TLSConfig ... +type TLSConfig struct { + // Cert file + CertFilePath string + + // Key file + KeyFilePath string + + // Root CA + RootCAFilePath string +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..90d17b1 --- /dev/null +++ b/go.mod @@ -0,0 +1,11 @@ +module github.com/Selly-Modules/natsio + +go 1.16 + +require ( + github.com/golang/protobuf v1.5.2 // indirect + github.com/logrusorgru/aurora v2.0.3+incompatible + github.com/nats-io/nats-server/v2 v2.6.1 // indirect + github.com/nats-io/nats.go v1.13.0 + google.golang.org/protobuf v1.27.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..cc322f2 --- /dev/null +++ b/go.sum @@ -0,0 +1,64 @@ +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s= +github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= +github.com/logrusorgru/aurora v2.0.3+incompatible h1:tOpm7WcpBTn4fjmVfgpQq0EfczGlG91VSDkswnjF5A8= +github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= +github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= +github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= +github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= +github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI= +github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= +github.com/nats-io/nats-server/v2 v2.6.1 h1:cJy+ia7/4EaJL+ZYDmIy2rD1mDWTfckhtPBU0GYo8xM= +github.com/nats-io/nats-server/v2 v2.6.1/go.mod h1:Az91TbZiV7K4a6k/4v6YYdOKEoxCXj+iqhHVf/MlrKo= +github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.13.0 h1:LvYqRB5epIzZWQp6lmeltOOZNLqCvm4b+qfvzZO03HE= +github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= +github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI= +golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI= +golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= diff --git a/natsio.go b/natsio.go new file mode 100644 index 0000000..3692fbf --- /dev/null +++ b/natsio.go @@ -0,0 +1,61 @@ +package natsio + +import ( + "errors" + "fmt" + + "github.com/logrusorgru/aurora" + "github.com/nats-io/nats.go" +) + +var ( + natsClient *nats.Conn + natsJS nats.JetStreamContext +) + +// Connect ... +func Connect(cfg Config) error { + if cfg.URL == "" { + return errors.New("connect URL is required") + } + + // Connect options + opts := make([]nats.Option, 0) + + // Has authentication + if cfg.User != "" { + opts = append(opts, nats.UserInfo(cfg.User, cfg.Password)) + } + + // If it has TLS + if cfg.TLS != nil { + opts = append(opts, nats.ClientCert(cfg.TLS.CertFilePath, cfg.TLS.KeyFilePath)) + opts = append(opts, nats.RootCAs(cfg.TLS.RootCAFilePath)) + } + + nc, err := nats.Connect(cfg.URL, opts...) + if err != nil { + msg := fmt.Sprintf("error when connecting to NATS: %s", err.Error()) + return errors.New(msg) + } + + fmt.Println(aurora.Green("*** CONNECTED TO NATS: " + cfg.URL)) + + // Set client + natsClient = nc + + // Create jet stream context + js, err := natsClient.JetStream(nats.PublishAsyncMaxPending(256)) + if err != nil { + msg := fmt.Sprintf("error when create NATS JetStream: %s", err.Error()) + return errors.New(msg) + } + natsJS = js + + return nil +} + +// GetJetStream ... +func GetJetStream() nats.JetStreamContext { + return natsJS +} diff --git a/pubsub.go b/pubsub.go new file mode 100644 index 0000000..dea03b0 --- /dev/null +++ b/pubsub.go @@ -0,0 +1,30 @@ +package natsio + +import ( + "encoding/json" + "errors" + "fmt" + + "github.com/nats-io/nats.go" +) + +// Publish ... +func Publish(subject string, data interface{}) error { + b, _ := json.Marshal(data) + _, err := natsJS.PublishAsync(subject, b) + if err != nil { + msg := fmt.Sprintf("publish message error: %s", err.Error()) + return errors.New(msg) + } + return nil +} + +// Subscribe ... +func Subscribe(subject string, cb nats.MsgHandler) error { + _, err := natsJS.Subscribe(subject, cb) + if err != nil { + msg := fmt.Sprintf("subscribe subject %s error: %s", subject, err.Error()) + return errors.New(msg) + } + return nil +} diff --git a/queue.go b/queue.go new file mode 100644 index 0000000..1730a94 --- /dev/null +++ b/queue.go @@ -0,0 +1,18 @@ +package natsio + +import ( + "errors" + "fmt" + + "github.com/nats-io/nats.go" +) + +// QueueSubscribe ... +func QueueSubscribe(subject, queueName string, cb nats.MsgHandler) error { + _, err := natsJS.QueueSubscribe(subject, queueName, cb) + if err != nil { + msg := fmt.Sprintf("queue subscribe with subject %s error: %s", subject, err.Error()) + return errors.New(msg) + } + return nil +} diff --git a/stream.go b/stream.go new file mode 100644 index 0000000..45ac2c4 --- /dev/null +++ b/stream.go @@ -0,0 +1,33 @@ +package natsio + +import ( + "errors" + "fmt" + + "github.com/nats-io/nats.go" +) + +// AddStream add new stream, with default config +func AddStream(name string, subjects []string) error { + // Get info about the stream + stream, err := natsJS.StreamInfo(name) + if err != nil { + msg := fmt.Sprintf("error getting stream info: %s", err.Error()) + return errors.New(msg) + } + + // If stream not found, create new + if stream == nil { + _, err = natsJS.AddStream(&nats.StreamConfig{ + Name: name, + Subjects: subjects, + Storage: nats.FileStorage, + }) + if err != nil { + msg := fmt.Sprintf("add stream error: %s", err.Error()) + return errors.New(msg) + } + } + + return nil +}