我正在查看一些kafka主题,这些主题每秒产生约3万条消息。我有一个flink拓扑设置,可以读取其中之一,聚合一点(5秒的窗口),然后(最终)写入数据库。
当我运行拓扑并删除除读取->聚合步骤之外的所有内容时,我每分钟只能收到约3万条消息。没有任何地方会发生背压。
我究竟做错了什么?
编辑:
我无法更改有关主题空间的任何内容。每个主题都有一个分区,并且有数百个。
每个消息都是平均2-3Kb的压缩节俭对象
看来我只能达到〜1.5 MB / s。不能接近所提到的100MB / s。
当前代码路径:
DataStream<byte[]> dataStream4 = env.addSource(new FlinkKafkaConsumer081<>("data_4", new RawSchema(), parameterTool.getProperties())).setParallelism(1);
DataStream<Tuple4<Long, Long, Integer, String>> ds4 = dataStream4.rebalance().flatMap(new mapper2("data_4")).setParallelism(4);
public class mapper2 implements FlatMapFunction<byte[], Tuple4<Long, Long, Integer, String>> {
private String mapId;
public mapper2(String mapId) {
this.mapId = mapId;
}
@Override
public void flatMap(byte[] bytes, Collector<Tuple4<Long, Long, Integer, String>> collector) throws Exception {
TimeData timeData = (TimeData)ts_thriftDecoder.fromBytes(bytes);
Tuple4 tuple4 = new Tuple4<Long, Long, Integer, String>();
tuple4.f0 = timeData.getId();
tuple4.f1 = timeData.getOtherId();
tuple4.f2 = timeData.getSections().size();
tuple4.f3 = mapId;
collector.collect(tuple4);
}
}
参考方案
从代码中,我看到了两个可能导致性能问题的潜在组件:
FlinkKafka消费者
节俭解串器
为了了解瓶颈所在,我首先将测量来自Kafka主题的Flink读取的原始读取性能。
因此,您可以在集群上运行以下代码吗?
public class RawKafka {
private static final Logger LOG = LoggerFactory.getLogger(RawKafka.class);
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool parameterTool = ParameterTool.fromArgs(args);
DataStream<byte[]> dataStream4 = env.addSource(new FlinkKafkaConsumer081<>("data_4", new RawSchema(), parameterTool.getProperties())).setParallelism(1);
dataStream4.flatMap(new FlatMapFunction<byte[], Integer>() {
long received = 0;
long logfreq = 50000;
long lastLog = -1;
long lastElements = 0;
@Override
public void flatMap(byte[] element, Collector<Integer> collector) throws Exception {
received++;
if (received % logfreq == 0) {
// throughput over entire time
long now = System.currentTimeMillis();
// throughput for the last "logfreq" elements
if(lastLog == -1) {
// init (the first)
lastLog = now;
lastElements = received;
} else {
long timeDiff = now - lastLog;
long elementDiff = received - lastElements;
double ex = (1000/(double)timeDiff);
LOG.info("During the last {} ms, we received {} elements. That's {} elements/second/core. GB received {}",
timeDiff, elementDiff, elementDiff*ex, (received * 2500) / 1024 / 1024 / 1024);
// reinit
lastLog = now;
lastElements = received;
}
}
}
});
env.execute("Raw kafka throughput");
}
}
该代码测量从Kafka读取50k元素之间的时间,并记录从Kafka读取的元素数量。
在我的本地计算机上,我的吞吐量约为33万个元素/核心/秒:
16:09:34,028 INFO RawKafka - During the last 88 ms, we received 30000 elements. That's 340909.0909090909 elements/second/core. GB received 0
16:09:34,028 INFO RawKafka - During the last 86 ms, we received 30000 elements. That's 348837.20930232556 elements/second/core. GB received 0
16:09:34,028 INFO RawKafka - During the last 85 ms, we received 30000 elements. That's 352941.17647058825 elements/second/core. GB received 0
16:09:34,028 INFO RawKafka - During the last 88 ms, we received 30000 elements. That's 340909.0909090909 elements/second/core. GB received 0
16:09:34,030 INFO RawKafka - During the last 90 ms, we received 30000 elements. That's 333333.3333333333 elements/second/core. GB received 0
16:09:34,030 INFO RawKafka - During the last 91 ms, we received 30000 elements. That's 329670.3296703297 elements/second/core. GB received 0
16:09:34,030 INFO RawKafka - During the last 85 ms, we received 30000 elements. That's 352941.17647058825 elements/second/core. GB received 0
我真的很想知道您从Kafka读取的吞吐量是多少。
JAVA:如何检查对象数组中的所有对象是否都是子类的对象? - java我有一个对象数组。现在,我要检查所有这些对象是否都是MyObject的实例。有没有比这更好的选择:boolean check = true; for (Object o : justAList){ if (!(o instanceof MyObject)){ check = false; break; } } java大神给出的解决方案 如果您不喜欢循环,则…
如何使用箭头符号(->)创建受保护的方法? - java当我们编写以下代码时Stream.of(1,2,3,4,5).filter(i -> (i%2 == 0)).map( i -> i*i ); 表达式i -> (i%2 == 0)或i -> i*i将变为私有方法。在我的用例中,编写了一个junit测试,以确保没有方法是私有的(是的,这是强制性的),并且对于这些lambda表达式而言,…
通过索引访问地图? - java是否可以通过索引访问帐户>?我需要获取地图的第二个元素。 java参考方案 您使用了错误的数据结构。如果需要按键查找,请使用Map。如果需要按索引或插入顺序进行查找,请使用可让您进行索引的内容,例如数组或列表或链表。如果需要通过两者进行查找,则需要创建一个跟踪键和插入顺序的复合数据结构(该实现将由Map和上述数据结构之一支持)。框架中甚至内置了一个:L…
如何在Eclipse中备份用户库? - java我在一个项目中有100多个罐子。我已经为其中大多数手动定义了源jar的位置,以便javadoc可以在IDE中方便地使用。现在,如果必须在另一台计算机上配置IDE,那么我不想再经历整个过程。我可以备份用户库定义并将其导入到另一个Eclipse实例中吗?只要可以在Eclipse中使用javadoc,并且不需要重复为每个jar链接源jar的手动工作,就可以使用替代…
Python re.sub()-> Java - javaThis question is unlikely to help any future visitors; it is only relevant to a small geographic area, a specific moment in time, or an extraordinarily narrow situation that is not…