Domain/ReactiveX

[ReactiveX] Subject

by Donghwan 2021. 4. 29.

Subject 클래스는 차가운 Observable을 뜨거운 Observable로 바꿔준다고 소개했습니 다. Subject 클래스의 특성은 Observable의 속성과 구독자의 속성이 모두 있다는 점입니다. Observable처럼 데이터를 발행할 수도 있고 구독자처럼 발행된 데이터를 바로 처리할 수도 있습니다.

Rxjava에서 제공하는 주요 Subject 클래스에는 AsyncSubject,BehaviorSubject, PublishSubject, ReplaySubject 등이 있습니다.

 

AsyncSubject 클래스

AsyncSubject 클래스는 Observable에서 발행한 마지막 데이터를 얻어올 수 있는 Subject 클래스입니다. 완료되기 전 마지막 데이터에만 관심이 있으며 이전 데이터는 무시합니다.

완료되기 전까지는 구독자에게 데이터를 전달하지 않다가 완료됨과 동시에 구독자에게 마지막 데이터를 발행하고 종료합니다.

onNext로 데이터를 전달하더라도 onComplete 이전엔 절대 데이터를 발행하지 않습니다.

또, Subject 클래스가 Observable을 상속하고 동시에 Observer 인터페이스를 구현하기 때문에 Subject 클래스는 구독자로도 동작할 수 있습니다.

Float[] temperature = {10.1f, 13.4f, 12.5f};
Observable<Float> source = Observable.fromArray(temperature);
AsyncSubject<Float> subject = AsyncSubject.create();
subject.subscribe(data -> System.out.printlnC'Subscriber #1 => " + data));
source.subscribe(subject);
AsyncSubject<Integer> subject = AsyncSubject.create();
subject.onNext(10);
subject.onNext(1) ;
subject.subscribe(data -> System.out.println("Subscriber #1 => " + data));
subject.onNext(12);
subject.onComplete();
subject.onNext(13); // onComplete 이후에는 onNext 등의 함수가 무시됩니다.
subject.subscribe(data -> System.out.println("Subscriber #2 => " + data));
subject.subscribe(data -> System.out.println("Subscriber #3 => " + data));

//result
//Subscriber #1 => 12
//Subscriber #2 => 12
//Subscriber #3 => 12

 

BehaviorSubject 클래스

BehaviorSubject는 구독자가 구독을 하면 가장 최근 값 혹은 기본값을 넘겨주는 클래스입니다.

BehaviorSubject 클래스는 AsyncSubject 클래스와는 다르게 createDefault() 힘수로 생 성합니다. 구독자가 subscribe() 함수를 호출했을 때 그전까지 발행한 값이 없다면 기본값을 대신 발행해야 하기 때문입니다.

AsyncSubject와 다르게 구독 시점부터 최근값 또는 기본값 데이터를 발행합니다. onComplete가 되면 발행을 중지합니다.

BehaviorSubject<String> subject = BehaviorSubject.createDefault("6");
subject.subscribe(data -> System.out.println("Subscriber #1 => " + data));
subject.onNext("1");
subject.onNext("3");
subject.subscribe(data -> System.out.println("Subscriber #2 => " + data));
subject.onNext("5");
subject.onComplete();

//result
//Subscriber #1 => 6
//Subscriber #1 => 1
//Subscriber #1 => 3
//Subscriber #2 => 3
//Subscriber #1 => 5
//Subscriber #2 => 5

 

PublishSubject 클래스

가장 평범한 Subject 클래스입니다. 구독자가 subscribe() 함수를 호출하면 값을 발행하기 시작합니다. AsyncSubject 클래스처럼 마지막 값만 발행하거나 BehaviorSubject 클래스처럼 발행한 값이 없을 때 기본값을 대신 발행하지도 않습니다. 오직 해당 시간에 발생한 데이터를 그대로 구독자에게 전달받습니다.

쉽게 말해서 구독을 한 다음 onNext에 의해 데이터를 발행합니다.

PublishSubject 클래스는 별도의 기본값을 제공하지 않으므로 AsyncSubject 클래스처럼 create() 함수를 호출해 생성합니다.

PublishSubject<String> subject = PublishSubject.create();
subject.subscribe(data ->System.out.println("Subscriber #1 => " + data));
subject.onNext("1");
subject.onNext("3");
subject.subscribe(data ->System.out.println("Subscriber #2 => " + data));
subject.onNext("5") ;
subject.onComplete();

//result
//Subscriber #1 => 1
//Subscriber #1 => 3
//Subscriber #1 => 5
//Subscriber #2 => 5

 

ReplaySubject 클래스

마지막으로 소개할 Subject 클래스는 가장 특이하고 사용할 때 주의해야 하는 ReplaySubject 클래스입니다. Subject 클래스의 목적은 뜨거운 Observable을 활용하는 것인데 차가운 Observable처럼 동작하기 때문입니다.

ReplaySubject 클래스는 구독자가 새로 생기면 항상 데이터의 처음부터 끝까지 발행하는 것을 보장해줍니다. 마치 테이프로 전체 내용을 녹음해두었다가 새로운 사람이 들어오면 정해진 음악을 처음부터 들려주는 것과 같습니다.

그러므로 모든 데이터 내용을 저장해두는 과정 중 메모리 누수가 발생할 가능성을 염두에 두고 사용할 때 주의해야 합니다.

ReplaySubject<String> subject = ReplaySubject.create();
subject.subscribe(data -> System.out.println("Subscriber #1 => " + data));
subject.onNext("1");
subject.onNext("3");
subject.subscribe(data -> System.out.println("Subscriber #2 => " + data));
subject.onNext("5");
subject.onComplete();

//result
//Subscriber #1 => 1
//Subscriber #1 => 3
//Subscriber #2 => 1
//Subscriber #2 => 3
//Subscriber #1 => 5
//Subscriber #2 => 5

 

[ 출처 : RxJava 프로그래밍 ]

728x90
반응형

'Domain > ReactiveX' 카테고리의 다른 글

[ReactiveX] SubscribeOn vs ObserveOn  (0) 2021.04.29
[ReactiveX] ConnectableObservable  (0) 2021.04.29
[ReactiveX] Cold Observable Vs Hot Observable  (0) 2021.04.29
[ReactiveX] Maybe  (0) 2021.04.29
[ReactiveX] Single  (0) 2021.04.29

댓글