Skip to content

Commit de1034b

Browse files
committed
Implemented reply channel
1 parent a4db54d commit de1034b

File tree

2 files changed

+19
-20
lines changed

2 files changed

+19
-20
lines changed

src/main/java/mu/integration/consumer/rabbitmq/Pojo.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,19 @@
66
*/
77
public class Pojo {
88

9+
public Pojo() {
10+
11+
}
12+
13+
public Pojo(String csvLine) {
14+
15+
String[] array = csvLine.split(",");
16+
this.setCol1(array[0]);
17+
this.setCol2(array[1]);
18+
this.setCol3(array[2]);
19+
20+
}
21+
922
private String col1;
1023

1124
private String col2;
Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
package mu.integration.consumer.rabbitmq;
22

3-
import org.springframework.amqp.core.Queue;
43
import org.springframework.amqp.rabbit.annotation.RabbitListener;
5-
import org.springframework.context.annotation.Bean;
64
import org.springframework.integration.support.MessageBuilder;
75
import org.springframework.messaging.Message;
86
import org.springframework.messaging.MessageHeaders;
9-
import org.springframework.messaging.handler.annotation.SendTo;
107
import org.springframework.stereotype.Component;
118

129
/**
@@ -16,34 +13,23 @@
1613
@Component
1714
public class RabbitmqListener {
1815

16+
private static final String AMQ_CORRELATION_ID = "amqp_correlationId";
1917

2018
@RabbitListener(queues = "myQueue")
21-
@SendTo("foo_response")
22-
public Message<String> receive(Message<String> message) {
23-
24-
System.out.println("\n\n*********************");
25-
26-
System.out.println(message.getHeaders());
2719

28-
Pojo record = new Pojo();
20+
public Message<String> receive(Message<String> message) {
2921

30-
String[] array = message.getPayload().split(",");
31-
record.setCol1(array[0]);
32-
record.setCol2(array[1]);
33-
record.setCol3(array[2]);
22+
System.out.println("\n\n*********** Message Received ***********");
3423

24+
//Call a service class to validate pojo
25+
Pojo record = new Pojo(message.getPayload());
3526
System.out.println(record.toString());
3627

37-
return MessageBuilder.withPayload("Hello")
28+
return MessageBuilder.withPayload("PROCESSED: " + message.getHeaders().get(AMQ_CORRELATION_ID))
3829
.setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
3930
.build();
4031

4132
}
4233

4334

44-
@Bean
45-
public Queue queue() {
46-
return new Queue("foo_response", false);
47-
}
48-
4935
}

0 commit comments

Comments
 (0)