Webflux

Reative Streams

RxJava 와 같은 리액티브 라이브러리는 마이크로소프트 사에서 맨 처음에 그 생태계를 구축했는데, 이 후에 Reactive Streams 라는 리액티브 프로그래밍을 위한 표준 명세가 나오면서 지금은 Project Reactor 와 같은 그 기반에 다양한 구현체들이 등장하였다.

a standardization for Java emerged through the Reactive Streams effort, a specification that defines a set of interfaces and interaction rules for reactive libraries on the JVM

Webflux 의존성

1
implementation 'org.springframework.boot:spring-boot-starter-webflux'

ServerRequest

Webflux 프로젝트에서 REST API 요청, 응답 객체를 ServerRequest, ServerResponse 를 통상적으로 사용하는 것 같다. 이 객체를 사용하면서 PathVariable 등과 같이 기존에는 어노테이션을 활용해서 값을 가져오는 방법들이 다소 달라진다.
어려운 내용은 아니고 단지 사용 방법이 변경된 것이기 때문에 필요한 부분이 있을 때 마다 검색해서 사용해도 이슈 없을 듯.

  • 쿼리스트링 값 가져오기
  • PathVariable 값 가져오기

RouterFunction route(RequestPredicate predicate, HandlerFunction handlerFunction)

Spring RouterFunction is a functional interface with route functional method. The role of RouterFunction is routing incoming requests to specified handler function.

라우터와 핸들러를 매핑해주는 route() 함수는 두개의 파라미터를 받는데 첫번째 requestPredicate 에서 어떤 요청 정보를 매핑시켜줄지 결정할 수 있다. 가령 요청으로 들어오는 url path, http header 정보들 등이 있다.

두번째 파라미터인 HandlerFunction 에서 실제 작업이 이루어지는 handler 부분을 넣어주는데 이 HandlerFunction 함수 시그니처는 아래와 같다.

HandlerFunction

1
2
3
4
@FunctionalInterface
public interface HandlerFunction<T extends ServerResponse> {
Mono<T> handle(ServerRequest request);
}

이를 보면 알 수 있듯이, 요청 타입은 ServerRequest, 응답 타입은 ServerResponse 로 제한되어 있다. 즉 웹플럭스를 사용하게되면 REST API 부분에서 사용하게 될 요청 응답 객체로 ServerRequest, ServerResponse 가 필수 임을 알 수 있다.

HandlderFunction 을 보면 리턴 타입이 Mono 로 되어있는데 왜 Mono 타입으로만 제한되어 제공이 될까? (왜 Flux 타입으로 REST 응답이 내려가지 않을까??) -> REST API 에서는 최종적으로 Mono 타입으로만 내려간다. (HTTP 프로토콜의 한계가 아닐까?)

ReactiveCrudRepository

아직 JDBC 드라이버가 비동기 코드를 지원하지 않아 이부분은 아직까진 어쩔 수 없긴 합니다.
MongoDB나 Redis처럼 비동기 client를 사용하는 NoSQL들은 현재 Reactive용 Repository를 따로 사용할 수 있습니다. ReactiveCrudRepository 등을 이용해서 Repository단에서 바로 Flux나 Mono로 조회가 가능합니다.

QuerydslPredicateExecutor

Querydsl 에 있는 Predicate 객체 (= BooleanExpression 의 상위 객체) 를 파라미터로 받아서 이 값을 기준으로 Query 를 날릴 수 있도록 Spring JPA 에서 지원해주는 것 같음. Spring JPA 에서 Querydsl 을 서포트 해주는 중간 다리 느낌의 인터페이스

QuerydslRepositorySupport

querydsl 을 활용해서 쿼리를 작성할 때 뭔가 도움을 주는 클래스 같다.

  • from() 활용하면 select 부분 생략하고 쿼리 작성이 가능하다.

delegate pattern

https://kotlinworld.com/85

Scheduler (리액터 쓰레드 스케줄링)

리액터를 사용할 때 쓰레드에 관련 작업을 처리해주는 객체. 기본적으로는 단일 쓰레드로 동작하지만 Scheduler 를 활용하면 다양한 형태로 쓰레드를 사용할 수 있음.

parallel(): ExecutorService기반으로 단일 스레드 고정 크기(Fixed) 스레드 풀을 사용하여 병렬 작업에 적합함.
single(): Runnable을 사용하여 지연이 적은 일회성 작업에 최적화
elastic(): 스레드 갯수는 무한정으로 증가할 수 있고 수행시간이 오래걸리는 블로킹 작업에 대한 대안으로 사용할 수 있게 최적화 되어있다.
boundedElastic(): 스레드 갯수가 정해져있고 elastic과 동일하게 수행시간이 오래걸리는 블로킹 작업에 대한 대안으로 사용할 수 있게 최적화 되어있다.
immediate(): 호출자의 스레드를 즉시 실행한다.
fromExecutorService(ExecutorService) : 새로운 Excutors 인스턴스를 생성한다.

기본적으로 리액티브 스트림은 subscribe 를 하고 주고 받아야할 데이터가 계속 있다면 next() 호출, 마지막 데이터를 주고 받을 때에는 onComplete() 혹은 실패의 경우 일 때 onError() 를 던진다. 여기서 이 next() 신호를 리액터는 시퀀스가 발생하는 next 신호를 병렬로 처리할 수 있는 방법을 제공한다

Flux#parallel(int parallelism)

위 메서드는 Flux가 생성하는 next 신호를 parallelism 개수만큼 라운드 로빈 방식으로 신호를 나눈다.

Mono.zip()

@EnableWebFlux

웹플럭스를 사용하기 위해서는 Router 관련 Config 객체에 @EnableWebFlux 어노테이션을 붙여줘야 하지만, 스프링 부트에서는 이를 기본적으로 자동설정해주고 있다. 그렇기 때문에 생략 가능하다.

The @EnableWebFlux enables WebFlux features for annotated controller and functional web programming model. In Spring Boot Application, @EnableWebFlux is not needed because Spring Boot autoconfigures WebFlux features.

WebClient

Spring WebClient is a non-blocking reactive client API that performs HTTP requests.

reference

https://www.concretepage.com/spring-5/spring-webflux-functional-endpoints

https://projectreactor.io/docs/core/release/reference/

R2DBC

https://gompangs.tistory.com/entry/Spring-R2DBC-MySQL

https://taes-k.github.io/2020/08/17/spring-reactive-2/

Reactive 로 사용되는 Repository 의 벤더가 무엇일까

현재 데이터베이스가 리액티브 형태로 제공되는 것은 없는 것 같다..

블로킹 DB 에서 데이터를 webflux 에서 가져올 때 별도의 스레드를 사용해서 처리하지만, save 하는 경우는 이런 처리 없이 그냥 하는 거 같음.

Persistable

save의 기본 동작은 식별자가 없으면 persist, 식별자가 있으면 merge 이다. merge 는 데이터베이스에서 식별자로 조회를 해본 후 없으면 insert, 있으면 update -> 이게 성능이 떨어뜨리는 요인이 된다고 함.

Persistance 인터페이스를 해당 엔티티에 구현하게 되면 id 자동 할당시에 동일 성능을 유지할 수 있다.

1
2
3
4
5
6
7
8
9
@Override 
public String getId() {
return id;
}

@Override
public boolean isNew() {
return createdDate == null;
}

isNew() 에서 DB 에 저장되지 않은 새로운 엔티티를 어떻게 식별할지 재정의가 가능하다

https://yoonbing9.tistory.com/39

https://stackoverflow.com/questions/59468908/reactive-repository-throws-exception-when-saving-a-new-object

sharding sphere

아파치 오픈소스로 구현된 분산 DB 미들웨어 솔류션으로 3개의 독립 제품으로 구성

  • Sharding-JDBC
  • Sharding-Proxy
  • Sharding-Sidecar

https://velog.io/@ette9844/Sharding-Sphere-Sharding-Sphere-%EC%97%90-%EB%8C%80%ED%95%98%EC%97%AC

FluxSink

Wrapper API around a downstream Subscriber for emitting any number of next signals followed by zero or one onError/onComplete.

1
2
3
4
5
6
7
8
9
10
11
12
public Mono<ServerResponse> flux(ServerRequest serverRequest) {
Flux<Integer> flux = Flux.create((FluxSink<Integer> sink) -> {
sink.onRequest(request -> {
for (int i=0; i<=request + 3; i++) {
sink.next(i);
}
});
});

log.info(flux.toString());
return Mono.empty();
}

FluxSink#onRequest(LongConsumer) 메서드의 Consumer는 Subscriber가 데이터를 요청했을 때 불린다

https://javacan.tistory.com/entry/Reactor-Start-3-RS-create-stream

ServerRequest.bodyToMono

1
<T> Mono<T> bodyToMono(Class<? extends T> elementClass);

Webflux 에서는 ServerRequest 로 REST 요청을 받는데 여기서 body 에 해당하는 영역을 DTO로 매핑해주고 싶을 때 사용할 수 있는 함수.

Mono.map : Transform the item emitted by this Mono by applying a synchronous function to it.

Mono.flatMap : Transform the item emitted by this Mono asynchronously, returning the value emitted by another Mono (possibly changing the value type).

Mono.flatmap

map과 flatMap은 둘 다 스트림의 중간에 값을 변환해주는 역할을 한다.
map은 1 : 1로 반환을 보증하고 flatMap은 1 : N을 변환할 수 있다.

flatMap을 사용하게 되는 경우는 다음과 같다.
Mono -> Mono 또는 Mono -> Flux로 변환이 필요한 경우

map 은 반환 값이 대상 Object이고 flatMap은 reactor의 Publisher (Mono / Flux)이다.

Mono -> Mono의 flatMap의 경우 성능 상 이점이 아닌 Publisher 객체 타입 변환이 목적이기 때문에 flatMap을 사용한다.

Mono -> Flux의 flatMap의 경우 Publisher 객체 타입도 변환하고 변환된 이후 비동기로 병렬 동작하는 publisher를 사용하는 경우 forEach를 통한 동기로 실행하는 경우보다 빠른 처리가 가능해진다.

Mono -> Mono의 flatMap의 경우 비동기로 처리하는 의미가 없기 때문에 굳이 flatMap을 쓸 이유는 없다.

https://luvstudy.tistory.com/95

null이 반환되면 reactive flow가 중지되기 때문에 Mono.empty(), Flux.empty()와 같이 지정된 빈 값 반환 처리를 해야 한다.

collectList()

Flux -> Mono>

Mapstruct

빌드할 때 구현체 파일 만들어 지려면 Getter, Setter 필요

https://www.baeldung.com/mapstruct-ignore-unmapped-properties

매퍼 스프링 빈으로 등록하고 싶다면??

1
2
3
4
5
6
7
@Mapper(unmappedTargetPolicy = ReportingPolicy.IGNORE, componentModel = "spring")
public interface AliasMapper {
AliasMapper INSTANCE = Mappers.getMapper(AliasMapper.class);

Alias toDomain(AliasDto.Save.Req from);
}

componentModel = “spring” 추가

JobExecutionListener

  • 일단 특정 job 들이 수행되기 이전이나 이후에 작업을 할 수 있도록 하는 라이프 사이클 적인 객체 같다
  • 프록시 패턴이나 AOP 와 같은 느낌으로 구현되어 있을 것 같다…

FactoryBean

Share