observeOn(_ scheduler: ImmediateSchedulerType )

rxSwift의 scheduler는 stream 속의 연산들을 어느 스레드에서 동작하게 해 줄지 rxSwift에서 결정해주는 역할을 하고 있다.

observeOn() 오퍼레이터를 통해 결정해줄 수 있으며, 매개변수로 schedular를 입력받는다. observeOn() 오퍼레이터의 다음 연산부터 지정된 스레드로 연산을 수행한다.

@IBAction func exMap3() {
    Observable.just("800x600")
        .observeOn(ConcurrentDispatchQueueScheduler(qos: .default))    //ConcurrentDispatchQueueScheduler 동작
        .map { $0.replacingOccurrences(of: "x", with: "/") }    
        .map { "https://picsum.photos/\($0)/?random" }          
        .map { URL(string: $0) }                                
        .filter { $0 != nil }
        .map { $0! }                                            
        .map { try Data(contentsOf: $0) }                       
        .map { UIImage(data: $0) }                              
        .observeOn(MainScheduler.instance)    //MainScheduler 동작
        .subscribe(onNext: { image in
            self.imageView.image = image
        })
        .disposed(by: disposeBag)
}

url을 통해 이미지를 불러오는 함수다. 요청과 응답의 대기시간이 있기에, 스레드를 분리해서 작업해야 응답 대기시간 동안에 대기하는 동작을 막을 수 있다. 첫 번째 observeOn() 오퍼레이터를 통해서 concurrent 큐에서 요청 작업을 하게 된다. 응답에 대한 데이터를 UIImage 형태로 변환 후 이미지를 사용자에게 보여주기 위한 UI작업은 두 번째 observeOn( )을 통해 MainScheduler로 전환하여 작업하게 된다.


subscribeOn(_ scheduler: ImmediateSchedulerType )

observeOn() 오퍼레이터의 경우 Observable이 생성된 이후부터 적용할 수 있다. Observable이 생성되는 부분부터 특정 스레드를 지정하고 싶다면, subscribeOn() 오퍼레이터로 해결이 가능하다.

.subscribeOn(_ scheduler: ImmediateSchedulerType)
//Wraps the source sequence in order to run its subscription and unsubscription logic on the specified scheduler.
특정 스케줄러에서 구독 및 구독취소 로직을 실행하기 위해 소스 시퀀스(스트림)를 감쌉니다.

 

 

설명을 이해하기 전에 우선 Observable에 대해 알아두어야 할 점이 있다.

 

 

GitHub - ReactiveX/RxSwift: Reactive Programming in Swift

Reactive Programming in Swift. Contribute to ReactiveX/RxSwift development by creating an account on GitHub.

github.com

Observable just defines how the sequence is generated and what parameters are used for element generation. Sequence generation starts when subscribe method is called.

rx의 공식문서에 따르면 Observable은 subscribe()가 호출되는 순간에 생성된다. subscribe()가 호출되기 전까지는 스트림 생성에 관한 요소들을 정의하는 것일 뿐, 실제로 Observable은 생성된 상태가 아닌 것이다.

 

요약을 하자면 Observable의 생성은 subscribe()가 호출되는 순간이고, Observable이 어떤 스레드에서 생성될지를 결정해주는 것이 바로 subscribeOn(_ scheduler) 오퍼레이터라는 것이다. 

subscribeOn(_ scheduler) -> Observable을 생성할 스레드를 지정
ObserveOn(_ shcedular) -> Observable이 생성된 이후 스레드 변경

 

let observable = Observable<Int>.create { observer in
    observer.onNext(1)
    observer.onNext(2)
                                                
    print("[.create] in mainThread? \(Thread.isMainThread)")

    observer.onCompleted()
    return Disposables.create()
}
.observeOn(MainScheduler.instance)
.map{ next -> Int in
    print("[.map]",next, "in mainThread? \(Thread.isMainThread)")
    return next*2
}

print("[subscribe start]")    //구독시작
observable
    .subscribeOn(ConcurrentDispatchQueueScheduler(qos: .background))
    .subscribe(onNext: { event in
    print("[.subscribe] onNext \(event) in mainThread? \(Thread.isMainThread)")
}, onDisposed: {(
    print("[.disposed] in mainThread? \(Thread.isMainThread) \n---------------------------")
)})
.disposed(by: disposeBag)

//[subscribe start]
//[.map] 1 in mainThread? true
//[.create] in mainThread? false
//[.subscribe] onNext 2 in mainThread? true
//[.map] 2 in mainThread? true
//[.subscribe] onNext 4 in mainThread? true
//[.disposed] in mainThread? true 
//---------------------------
//[subscribe start]
//[.map] 1 in mainThread? true
//[.subscribe] onNext 2 in mainThread? true
//[.map] 2 in mainThread? true
//[.subscribe] onNext 4 in mainThread? true
//[.create] in mainThread? false
//[.disposed] in mainThread? true 
//---------------------------

두 번 수행한 출력 결과를 보면 알 수 있듯이 create 영역의 출력은 subscribe() 오퍼레이터가 호출된 이후에 등장하며, concurrent Queue에서 작업이 실행되기 때문에 mainThread 와의 출력 순서를 보장하지 않는다. 반면 ObserveOn() 오퍼레이터 이후에는 mainThread로 변경되어 동작하기에 출력 순서가 보장되는 모습을 확인할 수 있다.

 

subscribeOn()의 경우 다음 라인부터 영향을 주는 observeOn()과 달리 스트림 속 아무 라인에 적어놓아도 관계없이 적용되며, 여러 번 작성하더라도, 가장 첫 번째 subscribeOn()으로만 적용이 된다고 한다.

 

참고 영상 및 블로그

[유튜브] 곰튀김 rxswift 4시간에 끝내기 - 8    

[블로그] 이승연 님 글 [RxSwift] Scheduler 제대로 알아보기

'iOS > RxSwift' 카테고리의 다른 글

next, error, complete  (0) 2022.08.19
operator  (0) 2022.07.21
subscribe, dispose  (0) 2022.06.15
Observable stream  (0) 2022.06.08
ReactiveX  (0) 2022.05.30

+ Recent posts