Why Reactive Programming ?
Traditional Programming
Traditional Programming makes a call to the DB and the code goes into a blocking phase and wait for the result. Once the result is retrieved from the DB, then the correct solution completes this way of writing code.
This way is very common and this is also called synchronous or blocking style of writing code. Basically the thread is involved in the whole interaction is blocked until the response is received from the DB.
What has changed?
- Internet usages spiked up
- Network interactions are pretty common
- Microservices Architectures are pretty much everywhere
- Applications are deployed in cloud environments
- Response times are expected in milliseconds
- No downtime
Today’s Architecture (Backend Restful API)
in the above case, the response time is the summation of a DB + the other API calls response time.
This model works to a certain limit and you can’t have a thread pool based on the number of users you want to support.
Let’s say you have a Usecase where you need to support 100,000 concurrent users. in that case, you can’t define a thread pool with 100,000 that you might be thinking why we can’t have a thead pool size that matches the number of users you want to support.
Thread and it’s side effects
- Thread is an expensive resource
- It takes up 1MB of heap space More threads lead to more heap space which leads to shortage of JVM memory for handling the request.
Can we do better ?
Callbacks
- Asynchronous methods that accept a callback as a parameter and invokes it when the blocking call completes.
- Writing code with Callbacks are hard to compose and difficult to read and maintain
- There is a popular term tied with callbacks which is a Callbackhell
Future
- Released in Java 5
- Write Asynchronous Code
- No easy way to combine the result from multiple futures
- Future.get() is a blocking call
CompletableFuture
- Released in Java 8
- Write Asynchronous code in a functional style
- Easy to compose/combine MultipleFutures
- Future that returns many elements
- Eg., CompletableFuture
- > will need to wait for the whole collection to built and readily available.
- CompletableFuture does not have a handle for infinite values.
What is Reactive Programming ?
- Reactive Programming is a new programming paradigm
- Asynchronous and non blocking
- Data flows as an Event/Message driven stream
- Functional Style Code
- BackPressure on Data Streams
requestForData() is not a blocking call anymore and Calling thread is released to do useful work.
request(n) is a signal to the DB from the app that it is ready to consume the data.
Once the query results are ready, then the data will be sent in the form of a stream of events. This stream is called the reactive stream. The actual data will be sent using the onNext() function to the app.
Once all the results are sent to the app, then there will be a completion even which signals the app that there is no more data and that event will be sent using the onComplete() function.
This is how the code and data flow works in reactive programming.
Backpressure
reactive streams can make easily to overwhelm the app with lots of data. so Backpressure as a concept which is dedicated to solve this problem.
Let’s say we don’t want any more data to DB. then the cancel() request will be sent to the data source and no data will be sent after that. The concept of app controling the data flow is called Backpressure.
When to use Reactive Programming ?
Use Reactive programming when there is need to build and support app that can handle high load.
Reactive App Architecture
- Handle request using non blocking style
- Netty is a non blocking server uses Event-loop style
- Using Project reactor for writing non blocking code
- Spring WebFlux uses the Netty and Project Reactor for building non blocking or reactive APIs
Reactive Streams
Reactive Streams Specification
- Publisher
1 | public interface Publisher<T> { |
The publisher interface has just one method named subscribe() and it takes a parameter with the type named Subscriber.
Publisher can be a database or a remote service or anything which holds the data for the application.
- Subscriber
1 | public interface Subscriber<T> { |
- Subscription
1 | public interface Subscription { |
Subscription interface is a one which connects the app and the data source. in our case, it’s the publisher and subscriber.
- Processor
1 | public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { |
Processor can behave as a Subscriber and Publisher. Not really used this on a day to day basis.
Reactive Streams - How it works together ?
- Success scenario
the Subscriber initiating the request by invoking the subscribe() method of the Publisher. The second step is the Publisher handing out the subscription object by invoking the onSubscribe() method of the Subscriber.
Once a Subscriber has the whole of the Subscription object, then the Subscriber is going to invoke the request function of the Subscription object, requesting the Publisher to send all the data.
Once the publisher receives a request to send the data, then it sends a data using the onNext() function. That’s part of the Subscriber interface on the form of events. Once all the data is sent, then the next step is a Publisher sending an onComplete() event by the onComplete() function.
This is how the Publisher, Subscriber and Subscription interface work together in a success scenario.
- Error scenario
Exceptions are treated the same way as like the data in reactive programming, which means in this case the data are exception will be sent to the app in the form of events.
Introduction to Project Reactor
Project Reactor
Project Reactor is an implementation of Reactive Streams Specification
Project Reactor is a Reactive Library
Spring WebFlux uses Project Reactor by default.
Reactor offers tow reactive and composable APIs, Flux [N] and Mono [0|1]
Reactor Reference Guide Document
Reactor Reactive Types - Flux and Mono
Flux & Mono
Flux and Mono is a reactive type that implements the Reactive Streams Specification
Flux and Mono is part of the reactor-core module
Flux is a reactive type to represent 0 to N elements
Mono is reactive type to represent 0 to 1 element
Flux - 0 to N elements
Marble is at the top, represents the data from the actual data source. since it’s a flux, we have multiple marbles in the data source layer. Each inidividual marble represents the data that’s been sent using the onNext() operator and the Subscriber.
operator is the layer where you can apply transformation techniques to the actual element that’s been emitted from the source. Some of the common and simple transformations are the map() operator and filter() operator. There are a lot of operators in addition to those that are presented project reactor.
The next layer is a result of the transformation that’s been applied the oerator layer. this is the final result that will be handed to the subscriber.
The vertical bar represents that there is no more data. This is the representation to send the onComplete() signal to the Subscriber.
The cross signal represents that some exceptional an error occurred. This is the onError() signal that will be sent to the subscriber.
Mono - 0 to 1 Element
Mono represent a single marble in the nearest source layer. let’s say you have a request which is going to retrieve a single data.
the vertical board and the cross represent the same behavior as a flux, which is the onComplete() and onError() signal.
the operator section is similar to Flux, which has different operator to apply the transformation techniques to the actual element that is emitted from the data souce layer.
Project Setup
https://github.com/kidongYun/reactive_programming_using_reactor
Imperative Style vs Functional Style
Why Functional Programming ?
Reactive programming uses Functional Programming style of code Eg., Code similar to Streams API
Reactive Programming is an extension to Functional Programming
What is Functional Programming ?
This programming model was introduced in Java 8
Functional programming is powered by
- Lambdas
- Method References
- Functional Interfaces
Functional Programming promotes :
- Behavior Parameterization
- Immutability
- Concise code
Imperative Example vs Functional Example
- ImperativeExample
1 | public class ImperativeExample { |
- FunctionalExample
1 | public class FunctionalExample { |
Let’s write our very first Flux
1 | public class FluxAndMonoGeneratorService { |
How do you create a flux? you have a many different option in Flux class. the simplest way among them is fromIterable() function. this function takes in a collection and then it can create the flux for you and return to the flux.
In reality, the flux might be coming from a DB or a remote service called.
The only way to access Flux value is by subscribing to that flux. what i am saying is the only way to access it is by calling the subscribe() function. when you make the call to the subscribe(), that’s when you will have access to these elements and these elements in the flux will be sent in the form of stream one by one.
The subscriber is always attached to the publisher so Flux is a publisher and subscriber being attached to with subscribe().
Question. Why is nameMono() function blocked by nameFlux() function ?
There is a handy method that’s part of the project record library itself. log() is going to do us, this is going to log each and every event that’s happening between the subscriber and the publisher communication.
1 | 10:40:55.594 [main] INFO reactor.Flux.Iterable.1 - | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription) |
request(unbounded) calling is the default. it goes with unbounded amount of data. That means give me all the data that you have once this request is received by the publisher.
Testing Flux using JUnit5
1 | public class FluxAndMonoGeneratorServiceTest { |
In order to test the reactive types, we need to have a special type of reactor library. I hope you all remember in the initial setup, we are a dependency named reactor test. so we are going to use a class that’s present inside the reactor just in order to test the reactor types.
StepVerifier is a something, which is part of the reactor. so any time you are writing reactive code, which uses flux and mono, which is part of the project, then you can use a StepVerifier in order to write the unit test cases.
one you’ve already consumed, what are the remaining number of events it’s going to be to know i’m going to run this case.
Transform using map() operator
map() operator
Used to transform the element from one form to another in a Reactive Stream
Similar to the map() operator in Streams API
1 | public class FluxAndMonoGeneratorService { |
1 | public class FluxAndMonoGeneratorServiceTest { |
Reactive Streams are Immutable
1 | public class FluxAndMonoGeneratorService { |
You might be thinking this would still apply the application operation to this lowercase string But surprisingly, reactive streams are immutable and this won’t make any changes to the original data source.
1 | public class FluxAndMonoGeneratorServiceTest { |
1 | public final <V> Flux<V> map(Function<? super T, ? extends V> mapper) { |
as you can see the above code, map() function returns new map object when it is occured. that is the reason why map() function enable to keep the immutability.
Filter using filter() operator
filter() operator
used to filter elements in a Reactive stream
similar to the filter() operator in Streams API
1 | public Flux<String> nameFlux_map(int springLength) { |
1 |
|
1 | public Flux<String> nameFlux_map(int springLength) { |
1 |
|
Advanced transform using the flatMap() operator
flatMap() operator
Transforms one source element to a Flux of 1 to N elements
eg., “ALEX” -> Flux.just(“A”, “L”, “E”, “X”)
Use it when the transformation returns a Reactive Type like Flux
(Flux or Mono) source code
1 | public Flux<String> nameFlux_flatmap(int springLength) { |
- test code
1 |
|
the one of interesting one is flatmap function in stream takes the function what signature is Function type for getting Flux something. i think it can control the stream data more easier than other way.
You know namesFlux_Flatmap() test code already thought it would not operated as an asychronous behavior. because expectNext() part test it sequentially.
so if you give some delay to nameFlux_flatmap() function then this test code will be failed. because it can be the situation that the post letters can be processed earier than the first one.
so Eventually, you should remember one thing about flatmap. it is the flatmap function is operated as a asynchrous one.
- source code
1 | public Flux<String> nameFlux_flatmap_async(int springLength) { |
- test code
1 |
|
map() vs flatmap()
map()
- One to One Transformation
- Does the simple transformation from T to V
- Used for simple synchronous transfomations
- Does not support transformations that returns Publisher
flatmap()
- One to N transformations
- Does more that just transformation. Subscribes to Flux or Mono that’s part of the transformation and the flattens it and sends it downstream
- Used for asynchronous transformations
- Use it with transformations that returns Publisher
concatmap()
- works similar to flatmap()
- only difference is that concatmap() preserves the ordering sequence of the Reactive Streams.
1 | public Flux<String> nameFlux_concatmap(int springLength) { |
1 |
|
the performance is a tradeoff, when to use flatmap, when to use concatmap.
if you use concatmap, you have the ordering of the elements, but it’s going to take a lot of time because it is sychronous one. and if you use flatmap, you lose the ordering of the elements, but the processing time is going to be faster.
flatMap in Mono
- use it when the transformation returns a Mono
- Returns a Mono
- Use flatMap if the transformation involves making a REST API call or any kind of functionality that can be done asynchronously
flatMap function in Mono stream returns one marvel. but it includes collection data like List.
- source code
1 | public Mono<List<String>> namesMono_flatMap(int stringLength) { |
- test code
1 |
|