Flume进阶

news/2024/5/21 3:10:51

目录

第1关:拦截器的使用

第2关:自定义拦截器


第1关:拦截器的使用

代码文件:


# Define source, channel, sink
#agent名称为a1# Define source
#source类型配置为avro,监听8888端口,后台会自动发送数据到该端口
#拦截后台发送过来的数据,将y.开头的保留下来# Define channel
#channel配置为memery# Define sink
#落地到 hdfs://localhost:9000/flume目录下
#根据时间落地,3s
#数据格式DataStreama1.sources = source1
a1.sinks = sink1
a1.channels = channel1# 配置source组件
a1.sources.source1.type = avro
a1.sources.source1.bind  = 127.0.0.1a1.sources.source1.port  =  8888
##定义文件上传完后的后缀,默认是.COMPLETED
a1.sources.source1.fileSuffix=.FINISHED
##默认是2048,如果文件行数据量超过2048字节(1k),会被截断,导致数据丢失
a1.sources.source1.deserializer.maxLineLength=5120#正则过滤拦截器a1.sources.source1.interceptors = i1a1.sources.source1.interceptors.i1.type = regex_filtera1.sources.source1.interceptors.i1.regex = ^y.*#如果excludeEvents设为false,表示过滤掉不是以A开头的events。#如果excludeEvents设为true,则表示过滤掉以A开头的events。a1.sources.source1.interceptors.i1.excludeEvents = false
# 配置sink组件
a1.sinks.sink1.type = hdfs
a1.sinks.sink1.hdfs.path =hdfs://localhost:9000/flume
#上传文件的前缀
a1.sinks.sink1.hdfs.filePrefix = FlumeData.
#上传文件的后缀
a1.sinks.sink1.hdfs.fileSuffix = .log
#积攒多少个Event才flush到HDFS一次
a1.sinks.sink1.hdfs.batchSize= 100
a1.sinks.sink1.hdfs.fileType = DataStream
a1.sinks.sink1.hdfs.writeFormat =Text## roll:滚动切换:控制写文件的切换规则
## 按文件体积(字节)来切
a1.sinks.sink1.hdfs.rollSize = 512000
## 按event条数切   
a1.sinks.sink1.hdfs.rollCount = 1000000
## 按时间间隔切换文件,多久生成一个新的文件
a1.sinks.sink1.hdfs.rollInterval = 4## 控制生成目录的规则
a1.sinks.sink1.hdfs.round = true
##多少时间单位创建一个新的文件夹
a1.sinks.sink1.hdfs.roundValue = 10
a1.sinks.sink1.hdfs.roundUnit = minute#是否使用本地时间戳
a1.sinks.sink1.hdfs.useLocalTimeStamp = true# channel组件配置
a1.channels.channel1.type = memory
## event条数
a1.channels.channel1.capacity = 500000
##flume事务控制所需要的缓存容量600条event
a1.channels.channel1.transactionCapacity = 600# 绑定source、channel和sink之间的连接
a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1

命令行代码:


start-dfs.sh
hadoop dfs -mkdir /flume

第2关:自定义拦截器

代码文件:

package com.yy;
/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer;
import org.apache.flume.interceptor.RegexExtractorInterceptorSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;public class RegexExtractorExtInterceptor implements Interceptor {  static final String REGEX = "regex";  static final String SERIALIZERS = "serializers";  // 增加代码开始  static final String EXTRACTOR_HEADER = "extractorHeader";  static final boolean DEFAULT_EXTRACTOR_HEADER = false;  static final String EXTRACTOR_HEADER_KEY = "extractorHeaderKey";  // 增加代码结束  private static final Logger logger = LoggerFactory  .getLogger(RegexExtractorExtInterceptor.class);  private final Pattern regex;  private final List<NameAndSerializer> serializers;  // 增加代码开始  private final boolean extractorHeader;  private final String extractorHeaderKey;  // 增加代码结束  private RegexExtractorExtInterceptor(Pattern regex,  List<NameAndSerializer> serializers, boolean extractorHeader,  String extractorHeaderKey) {  this.regex = regex;  this.serializers = serializers;  this.extractorHeader = extractorHeader;  this.extractorHeaderKey = extractorHeaderKey;  }  @Override  public void initialize() {  // NO-OP...  }  @Override  public void close() {  // NO-OP...  }  @Override  public Event intercept(Event event) {  String tmpStr;  if(extractorHeader)  {  tmpStr = event.getHeaders().get(extractorHeaderKey);  }  else  {  tmpStr=new String(event.getBody(),  Charsets.UTF_8);  }  Matcher matcher = regex.matcher(tmpStr);  Map<String, String> headers = event.getHeaders();  if (matcher.find()) {  for (int group = 0, count = matcher.groupCount(); group < count; group++) {  int groupIndex = group + 1;  if (groupIndex > serializers.size()) {  if (logger.isDebugEnabled()) {  logger.debug(  "Skipping group {} to {} due to missing serializer",  group, count);  }  break;  }  NameAndSerializer serializer = serializers.get(group);  if (logger.isDebugEnabled()) {  logger.debug("Serializing {} using {}",  serializer.headerName, serializer.serializer);  }  headers.put(serializer.headerName, serializer.serializer  .serialize(matcher.group(groupIndex)));  }  }  return event;  }  @Override  public List<Event> intercept(List<Event> events) {  List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());  for (Event event : events) {  Event interceptedEvent = intercept(event);  if (interceptedEvent != null) {  intercepted.add(interceptedEvent);  }  }  return intercepted;  }  public static class Builder implements Interceptor.Builder {  private Pattern regex;  private List<NameAndSerializer> serializerList;  // 增加代码开始  private boolean extractorHeader;  private String extractorHeaderKey;  // 增加代码结束  private final RegexExtractorInterceptorSerializer defaultSerializer = new RegexExtractorInterceptorPassThroughSerializer();  @Override  public void configure(Context context) {  String regexString = context.getString(REGEX);  Preconditions.checkArgument(!StringUtils.isEmpty(regexString),  "Must supply a valid regex string");  regex = Pattern.compile(regexString);  regex.pattern();  regex.matcher("").groupCount();  configureSerializers(context);  // 增加代码开始  extractorHeader = context.getBoolean(EXTRACTOR_HEADER,  DEFAULT_EXTRACTOR_HEADER);  if (extractorHeader) {  extractorHeaderKey = context.getString(EXTRACTOR_HEADER_KEY);  Preconditions.checkArgument(  !StringUtils.isEmpty(extractorHeaderKey),  "必须指定要抽取内容的header key");  }  // 增加代码结束  }  private void configureSerializers(Context context) {  String serializerListStr = context.getString(SERIALIZERS);  Preconditions.checkArgument(  !StringUtils.isEmpty(serializerListStr),  "Must supply at least one name and serializer");  String[] serializerNames = serializerListStr.split("\\s+");  Context serializerContexts = new Context(  context.getSubProperties(SERIALIZERS + "."));  serializerList = Lists  .newArrayListWithCapacity(serializerNames.length);  for (String serializerName : serializerNames) {  Context serializerContext = new Context(  serializerContexts.getSubProperties(serializerName  + "."));  String type = serializerContext.getString("type", "DEFAULT");  String name = serializerContext.getString("name");  Preconditions.checkArgument(!StringUtils.isEmpty(name),  "Supplied name cannot be empty.");  if ("DEFAULT".equals(type)) {  serializerList.add(new NameAndSerializer(name,  defaultSerializer));  } else {  serializerList.add(new NameAndSerializer(name,  getCustomSerializer(type, serializerContext)));  }  }  }  private RegexExtractorInterceptorSerializer getCustomSerializer(  String clazzName, Context context) {  try {  RegexExtractorInterceptorSerializer serializer = (RegexExtractorInterceptorSerializer) Class  .forName(clazzName).newInstance();  serializer.configure(context);  return serializer;  } catch (Exception e) {  logger.error("Could not instantiate event serializer.", e);  Throwables.propagate(e);  }  return defaultSerializer;  }  @Override  public Interceptor build() {  Preconditions.checkArgument(regex != null,  "Regex pattern was misconfigured");  Preconditions.checkArgument(serializerList.size() > 0,  "Must supply a valid group match id list");  return new RegexExtractorExtInterceptor(regex, serializerList,  extractorHeader, extractorHeaderKey);  }  }  static class NameAndSerializer {  private final String headerName;  private final RegexExtractorInterceptorSerializer serializer;  public NameAndSerializer(String headerName,  RegexExtractorInterceptorSerializer serializer) {  this.headerName = headerName;  this.serializer = serializer;  }  }  
}  

flume.conf 的配置具体步骤:

1.首先点击右上角的文件夹找到Flume1

2.点击图片中的flume.conf

3.点击后将以下的配置文件进行配置

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'# Define source, channel, sink
#agent名为a1;# Define and configure an Spool directory source
#采集 /opt/flume/data目录下所有文件# Configure channel
#channel选择memery# Define and configure a hdfs sink
#落地到hdfs的hdfs://localhost:9000/flume/文件名的前缀/文件名上的日期
#文件格式设为DataStream
#根据时间回滚,3s
a1.sources=source1  
a1.channels=channel1  
a1.sinks=sink1  
a1.sources.source1.type=spooldir  
a1.sources.source1.spoolDir=/opt/flume/data
a1.sources.source1.fileHeader=true  
a1.sources.source1.basenameHeader=true  
a1.sources.source1.interceptors=i1  
a1.sources.source1.interceptors.i1.type=com.yy.RegexExtractorExtInterceptor$Builder  
a1.sources.source1.interceptors.i1.regex=(.*)\\.(.*)\\.(.*)  
a1.sources.source1.interceptors.i1.extractorHeader=true  
a1.sources.source1.interceptors.i1.extractorHeaderKey=basename  
a1.sources.source1.interceptors.i1.serializers=s1 s2 s3  
a1.sources.source1.interceptors.i1.serializers.s1.name=one  
a1.sources.source1.interceptors.i1.serializers.s2.name=two  
a1.sources.source1.interceptors.i1.serializers.s3.name=three  
a1.sources.source1.channels=channel1  
a1.sinks.sink1.type=hdfs  
a1.sinks.sink1.channel=channel1  
a1.sinks.sink1.hdfs.path=hdfs://localhost:9000/flume/%{one}/%{three}  
a1.sinks.sink1.hdfs.round=true  
a1.sinks.sink1.hdfs.roundValue=10  
a1.sinks.sink1.hdfs.roundUnit=minute  
a1.sinks.sink1.hdfs.fileType=DataStream  
a1.sinks.sink1.hdfs.writeFormat=Text  
a1.sinks.sink1.hdfs.rollInterval=0  
a1.sinks.sink1.hdfs.rollSize=10240  
a1.sinks.sink1.hdfs.rollCount=0  
a1.sinks.sink1.hdfs.idleTimeout=60  
a1.channels.channel1.type=memory  
a1.channels.channel1.capacity=10000  
a1.channels.channel1.transactionCapacity=1000  
a1.channels.channel1.keep-alive=30  

4.最后ctrl+s进行保存测评即可。


http://www.mrgr.cn/p/45567237

相关文章

论文阅读:《Sequence can Secretly Tell You What to Discard》,减少推理阶段的 kv cache

目前各类大模型都支持长文本&#xff0c;例如 kimi chat 以及 gemini pro&#xff0c;都支持 100K 以及更高的上下文长度。但越长的上下文&#xff0c;在推理过程中需要存储的 kv cache 也越多。假设&#xff0c;数据的批次用 b 表示&#xff0c;输入序列的长度仍然用 s 表示&a…

聚观早报 | 苹果新款iPad Pro发布;国产特斯拉4月交付量

聚观早报每日整理最值得关注的行业重点事件&#xff0c;帮助大家及时了解最新行业动态&#xff0c;每日读报&#xff0c;就读聚观365资讯简报。 整理丨Cutie 5月9日消息 苹果新款iPad Pro发布 国产特斯拉4月交付量 iOS 18新功能爆料 真我GT Neo6续航细节 三星Galaxy Z F…

#Scurm冲刺第五天

Scurm冲刺第五天 1. 站立式会议内容昨日已完成任务 今日计划完成任务前端UI设计代码编写(收藏页面,商品详情页,个人中心页) 前端UI设计代码编写(购物车页面,订单页面,订单详情页,搜索后商品展示页),前端界面合理跳转功能实现后端管理员模块功能实现(登录注册功能,用户…

m1_day7

课程内容:数组的排序引用数据类型的数组面向对象封装继承多态数组的排序:手动排序 冒泡排序 *自动排序Arrays.sort(数组对象);只能升序排序import java.util.*;引用数据类型的数组:当我们创建一个引用数据类型的数组的时候 其实里面一个对象都没有 里面都是默认值null 为了防…

数据分析:基于sparcc的co-occurrence网络

介绍 Sparcc是基于16s或metagenomics数据等计算组成数据之间关联关系的算法。通常使用count matrix数据。 安装Sparcc软件 git clone gitgithub.com:JCSzamosi/SparCC3.git export PATH/path/SparCC3:$PATHwhich SparCC.py导入数据 注&#xff1a;使用rarefy抽平的count ma…

『先进技术助力』Kompas AI:智能AI代理在工作中的应用与效率提升

『智能化未来』Kompas AI如何改变我们的工作方式&#xff1f; 在这个信息时代&#xff0c;利用AI聊天机器人来处理机械性的工作已经成为一种趋势。ChatGPT作为一种智能助手&#xff0c;不仅能够提高工作效率&#xff0c;还可以帮助我们更明智地做出决策&#xff0c;从而释放出更…

【视频】多元线性回归模型原理讲解与R语言实例

原文链接:https://tecdat.cn/?p=36149 原文出处:拓端数据部落公众号 分析师:Xue Yang 近年来,随着计量经济学和统计学的快速发展,回归模型作为一种有效的数据分析工具,被广泛应用于金融市场的分析中。回归模型能够通过建立变量之间的数学关系,揭示变量之间的相互作用机…

Python随机波动性SV模型:贝叶斯推断马尔可夫链蒙特卡洛MCMC分析英镑/美元汇率时间序列数据

全文链接:https://tecdat.cn/?p=33885 原文出处:拓端数据部落公众号 本文描述了帮助客户使用马尔可夫链蒙特卡洛(MCMC)方法通过贝叶斯方法估计基本的单变量随机波动模型,就像Kim等人(1998年)所做的那样。 定义模型以及从条件后验中抽取样本的函数的代码也在Python脚本中…

Hbase 常用shell操作

目录 1、创建表 1.1、启动HBase Shell 1.2、创建表 1.3、查看表 1.4、删除表 2、插入数据 2.1、put命令 3、查看数据 3.1、get命令 3.2、查询数据中文显示 4、更新数据 4.1、使用put来更新数据 5、删除数据 5.1、delete命令 5.2、删除指定列的数据 5.3、delete…

vs2019 里 C++ 20规范的 string 类的源码注释

&#xff08;1&#xff09;读源码&#xff0c;可以让我们更好的使用这个类&#xff0c;掌握这个类&#xff0c;知道咱们使用了库代码以后&#xff0c;程序大致具体是怎么执行的。而不用担心程序出不知名的意外的问题。也便于随后的代码调试。 string 类实际是 库中 basic_strin…

c语言题库之序列合并

文章目录 前言C语言题目&#xff1a;分析1. 合并逻辑2.图解合并逻辑 代码实现注意事项总结思考 前言 在编程中&#xff0c;我们经常遇到需要将两个有序序列合并为一个有序序列的问题。下面&#xff0c;我们就来详细探讨一下如何解决这个问题&#xff0c;包括输入处理、合并逻辑…

团队作业4--项目冲刺 第4篇 Scrum 冲刺博客

这个作业属于哪个课程 软件工程这个作业要求在哪里 团队作业4——项目冲刺这个作业的目标 团队完成任务的分配,明确团队每个人在接下来七天敏捷冲刺的目标其他参考文献这个作业所属团队 SuperNewCode团队成员 张楠 曾琳备 黄铭涛 张小宇 周广1.每日举行站立时会议2.燃尽图3.每…

详细介绍ARM-ORACLE Database 19c数据库下载

目录 1. 前言 2. 获取方式 2.1 ORACLE专栏 2.2 ORACLE下载站点 1. 前言 现有网络上已有非常多关于ORACLE数据库机下载的介绍&#xff0c;但对于ARM平台的介绍不多&#xff0c;借此机会我将该版的下载步骤做如下说明&#xff0c;希望能够一些不明之人提供帮助和参考 2. 获…

网络管理实验四、SNMP协议分析

1 实验概括 实验目的 捕获SNMP报文&#xff0c;通过报文分析进一步理解SNMP的报文结构、MIB-2树的结构、理解管理信息结构SMI及其规定的ASN.1。 实验内容 1、自行挑选两个网管对象&#xff0c;分别使用get&#xff0c;get-next取其值。 2、使用抓包软件抓取数据包。 3、分析并…

04-23 周二 shell环境下读取使用jq 读取json文件

04-23 周二 shell环境下读取使用jq 读取json文件 时间版本修改人描述04-23V0.1宋全恒新建文档 简介 工具列表 Shell脚本处理JSON数据工具jq jshon是另外一个读取json数据的工具 而且其支持XML和YAML格式文件 linux shell环境下处理yml文件 #!/bin/bash# 加载shyaml库 . /…

NewStarCTF 2023 week1 writeup

NewStarCTF 2023 week1 writeup Web 1.泄漏的秘密 url/robots.txt查看robots协议,找到第一部分的flag PART ONE: flag{r0bots_1s_s0_us3ful url/www.zip查看网站备份,找到第二部分的flag $PART_TWO = "_4nd_www.zip_1s_s0_d4ng3rous}"; flag:flag{r0bots_1s_s0_us3…

敏捷冲刺-5月9日

敏捷冲刺-Day-04所属课程 软件工程2024作业要求 团队作业4—项目冲刺作业目标 完成第 3 篇 Scrum 冲刺博客冲刺日志集合贴 https://www.cnblogs.com/YXCS-cya/p/181788031.项目燃尽图 1.1 第四日-5月9日进度 当前进度逐渐加快2.会议记录 2.1 会议主题 第 4 天 Scrum 冲刺-项目中…

解决mybatis的配置文件没代码提示的问题

1.将org.apache.ibatis.builder.xml包里的两个dtd文件复制出来&#xff0c;jar包里复制 2.复制dtd的url地址&#xff1a; http://mybatis.org/dtd/mybatis-3-mapper.dtd 一样的做法&#xff01; 3.关闭两个配置文件&#xff0c;重新打开&#xff0c;就可以有代码提示了&…

基于Springboot+Vue的Java项目-毕业就业信息管理系统开发实战(附演示视频+源码+LW)

大家好&#xff01;我是程序员一帆&#xff0c;感谢您阅读本文&#xff0c;欢迎一键三连哦。 &#x1f49e;当前专栏&#xff1a;Java毕业设计 精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; &#x1f380; Python毕业设计 &am…

鸿蒙开发接口Ability框架:【@ohos.application.Want (Want)】

Want Want模块提供系统的基本通信组件的能力。 说明&#xff1a; 本模块首批接口从API version 8 开始支持。后续版本的新增接口&#xff0c;采用上角标单独标记接口的起始版本。 导入模块 import Want from ohos.application.Want; 开发前请熟悉鸿蒙开发指导文档&#xff1…