Reactive Programming에서는 Data Stream을 "Hot" 또는 "Cold"로 분류할 수 있습니다. 여기서 사용하는 Library에 따라 Observable(ReactiveX의 관점)과 Sequence(Reactor의 관점)로 용어가 나뉠 수 있습니다. 따라서 아래의 내용에서 Stream이라는 내용이 나온다면 Observable 또는 Sequence라고 이해하시면 될 것 같습니다.
Cold Observable/Sequence
발행자(Publisher)가 구독자(Subscriber)가 등록될 때마다 데이터 스트림을 처음부터 다시 시작하는 방식으로 동작합니다. 이 스트림은 데이터를 새롭게 다시 발행할 수 있으며, 새로운 구독자가 등록될 때마다 데이터의 처음부터 끝까지 모든 항목을 발행합니다. 예를 들어, 파일에서 데이터를 읽어들이는 스트림이나 네트워크에서 데이터를 수신하는 스트림 등이 있습니다. ReactiveX의 관점에서 Observable과 Flowable은 Cold Observable에 속하며 Reactor의 관점에서는 일반적은 Flux와 Mono가 여기에 속합니다.
Hot Observable/Sequence
발행자가 구독자가 등록되기 전부터 데이터를 발행하는 방식으로 동작합니다. 구독자가 등록되어도 처음부터 다시 시작하지 않습니다. 즉, 구독자가 현재 발행되고 있는 데이터 스트림의 중간부터 구독을 시작하게 됩니다. 예를 들어, 마우스 클릭, 키보드 입력 또는 센서 데이터와 같이 실시간으로 발생하는 데이터 스트림이 있습니다. ReactiveX의 관점에서 ConnectableObservable과 Subject은 Hot Observable에 속하며 Reactor의 관점에서는 Hot Sequence를 생성하기 위해서는 ConnectableFlux나 ConnectableMono를 사용하여 일반적인 Flux나 Mono를 Hot Sequence로 변환해야 합니다. ConnectableFlux나 ConnectableMono를 구독할 때까지 데이터를 발행하지 않으며, connect() 메서드를 호출하여 실제로 데이터를 발행합니다.
Result & Example
Hot Observable의 경우 구독자들이 동시에 구독을 하더라도 이전의 데이터를 그대로 받아들이게 됩니다. 이러한 스트림의 경우 여러 구독자들이 동시에 데이터를 공유하고, 새로운 구독자가 추가되어도 이전 데이터를 놓치지 않게 되는 장점이 있습니다. 반면에 Cold Observable은 새로운 구독자가 등록될 때마다 처음부터 데이터를 다시 발행하므로 자원을 더 많이 사용하게 됩니다. 따라서, 개발자는 데이터 스트림을 Hot 또는 Cold로 분류하고, 이에 따라 적합한 방식으로 리액티브 프로그래밍을 구현해야 합니다.
Cold Example
//Observable<ReactiveX>
Observable<Integer> coldObservable = Observable.range(0, 5);
coldObservable.subscribe(i -> System.out.println("Subscriber1 : " + i));
coldObservable.subscribe(i -> System.out.println("Subscriber2 : " + i));
//Sequence<Reactor>
Flux<Integer> coldFlux = Flux.range(0, 5);
coldFlux.subscribe(i -> System.out.println("Subscriber1: " + i));
coldFlux.subscribe(i -> System.out.println("Subscriber2: " + i));
/*
* Observable & Sequence Result
* Subscriber1: 0
* Subscriber1: 1
* Subscriber1: 2
* Subscriber1: 3
* Subscriber1: 4
* Subscriber2: 0
* Subscriber2: 1
* Subscriber2: 2
* Subscriber2: 3
* Subscriber2: 4
*/
Hot Example
//Observable<ReactiveX>
PublishSubject<Integer> hotObservable = PublishSubject.create();
hotObservable.subscribe(i -> System.out.println("Subscriber1: " + i));
hotObservable.onNext(1);
hotObservable.onNext(2);
hotObservable.subscribe(i -> System.out.println("Subscriber2: " + i));
hotObservable.onNext(3);
hotObservable.onNext(4);
//Sequence<Reactor>
EmitterProcessor<Integer> hotFlux = EmitterProcessor.create();
hotFlux.subscribe(i -> System.out.println("Subscriber1: " + i));
hotFlux.onNext(1);
hotFlux.onNext(2);
hotFlux.subscribe(i -> System.out.println("Subscriber2: " + i));
hotFlux.onNext(3);
hotFlux.onNext(4);
hotFlux.onComplete();
/*
* Observable & Sequence Result
* Subscriber1: 1
* Subscriber1: 2
* Subscriber1: 3
* Subscriber2: 3
* Subscriber1: 4
* Subscriber2: 4
*/
'Domain > Reactive Programming' 카테고리의 다른 글
ObserveOn(), SubscribeOn(), PublishOn() (0) | 2023.05.12 |
---|---|
Reactive Programming? (0) | 2023.05.12 |
댓글