• linkedu视频
  • 平面设计
  • 电脑入门
  • 操作系统
  • 办公应用
  • 电脑硬件
  • 动画设计
  • 3D设计
  • 网页设计
  • CAD设计
  • 影音处理
  • 数据库
  • 程序设计
  • 认证考试
  • 信息管理
  • 信息安全
菜单
linkedu.com
  • 网页制作
  • 数据库
  • 程序设计
  • 操作系统
  • CMS教程
  • 游戏攻略
  • 脚本语言
  • 平面设计
  • 软件教程
  • 网络安全
  • 电脑知识
  • 服务器
  • 视频教程
  • dedecms
  • ecshop
  • z-blog
  • UcHome
  • UCenter
  • drupal
  • WordPress
  • 帝国cms
  • phpcms
  • 动易cms
  • phpwind
  • discuz
  • 科汛cms
  • 风讯cms
  • 建站教程
  • 运营技巧
您的位置:首页 > CMS教程 >建站教程 > Laravel中Kafka的使用详解

Laravel中Kafka的使用详解

作者:站长图库 字体:[增加 减小] 来源:互联网

站长图库向大家介绍了Laravel,Kafka的使用等相关知识,希望对您有所帮助

这篇文章主要介绍了Laravel中Kafka的使用详解,kafka是一个分布式消息队列,具有高性能、持久化、多副本备份、横向扩展能力,有对于消息队列感兴趣的同学可以参考下

本文并没有kafka的安装教程,本文是针对已经安装kafka及其配置好kafka的php拓展并且使用laravel框架进行开发项目,配置一个可供laravel框架使用的生产及消费者类。

以下代码修改自本站的YII框架关于kafka类的代码,经过测试使用在本人的项目中,可正常运行,larvael版本:5.6 代码放置larvael框架位置:app/Tools/Kafka.php

<?phpnamespace App\Tools; use Illuminate\Config\Repository; use Illuminate\Support\Facades\DB;use Monolog\Logger;use Monolog\Handler\StreamHandler; use Illuminate\Http\Request; class Kafka{    public $broker_list = '127.0.0.1';//配置kafka,可以用逗号隔开多个kafka    public $topic = 'test';//管道名称    public $partition = 0;     protected $producer = null;    protected $consumer = null;     public function __construct()    {        if (empty($this->broker_list)) {            throw new InvalidConfigException("broker not config");        }        $rk = new \RdKafka\Producer();        if (empty($rk)) {            throw new InvalidConfigException("producer error");        }        $rk->setLogLevel(LOG_DEBUG);        if (!$rk->addBrokers($this->broker_list)) {            throw new InvalidConfigException("producer error");        }        $this->producer = $rk;    }     /**     * 生产者     * @param array $messages     * @return mixed     */    public function send($messages = [],$topic)    {        $topic = $this->producer->newTopic($topic);        return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($messages));    }     /**     * 消费者     */    public function consumer($object, $callback){        $conf = new \RdKafka\Conf();        $conf->set('group.id', 0);        $conf->set('metadata.broker.list', $this->broker_list);             $topicConf = new \RdKafka\TopicConf();        $topicConf->set('auto.offset.reset', 'smallest');             $conf->setDefaultTopicConf($topicConf);             $consumer = new \RdKafka\KafkaConsumer($conf);             $consumer->subscribe([$this->topic]);             echo "waiting for messages.....\n";        while(true) {            $message = $consumer->consume(120*1000);            switch ($message->err) {                case RD_KAFKA_RESP_ERR_NO_ERROR:                echo "message payload....";                $object->$callback($message->payload);                break;            }            sleep(1);        }    }}?>

在控制器中如何使用:

首先再头部导入这个类:use App\Tools\Kafka;

下面是使用生产者实例:

public function test(){     $topic = 'tool';//输入使用管道名称    $data['shop_id'] = 58;    $data['bar_code']=586;    $data['goods_num'] = 1;    $data['goods_unit'] = '个';     $Kafka = new Kafka();    $Error_Msg = $Kafka->send($data,$topic);//传入数组会自动转换json    var_dump($Error_Msg); }

下面是消费者实例,消费者我这里使用了的是php脚本进行的操作:

<?php $conf = new RdKafka\Conf(); $conf->set('group.id', 'myConsumerGroup'); $rk = new RdKafka\Consumer($conf);$rk->addBrokers("localhost:9092"); $topicConf = new RdKafka\TopicConf();$topicConf->set('auto.commit.interval.ms', 100);$topicConf->set('offset.store.method', 'file');$topicConf->set('offset.store.path', sys_get_temp_dir());$topicConf->set('auto.offset.reset', 'smallest'); $topic = $rk->newTopic("tool", $topicConf);//读取的管道 // Start consuming partition 0$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) {    $message = $topic->consume(0, 120*10000);    switch ($message->err) {        case RD_KAFKA_RESP_ERR_NO_ERROR:            //没有错误打印信息            $message = json_decode(json_encode($message),true);            $data = json_decode($message['payload'],true);            var_dump($data);            break;        case RD_KAFKA_RESP_ERR__PARTITION_EOF:           
  


 
分享到:QQ空间新浪微博腾讯微博微信百度贴吧QQ好友复制网址打印

您可能想查找下面的文章:

  • laravel与thinkphp之间的区别与优缺点
  • 说说在Laravel中怎么执行Shell命令 ?
  • Laravel中用Observer事件致Redis队列异常问题
  • 手把手带你使用Vue + Laravel开发一个简单的 CRUD 应用
  • 分享个人推荐的Laravel或其它框架的编程规范
  • Laravel中三种中间件的作用
  • 解决Laravel使用laravel-excel扩展包(maatwebsite/excel)导入报错问题
  • 分享一个顺丰同城配送的扩展包并在laravel中使用
  • 介绍Laravel8路由模块新增missing方法
  • 解析如何进行Laravel表单验证分层设计和验证场景应用

相关文章

  • ThinkPHP支持的四种URL模式:普通模式、PATHINFO、REWRITE和兼容模式
  • Photoshop制作血迹喷溅效果的艺术字
  • 怎样利用Javascript简单实现星空连线的效果
  • PS制作金属字教程
  • Composer怎么执行降级操作?教你降级到版本1
  • 如何清除CentOS6或CentOS7上的磁盘空间
  • Photoshop巧用素材合成绚丽美女海报教程
  • php如何将html转为图片
  • PhotoShop打造水与火的碰撞视觉特效合成教程
  • 关键词选择技巧之长尾关键词法

文章分类

  • dedecms
  • ecshop
  • z-blog
  • UcHome
  • UCenter
  • drupal
  • WordPress
  • 帝国cms
  • phpcms
  • 动易cms
  • phpwind
  • discuz
  • 科汛cms
  • 风讯cms
  • 建站教程
  • 运营技巧

最近更新的内容

    • 微信小程序获取设备信息api示例
    • 宝塔面板升级专业破解版 免费使用所有功能【亲测可用】
    • Thinkphp5分页函数paginate中的each()传入自定义参数
    • 详解Laravel前端工程化之mix
    • 分享实现PHP红包算法的思路(附开发代码)
    • PTcms4.28安装搭建详细教程
    • css中&表示什么意思
    • Photoshop设计立体效果的网站推荐图标
    • 如何给WordPress主题评论框加上阿鲁表情
    • js实现滑动进度条

关于我们 - 联系我们 - 免责声明 - 网站地图

©2020-2025 All Rights Reserved. linkedu.com 版权所有