June 06, 2017
From my perspective, there are several ways to learn a new technology if you are a software developer:
Read about it
Play with it
Hack or implement something similar
There are a lot of examples around the web that illustrate the first two, but very few that show how to implement something similar.
So in this article, we are going to implement a somewhat naive but working version of RxSwift.
According to the documentation, the interface of Observable is pretty simple:
public protocol ObservableType {
    associatedtype E
    func subscribe<O: ObserverType>(observer: O) -> Disposable where O.E == E
}
It has a single method that takes an Observer and returns a Disposable, which you can use later on to unsubscribe.
Observer is a type that handles events:
public protocol ObserverType {
    associatedtype E
    func on(event: Event<E>)
}
public enum Event<T> {
    case next(T)
    case error(Error)
    case completed
}
Disposable is an even simpler type with only one method, dispose(). Its responsibility is to clean up any resources that were used during the subscription, e.g. a network call or a DB connection.
public protocol Disposable {
    func dispose()
}
Let’s start from the bottom and implement these types:
public final class AnonymousDisposable: Disposable {
    private let disposeClosure: () -> Void
    public init(_ disposeClosure: @escaping () -> Void) {
        self.disposeClosure = disposeClosure
    }
    public func dispose() {
        disposeClosure()
    }
}
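public final class CompositeDisposable: Disposable {
    private var isDisposed: Bool = false
    private var disposables: [Disposable] = []
    public init() {}
    public func add(disposable: Disposable) {
        // A disposed composite cannot hold anything: dispose the newcomer right away.
        if isDisposed {
            disposable.dispose()
            return
        }
        disposables.append(disposable)
    }
    public func dispose() {
        if isDisposed { return }
        // Set the flag first so a re-entrant dispose() or add(disposable:) sees the final state.
        isDisposed = true
        disposables.forEach {
            $0.dispose()
        }
        disposables.removeAll()
    }
}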
AnonymousDisposable wraps a closure that is called on dispose.
CompositeDisposable is basically a container for disposables that calls dispose on each of them when it is itself disposed. Any attempt to add a Disposable to an already disposed CompositeDisposable disposes it immediately.
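To make the behaviour of the two disposables concrete, here is a small usage sketch. The types are repeated so the snippet runs on its own; the `log` array and the resource names in it are made up for illustration only.

```swift
public protocol Disposable {
    func dispose()
}

public final class AnonymousDisposable: Disposable {
    private let disposeClosure: () -> Void
    public init(_ disposeClosure: @escaping () -> Void) {
        self.disposeClosure = disposeClosure
    }
    public func dispose() {
        disposeClosure()
    }
}

public final class CompositeDisposable: Disposable {
    private var isDisposed = false
    private var disposables: [Disposable] = []
    public init() {}
    public func add(disposable: Disposable) {
        if isDisposed {
            disposable.dispose()
            return
        }
        disposables.append(disposable)
    }
    public func dispose() {
        if isDisposed { return }
        isDisposed = true
        disposables.forEach { $0.dispose() }
        disposables.removeAll()
    }
}

// Hypothetical log, used only to observe the order of clean-up calls.
var log: [String] = []

let bag = CompositeDisposable()
bag.add(disposable: AnonymousDisposable { log.append("network") })
bag.add(disposable: AnonymousDisposable { log.append("database") })

bag.dispose()   // disposes both children in insertion order
bag.dispose()   // a second call does nothing

// Adding to an already disposed composite disposes the newcomer immediately.
bag.add(disposable: AnonymousDisposable { log.append("late") })

print(log)      // ["network", "database", "late"]
```

Note that neither class is thread-safe; for a toy implementation that runs on a single thread this is probably acceptable.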
Observer is initialised with a handler closure, which is called whenever a new Event arrives:
public final class Observer<E>: ObserverType {
    private let handler: (Event<E>) -> Void
    public init(_ handler: @escaping (Event<E>) -> Void) {
        self.handler = handler
    }
    public func on(event: Event<E>) {
        handler(event)
    }
}
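As a quick illustration, the sketch below feeds a few events into an Observer by hand. The types are repeated so it runs standalone, and the `received` array is a hypothetical helper that just records what the handler saw.

```swift
public enum Event<T> {
    case next(T)
    case error(Error)
    case completed
}

public protocol ObserverType {
    associatedtype E
    func on(event: Event<E>)
}

public final class Observer<E>: ObserverType {
    private let handler: (Event<E>) -> Void
    public init(_ handler: @escaping (Event<E>) -> Void) {
        self.handler = handler
    }
    public func on(event: Event<E>) {
        handler(event)
    }
}

// Hypothetical storage so the effect of each event is visible.
var received: [String] = []

let observer = Observer<Int> { event in
    switch event {
    case .next(let value):
        received.append("next(\(value))")
    case .error(let error):
        received.append("error(\(error))")
    case .completed:
        received.append("completed")
    }
}

observer.on(event: .next(1))
observer.on(event: .next(2))
observer.on(event: .completed)

print(received)   // ["next(1)", "next(2)", "completed"]
```

This Observer does not enforce any ordering rules by itself: it would happily accept another .next after .completed. Stopping the stream at the right moment has to be handled by whoever delivers the events.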
And last but not least, Observable:
public class Observable<Element>: ObservableType {
    public typealias E = Element
    private let subscribeHandler: (Observer<Element>) -> Disposable
    public init(_ subscriptionClosure: @escaping (Observer<Element>) -> Disposable) {
        self.subscribeHandler = subscriptionClosure
    }
    public func subscribe<O : ObserverType>(observer: O) -> Disposable where O.E == E {
        let composite = CompositeDisposable()
        let subscription = subscribeHandler(Observer { (event) in
            // Forward every event to the outer observer.
            observer.on(event: event)
            // A terminal event ends the subscription, so clean up.
            switch event {
            case .error, .completed:
                composite.dispose()
            default:
                break
            }
        })
        // If the composite is already disposed, this disposes the subscription immediately.
        composite.add(disposable: subscription)
        return composite
    }
}
As you can see, there is a lot of work done in the subscribe(observer:) method that is outside the responsibilities of the Observable:
managing disposables
forwarding events to the outer observer
We also need to check whether the subscription was disposed before forwarding an event.
So let’s encapsulate all of this in a helper type, which we are going to call Sink. Like a kitchen sink, it defines the behavior of the “stream” and disposes it when needed:
final class Sink<O: ObserverType>: Disposable {
    private var _disposed: Bool = false
    private let _forward: O
    private let _subscriptionHandler: (Observer<O.E>) -> Disposable
    private let _composite = CompositeDisposable()
    init(forvard: O, subscriptionHandler: @escaping (Observer<O.E>) -> Disposable) {
        _forward = forvard
        _subscriptionHandler = subscriptionHandler
    }
    func run() {
        // Wrap forward(event:) in an Observer (its initializer takes an unlabeled closure)
        // and start the subscription.
        let observer = Observer<O.E>(forward)
        _composite.add(disposable: _subscriptionHandler(observer))
    }
    private func forward(event: Event<O.E>) {
        // Drop everything that arrives after disposal.
        if _disposed {
            return
        }
        _forward.on(event: event)
        // A terminal event ends the stream.
        switch event {
        case .completed, .error:
            self.dispose()
        default:
            break
        }
    }
    func dispose() {
        _disposed = true
        _composite.dispose()
    }
}
Now we can refactor our Observable implementation by using the Sink:
public class Observable<Element>: ObservableType {
    public typealias E = Element
    private let _subscribeHandler: (Observer<Element>) -> Disposable
    public init(_ subscriptionClosure: @escaping (Observer<Element>) -> Disposable) {
        _subscribeHandler = subscriptionClosure
    }
    public func subscribe<O : ObserverType>(observer: O) -> Disposable where O.E == E {
        // The argument labels must match Sink's initializer exactly.
        let sink = Sink(forvard: observer, subscriptionHandler: _subscribeHandler)
        sink.run()
        return sink
    }
}
Let’s check if it works…
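One way to check is a small script along these lines. It is only a sketch: the types are repeated in compact form so it runs on its own, the ObservableType conformance is left out to keep it short, and the `numbers` observable and the `output` array are hypothetical names introduced for the test.

```swift
public enum Event<T> {
    case next(T)
    case error(Error)
    case completed
}
public protocol Disposable { func dispose() }
public protocol ObserverType {
    associatedtype E
    func on(event: Event<E>)
}

public final class AnonymousDisposable: Disposable {
    private let disposeClosure: () -> Void
    public init(_ disposeClosure: @escaping () -> Void) { self.disposeClosure = disposeClosure }
    public func dispose() { disposeClosure() }
}

public final class CompositeDisposable: Disposable {
    private var isDisposed = false
    private var disposables: [Disposable] = []
    public init() {}
    public func add(disposable: Disposable) {
        if isDisposed { disposable.dispose(); return }
        disposables.append(disposable)
    }
    public func dispose() {
        if isDisposed { return }
        isDisposed = true
        disposables.forEach { $0.dispose() }
    }
}

public final class Observer<E>: ObserverType {
    private let handler: (Event<E>) -> Void
    public init(_ handler: @escaping (Event<E>) -> Void) { self.handler = handler }
    public func on(event: Event<E>) { handler(event) }
}

final class Sink<O: ObserverType>: Disposable {
    private var _disposed = false
    private let _forward: O
    private let _subscriptionHandler: (Observer<O.E>) -> Disposable
    private let _composite = CompositeDisposable()
    init(forvard: O, subscriptionHandler: @escaping (Observer<O.E>) -> Disposable) {
        _forward = forvard
        _subscriptionHandler = subscriptionHandler
    }
    func run() {
        _composite.add(disposable: _subscriptionHandler(Observer<O.E>(forward)))
    }
    private func forward(event: Event<O.E>) {
        if _disposed { return }
        _forward.on(event: event)
        switch event {
        case .completed, .error: dispose()
        case .next: break
        }
    }
    func dispose() {
        _disposed = true
        _composite.dispose()
    }
}

public class Observable<Element> {
    private let _subscribeHandler: (Observer<Element>) -> Disposable
    public init(_ subscriptionClosure: @escaping (Observer<Element>) -> Disposable) {
        _subscribeHandler = subscriptionClosure
    }
    public func subscribe<O: ObserverType>(observer: O) -> Disposable where O.E == Element {
        let sink = Sink(forvard: observer, subscriptionHandler: _subscribeHandler)
        sink.run()
        return sink
    }
}

// Hypothetical test data: records everything the subscriber sees.
var output: [String] = []

let numbers = Observable<Int> { observer in
    observer.on(event: .next(1))
    observer.on(event: .next(2))
    observer.on(event: .completed)
    observer.on(event: .next(3))   // dropped: the sink disposed itself on .completed
    return AnonymousDisposable { output.append("disposed") }
}

let subscription = numbers.subscribe(observer: Observer<Int> { event in
    switch event {
    case .next(let value): output.append("next(\(value))")
    case .error(let error): output.append("error(\(error))")
    case .completed: output.append("completed")
    }
})
subscription.dispose()   // no effect here, everything was cleaned up already

print(output)   // ["next(1)", "next(2)", "completed", "disposed"]
```

The late `.next(3)` never reaches the subscriber, and the clean-up closure runs exactly once even though dispose is triggered both by `.completed` and by the explicit call.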
Well, this is cool! But the power of Rx is also in its operators, so let’s implement the map operator.
Just like map for arrays, map in Rx transforms each element of the sequence by applying the transform function to it and returns a new sequence:
extension ObservableType {
    public func map<U>(_ transform: @escaping (E) throws -> U) -> Observable<U> {
        return Observable<U> { observer in
            return self.subscribe(observer: Observer { (event) in
                switch event {
                case .next(let element):
                    do {
                        try observer.on(event: .next(transform(element)))
                    } catch {
                        observer.on(event: .error(error))
                    }
                case .error(let e):
                    observer.on(event: .error(e))
                case .completed:
                    observer.on(event: .completed)
                }
            })
        }
    }
}
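Here is a sketch of how the operator could be used, including chaining two map calls. To keep the snippet short and standalone it uses a condensed Observable that forwards events directly and skips the Sink bookkeeping, which is an assumption made for this example only. The `source` observable and the `labels` array are hypothetical names.

```swift
public enum Event<T> {
    case next(T)
    case error(Error)
    case completed
}
public protocol Disposable { func dispose() }
public protocol ObserverType {
    associatedtype E
    func on(event: Event<E>)
}
public protocol ObservableType {
    associatedtype E
    func subscribe<O: ObserverType>(observer: O) -> Disposable where O.E == E
}

public final class AnonymousDisposable: Disposable {
    private let disposeClosure: () -> Void
    public init(_ disposeClosure: @escaping () -> Void) { self.disposeClosure = disposeClosure }
    public func dispose() { disposeClosure() }
}

public final class Observer<E>: ObserverType {
    private let handler: (Event<E>) -> Void
    public init(_ handler: @escaping (Event<E>) -> Void) { self.handler = handler }
    public func on(event: Event<E>) { handler(event) }
}

// Condensed Observable for this example: no Sink, events are forwarded as-is.
public class Observable<Element>: ObservableType {
    public typealias E = Element
    private let subscribeHandler: (Observer<Element>) -> Disposable
    public init(_ subscribeHandler: @escaping (Observer<Element>) -> Disposable) {
        self.subscribeHandler = subscribeHandler
    }
    public func subscribe<O: ObserverType>(observer: O) -> Disposable where O.E == E {
        return subscribeHandler(Observer { observer.on(event: $0) })
    }
}

extension ObservableType {
    public func map<U>(_ transform: @escaping (E) throws -> U) -> Observable<U> {
        return Observable<U> { observer in
            return self.subscribe(observer: Observer { (event) in
                switch event {
                case .next(let element):
                    do {
                        try observer.on(event: .next(transform(element)))
                    } catch {
                        observer.on(event: .error(error))
                    }
                case .error(let e):
                    observer.on(event: .error(e))
                case .completed:
                    observer.on(event: .completed)
                }
            })
        }
    }
}

// Hypothetical storage for the mapped values.
var labels: [String] = []

let source = Observable<Int> { observer in
    observer.on(event: .next(1))
    observer.on(event: .next(2))
    observer.on(event: .completed)
    return AnonymousDisposable {}
}

_ = source
    .map { $0 * 10 }            // Observable<Int>
    .map { "value \($0)" }      // Observable<String>
    .subscribe(observer: Observer<String> { event in
        if case .next(let label) = event { labels.append(label) }
    })

print(labels)   // ["value 10", "value 20"]
```

Each map call only wraps the previous observable in a new one; the source closure does not run until subscribe is called at the end of the chain.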
Observable is a type to which you can pass an Observer by calling the subscribe(observer:) method. As you saw, all the magic happens in the subscribe method. This is what makes Rx so powerful: you can transform a sequence and pass instances of Observable around without worrying that something is going to happen, because nothing runs until someone subscribes.
Sure, this implementation is naive and not ready for production. But I really hope it shows that there is no magic behind Rx!
Playground is available here
Written by Serg Dort, who lives and works in London and builds useful things. You can follow him on Twitter.