Swift RxSwift

As explained by Sebastian Boldt

  • Code descriptive and testable

  • Multithreading easier

  • Cleaner code & architecture

  • Composable

  • Multi-Platform

Everything in RxSwift is an observable sequence.

// A sequence built from a single value
let helloSequence = Observable.just("Hello Rx")
// A sequence built from the elements of an array
let fibonacciSequence = Observable.from([0, 1, 1, 2, 3, 5, 8])
// A sequence built from the entries of a dictionary
let dictSequence = Observable.from([1: "Hello", 2: "World"])
  • subscribe(on:(Event<T>)-> ()).

// Create a sequence, then subscribe to it; the closure is handed every event.
let helloSequence = Observable.of("Hello Rx")
let subscription = helloSequence.subscribe { event in
  print(event)
}
  • sequences can emit zero or more events

  • .next(value: T)

  • .error(error: Error)

  • .completed

let helloSequence = Observable.from(["H", "e", "l", "l", "o"])

// Subscribe and handle each kind of event separately
let subscription = helloSequence.subscribe { event in
  switch event {
  case .next(let value):
    print(value)
  case .error(let error):
    print(error)
  case .completed:
    print("completed")
  }
}
// Creating a DisposeBag so the subscription will be cancelled correctly
let bag = DisposeBag()

// Creating an Observable Sequence that emits a String value
let observable = Observable.just("Hello Rx!")

// Creating a subscription just for next events
let subscription = observable.subscribe(onNext: {
    print($0)
})

// Adding the Subscription to a Dispose Bag
subscription.addDisposableTo(bag)

Subjects

Four types for subjects

  • PublishSubject: all events, after subscription.

  • BehaviorSubject: most recent event, after subscription.

  • ReplaySubject: n events, after subscription.

  • Variable: a wrapper around a BehaviorSubject.

let bag = DisposeBag()
let publishSubject = PublishSubject<String>()

// Sent before any subscription exists
publishSubject.onNext("Hello")
publishSubject.onNext("World")

// Keep the subscription itself in subscription1, then hand it to the bag
let subscription1 = publishSubject.subscribe(onNext: {
  print($0)
})
subscription1.addDisposableTo(bag)

// Subscription1 receives these 2 events, Subscription2 won't
publishSubject.onNext("Hello")
publishSubject.onNext("Again")

// Sub2 will not get "Hello" and "Again" because it subscribed later
let subscription2 = publishSubject.subscribe(onNext: {
  print(#line, $0)
})
subscription2.addDisposableTo(bag)

publishSubject.onNext("Both Subscriptions receive this message")

Operators

Map

Transform elements.

_images/operator_map.png

// Multiply every element by 10 before it reaches the subscriber
Observable<Int>.of(1, 2, 3, 4)
  .map { $0 * 10 }
  .subscribe(onNext: { element in
    print(element)
  })

Flatmap

Transform each element into an observable and merge the resulting emissions into a single sequence.

_images/operator_flatmap.png

let sequence1 = Observable<Int>.of(1, 2)
let sequence2 = Observable<Int>.of(1, 2)

// A sequence whose elements are themselves sequences
let sequenceOfSequences = Observable.of(sequence1, sequence2)

// Each inner sequence is returned unchanged from the flatMap closure
sequenceOfSequences.flatMap { return $0 }.subscribe(onNext: {
    print($0)
})

Scan

Aggregate values like reduce does.

_images/operator_scan.png

// Add each element to the accumulated value, starting from 0
Observable.of(1, 2, 3, 4, 5)
    .scan(0) { $0 + $1 }
    .subscribe(onNext: { accumulated in
        print(accumulated)
    })

Buffer

_images/operator_buffer.png

// NOTE(review): SequenceThatEmitsWithDifferentIntervals and s (the scheduler)
// are not defined in this snippet — presumably placeholders; confirm.
SequenceThatEmitsWithDifferentIntervals
    .buffer(timeSpan: 150, count: 3, scheduler: s)
    .subscribe(onNext: { buffered in
        print(buffered)
    })

Filter

Filter elements.

_images/operator_filter.png

// Only elements greater than 10 are passed on to the subscriber
Observable.of(2, 30, 22, 5, 60, 1).filter { $0 > 10 }.subscribe(onNext: {
    print($0)
})

DistinctUntilChanged

Skip steady values.

_images/operator_distinctUntilChanged.png

// Print the elements that remain after distinctUntilChanged() is applied
Observable.of(1, 2, 2, 1, 3)
    .distinctUntilChanged()
    .subscribe(onNext: { element in
        print(element)
    })

Combine

StartWith

_images/operator_startwith.png

// Print the sequence of 2 and 3 after startWith(1) is applied
Observable.of(2, 3)
    .startWith(1)
    .subscribe(onNext: { element in
        print(element)
    })

Merge

Combine the output of multiple observables.

_images/operator_merge.png

let publish1 = PublishSubject<Int>()
let publish2 = PublishSubject<Int>()

// Merge both subjects into one sequence and print whatever either one emits
Observable.of(publish1, publish2).merge().subscribe(onNext: {
    print($0)
})

publish1.onNext(20)
publish1.onNext(40)
publish1.onNext(60)
publish2.onNext(1)
publish1.onNext(80)
publish2.onNext(2)
publish1.onNext(100)

Zip

_images/operator_zip.png

let a = Observable.of(1, 2, 3, 4, 5)
let b = Observable.of("a", "b", "c", "d")

// Pair up the elements of both sequences as tuples and print every event
Observable.zip(a, b) { return ($0, $1) }.subscribe {
    print($0)
}
  • Concat

  • CombineLatest

  • SwitchLatest

Schedulers

Use observeOn and subscribeOn to run work on a specific queue.

  • MainScheduler: MainThread, UI work.

  • CurrentThreadScheduler: Default.

  • SerialDispatchQueueScheduler

  • ConcurrentDispatchQueueScheduler: Background jobs.

  • OperationQueueScheduler

let publish1 = PublishSubject<Int>()
let publish2 = PublishSubject<Int>()

// Scheduler backed by a background-QoS concurrent queue
let concurrentScheduler = ConcurrentDispatchQueueScheduler(qos: .background)

Observable.of(publish1, publish2)
    .observeOn(concurrentScheduler)
    .merge()
    .subscribeOn(MainScheduler())
    .subscribe(onNext: {
        print($0)
    })

publish1.onNext(20)
publish1.onNext(40)