news 2026/4/18 8:40:50

Easyoole 使用rdkafka 进行kafka的创建topic创建 删除 以及数据发布 订阅

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Easyoole 使用rdkafka 进行kafka的创建topic创建 删除 以及数据发布 订阅

开发环境 Linux

首先我们需要下载安装librdkafka

https://github.com/confluentinc/librdkafka/tags?after=v2.10.0-RC2

tar -zxvf librdkafka-2.7.0.tar.gz cd librdkafka-2.7.0 ./configure make && make install

如何知道我们安装成功了呢

ldconfig -p | grep rdkafka

如果有如下输出 就说明安装成功了 。

接下来 我们安装 rdkafka 的PHP 扩展

https://pecl.php.net/package/rdkafka这里我们选择6.0.0

tar -zxvf rdkafka-6.0.0.tgz cd rdkafka-6.0.0 /usr/php/bin/phpize ./configure --with-php-config=/usr/php/bin/php-config make && make

其中phpize 和 php-config 请修改你自己的PHP 环境。

编译好后,修改php.ini 将rdkafka.so 添加到配置中,重启php 运行如下命令,看看是否扩展生效。

php --ri rdkafka

如果有输出 说明对应的kafka扩展已经安装好了 下面将进行代码层面的编写。

在EasySwooleEvent.php 文件中 加载配置目录 如下图所示:

接下来我们在Config 目录新增一个文件 如下:

Kakfa.php

<?php /** * kafka相关连接配置信息 */ return [ 'kafka' => [ "host" => [ '192.168.1.1:9092', ], "zookeeper"=>[ "192.168.1.1:2181/kafka" ], 'kafka_bin_path'=>'/usr/kafka/bin', //kafka的运行命令 本机有kafka客户端 填写 无 忽略 'is_kafka_client'=>true, //本机有kafka客户端 true 有 false 没有,这里与创建 删除topic 有关 若没有客户端,对应topic 需要提前新建好 ], ];

新增相关的服务 我们在App/Service 下面做相关服务。

新增KafkaConsumerService.php

<?php namespace App\Service; use EasySwoole\EasySwoole\Config; use RdKafka\Producer; use RdKafka\KafkaConsumer; class KafkaConsumerService { private $consumer; public function __construct(string $topicName) { $config = Config::getInstance()->getConf('kafka');//配置变量 $brokers = implode(',', $config['host']); $conf = new \RdKafka\Conf(); $conf->set('metadata.broker.list', $brokers); $conf->set('group.id', 'group_' . $topicName); $conf->set('enable.auto.commit', 'true'); $this->consumer = new KafkaConsumer($conf); $this->consumer->subscribe([$topicName]); } public function listen(callable $callback) { while (true) { $msg = $this->consumer->consume(1000); if (empty($msg)) continue; if ($msg->err === RD_KAFKA_RESP_ERR_NO_ERROR) { $callback($msg->payload); } } } }

KafkaProducerService.php

<?php namespace App\Service; use EasySwoole\EasySwoole\Config; use RdKafka\Producer; class KafkaProducerService { private static $instance = null; private $producer; private $topics = []; private function __construct() { $config = Config::getInstance()->getConf('kafka');//配置变量 $brokers = implode(',', $config['host']); $conf = new \RdKafka\Conf(); $conf->set('metadata.broker.list', $brokers); $conf->set('queue.buffering.max.ms', 5); $this->producer = new Producer($conf); } public static function getInstance(): self { if (!self::$instance) { self::$instance = new self(); } return self::$instance; } public function send(string $topicName, string $message): bool { if (!isset($this->topics[$topicName])) { $this->topics[$topicName] = $this->producer->newTopic($topicName); } $topic = $this->topics[$topicName]; $topic->produce(RD_KAFKA_PARTITION_UA, 0, $message); $result = $this->producer->flush(10000); return $result === RD_KAFKA_RESP_ERR_NO_ERROR; } }

KafkaService.php

<?php /** * Kafka消息服务 * @author 树下水月 * @date 2025年11月27日13:23:23 */ namespace App\Service; use EasySwoole\EasySwoole\Config; use RdKafka\Producer; use RdKafka\KafkaConsumer; use RdKafka\Admin\TopicSpecification; class KafkaService { /** * 发布数据到kafka * @param string $topic topic信息 * @param array $data 发送数据 数组 * @return bool */ public static function publish(string $topic, array $data): bool { return KafkaProducerService::getInstance()->send( $topic, json_encode($data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES) ); } /** * 订阅kafka数据 * @param string $topic topic信息 * @param callable $callback * @return void */ public static function consume(string $topic, callable $callback) { $consumer = new KafkaConsumerService($topic); $consumer->listen($callback); } /** * 获取kafka 支持啥类型 * @param string $kafkaBinPath * @return string */ private function getKafkaMode(string $kafkaBinPath): string { $help = []; exec("$kafkaBinPath/kafka-topics.sh --help 2>&1", $help); // 新版 Kafka (2.0+) 支持 --bootstrap-server foreach ($help as $line) { if (strpos($line, '--bootstrap-server') !== false) { return 'bootstrap'; } } return 'zookeeper'; } /** * 获取 list 命令 */ private function buildListCommand(string $kafkaBinPath, string $addr, string $zookeeps, string $mode) { if ($mode === 'bootstrap') { return "$kafkaBinPath/kafka-topics.sh --bootstrap-server $addr --list"; } else { return "$kafkaBinPath/kafka-topics.sh --zookeeper $zookeeps --list"; } } /** * 获取 create 命令 */ private function buildCreateCommand(string $kafkaBinPath, string $addr, string $zookeeps, string $topic, int $partitions, int $replica, string $mode) { if ($mode === 'bootstrap') { return "$kafkaBinPath/kafka-topics.sh --bootstrap-server $addr --create --topic $topic --partitions $partitions --replication-factor $replica"; } else { return "$kafkaBinPath/kafka-topics.sh --zookeeper $zookeeps --create --topic $topic --partitions $partitions --replication-factor $replica"; } } /** * 获取 delete 命令 */ private function buildDeleteCommand(string $kafkaBinPath, string $addr, string $zookeepers, string $topic, string $mode) { if ($mode === 'bootstrap') { return "$kafkaBinPath/kafka-topics.sh --bootstrap-server $addr --delete --topic $topic"; } else { return "$kafkaBinPath/kafka-topics.sh --zookeeper $zookeepers --delete --topic $topic"; } } /** * 在 PHP 中通过 exec 创建 Kafka topic * @param string $topicName Topic 名称 * @param int $partitions 分区数 * @param int $replication 副本数 * @param string $kafkaBinPath Kafka bin 目录(包含 kafka-topics.sh) * @return array 返回结果 ['success'=>bool, 'message'=>string] */ public function createKafkaTopic($topicName, $partitions = 1, $replication = 1, $kafka_bootstrap, $zookeepers, $is_kafka_client = false, $kafkaBinPath = '/yisa_oe/kafka/bin') { if ($is_kafka_client) { $addr = $kafka_bootstrap; $mode = $this->getKafkaMode($kafkaBinPath); //获取模式 zookeeper bootstrap if ($this->isKafkaTopicExist($topicName, [$addr], [$zookeepers], $kafkaBinPath)) { var_dump("Topic {$topicName} 已存在,跳过创建"); return true; } $cmd = $this->buildCreateCommand($kafkaBinPath, $addr, $zookeepers, $topicName, $partitions, $replication, $mode); //执行创建 topic exec($cmd . " 2>&1", $output, $returnVar); return $returnVar === 0; } else { var_dump("当前系统内没有kafka客户端,跳过topic创建"); return true; //没有客户端 不会创建topic 直接跳过 } } /** * 检查 Topic 是否存在(兼容模式) * @param $topicName 需要检查的topic * @param array $brokers * @param array $zookeeper zookeeper * @param $kafkaBin * @return bool * @throws \Exception */ public function isKafkaTopicExist($topicName, array $brokers, array $zookeeper, $kafkaBin = '/yisa_oe/kafka/bin') { $addr = implode(',', $brokers); $zookeepers = implode(',', $zookeeper); $mode = $this->getKafkaMode($kafkaBin); //获取模式 $cmd = $this->buildListCommand($kafkaBin, $addr, $zookeepers, $mode); //获取topic 是否存在 exec($cmd . " 2>&1", $output, $returnVar); if ($returnVar !== 0) { throw new \Exception("检查 topic 失败:" . implode("\n", $output)); } return in_array($topicName, $output); } /** * 删除 Topic * @param $topicName 需要删除的topic * @param array $brokers * @param array $zookeeps zookeep 信息 * @param $is_kafka_client 是否有kafka客户端 默认false 无 * @param $kafkaBin kafka 对应的bin执行目录 * @return bool */ public function deleteKafkaTopic($topicName, array $brokers, array $zookeeper, $is_kafka_client = false, $kafkaBin = '/yisa_oe/kafka/bin') { if ($is_kafka_client) { $addr = implode(',', $brokers); $zookeeper_str = implode(',', $zookeeper); $mode = $this->getKafkaMode($kafkaBin); $cmd = $this->buildDeleteCommand($kafkaBin, $addr, $zookeeper_str, $topicName, $mode); exec($cmd . " 2>&1", $output, $returnVar); return $returnVar === 0; } else { return true; } } }

我们新增一个路由 这里直接忽略 我们以Test.php 这个控制器为例吧

<?php /** * 测评回调控制器 * @author liupeng * @email liupenga@yisa.com * @date 2025年11月18日14:22:20 */ namespace App\HttpController\Api; use EasySwoole\Http\AbstractInterface\Controller; use App\Service\KafkaService; use EasySwoole\EasySwoole\Config; use EasySwoole\ORM\Exception\Exception; use EasySwoole\Validate\Validate; use App\Model\CommonModel; use App\Model\CommonOrmModel; use EasySwoole\Validate\Functions\Length; class Evaluation extends Controller { public function onRequest(?string $action): bool { return parent::onRequest($action); // TODO: Change the autogenerated stub } public function __construct() { parent::__construct(); $this->config = Config::getInstance()->getConf('common');//配置变量 } public function tet(){ $config = Config::getInstance()->getConf('kafka');//配置变量 $kafka_bootstrap = implode(',', $config['host']); // 逗号分隔 $zookeeps = implode(',', $config['zookeeper']); //zookeep地址 $is_kafka_client = $config['is_kafka_client']; //是否存在 $kafka_bin_path = $config['kafka_bin_path']; $topicName = 'test'; $partitions = 1; $replication = 1; $kafka_service = new KafkaService(); $result = $kafka_service->createKafkaTopic($topicName, $partitions, $replication, $kafka_bootstrap, $zookeeps, $is_kafka_client, $kafka_bin_path); //创建kafka 的topic $brokers = $config['host']; //数组 $zookeeps_arr = $config['zookeeper']; //数组 var_dump(KafkaService::publish($topicName, ['ddd' => 33, 'time' => date('Y-m-d H:i:s')])); //写入kafka数据 }

这个tet方法 是创建了一个topic 名字为test的 如果存在 就跳过 如果不存在,创建,在然后就是KafkaService::publish 推送数据到对应的topic 中。

删除Topic

public function tet1(){ $config = Config::getInstance()->getConf('kafka');//配置变量 $kafka_bootstrap = implode(',', $config['host']); // 逗号分隔 $zookeeps = implode(',', $config['zookeeper']); //zookeep地址 $is_kafka_client = $config['is_kafka_client']; //是否存在 $kafka_bin_path = $config['kafka_bin_path']; $topicName = 'test'; $partitions = 1; $replication = 1; $kafka_service = new KafkaService(); $brokers = $config['host']; //数组 $zookeeps_arr = $config['zookeeper']; //数组 $result = $kafka_service->deleteKafkaTopic('Liupeng', $brokers, $zookeeps_arr, $is_kafka_client, $kafka_bin_path); //删除topic }

订阅某个kafka数据

public function dd() { $config = Config::getInstance()->getConf('kafka');//配置变量 $kafka_bootstrap = implode(',', $config['host']); // 逗号分隔 $zookeeps = implode(',', $config['zookeeper']); //zookeep地址 $is_kafka_client = $config['is_kafka_client']; //是否存在 $kafka_bin_path = $config['kafka_bin_path']; $topicName = 'kkk'; $kafka_service = new KafkaService(); $brokers = $config['host']; //数组 $zookeeps_arr = $config['zookeeper']; //数组 //消费kafka数据 一致占用 KafkaService::consume($topicName,function ($msg){ echo "收到". $msg . PHP_EOL; }); }

KafkaService::consume 订阅数据

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/16 23:40:37

解码康师傅源头活水:从用户共鸣到价值引领的年轻化新未来

在Z世代成为消费主力、饮品市场竞争进入“存量博弈”的当下,如何精准捕捉年轻群体需求、构建长效品牌联结,成为行业破局的核心命题。作为深耕饮品行业三十余年的国民品牌,康师傅饮品早已跳出“产品叫卖”的传统营销模式,以“用户洞察为锚、场景渗透为脉、价值共创为核”,构建起…

作者头像 李华
网站建设 2026/4/18 1:16:11

如何将外部镜像文件导入华为云国际站代理商的IMS服务?

将外部镜像文件导入华为云国际站代理商的 IMS 服务&#xff0c;流程和直接在华为云国际站操作 IMS 导入一致&#xff0c;代理商可协助完成全流程&#xff0c;核心是完成镜像准备、上传至 OBS 桶、注册为私有镜像三步&#xff0c;具体操作步骤如下&#xff1a;准备符合规范的外部…

作者头像 李华
网站建设 2026/4/18 5:16:27

SPI注入

简单来说&#xff1a;Api&#xff0c;你调用框架spi&#xff0c;框架调用你一、先记住这四个角色可以先记住这四个角色&#xff1a;Java SPI&#xff1a;JDK 级插件发现&#xff08;最原始&#xff09;Spring Boot SPI&#xff1a;框架级插件发现&#xff08;带生命周期&#x…

作者头像 李华
网站建设 2026/4/18 8:36:41

ARM 汇编指令:MOV

ARM 汇编指令&#xff1a;MOV 本文来自于我关于 ARM 汇编指令系列文章。欢迎阅读、点评与交流~ 1、ARM 汇编指令&#xff1a;MOV 2、ARM 汇编指令&#xff1a;LDR 3、ARM 汇编指令&#xff1a;STR 4、ARM 汇编指令&#xff1a;MRS 和 MSR 5、ARM 汇编指令&#xff1a;ORRS 在 A…

作者头像 李华
网站建设 2026/4/16 1:50:07

第 8 篇 目标检测(下):YOLO与SSD的“一步到位”哲学

《人工智能AI之计算机视觉:从像素到智能》 模块二:核心感知(上)——2D世界的精细化理解 朋友们好。 在上一篇里,我们聊了R-CNN家族。那是一群像严谨的考古学家一样的算法,讲究“先勘探(找候选区),再鉴定(分类和微调)”。听起来特别靠谱,对吧?这种“两步走”的逻…

作者头像 李华
网站建设 2026/4/14 17:26:26

基于spark的新闻文本分类系统(源码+论文+部署+安装)

感兴趣的可以先收藏起来&#xff0c;还有在毕设选题&#xff0c;项目以及论文编写等相关问题都可以给我留言咨询&#xff0c;我会一一回复&#xff0c;希望可以帮到大家。一、程序背景在互联网扩张、智能设备普及与短视频浪潮的推动下&#xff0c;新闻传播数字化趋势显著&#…

作者头像 李华