二百三十三、Flume——Flume采集JSON文件到Kafka,再用Flume采集Kafka数据到HDFS中

一、目的

由于使用了新的Kafka协议,因为根据新的协议推送模拟数据到Kafka中,再Flume采集Kafka数据到HDFS中

二、技术选型

(一)Kettle工具

准备使用Kettle的JSON input控件和Kafka producer控件,但是搞了1天没搞定,博客上也找不到相关的资料,后面再研究研究

之前用kettle采集Kafka数据写入HDFS中(不建议使用这种方式采集Kafka数据到HDFS中

(二)Flume工具

三、实施步骤

(一)准备json文件

注意:Json数据用JSOB工具压缩一下,1个完整的JSON为1行

(二)创建Kafka主题

[root@hurys23 bin]# ./kafka-topics.sh  --create --bootstrap-server  192.168.0.23:9092 --topic topic_internal_data_queue  --partitions 1 --replication-factor 1

(三)创建文件目录

[root@gree128 userfriends]# cd  /opt/kb15tmp/

[root@gree128 kb15tmp]# mkdir -p  /opt/kb15tmp/checkpoint/queue

[root@gree128 kb15tmp]# mkdir -p  /opt/kb15tmp/checkpoint/data/queue

(四)配置Flume采集json任务文件

queue.sources=queueSource
queue.channels=queueChannel
queue.sinks=queueSink

queue.sources.queueSource.type=spooldir
queue.sources.queueSource.spoolDir=/opt/kb15tmp/flumelogfile/queue
queue.sources.queueSource.deserializer=LINE
queue.sources.queueSource.deserializer.maxLineLength=320000
queue.sources.queueSource.includePattern=queue_[0-9]{4}-[0-9]{2}-[0-9]{2}.json

queue.channels.queueChannel.type=file
queue.channels.queueChannel.checkpointDir=/opt/kb15tmp/checkpoint/queue
queue.channels.queueChannel.dataDirs=/opt/kb15tmp/checkpoint/data/queue

queue.sinks.queueSink.type=org.apache.flume.sink.kafka.KafkaSink
queue.sinks.queueSink.batchSize=640
queue.sinks.queueSink.brokerList=192.168.0.23:9092
queue.sinks.queueSink.topic=topic_internal_data_queue

queue.sources.queueSource.channels=queueChannel
queue.sinks.queueSink.channel=queueChannel

(五)修改Flume采集Kafka任务文件

## agent a1
a1.sources = s1
a1.channels = c1
a1.sinks = k1

## configure source s1
a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.kafka.bootstrap.servers = 192.168.0.23:9092
a1.sources.s1.kafka.topics = topic_internal_data_queue
a1.sources.s1.kafka.consumer.group.id = queue_group
a1.sources.s1.kafka.consumer.auto.offset.reset = latest
a1.sources.s1.batchSize = 1000

## configure channel c1
## a1.channels.c1.type = memory
## a1.channels.c1.capacity = 10000
## a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/data/flumeData/checkpoint/queue
a1.channels.c1.dataDirs = /home/data/flumeData/flumedata/queue

## configure sink k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hurys23:8020/user/hive/warehouse/hurys_dc_ods.db/ods_queue/day=%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = queue
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
a1.sinks.k1.hdfs.rollSize = 1200000000
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 60
a1.sinks.k1.hdfs.minBlockReplicas = 1

a1.sinks.k1.hdfs.fileType = SequenceFile
a1.sinks.k1.hdfs.codeC = gzip

## Bind the source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
​​​​

(六)打开监控,拷贝文件

[root@hurys23 kb15tmp]# cp queue.json /opt/kb15tmp/flumelogfile/queue/queue_2024-04-19.json

(七)执行Kafka消费窗口

[root@hurys23 bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.0.23:9092  --topic topic_internal_data_queue  --from-beginning

(八)​​​​​​​运行Flume采集Kafka到HDFS的任务​​​​​​​

[root@hurys23 flume190]# bin/flume-ng agent -n a1  -f /usr/local/hurys/dc_env/flume/flume190/conf/queue.properties

(九)​​​​​​​运行Flume采集json文件到Kafka的任务

[root@hurys23 flume190]# bin/flume-ng agent --name queue --conf ./conf/ --conf-file ./conf/KB15conf/queue.conf   -Dflume.root.logger=INFO,console

(十)查看HDFS文件

(十一)ODS层验证JSON格式是否正确,是否可以解析

--刷新表分区
msck repair table ods_queue;

里面的JSON字段都可以解析出来,搞定!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/558465.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

如何用idm下载迅雷文件 idm怎么安装到浏览器 idm怎么设置中文

如果不是vip用户使用迅雷下载数据文件,其下载速度是很慢的,有的时候还会被限速,所以很多小伙们就开始使用idm下载迅雷文件,idm这款软件最大的优势就是下载速度快,还有就是具备网页捕获功能,能够下载网页上的…

【uniapp】 合成海报组件

之前公司的同事写过一个微信小程序用的 合成海报的组件 非常十分好用 最近的项目是uni的 把组件改造一下也可以用 记录一下 <template><view><canvas type"2d" class"_mycanvas" id"my-canvas" canvas-id"my-canvas" …

全开源小狐狸Ai系统 小狐狸ai付费创作系统 ChatGPT智能机器人2.7.6免授权版

内容目录 一、详细介绍二、效果展示1.部分代码2.效果图展示 三、学习资料下载 一、详细介绍 测试环境&#xff1a;Linux系统CentOS7.6、宝塔、PHP7.4、MySQL5.6&#xff0c;根目录public&#xff0c;伪静态thinkPHP&#xff0c;开启ssl证书 具有文章改写、广告营销文案、编程…

Windows:web端UI自动化=python+selenium+pycharm框架

本篇写怎么写一个UI自动化代码。mac和Windows是一样的 都是这样写 不过&#xff0c;习惯用Windows了 如果python没有安装可以看我另一篇安装python的教程 先安装python先 下载完python 下载pip 1 安装pip $ curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py # 下载…

宝塔面板使用docker+nginx+gunicorn部署Django项目实战教程

第一步&#xff1a;创建Django项目 使用pip install django安装创建django项目的依赖在电脑某个根目录下执行django-admin startproject app创建一个名为app的Django项目。目录结构如下: ├── app │ ├── init.py │ ├── asgi.py │ ├── settings.py │ ├── url…

机器学习:考试复习提纲

该页仅为复习资料&#xff0c;内含博客链接均通过搜索得到。 当然直接访问我的GitHub博客会更方便。 1. 线性回归 Linear Regression https://www.cnblogs.com/geo-will/p/10468253.html 要求1&#xff1a;可以按照自己的理解简述线性回归问题。 回归分析是一种预测性的建模…

buuctf re 37-40

[WUSTCTF2020]Cr0ssfun 打开 #include<iostream> using namespace std; int main() {char a1[32];a1[1] c;a1[25] ; a1[27] e;a1[4] 2;a1[17] r;a1[29] f;a1[17] r;a1[24] _;a1[2] t;a1[9] c;a1[32] };a1[19] v;a1[5] 0;a1[14] n;a1[15] d;a1[8] {;a1[18]…

【Leetcode每日一题】 动态规划 - 地下城游戏(难度⭐⭐⭐)(61)

1. 题目解析 题目链接&#xff1a;174. 地下城游戏 这个问题的理解其实相当简单&#xff0c;只需看一下示例&#xff0c;基本就能明白其含义了。 2.算法原理 一、状态表定义 在解决地下城游戏问题时&#xff0c;我们首先需要对状态进行恰当的定义。一个直观的想法是&#x…

Oracle EBS Interface/API(54)- GL日记账审批

背景: 客户化创建薪酬凭证或者银企付款入账日记账以后,用户希望自动提交审批流程,无需到系统标准功能点击审批,减少用户操作。 快速参考 参考点内容功能导航N: GL->日记账->输入并发请求None基表GL.GL_JE_BATCHESAPI参考下面介绍错误信息表None接口FormNone接口Reque…

wiringpi库的应用 -- sg90 定时器 oled

sg 90舵机: 接线: VCC -- 红 GND -- 地 信号线 -- 黄 -- pwm 定时器: 先玩定时器: sg90 需要的pwm波需要定时器输出&#xff0c;so我们得先来玩一下定时器 分析&#xff1a;实现定时器&#xff0c;通过itimerval结构体以及函数setitimer产生的信号&#xff0c;系统…

Flume在大数据集群下的配置以及监控工具Ganglia的部署安装

前提&#xff1a;需要有三台虚拟机&#xff08;hadoop102,103,104&#xff09;配置好相关基础环境 安装 将安装包上传到/opt/software中 tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/修改 apache-flume-1.9.0-bin 的名称为 flume mv /opt/module/…

Web安全知识

第二章 虚拟机运行架构&#xff1a; 1.寄居结构 2.原生架构 软件 注&#xff1a;Hyper-V是在Windows 2008操作系统上 附录 连接FTP服务器过程&#xff1a; 1.下载了软件&#xff1a; 2.连接到ftp://10.0.105.223/服务器&#xff08;访问老师课堂资源地址&#xff09; 关闭…

Node.JS后端开发笔记整理(简洁版)

前端 1. 开发环境和技术栈 开发工具&#xff1a;Visual Studio CodeNode.js版本&#xff1a;18.19.0&#xff08;建议保持在18&#xff09;包管理器&#xff1a;npm前端框架&#xff1a;Vue3.4脚本语言&#xff1a;TypeScript构建工具&#xff1a;Vite后端框架&#xff1a;Ex…

Django项目使用uwsgi+nginx部署上线

Django项目使用uwsginginx部署上线 前言settings 配置安装uwsgi 和配置uwsgi推荐配置文件启用wsgi不使用nginx的配置&#xff08;不推荐&#xff09;使用nginx的配置 安装 nginx和配置niginx 配置 运行参考资料 前言 代码已经开发完成&#xff0c;正式部署上线 settings 配置…

视频教程下载:用ChatGPT玩转海外抖音TikTok

CHATGPT for TikTok是一门前沿课程&#xff0c;旨在帮助您充分发挥TikTok广告活动的全部潜力。随着数字营销的爆炸性增长&#xff0c;企业需要使用先进的工具来保持竞争优势。在这门课程中&#xff0c;您将学习如何利用CHATGPT——一种先进的人工智能工具——来创建与目标受众产…

潜藏10年的恶意软件被发现;利用漏洞在K8S上挖矿;AWS、Google和Azure 出现信息泄露危机 | 安全周报0419

关键词&#xff1a;OfflRouter、恶意软件、VBA宏病毒、机密文件、可执行文件、iOS间谍软件、LightSpy、F_Warehouse、Azure CLI、AWS CLI、Google Cloud CLI 1. 近十年来&#xff0c;OfflRouter恶意软件在乌克兰一直未被发现 自2015年以来&#xff0c;部分乌克兰政府网络一直…

【软考】软件设计师中级

视频课 计算机组成原理 进制转换 定点数vs浮点数 校验码 计算机体系结构 指令系统 I/O 存储系统 直接映射&#xff1a;简单粗暴的死板派 全相联映射&#xff1a;跳脱的自由发挥派 组相联映射&#xff1a;折中派&#xff0c;组间直接映射&组内全相联映射 命中率&#xf…

你的mongodb客户端是哪个呢?

MongoDB 是一种流行的文档数据库&#xff0c;它可以支持多种场景和应用。有很多客户端工具可以用来管理和操作 MongoDB&#xff0c;以下是一些常用的工具&#xff0c;以及它们的介绍&#xff1a; 一、MongoDB Shell MongoDB Shell 是连接&#xff08;和使用&#xff09;MongoD…

【银角大王——Django课程Day1】

Django框架第一课 安装Django框架方式一&#xff08;命令行的形式创建Django项目&#xff09;方式二&#xff08;适合企业版的pycharm&#xff09;默认文件介绍app文件介绍快速上手我的导包一直爆红是因为我没使用解释器&#xff0c;没导入包&#xff0c;去设置里面导入包即可—…

(保姆级教学)跨站请求伪造漏洞

1. CSRF漏洞 CSRF&#xff08;Cross-site request forgery&#xff09;跨站请求伪造&#xff0c;也被称为One Click Attack 或者Session Riding&#xff0c;通常缩写为CSRF或者XSRF&#xff0c;是一种对网站的恶意利用。尽管听起来像跨站脚本&#xff08;XSS&#xff09;&…