ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Subject
    ReactiveX 2021. 5. 15. 06:54

    Subject는 Observable의 속성과 Observer의 속성을 모두 가지고 있다. 즉 데이터를 발행할 수도 있고, 데이터를 받아서 처리할 수도 있다. 

    AysncSubject

     

    AsyncSubject는 Observable로부터 발행된 마지막 값(만)을 발행하고 Observable의 동작이 완료된 후에야 동작한다.

    (만약, Observable이 아무 값도 발행하지 않으면 AsyncSubject 역시 아무 값도 발행하지 않는다.)

     

     

    AsyncSubject는 맨 마지막 값을 뒤 이어 오는 Observer에 전달하는데, 만약 Observable이 오류로 인해 종료될 경우 AsyncSubject는 아무 항목도 발행하지 않고 발생한 오류를 그대로 전달한다.

        fun main(){
            val observable = Observable.just(1,2,3,4,5,6,7,8,9,10)
            val asyncSubject = AsyncSubject.create<Int>()
            observable.subscribe(asyncSubject)
            asyncSubject.subscribe{
                println("result_1st : $it")
            }
            asyncSubject.subscribe{
                println("result_2st : $it")
            }
        }

    결과

      result_1st : 10 
      result_2st : 10  

    BehaviorSubject

     

    Observer가 BehaviorSubject를 구독하기 시작하면, Observer는 Observable이 가장 최근에 발행한 항목(또는 아직 아무 값도 발행되지 않았다면 맨 처음 값이나 기본 값)의 발행을 시작하며 그 이후 Observable(들)에 의해 발행된 항목들을 계속 발행한다.

     

    만약, Observable가 오류 때문에 종료되면 BehaviorSubject는 아무런 항목도 발행하지 않고 Observable에서 발생한 오류를 그대로 전달한다.

        fun main(){
            val behaviorSubject = BehaviorSubject.createDefault<String>("AA")
            behaviorSubject.subscribe {
                println("result_1st = $it")
            }
            behaviorSubject.onNext("BB")
            behaviorSubject.onNext("CC")
            behaviorSubject.subscribe {
                println("result_2st = $it")
            }
            behaviorSubject.onNext("DD")
            behaviorSubject.onComplete()
        }

    결과

      result_1st = AA 
      result_1st = BB 
      result_1st = CC 
      result_2st = CC 
      result_1st = DD 
      result_2st = DD

    PublishSubject

    PublishSubject는 구독 이후에 Observable(들)이 발행한 항목들만 Observer에게 전달한다.

    Observable이 배출하는 모든 항목들의 발행을 보장해야 한다면 Create을 사용해서 명시적으로 "차가운" Observable(항목들을 발행하기 전에 모든 Observer가 구독을 시작했는지 체크한다)을 생성하거나, PublishSubject 대신 ReplaySubject를 사용해야 한다.

        fun main(){
            val publishSubject = PublishSubject.create<String>()
            publishSubject.subscribe {
                println("result_1st = $it")
            }
            publishSubject.onNext("AA")
            publishSubject.onNext("BB")
            publishSubject.subscribe {
                println("result_2st = $it")
            }
            publishSubject.onNext("CC")
            publishSubject.subscribe {
                println("result_3st = $it")
            }
            publishSubject.onNext("DD")
            publishSubject.onComplete()
        }

    결과

      result_1st = AA 
      result_1st = BB 
      result_1st = CC 
      result_2st = CC 
      result_1st = DD 
      result_2st = DD 
      result_3st = DD 

    ReplaySubject

     

     

    ReplaySubject는 cold observable과 유사하게 Observer가 구독을 시작한 시점과 관계 없이 Observable(들)이 발행한 모든 항목들을 모든 Observer에게 발행한다. 

    만약, ReplaySubject을 옵저버로 사용할 경우, 멀티 스레드 환경에서는 (비순차적) 호출을 유발시키는 onNext(또는 그 외 on) 메서드를 사용하지 않도록 주의해야 한다.

     

    결과

      result_1st = AA 
      result_1st = BB 
      result_2st = AA 
      result_2st = BB 
      result_1st = CC 
      result_2st = CC 

    References

    • https://reactivex.io/
    • 유동환, 박정준, 리액티브 프로그래밍 기초부터 안드로이드까지 한번에, 한빛미디어 

    'ReactiveX' 카테고리의 다른 글

    RxLifecycle 라이브러리  (0) 2021.08.22
    RxPermissions, RxBinding 라이브러리  (0) 2021.08.17
    RxAndroid  (0) 2021.05.02
    [RxJava] Scheduler  (0) 2021.05.01
    [RxJava] Single, Maybe  (0) 2021.05.01
Designed by Tistory.