Search Tutorials


Spring Cloud Stream Tutorial - Consume Message from RabbitMQ Simple Example | JavaInUse

Spring Cloud Stream Tutorial - Consume Message from RabbitMQ Simple Example

In a previous tutorial we had implemented an example to publish message to RabbitMQ using Spring Cloud Stream. In this example we will see how to consume message using Spring Cloud Stream.

Spring Cloud - Table Of Contents

Microservice Registration and Discovery with Spring cloud using Netflix Eureka- Part 1. Microservice Registration and Discovery with Spring cloud using Netflix Eureka - Part 2. Microservice Registration and Discovery with Spring cloud using Netflix Eureka - Part 3. Microservice Registration and Discovery with Spring cloud using Netflix Eureka - Part 4. Spring Cloud- Netflix Eureka + Ribbon Simple Example Spring Cloud- Netflix Eureka + Ribbon + Hystrix Fallback Simple Example Spring Cloud- Netflix Hystrix Circuit Breaker Simple Example Spring Cloud- Netflix Feign REST Client Simple Example Spring Cloud- Netflix Zuul +Eureka Simple Example Spring Cloud Config Server using Native Mode Simple Example Spring Cloud Config Server Using Git Simple Example Spring Boot Admin Simple Example Spring Cloud Stream Tutorial - Publish Message to RabbitMQ Simple Example Spring Cloud Stream Tutorial - Consume Message from RabbitMQ Simple Example Spring Cloud Tutorial - Publish Events Using Spring Cloud Bus Spring Cloud Tutorial - Stream Processing Using Spring Cloud Data Flow Spring Cloud Tutorial - Distributed Log Tracing using Sleuth and Zipkin Example Spring Cloud Tutorial - Spring Cloud Gateway Hello World Example Spring Cloud Tutorial - Spring Cloud Gateway Filters Example Spring Cloud Tutorial - Spring Cloud Gateway + Netflix Eureka Example Spring Cloud Tutorial - Spring Cloud Gateway + Netflix Eureka + Netflix Hystrix Example Spring Cloud Tutorial - Secure Secrets using Spring Cloud Config + Vault Example

Video

This tutorial is explained in the below Youtube Video.

Spring Cloud Concepts-


Spring Cloud Stream - Consume RabbitMQ Example
  • Binder - Depending upon the messaging system we will have to specify a the messaging platform dependency, which in this case is RabbitMQ
  • <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    
  • Sink - In Spring Cloud Stream, sink is used to consume message from queue. @StreamListener(target = Sink.INPUT) public void processRegisterEmployees(String employee){ System.out.println("Employees Registered --"+ employee); }
  • Channel - A channel represents an input and output pipe between the Spring Cloud Stream Application and the Middleware Platform. A channel abstracts the queue that will either publish or consume the message. A channel is always associated with a queue. With this approach, we do not need to use the queue name in the application code. So if tomorrow the queue needs to be changed, we dont need to change the application code.
    For example in the EmployeeRegistrationSource we have specified the channel name as employeeRegistrationChannel. In application.properties we have associated this channel with a RabbitMQ Exchange.

Lets Begin-

The Maven project will be as follows -
Spring Cloud Stream - Consume RabbitMQ Eclipse
The pom.xml will be as follows with the binder dependency of RabbitMQ as follows-

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.javainuse</groupId>
	<artifactId>employee-registration-consumer</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>com.javainuse</name>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.9.RELEASE</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
		<spring-cloud.version>Edgware.SR1</spring-cloud.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>0</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>


</project>

       
  



Create the Spring Boot Bootstrap class with the SpringBootApplication annotation as follows-
package com.javainuse;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding(Sink.class)
@SpringBootApplication
public class EmployeeRegistrationProcessingApplication {

	public static void main(String[] args) {
		SpringApplication.run(EmployeeRegistrationProcessingApplication.class, args);
	}

	@StreamListener(target = Sink.INPUT)
	public void processRegisterEmployees(String employee) {
		System.out.println("Employees Registered by Client " + employee);
	}
}
  
Next we specify the properties. Here we specify the RabbitMQ properties. Also we associate the channel and the queue to be used -
server.port=8090
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

spring.cloud.stream.bindings.input.destination=employeeRegistrations
spring.cloud.stream.bindings.input.group=employeeRegistrationQueue
  
We are done with the required Java code. Now lets start RabbitMQ. As we had explained in detail in the Getting started with RabbitMQ perform the steps to start the RabbitMQ.
Previous chapter we had published a message to the employeeRegistrations exchange. We will be consuming the same message here. Next start the Spring Boot Application by running it as a Java Application.
Go to the RabbitMQ console-http://localhost:15672/. We can see in the Exchange section, a queue named employeeRegistrationQueue gets created and it gets bound to the employeeRegistrations exchange. The queue gets the message from this exchange and is then consumed by our Spring Cloud Stream Application.
Spring Cloud Stream - Consume RabbitMQ Message Exchange

Spring Cloud Stream - Using RabbitMQ Queue

Spring Cloud Stream

Download Source Code

Download it -
Spring Cloud Stream - RabbitMQ Comsume Message Example

See Also

Spring Boot Hello World Application- Create simple controller and jsp view using Maven Spring Boot Tutorial-Spring Data JPA Spring Boot + Simple Security Configuration Pagination using Spring Boot Simple Example Spring Boot + ActiveMQ Hello world Example Spring Boot + Swagger Example Hello World Example Spring Boot + Swagger- Understanding the various Swagger Annotations Spring Boot Main Menu Spring Boot Interview Questions