Jerry's WIKI

Queue Notes

Index

  • Consumption Timeout
  • Consumption Exception
  • Idempotency
  • Service Stop Issues
    • Solution 1
    • Solution 2

【Tip】

For any message, we need to consider:

  • Whether it will time out (exceed the configured consumption time).
  • How we should handle exceptions during the consumption process.
  • Whether the message is idempotent, i.e., executing it multiple times does not affect business logic.
  • How to handle messages that are still being processed when the service stops, since they may be terminated without triggering an exception. See: Safe Shutdown

Safe shutdown essentially means closing the consumer process after a specified time, so it cannot guarantee 100% safety. The restart commands of K8S or Docker provide a better guarantee. (This conclusion was drawn from testing.)


Consumption Timeout

  1. For services with heavy computations or time-consuming operations, create separate queues with longer timeout durations to keep them apart from regular queues.
  2. Timed-out tasks are placed in the timeout queue, but by that point they have already been triggered.
  3. For idempotent tasks, we can configure the ReloadChannelListener listener, which moves messages in the timeout queue back to the waiting queue so they are consumed again.
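
As a rough sketch of points 1 and 3, a dedicated queue with a longer timeout could be declared in `config/autoload/async_queue.php` as shown below. The queue name `heavy`, the channel names, and every numeric value are illustrative assumptions rather than recommendations; the option keys follow Hyperf's async-queue configuration.

```php
<?php

declare(strict_types=1);

// config/autoload/async_queue.php
use Hyperf\AsyncQueue\Driver\RedisDriver;

return [
    // Regular queue for short tasks.
    'default' => [
        'driver' => RedisDriver::class,
        'redis' => ['pool' => 'default'],
        'channel' => '{queue}',
        'timeout' => 2,
        'retry_seconds' => 5,
        'handle_timeout' => 10, // seconds allowed for one consumption
        'processes' => 1,
    ],
    // Hypothetical queue for heavy or slow tasks (name and values assumed).
    'heavy' => [
        'driver' => RedisDriver::class,
        'redis' => ['pool' => 'default'],
        'channel' => '{queue-heavy}',
        'timeout' => 2,
        'retry_seconds' => 30,
        'handle_timeout' => 300, // much longer limit than the regular queue
        'processes' => 1,
    ],
];
```

If the tasks are idempotent, the listener can then be registered in `config/autoload/listeners.php`:

```php
<?php

declare(strict_types=1);

// config/autoload/listeners.php
return [
    Hyperf\AsyncQueue\Listener\ReloadChannelListener::class,
];
```

Each configured queue also needs its own consumer process bound to the same queue name.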

Consumption Exception

  1. When an exception occurs during consumption, the task will immediately retry until the maximum retry limit is reached.
  2. Please ensure transaction consistency! Otherwise, retries may lead to unexpected issues, such as operations being skipped or using pessimistic locks.
  3. If transaction consistency cannot be guaranteed, the task can be marked as failed within the try-catch-finally block (ensure that the consumption "appears successful" for this round), and it will be reprocessed later.
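
Point 3 can be sketched roughly as follows. This is a standalone illustration, not the author's implementation: `FailedTaskLog` and `GuardedJob` are hypothetical names, the failure log is kept in memory only, and in a Hyperf project the base class would extend `Hyperf\AsyncQueue\Job` instead of standing alone.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical in-memory record of failed tasks. A real project would
 * persist this (database table, Redis list) so a later run can reprocess it.
 */
final class FailedTaskLog
{
    /** @var array<string, string> task ID => failure reason */
    private static array $entries = [];

    public static function record(string $taskId, string $reason): void
    {
        self::$entries[$taskId] = $reason;
    }

    /** @return array<string, string> */
    public static function all(): array
    {
        return self::$entries;
    }
}

/**
 * Hypothetical base class, kept standalone so the sketch runs as-is.
 */
abstract class GuardedJob
{
    public function __construct(protected string $taskId)
    {
    }

    /** The actual business logic; may throw. */
    abstract protected function process(): void;

    public function handle(): void
    {
        try {
            $this->process();
        } catch (\Throwable $e) {
            // Swallow the exception so the queue sees a successful round and
            // does not retry; the failure is recorded for later reprocessing.
            FailedTaskLog::record($this->taskId, $e->getMessage());
        }
    }
}

// Usage: a job whose business logic fails.
$job = new class('order-42') extends GuardedJob {
    protected function process(): void
    {
        throw new \RuntimeException('inventory service unavailable');
    }
};

$job->handle(); // returns normally; nothing propagates to the queue
print_r(FailedTaskLog::all());
```

The trade-off is that the queue's own retry mechanism no longer sees the failure, so the recorded entries must be reprocessed by some other means.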

Idempotency

Please try to ensure that tasks are idempotent. Most tasks are update operations, so before writing you can re-check whether the write operation is still needed. If the content has already been processed, skip that operation when the task is executed again.
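
A minimal sketch of that re-check, assuming a hypothetical order record with a `status` field. The in-memory array stands in for a database table, and `markOrderPaid` is an illustrative name only.

```php
<?php

declare(strict_types=1);

/**
 * Marks an order as paid only if it is not already paid.
 *
 * @param array<string, array{status: string}> $orders hypothetical in-memory "table"
 * @return bool true if a write happened, false if it was skipped
 */
function markOrderPaid(array &$orders, string $orderId): bool
{
    if (! isset($orders[$orderId])) {
        return false; // unknown order: nothing to update
    }

    // Re-check the current state before writing.
    if ($orders[$orderId]['status'] === 'paid') {
        return false; // already processed: skip the write
    }

    $orders[$orderId]['status'] = 'paid';
    return true;
}

$orders = ['A1' => ['status' => 'pending']];

var_dump(markOrderPaid($orders, 'A1')); // bool(true): first execution writes
var_dump(markOrderPaid($orders, 'A1')); // bool(false): repeat execution is skipped
```

With a real database, the check and the write are usually combined into one conditional update (for example, an `UPDATE ... WHERE status <> 'paid'` and a check of the affected row count), so that concurrent consumers cannot both pass the check.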

Service Stop Issues

When the service stops, the underlying system closes the consumption process after a specified time. However, if many messages are still being delivered for consumption, tasks in progress may be forcibly terminated mid-execution without triggering an exception.

Solution 1

  1. Before delivering the message, check external variables to decide whether to deliver it, in order to control the queue traffic. (You can create a new command for CLI operation) Reference: Queue Delivery Switch.
  2. Increase the waiting time value to allow existing tasks to be processed as much as possible. See the waiting time settings of Swoole and of Hyperf.

Code Example

Delivery wrapper class

<?php

declare(strict_types=1);
namespace App\Lib\RedisQueue;

use App\Job\AbstractJob;
use App\Lib\Redis\Redis;
use Hyperf\AsyncQueue\Driver\DriverFactory;
use Hyperf\AsyncQueue\Driver\DriverInterface;
use Hyperf\Context\ApplicationContext;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;

class RedisQueueFactory
{
    /**
     * Redis key pattern, per queue name, that controls whether messages are delivered.
     */
    private const IS_PUSH_KEY = 'IS_PUSH_%s';

    /**
     * Get the queue driver instance.
     * @throws ContainerExceptionInterface
     * @throws NotFoundExceptionInterface
     */
    public static function getQueueInstance(string $queueName = 'default'): DriverInterface
    {
        return ApplicationContext::getContainer()->get(DriverFactory::class)->get($queueName);
    }

    /**
     * Push a message only if the external switch allows delivery.
     * @throws ContainerExceptionInterface
     * @throws NotFoundExceptionInterface
     */
    public static function safePush(AbstractJob $job, string $queueName = 'default', int $delay = 0): bool
    {
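        // Read the external switch at call time to decide whether to deliver.
        // self:: is used because a private constant is not visible through late static binding in subclasses.
        $key = sprintf(self::IS_PUSH_KEY, $queueName);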
        $isPush = Redis::getRedisInstance()->get($key);
        if ($isPush) {
            return self::getQueueInstance($queueName)->push($job, $delay);
        }
        return false;
    }
}


Code Example

#[GetMapping(path: 'queue/safe_push')]
public function safePushMessage(): array
{
    // Push nine demo jobs (IDs 9 down to 1), each from its own coroutine.
    for ($i = 9; $i > 0; --$i) {
        Coroutine::create(function () use ($i) {
            $job = new DemoJob((string) $i, []);
            RedisQueueFactory::safePush($job, 'redis-queue', 0);
        });
    }

    return $this->result->getResult();
}
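
`safePush` only reads the switch; something still has to flip it before and after a restart, for example the CLI command mentioned in Solution 1. The sketch below shows one way the toggle logic might look. `FlagStore`, `ArrayFlagStore`, and `DeliverySwitch` are hypothetical names, and the in-memory store stands in for Redis so the example runs on its own.

```php
<?php

declare(strict_types=1);

/** Hypothetical minimal key-value contract; a Redis client would back it in practice. */
interface FlagStore
{
    public function get(string $key): ?string;

    public function set(string $key, string $value): void;
}

/** In-memory stand-in so the sketch runs without Redis. */
final class ArrayFlagStore implements FlagStore
{
    /** @var array<string, string> */
    private array $data = [];

    public function get(string $key): ?string
    {
        return $this->data[$key] ?? null;
    }

    public function set(string $key, string $value): void
    {
        $this->data[$key] = $value;
    }
}

final class DeliverySwitch
{
    // Same key pattern as IS_PUSH_KEY in RedisQueueFactory.
    private const KEY_PATTERN = 'IS_PUSH_%s';

    public function __construct(private FlagStore $store)
    {
    }

    public function open(string $queueName): void
    {
        $this->store->set(sprintf(self::KEY_PATTERN, $queueName), '1');
    }

    public function close(string $queueName): void
    {
        $this->store->set(sprintf(self::KEY_PATTERN, $queueName), '0');
    }

    public function isOpen(string $queueName): bool
    {
        // A missing key and '0' are both falsy, matching the check in safePush.
        return (bool) $this->store->get(sprintf(self::KEY_PATTERN, $queueName));
    }
}

$switch = new DeliverySwitch(new ArrayFlagStore());
$switch->open('redis-queue');  // normal operation: messages are delivered
$switch->close('redis-queue'); // before a restart: stop delivering new messages
var_dump($switch->isOpen('redis-queue')); // bool(false)
```

Because `safePush` treats a missing key as "do not deliver", the switch has to be opened once when the service is first deployed.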

Solution 2

The outermost gateway layer performs traffic switching, for example: Gateway, ServerA, ServerB. When ServerA needs to be restarted, all traffic is redirected to ServerB. After the restart is complete, all traffic is redirected back to ServerA, and at this point, ServerB can restart. This solution relies on gateway capabilities.

Contributor: 田朝帆