Kafka SaslAuthenticationException SASL_SSL协议的临时发生 - java

我正在运行一些Java 8 Kafka应用程序,其中一些是Kafka流,其他则是普通的生产者/消费者。

对于它们中的每一个,都没有功能性问题,它们在大多数情况下运行良好。

但是,对于他们每个人,我都在随机出现SaslAuthenticationException。由于它们每两周左右发生一次,因此我不确定如何复制/推断出根本原因:

错误:

Failed to create channel due to 
org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
Caused by: org.apache.kafka.common.KafkaException: Principal could not be determined from Subject, this may be a transient failure due to Kerberos re-login

堆栈跟踪:

Failed to create channel due to 
org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
Caused by: org.apache.kafka.common.KafkaException: Principal could not be determined from Subject, this may be a transient failure due to Kerberos re-login
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.firstPrincipal(SaslClientAuthenticator.java:441) ~[kafka-clients-2.0.1.jar!/:na]
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:135) ~[kafka-clients-2.0.1.jar!/:na]
    at org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:244) ~[kafka-clients-2.0.1.jar!/:na]
    at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:194) ~[kafka-clients-2.0.1.jar!/:na]
    at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:289) [kafka-clients-2.0.1.jar!/:na]
    at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:280) [kafka-clients-2.0.1.jar!/:na]
    at org.apache.kafka.common.network.Selector.connect(Selector.java:215) [kafka-clients-2.0.1.jar!/:na]
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:864) [kafka-clients-2.0.1.jar!/:na]
    at org.apache.kafka.clients.NetworkClient.access$700(NetworkClient.java:64) [kafka-clients-2.0.1.jar!/:na]
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1035) [kafka-clients-2.0.1.jar!/:na]
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:920) [kafka-clients-2.0.1.jar!/:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:508) [kafka-clients-2.0.1.jar!/:na]

这就是我向Kafka提供JAAS Kerberos身份验证的方式(在我的配置文件中,我提供了诸如kdc,realm,keytab,principal之类的信息):

@Value("${kafka.sasl.kerberos.kdc}")
private String kdc;
@Value("${kafka.sasl.kerberos.realm}")
private String realm;
@Value("${kafka.sasl.kerberos.keytab}")
private Resource keytab;
@Value("${kafka.sasl.kerberos.principal}")
private String principal;

@Bean
public InMemoryConfiguration kafkaOpts() throws IOException {

    System.setProperty("java.security.krb5.kdc", kdc);
    System.setProperty("java.security.krb5.realm", realm);

    Map<String, Object> options = new HashMap<>();
    options.put("keyTab", copyResourceToTempFile(keytab, ".keytab").toString());
    options.put("principal", principal);
    options.put("useKeyTab","true");
    options.put("storeKey","true");
    AppConfigurationEntry kafkaClientConfig = new AppConfigurationEntry(
            "com.sun.security.auth.module.Krb5LoginModule", LoginModuleControlFlag.REQUIRED, options);
    Map<String, AppConfigurationEntry[]> jaasConfigEntries = new HashMap<>();
    jaasConfigEntries.put("KafkaClient", new AppConfigurationEntry[] {kafkaClientConfig});
    InMemoryConfiguration jaasConfig = new InMemoryConfiguration(jaasConfigEntries);
    javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
    return jaasConfig;
}

public static Path copyResourceToTempFile(Resource resource, String extension) {

    try (InputStream in = resource.getInputStream()) {
        Path tempFile = Files.createTempFile("spring-boot-", extension);
        Files.copy(in, tempFile, StandardCopyOption.REPLACE_EXISTING);
        return tempFile;
    } catch (IOException e) {
        log.error("Error creating resource to file",e);
        return null;
    }

}

参考方案

我们也面临同样的问题。与kafka连接后,突然我们失去了连接,应用程序重试登录,但失败。
但是我们使用以下配置更新jaas auth配置文件:

com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true;

这对我来说是完美的。

如何通过kafka流加入主题 - java

卡夫卡流我正在尝试流式传输,但存在一些问题,它不起作用。首先,我有3个连接器,但不能使用自己的钥匙。我需要钥匙才能加入它们,对不对?如何使用2个或更多钥匙加入?我尝试复制这样的内容: 选择*从(选择a。*从用户a内部联接部门b 在a.dep = b.dep和a.group = b.group上 )a.id = b.id上的内部联接user_afy我想将内部联…

使用Apache Kafka 0.10.0 API和Java创建Kafka经纪人集群 - java

我想使用Kafka 0.10 API和Java来创建代理群集。据我所读kafka_2.11-0.10.0.0.jar是否支持使用创建代理:import kafka.cluster.Broker; import kafka.cluster.Cluster; 但是我找不到这样做的任何文档。我最近读了[1],它讲述了如何在Kafka API中使用Java创建主题。…

Java Double与BigDecimal - java

我正在查看一些使用双精度变量来存储(360-359.9998779296875)结果为0.0001220703125的代码。 double变量将其存储为-1.220703125E-4。当我使用BigDecimal时,其存储为0.0001220703125。为什么将它双重存储为-1.220703125E-4? 参考方案 我不会在这里提及精度问题,而只会提及数字…

当回复有时是一个对象有时是一个数组时,如何在使用改造时解析JSON回复? - java

我正在使用Retrofit来获取JSON答复。这是我实施的一部分-@GET("/api/report/list") Observable<Bills> listBill(@Query("employee_id") String employeeID); 而条例草案类是-public static class…

JAVA:字节码和二进制有什么区别? - java

java字节代码(已编译的语言,也称为目标代码)与机器代码(当前计算机的本机代码)之间有什么区别?我读过一些书,他们将字节码称为二进制指令,但我不知道为什么。 参考方案 字节码是独立于平台的,在Windows中运行的编译器编译的字节码仍将在linux / unix / mac中运行。机器代码是特定于平台的,如果在Windows x86中编译,则它将仅在Win…