Spring Cloud Tutorial - Stream Processing Using Spring Cloud Data Flow | JavaInUse







Spring Cloud Data Flow (SCDF) is a toolkit for building real-time data integration and data processing pipelines by establishing message flows between Spring Boot applications that can be deployed on top of different runtimes.
Long-lived applications are built as Stream applications, while short-lived applications are built as Task applications.
In this example we make use of Stream applications. In earlier tutorials we developed Spring Cloud Stream applications to understand the concepts of a Spring Cloud Stream Source and Sink and their benefits.
Pipelines consist of Spring Boot apps built using the Spring Cloud Stream or Spring Cloud Task microservice frameworks. SCDF can be accessed through the REST API it exposes or through the web UI console.
We can make use of the metrics, health checks, and remote management of each microservice application. We can also scale stream and batch pipelines without interrupting data flows. With SCDF we build data pipelines for use cases like data ingestion, real-time analytics, and data import and export. SCDF is composed of the following Spring Projects-



Let's Begin-

We will create the following Spring Boot microservices and deploy them using SCDF.
  • Create the Source Module as follows-
    We will reuse the Spring Cloud Stream module we created in an earlier tutorial. The pom.xml is as follows-
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    
    	<groupId>com.javainuse</groupId>
    	<artifactId>source</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<packaging>jar</packaging>
    
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.9.RELEASE</version>
    		<relativePath /> <!-- lookup parent from repository -->
    	</parent>
    
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
            <spring-cloud.version>Brixton.SR7</spring-cloud.version>
    	</properties>
    
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    		</dependency>
    	</dependencies>
    
    	<dependencyManagement>
    		<dependencies>
    			<dependency>
    				<groupId>org.springframework.cloud</groupId>
    				<artifactId>spring-cloud-dependencies</artifactId>
    				<version>${spring-cloud.version}</version>
    				<type>pom</type>
    				<scope>import</scope>
    			</dependency>
    		</dependencies>
    	</dependencyManagement>
    
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
    
    
    </project>
    
    
    Create the SourceApplication.java as follows-
    package com.javainuse;
    
    import java.util.Date;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.annotation.InboundChannelAdapter;
    import org.springframework.integration.annotation.Poller;
    import org.springframework.integration.core.MessageSource;
    import org.springframework.integration.support.MessageBuilder;
    
    @EnableBinding(Source.class)
    @SpringBootApplication
    public class SourceApplication {
    
    	@Bean
    	@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
    	public MessageSource<Long> timeMessageSource() {
    
    		return () -> MessageBuilder.withPayload(new Date().getTime()).build();
    	}
    
    	public static void main(String[] args) {
    		SpringApplication.run(SourceApplication.class, args);
    	}
    }
    
  • Create the Processor Module as follows-
    The pom.xml is as follows-
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    
    	<groupId>com.javainuse</groupId>
    	<artifactId>processor</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<packaging>jar</packaging>
    
    
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.9.RELEASE</version>
    		<relativePath /> <!-- lookup parent from repository -->
    	</parent>
    
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
            <spring-cloud.version>Brixton.SR7</spring-cloud.version>
    	</properties>
    
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    		</dependency>
    	</dependencies>
    
    	<dependencyManagement>
    		<dependencies>
    			<dependency>
    				<groupId>org.springframework.cloud</groupId>
    				<artifactId>spring-cloud-dependencies</artifactId>
    				<version>${spring-cloud.version}</version>
    				<type>pom</type>
    				<scope>import</scope>
    			</dependency>
    		</dependencies>
    	</dependencyManagement>
    
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
    
    
    </project>
    
    Create the ProcessorApplication.java as follows-
    package com.javainuse;
    
    import java.text.DateFormat;
    import java.text.SimpleDateFormat;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Processor;
    import org.springframework.integration.annotation.Transformer;
    
    @EnableBinding(Processor.class)
    @SpringBootApplication
    public class ProcessorApplication {
    
        @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
        public Object transform(Long date) {
    
            DateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy");
            return dateFormat.format(date);
        }
    
        public static void main(String[] args) {
            SpringApplication.run(ProcessorApplication.class, args);
        }
    }
    
  • Create the Sink Module as follows-
    The pom.xml is as follows-
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<artifactId>sink</artifactId>
    	<groupId>com.javainuse</groupId>
    	<version>0.0.1-SNAPSHOT</version>
    	<packaging>jar</packaging>
    
    
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.9.RELEASE</version>
    		<relativePath /> <!-- lookup parent from repository -->
    	</parent>
    
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
            <spring-cloud.version>Brixton.SR7</spring-cloud.version>
    	</properties>
    
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    		</dependency>
    	</dependencies>
    
    	<dependencyManagement>
    		<dependencies>
    			<dependency>
    				<groupId>org.springframework.cloud</groupId>
    				<artifactId>spring-cloud-dependencies</artifactId>
    				<version>${spring-cloud.version}</version>
    				<type>pom</type>
    				<scope>import</scope>
    			</dependency>
    		</dependencies>
    	</dependencyManagement>
    
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
    
    
    </project>
    
    Create the SinkApplication.java as follows-
    package com.javainuse;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    
    @EnableBinding(Sink.class)
    @SpringBootApplication
    public class SinkApplication {
    
    	private static Logger logger = LoggerFactory.getLogger(SinkApplication.class);
    
    	@StreamListener(Sink.INPUT)
    	public void loggerSink(String date) {
    
    		logger.info("Received: " + date);
    	}
    
    	public static void main(String[] args) {
    		SpringApplication.run(SinkApplication.class, args);
    	}
    }
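
To see what the three modules do to a message without starting RabbitMQ or SCDF, the same logic can be sketched in plain Java. This is only an illustration and not part of the tutorial's source code: the class name PipelineSketch and its methods are hypothetical, the steps are wired together with direct method calls instead of a message broker, and the time zone is fixed to UTC so the result is reproducible (the processor above uses the JVM default zone).

```java
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.TimeZone;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for the source | processor | sink chain, with no Spring or RabbitMQ involved.
public class PipelineSketch {

    // Source step: emits the current time as epoch milliseconds, like timeMessageSource().
    public static Supplier<Long> source() {
        return System::currentTimeMillis;
    }

    // Processor step: formats epoch milliseconds as dd/MM/yyyy, like transform(Long).
    // Assumption: UTC is set explicitly so the output does not depend on the machine's zone.
    public static Function<Long, String> processor() {
        return millis -> {
            DateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy");
            dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
            return dateFormat.format(millis);
        };
    }

    // Sink step: collects the line that loggerSink(String) would log.
    public static Consumer<String> sink(List<String> received) {
        return date -> received.add("Received: " + date);
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        Long payload = source().get();                     // what the source publishes
        String formatted = processor().apply(payload);     // what the processor forwards
        sink(received).accept(formatted);                  // what the sink consumes
        received.forEach(System.out::println);
    }
}
```

In the real stream each arrow between these steps is a RabbitMQ destination created by the binder, which is why the three modules can run as separate processes.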
    
We are done with the required Java code. Now let's start RabbitMQ by following the steps explained in detail in the Getting started with RabbitMQ tutorial.
Run the Maven clean install command (mvn clean install) on all the above modules. Using the Spring Cloud Data Flow Shell we will now deploy the applications as follows-
  • a. Download the Spring Cloud Data Flow server and start it.
    Download the jar using http://repo.spring.io/milestone/org/springframework/cloud/spring-cloud-dataflow-server-local/1.3.0.M1/spring-cloud-dataflow-server-local-1.3.0.M1.jar
    Start the Spring Cloud Data Flow server using-
    java -jar spring-cloud-dataflow-server-local-1.3.0.M1.jar
    
    If we now go to localhost:9393/dashboard we will see the Spring Cloud Data Flow console.
  • b. Download the Spring Cloud Data Flow Shell jar and start it.
    Download the jar using http://repo.spring.io/milestone/org/springframework/cloud/spring-cloud-dataflow-shell/1.3.0.M1/spring-cloud-dataflow-shell-1.3.0.M1.jar
    Start the Spring Cloud Data Flow Shell using-
    java -jar spring-cloud-dataflow-shell-1.3.0.M1.jar
    
  • c. Register the applications with the runtime.
    In the Spring Cloud Data Flow Shell, register the three applications-
    dataflow:>app register --name source-app --type source --uri maven://com.javainuse:source:jar:0.0.1-SNAPSHOT
    dataflow:>app register --name processor-app --type processor --uri maven://com.javainuse:processor:jar:0.0.1-SNAPSHOT
    dataflow:>app register --name sink-app --type sink --uri maven://com.javainuse:sink:jar:0.0.1-SNAPSHOT
    
    Go to the Spring Cloud Data Flow UI console at http://localhost:9393/dashboard
  • d. Create Stream.
    stream create --name log-data --definition 'source-app | processor-app | sink-app'
    
  • e. Start the Stream.
    stream deploy --name log-data
    
    Stream has been deployed
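
The commands above tie names together: each app register call maps a name to a maven:// URI, and the stream definition chains those names with the pipe symbol. A minimal sketch of that mapping in plain Java follows, assuming the simple four-part URI form used in this tutorial (groupId:artifactId:packaging:version) and a definition with no app options. StreamDefinitionSketch and its methods are hypothetical and are not part of the SCDF API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper that mimics, in a simplified way, how registered names and the definition relate.
public class StreamDefinitionSketch {

    // Splits maven://groupId:artifactId:packaging:version into its four parts.
    // Assumption: only this four-part form is handled.
    public static String[] mavenCoordinates(String uri) {
        String prefix = "maven://";
        if (!uri.startsWith(prefix)) {
            throw new IllegalArgumentException("Not a maven:// URI: " + uri);
        }
        String[] parts = uri.substring(prefix.length()).split(":");
        if (parts.length != 4) {
            throw new IllegalArgumentException("Expected groupId:artifactId:packaging:version in " + uri);
        }
        return parts;
    }

    // Splits a definition such as 'source-app | processor-app | sink-app' into app names, in pipeline order.
    public static List<String> appNames(String definition) {
        List<String> names = new ArrayList<>();
        for (String part : definition.split("\\|")) {
            names.add(part.trim());
        }
        return names;
    }

    public static void main(String[] args) {
        // The same name-to-URI pairs that were registered in step c.
        Map<String, String> registry = new LinkedHashMap<>();
        registry.put("source-app", "maven://com.javainuse:source:jar:0.0.1-SNAPSHOT");
        registry.put("processor-app", "maven://com.javainuse:processor:jar:0.0.1-SNAPSHOT");
        registry.put("sink-app", "maven://com.javainuse:sink:jar:0.0.1-SNAPSHOT");

        // Resolve each name in the definition to the artifact it was registered with.
        for (String name : appNames("source-app | processor-app | sink-app")) {
            String[] c = mavenCoordinates(registry.get(name));
            System.out.println(name + " -> " + c[0] + ":" + c[1] + ":" + c[3]);
        }
    }
}
```

Because the URIs point at 0.0.1-SNAPSHOT artifacts, the modules have to be installed in the local Maven repository first, which is what the clean install step takes care of.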
If we now look at the sink logs, which are created at the location shown in the Spring Cloud Data Flow console, we can see that the sink application is receiving the messages.

Download Source Code

Download it -
Spring Cloud Data Flow Source Application
Spring Cloud Data Flow Processor Application
Spring Cloud Data Flow Sink Application
