龙空技术网

golang实现rabbitmq消息队列消费失败尝试重试

孫攀龍 317

前言:

如今小伙伴们对“php延迟队列订单超时”大概比较注意,朋友们都需要知道一些“php延迟队列订单超时”的相关知识。那么小编也在网上汇集了一些有关“php延迟队列订单超时””的相关文章,希望姐妹们能喜欢,各位老铁们快快来学习一下吧!

在工作中发现,有些时候消息因为某些原因在消费一次后,如果消息失败,这时候不ack,消息就回一直重回队列首部,造成消息拥堵。

一:消息重试机制

如是有了如下思路:

消息进入队列前,header默认有参数 retry_num=0 表示尝试次数;

消费者在消费时候的,如果消息失败,就把消息插入另外一个队列(队列abc);该队列abc 绑定一个死信队列(原始消费的队列),这样形成一个回路

当消息失败后,消息就进入队列abc,队列abc拥有ttl过期时间,ttl过期时间到了后,该消息进入死信队列(死信队列刚好是刚开始我们消费的队列);

这样消息就又回到原始消费队列尾部了;

最后可以通过队列消息头部的header参数retry_num 可以控制消息消费多少次后,直接插入db日志;

db日志可以记录交换机 路由,queuename,这样,可以做一个后台管理,可以手动一次把消息重新放入队列,进行消息(因为有时间消费队列里面可能在请求其它服务,其它服务也可能会挂掉)

这时候消息无论你消费多少次都没有用,但是入库db后,可以一键重回队列消息(当我们知道服务已经正常后)

图解:

附上代码

git clone

send.go 消费者

package mainimport (    "fmt"    _ "fmt"    ";)func main() {    for i := 0;i<20;i++{        body := fmt.Sprintf("{\"order_id\":%d}",i)        fmt.Println(body)        /**            使用默认的交换机            如果是默认交换机            type QueueExchange struct {            QuName  string           // 队列名称            RtKey   string           // key值            ExName  string           // 交换机名称            ExType  string           // 交换机类型            Dns     string              //链接地址            }            如果你喜欢使用默认交换机            RtKey  此处建议填写成 RtKey 和 QuName 一样的值         */        queueExchange := rabbitmq.QueueExchange{            "a_test_0001",            "a_test_0001",            "hello_go",            "direct",            "amqp://guest:guest@192.168.1.169:5672/",        }        _ = rabbitmq.Send(queueExchange,body)    }}

recv.go消费者

package mainimport (    "fmt"    ";    "time")type RecvPro struct {}//// 实现消费者 消费消息失败 自动进入延时尝试  尝试3次之后入库db/*返回值 error 为nil  则表示该消息消费成功否则消息会进入ttl延时队列  重复尝试消费3次3次后消息如果还是失败 消息就执行失败  进入告警 FailAction */func (t *RecvPro) Consumer(dataByte []byte) error {    time.Sleep(time.Second*1)    //return errors.New("顶顶顶顶")    fmt.Println(string(dataByte))    //time.Sleep(1*time.Second)    //return errors.New("顶顶顶顶")    return nil}//消息已经消费3次 失败了 请进行处理/*如果消息 消费3次后 仍然失败  此处可以根据情况 对消息进行告警提醒 或者 补偿  入库db  钉钉告警等等 */func (t *RecvPro) FailAction(err error,dataByte []byte) error {    fmt.Println(string(dataByte))    fmt.Println(err)    fmt.Println("任务处理失败了,我要进入db日志库了")    fmt.Println("任务处理失败了,发送钉钉消息通知主人")    return nil}func main() {    processTask := &RecvPro{}    /*        runNums: 表示任务并发处理数量  一般建议 普通任务1-3    就可以了        maxTryConnTimeFromMinute:表示最大尝试时间  分钟     */    err := rabbitmq.Recv(rabbitmq.QueueExchange{        "a_test_0001",        "a_test_0001",        "hello_go",        "direct",        "amqp://guest:guest@192.168.1.169:5672/",    },    processTask,4,2)    if(err != nil){        fmt.Println(err)    }}

recv.go消费者

package mainimport (    "fmt"    ";    "time")type RecvPro struct {}//// 实现消费者 消费消息失败 自动进入延时尝试  尝试3次之后入库db/*返回值 error 为nil  则表示该消息消费成功否则消息会进入ttl延时队列  重复尝试消费3次3次后消息如果还是失败 消息就执行失败  进入告警 FailAction */func (t *RecvPro) Consumer(dataByte []byte) error {    time.Sleep(time.Second*1)    //return errors.New("顶顶顶顶")    fmt.Println(string(dataByte))    //time.Sleep(1*time.Second)    //return errors.New("顶顶顶顶")    return nil}//消息已经消费3次 失败了 请进行处理/*如果消息 消费3次后 仍然失败  此处可以根据情况 对消息进行告警提醒 或者 补偿  入库db  钉钉告警等等 */func (t *RecvPro) FailAction(err error,dataByte []byte) error {    fmt.Println(string(dataByte))    fmt.Println(err)    fmt.Println("任务处理失败了,我要进入db日志库了")    fmt.Println("任务处理失败了,发送钉钉消息通知主人")    return nil}func main() {    processTask := &RecvPro{}    /*        runNums: 表示任务并发处理数量  一般建议 普通任务1-3    就可以了        maxTryConnTimeFromMinute:表示最大尝试时间  分钟     */    err := rabbitmq.Recv(rabbitmq.QueueExchange{        "a_test_0001",        "a_test_0001",        "hello_go",        "direct",        "amqp://guest:guest@192.168.1.169:5672/",    },    processTask,4,2)    if(err != nil){        fmt.Println(err)    }}

utils/rabbitmq包

package rabbitmqimport (    "errors"    "strconv"    "time"    //"errors"    "fmt"    "github.com/streadway/amqp"    "log")// 定义全局变量,指针类型var mqConn *amqp.Connectionvar mqChan *amqp.Channel// 定义生产者接口type Producer interface {    MsgContent() string}// 定义生产者接口type RetryProducer interface {    MsgContent() string}// 定义接收者接口type Receiver interface {    Consumer([]byte)    error    FailAction(error , []byte)  error}// 定义RabbitMQ对象type RabbitMQ struct {    connection *amqp.Connection    Channel *amqp.Channel    dns string    QueueName   string            // 队列名称    RoutingKey  string            // key名称    ExchangeName string           // 交换机名称    ExchangeType string           // 交换机类型    producerList []Producer    retryProducerList []RetryProducer    receiverList []Receiver}// 定义队列交换机对象type QueueExchange struct {    QuName  string           // 队列名称    RtKey   string           // key值    ExName  string           // 交换机名称    ExType  string           // 交换机类型    Dns     string              //链接地址}// 链接rabbitMQfunc (r *RabbitMQ)MqConnect() (err error){    mqConn, err = amqp.Dial(r.dns)    r.connection = mqConn   // 赋值给RabbitMQ对象    if err != nil {        fmt.Printf("rbmq链接失败  :%s \n", err)    }    return}// 关闭mq链接func (r *RabbitMQ)CloseMqConnect() (err error){    err = r.connection.Close()    if err != nil{        fmt.Printf("关闭mq链接失败  :%s \n", err)    }    return}// 链接rabbitMQfunc (r *RabbitMQ)MqOpenChannel() (err error){    mqConn := r.connection    r.Channel, err = mqConn.Channel()    //defer mqChan.Close()    if err != nil {        fmt.Printf("MQ打开管道失败:%s \n", err)    }    return err}// 链接rabbitMQfunc (r *RabbitMQ)CloseMqChannel() (err error){    r.Channel.Close()    if err != nil {        fmt.Printf("关闭mq链接失败  :%s \n", err)    }    return err}// 创建一个新的操作对象func NewMq(q QueueExchange) RabbitMQ {    return RabbitMQ{        QueueName:q.QuName,        RoutingKey:q.RtKey,        ExchangeName: q.ExName,        ExchangeType: q.ExType,        dns:q.Dns,    }}func (mq *RabbitMQ) sendMsg (body string) (err error)  {    err = mq.MqOpenChannel()    ch := mq.Channel    if err != nil{        log.Printf("Channel err  :%s \n", err)    }    defer func() {        _ = mq.Channel.Close()    }()    if mq.ExchangeName != "" {        if mq.ExchangeType == ""{            mq.ExchangeType = "direct"        }        err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)        if err != nil {            log.Printf("ExchangeDeclare err  :%s \n", err)        }    }    // 用于检查队列是否存在,已经存在不需要重复声明    _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil)    if err != nil {        log.Printf("QueueDeclare err :%s \n", err)    }    // 绑定任务    if mq.RoutingKey != "" && mq.ExchangeName != "" {        err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)        if err != nil {            log.Printf("QueueBind err :%s \n", err)        }    }    if mq.ExchangeName != "" && mq.RoutingKey != ""{        err = mq.Channel.Publish(            mq.ExchangeName,     // exchange            mq.RoutingKey, // routing key            false,  // mandatory            false,  // immediate            amqp.Publishing {                ContentType: "text/plain",                Body:        []byte(body),                DeliveryMode: 2,            })    }else{        err = mq.Channel.Publish(            "",     // exchange            mq.QueueName, // routing key            false,  // mandatory            false,  // immediate            amqp.Publishing {                ContentType: "text/plain",                Body:        []byte(body),                DeliveryMode: 2,            })    }    return}/*发送延时消息 */func (mq *RabbitMQ)sendDelayMsg(body string,ttl int64) (err error){    err =mq.MqOpenChannel()    ch := mq.Channel    if err != nil{        log.Printf("Channel err  :%s \n", err)    }    defer mq.Channel.Close()    if mq.ExchangeName != "" {        if mq.ExchangeType == ""{            mq.ExchangeType = "direct"        }        err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)        if err != nil {            return        }    }    if ttl <= 0{        return errors.New("发送延时消息,ttl参数是必须的")    }    table := make(map[string]interface{},3)    table["x-dead-letter-routing-key"] = mq.RoutingKey    table["x-dead-letter-exchange"] = mq.ExchangeName    table["x-message-ttl"] = ttl*1000    //fmt.Printf("%+v",table)    //fmt.Printf("%+v",mq)    // 用于检查队列是否存在,已经存在不需要重复声明    ttlstring := strconv.FormatInt(ttl,10)    queueName := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring)    routingKey := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring)    _, err = ch.QueueDeclare(queueName, true, false, false, false, table)    if err != nil {        return    }    // 绑定任务    if routingKey != "" && mq.ExchangeName != "" {        err = ch.QueueBind(queueName, routingKey, mq.ExchangeName, false, nil)        if err != nil {            return        }    }    header := make(map[string]interface{},1)    header["retry_nums"] = 0    var ttl_exchange string    var ttl_routkey string    if(mq.ExchangeName != "" ){        ttl_exchange = mq.ExchangeName    }else{        ttl_exchange = ""    }    if mq.RoutingKey != "" && mq.ExchangeName != ""{        ttl_routkey = routingKey    }else{        ttl_routkey = queueName    }    err = mq.Channel.Publish(        ttl_exchange,     // exchange        ttl_routkey, // routing key        false,  // mandatory        false,  // immediate        amqp.Publishing {            ContentType: "text/plain",            Body:        []byte(body),            Headers:header,        })    if err != nil {        return    }    return}func (mq *RabbitMQ) sendRetryMsg (body string,retry_nums int32,args ...string)  {    err :=mq.MqOpenChannel()    ch := mq.Channel    if err != nil{        log.Printf("Channel err  :%s \n", err)    }    defer mq.Channel.Close()    if mq.ExchangeName != "" {        if mq.ExchangeType == ""{            mq.ExchangeType = "direct"        }        err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)        if err != nil {            log.Printf("ExchangeDeclare err  :%s \n", err)        }    }    //原始路由key    oldRoutingKey := args[0]    //原始交换机名    oldExchangeName := args[1]    table := make(map[string]interface{},3)    table["x-dead-letter-routing-key"] = oldRoutingKey    if oldExchangeName != "" {        table["x-dead-letter-exchange"] = oldExchangeName    }else{        mq.ExchangeName = ""        table["x-dead-letter-exchange"] = ""    }    table["x-message-ttl"] = int64(20000)    //fmt.Printf("%+v",table)    //fmt.Printf("%+v",mq)    // 用于检查队列是否存在,已经存在不需要重复声明    _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, table)    if err != nil {        log.Printf("QueueDeclare err :%s \n", err)    }    // 绑定任务    if mq.RoutingKey != "" && mq.ExchangeName != "" {        err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)        if err != nil {            log.Printf("QueueBind err :%s \n", err)        }    }    header := make(map[string]interface{},1)    header["retry_nums"] = retry_nums + int32(1)    var ttl_exchange string    var ttl_routkey string    if(mq.ExchangeName != "" ){        ttl_exchange = mq.ExchangeName    }else{        ttl_exchange = ""    }    if mq.RoutingKey != "" && mq.ExchangeName != ""{        ttl_routkey = mq.RoutingKey    }else{        ttl_routkey = mq.QueueName    }    //fmt.Printf("ttl_exchange:%s,ttl_routkey:%s \n",ttl_exchange,ttl_routkey)    err = mq.Channel.Publish(        ttl_exchange,     // exchange        ttl_routkey, // routing key        false,  // mandatory        false,  // immediate        amqp.Publishing {            ContentType: "text/plain",            Body:        []byte(body),            Headers:header,        })    if err != nil {        fmt.Printf("MQ任务发送失败:%s \n", err)    }}// 监听接收者接收任务 消费者func (mq *RabbitMQ) ListenReceiver(receiver Receiver) {    err :=mq.MqOpenChannel()    ch := mq.Channel    if err != nil{        log.Printf("Channel err  :%s \n", err)    }    defer mq.Channel.Close()    if mq.ExchangeName != "" {        if mq.ExchangeType == ""{            mq.ExchangeType = "direct"        }        err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)        if err != nil {            log.Printf("ExchangeDeclare err  :%s \n", err)        }    }    // 用于检查队列是否存在,已经存在不需要重复声明    _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil)    if err != nil {        log.Printf("QueueDeclare err :%s \n", err)    }    // 绑定任务    if mq.RoutingKey != "" && mq.ExchangeName != "" {        err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)        if err != nil {            log.Printf("QueueBind err :%s \n", err)        }    }    // 获取消费通道,确保rabbitMQ一个一个发送消息    err =  ch.Qos(1, 0, false)    msgList, err :=  ch.Consume(mq.QueueName, "", false, false, false, false, nil)    if err != nil {        log.Printf("Consume err :%s \n", err)    }    for msg := range msgList {        retry_nums,ok := msg.Headers["retry_nums"].(int32)        if(!ok){            retry_nums = int32(0)        }        // 处理数据        err := receiver.Consumer(msg.Body)        if err!=nil {            //消息处理失败 进入延时尝试机制            if retry_nums < 3{                fmt.Println(string(msg.Body))                fmt.Printf("消息处理失败 消息开始进入尝试  ttl延时队列 \n")                retry_msg(msg.Body,retry_nums,QueueExchange{                        mq.QueueName,                        mq.RoutingKey,                        mq.ExchangeName,                        mq.ExchangeType,                        mq.dns,                    })            }else{                //消息失败 入库db                fmt.Printf("消息处理3次后还是失败了 入库db 钉钉告警 \n")                receiver.FailAction(err,msg.Body)            }            err = msg.Ack(true)            if err != nil {                fmt.Printf("确认消息未完成异常:%s \n", err)            }        }else {            // 确认消息,必须为false            err = msg.Ack(true)            if err != nil {                fmt.Printf("消息消费ack失败 err :%s \n", err)            }        }    }}//消息处理失败之后 延时尝试func retry_msg(msg []byte,retry_nums int32,queueExchange QueueExchange){    //原始队列名称 交换机名称    oldQName := queueExchange.QuName    oldExchangeName := queueExchange.ExName    oldRoutingKey := queueExchange.RtKey    if oldRoutingKey == "" || oldExchangeName == ""{        oldRoutingKey = oldQName    }    if queueExchange.QuName != "" {        queueExchange.QuName = queueExchange.QuName + "_retry_3";    }    if queueExchange.RtKey != "" {        queueExchange.RtKey = queueExchange.RtKey + "_retry_3";    }else{        queueExchange.RtKey = queueExchange.QuName + "_retry_3";    }//fmt.Printf("%+v",queueExchange)    mq := NewMq(queueExchange)    _ = mq.MqConnect()    defer func(){        _ = mq.CloseMqConnect()    }()    //fmt.Printf("%+v",queueExchange)    mq.sendRetryMsg(string(msg),retry_nums,oldRoutingKey,oldExchangeName)}func Send(queueExchange QueueExchange,msg string) (err error){    mq := NewMq(queueExchange)    err = mq.MqConnect()    if err != nil{        return    }    defer func(){        _ = mq.CloseMqConnect()    }()    err = mq.sendMsg(msg)    return}//发送延时消息func SendDelay(queueExchange QueueExchange,msg string,ttl int64)(err error){    mq := NewMq(queueExchange)    err = mq.MqConnect()    if err != nil{        return    }    defer func(){        _ = mq.CloseMqConnect()    }()    err = mq.sendDelayMsg(msg,ttl)    return}/*runNums  开启并发执行任务数量 */func Recv(queueExchange QueueExchange,receiver Receiver,otherParams ...int) (err error){    var (        exitTask bool        maxTryConnNums int  //rbmq链接失败后多久尝试一次        runNums int        maxTryConnTimeFromMinute int    )    if(len(otherParams) <= 0){        runNums = 1        maxTryConnTimeFromMinute = 0    }else if(len(otherParams) == 1){        runNums = otherParams[0]        maxTryConnTimeFromMinute = 0    }else if(len(otherParams) == 2){        runNums = otherParams[0]        maxTryConnTimeFromMinute = otherParams[1]    }    //maxTryConnNums := 360 //rbmq链接失败后最大尝试次数    //maxTryConnTime := time.Duration(10) //rbmq链接失败后多久尝试一次    maxTryConnNums = maxTryConnTimeFromMinute * 10 * maxTryConnTimeFromMinute//rbmq链接失败后最大尝试次数    maxTryConnTime := time.Duration(6) //rbmq链接失败后多久尝试一次    mq := NewMq(queueExchange)    //链接rabbitMQ    err = mq.MqConnect()    if(err != nil){        return    }    defer func() {        if panicErr := recover(); panicErr != nil{            fmt.Println(recover())            err = errors.New(fmt.Sprintf("%s",panicErr))        }    }()    //rbmq断开链接后 协程退出释放信号    taskQuit:= make(chan struct{}, 1)    //尝试链接rbmq    tryToLinkC := make(chan struct{}, 1)    //最大尝试次数    tryToLinkMaxNums := make(chan struct{}, 1)    maxTryNums := 0 //尝试重启次数    //开始执行任务    for i:=1;i<=runNums;i++{        go Recv2(mq,receiver,taskQuit);    }    //如果rbmq断开连接后 尝试重新建立链接    var tryToLink = func() {        for {            maxTryNums += 1            err = mq.MqConnect()            if(err == nil){                tryToLinkC <- struct{}{}                break            }            if(maxTryNums > maxTryConnNums){                tryToLinkMaxNums <- struct{}{}                break            }            //如果链接断开了 10秒重新尝试链接一次            time.Sleep(time.Second * maxTryConnTime)        }        return    }    scheduleTimer := time.NewTimer(time.Millisecond*300)    exitTask = true    for{        select {        case <-tryToLinkC: //建立链接成功后 重新开启协程执行任务            fmt.Println("重新开启新的协程执行任务")            go Recv2(mq,receiver,taskQuit);        case <-tryToLinkMaxNums://rbmq超出最大链接次数 退出任务            fmt.Println("rbmq链接超过最大尝试次数!")            exitTask = false            err = errors.New("rbmq链接超过最大尝试次数!")        case <- taskQuit ://rbmq断开连接后 开始尝试重新建立链接            fmt.Println("rbmq断开连接后 开始尝试重新建立链接")             go tryToLink()        case <- scheduleTimer.C:            //fmt.Println("~~~~~~~~~~~~~~~~~~~~~~~")        }        // 重置调度间隔        scheduleTimer.Reset(time.Millisecond*300)        if !exitTask{            break        }    }    fmt.Println("exit")    return}func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){        defer func() {            fmt.Println("rbmq链接失败,协程任务退出~~~~~~~~~~~~~~~~~~~~")            taskQuit <- struct{}{}            return        }()        // 验证链接是否正常        err := mq.MqOpenChannel()        if(err != nil){            return        }        mq.ListenReceiver(receiver)}type retryPro struct {    msgContent   string}
二,延时队列

场景一:物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时。

场景二:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。

最近的一个项目遇到了这种情况,如果运单30分钟还没有被接单,则状态自动变为已取消。实现延迟消息原理如下,借用一张图:

实现方案

定时任务轮询数据库,看是否有产生新任务,如果产生则消费任务pcntl_alarm为进程设置一个闹钟信号swoole的异步高精度定时器:swoole_time_tick(类似javascript的setInterval)和swoole_time_after(相当于javascript的setTimeout)rabbitmq延迟任务

以上四种方案,如果生产环境有使用到swoole建议使用第三种方案。此篇文章重点讲述第四种方案实现

生产者:

1 <?php 2 require_once __DIR__ . '/../vendor/autoload.php'; 3 use PhpAmqpLib\Connection\AMQPStreamConnection; 4 use PhpAmqpLib\Message\AMQPMessage; 5  6  7 $queue = "test_ack_queue"; 8 $exchange = "test_ack_queue"; 9 //获取连接10 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');11 //从连接中创建通道12 $channel = $connection->channel();13 14 $channel->exchange_declare('delay_exchange', 'direct',false,true,false);15 $channel->exchange_declare('cache_exchange', 'direct',false,true,false);16 17 $tale = new \PhpAmqpLib\Wire\AMQPTable();18 $tale->set('x-dead-letter-exchange', 'delay_exchange');19 $tale->set('x-dead-letter-routing-key','delay_exchange');20 //$tale->set('x-message-ttl',10000);21 22 $channel->queue_declare('cache_queue',false,true,false,false,false,$tale);23 $channel->queue_bind('cache_queue', 'cache_exchange','cache_exchange');24 25 $channel->queue_declare('delay_queue',false,true,false,false,false);26 $channel->queue_bind('delay_queue', 'delay_exchange','delay_exchange');27 28 29 $msg = new AMQPMessage('Hello World',array(30     'expiration' => 10000,31     'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT32 33 ));34 35 $channel->basic_publish($msg,'cache_exchange','cache_exchange');36 echo date('Y-m-d H:i:s')." [x] Sent 'Hello World!' ".PHP_EOL;37 38 39 40 41 //while ($wait) {42 //    $channel->wait();43 //}44 45 $channel->close();46 $connection->close();task

消费者:

1 <?php 2 require_once __DIR__ . '/../vendor/autoload.php'; 3 use PhpAmqpLib\Connection\AMQPStreamConnection; 4 use PhpAmqpLib\Message\AMQPMessage; 5  6  7 //获取连接 8 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); 9 //从连接中创建通道10 $channel = $connection->channel();11 12 13 //$channel->queue_declare($queue, false, true, false, false);14 //$channel->exchange_declare($exchange, 'topic', false, true, false);15 //$channel->queue_bind($queue, $exchange);16 17 18 19 $channel->exchange_declare('delay_exchange', 'direct',false,false,false);20 $channel->queue_declare('delay_queue',false,true,false,false,false);21 $channel->queue_bind('delay_queue', 'delay_exchange','delay_exchange');22 23 24 25 function process_message(AMQPMessage $message)26 {27     $headers = $message->get('application_headers');28     $nativeData = $headers->getNativeData();29 //    var_dump($nativeData['x-delay']);30     echo date('Y-m-d H:i:s')." [x] Received",$message->body,PHP_EOL;31     $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);32 33 }34 35 36 $channel->basic_qos(null, 1, null);37 $channel->basic_consume('delay_queue', '', false, false, false, false, 'process_message');38 39 function shutdown($channel, $connection)40 {41     $channel->close();42     $connection->close();43 }44 register_shutdown_function('shutdown', $channel, $connection);45 46 while (count($channel->callbacks)) {47     $channel->wait();48 }work

延时队列实现和上面所讲的消息重试有异曲同工之处,都是利用了延时时间和死信队列这一特性实现

最新源码仓库地址:

其它:该rabbitmq包实现中包含了,rabbitmq断线重连,有兴趣的同学可以看看

  (重试和重连接是两个概念)

  重连接 :rabbitmq链接失败导致任务失败,此时要等待rabbitmq服务器恢复正常后才能再次启动协程处理任务

  重试:rabbitmq服务正常,消息消费进程也正常,但是消息处理失败。尝试多次消费消息后还是失败就ack消息,在整个重试过程中不会阻塞消费

golang监听rabbitmq消息队列任务断线自动重连接:

标签: #php延迟队列订单超时 #php延迟队列订单超时怎么解决问题