博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【Flume】flume于transactionCapacity和batchSize进行详细的分析和质疑的概念
阅读量:5807 次
发布时间:2019-06-18

本文共 2594 字,大约阅读时间需要 8 分钟。

我不知道你用flume读者熟悉无论这两个概念

一开始我是有点困惑,?

没感觉到transactionCapacity的作用啊?

batchSize又是干啥的啊?

……

……

带着这些问题,我们深入源代码来看一下:

batchSize

batchSize这个概念首先它出如今哪里呢?

kafkaSink的process方法

HDFS Sink

Exec Source

通过上面这三张图,相信大家应该知道batchSize从哪来的了

batchSize是针对Source和Sink提出的一个概念,它用来限制source和sink对event批量处理的。

即一次性你能够处理batchSize个event,这个一次性就是指在一个事务中。

当你处理的event数量超出了batchSize。那么事务就会提交了。

注意,这里有一个隐晦的地方,就是batchSize一定不能大于transactionCapacity

以下再来说说transactionCapacity

首先。从这个图中我们就能够看出transactionCapacity这个概念的来源了,它来自于通道中。不同于batchSize(Source,Sink)

那么。在通道中是怎样使用该事务容量的呢??

内存通道中有个内部类MemoryTransaction

private class MemoryTransaction extends BasicTransactionSemantics {    private LinkedBlockingDeque
takeList; private LinkedBlockingDeque
putList; private final ChannelCounter channelCounter; private int putByteCounter = 0; private int takeByteCounter = 0; public MemoryTransaction(int transCapacity, ChannelCounter counter) { putList = new LinkedBlockingDeque
(transCapacity); takeList = new LinkedBlockingDeque
(transCapacity); channelCounter = counter; }
这里就用到了事务容量,它就是putList和takeList的容量大小

putList就是用来存放put操作带来的event          channel的put

if (!putList.offer(event)) {        throw new ChannelException(          "Put queue for MemoryTransaction of capacity " +            putList.size() + " full, consider committing more frequently, " +            "increasing capacity or increasing thread count");      }
每一次put前,都会预判put是否成功,从异常的提示信息就能够看出来。put不成功即事务容量满了

takeList存放的event是用来被take操作消耗的,返回拿到的一个event            channel的take

if(takeList.remainingCapacity() == 0) {        throw new ChannelException("Take list for MemoryTransaction, capacity " +            takeList.size() + " full, consider committing more frequently, " +            "increasing capacity, or increasing thread count");      }
take前也会预判,假设takeList已经满了。说明take操作太慢了,出现了event堆积的现象,这时候你应该调整事务容量

什么情况下。事务会提交呢,事务提交做了什么呢??

commit即事务提交

两种情况:

1、put的event提交

while(!putList.isEmpty()) {            if(!queue.offer(putList.removeFirst())) {              throw new RuntimeException("Queue add failed, this shouldn't be able to happen");            }
event所有放到queue中。queue才是真正的flume中event的队列。它的容量是capacity。看上一张图就可以。

2、take的event提交

由于在take操作的时候就已经将event从queue中取出了。而queue中取出的event正是靠put的提交来的

最后。再看看事务是怎样回滚的??

事务回滚针对take操作,你把event拿出去。结果处理失败了,那当然得丢回来,等待下一次处理了!

由于进入了rollback操作,说明commit操作出现异常,也就是commit操作失败了,那putList和takeList两个队列当然也没有被清空

while(!takeList.isEmpty()) {          queue.addFirst(takeList.removeLast());        }
循环将event又一次加入到queue中。

不知道说。大家对这个更清楚它是否??

版权声明:本文博主原创文章。博客,未经同意不得转载。

你可能感兴趣的文章
数据结构——串的朴素模式和KMP匹配算法
查看>>
FreeMarker-Built-ins for strings
查看>>
验证DataGridView控件的数据输入
查看>>
POJ1033
查看>>
argparse - 命令行选项与参数解析(转)
查看>>
一维数组
查看>>
Linux学习笔记之三
查看>>
POJ1061 青蛙的约会(扩展欧几里得)题解
查看>>
关于Android studio团队协同开发连接到已有项目
查看>>
Sql获取表的信息
查看>>
Java-大数据-图汇集
查看>>
一、数论算法
查看>>
Asp.net MVC 中Controller的返回类型大全
查看>>
用一条SQL语句实现斐波那契数列
查看>>
[高中作文赏析]跋涉与成功
查看>>
swift-辞典NSDictionary定义,变化的关键,删/加入关键
查看>>
python----slots属性安全类
查看>>
《Programming WPF》翻译 第5章 1.不使用样式
查看>>
.NET垃圾回收:非托管资源,IDispose和析构函数的结合
查看>>
H2内存数据库 支持存储到文件
查看>>