init module

This commit is contained in:
Nam Huynh 2021-08-09 10:45:44 +07:00
parent 556e2fce78
commit 7ae6e76d09
7 changed files with 148 additions and 0 deletions

2
.gitignore vendored
View File

@ -13,3 +13,5 @@
# Dependency directories (remove the comment below to include it)
# vendor/
.idea

8
go.mod Normal file
View File

@ -0,0 +1,8 @@
module github.com/Selly-Modules/rabbitmq
go 1.16
require (
github.com/logrusorgru/aurora v2.0.3+incompatible
github.com/streadway/amqp v1.0.0
)

4
go.sum Normal file
View File

@ -0,0 +1,4 @@
github.com/logrusorgru/aurora v2.0.3+incompatible h1:tOpm7WcpBTn4fjmVfgpQq0EfczGlG91VSDkswnjF5A8=
github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=

24
log.go Normal file
View File

@ -0,0 +1,24 @@
package rabbitmq
import (
"fmt"
"github.com/logrusorgru/aurora"
)
// NewMessageLog ...
func NewMessageLog(queueName string) {
fmt.Println(aurora.BgBlue("*** New message from queue: " + queueName))
}
// PublishErrorLog ...
func PublishErrorLog(queueName string, err error) {
fmt.Println(aurora.Red(
fmt.Sprintf(
"*** Error when publish message to %s: %s",
queueName,
err.Error(),
),
),
)
}

32
publisher.go Normal file
View File

@ -0,0 +1,32 @@
package rabbitmq
import (
"encoding/json"
"github.com/streadway/amqp"
)
// Publish new message to channel
func Publish(queueName string, data interface{}) error {
byteData, _ := json.Marshal(data)
// Set publishing data
message := generateData(byteData)
// Send
return channel.Publish(
"", // exchange
queueName, // routing key
false, // mandatory
false, // immediate
message,
)
}
// generateData ...
func generateData(body []byte) amqp.Publishing {
return amqp.Publishing{
ContentType: "application/json",
Body: body,
}
}

38
rabbitmq.go Normal file
View File

@ -0,0 +1,38 @@
package rabbitmq
import (
"fmt"
"github.com/logrusorgru/aurora"
"github.com/streadway/amqp"
)
var (
connection *amqp.Connection
channel *amqp.Channel
)
// Connect ...
func Connect(uri string) error {
conn, err := amqp.Dial(uri)
if err != nil {
fmt.Println(aurora.Red("*** Connect to RabbitMQ failed: " + uri))
return err
}
fmt.Println(aurora.Green("*** CONNECTED TO RABBITMQ: " + uri))
// Set connection
connection = conn
// Set channel
channel, _ = connection.Channel()
return nil
}
// GetConnection ...
func GetConnection() *amqp.Connection {
return connection
}

40
subscriber.go Normal file
View File

@ -0,0 +1,40 @@
package rabbitmq
import (
"fmt"
"github.com/logrusorgru/aurora"
"github.com/streadway/amqp"
)
// SubscribeQueue ...
func SubscribeQueue(queueName string) {
_, err := channel.QueueDeclare(
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
fmt.Println("*** Declare failed subscriber queue "+queueName, err.Error())
} else {
fmt.Println(aurora.Green("*** RabbitMQ - Declared queue: " + queueName))
}
}
// GetMessagesFromQueue ...
func GetMessagesFromQueue(queueName string) <-chan amqp.Delivery {
messages, _ := channel.Consume(
queueName,
"",
true,
false,
false,
false,
nil,
)
return messages
}