The current example uses spring-cloud-function framework as its core which allows users to only worry about functional aspects of their requirement while taking care-off non-functional aspects. For more information on Spring Cloud Function please visit our project page.
The example consists of a Spring boot configuration class DemoApplication which contains a sample function which you can interact with following via AMQP and Apache Kafka.
Assuming you have RabbitMQ and Kafka running, start the application and send a Message to RabbitMQ.
We included a demo test case which effectively automates this demo by sending Cloud Event to RabbitMQ and receives one from Apache Kafka.
Message<byte[]> messageToAMQP = CloudEventMessageBuilder
.withData("{\"firstName\":\"John\", \"lastName\":\"Doe\"}".getBytes())
.setSource("https://cloudevent.demo")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build(CloudEventMessageUtils.AMQP_ATTR_PREFIX);
rabbitTemplate.send("hire-in-0", "#", messageToAMQP);
Message<String> resultFromKafka = queue.poll(2000, TimeUnit.MILLISECONDS);
System.out.println("Result Message: " + resultFromKafka);
. . .
Note how we are using CloudEventMessageBuilder here to only set source as Cloud Event attribute while relying on default values for the rest of the
required Cloud Event attributes. We’re also using build(CloudEventMessageUtils.AMQP_ATTR_PREFIX) to ensure that the attributes are prefixed with cloudEvents:
prefix (see Cloud Events AMQP protocol bindings).
Also note that on the receiving end Cloud Events attributes are now prefixed with ce_
prefix (see Cloud Events Kafka protocol bindings),
since it was determined by the framework that the target destination is Apache Kafka.
This last point is worth elaborating a bit. We already established that setting Cloud Event attributes is a non-functional aspect and because
of it we’ve exposed a mechanism for you to deal with it outside of your business logic. But what about attribute prefixes? Note that we are running the
same code in different execution contexts. This means that the attribute prefixes actually depend on the execution context. So by being aware of the execution
context, the framework ensures the correctness of the Cloud Event attribute prefixes.
You can also use RabbitMQ dashboard (if you have it installed) and send message to hire-in-0
exchange.
To stay compliant with Cloud Event specification you should provide attributes with AMQP appropriate prefixes (i.e., cloudEvents:
). For example:
cloudEvents:specversion=1.0
cloudEvents:type=hire
cloudEvents:source:spring.io/spring-event
cloudEvents:id=0001
And your data:
{"firstName":"John", "lastName":"Doe"}