Observable 클래스는 데이터의 변화가 발생하는 데이터 소스입니다.
Observable은 옵저버 패턴을 구현합니다. 옵저버 패턴은 객체의 상태 변화를 관찰하는 관찰자(옵저버) 목록을 객체에 등록합니다. 그리고 상태 변화가 있을 때마다 메서드를 호출하여 객체가 직접 목록의 각 옵저버에게 변화를 알려줍니다. 라이프 사이클은 존재하지 않으며 보통 단일 함수를 통해 변화만 알립니다.
Observable의 콜백
- onNext()
Observable이 데이터의 발행을 알립니다. 기존의 옵저버 패턴과 같습니다. - onComplete()
모든 데이터의 발행을 완료 했음을 알립니다. onComplete 이벤트는 단 한번만 발생하며. 발생한 후에는 더 이상 onNext 이벤트가 발생해선 안 됩니다. - onError()
Observable에서 어떤 이유로 에러가 발생했음을 알립니다. onError 이벤트가 발생하면 이후 에 onNext 및 onComplete 이벤트가 발생하지 않습니다. 즉, Observable의 실행을 종료합니다.
just() 함수
데이터를 발행하는 가장 쉬운 방법은 기존의 자료구조를 사용하는 것입니다. just() 함수는 인자로 넣은 데이터를 차례로 발행하려고 Observable을 생성합니다. 실제 데이터의 발행은 subscribe() 함수를 호출해야 시작합니다. 1개의 값을 넣을 수도 있고 인자로 최대 10개를 넣을 수도 있습니다. 단 타입은 모두 같아야 합니다. just() 함수로 발행한 데이터는 자동으로 알림 이벤트가 발생합니다.
create() 함수
just() 함수는 데이터를 인자로 넣으면 자동으로 알림 이벤트가 발생하지만 create() 함수는 onNext, onComplete, onError 같은 호출을 개발자가 해야 합니다. 그래서 create()는 라이브러리가 무언가를 해준다기보다 개발자가 무언가를 직접 하는 느낌이 강한 함수입니다.
Observable<Integer> source =
Observable.create((ObservableEmitter<Integer> emitter) -> {
emitter.onNext(100);
emitter.onNext(200);
emitter.onNext(300);
emitter.onCompleteO;
);
source 변수는 차가운 Observable입니다. 즉, 첫 번째 문장만으로는 실제로 데이터를 발행하지 않고 두 번째 문장에서 subscribe() 함수를 호출했을 때 100, 200, 300의 값을 발행합니다. subscribe() 함수를 호출하지 않으면 아무것도 출력되지 않는다는 뜻입니다.
Observable.create()를 사용할 때는 아래와 같은 사항을 주의해야 합니다.
- Observable이 구독 해지(dispose)되었을 때 등록된 콜백을 모두 해제해야 합니다. 그렇지 않으면 잠재적으로 메모리 누수(memory leak)가 발생합니다.
- 구독자가 구독하는 동안에만 onNext와 onComplete 이벤트를 호출해야 합니다.
- 에러가 발생했을 때는 오직 onError 이벤트로만 에러를 전달해야 합니다.
- 배압(back pressure)을 직접 처리해야 합니다.
fromArray() 함수
단일 데이터가 아닐 때는 fromXXX() 계열 함수를 사용합니다.
주의할 점은 primitive 타입의 배열을 그대로 넣으면 실제 값과 다른 값이 출력됩니다. 이런 경우 Primitive 타입을 Wrapper 타입으로 변환해서 사용해야 합니다.
Integer[] arr = {100, 200, 300};
Observable<Integer> source = Observable.fromArray(arr);
source.subscribe(System.out::println);
fromIterable() 함수
Observable을 만드는 다른 방법은 Iterable 인터페이스를 구현한 클래스에서 Observable 객체를 생성하는 것입니다. Iterable 인터페이스는 반복자를 반환합니다.
Iterator 인터페이스는 이터레이터 패턴을 구현한 것으로 다음에 어떤 데이터가 있는지와 그 값을 얻어오는 것만 관여할 뿐 특정 데이터 타입에 의존하지 않는 장점이 있습니다. 자바의 많은 컬렉션 클래스가 이 인터페이스를 구현합니다.
List와 Set, Queue로 작성할 수 있습니다.
List<String> names = new ArrayList();
names.add("Derry");
names.add( "William");
names.add("Bob");
Observable<String> source = Observable.fromlterable(names);
source.subscribe(System.out::println);
fromCallable() 함수
비동기 실행 후 결과를 반환하는 call() 메서드를 가지고 있는 Callable 인터페이스입니다.
Callable<String> callable = () -> {
Thread.sleep(1000);
return "Hello Callable";
};
Observable<String> source = Observable.fromCallable(callable);
source.subscribe(System.out :println);
fromFuture() 함수
보통 Executor 인터페이스를 구현한 클래스에 Callable 객체를 인자로 넣어 Future 객체를 반환합니다. get() 메서드를 호출하면 Callable 객체에서 구현한 계산 결과가 나올 때 까지 블로킹됩니다.
Future<String> future = Executors.newSingleThreadExecutor().submit(() -> {
Thread .sleep(1000);
return "Hello Future";
})
Observable<String> source = Observable.fromFuture(future);
source.subscribe(System.out :println);
Executors 클래스는 단일 스레드 실행자(SingleThreadExecutor)뿐만 아니라 다양 한 스레드풀(FixedThreadPool,CachedThreaPool)을 지원합니다. 하지만 RxJava는 위 와 같은 실행자를 활용하기 보다 RxJava에서 제공하는 스케줄러를 활용하도록 권장합니다.
fromPublisher() 함수
Publisher 객체는 Observable.create()와 마찬가지로 onNext()와 onComplete() 함수를 호출할 수 있습니다.
Publisherstring> publisher = (Subscriber<? super String> s) -> {
s.onNext("Hello Observable.fromPublisherO");
s.onComplete();
};
Observable<String> source = Observable.fromPublisher(publisher);
source.subscribe(System.out :println);
subscribe() 함수
RxJava는 내가 동작시키기 원하는 것을 사전에 정의해둔 다음 실제 그것이 실행되는 시점을 조절할수 있습니다. 이때 사용하는 것이 subscribe() 함수입니다. 팩토리 함수로 데이터 흐름을 정의한 후 subscribe() 함수를 호출해야 실제로 데이터를 발행합니다.
subscribe() 함수는 인자의 개수(오버로딩)에 따라서 다른 동작을 지원합니다.
- 인자가 없는 함수
onNext와 onComplete 이벤트를 무시하고 onError 이벤트가 발생했을 때만 OnErrorNotlmplementedException를 던집니다(throw). 따라서 Observable로 작성한 코드를 테스트하거나 디버깅할 때 활용합니다. - 인자가 1개인 함수
onNext 이벤트를 처리합니다. 이때도 onError 이벤트가 발생하면 OnErrorNotlmplementedException을 던집니다. - 인자가 2개인 함수
onNext와 onError 이벤트를 처리합니다. - 인자가 3개인 함수
onNext, onError, onComplete 이벤트를 모두 처리할 수 있습니다.
Disposable
subscribe()로 데이터 처리가 끝난 뒤 subscribe() 함수는 모두 Disposable 인터페이스의 객체를 리턴합니다.
- dispose() 함수
Observable에게 더 이상 데이터를 발행하지 않도록 구독을 해지하는 함수입니다. Observable이 onComplete 알림을 보냈을 때 자동으로 dispose()를 호출해 Observable과 구독자의 관계를 끊습니다. 따라서 onComplete 이벤트가 정상적으로 발생했다면 구독자가 별도로 dispose()를 호출할 필요가 없습나다. - isDisposed() 함수
함수는 이름에서 알 수 있는 것처럼 Observable이 데이터를 발행하지 않는지(구독을 해 지했는지) 확인하는 함수입니다.
[ 출처 : RxJava 프로그래밍 ]
'Domain > ReactiveX' 카테고리의 다른 글
[ReactiveX] ConnectableObservable (0) | 2021.04.29 |
---|---|
[ReactiveX] Subject (0) | 2021.04.29 |
[ReactiveX] Cold Observable Vs Hot Observable (0) | 2021.04.29 |
[ReactiveX] Maybe (0) | 2021.04.29 |
[ReactiveX] Single (0) | 2021.04.29 |
댓글