iOS Developer in search for meaning 🧘‍♂️

Learn Rx by Implementing Observable

June 06, 2017

There are several ways to learn a new technology:

  • Read about it
  • Play with it
  • Implement something similar

Most tutorials cover the first two. This article takes the third approach: we’ll build a working (if simplified) version of RxSwift from scratch.

Observable

The Observable interface is surprisingly minimal:

public protocol ObservableType {
    associatedtype E

    func subscribe<O: ObserverType>(observer: O) -> Disposable where O.E == E
}

One method: pass an Observer, get back a Disposable for unsubscribing.

Observer

An Observer handles events:

public protocol ObserverType {
    associatedtype E

    func on(event: Event<E>)
}

public enum Event<T> {
    case next(T)
    case error(Error)
    case completed
}

Disposable

Disposable has a single responsibility—cleanup. When dispose() is called, it releases resources (network connections, database handles, etc.):

public protocol Disposable {
    func dispose()
}

Building the types

Starting from the bottom:

public final class AnonymousDisposable: Disposable {
    private let disposeClosure: () -> Void

    public init(_ disposeClosure: @escaping () -> Void) {
        self.disposeClosure = disposeClosure
    }

    public func dispose() {
        disposeClosure()
    }
}

public final class CompositeDisposable: Disposable {
    private var isDisposed = false
    private var disposables: [Disposable] = []

    public init() {}

    public func add(disposable: Disposable) {
        if isDisposed {
            disposable.dispose()
            return
        }
        disposables.append(disposable)
    }

    public func dispose() {
        if isDisposed { return }
        disposables.forEach { $0.dispose() }
        isDisposed = true
    }
}

AnonymousDisposable wraps a cleanup closure. CompositeDisposable aggregates multiple disposables—when disposed, it disposes all children. Adding to an already-disposed composite immediately disposes the new item.

Observer wraps an event handler:

public final class Observer<E>: ObserverType {
    private let handler: (Event<E>) -> Void

    public init(_ handler: @escaping (Event<E>) -> Void) {
        self.handler = handler
    }

    public func on(event: Event<E>) {
        handler(event)
    }
}

Now for Observable:

public class Observable<Element>: ObservableType {
    public typealias E = Element
    private let subscribeHandler: (Observer<Element>) -> Disposable

    public init(_ subscriptionClosure: @escaping (Observer<Element>) -> Disposable) {
        self.subscribeHandler = subscriptionClosure
    }

    public func subscribe<O: ObserverType>(observer: O) -> Disposable where O.E == E {
        let composite = CompositeDisposable()
        let subscription = subscribeHandler(Observer { event in
            observer.on(event: event)
            switch event {
            case .error, .completed:
                composite.dispose()
            default:
                break
            }
        })
        composite.add(disposable: subscription)
        return composite
    }
}

This works, but subscribe(observer:) is doing too much:

  • Managing disposables
  • Forwarding events
  • Checking disposal state before forwarding

Let’s extract this into a Sink—like a kitchen sink, it defines how the “stream” flows and handles cleanup:

final class Sink<O: ObserverType>: Disposable {
    private var disposed = false
    private let forward: O
    private let subscriptionHandler: (Observer<O.E>) -> Disposable
    private let composite = CompositeDisposable()

    init(forward: O, subscriptionHandler: @escaping (Observer<O.E>) -> Disposable) {
        self.forward = forward
        self.subscriptionHandler = subscriptionHandler
    }

    func run() {
        let observer = Observer<O.E>(forwardEvent)
        composite.add(disposable: subscriptionHandler(observer))
    }

    private func forwardEvent(_ event: Event<O.E>) {
        if disposed { return }
        forward.on(event: event)
        switch event {
        case .completed, .error:
            dispose()
        default:
            break
        }
    }

    func dispose() {
        disposed = true
        composite.dispose()
    }
}

Now Observable becomes clean:

public class Observable<Element>: ObservableType {
    public typealias E = Element
    private let subscribeHandler: (Observer<Element>) -> Disposable

    public init(_ subscriptionClosure: @escaping (Observer<Element>) -> Disposable) {
        subscribeHandler = subscriptionClosure
    }

    public func subscribe<O: ObserverType>(observer: O) -> Disposable where O.E == E {
        let sink = Sink(forward: observer, subscriptionHandler: subscribeHandler)
        sink.run()
        return sink
    }
}

Let’s verify it works:

observable

Adding operators

The real power of Rx comes from operators. Let’s implement map:

extension ObservableType {
    public func map<U>(_ transform: @escaping (E) throws -> U) -> Observable<U> {
        return Observable<U> { observer in
            return self.subscribe(observer: Observer { event in
                switch event {
                case .next(let element):
                    do {
                        try observer.on(event: .next(transform(element)))
                    } catch {
                        observer.on(event: .error(error))
                    }
                case .error(let e):
                    observer.on(event: .error(e))
                case .completed:
                    observer.on(event: .completed)
                }
            })
        }
    }
}

map

Recap

Observable is a type you subscribe to by passing an Observer. The key insight: all the work happens in subscribe. You can chain and transform observables without triggering any side effects—nothing executes until subscription.

Conclusion

This implementation is naive and not production-ready. But hopefully it demystifies Rx—there’s no magic here, just closures and protocols.

Playground available here.

Further reading