본문 바로가기
[개발] Java & Kotlin/Reactor

4. Reactor 프로그래밍 방식의 Flux, Mono 생성 (동기식)

by 비어원 2022. 5. 14.
728x90

이번 장에서는 Mono와 Flux를 직접 프로그래밍으로 만들어보는 것을 소개하겠다. 대표적으로는 이전 게시물에서 예제 코드로 잠깐 다뤘었던 Flux.handle()이나 Flux.generate() 메서드 등이 있다. 팩토리 메서드가 아닌 프로그래밍 방식으로 Flux를 생성하면 무한 개의 데이터를 Push하는 Flux를 구현할 수도 있다. 여기서 다루는 모든 메서드는 API를 노출하여 Sink 라고 하는 이벤트를 트리거한다는 사실을 공유한다.

프로그래밍 방식으로 Flux를 생성하는 방법은 크게 동기식비동기식으로 나뉜다. 일단 동기식 프로그래밍 방식을 알아보자.

 

동기식 프로그래밍 방식에는 대표적으로 generate()handle()이 있다. 이 메서드들은 이전 게시물에서 예제로 간간히 나온 메서드들이다.

  • generate(): 특정 로직으로 데이터를 하나씩 생성 (유/무한)
  • handle(): 특정 로직으로 upstream의 데이터를 조작

 

generate()

Flux를 프로그래밍 방식으로 생성하는 가장 간단한 방법은 generate 메서드를 이용하는 것이다. generate() 메서드는 동기적인 방식으로 데이터를 하나씩 방출하는 방식이다. generate() 의 메서드를 정의하는 코드를 한번 살펴보자.

public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator) {
    Objects.requireNonNull(generator, "generator");
    return onAssembly(new FluxGenerate<>(generator));
}

 

메서드 파라미터로는 Consumer<SynchronousSink<T>> 타입이다. 여기서 Consumer<T> 타입은 람다식으로 표현할 수 있으며 보통은 이런 형식으로 많이 사용한다.

val flux = Flux.generate<Int> { sink ->
    ...
}

 

여기서 sink는 SynchronousSink<T> 타입이고 sink.next() 메서드는 콜백 호출 당 최대 한 번만 호출될 수 있다.(람다 식 내에서 최대 한 번 호출) 그리고 stream 종료를 알리기 위해 선택적으로 sink.error(Throwable) 또는 sink.complete() 메서드를 호출할 수도 있다. downstream에게 각각 onError 또는 onComplete 시그널을 보내는 것과 같다.

 

generate() 메서드를 사용하면서 가장 유용한 점은 다음에 무엇을 방출할지 결정하기 위해 sink를 사용할 때 참조할 수 있는 상태를 유지할 수 있도록 상태변수를 사용할 수 있다. Flux.generate() 메서드는 초기 상태를 제공하는 메서드인 Supplier<S> 와 다음 sink를 생성하는 generator 함수로 구성되어있는데, generator 함수는 BiFunction<S, SynchronousSink<T>> 로 구성되어있으며, <S> 는 상태변수의 타입이다. generator 함수의 리턴값은 다음 단계의 상태가 된다.

 

 

generate() 메서드를 이용하여 피보나치 수를 순서대로 push하는 Publisher를 만들어보자. 피보나치 수는 고등학교 수학시간에 자주나와서 점화식만 간단히 소개하겠다.

 

 

val fibonacciFlux = Flux.generate<Int, Pair<Int, Int>> (
    { 0 to 1 },
    { pair, sink ->
        sink.next(pair.first)

        // 람다의 return 값: 다음 단계의 상태
        pair.copy(first = pair.second, second = pair.first + pair.second)
    }
)

fibonacciFlux.subscribe(
    object: BaseSubscriber<Int>() {
        override fun hookOnSubscribe(subscription: Subscription) {
            request(10)
        }

        override fun hookOnNext(value: Int) {
            println(value)
        }
    }
)

 

실행 결과:

0
1
1
2
3
5
8
13
21
34

 

코드를 보면 초기 상태 변수를 Pair로 F0(=0), F1(=1)을 저장하고, sink.next() 에서 pair.first를 내보내고 상태 변수를 firstsecond, second를 그 다음 항인 first + second로 저장하였다. 그럼 그 다음 request에서는 항상 이전 request의 다음 항을 방출하게 될 것이다.

 

handle()

handle 메서드는 Mono(Flux)의 인스턴스 메서드로서, 기존 스트림에 어떤 로직을 추가하여 핸들링할 때 사용한다. 이 메서드는 generate 메서드와 유사하며, SynchronousSink를 사용하고 오직 1:1 방출만 허용한다. 대신, handle 메서드는 각 source의 요소에서 임의의 값을 생성할 수 있고, 일부 요소는 건너뛸 수도 있다. 그래서 handle() 메서드는 map과 filter 역할을 모두 하는 만능 메서드이다. 그래서 스트림의 복잡한 처리를 하는 경우 handle()을 유용하게 사용할 수 있다. handle() 메서드의 정의를 한번 살펴보자.

Flux<R> handle(BiConsumer<T, SynchronousSink<R>>);

 

이 메서드의 매개변수에는 BiConsumer 타입인데 이 또한 람다식으로 표현할 수 있다. (data와 sink의 타입이 달라도 상관 없다.)

val flux = Flux.range(1, 10) // 기존 스트림
    .handle<Int> { data, sink ->    // data: Int, sink: SynchronousSink<Int>
        ...
    }
  • handle에 제네릭 타입은 명시해야 한다. sink의 타입을 추론할 수 없기 때문이다.

 

handle의 예제는 여러가지가 있다. 가장 먼저 기본적으로 기존 스트림의 값을 두 배 증가시키는 handle() 메서드를 작성해보자.

val flux = Flux.range(1, 10)
    .handle<Int> { data, sink ->
        sink.next(data * 2)
    }

flux.subscribe { println(it) }

 

실행 결과:

2
4
6
8
10
12
14
16
18
20

 

위의 예제에서 추가로, 3의 배수는 거르도록 handle()을 작성할 수도 있다. 건너뛰는 것은 sink.next()를 호출하지 않으면 된다.

val flux = Flux.range(1, 10)
    .handle<Int> { data, sink ->
        if (data % 3 != 0) sink.next(data * 2)
    }

flux.subscribe { println(it) }

 

실행 결과:

2
4
8
10
14
16
20

 

728x90

'[개발] Java & Kotlin > Reactor' 카테고리의 다른 글

3. Reactor Backpressure  (0) 2022.05.01
2. Reactor Core (Mono, Flux, Subscribe)  (0) 2022.04.29
1. Reactor 소개  (0) 2022.04.27

댓글