Search Tutorials


Spring Cloud Tutorial - Stream Processing Using Spring Cloud Data Flow | JavaInUse

Spring Cloud Tutorial - Stream Processing Using Spring Cloud Data Flow

Spring Cloud Data Flow is a toolkit to build real-time data integration and data processing pipelines by establishing message flows between Spring Boot applications that could be deployed on top of different runtimes.
cloud-15_1
Long lived applications require Stream Applications while Short lived applications require Task Applications.
In this example we make use of Stream Applications. Previously we had already developed Spring Cloud Stream applications to understand the concept of Spring Cloud Stream Source and Spring Cloud Sink and their benefit.
cloud-15_5
Pipelines consist of Spring Boot apps, built using the Spring Cloud Stream or Spring Cloud Task microservice frameworks. SCDF can be accessed using the REST API exposed by it or the web UI console.
We can make use of metrics, health checks, and the remote management of each microservice application Also we can scale stream and batch pipelines without interrupting data flows. With SCDF we build data pipelines for use cases like data ingestion, real-time analytics, and data import and export. SCDF is composed of the following Spring Projects-
cloud-15_6

Spring Cloud - Table Of Contents

Microservice Registration and Discovery with Spring cloud using Netflix Eureka- Part 1. Microservice Registration and Discovery with Spring cloud using Netflix Eureka - Part 2. Microservice Registration and Discovery with Spring cloud using Netflix Eureka - Part 3. Microservice Registration and Discovery with Spring cloud using Netflix Eureka - Part 4. Spring Cloud- Netflix Eureka + Ribbon Simple Example Spring Cloud- Netflix Eureka + Ribbon + Hystrix Fallback Simple Example Spring Cloud- Netflix Hystrix Circuit Breaker Simple Example Spring Cloud- Netflix Feign REST Client Simple Example Spring Cloud- Netflix Zuul +Eureka Simple Example Spring Cloud Config Server using Native Mode Simple Example Spring Cloud Config Server Using Git Simple Example Spring Boot Admin Simple Example Spring Cloud Stream Tutorial - Publish Message to RabbitMQ Simple Example Spring Cloud Stream Tutorial - Consume Message from RabbitMQ Simple Example Spring Cloud Tutorial - Publish Events Using Spring Cloud Bus Spring Cloud Tutorial - Stream Processing Using Spring Cloud Data Flow Spring Cloud Tutorial - Distributed Log Tracing using Sleuth and Zipkin Example Spring Cloud Tutorial - Spring Cloud Gateway Hello World Example Spring Cloud Tutorial - Spring Cloud Gateway Filters Example Spring Cloud Tutorial - Spring Cloud Gateway + Netflix Eureka Example Spring Cloud Tutorial - Spring Cloud Gateway + Netflix Eureka + Netflix Hystrix Example Spring Cloud Tutorial - Secure Secrets using Spring Cloud Config + Vault Example

Video

This tutorial is explained in the below Youtube Video.

Lets Begin-

We will be creating Spring Boot Microservices as follows and deploy them using SCDF.
cloud-15_7
  • Create the Source Module as follows-
    cloud-15_2



We will create a module using Spring cloud stream we had created in an earlier tutorial - The pom.xml is follows-
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.javainuse</groupId>
	<artifactId>source</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.9.RELEASE</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
        <spring-cloud.version>Brixton.SR7</spring-cloud.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
		</dependency>
	</dependencies>

	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>{spring-cloud.version}</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>


</project>

Create the SourceApplication.java as follows-
package com.javainuse;

import java.util.Date;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.support.MessageBuilder;

@EnableBinding(Source.class)
@SpringBootApplication
public class SourceApplication {

	@Bean
	@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
	public MessageSource<Long> timeMessageSource() {

		return () -> MessageBuilder.withPayload(new Date().getTime()).build();
	}

	public static void main(String[] args) {
		SpringApplication.run(SourceApplication.class, args);
	}
}
  • Create the Processor Module as follows-
    cloud-15_3
    The pom.xml is follows-
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    
    	<groupId>com.javainuse</groupId>
    	<artifactId>processor</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<packaging>jar</packaging>
    
    
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.9.RELEASE</version>
    		<relativePath /> <!-- lookup parent from repository -->
    	</parent>
    
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
            <spring-cloud.version>Brixton.SR7</spring-cloud.version>
    	</properties>
    
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    		</dependency>
    	</dependencies>
    
    	<dependencyManagement>
    		<dependencies>
    			<dependency>
    				<groupId>org.springframework.cloud</groupId>
    				<artifactId>spring-cloud-dependencies</artifactId>
    				<version>{spring-cloud.version}</version>
    				<type>pom</type>
    				<scope>import</scope>
    			</dependency>
    		</dependencies>
    	</dependencyManagement>
    
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
    
    
    </project>
    
    Create the ProcessorApplication.java as follows-
    package com.javainuse;
    
    import java.text.DateFormat;
    import java.text.SimpleDateFormat;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Processor;
    import org.springframework.integration.annotation.Transformer;
    
    @EnableBinding(Processor.class)
    @SpringBootApplication
    public class ProcessorApplication {
    
        @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
        public Object transform(Long date) {
    
            DateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy");
            return dateFormat.format(date);
        }
    
        public static void main(String[] args) {
            SpringApplication.run(ProcessorApplication.class, args);
        }
    }
    
  • Create the Sink Module as follows-
    cloud-15_4
    The pom.xml is follows-
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<artifactId>sink</artifactId>
    	<groupId>com.javainuse</groupId>
    	<version>0.0.1-SNAPSHOT</version>
    	<packaging>jar</packaging>
    
    
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.9.RELEASE</version>
    		<relativePath /> <!-- lookup parent from repository -->
    	</parent>
    
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
            <spring-cloud.version>Brixton.SR7</spring-cloud.version>
    	</properties>
    
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    		</dependency>
    	</dependencies>
    
    	<dependencyManagement>
    		<dependencies>
    			<dependency>
    				<groupId>org.springframework.cloud</groupId>
    				<artifactId>spring-cloud-dependencies</artifactId>
    				<version>{spring-cloud.version}</version>
    				<type>pom</type>
    				<scope>import</scope>
    			</dependency>
    		</dependencies>
    	</dependencyManagement>
    
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
    
    
    	<artifactId>sink</artifactId>
    </project>
    
    Create the SinkApplication.java as follows-
    package com.javainuse;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    
    @EnableBinding(Sink.class)
    @SpringBootApplication
    public class SinkApplication {
    
    	private static Logger logger = LoggerFactory.getLogger(SinkApplication.class);
    
    	@StreamListener(Sink.INPUT)
    	public void loggerSink(String date) {
    
    		logger.info("Received: " + date);
    	}
    
    	public static void main(String[] args) {
    		SpringApplication.run(SinkApplication.class, args);
    	}
    }
    
  • We are done with the required Java code. Now lets start RabbitMQ. As we had explained in detail in the Getting started with RabbitMQ perform the steps to start the RabbitMQ.
    Run maven clean install command on all the above modules. Using Spring Cloud Shell we will now deploy the applications as follows-
    • a. Download the Spring Cloud Data Flow and start it.
      Download the jar using http://repo.spring.io/milestone/org/springframework/cloud/spring-cloud-dataflow-server-local/1.3.0.M1/spring-cloud-dataflow-server-local-1.3.0.M1.jar
      Start Spring Cloud using-
      java -jar spring-cloud-dataflow-server-local-1.3.0.M1.jar
      

      cloud-15_16
      If we now go localhost:9393 we will see the Spring Cloud Data Flow Console
    • b. Download Spring Cloud Shell jars and start it.
      Download the jar using http://repo.spring.io/milestone/org/springframework/cloud/spring-cloud-dataflow-shell/1.3.0.M1/spring-cloud-dataflow-shell-1.3.0.M1.jar
      Start Spring Cloud using-
      java -jar spring-cloud-dataflow-shell-1.3.0.M1.jar
      

      cloud-15_8
    • c. Install the applications to runtime.
      In the Spring Cloud Data Shell, install the application
      dataflow:>app register --name source-app --type source --uri maven://com.javainuse:source:jar:0.0.1-SNAPSHOT
      dataflow:>app register --name processor-app --type processor --uri maven://com.javainuse:processor:jar:0.0.1-SNAPSHOT
      dataflow:>app register --name sink-app --type sink --uri maven://com.javainuse:sink:jar:0.0.1-SNAPSHOT
      

      cloud-15_9
      Go to Spring Cloud Data Flow UI Console - http://localhost:9393/dashboard
      cloud-15_12
    • d. Create Stream.
      stream create --name log-data --definition 'source-app | processor-app | sink-app'
      

      cloud-15_10
    • e. Start the Stream.
      stream deploy --name log-data
      
      Stream has been deployed
      cloud-15_11
    If we now see the sink logs which are created at the location mentioned in the Spring Cloud Console-
    cloud-15_14
    We can see Sink Log, the application is receiving the messages-
    cloud-15_15

    Download Source Code

    Download it -
    Spring Cloud Data Flow Source Application
    Spring Cloud Data Flow Processor Application
    Sprong Cloud Data Flow Sink Application

    See Also

    Spring Boot Hello World Application- Create simple controller and jsp view using Maven Spring Boot Tutorial-Spring Data JPA Spring Boot + Simple Security Configuration Pagination using Spring Boot Simple Example Spring Boot + ActiveMQ Hello world Example Spring Boot + Swagger Example Hello World Example Spring Boot + Swagger- Understanding the various Swagger Annotations Spring Boot Main Menu Spring Boot Interview Questions