본문 바로가기
[개발] Java & Kotlin/Reactor

3. Reactor Backpressure

by 비어원 2022. 5. 1.
728x90

이번 시간에는 Reactor에서 제공하는 Backpressure에 대하여 알아보겠다.


Subscriber가 Publisher를 구독하면 Publisher가 데이터를 push하여 Subscriber에게 데이터를 전달한다. 이 때 Publisher는 Subscriber에게 onNext 시그널을 보내 Subscriber에게 데이터를 전달한다는 것을 알린다. 또는 Publisher는 Subscriber에게 onComplete 시그널을 보내 완료가 되었음을 알리거나 onError 시그널을 보내 에러가 발생했음을 알린다. 그런데 Reactor에서는 반대로 Subscriber가 Publisher에게 어떠한 신호를 보내게 할 수 있는데 이를 Backpressure 이라고 한다.



기본 Backpressure: Unbounded request

첫 번째 요청은 구독 시점에서 최종 Subscriber로부터 온다. 요청을 하는 가장 간단한 방법은 즉시 구독 시점에서 unbounded request, 즉 request(Long.MAX_VALUE) 를 트리거하는 방법이다. Reactor에서는 다음과 같은 방식으로 unbounded request를 요청할 수 있다.

  • subscribe() 호출 (Consumer<? super Subscription> 람다식을 따로 정의하지 않으면 unbounded request)
  • block(), blockFirst(), blockLast() 호출
  • toIterable() (Iterable 객체로 변형) 또는 toStream() (Reactive stream을 일반적인 Stream으로 변형)

Backpressure 커스터마이징

그렇다면 unbounded request를 보내지 않고 요청을 커스터마이징 하는 방법은 어떤 것들이 있을까? 가장 간단한 방법은 BaseSubscriber를 상속하는 Subscriber를 직접 구현하여 hookOnSubscribe 메서드를 오버라이딩 하면 된다.


Flux.range(1, 10)
    .doOnRequest { println("request of $it") }
    .subscribe(
        object : BaseSubscriber<Int>() {

            // 구독 시점에서 요청 개수를 정의한다.
            override fun hookOnSubscribe(subscription: Subscription) {
                request(1)
            }

             // Publisher에게 데이터를 받을 때에 대한 핸들링 로직, 데이터를 받으면 취소한다.        
            override fun hookOnNext(value: Int) {
                println("Cancelling after having received $value")
                cancel()
            }
        }
    )

위의 예제에서는 구독 시점에 하나의 데이터를 요청하고, 데이터를 푸쉬 받는 시점에 hookOnNext 메서드가 호출되어 데이터를 받으면 구독을 취소하도록 구현하였다.


또 다른 예제를 보여주자면, 1부터 10까지의 무작위 데이터를 무한히 Push 해주는 Flux를 하나 만든다. Flux.generate 메서드를 통해 구현하였다. (generate() 공식 문서)


그리고 Publisher로부터 받은 데이터가 10이 될 때 까지 요청을 받고 데이터가 10이면 구독을 취소하는 subscriber를 구현해보았다. 이런 식으로 요구사항에 맞게 구독 시점을 커스터마이징 하는 것이 가능하다.


val randomInfiniteFlux = Flux.generate<Int> { sink ->
    sink.next(Random.nextInt(1, 11))
}.doOnRequest { println("request of $it") }

randomInfiniteFlux.subscribe(
    object : BaseSubscriber<Int>() {
        override fun hookOnSubscribe(subscription: Subscription) {
            request(1)
        }

        override fun hookOnNext(value: Int) {
            println("data is $value, if data is 10 then cancel")
            if (value == 10) {
                println("cancel!")
                cancel()
            }
            else {
                println("request")
                request(1)
            }
        }
    }
)

실행 결과 : (Random이라 다를 수도 있음, 10을 받을 때 종료만 되면 장땡)

request of 1
data is 1, if data is 10 then cancel
request
request of 1
data is 1, if data is 10 then cancel
request
request of 1
data is 5, if data is 10 then cancel
request
request of 1
data is 9, if data is 10 then cancel
request
request of 1
data is 10, if data is 10 then cancel
cancel!

Buffer 사용하기

buffer(N) 은 upstream애서 방출된 데이터가 N개 모이면 downstream으로 방출하게 하는 operator이다. 만약, upstream에서 N개의 데이터를 방출하지 못하고 complete 된다면 방출한 만큼의 데이터를 downstream으로 보내게 된다. buffer(N) 가 upstream에서 받은 데이터를 버퍼 단위로 방출하기 때문에 리턴 타입은 Mono<List<T>> 또는 Flux<List<T>> 이다.


이번에는 1부터 40까지의 정수를 push하는 Publisher를 만들고, 이 Publisher에게 buffer(10)을 달아보자.


val upstream = Flux.range(1, 40)

upstream.buffer(10)
    .subscribe(
        object: BaseSubscriber<List<Int>>() {
            override fun hookOnSubscribe(subscription: Subscription) {
                println("request 2")
                request(2)
            }

            override fun hookOnNext(value: List<Int>) {
                println("value: $value")
                request(2)
            }

            override fun hookOnComplete() {
                println("Completed!")
            }
        }
    )

실행 결과:

request 2
value: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
value: [11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
value: [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
value: [31, 32, 33, 34, 35, 36, 37, 38, 39, 40]

Subscriber가 구독하고 Publisher가 Subscriber에게 데이터를 전달하는 과정을 순차적으로 설명하면 다음과 같다.

  1. 맨 먼저 Subscriber가 request(2)를 Publisher에게 요청하면 buffer(10)에서 upstream에게 request(20)을 요청한다.
  2. 그 후 upstream에서는 요청을 받고 데이터를 하나하나 Push한다.
  3. buffer(10)은 upstream에게 받은 데이터를 subscriber에게 바로 보내지 않고 데이터를 10개까지 모을 때 까지 기다린다.
  4. 데이터가 10개가 되었다면 subscriber(downstream)으로 전달한다.
  5. Publisher가 40개의 데이터를 모두 전달하였다면 onComplete 신호를 보낸다.
  6. onComplete 신호는 buffer(10)을 통과하여 subscriber에게 전달되고 구독이 완료된다.

지금은 buffer size가 10이고 upstream이 40개의 데이터를 전송하여서 buffer size와 딱 맞지만, 만약 버퍼가 모두 채워지지 않고 upstream에게 onComplete 시그널을 받으면 buffer()는 어떻게 데이터를 처리할까? 궁금하면 upstream의 range값을 조절해보면 된다. 결론부터 말하면 buffer()는 upstream에게 onComplete 시그널을 받으면 버퍼에 남은 데이터를 모두 downstream으로 보내고 onComplete를 전달한다.


val upstream = Flux.range(1, 38) // 40에서 38로 수정
...

실행 결과:

request 2
value: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
value: [11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
value: [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
value: [31, 32, 33, 34, 35, 36, 37, 38]
Completed!

만약 버퍼가 쌓이는 도중 upstream에서 에러가 발생한다면 버퍼에 남아있는 데이터는 어떻게 될까? 정답은 남아있는 데이터를 downstream에 전달하지 않고 onError를 전달한다. 예제 코드는 handle 메서드를 정의하여 데이터가 15일 경우 RuntimeException을 방출하도록 코드를 정의하였다.


val upstream = Flux.range(1, 38)
    .handle<Int> { data, sink ->
        if (data == 15) sink.error(RuntimeException("숫자가 15네"))
        else sink.next(data)
    }

upstream.buffer(10)
    .subscribe(
        object: BaseSubscriber<List<Int>>() {
            override fun hookOnSubscribe(subscription: Subscription) {
                println("request 2")
                request(2)
            }

            override fun hookOnNext(value: List<Int>) {
                println("value: $value")
                request(2)
            }

            override fun hookOnComplete() {
                println("Completed!")
            }

            override fun hookOnError(throwable: Throwable) {
                println("Error!")
            }
        }
    )

실행 결과:

request 2
value: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Error!

Prefetch

Reactor에서는 prefetch 파라미터를 이용하여 downstream에서 upstream에게 request를 보내 downstream이 요구한 데이터를 upstream이 미리 방출하도록 시킬 수 있다. prefetch는 Flux.publishOn(scheduler, prefetch) 메서드를 아용하면 설정할 수 있다. publishOn에서 설정하지 않는다면 기본적으로 MathMax(16, 256)으로 설정될 것이다. (코드 까보면 그렇게 나와있음)


prefetch는 Publisher에서 요청받을 데이터를 선 반영하는 전략이며, 이는 Replenishing Optimization 을 구현한다. operator가 prefetch의 75%정도를 수행했다면, prefetch의 75%정도를 다시 upstream에게 미리 요청하는 전략이다. 이게 어떤 의미인지 자세히 알기 위한 간단한 예제코드를 보자.


여기서는 쓰레드에 대한 로그도 찍으면 좋을 것 같아서 slf4j logger 디펜던시도 추가하였다.


build.gradle.kts

dependencies {
    implementation("org.slf4j:slf4j-api:1.7.25")
    implementation("org.slf4j:slf4j-simple:1.7.25")
}

비교 1) prefetch 미적용

먼저 아래 코드는 prefetch를 하지 않는 publisher-subscriber 에 대한 예시인데, upstream에서 데이터를 방출하는 데 걸리는 시간은 1000ms 라고 가정하자. 그리고 subscriber에서는 데이터를 처리하고 request를 하는 시간까지 걸리는 시간을 500ms라고 하면 request 사이의 간격은 총 1500ms 정도가 될 것이다.


var i = 1
val upstream = Flux.generate<Int> {
    Thread.sleep(1000L)
    it.next(i++)
}.log()

upstream.subscribe(
    object: BaseSubscriber<Int>() {
        val logger = LoggerFactory.getLogger(javaClass)
        var time: Long = 0
        override fun hookOnSubscribe(subscription: Subscription) {
            time = System.currentTimeMillis()
            request(10)
        }

        override fun hookOnNext(value: Int) {
            val fetchTime = System.currentTimeMillis() - time
            logger.info("value: $value, fetchTime: $fetchTime")
            time = System.currentTimeMillis()

            Thread.sleep(500L)
        }
    }
)

실행 결과:

[main] INFO reactor.Flux.Generate.1 - | onSubscribe([Fuseable] FluxGenerate.GenerateSubscription)
[main] INFO reactor.Flux.Generate.1 - | request(10)
[main] INFO reactor.Flux.Generate.1 - | onNext(1)
[main] INFO src.tutorial.MainKt$noPrefetch$1 - value: 1, fetchTime: 1005
[main] INFO reactor.Flux.Generate.1 - | onNext(2)
[main] INFO src.tutorial.MainKt$noPrefetch$1 - value: 2, fetchTime: 1510
[main] INFO reactor.Flux.Generate.1 - | onNext(3)
[main] INFO src.tutorial.MainKt$noPrefetch$1 - value: 3, fetchTime: 1504
[main] INFO reactor.Flux.Generate.1 - | onNext(4)
[main] INFO src.tutorial.MainKt$noPrefetch$1 - value: 4, fetchTime: 1502
[main] INFO reactor.Flux.Generate.1 - | onNext(5)
[main] INFO src.tutorial.MainKt$noPrefetch$1 - value: 5, fetchTime: 1510
[main] INFO reactor.Flux.Generate.1 - | onNext(6)
[main] INFO src.tutorial.MainKt$noPrefetch$1 - value: 6, fetchTime: 1505
[main] INFO reactor.Flux.Generate.1 - | onNext(7)
[main] INFO src.tutorial.MainKt$noPrefetch$1 - value: 7, fetchTime: 1506
[main] INFO reactor.Flux.Generate.1 - | onNext(8)
[main] INFO src.tutorial.MainKt$noPrefetch$1 - value: 8, fetchTime: 1506
[main] INFO reactor.Flux.Generate.1 - | onNext(9)
[main] INFO src.tutorial.MainKt$noPrefetch$1 - value: 9, fetchTime: 1502
[main] INFO reactor.Flux.Generate.1 - | onNext(10)
[main] INFO src.tutorial.MainKt$noPrefetch$1 - value: 10, fetchTime: 1503

Prefetch를 적용할 경우 Publisher와 Subscriber의 시퀀스를 그림으로 표현하면 다음과 같다.



실행 결과를 보면 첫 데이터를 받는 데 걸리는 시간은 약 1000ms이지만 두 번째 부터는 Publisher가 데이터를 푸쉬하는 데 1000ms, Subscriber가 데이터를 받고 데이터를 조작하는 데 걸리는 시간(sleep)인 500ms를 합산한 1500ms가 걸린다.


Publisher, Subscriber가 모두 main thread에서 작동하기 때문에 hookOnSubscribe가 끝나고 데이터 generation이 진행된다. (sleep 영향)


비교 2) prefetch 적용

여기서 만약 prefetch를 적용시키면 subscriber가 데이터를 처리하는 동안에도 미리 publisher에게 request 신호를 전달하기 때문에 더 빨리 데이터를 받을 수 있다. prefetch 과정을 지켜보기 위해 Publisher에 log()를 추가해보자.


var i = 1
val upstream = Flux.generate<Int> {
    Thread.sleep(1000L)
    it.next(i++)
}.log()


val downstream = upstream.publishOn(Schedulers.elastic(), 4).log()

downstream.subscribe(
    object: BaseSubscriber<Int>() {
        val logger = LoggerFactory.getLogger(javaClass)
        var time: Long = 0
        override fun hookOnSubscribe(subscription: Subscription) {
            time = System.currentTimeMillis()
            request(10)
        }

        override fun hookOnNext(value: Int) {
            val fetchTime = System.currentTimeMillis() - time
            logger.info("value: $value, fetchTime: $fetchTime")
            time = System.currentTimeMillis()

            Thread.sleep(500L)
        }
    }
)

Thread.sleep(100000)
  • 맨 아레에 Thread.sleep()을 걸어주는 이유는 publishOn()에서 Schedulers를 elastic으로 지정해서 메인 쓰레드와 다른 쓰레드에서 subscriber가 실행되기 때문에 메인쓰레드가 종료되지 않게 하기 위해 추가했다.

실행결과를 보면 조금 의아한 것을 발견할 수 있다.

[main] INFO reactor.Flux.Generate.1 - | onSubscribe([Fuseable] FluxGenerate.GenerateSubscription)
[main] INFO reactor.Flux.PublishOn.2 - | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[main] INFO reactor.Flux.PublishOn.2 - | request(10)
[main] INFO reactor.Flux.Generate.1 - | request(4)
[main] INFO reactor.Flux.Generate.1 - | onNext(1)
[elastic-2] INFO reactor.Flux.PublishOn.2 - | onNext(1)
[elastic-2] INFO src.tutorial.MainKt$prefetch$1 - value: 1, fetchTime: 1007
[main] INFO reactor.Flux.Generate.1 - | onNext(2)
[elastic-2] INFO reactor.Flux.PublishOn.2 - | onNext(2)
[elastic-2] INFO src.tutorial.MainKt$prefetch$1 - value: 2, fetchTime: 1005
[main] INFO reactor.Flux.Generate.1 - | onNext(3)
[elastic-2] INFO reactor.Flux.PublishOn.2 - | onNext(3)
[elastic-2] INFO src.tutorial.MainKt$prefetch$1 - value: 3, fetchTime: 1000
[elastic-2] INFO reactor.Flux.Generate.1 - | request(3)
[main] INFO reactor.Flux.Generate.1 - | onNext(4)
[elastic-2] INFO reactor.Flux.PublishOn.2 - | onNext(4)
[elastic-2] INFO src.tutorial.MainKt$prefetch$1 - value: 4, fetchTime: 1006
[main] INFO reactor.Flux.Generate.1 - | onNext(5)
[elastic-2] INFO reactor.Flux.PublishOn.2 - | onNext(5)
[elastic-2] INFO src.tutorial.MainKt$prefetch$1 - value: 5, fetchTime: 1003
[main] INFO reactor.Flux.Generate.1 - | onNext(6)
[elastic-2] INFO reactor.Flux.PublishOn.2 - | onNext(6)
[elastic-2] INFO src.tutorial.MainKt$prefetch$1 - value: 6, fetchTime: 1002
[elastic-2] INFO reactor.Flux.Generate.1 - | request(3)
[main] INFO reactor.Flux.Generate.1 - | onNext(7)
[elastic-2] INFO reactor.Flux.PublishOn.2 - | onNext(7)
[elastic-2] INFO src.tutorial.MainKt$prefetch$1 - value: 7, fetchTime: 1001
[main] INFO reactor.Flux.Generate.1 - | onNext(8)
[elastic-2] INFO reactor.Flux.PublishOn.2 - | onNext(8)
[elastic-2] INFO src.tutorial.MainKt$prefetch$1 - value: 8, fetchTime: 1003
[main] INFO reactor.Flux.Generate.1 - | onNext(9)
[elastic-2] INFO reactor.Flux.PublishOn.2 - | onNext(9)
[elastic-2] INFO src.tutorial.MainKt$prefetch$1 - value: 9, fetchTime: 1005
[elastic-2] INFO reactor.Flux.Generate.1 - | request(3)
[main] INFO reactor.Flux.Generate.1 - | onNext(10)
[elastic-2] INFO reactor.Flux.PublishOn.2 - | onNext(10)
[elastic-2] INFO src.tutorial.MainKt$prefetch$1 - value: 10, fetchTime: 1002
[main] INFO reactor.Flux.Generate.1 - | onNext(11)
[main] INFO reactor.Flux.Generate.1 - | onNext(12)
[main] INFO reactor.Flux.Generate.1 - | onNext(13)

일단 hookOnSubscribe() 에서 request(10)을 걸어줬지만 실제로는 request(4)가 요청이 되었다. publishOn(scheduler, prefetch) 를 지정하면 우리가 지정한 request(N) 이 씹히고 publishOn 단계에서 알아서 request(prefetch)를 걸어준다. 그리고 로그에 찍힌 쓰레드를 보면 PublishOn, Subscriber는 모두 elastic 쓰레드고, 실제 데이터를 만드는 쓰레드는 main 쓰레드임을 알 수 있다. (비동기로 작동) 이는 publishOn 메서드에서 Scheduler를 등록했기 때문이다.


Prefetch를 적용할 경우 Publisher와 Subscriber의 시퀀스를 그림으로 표현하면 다음과 같다.



처음에는 prefetch만큼의 request를 호출하고, upstream은 다른 쓰레드에서 데이터를 prefetch 크기만큼 만들어 보내준다. publishOn은 upstream에서 prefetch의 75% 만큼의 onNext가 수행되면 알아서 request(prefetch * 0.75) 를 요청한다. 데이터를 받는 Subscriber 부분과 데이터를 만드는 Publisher 부분이 서로 다른 쓰레드에서 구동 중이라서 prefetch를 할 때 더 빨리 데이터를 받을 수 있다. (사실 내부적으로는 prefetch라기 보다 비동기 태스크라서 더 빨리 걸리는 것이라고 생각이 든다. 비동기가 중요한 것!)


마무리

이번 시간에는 Subscriber가 Publisher에게 요청하는 방법인 backpressure에 대하여 알아보았다. 다음 장에는 Mono와 Flux를 정의할 때 팩토리 메서드가 아닌 직접 프로그래밍으로 만들어보는 방법을 소개하겠다. 실제로 이때까지 Flux.handle()이나 Flux.generate() 메서드를 예제로 사용하긴 하였지만 설명은 하지 않았다. 다음 장에서는 이와 같은 메서드에 대하여 설명하는 시간을 가져 볼 예정이다.

728x90

댓글