首页 > 文章列表 > 关于golang监听rabbitmq消息队列任务断线自动重连接的问题

关于golang监听rabbitmq消息队列任务断线自动重连接的问题

golang
301 2022-12-17

golang监听消息队列rabbitmq任务脚本,当rabbimq消息队列断开连接后自动重试,重新唤起协程执行任务

需求背景:

goalng常驻内存任务脚本监听rbmq执行任务

任务脚本由supervisor来管理

当rabbitmq长时间断开连接会出现如下图 进程处于fatal状态

假如因为不可抗拒因素,rabbitmq服务器内存满了或者其它原因导致rabbitmq消息队列服务停止了

如果是短时间的停止重启,supervisor是可以即时唤醒该程序。如果服务器长时间没有恢复正常运行,程序就会出现fatal进程启动失败的状态,此时可以通过告警来提醒开发人员

如果以上告警能时时通知运维人员此问题可以略过了。今天讨论的是如果在长时间断开连接还能在服务器恢复正常情况下自动实现重连。

代码实现一:

消费者:

package main

import (

    "fmt"

    "github.com/ichunt2019/golang-rbmq-sl/utils/rabbitmq"

)

type RecvPro struct {

}

//// 实现消费者 消费消息失败 自动进入延时尝试  尝试3次之后入库db

/*

返回值 error 为nil  则表示该消息消费成功

否则消息会进入ttl延时队列  重复尝试消费3次

3次后消息如果还是失败 消息就执行失败  进入告警 FailAction

 */

func (t *RecvPro) Consumer(dataByte []byte) error {

    //time.Sleep(500*time.Microsecond)

    //return errors.New("顶顶顶顶")

    fmt.Println(string(dataByte))

    //time.Sleep(1*time.Second)

    return nil

//消息已经消费3次 失败了 请进行处理

如果消息 消费3次后 仍然失败  此处可以根据情况 对消息进行告警提醒 或者 补偿  入库db  钉钉告警等等

func (t *RecvPro) FailAction(err error,dataByte []byte) error {

    fmt.Println(err)

    fmt.Println("任务处理失败了,我要进入db日志库了")

    fmt.Println("任务处理失败了,发送钉钉消息通知主人")

func main() {

    t := &RecvPro{}

    //rabbitmq.Recv(rabbitmq.QueueExchange{

    //    "a_test_0001",

    //    "",

    //    "amqp://guest:guest@192.168.2.232:5672/",

    //},t,5)

    /*

        runNums: 表示任务并发处理数量  一般建议 普通任务1-3    就可以了

     */

    err := rabbitmq.Recv(rabbitmq.QueueExchange{

        "a_test_0001",

        "hello_go",

        "direct",

        "amqp://guest:guest@192.168.1.169:5672/",

    },t,4)

    if(err != nil){

        fmt.Println(err)

    }

rabbitmq代码

package rabbitmq



import (

    "errors"

    "strconv"

    "time"

    //"errors"

    "fmt"

    "github.com/streadway/amqp"

    "log"

)

// 定义全局变量,指针类型

var mqConn *amqp.Connection

var mqChan *amqp.Channel

// 定义生产者接口

type Producer interface {

    MsgContent() string

}

type RetryProducer interface {

// 定义接收者接口

type Receiver interface {

    Consumer([]byte)    error

    FailAction(error , []byte)  error

// 定义RabbitMQ对象

type RabbitMQ struct {

    connection *amqp.Connection

    Channel *amqp.Channel

    dns string

    QueueName   string            // 队列名称

    RoutingKey  string            // key名称

    ExchangeName string           // 交换机名称

    ExchangeType string           // 交换机类型

    producerList []Producer

    retryProducerList []RetryProducer

    receiverList []Receiver

// 定义队列交换机对象

type QueueExchange struct {

    QuName  string           // 队列名称

    RtKey   string           // key值

    ExName  string           // 交换机名称

    ExType  string           // 交换机类型

    Dns     string              //链接地址

// 链接rabbitMQ

func (r *RabbitMQ)MqConnect() (err error){

    mqConn, err = amqp.Dial(r.dns)

    r.connection = mqConn   // 赋值给RabbitMQ对象

    if err != nil {

        fmt.Printf("rbmq链接失败  :%s \n", err)

    }

    return

// 关闭mq链接

func (r *RabbitMQ)CloseMqConnect() (err error){

    err = r.connection.Close()

    if err != nil{

        fmt.Printf("关闭mq链接失败  :%s \n", err)

func (r *RabbitMQ)MqOpenChannel() (err error){

    mqConn := r.connection

    r.Channel, err = mqConn.Channel()

    //defer mqChan.Close()

        fmt.Printf("MQ打开管道失败:%s \n", err)

    return err

func (r *RabbitMQ)CloseMqChannel() (err error){

    r.Channel.Close()

// 创建一个新的操作对象

func NewMq(q QueueExchange) RabbitMQ {

    return RabbitMQ{

        QueueName:q.QuName,

        RoutingKey:q.RtKey,

        ExchangeName: q.ExName,

        ExchangeType: q.ExType,

        dns:q.Dns,

func (mq *RabbitMQ) sendMsg (body string) (err error)  {

    err = mq.MqOpenChannel()

    ch := mq.Channel

        log.Printf("Channel err  :%s \n", err)

    defer mq.Channel.Close()

    if mq.ExchangeName != "" {

        if mq.ExchangeType == ""{

            mq.ExchangeType = "direct"

        }

        err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)

        if err != nil {

            log.Printf("ExchangeDeclare err  :%s \n", err)

    // 用于检查队列是否存在,已经存在不需要重复声明

    _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil)

        log.Printf("QueueDeclare err :%s \n", err)

    // 绑定任务

    if mq.RoutingKey != "" && mq.ExchangeName != "" {

        err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)

            log.Printf("QueueBind err :%s \n", err)

    if mq.ExchangeName != "" && mq.RoutingKey != ""{

        err = mq.Channel.Publish(

            mq.ExchangeName,     // exchange

            mq.RoutingKey, // routing key

            false,  // mandatory

            false,  // immediate

            amqp.Publishing {

                ContentType: "text/plain",

                Body:        []byte(body),

            })

    }else{

            "",     // exchange

            mq.QueueName, // routing key

/*

发送延时消息

 */

func (mq *RabbitMQ)sendDelayMsg(body string,ttl int64) (err error){

    err =mq.MqOpenChannel()

            return

    if ttl <= 0{

        return errors.New("发送延时消息,ttl参数是必须的")

    table := make(map[string]interface{},3)

    table["x-dead-letter-routing-key"] = mq.RoutingKey

    table["x-dead-letter-exchange"] = mq.ExchangeName

    table["x-message-ttl"] = ttl*1000

    //fmt.Printf("%+v",table)

    //fmt.Printf("%+v",mq)

    ttlstring := strconv.FormatInt(ttl,10)

    queueName := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring)

    routingKey := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring)

    _, err = ch.QueueDeclare(queueName, true, false, false, false, table)

        return

    if routingKey != "" && mq.ExchangeName != "" {

        err = ch.QueueBind(queueName, routingKey, mq.ExchangeName, false, nil)

    header := make(map[string]interface{},1)

    header["retry_nums"] = 0

    var ttl_exchange string

    var ttl_routkey string

    if(mq.ExchangeName != "" ){

        ttl_exchange = mq.ExchangeName

        ttl_exchange = ""

    if mq.RoutingKey != "" && mq.ExchangeName != ""{

        ttl_routkey = routingKey

        ttl_routkey = queueName

    err = mq.Channel.Publish(

        ttl_exchange,     // exchange

        ttl_routkey, // routing key

        false,  // mandatory

        false,  // immediate

        amqp.Publishing {

            ContentType: "text/plain",

            Body:        []byte(body),

            Headers:header,

        })

func (mq *RabbitMQ) sendRetryMsg (body string,retry_nums int32,args ...string)  {

    err :=mq.MqOpenChannel()

    //原始路由key

    oldRoutingKey := args[0]

    //原始交换机名

    oldExchangeName := args[1]

    table["x-dead-letter-routing-key"] = oldRoutingKey

    if oldExchangeName != "" {

        table["x-dead-letter-exchange"] = oldExchangeName

        mq.ExchangeName = ""

        table["x-dead-letter-exchange"] = ""

    table["x-message-ttl"] = int64(20000)

    _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, table)

    header["retry_nums"] = retry_nums + int32(1)

        ttl_routkey = mq.RoutingKey

        ttl_routkey = mq.QueueName

    //fmt.Printf("ttl_exchange:%s,ttl_routkey:%s \n",ttl_exchange,ttl_routkey)

        fmt.Printf("MQ任务发送失败:%s \n", err)

// 监听接收者接收任务 消费者

func (mq *RabbitMQ) ListenReceiver(receiver Receiver) {

    // 获取消费通道,确保rabbitMQ一个一个发送消息

    err =  ch.Qos(1, 0, false)

    msgList, err :=  ch.Consume(mq.QueueName, "", false, false, false, false, nil)

        log.Printf("Consume err :%s \n", err)

    for msg := range msgList {

        retry_nums,ok := msg.Headers["retry_nums"].(int32)

        if(!ok){

            retry_nums = int32(0)

        // 处理数据

        err := receiver.Consumer(msg.Body)

        if err!=nil {

            //消息处理失败 进入延时尝试机制

            if retry_nums < 3{

                fmt.Println(string(msg.Body))

                fmt.Printf("消息处理失败 消息开始进入尝试  ttl延时队列 \n")

                retry_msg(msg.Body,retry_nums,QueueExchange{

                        mq.QueueName,

                        mq.RoutingKey,

                        mq.ExchangeName,

                        mq.ExchangeType,

                        mq.dns,

                    })

            }else{

                //消息失败 入库db

                fmt.Printf("消息处理3次后还是失败了 入库db 钉钉告警 \n")

                receiver.FailAction(err,msg.Body)

            }

            err = msg.Ack(true)

            if err != nil {

                fmt.Printf("确认消息未完成异常:%s \n", err)

        }else {

            // 确认消息,必须为false

                fmt.Printf("消息消费ack失败 err :%s \n", err)

//消息处理失败之后 延时尝试

func retry_msg(msg []byte,retry_nums int32,queueExchange QueueExchange){

    //原始队列名称 交换机名称

    oldQName := queueExchange.QuName

    oldExchangeName := queueExchange.ExName

    oldRoutingKey := queueExchange.RtKey

    if oldRoutingKey == "" || oldExchangeName == ""{

        oldRoutingKey = oldQName

    if queueExchange.QuName != "" {

        queueExchange.QuName = queueExchange.QuName + "_retry_3";

    if queueExchange.RtKey != "" {

        queueExchange.RtKey = queueExchange.RtKey + "_retry_3";

        queueExchange.RtKey = queueExchange.QuName + "_retry_3";

//fmt.Printf("%+v",queueExchange)

    mq := NewMq(queueExchange)

    _ = mq.MqConnect()

    defer func(){

        _ = mq.CloseMqConnect()

    }()

    //fmt.Printf("%+v",queueExchange)

    mq.sendRetryMsg(string(msg),retry_nums,oldRoutingKey,oldExchangeName)

func Send(queueExchange QueueExchange,msg string) (err error){

    err = mq.MqConnect()

        mq.CloseMqConnect()

    err = mq.sendMsg(msg)

//发送延时消息

func SendDelay(queueExchange QueueExchange,msg string,ttl int64)(err error){

    err = mq.sendDelayMsg(msg,ttl)

runNums  开启并发执行任务数量

func Recv(queueExchange QueueExchange,receiver Receiver,runNums int) (err error){

    //链接rabbitMQ

    if(err != nil){

    //rbmq断开链接后 协程退出释放信号

    taskQuit:= make(chan struct{}, 1)

    //尝试链接rbmq

    tryToLinkC := make(chan struct{}, 1)

    //开始执行任务

    for i:=1;i<=runNums;i++{

        go Recv2(mq,receiver,taskQuit);

    //如果rbmq断开连接后 尝试重新建立链接

    var tryToLink = func() {

        for {

            err = mq.MqConnect()

            if(err == nil){

                tryToLinkC <- struct{}{}

                break

            time.Sleep(time.Second * 10)

    for{

        select {

        case <- taskQuit ://rbmq断开连接后 开始尝试重新建立链接

             go tryToLink()

            <-tryToLinkC //建立链接成功后 重新开启协程执行任务

            fmt.Println("重新开启新的协程执行任务")

            go Recv2(mq,receiver,taskQuit);

        time.Sleep(time.Millisecond*100)

func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){

        defer func() {

            fmt.Println("rbmq链接失败,协程任务退出~~~~~~~~~~~~~~~~~~~~")

            taskQuit <- struct{}{}

        }()

        // 验证链接是否正常

        err := mq.MqOpenChannel()

        if(err != nil){

        mq.ListenReceiver(receiver)

type retryPro struct {

    msgContent   string

实现重连方式很多,下面实现方式比较简单

1.Recv方法创建ampq链接

2.启动协程开始执行任务 

MqOpenChannel 打开一个channel通道处理amqp消息

拿到消息 处理任务

  3,协程中捕获异常发送消息到taskQuit <- struct{}{}

  4,主进程监听taskQuit管道 开始尝试重新链接amqp 直到链接成功

  5,重新链接成功后启动新的协程处理任务

主要代码分析:

/*

runNums  开启并发执行任务数量

 */

func Recv(queueExchange QueueExchange,receiver Receiver,runNums int) (err error){

    mq := NewMq(queueExchange)

    //链接rabbitMQ

    err = mq.MqConnect()

    if(err != nil){

        return

    }

    //rbmq断开链接后 协程退出释放信号

    taskQuit:= make(chan struct{}, 1)

    //尝试链接rbmq

    tryToLinkC := make(chan struct{}, 1)

    //开始执行任务

    for i:=1;i<=runNums;i++{

        go Recv2(mq,receiver,taskQuit);



    //如果rbmq断开连接后 尝试重新建立链接

    var tryToLink = func() {

        for {

            err = mq.MqConnect()

            if(err == nil){

                tryToLinkC <- struct{}{}

                break

            }

            time.Sleep(time.Second * 10)

        }

    for{

        select {

        case <- taskQuit ://rbmq断开连接后 开始尝试重新建立链接

             go tryToLink()

            <-tryToLinkC //建立链接成功后 重新开启协程执行任务

            fmt.Println("重新开启新的协程执行任务")

            go Recv2(mq,receiver,taskQuit);

        time.Sleep(time.Millisecond*100)

}

func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){

        defer func() {

            fmt.Println("rbmq链接失败,协程任务退出~~~~~~~~~~~~~~~~~~~~")

            taskQuit <- struct{}{}

            return

        }()

        // 验证链接是否正常

        err := mq.MqOpenChannel()

        if(err != nil){

        mq.ListenReceiver(receiver)