update source
This commit is contained in:
		
							parent
							
								
									55aaabf51c
								
							
						
					
					
						commit
						c180bd8489
					
				| 
						 | 
					@ -0,0 +1,8 @@
 | 
				
			||||||
 | 
					# Default ignored files
 | 
				
			||||||
 | 
					/shelf/
 | 
				
			||||||
 | 
					/workspace.xml
 | 
				
			||||||
 | 
					# Editor-based HTTP Client requests
 | 
				
			||||||
 | 
					/httpRequests/
 | 
				
			||||||
 | 
					# Datasource local storage ignored files
 | 
				
			||||||
 | 
					/dataSources/
 | 
				
			||||||
 | 
					/dataSources.local.xml
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,8 @@
 | 
				
			||||||
 | 
					<?xml version="1.0" encoding="UTF-8"?>
 | 
				
			||||||
 | 
					<project version="4">
 | 
				
			||||||
 | 
					  <component name="ProjectModuleManager">
 | 
				
			||||||
 | 
					    <modules>
 | 
				
			||||||
 | 
					      <module fileurl="file://$PROJECT_DIR$/.idea/natsio.iml" filepath="$PROJECT_DIR$/.idea/natsio.iml" />
 | 
				
			||||||
 | 
					    </modules>
 | 
				
			||||||
 | 
					  </component>
 | 
				
			||||||
 | 
					</project>
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,9 @@
 | 
				
			||||||
 | 
					<?xml version="1.0" encoding="UTF-8"?>
 | 
				
			||||||
 | 
					<module type="WEB_MODULE" version="4">
 | 
				
			||||||
 | 
					  <component name="Go" enabled="true" />
 | 
				
			||||||
 | 
					  <component name="NewModuleRootManager">
 | 
				
			||||||
 | 
					    <content url="file://$MODULE_DIR$" />
 | 
				
			||||||
 | 
					    <orderEntry type="inheritedJdk" />
 | 
				
			||||||
 | 
					    <orderEntry type="sourceFolder" forTests="false" />
 | 
				
			||||||
 | 
					  </component>
 | 
				
			||||||
 | 
					</module>
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,6 @@
 | 
				
			||||||
 | 
					<?xml version="1.0" encoding="UTF-8"?>
 | 
				
			||||||
 | 
					<project version="4">
 | 
				
			||||||
 | 
					  <component name="VcsDirectoryMappings">
 | 
				
			||||||
 | 
					    <mapping directory="" vcs="Git" />
 | 
				
			||||||
 | 
					  </component>
 | 
				
			||||||
 | 
					</project>
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,54 @@
 | 
				
			||||||
 | 
					package natsio
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"errors"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Config ...
 | 
				
			||||||
 | 
					type Config struct {
 | 
				
			||||||
 | 
						// Connect url
 | 
				
			||||||
 | 
						URL string
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Auth user
 | 
				
			||||||
 | 
						User string
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Auth password
 | 
				
			||||||
 | 
						Password string
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// TLS config
 | 
				
			||||||
 | 
						TLS *TLSConfig
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// RequestTimeout
 | 
				
			||||||
 | 
						RequestTimeout time.Duration
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Stream name
 | 
				
			||||||
 | 
						StreamName string
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Debug
 | 
				
			||||||
 | 
						Debug bool
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (c Config) validate() error {
 | 
				
			||||||
 | 
						if c.URL == "" {
 | 
				
			||||||
 | 
							return errors.New("connect URL is required")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if c.StreamName == "" {
 | 
				
			||||||
 | 
							return errors.New("stream name is required")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// TLSConfig ...
 | 
				
			||||||
 | 
					type TLSConfig struct {
 | 
				
			||||||
 | 
						// Cert file
 | 
				
			||||||
 | 
						CertFilePath string
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Key file
 | 
				
			||||||
 | 
						KeyFilePath string
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Root CA
 | 
				
			||||||
 | 
						RootCAFilePath string
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,13 @@
 | 
				
			||||||
 | 
					package natsio
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var AllStreams = struct {
 | 
				
			||||||
 | 
						ChangeStream string
 | 
				
			||||||
 | 
					}{
 | 
				
			||||||
 | 
						ChangeStream: "change_stream",
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var AllServers = struct {
 | 
				
			||||||
 | 
						MongoCDC string
 | 
				
			||||||
 | 
					}{
 | 
				
			||||||
 | 
						MongoCDC: "mongo_cdc",
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,21 @@
 | 
				
			||||||
 | 
					module natsio
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					go 1.18
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					require (
 | 
				
			||||||
 | 
						github.com/json-iterator/go v1.1.12
 | 
				
			||||||
 | 
						github.com/nats-io/nats.go v1.25.0
 | 
				
			||||||
 | 
						github.com/thoas/go-funk v0.9.3
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					require (
 | 
				
			||||||
 | 
						github.com/golang/protobuf v1.5.3 // indirect
 | 
				
			||||||
 | 
						github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
 | 
				
			||||||
 | 
						github.com/modern-go/reflect2 v1.0.2 // indirect
 | 
				
			||||||
 | 
						github.com/nats-io/nats-server/v2 v2.9.16 // indirect
 | 
				
			||||||
 | 
						github.com/nats-io/nkeys v0.4.4 // indirect
 | 
				
			||||||
 | 
						github.com/nats-io/nuid v1.0.1 // indirect
 | 
				
			||||||
 | 
						golang.org/x/crypto v0.8.0 // indirect
 | 
				
			||||||
 | 
						golang.org/x/sys v0.7.0 // indirect
 | 
				
			||||||
 | 
						google.golang.org/protobuf v1.30.0 // indirect
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,46 @@
 | 
				
			||||||
 | 
					github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 | 
				
			||||||
 | 
					github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 | 
				
			||||||
 | 
					github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 | 
				
			||||||
 | 
					github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
 | 
				
			||||||
 | 
					github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
 | 
				
			||||||
 | 
					github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
 | 
				
			||||||
 | 
					github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 | 
				
			||||||
 | 
					github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 | 
				
			||||||
 | 
					github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
 | 
				
			||||||
 | 
					github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
 | 
				
			||||||
 | 
					github.com/klauspost/compress v1.16.4 h1:91KN02FnsOYhuunwU4ssRe8lc2JosWmizWa91B5v1PU=
 | 
				
			||||||
 | 
					github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
 | 
				
			||||||
 | 
					github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
 | 
				
			||||||
 | 
					github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 | 
				
			||||||
 | 
					github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
 | 
				
			||||||
 | 
					github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
 | 
				
			||||||
 | 
					github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4=
 | 
				
			||||||
 | 
					github.com/nats-io/nats-server/v2 v2.9.16 h1:SuNe6AyCcVy0g5326wtyU8TdqYmcPqzTjhkHojAjprc=
 | 
				
			||||||
 | 
					github.com/nats-io/nats-server/v2 v2.9.16/go.mod h1:z1cc5Q+kqJkz9mLUdlcSsdYnId4pyImHjNgoh6zxSC0=
 | 
				
			||||||
 | 
					github.com/nats-io/nats.go v1.25.0 h1:t5/wCPGciR7X3Mu8QOi4jiJaXaWM8qtkLu4lzGZvYHE=
 | 
				
			||||||
 | 
					github.com/nats-io/nats.go v1.25.0/go.mod h1:D2WALIhz7V8M0pH8Scx8JZXlg6Oqz5VG+nQkK8nJdvg=
 | 
				
			||||||
 | 
					github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
 | 
				
			||||||
 | 
					github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
 | 
				
			||||||
 | 
					github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
 | 
				
			||||||
 | 
					github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
 | 
				
			||||||
 | 
					github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 | 
				
			||||||
 | 
					github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 | 
				
			||||||
 | 
					github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 | 
				
			||||||
 | 
					github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 | 
				
			||||||
 | 
					github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
 | 
				
			||||||
 | 
					github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
 | 
				
			||||||
 | 
					github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw=
 | 
				
			||||||
 | 
					github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q=
 | 
				
			||||||
 | 
					golang.org/x/crypto v0.8.0 h1:pd9TJtTueMTVQXzk8E2XESSMQDj/U7OUu0PqJqPXQjQ=
 | 
				
			||||||
 | 
					golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE=
 | 
				
			||||||
 | 
					golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
 | 
				
			||||||
 | 
					golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 | 
				
			||||||
 | 
					golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
 | 
				
			||||||
 | 
					golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 | 
				
			||||||
 | 
					google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
 | 
				
			||||||
 | 
					google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
 | 
				
			||||||
 | 
					google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
 | 
				
			||||||
 | 
					google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
 | 
				
			||||||
 | 
					gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 | 
				
			||||||
 | 
					gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
 | 
				
			||||||
 | 
					gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,35 @@
 | 
				
			||||||
 | 
					package natsio
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"errors"
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"github.com/nats-io/nats.go"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// GetConsumerInfo ...
 | 
				
			||||||
 | 
					func (js JetStream) GetConsumerInfo(consumerName string) (*nats.ConsumerInfo, error) {
 | 
				
			||||||
 | 
						return js.instance.ConsumerInfo(js.streamName, consumerName)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// AddConsumer ...
 | 
				
			||||||
 | 
					func (js JetStream) AddConsumer(consumerName, filterSubject string) error {
 | 
				
			||||||
 | 
						// get consumer first, return if existed
 | 
				
			||||||
 | 
						consumer, _ := js.GetConsumerInfo(consumerName)
 | 
				
			||||||
 | 
						if consumer != nil {
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// add
 | 
				
			||||||
 | 
						_, err := js.instance.AddConsumer(js.streamName, &nats.ConsumerConfig{
 | 
				
			||||||
 | 
							Durable:       consumerName,
 | 
				
			||||||
 | 
							AckPolicy:     nats.AckExplicitPolicy,
 | 
				
			||||||
 | 
							FilterSubject: filterSubject,
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							msg := fmt.Sprintf("[natsio.JetStream] add consumer %s to stream #%s error: %s", consumerName, js.streamName, err.Error())
 | 
				
			||||||
 | 
							return errors.New(msg)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,59 @@
 | 
				
			||||||
 | 
					package natsio
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"errors"
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
 | 
						"github.com/nats-io/nats.go"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Publish ...
 | 
				
			||||||
 | 
					func (js JetStream) Publish(subject string, payload []byte) error {
 | 
				
			||||||
 | 
						_, err := js.instance.PublishAsync(subject, payload)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							msg := fmt.Sprintf("[natsio.JetStream] publish message to subject #%s error: %s", subject, err.Error())
 | 
				
			||||||
 | 
							return errors.New(msg)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if js.cfg.Debug {
 | 
				
			||||||
 | 
							fmt.Printf("[natsio.JetStream] published a message to subject #%s \n", subject)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Subscribe ...
 | 
				
			||||||
 | 
					func (js JetStream) Subscribe(subject string, cb nats.MsgHandler) (*nats.Subscription, error) {
 | 
				
			||||||
 | 
						sub, err := js.instance.Subscribe(subject, cb)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							msg := fmt.Sprintf("[natsio.JetStream] subscribe subject %s error: %s", subject, err.Error())
 | 
				
			||||||
 | 
							return nil, errors.New(msg)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if js.cfg.Debug {
 | 
				
			||||||
 | 
							fmt.Printf("[natsio.JetStream] subscribe to subject #%s \n", subject)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return sub, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (js JetStream) PullSubscribe(subject, consumer string) (*nats.Subscription, error) {
 | 
				
			||||||
 | 
						// check if consumer existed
 | 
				
			||||||
 | 
						con, err := js.GetConsumerInfo(consumer)
 | 
				
			||||||
 | 
						if con == nil || err != nil {
 | 
				
			||||||
 | 
							msg := fmt.Sprintf("[natsio.JetStream] pull subscribe consumer %s not existed in stream %s", consumer, js.streamName)
 | 
				
			||||||
 | 
							return nil, errors.New(msg)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// pull
 | 
				
			||||||
 | 
						sub, err := js.instance.PullSubscribe(subject, consumer)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							msg := fmt.Sprintf("[natsio.JetStream] pull subscribe subject #%s - consumer #%s error: %s", subject, consumer, err.Error())
 | 
				
			||||||
 | 
							return nil, errors.New(msg)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if js.cfg.Debug {
 | 
				
			||||||
 | 
							fmt.Printf("[natsio.JetStream] pull subscribe to subject #%s \n", subject)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return sub, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,23 @@
 | 
				
			||||||
 | 
					package natsio
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"errors"
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"github.com/nats-io/nats.go"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// QueueSubscribe ...
 | 
				
			||||||
 | 
					func (js JetStream) QueueSubscribe(subject, queueName string, cb nats.MsgHandler) error {
 | 
				
			||||||
 | 
						_, err := js.instance.QueueSubscribe(subject, queueName, cb)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							msg := fmt.Sprintf("[natsio.JetStream] queue subscribe with subject #%s error: %s", subject, err.Error())
 | 
				
			||||||
 | 
							return errors.New(msg)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if js.cfg.Debug {
 | 
				
			||||||
 | 
							fmt.Printf("[natsio.JetStream] queue subscribe to subject #%s \n", subject)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,72 @@
 | 
				
			||||||
 | 
					package natsio
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"errors"
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"github.com/nats-io/nats.go"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func generateStreamConfig(stream string, subjects []string) *nats.StreamConfig {
 | 
				
			||||||
 | 
						cfg := nats.StreamConfig{
 | 
				
			||||||
 | 
							Name:         stream,
 | 
				
			||||||
 | 
							Subjects:     subjects,
 | 
				
			||||||
 | 
							Retention:    nats.WorkQueuePolicy,
 | 
				
			||||||
 | 
							MaxConsumers: -1,
 | 
				
			||||||
 | 
							MaxMsgSize:   -1,
 | 
				
			||||||
 | 
							MaxMsgs:      -1,
 | 
				
			||||||
 | 
							NoAck:        false,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return &cfg
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// GetStreamInfo ...
 | 
				
			||||||
 | 
					func (js JetStream) GetStreamInfo() (*nats.StreamInfo, error) {
 | 
				
			||||||
 | 
						return js.instance.StreamInfo(js.streamName)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// addStream add new stream, with default config
 | 
				
			||||||
 | 
					func (js JetStream) addStream() {
 | 
				
			||||||
 | 
						// get info about the stream
 | 
				
			||||||
 | 
						stream, _ := js.GetStreamInfo()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// if stream not found, create new
 | 
				
			||||||
 | 
						if stream == nil {
 | 
				
			||||||
 | 
							_, err := js.instance.AddStream(generateStreamConfig(js.streamName, []string{}))
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								fmt.Printf("[natsio.JetStream] add stream %s error: %s \n", js.streamName, err.Error())
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// DeleteStream ...
 | 
				
			||||||
 | 
					func (js JetStream) DeleteStream() error {
 | 
				
			||||||
 | 
						if err := js.instance.DeleteStream(js.streamName); err != nil {
 | 
				
			||||||
 | 
							msg := fmt.Sprintf("[natsio.JetStream] delete stream %s error: %s", js.streamName, err.Error())
 | 
				
			||||||
 | 
							return errors.New(msg)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// AddSubjects ...
 | 
				
			||||||
 | 
					func (js JetStream) AddSubjects(subjects []string) error {
 | 
				
			||||||
 | 
						// get stream info
 | 
				
			||||||
 | 
						stream, _ := js.GetStreamInfo()
 | 
				
			||||||
 | 
						if stream == nil {
 | 
				
			||||||
 | 
							msg := fmt.Sprintf("[natsio.JetStream] error when adding stream #%s subjects: stream not found", js.streamName)
 | 
				
			||||||
 | 
							return errors.New(msg)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// merge current and new subjects
 | 
				
			||||||
 | 
						newSubjects := mergeAndUniqueArrayStrings(subjects, stream.Config.Subjects)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// update
 | 
				
			||||||
 | 
						_, err := js.instance.UpdateStream(generateStreamConfig(js.streamName, newSubjects))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							msg := fmt.Sprintf("[natsio.JetStream] add subject to stream #%s error: %s", js.streamName, err.Error())
 | 
				
			||||||
 | 
							return errors.New(msg)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,68 @@
 | 
				
			||||||
 | 
					package natsio
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"errors"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					// Common
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type IDAndName struct {
 | 
				
			||||||
 | 
						ID   string `json:"_id"`
 | 
				
			||||||
 | 
						Name string `json:"name"`
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					// USER
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type User struct {
 | 
				
			||||||
 | 
						ID     string     `json:"_id"`
 | 
				
			||||||
 | 
						Name   string     `json:"name"`
 | 
				
			||||||
 | 
						Type   string     `json:"type"`
 | 
				
			||||||
 | 
						Avatar *FilePhoto `json:"avatar"`
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func ParseToUser(data interface{}, result *User) (err error) {
 | 
				
			||||||
 | 
						b := InterfaceToBytes(data)
 | 
				
			||||||
 | 
						if len(b) > 0 {
 | 
				
			||||||
 | 
							err = json.Unmarshal(b, result)
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							err = errors.New("[natsio.ParseUser] cannot read data")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					// FILE PHOTO
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type FilePhoto struct {
 | 
				
			||||||
 | 
						ID         string          `json:"_id"`
 | 
				
			||||||
 | 
						Name       string          `json:"name"`
 | 
				
			||||||
 | 
						Dimensions *FileDimensions `json:"dimensions"`
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type FileSize struct {
 | 
				
			||||||
 | 
						Width  int    `json:"width"`
 | 
				
			||||||
 | 
						Height int    `json:"height"`
 | 
				
			||||||
 | 
						URL    string `json:"url,omitempty"`
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type FileDimensions struct {
 | 
				
			||||||
 | 
						Small  *FileSize `json:"sm"`
 | 
				
			||||||
 | 
						Medium *FileSize `json:"md"`
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func ParseToPhoto(data interface{}, result *FilePhoto) (err error) {
 | 
				
			||||||
 | 
						b := InterfaceToBytes(data)
 | 
				
			||||||
 | 
						if len(b) > 0 {
 | 
				
			||||||
 | 
							err = json.Unmarshal(b, result)
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							err = errors.New("[natsio.ParsePhoto] cannot read data")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,79 @@
 | 
				
			||||||
 | 
					package natsio
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"errors"
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						jsoniter "github.com/json-iterator/go"
 | 
				
			||||||
 | 
						"github.com/nats-io/nats.go"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Server ...
 | 
				
			||||||
 | 
					type Server struct {
 | 
				
			||||||
 | 
						instance *nats.Conn
 | 
				
			||||||
 | 
						cfg      Config
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// JetStream ...
 | 
				
			||||||
 | 
					type JetStream struct {
 | 
				
			||||||
 | 
						instance   nats.JetStreamContext
 | 
				
			||||||
 | 
						cfg        Config
 | 
				
			||||||
 | 
						streamName string
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var (
 | 
				
			||||||
 | 
						natsServer    Server
 | 
				
			||||||
 | 
						natsJetStream JetStream
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// builtin json alternative
 | 
				
			||||||
 | 
						json = jsoniter.ConfigCompatibleWithStandardLibrary
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Connect ...
 | 
				
			||||||
 | 
					func Connect(cfg Config) (*Server, *JetStream, error) {
 | 
				
			||||||
 | 
						// validate
 | 
				
			||||||
 | 
						if err := cfg.validate(); err != nil {
 | 
				
			||||||
 | 
							return nil, nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// connect options
 | 
				
			||||||
 | 
						opts := make([]nats.Option, 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// has authentication
 | 
				
			||||||
 | 
						if cfg.User != "" {
 | 
				
			||||||
 | 
							opts = append(opts, nats.UserInfo(cfg.User, cfg.Password))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// if it has TLS
 | 
				
			||||||
 | 
						if cfg.TLS != nil {
 | 
				
			||||||
 | 
							opts = append(opts, nats.ClientCert(cfg.TLS.CertFilePath, cfg.TLS.KeyFilePath))
 | 
				
			||||||
 | 
							opts = append(opts, nats.RootCAs(cfg.TLS.RootCAFilePath))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						nc, err := nats.Connect(cfg.URL, opts...)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							msg := fmt.Sprintf("error when connecting to NATS: %s", err.Error())
 | 
				
			||||||
 | 
							return nil, nil, errors.New(msg)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// set client
 | 
				
			||||||
 | 
						natsServer.instance = nc
 | 
				
			||||||
 | 
						natsServer.cfg = cfg
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// create jet stream context
 | 
				
			||||||
 | 
						js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							msg := fmt.Sprintf("error when create NATS JetStream: %s", err.Error())
 | 
				
			||||||
 | 
							return nil, nil, errors.New(msg)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						natsJetStream.instance = js
 | 
				
			||||||
 | 
						natsJetStream.cfg = cfg
 | 
				
			||||||
 | 
						natsJetStream.streamName = cfg.StreamName
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// add stream
 | 
				
			||||||
 | 
						natsJetStream.addStream()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						fmt.Printf("⚡️[natsio]: connected to %s \n", cfg.URL)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return &natsServer, &natsJetStream, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,28 @@
 | 
				
			||||||
 | 
					package natsio
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"github.com/nats-io/nats.go"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Node include all necessary things of a client
 | 
				
			||||||
 | 
					type Node struct {
 | 
				
			||||||
 | 
						Sv Server
 | 
				
			||||||
 | 
						Js JetStream
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (n Node) ServerQueueSubscribe(subjectName string, h nats.MsgHandler) {
 | 
				
			||||||
 | 
						queueName := GenerateQueueNameFromSubject(subjectName)
 | 
				
			||||||
 | 
						if _, err := n.Sv.QueueSubscribe(subjectName, queueName, h); err != nil {
 | 
				
			||||||
 | 
							fmt.Printf("[natsio.Node.ServerQueueSubscribe] error: %s \n", err.Error())
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (n Node) JetStreamQueueSubscribe(subjectName string, h nats.MsgHandler) {
 | 
				
			||||||
 | 
						queueName := GenerateQueueNameFromSubject(subjectName)
 | 
				
			||||||
 | 
						if err := n.Js.QueueSubscribe(subjectName, queueName, h); err != nil {
 | 
				
			||||||
 | 
							fmt.Printf("[natsio.Node.ServerQueueSubscribe] error: %s \n", err.Error())
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,74 @@
 | 
				
			||||||
 | 
					package natsio
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"errors"
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"github.com/nats-io/nats.go"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Default timeout 10s
 | 
				
			||||||
 | 
					const requestTimeout = 10 * time.Second
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Request ...
 | 
				
			||||||
 | 
					func (s Server) Request(subject string, payload []byte) (*nats.Msg, error) {
 | 
				
			||||||
 | 
						timeout := requestTimeout
 | 
				
			||||||
 | 
						if s.cfg.RequestTimeout > 0 {
 | 
				
			||||||
 | 
							timeout = s.cfg.RequestTimeout
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						msg, err := s.instance.Request(subject, payload, timeout)
 | 
				
			||||||
 | 
						if errors.Is(err, nats.ErrNoResponders) {
 | 
				
			||||||
 | 
							fmt.Printf("[natsio.Server] no responders for subject: %s \n", subject)
 | 
				
			||||||
 | 
						} else if s.cfg.Debug {
 | 
				
			||||||
 | 
							fmt.Printf("[natsio.Server] send request to subject #%s successfully \n", subject)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return msg, err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// RequestWithBindData ...
 | 
				
			||||||
 | 
					func (s Server) RequestWithBindData(subject string, payload []byte, result interface{}) error {
 | 
				
			||||||
 | 
						msg, err := s.Request(subject, payload)
 | 
				
			||||||
 | 
						if msg == nil || err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// map
 | 
				
			||||||
 | 
						return BytesToInterface(msg.Data, result)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Reply ...
 | 
				
			||||||
 | 
					func (s Server) Reply(msg *nats.Msg, payload []byte) error {
 | 
				
			||||||
 | 
						return s.instance.Publish(msg.Reply, payload)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Subscribe ...
 | 
				
			||||||
 | 
					func (s Server) Subscribe(subject string, cb nats.MsgHandler) (*nats.Subscription, error) {
 | 
				
			||||||
 | 
						sub, err := s.instance.Subscribe(subject, cb)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							msg := fmt.Sprintf("[natsio.Server] subscribe subject %s error: %s", subject, err.Error())
 | 
				
			||||||
 | 
							return nil, errors.New(msg)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if s.cfg.Debug {
 | 
				
			||||||
 | 
							fmt.Printf("[natsio.Server] subscribe to subject #%s \n", subject)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return sub, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// QueueSubscribe ...
 | 
				
			||||||
 | 
					func (s Server) QueueSubscribe(subject, queue string, cb nats.MsgHandler) (*nats.Subscription, error) {
 | 
				
			||||||
 | 
						sub, err := s.instance.QueueSubscribe(subject, queue, cb)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							msg := fmt.Sprintf("[natsio.Server] queue subscribe subject %s, queue %s error: %s", subject, queue, err.Error())
 | 
				
			||||||
 | 
							return nil, errors.New(msg)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if s.cfg.Debug {
 | 
				
			||||||
 | 
							fmt.Printf("[natsio.Server] queue subscribe to subject #%s \n", subject)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return sub, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,14 @@
 | 
				
			||||||
 | 
					package natsio
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type StreamUtilities struct {
 | 
				
			||||||
 | 
						Stream string
 | 
				
			||||||
 | 
						Server string
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (s StreamUtilities) GenerateReqrepSubject(subject string) string {
 | 
				
			||||||
 | 
						return GenerateReqrepSubject(s.Stream, s.Server, subject)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (s StreamUtilities) GenerateJetStreamSubject(subject string) string {
 | 
				
			||||||
 | 
						return GenerateJetStreamSubject(s.Stream, s.Server, subject)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,59 @@
 | 
				
			||||||
 | 
					package natsio
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
 | 
						"strings"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"github.com/nats-io/nats.go"
 | 
				
			||||||
 | 
						"github.com/thoas/go-funk"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// mergeAndUniqueArrayStrings ...
 | 
				
			||||||
 | 
					func mergeAndUniqueArrayStrings(arr1, arr2 []string) []string {
 | 
				
			||||||
 | 
						var result = make([]string, 0)
 | 
				
			||||||
 | 
						result = append(result, arr1...)
 | 
				
			||||||
 | 
						result = append(result, arr2...)
 | 
				
			||||||
 | 
						result = funk.UniqString(result)
 | 
				
			||||||
 | 
						return result
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func combineStreamAndSubjectName(stream, subject string) string {
 | 
				
			||||||
 | 
						return fmt.Sprintf("%s.%s", stream, subject)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// GenerateJetStreamSubject ...
 | 
				
			||||||
 | 
					func GenerateJetStreamSubject(stream, server, subject string) string {
 | 
				
			||||||
 | 
						return fmt.Sprintf("%s.jetstream.%s.%s", stream, server, subject)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// GenerateReqrepSubject ...
 | 
				
			||||||
 | 
					func GenerateReqrepSubject(stream, server, subject string) string {
 | 
				
			||||||
 | 
						return fmt.Sprintf("%s.reqrep.%s.%s", stream, server, subject)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func GenerateQueueNameFromSubject(subject string) string {
 | 
				
			||||||
 | 
						return strings.ReplaceAll(subject, ".", "_")
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// InterfaceToBytes ...
 | 
				
			||||||
 | 
					func InterfaceToBytes(data interface{}) []byte {
 | 
				
			||||||
 | 
						b, err := json.Marshal(data)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							fmt.Printf("[natsio.InterfaceToBytes] error: %v with data: %v\n", err, data)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return b
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func BytesToInterface(b []byte, dest interface{}) error {
 | 
				
			||||||
 | 
						err := json.Unmarshal(b, &dest)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							fmt.Printf("[natsio.BytesToInterface] error: %v with data: %s\n", err, string(b))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func MsgRespond(msg *nats.Msg, data interface{}) {
 | 
				
			||||||
 | 
						if err := msg.Respond(InterfaceToBytes(data)); err != nil {
 | 
				
			||||||
 | 
							fmt.Printf("[natsio.MsgRespond] error when response msg %s", msg.Reply)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
		Loading…
	
		Reference in New Issue