1. 为什么需要消息队列
在开发Web应用时,经常会遇到一些耗时操作,比如发送短信、邮件通知、处理大文件等。如果直接在用户请求中处理这些操作,会导致用户等待时间过长,体验非常差。我曾经在一个电商项目中遇到过这样的问题:促销活动时大量用户同时下单,系统需要给每个用户发送订单确认短信,结果短信接口响应慢,直接拖垮了整个下单流程。
这时候消息队列就派上用场了。它就像是一个任务中转站,把耗时的操作放到后台异步处理。用户请求只需要把任务放进队列就可以立即返回,由专门的消费者进程慢慢处理这些任务。这种"削峰填谷"的方式,能有效应对高并发场景。
ThinkPHP官方提供的think-queue扩展包就是一个轻量级的消息队列解决方案。它支持Redis、数据库等多种驱动,配置简单,特别适合中小型项目使用。我在多个实际项目中使用过它,处理过日均百万级的消息量,稳定性相当不错。
2. 安装与基础配置
2.1 使用Composer安装
安装think-queue非常简单,只需要在项目根目录下执行:
composer require topthink/think-queue这里有个小技巧:如果你用的是ThinkPHP5.1,需要指定版本号:
composer require topthink/think-queue:2.0.4安装完成后,可以通过以下命令检查是否安装成功:
php think queue:status如果看到队列状态信息,说明安装成功了。
2.2 配置队列驱动
think-queue支持多种驱动,我强烈推荐使用Redis驱动,因为它的性能最好。配置文件通常位于config/queue.php,下面是一个典型的Redis配置:
return [ 'default' => 'redis', 'connections' => [ 'redis' => [ 'type' => 'redis', 'queue' => 'default', 'host' => '127.0.0.1', 'port' => 6379, 'password' => '', 'select' => 0, 'timeout' => 0, 'persistent' => false, ], ], ];在实际项目中,我通常会做这些优化配置:
- 设置persistent为true,使用持久连接提高性能
- 为不同业务设置不同的select数据库,方便管理
- 配置适当的timeout值,防止长时间阻塞
3. 创建和处理队列任务
3.1 定义任务类
任务类可以放在app/job目录下。比如我们要处理发送短信的任务:
namespace app\job; use think\queue\Job; class SmsJob { public function fire(Job $job, $data) { // 处理发送短信逻辑 $result = $this->sendSms($data['mobile'], $data['content']); if ($result) { $job->delete(); return true; } if ($job->attempts() > 3) { $job->delete(); $this->logFailedJob($data); } else { $job->release(60); // 1分钟后重试 } } protected function sendSms($mobile, $content) { // 实际调用短信接口的代码 // 返回true或false表示成功或失败 } protected function logFailedJob($data) { // 记录失败任务日志 } }这里有几个实践经验分享:
- 一定要处理任务失败的情况,设置合理的重试次数
- 对于重要任务,建议记录失败日志方便后续处理
- 重试间隔要根据业务特点设置,比如短信可以设置1分钟,邮件可以设置5分钟
3.2 推送任务到队列
在控制器中推送任务非常简单:
use think\facade\Queue; class OrderController { public function create() { // 处理订单创建逻辑... // 推送短信任务 $data = [ 'mobile' => '13800138000', 'content' => '您的订单已创建' ]; Queue::push(\app\job\SmsJob::class, $data, 'sms'); return success('订单创建成功'); } }在实际项目中,我通常会封装一个队列服务类,统一管理各种队列任务。这样可以避免在控制器中直接写队列逻辑,也方便统一处理异常情况。
4. 高并发场景下的优化实践
4.1 多队列管理
当系统中有多种类型的任务时,建议使用不同的队列分开处理。比如:
// 短信队列 Queue::push(SmsJob::class, $data, 'sms'); // 邮件队列 Queue::push(EmailJob::class, $data, 'email'); // 日志队列 Queue::push(LogJob::class, $data, 'log');然后可以针对不同类型的队列启动不同的消费者进程:
# 处理短信队列 php think queue:work --queue sms # 处理邮件队列 php think queue:work --queue email这样做的好处是:
- 不同类型的任务互不影响
- 可以根据任务重要性设置不同的并发数
- 某个队列出问题时不会影响其他队列
4.2 使用Supervisor管理进程
在生产环境中,我们需要确保队列消费者进程一直运行。Supervisor是一个很好的进程管理工具。配置示例如下:
[program:queue_worker] command=php think queue:work --queue sms,email,log --memory=128 --sleep=3 --tries=3 directory=/path/to/your/project autostart=true autorestart=true user=www numprocs=4 redirect_stderr=true stdout_logfile=/var/log/queue_worker.log关键配置说明:
- numprocs=4 表示启动4个进程并行处理
- memory=128 限制每个进程最大内存128M
- sleep=3 队列空时休眠3秒
- tries=3 失败任务最多重试3次
在实际部署时,我通常会根据服务器配置调整numprocs值,一般设置为CPU核心数的1.5-2倍。
4.3 性能优化技巧
在处理高并发队列时,我总结了一些优化经验:
- 批量处理:对于可以批量处理的任务,比如发送短信,可以一次处理多条,减少IO操作
public function fire(Job $job, $data) { // 如果是批量数据 if (isset($data[0])) { $this->batchSendSms($data); } else { $this->sendSms($data); } }- 内存控制:设置合理的内存限制,防止内存泄漏
php think queue:work --memory=128- 超时设置:对于可能长时间运行的任务,设置超时时间
php think queue:work --timeout=60- 延迟队列:对于不需要立即处理的任务,可以使用延迟队列
// 1小时后执行 Queue::later(3600, SmsJob::class, $data, 'sms');5. 常见问题与解决方案
在实际使用think-queue的过程中,我遇到过不少坑,这里分享几个典型问题的解决方法。
5.1 任务重复执行
有时候会发现同一个任务被执行了多次。这通常是因为:
- 任务执行时间过长,超过了visibility timeout
- 消费者进程异常退出
解决方案:
- 优化任务处理逻辑,缩短执行时间
- 设置合理的超时时间
- 实现任务幂等性处理
5.2 内存泄漏
长时间运行的PHP进程容易出现内存泄漏。可以通过以下方式避免:
- 定期重启消费者进程
- 使用--memory参数限制内存
- 在代码中注意释放大变量
# 每个进程处理100个任务后自动重启 php think queue:work --max-jobs=1005.3 队列堆积
当生产者速度远大于消费者时,会导致队列堆积。解决方法:
- 增加消费者进程数量
- 优化消费者处理逻辑
- 使用多个队列服务器分流
监控队列长度也很重要,可以写个简单的脚本检查队列长度,超过阈值时报警。
$redis = new Redis(); $redis->connect('127.0.0.1', 6379); $length = $redis->llen('queues:sms'); if ($length > 1000) { // 发送报警 }6. 监控与维护
一个健壮的队列系统需要完善的监控机制。我通常会在项目中实现以下监控点:
- 队列长度监控:实时监控各队列的任务积压情况
- 消费者进程监控:确保Supervisor管理的进程正常运行
- 失败任务监控:记录并定期处理失败任务
- 性能监控:记录任务处理耗时等指标
可以使用ThinkPHP的日志系统记录这些信息,或者集成专门的监控系统如Prometheus。
对于失败任务,我通常会实现一个重试机制,定期检查失败任务表,对可重试的任务重新放入队列。同时也会设置最大重试次数,避免无限重试。
class RetryFailedJobs { public function run() { $jobs = FailedJob::where('attempts', '<', 3) ->where('last_attempt', '<', time() - 3600) ->select(); foreach ($jobs as $job) { Queue::push($job->payload['job'], $job->payload['data'], $job->queue); $job->increment('attempts'); $job->last_attempt = time(); $job->save(); } } }这个重试任务可以放在计划任务中,每小时执行一次。对于重要的业务任务,还应该实现人工干预接口,方便运营人员手动重试失败任务。