NoClassDefFoundError:kafka / api / OffsetRequest - java

我正在尝试使用apache storm,kafka和trident编写用于实时处理的应用程序
但是在初始化TridentKafkaConfig时我看到此错误

Exception in thread "main" java.lang.NoClassDefFoundError: kafka/api/OffsetRequest
at storm.kafka.KafkaConfig.<init>(KafkaConfig.java:43)
at storm.kafka.trident.TridentKafkaConfig.<init>(TridentKafkaConfig.java:30)
at spout.TestSpout.<clinit>(TestSpout.java:22)
at IOTTridentTopology.initializeTridentTopology(IOTTridentTopology.java:31)
at IOTTridentTopology.main(IOTTridentTopology.java:26)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.ClassNotFoundException: kafka.api.OffsetRequest
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
... 10 more

我的壶嘴课是

public class TestSpout extends OpaqueTridentKafkaSpout {

private static TridentKafkaConfig config;
private static BrokerHosts HOSTS = new ZkHosts(TridentConfig.ZKHOSTS);
private static String TOPIC = "test";
private static int BUFFER_SIZE = TridentConfig.BUFFER_SIZE;

static{
    config = new TridentKafkaConfig(HOSTS, TOPIC);
    config.scheme = new SchemeAsMultiScheme(new RawScheme());
    config.bufferSizeBytes = BUFFER_SIZE;
}

public TestSpout(TridentKafkaConfig config) {
    super(config);
}

public TestSpout() {
    super(config);
}
}

主类:

 public static void main(String[] args) {
    initializeTridentTopology();
}

private static void initializeTridentTopology() {
    TridentTopology topology = new TridentTopology();
    TestSpout spout = new TestSpout();
    //////////////// test  //////////////////////

    topology.newStream("testspout", spout).each(spout.getOutputFields(), new TestFunction(), new Fields());

    ///////////////  end test ///////////////////

    LocalCluster cluster = new LocalCluster();

    Config config = new Config();
    config.setDebug(false);
    config.setMaxTaskParallelism(1);
    config.registerSerialization(storm.kafka.trident.GlobalPartitionInformation.class);
    config.registerSerialization(java.util.TreeMap.class);
    config.setNumWorkers(5);

    config.setFallBackOnJavaSerialization(true);

    cluster.submitTopology("KafkaTrident", config, topology.build());

}

和我的pom.xml:

<?xml version="1.0" encoding="UTF-8"?>

http://maven.apache.org/xsd/maven-4.0.0.xsd“>
4.0.0

<groupId>IOT</groupId>
<artifactId>ver0.1</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>


    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.3</version>
    </dependency>


    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>0.9.3</version>
    </dependency>



</dependencies>

我正在尝试不同版本的storm-kafka(0.9.3和0.9.4和0.9.5和0.9.6和0.10.0)和Storm-core(9.3和9.4和9.6)

但是我仍然看到我以前的错误

通过谷歌搜索我发现了这个链接但是...

ClassNotFoundException: kafka.api.OffsetRequest

参考方案

经过一番谷歌搜索后我找到了此链接
https://github.com/wurstmeister/storm-kafka-0.8-plus-test
并在pom.xml文件中找到了我的答案

通过添加此代码并找到兼容的kafka版本,所有问题均已解决

 <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.9.0.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

检查Optional中是否存在null属性,并返回String Java Stream API - java

我有以下class Person private String firstName; private String familyName; // Setters and Getters 我有以下方法public String getFullName(Optional<Person> persons) { return persons .map(p…

Java 8-Streams API-在LIST上操作 - java

我有一个包含以下内容的ArrayList-CHENNAI,MUMBAI,DELHI,CHENNAI,DELHI,CHENNAI。另外,假设将这3个城市名称定义为ENUM,并针对每个城市指定一个值。 (CHENNAI = 1,MUMBAI = 2和DELHI = 3)在Java 8 Streams中,我能够计算城市值的总和,在这种情况下,城市值的总和将为1 +…

JAVA:如何检查对象数组中的所有对象是否都是子类的对象? - java

我有一个对象数组。现在,我要检查所有这些对象是否都是MyObject的实例。有没有比这更好的选择:boolean check = true; for (Object o : justAList){ if (!(o instanceof MyObject)){ check = false; break; } } java大神给出的解决方案 如果您不喜欢循环,则…

kafka-> flink-性能问题 - java

我正在查看一些kafka主题,这些主题每秒产生约3万条消息。我有一个flink拓扑设置,可以读取其中之一,聚合一点(5秒的窗口),然后(最终)写入数据库。当我运行拓扑并删除除读取->聚合步骤之外的所有内容时,我每分钟只能收到约3万条消息。没有任何地方会发生背压。我究竟做错了什么?编辑:我无法更改有关主题空间的任何内容。每个主题都有一个分区,并且有数百个…

Java 8根据条件应用流过滤器 - java

在Java 8中,有一种方法可以根据条件将过滤器应用于流,例我有这个流if (isAccessDisplayEnabled) { src = (List < Source > ) sourceMeta.getAllSources.parallelStream() .filter(k - > isAccessDisplayEnabled((S…