消费消息时如何访问Kafka标头? - java

以下是我的配置

<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
            kafka-consumer-context-ref="consumerContext"
            auto-startup="true"
            channel="inputFromKafka">
        <int:poller fixed-delay="1" time-unit="MILLISECONDS" />
    </int-kafka:inbound-channel-adapter>

inputFromKafka经过下面的转换

public Message<?> transform(final Message<?> message) {

System.out.println( "KAFKA Message Headers " + message.getHeaders());

final Map<String, Map<Integer, List<Object>>> origData =  (Map<String, Map<Integer, List<Object>>>) message.getPayload();

        // some code to figure-out the nonPartitionedData
        return MessageBuilder.withPayload(nonPartitionedData).build();
    }

上面的print语句仅打印两个一致的标题
KAFKA Message Headers {id=9c8f09e6-4b28-5aa1-c74c-ebfa53c01ae4, timestamp=1437066957272}
在发送Kafka消息时,传递了一些标头,包括KafkaHeaders.MESSAGE_KEY,但我也没有回来,想知道是否还有办法做到这一点?

参考方案

不幸的是,这种方式行不通...
Producer部分(KafkaProducerMessageHandler)看起来像这样:

this.kafkaProducerContext.send(topic, partitionId, messageKey, message.getPayload());

如您所见,我们不会将任何messageHeaders发送到Kafka topic。仅payload,并且恰好在Kafka协议指定的messageKey下。

从另一侧,Consumer一侧(KafkaHighLevelConsumerMessageSource)执行以下逻辑:

if (!payloadMap.containsKey(messageAndMetadata.partition())) {
    final List<Object> payload = new ArrayList<Object>();
    payload.add(messageAndMetadata.message());
    payloadMap.put(messageAndMetadata.partition(), payload);
}

如您所见,我们不在乎messageKey
KafkaMessageDrivenChannelAdapter(<int-kafka:message-driven-channel-adapter>)适合您!它在将消息发送到通道之前执行此操作:

KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(generateMessageId, generateTimestamp);

Map<String, Object> rawHeaders = kafkaMessageHeaders.getRawHeaders();
rawHeaders.put(KafkaHeaders.MESSAGE_KEY, key);
rawHeaders.put(KafkaHeaders.TOPIC, metadata.getPartition().getTopic());
rawHeaders.put(KafkaHeaders.PARTITION_ID, metadata.getPartition().getId());
rawHeaders.put(KafkaHeaders.OFFSET, metadata.getOffset());
rawHeaders.put(KafkaHeaders.NEXT_OFFSET, metadata.getNextOffset());

if (!this.autoCommitOffset) {
    rawHeaders.put(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment);
}

Spring MVC中的输入验证 - java

我知道Commons Validator框架是Struts项目在服务器端和客户端验证输入值的事实上的标准。Spring MVC项目是否也是如此?我得到的印象可能不是,大多数Struts书籍和论坛都谈论Commons Validator框架,但是只有少数Spring书籍和论坛可以。在Spring MVC项目中验证输入的最佳实践是什么?干杯! 参考方案 在引入S…

Java:“自动装配”继承与依赖注入 - java

Improve this question 我通常以常见的简单形式使用Spring框架: 控制器服务存储库通常,我会在CommonService类中放一个通用服务,并使所有其他服务扩展到类中。一个开发人员告诉我,最好在每个服务中插入CommonClass而不是使用继承。我的问题是,有一个方法比另一个更好吗? JVM或性能是否会受到另一个影响?更新资料Comm…

Spring MVC Web应用程序检测暴力攻击的最佳方法? - java

Spring 3.0 MVC中是否有专门用于帮助检测Web应用程序的身份验证/登录页面上的蛮力攻击的功能? 参考方案 经过长期验证的实践是,如果身份验证失败,则会引入随机但相当大的延迟。这样,合法用户将立即登录,但攻击者每次尝试将花费500ms-1s,这使整个暴力概念不切实际(将永远存在)。合法用户偶尔失败的登录只会使他们稍有延迟,并且不会引起注意。如果需要…

Java中的“ <<”运算符 - java

最喜欢的语句来自Java的Character类:(1 << Character.PARAGRAPH_SEPARATOR)) >> type PARAGRAPH_SEPARATOR是字节,type是整数。这句话中的操作员,他们做什么?如何以及在哪里可以使用这些运算符?这是oracles java.lang.Character文档。该类中…

Spring Data Cassandra的事务管理 - java

我正在使用Spring和Cassandra作为基础数据库。曾提到过弹簧伞项目“ spring data cassandra”。与休眠不同,在这里无法找到如何管理事务。如果您中的某些人已经合并,请共享要包含的事务管理器的详细信息。 参考方案 Cassandra不支持传统(ACID)的事务。在某些特殊情况下,可以通过一些构造来实现事务原子性,例如原子批处理(请参…