Eli's Blog

1. 简介

面向消息的中间件,用于组件之间的解藕,主要体现在消息的发送者和消费者之间无强依赖关系

消息中间件:在消息传输过程中保存消息的容器。其中作用:

  • 解藕
  • 削峰
  • 异步处理
  • 缓存存储
  • 消息通知
  • 提供系统的拓展性

消息中间传递模型

  • 点对点 (PTP)

  • 发布订阅 (Pub/Sub)

AMQP: Advanced Message Queue Protocol

2. 安装

RabbitMQ 3.7.19: Upgrading to Erlang 21.x or Later Versions

2.1 Linux

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
yum repolist 
yum clean all
yum makecache
yum list erlang

cd /etc/yum.repos.d/

# erlang
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash

yum install  erlang

# rabbitmq
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash

yum install rabbitmq-server-3.7.19-1.el7

2.2 配置文件

1) /etc/rabbitmq/rabbitmq-env.conf

1
2
3
4
RABBITMQ_NODE_IP_ADDRESS: 127.0.0.1
RABBITMQ_NODE_PORT: 5672
RABBITMQ_NODE_CONFIG_FILE: *.config
RABBITMQ_NODE_LOG_BASE:

2) /etc/rabbitmq/rabbitmq.config

1
2
3
tcp_listeners             5672
disk_free_limit
vm_memory_high_watermark 0.4

2.3 启动

1
2
3
systemctl enable rabbitmq-server

systemctl start rabbitmq-server

2.4 Mac

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
brew install rabbitmq

# 配置环境变量
export RABBIT_HOME=/usr/local/Cellar/rabbitmq/3.8.0
export PATH=$PATH:$RABBIT_HOME/sbin

# 启动服务
sudo rabbitmq-server
sudo rabbitmq-server -detached # 后台运行

# 停止服务
rabbitmqctl stop

# 配置文件位置
cd /usr/local/etc/rabbitmq

2.5 Docker

1
2
3
4
5
6
7
mkdir -p /data/rabbitmq

docker run -d --hostname rabbit-server --name rabbit-sever -p 5672:5672 -p 15672:15672 -p 25672:25672 -v /data/rabbitmq:/var/lib/rabbitmq rabbitmq:management

5672: API
15672: GUI
25672: 集群通信

3. 控制命令

3.1 插件命令

1
2
3
4
5
# 插件列表
rabbitmq-plugins list

# 开启管理工具 (支持http://localhost:15672/)
rabbitmq-plugins enable rabbitmq_management

3.2 操作命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 虚拟主机
rabbitmqctl add_vhost ut_vhost
rabbitmqctl list_vhosts

# 新增账号
rabbitmqctl add_user utime welovetime
rabbitmqctl list_users

# 修改密码
rabbitmqctl change_password utime utime@celery123

# 设置tag
sudo rabbitmqctl set_user_tags utime administrator

# 设置权限
rabbitmqctl set_permissions -p ut_vhost utime ".*" ".*" ".*"
rabbitmqctl list_permissions

# 队列状态
rabbitmqctl list_queues
rabbitmqctl list_queues name messages messages_ready messages_unacknowledged
rabbitmqctl list_queues name consumers
rabbitmqctl list_queues name memory

4. 相关术语

  • Server(broker): 接受客户端连接,实现AMQP消息队列和路由功能的进程

  • VirtualHost:虚拟消息服务器,每个 VirtualHost 相当于一个相对独立的 RabbitMQ 服务器,每个 VirtualHost 之间是相互隔离的。

  • Connections:被监听的链接

  • Exchange:消息交换机,决定消息按什么规则,路由到哪个队列

  • Queue:队列,用于存储生产者的消息;接收到交换机发来的消息,未被消费暂存队列中

  • Binding:绑定,把exchange和queue按照路由规则绑定起来

  • Routing Key:路由关键字,exchange根据这个关键字来投递消息

  • Channel:消息通道,客户端的每个连接建立多个channel

  • Producer/Publisher: 消息生产者,用于投递消息的程序

  • Consumer:消息消费者,用于接收消息的程序

  • ConnectionFactory:连接管理器,应用程序与 Rabbit 之间建立连接的管理器,程序代码中使用;

4.1 Exchange

  • Fanout: 类似广播,转发到所有绑定交换机的Queue上
  • Direct:类似单播,RoutingKey和BindingKey完全匹配
  • Topic:类似组播,转发到符合通配符的Queue上
  • Headers:请求头与消息头匹配,才能接收消息

Fanout工作模式:

fanout

Direct工作模式:

direct

Topic工作模式:

topic

核心概念

  • Exchange:交互机,用于接受、分配消息;生成者产生的消息,送入交互机中,根据交换机规则绑定 key, 通过交换机把消息发到对应得 key 上。
    • Direct Exchange:直接交互式路由键。需要将一个队列绑定到交换机上,要求该消息一一个特定的路由完全匹配。例如一个队列绑定到该交换机上要求路由键为”dog”,只有被标记为”dog”的消息能够被转发。
    • Fanout Exchange: 广播式路由键。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
    • Topic Exchange:主题式交换器。通过消息的路由关键字和绑定的关键字的模糊匹配,将消息路由到被绑定的队列中。支持通配符:*匹配一个词组,#零个或多个词组。*.stock.#匹配路由关键字usd.stockeur.stock.db,但不匹配stock.nasdaq

5. 工作模式

5.1 Simple

生产者 ——> 队列 ——> 消费者

最简单常用的模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func NewRabbitMQ(queueName, exchange, key string) *RabbitMQ {
return &RabbitMQ{
QueueName: queueName,
Exchange: exchange,
Key: key,
Mqurl: MQURL,
}
}

// 设置queueName
func NewRabbitMQPubSub(queueName string) *RabbitMQ {
rabbitmq := NewRabbitMQ(queueName, "", "")

var err error

// 创建连接
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "创建连接失败")

// 创建channel
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "获取channel失败")

return rabbitmq
}

5.2 Work

生产者 ——> 队列 ——> 多个消费者,争抢

一个消息只能被一个消费者获取。与Simple模式的区别,只在于开启了多个消费者端,负载均衡。

5.3 Publish / Subscribe

消息被路由投递给多个队列,一个消息被多个消费者获取

生产者 ——> 交换机 ——> 队列 ——> 消费者

​ |

​ — ——> 队列 ——> 消费者

邮件群发,群聊天,广播 (广告)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func NewRabbitMQ(queueName, exchange, key string) *RabbitMQ {
return &RabbitMQ{
QueueName: queueName,
Exchange: exchange,
Key: key,
Mqurl: MQURL,
}
}

// 设置exchangeName
func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {
rabbitmq := NewRabbitMQ("", exchangeName, "")

var err error

// 创建连接
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "创建连接失败")

// 创建channel
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "获取channel失败")

return rabbitmq
}

5.4 Routing

从生产端就可以指定队列消息的消费者是谁, 一个消息被多个消费者获取,并且消息的目标队列可被生产者指定

5.5 Topic

交换机根据 key 的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费

6. Golang

6.1 支撑包

1
go get github.com/streadway/amqp

6.2 基础队列

6.2.1 连接 RabbitMQ

1
2
3
4
5
6
7
8
9
func GetRabbitMQConn() (*amqp.Connection, error) {
username := "guest"
password := "guest"
host := "127.0.0.1"
port := 5672

url := fmt.Sprintf("amqp://%s:%s@%s:%d", username, password, host, port)
return amqp.Dial(url)
}

6.2.2 生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
type demo struct {
Name string `json:"name"`
Addr string `json:"addr"`
}

func main() {
conn, err := GetRabbitMQConn()
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
defer conn.Close()

ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %v", err)
}
defer ch.Close()

data := demo{
Name: "Jack",
Addr: "Montreal",
}
bs, _ := json.Marshal(data)

err = ch.Publish(
"", // exchange
"simple:queue", // key, RoutingKey, same as Queue.Name when the exchange mode is direct.
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: bs,
})
if err != nil {
log.Fatalf("Failed to publish a message: %v", err)
}

log.Printf("[*] sent %s", bs)
}

6.2.3 消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func main() {
conn, err := GetRabbitMQConn()
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
defer conn.Close()

ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %v", err)
}
defer ch.Close()

q, err := ch.QueueDeclare(
"simple:queue", // name
false, // durable
false, // autoDelete
false, // exclusive
false, // noWait
nil, // args
)
if err != nil {
log.Fatalf("Failed to declare a queue: %v", err)
}

// 定义一个消费者
msgs, err := ch.Consume(
q.Name, // queue (string)
"", // consumer
true, // autoAck
false, // exclusive
false, // noLocal
false, // noWait
nil, // args
)
if err != nil {
log.Fatalf("Failed to register a consume: %v", err)
}

go func() {
for msg := range msgs {
log.Printf("Received a message: %s\n", msg.Body)
}
}()

log.Println("[*] Waiting for messages. To exit press CTRL+C")
select {}
}

6.3 任务队列

为了避免等待执行一些耗时的任务, 而是将需要执行的任务封装为消息发送给工作队列, 后台运行的工作进程将任务消息取出来并执行相关任务。多个后台工作进程同时间进行, 那么任务在他们之间共享.

6.3.1 发布任务 (task.py)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func bodyForm(args []string) string {
var s string

if (len(args) < 2) || os.Args[1] == "" {
s = "no task"
} else {
s = strings.Join(args[1:], " ")
}
return s
}

func main() {
conn, err := GetRabbitMQConn()
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
defer conn.Close()

ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %v", err)
}
defer ch.Close()

body := bodyForm(os.Args)

err = ch.Publish(
"",
"task:queue",
false,
false,
amqp.Publishing{
ContentType: "text/plain",
DeliveryMode: amqp.Persistent,
Body: []byte(body),
},
)
if err != nil {
log.Fatalf("Failed to publish a message")
}

log.Printf("sent %s\n", body)
}

6.3.2 执行任务 (worker.py)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
func main() {
conn, err := GetRabbitMQConn()
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
defer conn.Close()

ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %v", err)
}
defer ch.Close()

q, err := ch.QueueDeclare(
"task:queue",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Failed to declare a queue: %v", err)
}

// 计数器
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
if err != nil {
log.Fatalf("Failed to set Qos: %v", err)
}

msgs, err := ch.Consume(
q.Name,
"",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Failed to register a consumer: %v", err)
}

done := make(chan bool)

go func() {
for msg := range msgs {
log.Printf("Received a message: %s\n", msg.Body)
msg.Ack(false)
}
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-done
}

 上一页

mongodb