站长图库向大家介绍了thinkphp6,think-queue,thinkphp普通队列,thinkphp延迟队列等相关知识,希望对您有所帮助
本篇文章给大家带来了关于thinkphp的相关知识,其中主要介绍了关于使用think-queue来实现普通队列和延迟队列的相关内容,think-queue是thinkphp官方提供的一个消息队列服务,下面一起来看一下,希望对大家有帮助。

###TP6 队列
TP6 中使用 think-queue 可以实现普通队列和延迟队列。
think-queue 是thinkphp 官方提供的一个消息队列服务,它支持消息队列的一些基本特性:
消息的发布,获取,执行,删除,重发,失败处理,延迟执行,超时控制等
队列的多队列, 内存限制 ,启动,停止,守护等
消息队列可降级为同步执行
消息队列实现过程
1、通过生产者推送消息到消息队列服务中
2、消息队列服务将收到的消息存入redis队列中(zset)
3、消费者进行监听队列,当监听到队列有新的消息时,获取队列第一条
4、处理获取下来的消息调用业务类进行处理相关业务
5、业务处理后,需要从队列中删除消息
composer 安装 think-queue
composer require topthink/think-queue
配置文件
安装完 think-queue 后会在 config 目录中生成 queue.php,这个文件是队列的配置文件。
tp6中提供了多种消息队列的实现方式,默认使用sync,我这里选择使用Redis。
return [ 'default' => 'redis', 'connections' => [ 'sync' => [ 'type' => 'sync', ], 'database' => [ 'type' => 'database', 'queue' => 'default', 'table' => 'jobs', 'connection' => null, ], 'redis' => [ 'type' => 'redis', 'queue' => 'default', 'host' => env('redis.host', '127.0.0.1'), 'port' => env('redis.port', '6379'), 'password' => env('redis.password','123456'), 'select' => 0, 'timeout' => 0, 'persistent' => false, ], ], 'failed' => [ 'type' => 'none', 'table' => 'failed_jobs', ],];创建目录及队列消费类文件
在 app 目录下创建 queue 目录,然后在该目录下新建一个抽象类 Queue.php 文件,作为基础类
<?phpnamespace app\queue;use think\facade\Cache;use think\queue\Job;use think\facade\Log;/** * Class Queue 队列消费基础类 * @package app\queue */abstract class Queue{ /** * @describe:fire是消息队列默认调用的方法 * @param \think\queue\Job $job * @param $message */ public function fire(Job $job, $data) { if (empty($data)) { Log::error(sprintf('[%s][%s] 队列无消息', __CLASS__, __FUNCTION__)); return ; } $jobId = $job->getJobId(); // 队列的数据库id或者redis key // $jobClassName = $job->getName(); // 队列对象类 // $queueName = $job->getQueue(); // 队列名称 // 如果已经执行中或者执行完成就不再执行了 if (!$this->checkJob($jobId, $data)) { $job->delete(); Cache::store('redis')->delete($jobId); return ; } // 执行业务处理 if ($this->execute($data)) { Log::record(sprintf('[%s][%s] 队列执行成功', __CLASS__, __FUNCTION__)); $job->delete(); // 任务执行成功后删除 Cache::store('redis')->delete($jobId); // 删除redis中的缓存 } else { // 检查任务重试次数 if ($job->attempts() > 3) { Log::error(sprintf('[%s][%s] 队列执行重试次数超过3次,执行失败', __CLASS__, __FUNCTION__)); // 第1种处理方式:重新发布任务,该任务延迟10秒后再执行;也可以不指定秒数立即执行 //$job->release(10);  

