如何通过kafka流加入主题 - java

卡夫卡流
我正在尝试流式传输,但存在一些问题,它不起作用。首先,我有3个连接器,但不能使用自己的钥匙。我需要钥匙才能加入它们,对不对?如何使用2个或更多钥匙加入?
我尝试复制这样的内容:
选择*从(选择a。*从用户a内部联接部门b
在a.dep = b.dep和a.group = b.group上
)a.id = b.id上的内部联接user_afy

我想将内部联接的数据保存在一个主题中,并将其用于外部联接。这是我的一个例子。

连接器属性:

....
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
mode=timestamp
query=select id, user, dep,tal, group,time from users
numeric.mapping=best_fit
table.types=TABLE
topic=users  
// I try use this with 1 or more fields but not worked  
transforms=createKey, extractInt  
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey  
transforms.createKey.fields=dep, group  
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key  
transforms.extractInt.field=dep, group  

standalone.properties

bootstrap.servers=localhost:9092  
key.converter=org.apache.kafka.connect.json.JsonConverter  
value.converter=org.apache.kafka.connect.json.JsonConverter  
key.converter.schemas.enable=false  
value.converter.schemas.enable=false  
offset.storage.file.filename=D:/tmp/connect.offsets  
plugin.path=D:/connector/lib  

主题:

Topic users    
{"id":"0001", "user":"Alex", "dep":"ofi", "postal":170, group="ingen",time:"xxx"}    
{"id":"0002", "user":"Emy", "dep":"lab", "postal":170, group="itn",time:"xxx"}    
{"id":"0003", "user":"Lea", "dep":"lab", "postal":170, group="itn",time:"xxx"}    
{"id":"0004", "user":"Silva", "dep":"cent", "postal":170, group="ingen",time:"xxx"}    
{"id":"0005", "user":"Foxy", "dep":"cent", "postal":170, group="ete",time:"xxx"}    

topic user_afy
{"id":"0001", name="bask"}
{"id":"0001", name="Silf"}
{"id":"0002", name="BTT"}
{"id":"0005", name="butf"}


Topic deps  
{"id_dep":"1", "dep":"ofi", "sind"="worker", "group"="ingen."}  
{"id_dep":"2", "dep":"lab", "sind"="worker", "group"="iti."}  
{"id_dep":"3", "dep":"cent", "sind"="worker", "group"="etc."} 

我的代码是官方网站的示例,但我无法对其进行测试

public static void main(String[] Args) {
        Properties props = new Properties();
        props.put(......);

    final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
        final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);
        StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, JsonNode> left = builder.stream("user", consumed);
        KTable<String, JsonNode> right = builder.table("deps", consumed);
        KStream<String, String> joined = left.join(right,
            (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
            Joined.with(Serdes.String(), jsonSerde, jsonSerde)
        );
//Edit
       joined.foreach((k, v) -> {
          System.out.println("key="+k+ ", val=" + v);
       });

}

输出如何显示?要创建一个新主题,哈希表是否更适合您要以json格式保存的值?稍后我将创建自定义Serdes

参考方案

“我不能使用自己的钥匙”是什么意思。在Kafka Streams中,您始终可以根据需要设置新密钥。

如果要将数据读取到KTable中,则不能直接更改键。您需要将主题读为KStream,设置新的密钥,然后将KStream转换为KTable(参见Kafka Streams API: KStream to KTable)。

对于多个连续的联接,您只需将相应的操作“链接”在一起。

builder.stream("topic-1").selectKey(...).to("table-topic-1");
KTable t1 = builder.table("table-topic-1");

KStream firstJoinResult = builder.stream(...).selectKey(...).join(t1, ...).

builder.stream("topic-2").selectKey(...).to("table-topic-2");
KTable t2 = builder.table("table-topic-2");

firstJoinResult.selectKey(...).join(t2, ...).to("result-topic");

Java Double与BigDecimal - java

我正在查看一些使用双精度变量来存储(360-359.9998779296875)结果为0.0001220703125的代码。 double变量将其存储为-1.220703125E-4。当我使用BigDecimal时,其存储为0.0001220703125。为什么将它双重存储为-1.220703125E-4? 参考方案 我不会在这里提及精度问题,而只会提及数字…

当回复有时是一个对象有时是一个数组时,如何在使用改造时解析JSON回复? - java

我正在使用Retrofit来获取JSON答复。这是我实施的一部分-@GET("/api/report/list") Observable<Bills> listBill(@Query("employee_id") String employeeID); 而条例草案类是-public static class…

Java-父类正在从子类中调用方法? - java

抱歉,我还是编码的新手,可能还没有掌握所有术语。希望您仍然能理解我的问题。我想得到的输出是:"Cost for Parent is: 77.77" "Cost for Child is: 33.33" 但是,我得到这个:"Cost for Parent is: 33.33" "Cost f…

Java Map,如何将UTF-8字符串正确放置到地图? - java

我有一个地图,LinkedHashMap更确切地说。我想在上面放一个字符串对象。然后,我读取此值以查看实际存储的内容。字符串本身具有非ASCII字符(西里尔文,韩文等)。将其放到地图上然后阅读后,这些字符将替换为??? s。一些代码:Map obj = new LinkedHashMap(); System.out.println("name: &…

找不到StandardAnalyzer类 - java

我已经下载了最新的Lucene 4.6.0和running it in netbeans。Lucence 3.6.x版本运行良好,但最新版本找不到org.apache.lucene.analysis.standard.StandardAnalyzer和IndexFiles.java,这行中显示错误:Analyzer analyzer = new Standa…