账号密码登录
微信安全登录
微信扫描二维码登录

登录后绑定QQ、微信即可实现信息互通

手机验证码登录
找回密码返回
邮箱找回 手机找回
注册账号返回
其他登录方式
分享
  • 收藏
    X
    php如何用libevent处理rabbitmq发来的消息,防止消息丢失或者人为的中断导致消息没有被处理完整
    59
    0

    之前是用官方提供的方法来处理的

    ``
    $channel->basic_consume('hello', '', false, true, false, false, $callback);

    while(count($channel->callbacks)) {

    $channel->wait();

    }
    ``

    发现中断程序的时候 每个消息没有被完全处理完,比如拿到消息还没有处理完入库就被中断了。

    后来打算用libevent 据说在事件处理中,除非强制kill掉,不然事件会处理完再中断,但是以下代码还是不断报错;

    ``
    $this->connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password'], $config['vhost']);

            $this->channel = $this->connection->channel();;
            $this->channel->queue_declare($queue, false, true, false, false);

    // $this->channel->basic_consume($queue, '', false, true, false, false, function ($msg) {
    // set_trace_prefix(md5($msg->body . time()));
    // trace('receive msg:' . $msg->body);
    // trace('done');
    // });

            $ch = $this->channel;
            $fd = $this->connection->getSocket();
            Worker::$globalEvent->add($fd, EventInterface::EV_READ, function ($fd) use ($queue, $ch){
                $msg = $ch->basic_get($queue, true);
                print_r($msg);
                print("get a messge:\n");
    
            });
            ``
    0
    打赏
    收藏
    点击回答
        全部回答
    • 0
    • 二更半 普通会员 1楼
      502 Bad Gateway

      502 Bad Gateway


      nginx
    更多回答
    网站公告
    扫一扫访问手机版
    • 回到顶部
    • 回到顶部