Apache Flink:Python流API中的Kafka连接器,“无法加载用户类” - python

我正在尝试Flink的新Python流API,并尝试使用./flink-1.6.1/bin/pyflink-stream.sh examples/read_from_kafka.py运行我的脚本。 python脚本非常简单,我只是想从一个现有的主题中获取内容并将所有内容发送到stdout(或默认情况下output方法在其中发出输出数据的日志目录中的* .out文件)。

import glob
import os
import sys
from java.util import Properties
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.streaming.api.collector.selector import OutputSelector
from org.apache.flink.api.common.serialization import SimpleStringSchema

directories=['/home/user/flink/flink-1.6.1/lib']
for directory in directories:
    for jar in glob.glob(os.path.join(directory,'*.jar')):
                sys.path.append(jar)

from org.apache.flink.streaming.connectors.kafka import FlinkKafkaConsumer09

props = Properties()
config = {"bootstrap_servers": "localhost:9092",
          "group_id": "flink_test",
          "topics": ["TopicCategory-TopicName"]}
props.setProperty("bootstrap.servers", config['bootstrap_servers'])
props.setProperty("group_id", config['group_id'])
props.setProperty("zookeeper.connect", "localhost:2181")

def main(factory):
    consumer = FlinkKafkaConsumer09([config["topics"]], SimpleStringSchema(), props)

    env = factory.get_execution_environment()
    env.add_java_source(consumer) \
        .output()
    env.execute()

我从Maven仓库中抓取了几个jar文件,分别是flink-connector-kafka-0.9_2.11-1.6.1.jarflink-connector-kafka-base_2.11-1.6.1.jarkafka-clients-0.9.0.1.jar,并将它们复制到Flink的lib目录中。除非我误解了文档,否则Flink足以加载kafka连接器。确实,如果我删除了所有这些jar,导入就会失败,但这似乎不足以实际调用该计划。
添加for循环以将其动态添加到sys.path也不起作用。这是在控制台中打印的内容:

Starting execution of program
Failed to run plan: null
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/tmp/flink_streaming_plan_9cfed4d9-0288-429c-99ac-df02c86922ec/read_from_kafka.py", line 32, in main
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
    at org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment.execute(PythonStreamExecutionEnvironment.java:245)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)

org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: bbcc0cb2c4fe6e3012d228b06b270eba)

The program didn't contain a Flink job. Perhaps you forgot to call execute() on the execution environment.

这是我在日志中看到的:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class:    org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
ClassLoader info: URL ClassLoader:
    file: '/tmp/blobStore-9f6930fa-f1cf-4851-a0bf-2e620391596f/job_ca486746e7feb42d2d162026b74e9935/blob_p-9321896d165fec27a617d44ad50e3ef09c3211d9-405ccc9b490fa1e1348f0a76b1a48887' (valid JAR)
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

有没有办法解决此问题并使连接器可用于Python?我怀疑这是Jython的Classloader问题,但我不知道如何进一步研究(同样鉴于我不具备Java知识)。非常感谢。

参考方案

您在这里使用了错误的Kafka用户。在您的代码中,它是FlinkKafkaConsumer09,但是您正在使用的库是flink-connector-kafka-0.11_2.11-1.6.1.jar,用于FlinkKafkaConsumer011。尝试用此FlinkKafkaConsumer09替换FlinkKafkaConsumer011,或使用lib文件flink-connector-kafka-0.9_2.11-1.6.1.jar而不是当前文件。

如何在Flask中生成临时下载? - python

我有一个Flask应用,可让用户下载MP3文件。如何使下载的URL仅在特定时间段内有效?例如,我不想让任何人简单地转到example.com/static/sound.mp3并访问文件,而是希望验证每个请求以防止不必要的带宽。我正在使用Apache服务器,但是如果更容易实现,我可能会考虑切换到另一个服务器。另外,我不想使用Flask来提供文件,因为这会通过迫…

如何在齐柏林飞艇中使用水蟒? - python

我想在齐柏林飞艇中使用水蟒。所以我修改了/zeppelin/conf/zeppelin-env.sh中的配置文件像下面一样。export SPARK_HOME=/home/jin/spark export PYTHONPATH=/home/jin/anaconda3/bin/python export PYSPARK_PYTHON=/home/jin/spa…

为什么Spark的show()函数非常慢? - python

我有df.select("*").filter(df.itemid==itemid).show() 并没有终止,但是如果我这样做print df.select("*").filter(df.itemid==itemid) 不到一秒钟即可打印出来。为什么是这样? 参考方案 这是因为select和filter只是在建立执行…

Python uuid4,如何限制唯一字符的长度 - python

在Python中,我正在使用uuid4()方法创建唯一的字符集。但是我找不到将其限制为10或8个字符的方法。有什么办法吗?uuid4()ffc69c1b-9d87-4c19-8dac-c09ca857e3fc谢谢。 参考方案 尝试:x = uuid4() str(x)[:8] 输出:"ffc69c1b" Is there a way to…

Spark使用上一行的值将新列添加到数据框 - python

我想知道如何在Spark(Pyspark)中实现以下目标初始数据框:+--+---+ |id|num| +--+---+ |4 |9.0| +--+---+ |3 |7.0| +--+---+ |2 |3.0| +--+---+ |1 |5.0| +--+---+ 结果数据框:+--+---+-------+ |id|num|new_Col| +--+---…