18910140161

阿里云mns php长轮询 实现自动消费的问题?

顺晟科技

2022-09-29 10:16:18

130

阿里云MNS的案例代码

  public function t1()
    {

        $endPoint = 'url';
        $accessId = "id";
        $accessKey = "key";

        //队列名称
        $queueName = "buyorder-settle";
        $client = new Client($endPoint, $accessId, $accessKey);

        // 1.首先获取Queue的实例。
        // PHP SDK默认会对发送的消息做Base64 Encode,对接收到的消息做Base64 Decode。
        // 如果不希望SDK做这样的Base64操作,可以在getQueueRef的时候,传入参数$base64=FALSE。即$queue = $this->client->getQueueRef($queueName, FALSE);
        $queue = $client->getQueueRef($queueName);

        $receiptHandle = NULL;
        try {
            // 1.直接调用receiveMessage函数。
            // receiveMessage函数接受waitSeconds参数,无特殊情况建议设置为30。
            // waitSeconds非0表示这次receiveMessage是一次http long polling,
            //如果queue内没有message,那么这次request会在server端等到queue内有消息才返回。
            //最长等待时间为waitSeconds的值,最大为30。
            $res = $queue->receiveMessage(30);
            //原始数据
            print ($res->getMessageBody());
//            echo "ReceiveMessage Succeed! \n";
//            if (strtoupper($bodyMD5) == $res->getMessageBodyMD5()) {
//                echo "You got the message sent by yourself! \n";
//            }
            // 2.获取ReceiptHandle,这是一个有时效性的Handle,可以用来设置Message的各种属性和删除Message。更多信息,请参见QueueMessage。
            $receiptHandle = $res->getReceiptHandle();
        } catch (MnsException $e) {
            // 3.和CreateQueue和SendMessage一样,ReceiveMessage也有可能出错,所以加上CatchException并做对应的处理。
            print "ReceiveMessage Failed: " . $e;
            return;
        }

        // 这里是您处理消息的逻辑。Sample里就直接略过这一步。
        // 如果这里发生了程序崩溃或卡住等异常情况,对应的Message会在VisibilityTimeout之后重新可见,从而可以被其他进程处理,避免消息丢失。

        // 4.消息已经处理完成,从队列里删除这条消息。
        try {
            // 5.直接调用deleteMessage。
            $res = $queue->deleteMessage($receiptHandle);
            print "DeleteMessage Succeed! \n";
        } catch (MnsException $e) {
            // 6.这里CatchException并做异常处理。
            // 如果是receiptHandle已经过期,那么ErrorCode是MessageNotExist,表示通过这个receiptHandle已经找不到对应的消息。
            // 为了保证receiptHandle不过期,VisibilityTimeout的设置需要保证足够消息处理完成。并且在消息处理过程中,也可以调用changeMessageVisibility这个函数来延长消息的VisibilityTimeout时间。
            print "DeleteMessage Failed: " . $e;
            return;
        }
    }

问题一:
请问这里的语句
$res = $queue->receiveMessage(30);
是30秒内队列有消息,就持续消费。若等待30秒没有值就结束的意思吗?下次需要手动刷新本页面?

问题二:
现在必须刷新一次网页才能消费一条消息,如何实现长轮询自动消费?

你已经提到 长轮询 了,长轮询 在客户端就是需要加上死循环的。

客户端逻辑如下伪代码,

for {
    if (满足某个条件) {
        break
    }
    msg = client->getRequest(timeOut = 30s) //最多等待30s
    handle(msg) // 处理返回,返回可能是超时原因导致数据为空
}

服务端逻辑伪代码(此处为阿里云MNS,自己假设的,没看过文档、代码)

// 线程1
for {
    msg = newMsg() //产生新的数据
    if hasClientRequest() { //发现有客户端请求在,客户端设置了超时时间,没有数据就一直挂起
        responseToClient(msg) // 把数据返回给客户端
    } else {
        pushToQueue(msg) //把数据先放入队列
    }
}

// 线程2
for {
    if hasNewClientRequest() { //发现有新的客户端请求到来
        if queueIsNotEmpty() { // 队列非空
            reponseQueueMsgToClient() // 将队列中的数据返回给客户端
        }
    }
}
// consumer.php
$rec = __DIR__.'/rec.txt';

while (true) {
    if (is_file($rec) && file_get_contents($rec) === 'quit') {
        break;
    }

    $job = $mns->receiveMessage($waitSeconds);
    dispatch($job);
}

然后用supervisor跑几十个进程实现并发消费

退出的逻辑可以参考laravel监听信号
任务超时可以参考laravel使用闹钟信号 https://www.php.net/manual/en...

我们已经准备好了,你呢?
2024我们与您携手共赢,为您的企业形象保驾护航