Kafka -> Flink performance problem - java

I'm looking at some Kafka topics that produce about 30k messages per second. I have a Flink topology set up that reads one of them, aggregates a little (5-second window), and then (eventually) writes to a DB.

When I run the topology with everything stripped out except the read -> aggregate step, I only receive about 30k messages per minute. No backpressure is occurring anywhere.

What am I doing wrong?

Edit:

I can't change anything about the topic space. Each topic has one partition, and there are hundreds of them.
Each message is a compressed Thrift object averaging 2-3 KB.

It looks like I can only reach ~1.5 MB/s, nowhere near the 100 MB/s mentioned. (That is consistent with the numbers above: ~30k messages/minute ≈ 500 messages/second × ~2.5 KB ≈ 1.25 MB/s.)

Current code path:

DataStream<byte[]> dataStream4 = env.addSource(new FlinkKafkaConsumer081<>("data_4", new RawSchema(), parameterTool.getProperties())).setParallelism(1);  
DataStream<Tuple4<Long, Long, Integer, String>> ds4 = dataStream4.rebalance().flatMap(new mapper2("data_4")).setParallelism(4);
public class mapper2 implements FlatMapFunction<byte[], Tuple4<Long, Long, Integer, String>> {
    private String mapId;
    public mapper2(String mapId) {
        this.mapId = mapId;
    }

    @Override
    public void flatMap(byte[] bytes, Collector<Tuple4<Long, Long, Integer, String>> collector) throws Exception {
        TimeData timeData = (TimeData)ts_thriftDecoder.fromBytes(bytes);
        Tuple4 tuple4 = new Tuple4<Long, Long, Integer, String>();
        tuple4.f0 = timeData.getId();
        tuple4.f1 = timeData.getOtherId();
        tuple4.f2 = timeData.getSections().size();
        tuple4.f3 = mapId;

        collector.collect(tuple4);
    }
}

Answer

From the code, I see two potential components that could be causing the performance problems:

the FlinkKafkaConsumer
the Thrift deserializer

To understand where the bottleneck is, I would first measure the raw read performance of Flink reading from the Kafka topic (a sketch for isolating the Thrift decode step, the second suspect, follows further below).

So, could you run the following code on your cluster?

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer081;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RawKafka {

    private static final Logger LOG = LoggerFactory.getLogger(RawKafka.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        DataStream<byte[]> dataStream4 = env.addSource(new FlinkKafkaConsumer081<>("data_4", new RawSchema(), parameterTool.getProperties())).setParallelism(1);

        dataStream4.flatMap(new FlatMapFunction<byte[], Integer>() {
            long received = 0;
            long logfreq = 50000;
            long lastLog = -1;
            long lastElements = 0;

            @Override
            public void flatMap(byte[] element, Collector<Integer> collector) throws Exception {
                received++;
                if (received % logfreq == 0) {
                    // throughput over entire time
                    long now = System.currentTimeMillis();

                    // throughput for the last "logfreq" elements
                    if (lastLog == -1) {
                        // init (the first)
                        lastLog = now;
                        lastElements = received;
                    } else {
                        long timeDiff = now - lastLog;
                        long elementDiff = received - lastElements;
                        double ex = (1000 / (double) timeDiff);
                        LOG.info("During the last {} ms, we received {} elements. That's {} elements/second/core. GB received {}",
                                timeDiff, elementDiff, elementDiff * ex, (received * 2500) / 1024 / 1024 / 1024);
                        // reinit
                        lastLog = now;
                        lastElements = received;
                    }
                }
            }
        });

        env.execute("Raw kafka throughput");
    }
}

This code measures the time between batches of 50k elements read from Kafka and logs the number of elements read so far. (The "GB received" figure assumes an average message size of 2,500 bytes, matching the 2-3 KB average from the question.)
On my local machine I get a throughput of about 330k elements/second/core:

16:09:34,028 INFO  RawKafka                                                      - During the last 88 ms, we received 30000 elements. That's 340909.0909090909 elements/second/core. GB received 0
16:09:34,028 INFO  RawKafka                                                      - During the last 86 ms, we received 30000 elements. That's 348837.20930232556 elements/second/core. GB received 0
16:09:34,028 INFO  RawKafka                                                      - During the last 85 ms, we received 30000 elements. That's 352941.17647058825 elements/second/core. GB received 0
16:09:34,028 INFO  RawKafka                                                      - During the last 88 ms, we received 30000 elements. That's 340909.0909090909 elements/second/core. GB received 0
16:09:34,030 INFO  RawKafka                                                      - During the last 90 ms, we received 30000 elements. That's 333333.3333333333 elements/second/core. GB received 0
16:09:34,030 INFO  RawKafka                                                      - During the last 91 ms, we received 30000 elements. That's 329670.3296703297 elements/second/core. GB received 0
16:09:34,030 INFO  RawKafka                                                      - During the last 85 ms, we received 30000 elements. That's 352941.17647058825 elements/second/core. GB received 0
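
If your numbers from this job come out far below that, the consumer side (configuration, network, broker) is the place to dig. If they come out similar, the Thrift decode step is the more likely bottleneck, and it can be timed in isolation. Here is a minimal sketch, assuming one raw message has been dumped to a file beforehand; the ThriftDecodeBench class and the file-based input are illustrative, while TimeData and ts_thriftDecoder are the classes from the question:

import java.nio.file.Files;
import java.nio.file.Paths;

// Illustrative micro-benchmark: times ts_thriftDecoder.fromBytes() with
// Kafka taken out of the picture. The class name and file-based input are
// assumptions; TimeData and ts_thriftDecoder come from the question.
public class ThriftDecodeBench {
    public static void main(String[] args) throws Exception {
        byte[] sample = Files.readAllBytes(Paths.get(args[0])); // one raw message
        int iterations = 1_000_000;
        long sink = 0; // accumulate a result so the JIT cannot drop the loops

        // warm-up, so the timed loop measures steady-state decode cost
        for (int i = 0; i < 100_000; i++) {
            sink += ((TimeData) ts_thriftDecoder.fromBytes(sample)).getSections().size();
        }

        long start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            sink += ((TimeData) ts_thriftDecoder.fromBytes(sample)).getSections().size();
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;

        System.out.printf("%d decodes in %d ms => %.0f decodes/second/core (checksum %d)%n",
                iterations, elapsedMs, iterations * 1000.0 / elapsedMs, sink);
    }
}

Comparing the decodes/second figure against the raw-read throughput above shows which of the two suspects dominates.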

I would really like to know what throughput you are getting when reading from Kafka.
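
One more thing worth noting, given the single-partition constraint from the edit: a Kafka partition is consumed by at most one source subtask, so a source with setParallelism(1) reading a one-partition topic is inherently a single reader thread. A hedged sketch of one way around that, assuming an upgrade to a consumer version that accepts a topic list is possible (the topic names here are illustrative):

import java.util.Arrays;
import java.util.List;

// Sketch only: subscribing to several single-partition topics with one
// consumer lets Flink spread the partitions across parallel source subtasks.
// FlinkKafkaConsumer08 (Flink 1.x) accepts a List<String> of topics; the
// FlinkKafkaConsumer081 used in the question takes a single topic. env and
// parameterTool are assumed to be set up as in the question's code.
List<String> topics = Arrays.asList("data_1", "data_2", "data_3", "data_4"); // illustrative names
DataStream<byte[]> raw = env
        .addSource(new FlinkKafkaConsumer08<>(topics, new RawSchema(), parameterTool.getProperties()))
        .setParallelism(4); // partitions are distributed across the 4 subtasks

With hundreds of single-partition topics, this turns the per-topic read limit into an aggregate one instead of one thread per topic.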
