02.Reactive Streams

1. Reactive Streams란

Rxjava 버전이 1.x에서 2.x로 올라간 배경에는 Reactive Streams가 있습니다. Reactive Streams란 라이브러리나 프레임워크 상관없이 데이터 스트림을 비동기로 다룰수 있는 공통 매커니즘으로, 이 매커니즘을 편리하게 사용할 수 있는 인터페이스를 제공합니다.

Reactive Streams

즉, Reactive Streams는 인터페이스만 제공하고 구현은 각 라이브러리와 프레임워크에서 합니다.

Reactive Streams Specification for the JVM

2. Reactive Streams의 구성

Reactive Streams는 데이터를 만들어 통지하는 Publisher(생산자)와 통지된 데이터를 받아 처리하는 Subscriber(소비자)로 구성됩니다.

SubscriberPublisher구독(subscribe)하면 Publisher가 통지한 데이터를 Subscriber가 받을 수 있습니다.

  • Publisher : 데이터를 통지하는 생산자
  • Subscriber: 데이터를 받아 처리하는 소비자

데이터 흐름

  • Publisher는 통지 준비가 끝나면 이를 Subscriber통지(onSubscribe)합니다.
  • 해당 통지를 받은 Subscriber는 받고자 하는 데이터 개수를 요청합니다.

이때, Subscriber가 자신이 통지 받을 데이터 개수를 요청하지 않으면 Publisher는 통지해야 할 데이터 개수 요청을 기다리게 되므로 통지를 시작 할 수 없습니다.

  • 그 다음 Publisber는 데이터를 만들어서 Subscriber통지(onNext)합니다.
  • 이 데이터를 받은 Subscriber는 데이터를 사용해 처리 작업을 수행합니다.
  • Publisher는 요청받은 만큼의 데이터를 통지한 뒤 Subscriber로 부터 다음 요청이 올때까지 데이터 통지를 중단합니다.
  • 이후 Subscriber가 처리 작업을 완료하면 다음에 받을 데이터 개수를 Publisher에게 요청합니다.

이 요청을 보내지 않으면 Publisher는 요청 대기 상태가 돼 Subscriber에 데이터를 통지할 수 없습니다.

  • PublisherSubscriber에 모든 데이터를 통지하고 마지막으로 데이터 전송이 완료돼 정상 종료 됐다고 통지(onComplete)합니다.
  • 완료 통지를 하고나면 Publisher는 이 구독건에 대해 어떤 통지도 하지 않습니다.

또한, Publisher는 처리 도중에 에러가 발생하면 Subscriber에 발생한 에러 객체와 함께 에러를 통지(onError)합니다.

데이터 흐름 설명

SubscriberPublisher에 통지 받을 데이터 개수를 요청하는 것은 Publisher가 통지하는 데이터 개수를 제어하기 위해서 입니다. 예를 들어, PublisherSubscriber의 처리가 각각 다른 스레드에서 진행되는데 Publisher의 통지 속도가 빠르면 Subscriber가 소화할 수 없을만큼 많은 처리 대기 데이터가 쌓입니다.

→ 이를 막기위해 Publisher가 통지할 데이터 개수를 Subscriber가 처리 할 수 있을 만큼으로 제어하는 수단이 필요합니다.

Reactive Streams가 제공하는 프로토콜

프로토콜 설명
onSubscribe 데이터 통지가 준비됐음을 통지
onNext 데이터통지
onError 에러(이상 종료)통지
onComplete 완료(정상 종료)통지

Reactive Streams의 인터페이스

인터페이스 설명
Publisher 데이터를 생성하고 통지하는 인터페이스
Subscriber 통지된 데이터를 전달받아 처리하는 인터페이스
Subscription 데이터개수를 요청하고 구독해지 하는 인터페이스
Processor Publisher와 Subscriber의 기능이 모두 있는 인터페이스

Publisher.java

1
2
3
4
5
//데이터를 통지하는 생산자
public interface Publisher<T> {
// 데이터를 받는 Subscriber 등록
public void subscribe(Subscriber<? super T> subscriber);
}

Subscriber.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

import org.reactivestreams.Subscription;

// 데이터를 받아 처리 하는 소비자
public interface Subscriber<T> {
// 구독 시작시 처리
public void onSubscribe(Subscription subscription);

// 데이터 통지시 처리
public void onNext(T item);

// 에러 통지시 처리
public void onError(Throwable error);

// 완료 통지시 처리
public void onComplete();

}

Subscription.java

1
2
3
4
5
6
7
8
9
// 생산자와 소비자를 연결하는 인터페이스
public interface Subscription {

// 통지 받을 데이터 개수 요청
public void request(long num);

// 구독 해지
public void cancel();
}

Processor.java

1
2

public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {}

Subscription 보관

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

import org.reactivestreams.Subscription;

public class MainClass implements Subscriber{

// subscriber내부에 subscription보관하기
private Subscription subscription;

@Override
public void onSubscribe(Subscription subscription) {
// 받은 Subscription을 Subscriber 내부에 보관한다
this.subscription = subscription;
this.subscription.request(num);
}

@Override
public void onNext(Object item) {
// 요청한 데이터를 처리하면 다음 데이터 개수를 요청한다.
subscription.request(num);
}

@Override
public void onError(Throwable error) {
}

@Override
public void onComplete() {
}
}

Subscription처리가 Subscriber 외부에서는 호출되지 않는다는 것을 전제로 구현할 수 있습니다.

Rxjava처럼 외부에서 구독을 해지할 방법을 제공한다면 Subscription이 비동기로 호출돼도 문제가 없게 구현되어야 합니다.

→ 추가로 Reactive StreamsProcessor라는 인터페이스가 있습니다. 이 ProcessorPublisher와 Subscriber 모두 상속받아 데이터 통지와 수신이 가능합니다.

즉, Processor는 다른 Publisher구독하거나 다른 Subscriber가 자신을 구독하게 할 수 있습니다.

인터페이스 기능 설명

PublisherSubscriber가 사용하는 Subscription은 통지받을 데이터 개수를 지정해 데이터 통지를 요청하거나 통지받지 않게 구독을 해지할 때 사용하는 인터페이스입니다. SubscriptionPublisher에서 인스턴스가 생성돼 통지 준비가 끝났을 때 호출하는 onSubscribe 메서드의 인자로 Subscriber에 전달됩니다. 이 Subscription을 받은 SubscriberSubscription의 메서드를 호출해 데이터 개수를 요청하거나 구독을 해지합니다.

또한, onNext 메서드에서 이 Subscription을 사용하려면 onSubscribe 메서드로 전달받은 SubscriptionSubscriber내부에 있어야합니다.

3. Reactive Streams의 규칙

Reactive Streams인터페이스로 데이터를 통지하는 구조를 제공합니다. 해당 구조가 제대로 작동하기 위해서는 Reactive Streams의 규칙을 따라야합니다.

3.1. Reactive Streams의 기본규칙

  1. 구독 시작 통지(onSubscribe)는 해당 구독에서 한 번만 발생합니다.

Reactive Streams에서 구독 시작 통지는 해당 구독에서 한 번만 수행됩니다. 따라서 Subscriber의 onSubscribe메서드는 구독을 시작해 통지준비가 끝났을때 한번만 실행됩니다.
단, 다음 작업은 추천하지 않지만 처리가 종료된 이후에는 같은 Publisher와 Subscriber로 subscribe 메서드를 호출하면 다시 onSubscribe 메서드가 실행됩니다.
이는 처리가 끝난뒤에 subscribe 메서드를 호출하면 새로운 구독을 시작한다고 생각합니다. 그러나 같은 인스턴스를 다시 사용해 subscribe 메서드를 호출할 때 Publisher나 Subscriber 내부의 관리 상태를 초기화하지 않으면 의도하지 않은 결과가 발생할 수 있습니다.

  1. 통지는 순차적으로 이루어집니다.

Reactive Streams에서는 데이터 통지가 순차적으로 이루어집니다. 즉, 여러 통지를 동시에 할 수 없으며 RxJava의 Observable 규약 이라는 규칙에 따른것으로, 데이터가 동시에 통지돼 불일치가 발생하는것을 막을 수 있습니다.

  1. null을 통지하지 않습니다.
    Reactive Streams에서는 null을 통지 할 수 없습니다. 만약 null을 통지하려면 Reactive Streams에서 NullPoiuntException을 발생시킵니다. 이는 데이터를 통지할 때 뿐만 아니라 에러를 통지할때도 마찬가지로 진행됩니다.
    null을 통지하지 않는다 라는 사양은 RxJava 1.x, 2.x 간 다른 사양이므로 주의해야합니다.
  1. Publisher의 처리는 완료(onComplete) 또는 에러(onError)를 통지해 종료합니다.

Reactive Streams에서는 완료나 에러를 통지하면 Publisher가 처리를 끝 마친것으로 판단합니다.이는 완료나 에러 통지를 마친 구독은 더 이상 통지하지 않는다는 의미입니다.
예를 들면, 완료를 통지한 뒤에 에러가 발생했다면 이 에러는 통지하지 않으므로 정상적으로 종료되었다고 생각 할 수 있습니다.

3.2. 데이터 개수 요청이나 구독해지를 수행하는 Subscription의 규칙

  1. 데이터 개수 요청에 Long.MAX_VALUE를 설정하면 데이터 개수에 의한 통지 제한은 없어집니다.
    Reactive Streams에서 Long.MAX_VALUE를 데이터 개수 요청으로 지정하면 통지할 데이터 개수의 제한이 없어집니다. 그러므로, 이 요청을 전송한 후에는 데이터 개수 요청을 보내지 않아도 데이터 통지를 계속해서 받을 수 있습니다.
    요청받은 데이터 개수가 남은 상태에서 추가로 데이터 개수를 요청받으면 새로 요청받은 데이터 개수가 기존 데이터 개수에 추가된다는 점을 주의해야합니다.
    즉, 데이터 개수 요청을 받을때 마다 기존 개수에 더해져 통지 가능한 데이터 개수가 증가합니다. 이 결과가 Long.MAX_VALUE에 도달하면 통지 가능한 데이터 개수 제한이 사라집니다.
  1. Subscription의 메서드는 동기화된 상태로 호출해야 합니다.
    Subscription 메서드는 동기화된 상태로 호출해야합니다. 즉, Subscription의 메서드를 동시에 호출해서는 안됩니다.

RxJava를 사용할 때는 각 통지 메서드와 Subscription의 메서드를 호출할 때 동기화가 이뤄지므로 처리 자체가 스레드 안전(Thread safety)한지를 특히 신경 써야합니다. 잘못 될 경우 제대로 통지가 되지 않을 가능성이 있습니다.

4. Reference

https://tech.kakao.com/2018/05/29/reactor-programming/