1. Reactive Streams란
Rxjava 버전이 1.x에서 2.x로 올라간 배경에는 Reactive Streams가 있습니다. Reactive Streams란 라이브러리나 프레임워크 상관없이 데이터 스트림을 비동기로 다룰수 있는 공통 매커니즘으로, 이 매커니즘을 편리하게 사용할 수 있는 인터페이스를 제공합니다.
즉, Reactive Streams는 인터페이스만 제공하고 구현은 각 라이브러리와 프레임워크에서 합니다.
Reactive Streams Specification for the JVM
2. Reactive Streams의 구성
Reactive Streams는 데이터를 만들어 통지하는 Publisher(생산자)
와 통지된 데이터를 받아 처리하는 Subscriber(소비자)
로 구성됩니다.
Subscriber
가 Publisher
를 구독(subscribe)
하면 Publisher
가 통지한 데이터를 Subscriber
가 받을 수 있습니다.
- Publisher : 데이터를 통지하는 생산자
- Subscriber: 데이터를 받아 처리하는 소비자
데이터 흐름
Publisher
는 통지 준비가 끝나면 이를Subscriber
에통지(onSubscribe)
합니다.- 해당 통지를 받은
Subscriber
는 받고자 하는 데이터 개수를 요청합니다.
이때, Subscriber
가 자신이 통지 받을 데이터 개수를 요청하지 않으면 Publisher
는 통지해야 할 데이터 개수 요청을 기다리게 되므로 통지를 시작 할 수 없습니다.
- 그 다음
Publisber
는 데이터를 만들어서Subscriber
에통지(onNext)
합니다. - 이 데이터를 받은
Subscriber
는 데이터를 사용해 처리 작업을 수행합니다. Publisher
는 요청받은 만큼의 데이터를 통지한 뒤Subscriber
로 부터 다음 요청이 올때까지 데이터 통지를 중단합니다.- 이후
Subscriber
가 처리 작업을 완료하면 다음에 받을 데이터 개수를Publisher
에게 요청합니다.
이 요청을 보내지 않으면 Publisher
는 요청 대기 상태가 돼 Subscriber
에 데이터를 통지할 수 없습니다.
Publisher
는Subscriber
에 모든 데이터를 통지하고 마지막으로 데이터 전송이 완료돼 정상 종료 됐다고통지(onComplete)
합니다.- 완료 통지를 하고나면
Publisher
는 이 구독건에 대해 어떤 통지도 하지 않습니다.
또한, Publisher
는 처리 도중에 에러가 발생하면 Subscriber
에 발생한 에러 객체와 함께 에러를 통지(onError)
합니다.
데이터 흐름 설명
Subscriber
가 Publisher
에 통지 받을 데이터 개수를 요청하는 것은 Publisher
가 통지하는 데이터 개수를 제어하기 위해서 입니다. 예를 들어, Publisher
와 Subscriber
의 처리가 각각 다른 스레드에서 진행되는데 Publisher
의 통지 속도가 빠르면 Subscriber
가 소화할 수 없을만큼 많은 처리 대기 데이터가 쌓입니다.
→ 이를 막기위해 Publisher
가 통지할 데이터 개수를 Subscriber
가 처리 할 수 있을 만큼으로 제어하는 수단이 필요합니다.
Reactive Streams가 제공하는 프로토콜
프로토콜 | 설명 |
---|---|
onSubscribe | 데이터 통지가 준비됐음을 통지 |
onNext | 데이터통지 |
onError | 에러(이상 종료)통지 |
onComplete | 완료(정상 종료)통지 |
Reactive Streams의 인터페이스
인터페이스 | 설명 |
---|---|
Publisher | 데이터를 생성하고 통지하는 인터페이스 |
Subscriber | 통지된 데이터를 전달받아 처리하는 인터페이스 |
Subscription | 데이터개수를 요청하고 구독해지 하는 인터페이스 |
Processor | Publisher와 Subscriber의 기능이 모두 있는 인터페이스 |
Publisher.java
1 | //데이터를 통지하는 생산자 |
Subscriber.java
1 |
|
Subscription.java
1 | // 생산자와 소비자를 연결하는 인터페이스 |
Processor.java
1 |
|
Subscription 보관
1 |
|
→ Subscription
처리가 Subscriber
외부에서는 호출되지 않는다는 것을 전제로 구현할 수 있습니다.
Rxjava처럼 외부에서 구독을 해지할 방법을 제공한다면 Subscription
이 비동기로 호출돼도 문제가 없게 구현되어야 합니다.
→ 추가로 Reactive Streams
에 Processor
라는 인터페이스가
있습니다. 이 Processor
는 Publisher와 Subscriber
모두 상속받아 데이터 통지와 수신이 가능합니다.
즉, Processor
는 다른 Publisher
를 구독
하거나 다른 Subscriber
가 자신을 구독
하게 할 수 있습니다.
인터페이스 기능 설명
Publisher
가 Subscriber
가 사용하는 Subscription
은 통지받을 데이터 개수를 지정해 데이터 통지를 요청하거나 통지받지 않게 구독을 해지할 때 사용하는 인터페이스입니다. Subscription
은 Publisher
에서 인스턴스가 생성돼 통지 준비가 끝났을 때 호출하는 onSubscribe
메서드의 인자로 Subscriber
에 전달됩니다. 이 Subscription
을 받은 Subscriber
는 Subscription
의 메서드를 호출해 데이터 개수를 요청하거나 구독을 해지합니다.
또한, onNext
메서드에서 이 Subscription
을 사용하려면 onSubscribe
메서드로 전달받은 Subscription
이 Subscriber내부
에 있어야합니다.
3. Reactive Streams의 규칙
Reactive Streams
는 인터페이스로 데이터를 통지하는 구조를 제공
합니다. 해당 구조가 제대로 작동하기 위해서는 Reactive Streams의 규칙을 따라야합니다.
3.1. Reactive Streams의 기본규칙
- 구독 시작 통지(onSubscribe)는 해당 구독에서 한 번만 발생합니다.
Reactive Streams에서 구독 시작 통지는 해당 구독에서 한 번만 수행됩니다. 따라서 Subscriber의 onSubscribe메서드는 구독을 시작해 통지준비가 끝났을때 한번만 실행됩니다.
단, 다음 작업은 추천하지 않지만 처리가 종료된 이후에는 같은 Publisher와 Subscriber로 subscribe 메서드를 호출하면 다시 onSubscribe 메서드가 실행됩니다.
이는 처리가 끝난뒤에 subscribe 메서드를 호출하면 새로운 구독을 시작한다고 생각합니다. 그러나 같은 인스턴스를 다시 사용해 subscribe 메서드를 호출할 때 Publisher나 Subscriber 내부의 관리 상태를 초기화하지 않으면 의도하지 않은 결과가 발생할 수 있습니다.
- 통지는 순차적으로 이루어집니다.
Reactive Streams에서는 데이터 통지가 순차적으로 이루어집니다. 즉, 여러 통지를 동시에 할 수 없으며 RxJava의 Observable 규약
이라는 규칙에 따른것으로, 데이터가 동시에 통지돼 불일치가 발생하는것을 막을 수 있습니다.
- null을 통지하지 않습니다.
Reactive Streams에서는 null을 통지 할 수 없습니다. 만약 null을 통지하려면 Reactive Streams에서 NullPoiuntException을 발생시킵니다. 이는 데이터를 통지할 때 뿐만 아니라 에러를 통지할때도 마찬가지로 진행됩니다.
null을 통지하지 않는다
라는 사양은 RxJava 1.x, 2.x 간 다른 사양이므로 주의해야합니다.
- Publisher의 처리는 완료(onComplete) 또는 에러(onError)를 통지해 종료합니다.
Reactive Streams에서는 완료나 에러를 통지하면 Publisher가 처리를 끝 마친것으로 판단합니다.이는 완료나 에러 통지를 마친 구독은 더 이상 통지하지 않는다는 의미입니다.
예를 들면, 완료를 통지한 뒤에 에러가 발생했다면 이 에러는 통지하지 않으므로 정상적으로 종료되었다고 생각 할 수 있습니다.
3.2. 데이터 개수 요청이나 구독해지를 수행하는 Subscription의 규칙
- 데이터 개수 요청에 Long.MAX_VALUE를 설정하면 데이터 개수에 의한 통지 제한은 없어집니다.
Reactive Streams에서 Long.MAX_VALUE를 데이터 개수 요청으로 지정하면 통지할 데이터 개수의 제한이 없어집니다. 그러므로, 이 요청을 전송한 후에는 데이터 개수 요청을 보내지 않아도 데이터 통지를 계속해서 받을 수 있습니다.
요청받은 데이터 개수가 남은 상태에서 추가로 데이터 개수를 요청받으면 새로 요청받은 데이터 개수가 기존 데이터 개수에 추가된다는 점을 주의해야합니다.
즉, 데이터 개수 요청을 받을때 마다 기존 개수에 더해져 통지 가능한 데이터 개수가 증가합니다. 이 결과가 Long.MAX_VALUE에 도달하면 통지 가능한 데이터 개수 제한이 사라집니다.
- Subscription의 메서드는 동기화된 상태로 호출해야 합니다.
Subscription 메서드는 동기화된 상태로 호출해야합니다. 즉, Subscription의 메서드를 동시에 호출해서는 안됩니다.
RxJava를 사용할 때는 각 통지 메서드와 Subscription의 메서드를 호출할 때 동기화가 이뤄지므로 처리 자체가 스레드 안전(Thread safety)한지를 특히 신경 써야합니다. 잘못 될 경우 제대로 통지가 되지 않을 가능성이 있습니다.