1. 简介
面向消息的中间件,用于组件之间的解藕,主要体现在消息的发送者和消费者之间无强依赖关系
消息中间件:在消息传输过程中保存消息的容器。其中作用:
- 解藕
- 削峰
- 异步处理
- 缓存存储
- 消息通知
- 提供系统的拓展性
消息中间传递模型:
AMQP: Advanced Message Queue Protocol
2. 安装
RabbitMQ 3.7.19: Upgrading to Erlang 21.x or Later Versions
2.1 Linux
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| yum repolist yum clean all yum makecache yum list erlang
cd /etc/yum.repos.d/
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
yum install erlang
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
yum install rabbitmq-server-3.7.19-1.el7
|
2.2 配置文件
1) /etc/rabbitmq/rabbitmq-env.conf
1 2 3 4
| RABBITMQ_NODE_IP_ADDRESS: 127.0.0.1 RABBITMQ_NODE_PORT: 5672 RABBITMQ_NODE_CONFIG_FILE: *.config RABBITMQ_NODE_LOG_BASE:
|
2) /etc/rabbitmq/rabbitmq.config
1 2 3
| tcp_listeners 5672 disk_free_limit vm_memory_high_watermark 0.4
|
2.3 启动
1 2 3
| systemctl enable rabbitmq-server
systemctl start rabbitmq-server
|
2.4 Mac
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| brew install rabbitmq
export RABBIT_HOME=/usr/local/Cellar/rabbitmq/3.8.0 export PATH=$PATH:$RABBIT_HOME/sbin
sudo rabbitmq-server sudo rabbitmq-server -detached
rabbitmqctl stop
cd /usr/local/etc/rabbitmq
|
2.5 Docker
1 2 3 4 5 6 7
| mkdir -p /data/rabbitmq
docker run -d --hostname rabbit-server --name rabbit-sever -p 5672:5672 -p 15672:15672 -p 25672:25672 -v /data/rabbitmq:/var/lib/rabbitmq rabbitmq:management
5672: API 15672: GUI 25672: 集群通信
|
3. 控制命令
3.1 插件命令
1 2 3 4 5
| rabbitmq-plugins list
rabbitmq-plugins enable rabbitmq_management
|
3.2 操作命令
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| rabbitmqctl add_vhost ut_vhost rabbitmqctl list_vhosts
rabbitmqctl add_user utime welovetime rabbitmqctl list_users
rabbitmqctl change_password utime utime@celery123
sudo rabbitmqctl set_user_tags utime administrator
rabbitmqctl set_permissions -p ut_vhost utime ".*" ".*" ".*" rabbitmqctl list_permissions
rabbitmqctl list_queues rabbitmqctl list_queues name messages messages_ready messages_unacknowledged rabbitmqctl list_queues name consumers rabbitmqctl list_queues name memory
|
4. 相关术语
Server(broker): 接受客户端连接,实现AMQP消息队列和路由功能的进程
VirtualHost:虚拟消息服务器,每个 VirtualHost 相当于一个相对独立的 RabbitMQ 服务器,每个 VirtualHost 之间是相互隔离的。
Connections:被监听的链接
Exchange:消息交换机,决定消息按什么规则,路由到哪个队列
Queue:队列,用于存储生产者的消息;接收到交换机发来的消息,未被消费暂存队列中
Binding:绑定,把exchange和queue按照路由规则绑定起来
Routing Key:路由关键字,exchange根据这个关键字来投递消息
Channel:消息通道,客户端的每个连接建立多个channel
Producer/Publisher: 消息生产者,用于投递消息的程序
Consumer:消息消费者,用于接收消息的程序
ConnectionFactory:连接管理器,应用程序与 Rabbit 之间建立连接的管理器,程序代码中使用;
4.1 Exchange
- Fanout: 类似广播,转发到所有绑定交换机的Queue上
- Direct:类似单播,RoutingKey和BindingKey完全匹配
- Topic:类似组播,转发到符合通配符的Queue上
- Headers:请求头与消息头匹配,才能接收消息
Fanout工作模式:

Direct工作模式:

Topic工作模式:

核心概念:
- Exchange:交互机,用于接受、分配消息;生成者产生的消息,送入交互机中,根据交换机规则绑定 key, 通过交换机把消息发到对应得 key 上。
- Direct Exchange:直接交互式路由键。需要将一个队列绑定到交换机上,要求该消息一一个特定的路由完全匹配。例如一个队列绑定到该交换机上要求路由键为”dog”,只有被标记为”dog”的消息能够被转发。
- Fanout Exchange: 广播式路由键。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
- Topic Exchange:主题式交换器。通过消息的路由关键字和绑定的关键字的模糊匹配,将消息路由到被绑定的队列中。支持通配符:
*
匹配一个词组,#
零个或多个词组。*.stock.#
匹配路由关键字usd.stock
和eur.stock.db
,但不匹配stock.nasdaq
5. 工作模式
5.1 Simple
生产者 ——> 队列 ——> 消费者
最简单常用的模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| func NewRabbitMQ(queueName, exchange, key string) *RabbitMQ { return &RabbitMQ{ QueueName: queueName, Exchange: exchange, Key: key, Mqurl: MQURL, } }
func NewRabbitMQPubSub(queueName string) *RabbitMQ { rabbitmq := NewRabbitMQ(queueName, "", "")
var err error
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnErr(err, "创建连接失败")
rabbitmq.channel, err = rabbitmq.conn.Channel() rabbitmq.failOnErr(err, "获取channel失败")
return rabbitmq }
|
5.2 Work
生产者 ——> 队列 ——> 多个消费者,争抢
一个消息只能被一个消费者获取。与Simple模式的区别,只在于开启了多个消费者端,负载均衡。
5.3 Publish / Subscribe
消息被路由投递给多个队列,一个消息被多个消费者获取
生产者 ——> 交换机 ——> 队列 ——> 消费者
|
— ——> 队列 ——> 消费者
邮件群发,群聊天,广播 (广告)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| func NewRabbitMQ(queueName, exchange, key string) *RabbitMQ { return &RabbitMQ{ QueueName: queueName, Exchange: exchange, Key: key, Mqurl: MQURL, } }
func NewRabbitMQPubSub(exchangeName string) *RabbitMQ { rabbitmq := NewRabbitMQ("", exchangeName, "")
var err error
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnErr(err, "创建连接失败")
rabbitmq.channel, err = rabbitmq.conn.Channel() rabbitmq.failOnErr(err, "获取channel失败")
return rabbitmq }
|
5.4 Routing
从生产端就可以指定队列消息的消费者是谁, 一个消息被多个消费者获取,并且消息的目标队列可被生产者指定
5.5 Topic
交换机根据 key 的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费
6. Golang
6.1 支撑包
1
| go get github.com/streadway/amqp
|
6.2 基础队列
6.2.1 连接 RabbitMQ
1 2 3 4 5 6 7 8 9
| func GetRabbitMQConn() (*amqp.Connection, error) { username := "guest" password := "guest" host := "127.0.0.1" port := 5672
url := fmt.Sprintf("amqp://%s:%s@%s:%d", username, password, host, port) return amqp.Dial(url) }
|
6.2.2 生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| type demo struct { Name string `json:"name"` Addr string `json:"addr"` }
func main() { conn, err := GetRabbitMQConn() if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close()
ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close()
data := demo{ Name: "Jack", Addr: "Montreal", } bs, _ := json.Marshal(data)
err = ch.Publish( "", "simple:queue", false, false, amqp.Publishing{ ContentType: "text/plain", Body: bs, }) if err != nil { log.Fatalf("Failed to publish a message: %v", err) }
log.Printf("[*] sent %s", bs) }
|
6.2.3 消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| func main() { conn, err := GetRabbitMQConn() if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close()
ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close()
q, err := ch.QueueDeclare( "simple:queue", false, false, false, false, nil, ) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) }
msgs, err := ch.Consume( q.Name, "", true, false, false, false, nil, ) if err != nil { log.Fatalf("Failed to register a consume: %v", err) }
go func() { for msg := range msgs { log.Printf("Received a message: %s\n", msg.Body) } }()
log.Println("[*] Waiting for messages. To exit press CTRL+C") select {} }
|
6.3 任务队列
为了避免等待执行一些耗时的任务, 而是将需要执行的任务封装为消息发送给工作队列, 后台运行的工作进程将任务消息取出来并执行相关任务。多个后台工作进程同时间进行, 那么任务在他们之间共享.
6.3.1 发布任务 (task.py)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| func bodyForm(args []string) string { var s string
if (len(args) < 2) || os.Args[1] == "" { s = "no task" } else { s = strings.Join(args[1:], " ") } return s }
func main() { conn, err := GetRabbitMQConn() if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close()
ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close()
body := bodyForm(os.Args)
err = ch.Publish( "", "task:queue", false, false, amqp.Publishing{ ContentType: "text/plain", DeliveryMode: amqp.Persistent, Body: []byte(body), }, ) if err != nil { log.Fatalf("Failed to publish a message") }
log.Printf("sent %s\n", body) }
|
6.3.2 执行任务 (worker.py)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| func main() { conn, err := GetRabbitMQConn() if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close()
ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close()
q, err := ch.QueueDeclare( "task:queue", false, false, false, false, nil, ) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) }
err = ch.Qos( 1, 0, false, ) if err != nil { log.Fatalf("Failed to set Qos: %v", err) }
msgs, err := ch.Consume( q.Name, "", false, false, false, false, nil, ) if err != nil { log.Fatalf("Failed to register a consumer: %v", err) }
done := make(chan bool)
go func() { for msg := range msgs { log.Printf("Received a message: %s\n", msg.Body) msg.Ack(false) } }()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-done }
|