• linkedu视频
  • 平面设计
  • 电脑入门
  • 操作系统
  • 办公应用
  • 电脑硬件
  • 动画设计
  • 3D设计
  • 网页设计
  • CAD设计
  • 影音处理
  • 数据库
  • 程序设计
  • 认证考试
  • 信息管理
  • 信息安全
菜单
linkedu.com
  • 网页制作
  • 数据库
  • 程序设计
  • 操作系统
  • CMS教程
  • 游戏攻略
  • 脚本语言
  • 平面设计
  • 软件教程
  • 网络安全
  • 电脑知识
  • 服务器
  • 视频教程
  • dedecms
  • ecshop
  • z-blog
  • UcHome
  • UCenter
  • drupal
  • WordPress
  • 帝国cms
  • phpcms
  • 动易cms
  • phpwind
  • discuz
  • 科汛cms
  • 风讯cms
  • 建站教程
  • 运营技巧
您的位置:首页 > CMS教程 >建站教程 > Laravel中Kafka的使用详解

Laravel中Kafka的使用详解

作者:站长图库 字体:[增加 减小] 来源:互联网 时间:2022-04-29

站长图库向大家介绍了Laravel,Kafka的使用等相关知识,希望对您有所帮助

这篇文章主要介绍了Laravel中Kafka的使用详解,kafka是一个分布式消息队列,具有高性能、持久化、多副本备份、横向扩展能力,有对于消息队列感兴趣的同学可以参考下

本文并没有kafka的安装教程,本文是针对已经安装kafka及其配置好kafka的php拓展并且使用laravel框架进行开发项目,配置一个可供laravel框架使用的生产及消费者类。

以下代码修改自本站的YII框架关于kafka类的代码,经过测试使用在本人的项目中,可正常运行,larvael版本:5.6 代码放置larvael框架位置:app/Tools/Kafka.php

<?phpnamespace App\Tools; use Illuminate\Config\Repository; use Illuminate\Support\Facades\DB;use Monolog\Logger;use Monolog\Handler\StreamHandler; use Illuminate\Http\Request; class Kafka{    public $broker_list = '127.0.0.1';//配置kafka,可以用逗号隔开多个kafka    public $topic = 'test';//管道名称    public $partition = 0;     protected $producer = null;    protected $consumer = null;     public function __construct()    {        if (empty($this->broker_list)) {            throw new InvalidConfigException("broker not config");        }        $rk = new \RdKafka\Producer();        if (empty($rk)) {            throw new InvalidConfigException("producer error");        }        $rk->setLogLevel(LOG_DEBUG);        if (!$rk->addBrokers($this->broker_list)) {            throw new InvalidConfigException("producer error");        }        $this->producer = $rk;    }     /**     * 生产者     * @param array $messages     * @return mixed     */    public function send($messages = [],$topic)    {        $topic = $this->producer->newTopic($topic);        return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($messages));    }     /**     * 消费者     */    public function consumer($object, $callback){        $conf = new \RdKafka\Conf();        $conf->set('group.id', 0);        $conf->set('metadata.broker.list', $this->broker_list);             $topicConf = new \RdKafka\TopicConf();        $topicConf->set('auto.offset.reset', 'smallest');             $conf->setDefaultTopicConf($topicConf);             $consumer = new \RdKafka\KafkaConsumer($conf);             $consumer->subscribe([$this->topic]);             echo "waiting for messages.....\n";        while(true) {            $message = $consumer->consume(120*1000);            switch ($message->err) {                case RD_KAFKA_RESP_ERR_NO_ERROR:                echo "message payload....";                $object->$callback($message->payload);                break;            }            sleep(1);        }    }}?>

在控制器中如何使用:

首先再头部导入这个类:use App\Tools\Kafka;

下面是使用生产者实例:

public function test(){     $topic = 'tool';//输入使用管道名称    $data['shop_id'] = 58;    $data['bar_code']=586;    $data['goods_num'] = 1;    $data['goods_unit'] = '个';     $Kafka = new Kafka();    $Error_Msg = $Kafka->send($data,$topic);//传入数组会自动转换json    var_dump($Error_Msg); }

下面是消费者实例,消费者我这里使用了的是php脚本进行的操作:

<?php $conf = new RdKafka\Conf(); $conf->set('group.id', 'myConsumerGroup'); $rk = new RdKafka\Consumer($conf);$rk->addBrokers("localhost:9092"); $topicConf = new RdKafka\TopicConf();$topicConf->set('auto.commit.interval.ms', 100);$topicConf->set('offset.store.method', 'file');$topicConf->set('offset.store.path', sys_get_temp_dir());$topicConf->set('auto.offset.reset', 'smallest'); $topic = $rk->newTopic("tool", $topicConf);//读取的管道 // Start consuming partition 0$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) {    $message = $topic->consume(0, 120*10000);    switch ($message->err) {        case RD_KAFKA_RESP_ERR_NO_ERROR:            //没有错误打印信息            $message = json_decode(json_encode($message),true);            $data = json_decode($message['payload'],true);            var_dump($data);            break;        case RD_KAFKA_RESP_ERR__PARTITION_EOF:            echo "等待接收信息\n";            break;        case RD_KAFKA_RESP_ERR__TIMED_OUT:            echo "超时\n";            break;        default:            throw new \Exception($message->errstr(), $message->err);            break;    }    sleep(1);} ?>



分享到:QQ空间新浪微博腾讯微博微信百度贴吧QQ好友复制网址打印

您可能想查找下面的文章:

  • laravel与thinkphp之间的区别与优缺点
  • 说说在Laravel中怎么执行Shell命令 ?
  • Laravel中用Observer事件致Redis队列异常问题
  • 手把手带你使用Vue + Laravel开发一个简单的 CRUD 应用
  • 分享个人推荐的Laravel或其它框架的编程规范
  • Laravel中三种中间件的作用
  • 解决Laravel使用laravel-excel扩展包(maatwebsite/excel)导入报错问题
  • 分享一个顺丰同城配送的扩展包并在laravel中使用
  • 介绍Laravel8路由模块新增missing方法
  • 解析如何进行Laravel表单验证分层设计和验证场景应用

相关文章

  • 2022-04-29Javascript如何实现json字符串与对象转换
  • 2022-04-29微信小程序反编译提取源代码方法
  • 2022-04-29详解wordpress非根目录部署nginx关键配置
  • 2022-04-29解决php Composer出现SSL报错问题
  • 2022-04-29Photoshop绘制白色开关插座ICON图标
  • 2022-04-29PHP怎么实现评论回复功能
  • 2022-04-29介绍PHP + MySQL 实现数据分页显示
  • 2022-04-29Photoshop制作可口的饼干文字特效
  • 2022-04-29robots.txt 语法详解:*、$、?等字符的含义及用法
  • 2022-04-29聊聊怎么将小程序项目转为uni-app项目

文章分类

  • dedecms
  • ecshop
  • z-blog
  • UcHome
  • UCenter
  • drupal
  • WordPress
  • 帝国cms
  • phpcms
  • 动易cms
  • phpwind
  • discuz
  • 科汛cms
  • 风讯cms
  • 建站教程
  • 运营技巧

最近更新的内容

    • 介绍thinkPHP配置虚拟域名简化URL路径
    • Javascript中常见的内置对象有哪些
    • 浅析CSS中怎么实现线性渐变(linear-gradient)
    • DedeCMS使用sql语句获取文章链接地址
    • 你知道PHP中重定向网页跳转页面的方法有哪些吗?一起说说吧
    • Photoshop使用素材制作唯美的花体字
    • Photoshop制作逼真的木刻效果艺术字
    • node_modules中如何优雅的修改依赖库?方法介绍
    • PHP如何去掉所有HTML标签?
    • 聊聊ThinkPHP3.2.3从php5升级到php7艰辛之路

关于我们 - 联系我们 - 免责声明 - 网站地图

©2020-2025 All Rights Reserved. linkedu.com 版权所有