Kafka with Spring Cloud Stream

This post walks through a step-by-step process for enabling messaging in a microservice using Kafka with Spring Cloud Stream.

Spring Cloud Stream is a framework under the Spring Cloud umbrella project that lets developers build event-driven microservices on top of messaging systems such as Kafka and RabbitMQ.

Asynchronous messaging systems are an important part of most modern enterprise software solutions. Microservices have shortened the time-to-market for software products, but only with the right tools and frameworks in place. Spring Cloud Stream is built on top of Spring Integration. It integrates seamlessly with Spring Boot, so microservices that connect to shared messaging systems can be built in less time. Spring Cloud Stream provides multiple binder implementations, including Kafka, RabbitMQ and various others. The details are provided here.

Here is a step-by-step procedure for building a simple Spring Boot microservice that uses Spring Cloud Stream to connect to a Kafka instance.

  • Install Kafka and create a topic. I am using a Kafka broker running on my local Windows machine for this demonstration, but an installation on a Unix machine works just as well. Steps for installing Kafka on a Windows machine are provided here.
  • Create a Spring Boot starter project, either with the STS IDE or from Spring Initializr. I am providing the pom.xml for reference.
  • The Spring Cloud Stream project needs to be configured with the Kafka broker URL, the topic and other binder settings. Below is an example of configuration in application.yml
  • We need at least one producer and one consumer to test the send and receive operations. Below is the sample code for a producer and a consumer in their simplest form, developed using Spring Cloud Stream.
  • We will also create a REST controller class that accepts a message over HTTP and passes it to the producer. This is just to make testing convenient.
  • Run the following Maven commands to build and run the project.
mvn clean install
mvn spring-boot:run
  • Hit the POST endpoint “/sendMessage/string” and check the application console logs. Here is the output the application produced when I hit this endpoint with the message “hello” in the request body.
2019-10-01 14:37:22.764  INFO 377456 --- [container-0-C-1] com.techwording.scs.Consumer             : received a string message : {"contents":"hello","time":1569920841187}
  • Hit the POST endpoint “/sendMessage/complexType” and check the application console logs.
2019-10-01 14:37:22.773  INFO 377456 --- [container-0-C-1] com.techwording.scs.Consumer             : received a complex message : [2:37:21 PM]: hello
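
The two log lines above hint at the shape of the payload: the JSON shows the fields `contents` and `time`, and the complex-type listener prints the message as `[time]: contents`. The original class is not reproduced in this post, so the following is only a guess at what such a `ChatMessage` could look like, written in plain Java. The `format` method, the clock pattern and the `Asia/Kolkata` zone are assumptions made for illustration.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical reconstruction of the payload type. Only the field names
// "contents" and "time" are taken from the JSON in the log output.
final class ChatMessage {
    // Assumed display pattern, chosen to resemble the logged "[2:37:21 PM]" prefix.
    private static final DateTimeFormatter CLOCK =
            DateTimeFormatter.ofPattern("h:mm:ss a", Locale.ENGLISH);

    private final String contents;
    private final long time; // epoch milliseconds

    ChatMessage(String contents, long time) {
        this.contents = contents;
        this.time = time;
    }

    String getContents() { return contents; }

    long getTime() { return time; }

    // Renders "[clock time]: contents" using the given time zone.
    String format(ZoneId zone) {
        String clock = CLOCK.format(Instant.ofEpochMilli(time).atZone(zone));
        return "[" + clock + "]: " + contents;
    }

    @Override
    public String toString() {
        return format(ZoneId.systemDefault());
    }
}

class ChatMessageDemo {
    public static void main(String[] args) {
        ChatMessage message = new ChatMessage("hello", 1569920841187L);
        // The zone is an assumption; use whichever zone the application runs in.
        System.out.println(message.format(ZoneId.of("Asia/Kolkata")));
    }
}
```

Passing the zone explicitly keeps the formatting reproducible across machines, while `toString` falls back to the system default, which is what a log statement would normally pick up.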

The annotation @EnableBinding takes one or more interfaces as parameters. In this example, we have used Sink and Source interfaces, which declare input and output channels, respectively. You can also define your own interfaces for this purpose.

The @StreamListener annotation is a convenient way, provided by Spring Cloud Stream, to do content-based routing. It follows the pub-sub model, so every @StreamListener receives its own copy of the message.

I have used two stream listeners in this project: one consumes plain string messages and the other consumes messages of a complex type, ChatMessage. The producer attaches a “type” header with a logical value to each message, and the consumer can apply conditions in @StreamListener to filter messages on that header.
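
To make the routing rule concrete, here is a small library-free model of it in plain Java. It is not Spring code: `Envelope` and `ConditionalDispatcher` are hypothetical names, and the header values `"string"` and `"chat"` are assumptions, since the post does not state the actual values. In Spring Cloud Stream itself the filter is written as a SpEL expression in the annotation's `condition` attribute, for example `condition = "headers['type']=='chat'"`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

// A message reduced to its headers and payload; not a Spring type.
final class Envelope {
    final Map<String, String> headers;
    final String payload;

    Envelope(Map<String, String> headers, String payload) {
        this.headers = headers;
        this.payload = payload;
    }
}

// Models several listeners on one channel, each guarded by a condition.
final class ConditionalDispatcher {
    private static final class Listener {
        final Predicate<Envelope> condition;
        final Consumer<Envelope> handler;

        Listener(Predicate<Envelope> condition, Consumer<Envelope> handler) {
            this.condition = condition;
            this.handler = handler;
        }
    }

    private final List<Listener> listeners = new ArrayList<>();

    void listen(Predicate<Envelope> condition, Consumer<Envelope> handler) {
        listeners.add(new Listener(condition, handler));
    }

    // Hands the envelope to every listener whose condition matches
    // and returns how many listeners received it.
    int dispatch(Envelope envelope) {
        int delivered = 0;
        for (Listener listener : listeners) {
            if (listener.condition.test(envelope)) {
                listener.handler.accept(envelope);
                delivered++;
            }
        }
        return delivered;
    }
}

class RoutingDemo {
    public static void main(String[] args) {
        ConditionalDispatcher dispatcher = new ConditionalDispatcher();
        // Header values are hypothetical stand-ins for the "logical value" in the post.
        dispatcher.listen(e -> "string".equals(e.headers.get("type")),
                e -> System.out.println("received a string message : " + e.payload));
        dispatcher.listen(e -> "chat".equals(e.headers.get("type")),
                e -> System.out.println("received a complex message : " + e.payload));

        dispatcher.dispatch(new Envelope(Map.of("type", "chat"), "hello"));
    }
}
```

In this model a message whose header matches no condition is simply not delivered to anyone, and a message matching several conditions reaches each matching listener once.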

You can find the complete project here.

