队列注意事项
目录
【注意】
任何消息我们均需要考虑:
- 是否会超时(超过配置的消费时间)。
- 消费过程中的异常我们应如何处理。
- 消息是否是幂等性的。即多次执行不影响业务逻辑。
- 当服务停止时,执行到中途的消息停掉,没有触发异常如何处理。安全关闭 本质上是是指定时间后关闭该消费进程,但是并不能很好保证100%安全。但是
K8S
、Docker
的重启命令会很好的保证。(测试得出的结论..)
消费超时
1、对于大量的计算服务或者耗时操作,我们应该尽量创建新的队列,且配置较长的超时时间,以便和普通的队列进行区分。
2、超时的任务会被放置 timeout
队列中,但是此时它已经被触发,即:。
3、对于是幂等性的任务,我们可以配置 ReloadChannelListener
监听器,它会将 timeout
中的消息放置 waiting
队列,继续消费。
消费异常
1、当消费过程中出现异常抛出,任务会立即进行重试,直至达到最大重试上限。
2、请保证 事务一致性
!否则重试会与之前的执行产生意想不到的问题。例如:执行过的操作再次操作跳过,或者使用使用悲观锁。
3、如果无法保证 事务一致性
,那么,可以在 try-catch-finally
中将该任务标记为失败(先保证该次消费"表面成功"),后续再次处理。
幂等性
1、请尽可能的保证任务是幂等性的。一般任务多为 update
操作,那么可以实现像 断点续传 的模式,每次都重新判断是否需要写操作,已经操作过的 内容,再次执行时跳过该步操作。
服务停止问题
当服务停止时,底层会在指定的时间后进行关闭消费进程,但是持续大量的还有消息投递进来消费,那么总有可能出现,消费过程中 有任务执行到中途被强制杀掉,且不会触发异常抛出。
方案一
1、投递消息前,先根据外部变量判断是否投递,用于控制队列流量。(可以通过创建新的命令行进行cli操作) 参考:队列投递开关
2、加大 等待时间
的值,让存量任务尽可能的消费完。swoole
中的 等待时间,Hyperf
中的 等待时间
代码示例
优化 封装投递类
<?php
declare(strict_types=1);
namespace App\Lib\RedisQueue;
use App\Job\AbstractJob;
use App\Lib\Redis\Redis;
use Hyperf\AsyncQueue\Driver\DriverFactory;
use Hyperf\AsyncQueue\Driver\DriverInterface;
use Hyperf\Context\ApplicationContext;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
class RedisQueueFactory
{
/**
* 根据队列名称判断是否投递消息.
*/
private const IS_PUSH_KEY = 'IS_PUSH_%s';
/**
* 获取队列实例.
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public static function getQueueInstance(string $queueName = 'default'): DriverInterface
{
return ApplicationContext::getContainer()->get(DriverFactory::class)->get($queueName);
}
/**
* 根据外部变量控制是否投递消息.
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public static function safePush(AbstractJob $job, string $queueName = 'default', int $delay = 0): bool
{
// 动态读取外部变量, 判断是否投递
$key = sprintf(static::IS_PUSH_KEY, $queueName);
$isPush = Redis::getRedisInstance()->get($key);
if ($isPush) {
return self::getQueueInstance($queueName)->push($job, $delay);
}
return false;
}
}
使用示例
#[GetMapping(path: 'queue/safe_push')]
public function safePushMessage(): array
{
for ($i = 10; --$i;) {
Coroutine::create(function () use ($i) {
$job = new DemoJob((string) $i, []);
RedisQueueFactory::safePush($job, 'redis-queue', 0);
});
}
return $this->result->getResult();
}
方案二
1、最外层的网关层进行流量转换,例如:Gateway
、ServerA
、ServerB
。当 ServerA
需要重启时,流量全量导入 ServerB
,重启完毕后, 流量全量导入 ServerA
,此时,ServerB
重启。依赖网关能力。