当前位置: 首页 > news >正文

数据采集-->kafka-->hdfs

数据采集到kafka

flume:

a1.sources = r1
a1.channels = c1a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/installs/flume1.9/job/a.log
a1.sources.r1.positionFile = /opt/installs/flume1.9/job/taildir-kafka.jsona1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers =hadoop11:9092,hadoop12:9092,hadoop13:9092
a1.channels.c1.kafka.topic = topica
a1.channels.c1.parseAsFlumeEvent = falsea1.sources.r1.channels = c1

执行命令:

flume-ng agent --conf conf  --name a1 --conf-file job/taildir-kafka.conf -Dflume.root.logger=INFO,console

 向a.log添加测试数据: 

消费者:

 数据从kafka到hdfs

flume:

 

a1.sources = r1
a1.channels = c1 
a1.sinks = k1 a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize=5000
a1.sources.r1.batchDurationMillis=2000
a1.sources.r1.kafka.bootstrap.servers =hadoop11:9092,hadoop12:9092,hadoop13:9092
a1.sources.r1.kafka.topics = topica
a1.sources.r1.kafka.consumer.group.id = g1a1.channels.c1.type = memory
a1.channels.c1.capacity=5000
a1.channels.c1.transactionCapacity=5000a1.sinks.k1.type = hdfs
a1.sinks.k1.batchSize = 5000
a1.sinks.k1.hdfs.path = hdfs://hadoop11:8020/flume/date=%Y-%m-%d
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.rollInterval =0 
a1.sinks.k1.hdfs.rollSize = 1048576
a1.sinks.k1.hdfs.rollCount = 0a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

执行命令:

flume-ng agent --conf conf  --name a1 --conf-file job/kafka-hdfs.conf -Dflume.root.logger=INFO,console

向a.log添加测试数据: 

消费者:

hdfs:

 


http://www.mrgr.cn/news/9257.html

相关文章:

  • 自动化与高效设计:推理技术在FPGA中的应用
  • Obsidian Publish的开源替代品Markopolis
  • [数据集][目标检测]管道漏水泄漏破损检测数据集VOC+YOLO格式2614张4类
  • 《深度学习》 OpenCV 计算机视觉入门 (中篇)
  • 解决WIndows10下更新蓝牙驱动屡屡失败问题
  • C/C++语言基础--结构体知识详解(包括:结构体数组、字节对齐、位段等内容)
  • Git 的配置
  • find 命令
  • WEB渗透Win提权篇-RDPFirewall
  • 79、ansible-----playbook2
  • MacOS如何升级ruby版本?
  • GDB的使用(3)
  • Spring Boot整合MyBatis-Plus的详细讲解
  • uniapp h5手机如何打开本地跑的前端项目进行本地调试
  • (十八)Flink CEP 详解
  • 视频在线去水印解析相册怎么弄,轻松掌握五大技巧
  • 《机器学习》—— 使用过采样方法实现逻辑回归分类问题
  • Embarcadero Dev-C++ 6.3安装教程
  • [学习笔记]深度学习详解-Datawhale学习组
  • Android 为什么子线程不能更新UI