diff --git a/.gitignore b/.gitignore index 66fd13c..bbfd2cb 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,5 @@ # Dependency directories (remove the comment below to include it) # vendor/ + +.idea \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..0a02477 --- /dev/null +++ b/go.mod @@ -0,0 +1,8 @@ +module github.com/Selly-Modules/rabbitmq + +go 1.16 + +require ( + github.com/logrusorgru/aurora v2.0.3+incompatible + github.com/streadway/amqp v1.0.0 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..b49a65f --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +github.com/logrusorgru/aurora v2.0.3+incompatible h1:tOpm7WcpBTn4fjmVfgpQq0EfczGlG91VSDkswnjF5A8= +github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= +github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= +github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= diff --git a/log.go b/log.go new file mode 100644 index 0000000..7f2d2d6 --- /dev/null +++ b/log.go @@ -0,0 +1,24 @@ +package rabbitmq + +import ( + "fmt" + + "github.com/logrusorgru/aurora" +) + +// NewMessageLog ... +func NewMessageLog(queueName string) { + fmt.Println(aurora.BgBlue("*** New message from queue: " + queueName)) +} + +// PublishErrorLog ... +func PublishErrorLog(queueName string, err error) { + fmt.Println(aurora.Red( + fmt.Sprintf( + "*** Error when publish message to %s: %s", + queueName, + err.Error(), + ), + ), + ) +} diff --git a/publisher.go b/publisher.go new file mode 100644 index 0000000..aef801c --- /dev/null +++ b/publisher.go @@ -0,0 +1,32 @@ +package rabbitmq + +import ( + "encoding/json" + + "github.com/streadway/amqp" +) + +// Publish new message to channel +func Publish(queueName string, data interface{}) error { + byteData, _ := json.Marshal(data) + + // Set publishing data + message := generateData(byteData) + + // Send + return channel.Publish( + "", // exchange + queueName, // routing key + false, // mandatory + false, // immediate + message, + ) +} + +// generateData ... +func generateData(body []byte) amqp.Publishing { + return amqp.Publishing{ + ContentType: "application/json", + Body: body, + } +} diff --git a/rabbitmq.go b/rabbitmq.go new file mode 100644 index 0000000..4239106 --- /dev/null +++ b/rabbitmq.go @@ -0,0 +1,38 @@ +package rabbitmq + +import ( + "fmt" + + "github.com/logrusorgru/aurora" + "github.com/streadway/amqp" +) + +var ( + connection *amqp.Connection + channel *amqp.Channel +) + +// Connect ... +func Connect(uri string) error { + conn, err := amqp.Dial(uri) + if err != nil { + fmt.Println(aurora.Red("*** Connect to RabbitMQ failed: " + uri)) + return err + } + + fmt.Println(aurora.Green("*** CONNECTED TO RABBITMQ: " + uri)) + + // Set connection + connection = conn + + // Set channel + channel, _ = connection.Channel() + + return nil +} + +// GetConnection ... +func GetConnection() *amqp.Connection { + return connection +} + diff --git a/subscriber.go b/subscriber.go new file mode 100644 index 0000000..7ff69f0 --- /dev/null +++ b/subscriber.go @@ -0,0 +1,40 @@ +package rabbitmq + +import ( + "fmt" + + "github.com/logrusorgru/aurora" + "github.com/streadway/amqp" +) + +// SubscribeQueue ... +func SubscribeQueue(queueName string) { + _, err := channel.QueueDeclare( + queueName, // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + + if err != nil { + fmt.Println("*** Declare failed subscriber queue "+queueName, err.Error()) + } else { + fmt.Println(aurora.Green("*** RabbitMQ - Declared queue: " + queueName)) + } +} + +// GetMessagesFromQueue ... +func GetMessagesFromQueue(queueName string) <-chan amqp.Delivery { + messages, _ := channel.Consume( + queueName, + "", + true, + false, + false, + false, + nil, + ) + return messages +}