Skip to content

Commit da3ae99

Browse files
authored
kafka streams branch vs parallel execution
1 parent 0bf38f4 commit da3ae99

File tree

1 file changed

+38
-1
lines changed

1 file changed

+38
-1
lines changed

kafka-stream/README.md

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,41 @@ java -jar build/libs/creating-first-apache-kafka-streams-application-*.jar confi
5656
* [official kafka streams](https://kafka.apache.org/documentation/streams/)
5757
* [write application](https://kafka.apache.org/37/documentation/streams/tutorial)
5858
* [quarkus kafka stream](https://quarkus.io/guides/kafka-streams)
59-
59+
60+
61+
### snippet: branch vs parallel
62+
```java
63+
public StreamsBuilder buildTopology(StreamsBuilder builder) {
64+
65+
Serde<String> stringSerde = Serdes.String();
66+
67+
KStream<String, String> inputStream = builder.stream(this.inputTopic, Consumed.with(stringSerde, stringSerde));
68+
// BranchedKStream<String, String> splitedStream = builder
69+
// .stream(this.inputTopic,
70+
// Consumed.with(
71+
// stringSerde,
72+
// stringSerde))
73+
// .split();
74+
75+
for (String eachKey : configLoader.getAllAccessibleKeys()) {
76+
ObjectConfig eachConfig = configLoader.getConfig(eachKey);
77+
.to(eachConfig.outputTopic(), Produced.with(stringSerde, stringSerde));
78+
// splitedStream.branch((k, jsonBody) -> ruleEvaluation(eachConfig.rule(), jsonBody),
79+
// Branched.withConsumer(stream -> stream
80+
// .to(eachConfig.outputTopic(), Produced.with(stringSerde, stringSerde))
81+
// )
82+
// );
83+
}
84+
85+
86+
private boolean ruleEvaluation(String rule, String jsonBody) {
87+
try{
88+
return ruleEvaluation(
89+
rule,
90+
new ObjectMapper().readValue(jsonBody, Map.class));
91+
}catch(JsonProcessingException e) {
92+
logger.error("Error during rule evaluation: " + rule + " for message: " + jsonBody, e);
93+
return false;
94+
}
95+
96+
```

0 commit comments

Comments
 (0)