Reactive Programming in Modern Java using Project Reactor

Why Reactive Programming?

Traditional Programming

In traditional programming, the code makes a call to the DB and then blocks, waiting for the result. Only once the result is retrieved from the DB does execution continue and the request complete.

This style is very common and is called the synchronous or blocking style of writing code. The thread involved in the interaction is blocked until the response is received from the DB.
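The blocking style described above can be sketched in a few lines. This is a minimal illustration, not real database code: findNamesBlocking() is a hypothetical stand-in for a JDBC query, and the sleep only simulates database latency.

```java
import java.util.List;

final class BlockingCallSketch {

    // Hypothetical stand-in for a database query; the sleep simulates latency.
    static List<String> findNamesBlocking() {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the DB", e);
        }
        return List.of("alex", "ben", "chloe");
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        // The calling thread can do nothing else until this call returns
        List<String> names = findNamesBlocking();
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        System.out.println(names + " after waiting about " + elapsedMillis + " ms");
    }
}
```

While the thread waits inside findNamesBlocking(), it cannot serve any other request, which is the cost the rest of this section is about.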

What has changed?

  • Internet usage has spiked
  • Network interactions are very common
  • Microservices architectures are pretty much everywhere
  • Applications are deployed in cloud environments
  • Response times are expected in milliseconds
  • No downtime is expected

Today’s Architecture (Backend RESTful API)

In this architecture, the response time is the sum of the DB response time and the response times of the other API calls.

This model works up to a certain limit, but you can’t size a thread pool to match the number of users you want to support.

Let’s say you have a use case where you need to support 100,000 concurrent users. In that case, you can’t define a thread pool with 100,000 threads. You might be wondering why the thread pool size can’t match the number of users you want to support.

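Threads and their side effects

  • A thread is an expensive resource
  • Each thread reserves its own stack, typically about 1MB by default on a 64-bit JVM (configurable with -Xss), and this memory sits outside the Java heap. More threads mean more memory spent on stacks, which leaves less memory for handling requests. At that size, 100,000 threads would reserve on the order of 100GB for stacks alone.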

Can we do better?

Callbacks

  • Asynchronous methods accept a callback as a parameter and invoke it when the blocking call completes
  • Code written with callbacks is hard to compose and difficult to read and maintain
  • There is a popular term tied to callbacks: callback hell
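To make the composition problem concrete, here is a small sketch of three dependent lookups written with callbacks. All three lookup methods are hypothetical, and they invoke their callbacks inline so the example stays deterministic; real callback APIs would invoke them later from another thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class CallbackSketch {

    // Hypothetical callback-style lookups (they run inline to keep the sketch simple)
    static void findUserId(String name, Consumer<Integer> callback) {
        callback.accept(name.length());
    }

    static void findOrders(int userId, Consumer<List<String>> callback) {
        callback.accept(List.of("order-" + userId + "-a", "order-" + userId + "-b"));
    }

    static void findPrice(String order, Consumer<Integer> callback) {
        callback.accept(order.length() * 10);
    }

    // Each dependent step adds one more level of nesting
    static List<String> describeOrders(String userName) {
        List<String> lines = new ArrayList<>();
        findUserId(userName, userId ->
                findOrders(userId, orders ->
                        orders.forEach(order ->
                                findPrice(order, price ->
                                        lines.add(order + " costs " + price)))));
        return lines;
    }

    public static void main(String[] args) {
        describeOrders("alex").forEach(System.out::println);
    }
}
```

Three steps already produce four levels of nesting, and error handling would have to be threaded through every level by hand.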

Future

  • Released in Java 5
  • Write Asynchronous Code
  • No easy way to combine the result from multiple futures
  • Future.get() is a blocking call
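A short sketch of the last two points, assuming two independent lookups (both are hypothetical placeholders for a DB call and a REST call). Future offers no operator for combining results, so the caller has to block on each get() in turn.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class FutureSketch {

    static String fetchBoth() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<String> name = pool.submit(() -> "alex"); // hypothetical DB call
            Future<Integer> age = pool.submit(() -> 30);     // hypothetical REST call
            // Each get() parks the calling thread until that result is ready
            return name.get() + " is " + age.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchBoth());
    }
}
```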

CompletableFuture

  • Released in Java 8
  • Write asynchronous code in a functional style
  • Easy to compose/combine multiple futures
  • Not a good fit for a future that returns many elements
  • E.g., a CompletableFuture<List<T>> has to wait for the whole collection to be built and readily available
  • CompletableFuture has no way to handle infinite values
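For comparison with Future, this sketch combines two asynchronous results without blocking in between. The two suppliers are hypothetical placeholders for remote calls; thenCombine() is the standard CompletableFuture method for merging two results.

```java
import java.util.concurrent.CompletableFuture;

final class CompletableFutureSketch {

    static CompletableFuture<String> greeting() {
        CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "alex"); // hypothetical call
        CompletableFuture<Integer> age = CompletableFuture.supplyAsync(() -> 30);     // hypothetical call
        // The combining function runs once both results are available
        return name.thenCombine(age, (n, a) -> n.toUpperCase() + " is " + a);
    }

    public static void main(String[] args) {
        // join() blocks only here, at the edge of the program
        System.out.println(greeting().join());
    }
}
```

The result is still a single value delivered all at once, which is why the model does not fit many or infinite elements.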

What is Reactive Programming ?

  • Reactive Programming is a new programming paradigm
  • Asynchronous and non blocking
  • Data flows as an Event/Message driven stream
  • Functional Style Code
  • BackPressure on Data Streams

requestForData() is no longer a blocking call; the calling thread is released to do useful work.

request(n) is a signal from the app to the DB that it is ready to consume the data.

Once the query results are ready, the data is sent in the form of a stream of events. This stream is called the reactive stream. The actual data is sent to the app using the onNext() function.

Once all the results have been sent to the app, a completion event signals that there is no more data. That event is sent using the onComplete() function.

This is how the code and data flow works in reactive programming.

Backpressure

Reactive streams can easily overwhelm the app with lots of data, so backpressure is a concept dedicated to solving this problem.

Let’s say we don’t want any more data from the DB. Then a cancel() request is sent to the data source, and no data is sent after that. The concept of the app controlling the data flow is called backpressure.
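The request(n) and cancel() signals can be sketched without Reactor by using the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams interfaces. The range() publisher and consume() helper below are hypothetical, single-threaded simplifications written only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class BackpressureSketch {

    // Hypothetical publisher of 1..count that emits only what has been requested
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;
            private long demand;
            private boolean emitting;
            private boolean done;

            @Override
            public void request(long n) {
                // Simplification: n <= 0 is not rejected as the specification requires
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;
                if (emitting) {
                    return; // re-entrant call from onNext(); the loop below sees the new demand
                }
                emitting = true;
                while (demand > 0 && !done && next <= count) {
                    demand--;
                    subscriber.onNext(Integer.valueOf(next++));
                }
                emitting = false;
                if (!done && next > count) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Requests elements in batches and cancels after stopAfter elements
    static List<String> consume(int total, int batch, int stopAfter) {
        List<String> events = new ArrayList<>();
        range(total).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int received;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                events.add("request(" + batch + ")");
                s.request(batch);
            }

            @Override
            public void onNext(Integer item) {
                events.add("onNext(" + item + ")");
                received++;
                if (received == stopAfter) {
                    events.add("cancel()");
                    subscription.cancel();
                } else if (received % batch == 0) {
                    events.add("request(" + batch + ")");
                    subscription.request(batch);
                }
            }

            @Override
            public void onError(Throwable t) {
                events.add("onError(" + t + ")");
            }

            @Override
            public void onComplete() {
                events.add("onComplete()");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        consume(10, 2, 5).forEach(System.out::println);
    }
}
```

The subscriber never receives more than it asked for, and after cancel() the publisher stops emitting even though elements 6 to 10 were still available.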

When to use Reactive Programming ?

Use reactive programming when you need to build and support an app that can handle high load.

Reactive App Architecture

  • Handle requests in a non-blocking style
  • Netty is a non-blocking server that uses an event-loop model
  • Use Project Reactor for writing non-blocking code
  • Spring WebFlux uses Netty and Project Reactor for building non-blocking or reactive APIs

Reactive Streams

Reactive Streams Specification

  • Publisher
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

The Publisher interface has just one method, subscribe(), which takes a Subscriber as its parameter.

A Publisher can be a database, a remote service, or anything else that holds data for the application.

  • Subscriber
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);

    public void onNext(T t);

    public void onError(Throwable t);

    public void onComplete();
}

  • Subscription
public interface Subscription {
    public void request(long n);

    public void cancel();
}

The Subscription interface is the one that connects the app and the data source. In our case, those are the Subscriber and the Publisher.

  • Processor
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {

}

A Processor can behave as both a Subscriber and a Publisher. It is not really used on a day-to-day basis.

Reactive Streams - How does it work together?

  • Success scenario

The Subscriber initiates the request by invoking the subscribe() method of the Publisher. In the second step, the Publisher hands out the Subscription object by invoking the onSubscribe() method of the Subscriber.

Once the Subscriber has hold of the Subscription object, it invokes the request() function of the Subscription, requesting the Publisher to send all the data.

Once the Publisher receives the request, it sends the data in the form of events using the onNext() function, which is part of the Subscriber interface. Once all the data is sent, the Publisher sends an onComplete() event by invoking the onComplete() function.

This is how the Publisher, Subscriber and Subscription interface work together in a success scenario.
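The sequence can be traced with a small sketch built on the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams interfaces shown above. The publisher here is a hypothetical, single-threaded simplification backed by a list; it is not how Reactor implements it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

final class HandshakeSketch {

    static List<String> run(List<String> data) {
        List<String> log = new ArrayList<>();

        // Hypothetical publisher backed by an in-memory list
        Flow.Publisher<String> publisher = subscriber -> {
            log.add("subscribe()");
            Iterator<String> it = data.iterator();
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;

                @Override
                public void request(long n) {
                    log.add("request(" + (n == Long.MAX_VALUE ? "unbounded" : n) + ")");
                    for (long i = 0; i < n && !done && it.hasNext(); i++) {
                        subscriber.onNext(it.next());
                    }
                    if (!done && !it.hasNext()) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        };

        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                log.add("onSubscribe()");
                subscription.request(Long.MAX_VALUE); // ask for everything
            }

            @Override
            public void onNext(String item) {
                log.add("onNext(" + item + ")");
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError(" + t.getClass().getSimpleName() + ")");
            }

            @Override
            public void onComplete() {
                log.add("onComplete()");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        run(List.of("alex", "ben", "chloe")).forEach(System.out::println);
    }
}
```

Running it prints the steps in the order described: subscribe(), onSubscribe(), request(unbounded), one onNext() per element, and finally onComplete().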

  • Error scenario

Exceptions are treated the same way as data in reactive programming, which means the exception is sent to the app in the form of an event, using the onError() function.
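A minimal sketch of the error path, again using the JDK's java.util.concurrent.Flow interfaces as a stand-in for the Reactive Streams interfaces. The parsing publisher is hypothetical and, as a simplification, emits everything on the first request instead of tracking demand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class ErrorSignalSketch {

    static List<String> run(List<String> rawNumbers) {
        List<String> log = new ArrayList<>();

        // Hypothetical publisher that parses each raw value before emitting it
        Flow.Publisher<Integer> publisher = subscriber ->
                subscriber.onSubscribe(new Flow.Subscription() {
                    private boolean done;

                    @Override
                    public void request(long n) {
                        if (done) {
                            return;
                        }
                        done = true; // simplification: everything is emitted on the first request
                        for (String raw : rawNumbers) {
                            Integer value;
                            try {
                                value = Integer.valueOf(raw);
                            } catch (NumberFormatException e) {
                                subscriber.onError(e); // the exception travels as an event
                                return;                // nothing is sent after onError()
                            }
                            subscriber.onNext(value);
                        }
                        subscriber.onComplete();
                    }

                    @Override
                    public void cancel() {
                        done = true;
                    }
                });

        publisher.subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer item) {
                log.add("onNext(" + item + ")");
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError(" + t.getClass().getSimpleName() + ")");
            }

            @Override
            public void onComplete() {
                log.add("onComplete()");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        run(List.of("1", "2", "x", "4")).forEach(System.out::println);
    }
}
```

The subscriber receives onNext(1), onNext(2) and then onError(...). The stream ends there, so "4" is never emitted and onComplete() is never called.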

Introduction to Project Reactor

Project Reactor

  • Project Reactor is an implementation of Reactive Streams Specification

  • Project Reactor is a Reactive Library

  • Spring WebFlux uses Project Reactor by default.

Reactor offers two reactive and composable APIs, Flux [N] and Mono [0|1]

Reactor Reference Guide Document

Reactor Reactive Types - Flux and Mono

Flux & Mono

  • Flux and Mono are reactive types that implement the Reactive Streams Specification

  • Flux and Mono are part of the reactor-core module

  • Flux is a reactive type that represents 0 to N elements

  • Mono is a reactive type that represents 0 to 1 element

Flux - 0 to N elements

The marbles at the top represent the data from the actual data source. Since it’s a Flux, there are multiple marbles in the data source layer. Each individual marble represents an element that is sent to the Subscriber using the onNext() function.

The operator is the layer where you can apply transformations to the elements emitted from the source. Some of the common and simple transformations are the map() and filter() operators. Project Reactor provides many more operators in addition to those.

The next layer is the result of the transformation applied in the operator layer. This is the final result that is handed to the subscriber.

The vertical bar represents that there is no more data. It stands for the onComplete() signal sent to the Subscriber.

The cross represents that an exception or error occurred. It stands for the onError() signal sent to the Subscriber.

Mono - 0 to 1 Element

A Mono is represented by a single marble in the data source layer, for example when a request retrieves a single piece of data.

The vertical bar and the cross represent the same behavior as in a Flux: the onComplete() and onError() signals.

The operator section is also similar to Flux: different operators apply transformations to the element emitted from the data source layer.

Project Setup

https://github.com/kidongYun/reactive_programming_using_reactor

Imperative Style vs Functional Style

Why Functional Programming?

  • Reactive programming uses a functional programming style of code, e.g., code similar to the Streams API

  • Reactive Programming is an extension to Functional Programming

What is Functional Programming ?

  • This programming model was introduced in Java 8

  • Functional programming is powered by

    • Lambdas
    • Method References
    • Functional Interfaces
  • Functional Programming promotes :

    • Behavior Parameterization
    • Immutability
    • Concise code

Imperative Example vs Functional Example

  • ImperativeExample
import java.util.ArrayList;
import java.util.List;

public class ImperativeExample {
    public static void main(String[] args) {
        var namesList = List.of("alex", "ben", "chloe", "adam", "adam");
        var newNamesList = namesGreaterThanSize(namesList, 3);

        System.out.println("newNamesList : " + newNamesList);
    }

    private static List<String> namesGreaterThanSize(List<String> namesList, int size) {
        var newNamesList = new ArrayList<String>();

        for (String name : namesList) {
            var upperCaseName = name.toUpperCase();
            // Check the upper-cased value, because that is what the list stores;
            // checking the lower-case name would let duplicates through
            if (name.length() > size && !newNamesList.contains(upperCaseName)) {
                newNamesList.add(upperCaseName);
            }
        }

        return newNamesList;
    }
}

  • FunctionalExample
import java.util.List;
import java.util.stream.Collectors;

public class FunctionalExample {
    public static void main(String[] args) {
        var namesList = List.of("alex", "ben", "chloe", "adam", "adam");
        var newNamesList = namesGreaterThanSize(namesList, 3);

        System.out.println("newNamesList : " + newNamesList);
    }

    private static List<String> namesGreaterThanSize(List<String> namesList, int size) {
        return namesList
                .stream()
                .filter(s -> s.length() > size)
                .map(String::toUpperCase)
                .distinct()
                .sorted() // extra step: the imperative version does not sort
                .collect(Collectors.toList());
    }
}

Let’s write our very first Flux

import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FluxAndMonoGeneratorService {
    public Flux<String> namesFlux() {
        return Flux.fromIterable(List.of("alex", "ben", "chloe")); // db or a remote service call
    }

    public Mono<String> nameMono() {
        return Mono.just("alex");
    }

    public static void main(String[] args) {
        FluxAndMonoGeneratorService fluxAndMonoGeneratorService = new FluxAndMonoGeneratorService();

        fluxAndMonoGeneratorService.namesFlux()
                .subscribe(name -> {
                    System.out.println("Flux Name is : " + name);
                });

        // Question: why does the Mono below only start after the Flux above has completed?

        fluxAndMonoGeneratorService.nameMono()
                .subscribe(name -> {
                    System.out.println("Mono Name is : " + name);
                });
    }
}

How do you create a Flux? The Flux class offers many different options. The simplest among them is the fromIterable() function, which takes a collection and returns a Flux created from it.
In reality, the Flux would typically come from a DB or a remote service call.

The only way to access the values of a Flux is to subscribe to it by calling the subscribe() function. Once you call subscribe(), the elements in the Flux are sent to you one by one in the form of a stream.

A subscriber is always attached to a publisher: here the Flux is the publisher, and subscribe() attaches the subscriber to it.

Question: why does nameMono() only start after namesFlux() has finished? Flux.fromIterable() and Mono.just() emit their elements synchronously on the thread that calls subscribe() (the main thread here), so the first subscribe() call does not return until the Flux has completed.

There is a handy method that is part of the Project Reactor library itself: log(). It logs each and every event in the communication between the subscriber and the publisher. With log() added to namesFlux() and nameMono(), the output looks like this:

10:40:55.594 [main] INFO reactor.Flux.Iterable.1 - | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
10:40:55.595 [main] INFO reactor.Flux.Iterable.1 - | request(unbounded)
10:40:55.596 [main] INFO reactor.Flux.Iterable.1 - | onNext(alex)
Flux Name is : alex
10:40:55.597 [main] INFO reactor.Flux.Iterable.1 - | onNext(ben)
Flux Name is : ben
10:40:55.597 [main] INFO reactor.Flux.Iterable.1 - | onNext(chloe)
Flux Name is : chloe
10:40:55.597 [main] INFO reactor.Flux.Iterable.1 - | onComplete()
10:40:55.618 [main] INFO reactor.Mono.Just.2 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
10:40:55.618 [main] INFO reactor.Mono.Just.2 - | request(unbounded)
10:40:55.618 [main] INFO reactor.Mono.Just.2 - | onNext(alex)
Mono Name is : alex
10:40:55.618 [main] INFO reactor.Mono.Just.2 - | onComplete()

request(unbounded) is the default: the subscriber asks for an unbounded amount of data. In other words, once the publisher receives this request, it sends all the data it has.

Testing Flux using JUnit5

import org.junit.jupiter.api.Test;
import reactor.test.StepVerifier;

public class FluxAndMonoGeneratorServiceTest {
    FluxAndMonoGeneratorService fluxAndMonoGeneratorService = new FluxAndMonoGeneratorService();

    @Test
    void namesFlux() {
        // given

        // when
        var namesFlux = fluxAndMonoGeneratorService.namesFlux();

        // then
        StepVerifier.create(namesFlux)
                // Alternative: assert every element explicitly
                // .expectNext("alex", "ben", "chloe")

                // Alternative: assert only the number of elements
                // .expectNextCount(3)

                // Assert the first element, then count the remaining two
                .expectNext("alex")
                .expectNextCount(2)

                .verifyComplete();
    }

    @Test
    void nameMono() {
        // given

        // when
        var nameMono = fluxAndMonoGeneratorService.nameMono();

        // then
        StepVerifier.create(nameMono)
                .expectNext("alex")
                .verifyComplete();
    }
}

In order to test reactive types, we need a dedicated test library. As you may remember from the initial setup, we added a dependency named reactor-test. We are going to use a class from that library to test the reactive types.

StepVerifier is part of reactor-test. Any time you write reactive code that uses Flux and Mono, you can use StepVerifier to write the unit test cases.

In the namesFlux() test above, expectNext("alex") consumes the first element, and expectNextCount(2) then verifies how many elements remain.

Transform using map() operator

map() operator

  • Used to transform the element from one form to another in a Reactive Stream

  • Similar to the map() operator in Streams API

public class FluxAndMonoGeneratorService {

    // ...

    public Flux<String> nameFlux_map() {
        return Flux.fromIterable(List.of("alex", "ben", "chloe"))
                .map(String::toUpperCase)
                .log();
    }

    // ...
}

public class FluxAndMonoGeneratorServiceTest {

    // ...

    @Test
    void namesFlux_map() {
        // given

        // when
        var namesFlux = fluxAndMonoGeneratorService.nameFlux_map();

        // then
        StepVerifier.create(namesFlux)
                .expectNext("ALEX", "BEN", "CHLOE")
                .verifyComplete();
    }
}

Reactive Streams are Immutable

public class FluxAndMonoGeneratorService {

    // ...

    public Flux<String> nameFlux_immutability() {
        var nameFlux = Flux.fromIterable(List.of("alex", "ben", "chloe"));
        // The Flux returned by map() is discarded, so this line has no effect on nameFlux
        nameFlux.map(String::toUpperCase);
        return nameFlux;
    }

    // ...
}

You might think this would still apply the map() operation to the lowercase strings. But reactive streams are immutable: calling map() returns a new Flux and makes no changes to the original one, so the returned Flux still emits the lowercase names.

public class FluxAndMonoGeneratorServiceTest {

    // ...

    @Test
    void namesFlux_immutability() {
        // given

        // when
        var namesFlux = fluxAndMonoGeneratorService.nameFlux_immutability();

        // then
        StepVerifier.create(namesFlux)
                .expectNext("alex", "ben", "chloe")
                .verifyComplete();
    }
}

public final <V> Flux<V> map(Function<? super T, ? extends V> mapper) {
    if (this instanceof Fuseable) {
        return onAssembly(new FluxMapFuseable<>(this, mapper));
    }
    return onAssembly(new FluxMap<>(this, mapper));
}

As you can see in the code above, map() returns a new Flux object (a FluxMap or FluxMapFuseable) every time it is called. That is why map() preserves immutability.

Filter using filter() operator

filter() operator

  • Used to filter elements in a Reactive Stream

  • Similar to the filter() operator in Streams API

public Flux<String> nameFlux_map(int stringLength) {
    return Flux.fromIterable(List.of("alex", "ben", "chloe"))
            .map(String::toUpperCase)
            .filter(s -> s.length() > stringLength)
            .log();
}

@Test
void namesFlux_map() {
    // given
    int stringLength = 3;

    // when
    var namesFlux = fluxAndMonoGeneratorService.nameFlux_map(stringLength);

    // then
    StepVerifier.create(namesFlux)
            .expectNext("ALEX", "CHLOE")
            .verifyComplete();
}

public Flux<String> nameFlux_map(int stringLength) {
    return Flux.fromIterable(List.of("alex", "ben", "chloe"))
            .map(String::toUpperCase)
            .filter(s -> s.length() > stringLength)
            .map(s -> s.length() + "-" + s) // ALEX -> 4-ALEX
            .log();
}

@Test
void namesFlux_map() {
    // given
    int stringLength = 3;

    // when
    var namesFlux = fluxAndMonoGeneratorService.nameFlux_map(stringLength);

    // then
    StepVerifier.create(namesFlux)
            .expectNext("4-ALEX", "5-CHLOE")
            .verifyComplete();
}

Advanced transform using the flatMap() operator

flatMap() operator

  • Transforms one source element to a Flux of 1 to N elements

  • eg., “ALEX” -> Flux.just(“A”, “L”, “E”, “X”)

  • Use it when the transformation returns a reactive type (Flux or Mono)

  • source code

public Flux<String> nameFlux_flatmap(int stringLength) {
    return Flux.fromIterable(List.of("alex", "ben", "chloe"))
            .map(String::toUpperCase) // ALEX, BEN, CHLOE
            .filter(s -> s.length() > stringLength) // ALEX, CHLOE
            .flatMap(this::splitString) // A, L, E, X, C, H, L, O, E
            .log();
}

// ALEX -> Flux(A,L,E,X)
public Flux<String> splitString(String name) {
    var charArray = name.split("");
    return Flux.fromArray(charArray);
}

  • test code
@Test
void namesFlux_flatmap() {
    // given
    int stringLength = 3;

    // when
    var namesFlux = fluxAndMonoGeneratorService.nameFlux_flatmap(stringLength);

    // then
    StepVerifier.create(namesFlux)
            .expectNext("A", "L", "E", "X", "C", "H", "L", "O", "E")
            .verifyComplete();
}

One interesting detail is that flatMap() takes a Function whose return type is a Publisher such as Flux. I think this makes it easier to control the stream data than other approaches.

Notice that the namesFlux_flatmap() test assumes synchronous, in-order behavior, because expectNext() checks the elements sequentially.

If you add a delay to the inner Flux in nameFlux_flatmap(), this test can fail, because the later letters may be emitted earlier than the first ones.

So remember one thing about flatMap(): it operates asynchronously and does not guarantee the ordering of the elements.

  • source code
public Flux<String> nameFlux_flatmap_async(int stringLength) {
    return Flux.fromIterable(List.of("alex", "ben", "chloe"))
            .map(String::toUpperCase) // ALEX, BEN, CHLOE
            .filter(s -> s.length() > stringLength) // ALEX, CHLOE
            .flatMap(this::splitString_withDelay) // same nine letters, but order is not guaranteed
            .log();
}

// ALEX -> Flux(A,L,E,X), emitted with a random delay between elements
// Requires java.time.Duration and java.util.Random
public Flux<String> splitString_withDelay(String name) {
    var charArray = name.split("");
    var delay = new Random().nextInt(1000);
    return Flux.fromArray(charArray)
            .delayElements(Duration.ofMillis(delay));
}

  • test code
@Test
void namesFlux_flatmap_async() {
    // given
    int stringLength = 3;

    // when
    var namesFlux = fluxAndMonoGeneratorService.nameFlux_flatmap_async(stringLength);

    // then
    StepVerifier.create(namesFlux)
            // .expectNext("A", "L", "E", "X", "C", "H", "L", "O", "E")
            .expectNextCount(9)
            .verifyComplete();
}

map() vs flatMap()

map()

  • One to one transformation
  • Does a simple transformation from T to V
  • Used for simple synchronous transformations
  • Does not support transformations that return a Publisher

flatMap()

  • One to N transformations
  • Does more than just transformation: it subscribes to the Flux or Mono that is part of the transformation, then flattens it and sends it downstream
  • Used for asynchronous transformations
  • Use it with transformations that return a Publisher

concatMap()

  • Works similarly to flatMap()
  • The only difference is that concatMap() preserves the ordering sequence of the Reactive Stream
public Flux<String> nameFlux_concatmap(int stringLength) {
    return Flux.fromIterable(List.of("alex", "ben", "chloe"))
            .map(String::toUpperCase) // ALEX, BEN, CHLOE
            .filter(s -> s.length() > stringLength) // ALEX, CHLOE
            .concatMap(this::splitString_withDelay) // A, L, E, X, C, H, L, O, E
            .log();
}

@Test
void namesFlux_concatmap() {
    // given
    int stringLength = 3;

    // when
    var namesFlux = fluxAndMonoGeneratorService.nameFlux_concatmap(stringLength);

    // then
    StepVerifier.create(namesFlux)
            .expectNext("A", "L", "E", "X", "C", "H", "L", "O", "E")
            // .expectNextCount(9)
            .verifyComplete();
}

Choosing between flatMap() and concatMap() is a performance tradeoff.
With concatMap() you keep the ordering of the elements, but it takes more time because the inner publishers are subscribed to one at a time, sequentially. With flatMap() you lose the ordering of the elements, but the overall processing time is shorter.
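The tradeoff can be illustrated without Reactor. The sketch below is only an analogy built on the JDK's CompletableFuture, not Reactor's implementation: lettersAfter() is a hypothetical asynchronous source, mergeEagerly() starts both sources at once and collects results as they arrive (flatMap-like), and concatInOrder() starts the second source only after the first has finished (concatMap-like). It works at the level of whole words, whereas Reactor interleaves individual elements.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

final class MergeVsConcatSketch {

    // Hypothetical async source: completes with the word's letters after delayMillis
    static CompletableFuture<List<String>> lettersAfter(String word, long delayMillis) {
        return CompletableFuture.supplyAsync(
                () -> List.of(word.split("")),
                CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS));
    }

    // flatMap-like: both sources run concurrently, results land in arrival order
    static List<String> mergeEagerly() {
        List<String> out = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> slow = lettersAfter("ALEX", 300).thenAccept(out::addAll);
        CompletableFuture<Void> fast = lettersAfter("CHLOE", 50).thenAccept(out::addAll);
        CompletableFuture.allOf(slow, fast).join(); // total time is roughly the longest delay
        return List.copyOf(out);
    }

    // concatMap-like: the second source starts only after the first completes
    static List<String> concatInOrder() {
        List<String> out = new CopyOnWriteArrayList<>();
        lettersAfter("ALEX", 300).thenAccept(out::addAll)
                .thenCompose(ignored -> lettersAfter("CHLOE", 50))
                .thenAccept(out::addAll)
                .join(); // total time is roughly the sum of the delays
        return List.copyOf(out);
    }

    public static void main(String[] args) {
        System.out.println("merged: " + mergeEagerly());
        System.out.println("concat: " + concatInOrder());
    }
}
```

With these delays the merged version finishes sooner but puts the letters of CHLOE first, while the sequential version keeps source order at the cost of waiting for each source in turn.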

flatMap in Mono

  • Use it when the transformation returns a Mono
  • Returns a Mono
  • Use flatMap() if the transformation involves making a REST API call or any kind of functionality that can be done asynchronously

flatMap() on a Mono returns a single marble, but that marble can hold a collection such as a List.

  • source code
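public Mono<List<String>> namesMono_flatMap(int stringLength) {
    return Mono.just("alex")
            .map(String::toUpperCase)
            .filter(s -> s.length() > stringLength)
            .flatMap(this::splitStringMono) // Mono(List(A, L, E, X))
            .log();
}

// Not shown in the original notes; assumed to look roughly like this
private Mono<List<String>> splitStringMono(String s) {
    return Mono.just(List.of(s.split(""))); // ALEX -> Mono(List(A, L, E, X))
}

  • test code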
@Test
void namesMono_flatMap() {
    // given
    int stringLength = 3;

    // when
    var value = fluxAndMonoGeneratorService.namesMono_flatMap(stringLength);

    // then
    StepVerifier.create(value)
            .expectNext(List.of("A", "L", "E", "X"))
            .verifyComplete();
}

flatMapMany() in Mono
