본문 바로가기
[개발] Java & Kotlin/Reactor

2. Reactor Core (Mono, Flux, Subscribe)

by 비어원 2022. 4. 29.
728x90

이번장에는 Reactor를 직접 사용하여 Reactive programming을 어떻게 하는지 알아보자. 필자는 Kotlin을 주로 사용하기 때문에 언어는 Kotlin을 사용한다.

 

Dependency

먼저 Reactor를 사용하기 위한 환경과 디펜던시를 맞추자. Reactor가 Java8 함수형 API(CompletableFuture, Stream, Duration)와 관련이 있기 때문에 Reactor를 사용하기 위해서는 Java 8 이상이어야 한다. 그리고 Reactor 라이브러리를 사용하기 위해서 gradle 설정을 추가하자. (KotlinDSL)

 

plugins {
    id("io.spring.dependency-management") version "1.0.9.RELEASE"
}

dependencies {
    implementation("io.projectreactor:reactor-core:3.3.22.RELEASE")
}

 

Mono, Flux

Reactor에서는 Publisher를 구현함과 동시에 풍부한 연산자도 제공해주는 조합 가능한 reactive type을 제공하는데 이것이 바로 MonoFlux 이다. Mono 는 0 또는 1([0,1])개의 항목을 가지는 리액티브 시퀀스를 나타내며, Flux 는 여러 개([0,N]) 항목을 가지는 리액티브 시퀀스를 나타낸다.

 

Mono

Mono 는 [0,1] 개의 항목에 대한 비동기 시퀀스를 나타내는 Publisher이다. Publisher는 Subscriber에게 데이터를 전달할 때는 onNext 시그널을, 데이터를 모두 전달했다고 알릴 때는 onComplete를, 에러를 전달할 때는 onError 시그널을 트리거하는데, Subscriber에서는 각각 onNext(), onComplete(), onError() 메서드로 전달된다. 즉, subscriber는 이 세 경우애 대한 핸들링 로직을 구현하면 된다.

 

일단 Mono 객체를 만들어보자. Mono에서는 Mono.just() 팩토리 메서드를 통해 Mono 객체를 간단히 만들 수 있다. Kotlin은 타입 추론을 지원하기 때문에 별도의 타입을 명시하지 않아도 된다.

 

val monoString = Mono.just("beer1")
val monoInt = Mono.just(123)

 

데이터가 있는 Mono객체도 있지만 데이터가 없는 객체도 있다. 데이터가 없는 Mono 객체는 Mono.empty()를 통해 생성할 수 있다. 이 경우에는 제네릭에 타입을 명시해야 한다. (타입 추론을 할 단서가 없기 때문이다.) Reactor에서 null의 개념과 비슷한 것 같다.

 

val emptyMono = Mono.empty<Int>()

 

그리고 Mono 객체는 Subscriber에게 3가지 시그널을 보낼 수 있는데 시그널 자체를 방출하지 않는 Mono 객체도 정의할 수 있다. (이 객체는 어디에서 쓰는지는 아직 모르겠다.) Mono.never() 메서드를 통해 생성할 수 있다.

 

val monoNever = Mono.never<Int>()

 

Flux

Flux 는 [0,N] 개의 항목에 대한 비동기 시퀀스를 나타내는 Publisher이다.

 

Flux는 Mono에 비해 여러가지 팩토리 메서드를 제공한다. 먼저 Flux.just(vararg data: T!) 메서드를 통해 Flux를 생성할 수 있다.

 

val oneFluxInt = Flux.just(1)
val manyFluxInt = Flux.just(1, 2, 3, 4)

 

그리고 Flux.fromIterable() 메서드를 통해 Iterable 객체를 Flux로 변환할 수 있다.

 

val listToFlux = Flux.fromIterable(listOf(1,2,3,4))
val setToFlux =  Flux.fromIterable(setOf(1,2,3,4))

 

그리고 Flux.range(start, count) 메서드를 통해 Int형 Flux를 생성할 수 있다.

val rangeFlux = Flux.range(5, 10) // 5, 6, 7, ..., 14

 

Subscribe

Flux (Mono) 객체를 생성만 하면 아무 일도 일어나지 않는다. 실제로 Flux(Mono)를 구독을 해야 데이터를 읽어올 수 있다. Flux(Mono)를 구독하는 subscribe 메서드는 여러가지가 있는데 하나하나 살펴보자.

 

subscribe(); // (1): 시퀀스를 구독하고 트리거한다.

subscribe(Consumer<? super T> consumer); // (2): 방출된 값 각각으로 어떤 행동을 한다.

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); // (3): (2) + 에러가 발생할 때는 별도의 행동을 한다.

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); // (4): (3) + 시퀀스가 완료되었을 때는 또 다른 행동을 한다.

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer);
// (5): (4) +  몇 개의 데이터를 구독할 것인지 (취소할 것인지) 정의

 

subscribe (1)

먼저 (1)의 subscribe()는 시퀀스를 구독하고 트리거하여 Flux의 값을 받아온다.

 

val intFlux = Flux.range(1, 5).log()

intFlux.subscribe()
  • Flux.log() 메서드는 Publisher가 보내는 신호를 로그로 남기는 Operator이다.

 

다음 코드를 실행하면 아래와 같은 결과가 나온다.

 

[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(1)
[ INFO] (main) | onNext(2)
[ INFO] (main) | onNext(3)
[ INFO] (main) | onNext(4)
[ INFO] (main) | onNext(5)
[ INFO] (main) | onComplete()

 

Publisher와 Subscriber의 데이터 전달 과정은 보통 다음과 같은 순서로 이루어진다.

 

  1. subscribe()를 하는 순간 onSubscribe() 메서드가 호출된다. 이 때 Publisher와 연동돤 subscription을 받는다. 전달받은 subscription을 이용하여 Publisher에게 데이터를 요청한다.
  2. 그 다음, downstream에서 request(unbounded) 를 호출하여 upstream에게 무제한 요청이 전파된다.
  3. Publisher가 onNext 시그널을 보내 데이터를 하나씩 전달한다. subscriber에게 데이터가 전달될 경우 onNext(data) 메서드가 호출딘다.
  4. Publisher가 모든 데이터를 다 전달한 경우 subscriber에게 onComplete 시그널을 보낸다. 이 때 subscriber의 onComplete() 메서드가 호출된다.

 

위의 경우에는 onSubscribe(), request(), onNext(), onComplete(), onError() 메서드를 구현하지 않고 기본 메서드를 사용하고 있어서 onComplete()를 호출하고는 아무 일도 발생하지 않는다.

 

subscribe (2)

(2)의 subscribe()는 Flux에서 방출한 값 각각에 대한 핸들링 로직을 정의할 수 있다. 예제에서는 방출된 값 하나하나를 print한다. 여기서 정의하는 로직이 onNext()에 대한 핸들링 로직이다.

Params: consumer – the consumer to invoke on each value (onNext signal)

 

val intFlux = Flux.range(1, 5)

intFlux.subscribe { num ->
    println("item: $num")
}

 

실행 결과:

 

item: 1
item: 2
item: 3
item: 4
item: 5

 

subscribe (3)

세 번째 subscribe() 는 Flux에서 에러 신호를 보낼 경우 별도로 어떤 행동을 정의할 수 있다. Flux에서 에러를 방출하는 순간 구독은 종료된다.

 

class NumberException: RuntimeException("숫자가 왜 4지?")

val intFlux = Flux.range(1, 5)
.handle<Int> { num, sink ->
    if (num == 4) sink.error(NumberException())
    else sink.next(num)
}

intFlux.subscribe(
    { num ->
        println("Item: $num")
    },
    { error ->
        println("Error message: ${error.message}")
    }
)
  • handle()은 Publisher에 대한 Operator 로직을 정의한다. Publisher에서 방출되는 각 값은 handle() Operator를 거쳐 데이터가 변형된다. handle()에 대한 내용은 추후에 다뤄보도록 하겠다.

 

subscribe()의 첫 번째 람다는 onNext에 대한 핸들링 로직이고, 두 번째 람다는 onError 시그널에 대한 핸들링 로직이다. handle 메서드에서는 숫자 4를 받으면 RuntimeException을 시그널로 보낸다. 에러 시그널을 보내면 subscriber에서 에러 onError() 메서드가 호출된다. 에러 시그널을 받은 subscriber는 구독이 종료된다.

 

실행 결과:

 

Item: 1
Item: 2
Item: 3
Error message: 숫자가 왜 4지?

 

subscribe (4)

네 번째 subscribe()에서는 완료 신호에 대한 행동도 정의할 수 있다.

 

class NumberException: RuntimeException("숫자가 왜 10이지?")

val intFlux = Flux.range(1, 5)
    .handle<Int> { num, sink ->
        if (num == 10) sink.error(NumberException())
        else sink.next(num)
    }

intFlux.subscribe(
    { num ->
        println("Item: $num")
    },
    { error ->
        println("Error message: ${error.message}")
    },
    {
        println("subscription is completed")
    }
)

 

실행 결과:

 

Item: 1
Item: 2
Item: 3
Item: 4
Item: 5
subscription is completed

 

subscribe (5)

다섯 번째 subscribe() 에서는 몇 개의 데이터를 구독할 것인지 또는 구독 취소를 할 것인지 정할 수 있다.

 

class NumberException: RuntimeException("숫자가 왜 1000이지?")

val intFlux = Flux.range(10, 50).log()
    .handle<Int> { num, sink ->
        if (num == 1000) sink.error(NumberException())
        else sink.next(num)
    }

intFlux.subscribe(
    { num ->
        println("Item: $num")
    },
    { error ->
        println("Error message: ${error.message}")
    },
    {
        println("subscription is completed")
    },
    { sub ->
        sub.request(5)
        // sub.cancel()
    }
)

 

여기서는 데이터가 50개의 Flux를 구독할 때 처음 5개만 요청하도록 subscribe() 메서드를 정의하였다. 결과는 당연히 5개 데이터만 방출된다. 여기서는5개 데이터를 받고 완료신호를 받지 않아서 구독이 종료되지는 않는다.

 

실행 결과:

 

[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscriptionConditional)
[ INFO] (main) | request(5)
[ INFO] (main) | onNext(10)
Item: 10
[ INFO] (main) | onNext(11)
Item: 11
[ INFO] (main) | onNext(12)
Item: 12
[ INFO] (main) | onNext(13)
Item: 13
[ INFO] (main) | onNext(14)
Item: 14

 

그럼 정확히 5개의 데이터를 받았는데 구독을 임의로 종료시킬 수는 없을까? 답은 있다. subscribe()의 리턴 값은 Disposable 인터페이스이기 때문에 subscribe() 메서드 뒤에 dispose() 메서드를 호출하면 된다. 그러면 원하는 데이터의 개수만큼 받고나면 바로 취소된다.

 

class NumberException: RuntimeException("숫자가 왜 1000이지?")

val intFlux = Flux.range(10, 50).log()
    .handle<Int> { num, sink ->
        if (num == 1000) sink.error(NumberException())
        else sink.next(num)
    }

intFlux.subscribe(
    { num ->
        println("Item: $num")
    },
    { error ->
        println("Error message: ${error.message}")
    },
    {
        println("subscription is completed")
    },
    { sub ->
        sub.request(5)
        // sub.cancel()
    }
).dispose()

 

실행 결과:

 

[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscriptionConditional)
[ INFO] (main) | request(5)
[ INFO] (main) | onNext(10)
Item: 10
[ INFO] (main) | onNext(11)
Item: 11
[ INFO] (main) | onNext(12)
Item: 12
[ INFO] (main) | onNext(13)
Item: 13
[ INFO] (main) | onNext(14)
Item: 14
[ INFO] (main) | cancel()

 

BaseSubscriber

람다식으로 subscriber를 구성하는 방법 외에 더 일반적인 기능을 사용할 수 있도록 Subscriber를 구성할 수도 있다. Reactor에서는 이러한 Subscriber를 구성하기 위해서 BaseSubscriber 클래스를 상속하는 클래스를 구현하면 된다.

 

그런데 주의할 점은 BaseSubscriber 인스턴스는 다른 Publisher를 구독한 경우에는 이미 구독 중인 Publisher의 구독을 취소하기 때문에 재활용이 불가능하다. 인스턴스에 여러 Publisher를 구독한다면 구독자의 onNext 메서드가 병렬로 호출되어야 하는데 이는 onNext 메서드가 병렬로 호출되지 않아야 한다는 Reactive Stream의 규칙을 위반하기 때문이다. 그래서 잘 사용하지는 않는 방식인 것 같다.

 

먼저 BaseSubscriber를 상속하는 클래스를 간단히 구현해보자.

 

class SampleSubscriber<T>(
    private val exitValue: T
): BaseSubscriber<T>() {

    override fun hookOnSubscribe(subscription: Subscription) {
        println("Subscribed")
        request(1)
    }

    override fun hookOnNext(value: T) {
        println(value)

        if (value != exitValue)
            request(1)
    }

    override fun hookOnError(throwable: Throwable) { }

    override fun hookOnCancel() { }

    override fun hookOnComplete() { }

    override fun hookFinally(type: SignalType) { }
}

 

보통 BaseSubscriber 를 커스텀 할 때 최소한 hookOnSubscribe() 메서드와 hookOnNext 메서드는 오버라이딩 하여 구현해줘야 한다.

 

hookOnSubscribe() 메서드는 처음 구독 후 몇개의 데이터를 요청할 것인지 request 이벤트를 보내는 메서드이다. HookOnNext() 메서드는 onNext 이벤트를 받아 데이터를 처리하는 기능을 한다. 여기서는 데이터 처리 로직을 구현하면 된다. 추가로, request 메서드로 몇개의 데이터를 더 요청하기 위해 request 메서드를 사용할 수도 있다. 예시에서는 exitValue를 받으면 더 이상 request를 하지 않도록 구현되어 있다.

 

실제로 SampleSubscribe로 구독하는 코드는 아래에 있다. 4의 값을 받으면 출력 후 더 이상 request를 받지 않는다.

 

val sampleSubscriber = SampleSubscriber<Int>(4)
val intFlux = Flux.range(1, 10)

intFlux.subscribe(sampleSubscriber)

 

실행 결과:

 

Subscribed
1
2
3
4

 

마무리

이번 장에서는 Mono와 Flux, 그리고 Publisher를 구독하는 subscribe() 메서드에 대해 알아보았다. 그 다음 장에는 Reactor (Reactive Programming)의 핵심인 Backpressure에 대하여 알아보자.

 

2022.05.01 - [[개발] Java & Kotlin/Reactor] - 3. Reactor Backpressure

 

3. Reactor Backpressure

이번 시간에는 Reactor에서 제공하는 Backpressure에 대하여 알아보겠다. Subscriber가 Publisher를 구독하면 Publisher가 데이터를 push하여 Subscriber에게 데이터를 전달한다. 이 때 Publisher는 Subscriber에..

beer1.tistory.com

 

728x90

'[개발] Java & Kotlin > Reactor' 카테고리의 다른 글

4. Reactor 프로그래밍 방식의 Flux, Mono 생성 (동기식)  (0) 2022.05.14
3. Reactor Backpressure  (0) 2022.05.01
1. Reactor 소개  (0) 2022.04.27

댓글