Queue Notes
Index
【Tip】
For any message, we need to consider:
- Whether it will time out (exceed the configured consumption time).
- How we should handle exceptions during the consumption process.
- Whether the message is idempotent, i.e., multiple executions do not affect business logic.
- How to handle messages that were being processed when the service stops without triggering an exception.
Safe Shutdown
Essentially, safe shutdown means the consumer process is shut down after a specified waiting time, which cannot guarantee 100% safety. However, the restart commands of K8S or Docker provide a better guarantee. (This conclusion was drawn from testing...)
Consumption Timeout
- For services with heavy computations or time-consuming operations, we should try to create new queues and configure longer timeout durations to distinguish them from regular queues.
- Tasks that time out are placed in the timeout queue, but by then they have already been triggered.
- For idempotent tasks, we can configure the ReloadChannelListener listener, which moves messages in the timeout queue back to the waiting queue so that they are consumed again.
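As a rough illustration of the points above, a dedicated queue with a longer timeout might be declared as follows in `config/autoload/async_queue.php`. The queue name `heavy`, the channel name `heavy-queue`, and every numeric value are assumptions chosen for the example; `handle_timeout` is the option that bounds how long a single message may be consumed.

```php
<?php

declare(strict_types=1);

// config/autoload/async_queue.php
// Sketch only: the `heavy` queue and all values below are assumptions.
return [
    // Regular queue with a short consumption timeout.
    'default' => [
        'driver' => Hyperf\AsyncQueue\Driver\RedisDriver::class,
        'redis' => ['pool' => 'default'],
        'channel' => 'queue',
        'timeout' => 2,
        'retry_seconds' => 5,
        'handle_timeout' => 10,
        'processes' => 1,
        'concurrent' => ['limit' => 10],
    ],
    // Separate queue for heavy or slow jobs, with a longer consumption timeout.
    'heavy' => [
        'driver' => Hyperf\AsyncQueue\Driver\RedisDriver::class,
        'redis' => ['pool' => 'default'],
        'channel' => 'heavy-queue',
        'timeout' => 2,
        'retry_seconds' => 30,
        'handle_timeout' => 300,
        'processes' => 1,
        'concurrent' => ['limit' => 2],
    ],
];
```

The listener itself is typically enabled by adding `Hyperf\AsyncQueue\Listener\ReloadChannelListener::class` to the array returned from `config/autoload/listeners.php`. An additional queue such as `heavy` also needs its own consumer process before anything is consumed from it.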
Consumption Exception
- When an exception occurs during consumption, the task will immediately retry until the maximum retry limit is reached.
- Please ensure transaction consistency! Otherwise, retries may lead to unexpected issues, such as operations being skipped or pessimistic locks being used.
- If transaction consistency cannot be guaranteed, the task can be marked as failed within the try-catch-finally block (so that the consumption "appears successful" for this round), and it will be reprocessed later.
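The "mark as failed but appear successful" approach could look roughly like the sketch below. It is plain PHP with no framework dependency; `FailedTaskLog` and `handleWithoutRetry` are hypothetical names invented for the illustration, and a real project would more likely record failures in a table or a Redis list and reprocess them from there.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical in-memory failure log (stand-in for a table or Redis list).
 */
final class FailedTaskLog
{
    /** @var array<int, array{task: string, reason: string}> */
    public array $entries = [];

    public function record(string $taskId, string $reason): void
    {
        $this->entries[] = ['task' => $taskId, 'reason' => $reason];
    }
}

/**
 * Runs $work. On failure, records the task and returns normally,
 * so the queue sees a successful consumption and does not retry.
 */
function handleWithoutRetry(string $taskId, callable $work, FailedTaskLog $log): bool
{
    $succeeded = false;
    try {
        $work();
        $succeeded = true;
    } catch (\Throwable $e) {
        // Swallowed on purpose: rethrowing would trigger the queue's retry.
        $log->record($taskId, $e->getMessage());
    } finally {
        // Cleanup that must run on both paths (release locks, close handles) goes here.
    }

    return $succeeded;
}

$log = new FailedTaskLog();
$ok = handleWithoutRetry('task-1', static function (): void {
    // Work that completes normally.
}, $log);
$failed = handleWithoutRetry('task-2', static function (): void {
    throw new \RuntimeException('partial write');
}, $log);
```

Inside a job's `handle()` method, the same shape applies: catch the exception, record the failure, and return without rethrowing.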
Idempotency
Please try to ensure that tasks are idempotent. Most tasks are update operations, so you can re-check whether the write is still needed before performing it. If the content has already been processed, skip the operation when the task is executed again.
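A minimal sketch of such a re-check, in plain PHP. The `$rows` array stands in for a database table and `applyStatusUpdate` is a hypothetical helper, not part of any library.

```php
<?php

declare(strict_types=1);

/**
 * Applies a status update only if the row is not already in the target state.
 * $rows is a stand-in for a table, keyed by order ID (assumption for the example).
 *
 * @param array<int, array{status: string}> $rows
 * @return bool true if a write happened, false if it was skipped
 */
function applyStatusUpdate(array &$rows, int $orderId, string $newStatus): bool
{
    // Re-check: if a previous run already applied the change, do nothing.
    if (($rows[$orderId]['status'] ?? null) === $newStatus) {
        return false;
    }

    $rows[$orderId]['status'] = $newStatus;

    return true;
}

$orders = [42 => ['status' => 'pending']];
$first = applyStatusUpdate($orders, 42, 'paid');  // performs the write
$second = applyStatusUpdate($orders, 42, 'paid'); // same message consumed again: skipped
```

Against a real database, the check and the write are usually safer as one statement, for example an `UPDATE` whose `WHERE` clause excludes rows already in the target state, followed by a look at the affected-row count.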
Service Stop Issues
When the service stops, the underlying system will close the consumption process after a specified time. However, if a large number of messages are still being delivered for consumption, it is possible that tasks being processed may be forcibly terminated in the middle of execution without triggering an exception.
Solution 1
- Before delivering a message, check an external variable to decide whether to deliver it, in order to control the queue traffic. (You can create a new CLI command to toggle it.) Reference: Queue Delivery Switch.
- Increase the waiting time value so that tasks already in progress have as much time as possible to finish. See the waiting time settings in Swoole and in Hyperf.
Code Example (queue delivery switch)
<?php

declare(strict_types=1);

namespace App\Lib\RedisQueue;

use App\Job\AbstractJob;
use App\Lib\Redis\Redis;
use Hyperf\AsyncQueue\Driver\DriverFactory;
use Hyperf\AsyncQueue\Driver\DriverInterface;
use Hyperf\Context\ApplicationContext;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;

class RedisQueueFactory
{
    /**
     * Redis key pattern of the per-queue switch that decides whether messages are delivered.
     */
    private const IS_PUSH_KEY = 'IS_PUSH_%s';

    /**
     * Get the queue driver instance.
     * @throws ContainerExceptionInterface
     * @throws NotFoundExceptionInterface
     */
    public static function getQueueInstance(string $queueName = 'default'): DriverInterface
    {
        return ApplicationContext::getContainer()->get(DriverFactory::class)->get($queueName);
    }

    /**
     * Deliver the message only if the external switch allows it.
     * @throws ContainerExceptionInterface
     * @throws NotFoundExceptionInterface
     */
    public static function safePush(AbstractJob $job, string $queueName = 'default', int $delay = 0): bool
    {
        // Read the external switch at call time; a missing or falsy value means "do not deliver".
        $key = sprintf(self::IS_PUSH_KEY, $queueName);
        $isPush = Redis::getRedisInstance()->get($key);
        if ($isPush) {
            return self::getQueueInstance($queueName)->push($job, $delay);
        }

        return false;
    }
}
Code Example
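#[GetMapping(path: 'queue/safe_push')]
public function safePushMessage(): array
{
    // Push 9 demo jobs (IDs 9 down to 1), each from its own coroutine.
    for ($i = 9; $i > 0; --$i) {
        Coroutine::create(function () use ($i) {
            $job = new DemoJob((string) $i, []);
            // Returns false without delivering when the switch for 'redis-queue' is off.
            RedisQueueFactory::safePush($job, 'redis-queue', 0);
        });
    }

    return $this->result->getResult();
}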
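The switch read by `safePush` has to be set somewhere, for example by a CLI command run before and after a restart. The sketch below shows only the toggle logic, in plain PHP. `InMemoryStore` is a hypothetical stand-in for the Redis client, and `enablePush`, `disablePush`, and `isPushEnabled` are names invented for the illustration; only the `IS_PUSH_%s` key pattern comes from the class above.

```php
<?php

declare(strict_types=1);

/** Builds the switch key for a queue, following the IS_PUSH_%s pattern. */
function pushSwitchKey(string $queueName): string
{
    return sprintf('IS_PUSH_%s', $queueName);
}

/** Hypothetical stand-in for a Redis client: get/set/del backed by an array. */
final class InMemoryStore
{
    /** @var array<string, string> */
    private array $data = [];

    public function get(string $key): string|false
    {
        return $this->data[$key] ?? false;
    }

    public function set(string $key, string $value): void
    {
        $this->data[$key] = $value;
    }

    public function del(string $key): void
    {
        unset($this->data[$key]);
    }
}

/** Turns delivery on for one queue. */
function enablePush(InMemoryStore $store, string $queueName): void
{
    $store->set(pushSwitchKey($queueName), '1');
}

/** Turns delivery off for one queue, e.g. right before a restart. */
function disablePush(InMemoryStore $store, string $queueName): void
{
    $store->del(pushSwitchKey($queueName));
}

/** Same truthiness check as safePush: a missing key counts as "off". */
function isPushEnabled(InMemoryStore $store, string $queueName): bool
{
    return (bool) $store->get(pushSwitchKey($queueName));
}

$store = new InMemoryStore();
$before = isPushEnabled($store, 'redis-queue');
enablePush($store, 'redis-queue');
$whileOn = isPushEnabled($store, 'redis-queue');
disablePush($store, 'redis-queue');
$afterOff = isPushEnabled($store, 'redis-queue');
```

One consequence of this design is that the switch fails closed: until the key has been set to a truthy value, `safePush` delivers nothing and returns `false`, so the key needs to be initialized when the service is first deployed.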
Solution 2
The outermost gateway layer performs traffic switching. For example, with Gateway, ServerA, and ServerB: when ServerA needs to be restarted, all traffic is redirected to ServerB. After the restart is complete, all traffic is redirected back to ServerA, and at this point ServerB can restart. This solution relies on gateway capabilities.