标签:apache-kafka
-
如何使用Python在Apache Kafka中以编程方式创建主题 - python
到目前为止,我还没有看到没有使用配置选项自动创建主题的显式实现主题创建的python客户端。 参考方案 您可以使用 kafka-python 或 confluent_kafka 客户端(这是librdkafka的轻量级包装器)以编程方式创建主题。 使用 kafka-python from kafka.admin import KafkaAdminClient […]
-
消费消息时如何访问Kafka标头? - java
以下是我的配置 <int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext" auto-startup="true" channel […]
-
如果我不关闭kafka生产者,会发生什么? - java
我正在处理xml,我需要为每条记录发送一条消息,当我收到最后一条记录时,我关闭了kafka生产者,这里的问题是kafka生产者的send方法是异步的,因此,有时当我关闭生产者时它抛出了java.lang.IllegalStateException: Cannot send after the producer is closed.我读过的地方可以让生产者打开 […]
-
Kafka结构化流KafkaSourceProvider无法实例化 - java
我正在一个流项目中,我有这样的ping统计信息的kafka流: 64 bytes from vas.fractalanalytics.com (192.168.30.26): icmp_seq=1 ttl=62 time=0.913 ms 64 bytes from vas.fractalanalytics.com (192.168.30.26): icmp […]
-
Apache Flink:Python流API中的Kafka连接器,“无法加载用户类” - python
我正在尝试Flink的新Python流API,并尝试使用./flink-1.6.1/bin/pyflink-stream.sh examples/read_from_kafka.py运行我的脚本。 python脚本非常简单,我只是想从一个现有的主题中获取内容并将所有内容发送到stdout(或默认情况下output方法在其中发出输出数据的日志目录中的* .ou […]
-
NoClassDefFoundError:kafka / api / OffsetRequest - java
我正在尝试使用apache storm,kafka和trident编写用于实时处理的应用程序但是在初始化TridentKafkaConfig时我看到此错误 Exception in thread "main" java.lang.NoClassDefFoundError: kafka/api/OffsetRequest at storm. […]
-
kafka-> flink-性能问题 - java
我正在查看一些kafka主题,这些主题每秒产生约3万条消息。我有一个flink拓扑设置,可以读取其中之一,聚合一点(5秒的窗口),然后(最终)写入数据库。 当我运行拓扑并删除除读取->聚合步骤之外的所有内容时,我每分钟只能收到约3万条消息。没有任何地方会发生背压。 我究竟做错了什么? 编辑: 我无法更改有关主题空间的任何内容。每个主题都有一个分区,并且 […]
-
Kafka SaslAuthenticationException SASL_SSL协议的临时发生 - java
我正在运行一些Java 8 Kafka应用程序,其中一些是Kafka流,其他则是普通的生产者/消费者。 对于它们中的每一个,都没有功能性问题,它们在大多数情况下运行良好。 但是,对于他们每个人,我都在随机出现SaslAuthenticationException。由于它们每两周左右发生一次,因此我不确定如何复制/推断出根本原因: 错误: Failed to […]
-
如何从Kafka用Python解码/反序列化Avro - python
我正在使用Python(使用Confluent Kafka Python库的使用者)从远程服务器接收Kafka Avro消息,该消息使用json字典表示点击流数据,并带有用户代理,位置,URL等字段。这是消息的样子: b'\x01\x00\x00\xde\x9e\xa8\xd5\x8fW\xec\x9a\xa8\xd5\x8fW\x1axxx.xx […]
-
如何通过kafka流加入主题 - java
卡夫卡流我正在尝试流式传输,但存在一些问题,它不起作用。首先,我有3个连接器,但不能使用自己的钥匙。我需要钥匙才能加入它们,对不对?如何使用2个或更多钥匙加入?我尝试复制这样的内容: 选择*从(选择a。*从用户a内部联接部门b 在a.dep = b.dep和a.group = b.group上 )a.id = b.id上的内部联接user_afy 我想将内部 […]