iOS Developer in search for meaning 🧘‍♂️

Learn Rx by implementing Observable

June 06, 2017

From my perspective, there are several ways to learn new technology if you are a software developer:

  • Read about it

  • Play with it

  • Hack or implement something similar

There are a lot of examples around the web that illustrate the first two but very few that shows how to implement it.

So in this article, we are going to implement to some extend naїv but working version of RxSwift

Observable

According to the documentation the interface of Observable is pretty simple:

public protocol ObservableType {
    associatedtype E

    func subscribe<O: ObserverType>(observer: O) -> Disposable where O.E == E
}

It has a method that takes an Observer and returns a Disposable which you can use later on to unsubscribe

Observer

Observer is a type that that handles events.

public protocol ObserverType {
    associatedtype E

    func on(event: Event<E>)
}

public enum Event<T> {
    case next(T)
    case error(Error)
    case completed
}

Disposable

Disposable is an even simpler type which has only one method — dispose() and responsibility for it is to clean up any resources that had been used during the subscription e.g network call or DB connection

public protocol Disposable {
    func dispose()
}

Let’s start from the bottom and implement these types

public final class AnonimousDisposable: Disposable {
    private let disposeClosure: () -> Void

    public init(_ disposeClosure: @escaping () -> Void) {
        self.disposeClosure = disposeClosure
    }

    public func dispose() {
        disposeClosure()
    }
}

public final class CompositeDisposable: Disposable {
    private var isDisposed: Bool = false
    private var disposables: [Disposable] = []

    public init() {}

    public func add(disposable: Disposable) {
        if isDisposed {
            disposable.dispose()
            return
        }
        disposables.append(disposable)
    }

    public func dispose() {
        if isDisposed { return }
        disposables.forEach {
            $0.dispose()
        }
        isDisposed = true
    }
}

AnonymousDisposable  —  provides closure that will be called on dispose

CompositeDisposable — is basically a container for disposables that will call dispose on each of them when it is disposed. And any attempt to add Disposable to already disposed CompositeDisposable will immediately dispose it

Observer is initialised with an eventHandler closure which is going to be called when a new Event arrives

public final class Observer<E>: ObserverType {
    private let handler: (Event<E>) -> Void

    public init(_ handler: @escaping (Event<E>) -> Void) {
        self.handler = handler
    }

    public func on(event: Event<E>) {
        handler(event)
    }
}

And the last but not least Observable

public class Observable<Element>: ObservableType {
    public typealias E = Element
    private let subscribeHandler: (Observer<Element>) -> Disposable

    public init(_ subscribtionClosure: @escaping (Observer<Element>) -> Disposable) {
        self.subscribeHandler = subscribtionClosure
    }

    public func subscribe<O : ObserverType>(observer: O) -> Disposable where O.E == E {
        let composite = CompositeDisposable()
        let subscription = subscribeHandler(Observer { (event) in
            observer.on(event: event)
            switch event {
            case .error, .completed:
                composite.dispose()
            default:
                break
            }
        })
        composite.add(disposable: subscription)

        return composite
    }
}

As you can see there are lot of work done in the subscribe(observer:) method which is out of responsibilities of the Observable:

  • managing disposables

  • forwarding events to outer observer,

  • We also need to check whether subscription was disposed before forwarding an event

So let’s encapsulate it in the helper type which we going to call Sink. As a sink from the kitchen, it’s will define behavior of the “stream” and dispose it when needed

final class Sink<O: ObserverType>: Disposable {
    private var _disposed: Bool = false
    private let _forward: O
    private let _subscriptionHandler: (Observer<O.E>) -> Disposable
    private let _composite = CompositeDisposable()

    init(forvard: O, subscriptionHandler: @escaping (Observer<O.E>) -> Disposable) {
        _forward = forvard
        _subscriptionHandler = subscriptionHandler
    }

    func run() {
        let observer = Observer<O.E>(handler: forward)
        _composite.add(disposable: _subscriptionHandler(observer))
    }

    private func forward(event: Event<O.E>) {
        if _disposed {
            return
        }
        _forward.on(event: event)
        switch event {
        case .completed, .error:
            self.dispose()
        default:
            break
        }
    }

    func dispose() {
        _disposed = true
        _composite.dispose()
    }
}

Now we can refactor our Observable implementation by using the Sink

public class Observable<Element>: ObservableType {
    public typealias E = Element
    private let _subscribeHandler: (Observer<Element>) -> Disposable

    public init(_ subscribtionClosure: @escaping (Observer<Element>) -> Disposable) {
        _subscribeHandler = subscribtionClosure
    }

    public func subscribe<O : ObserverType>(observer: O) -> Disposable where O.E == E {
        let sink = Sink(forvard: observer, subscribtionHandler: _subscribeHandler)
        sink.run()
        return sink
    }
}

Let’s check if it works…

observable

Well this is cool! But the power of Rx is also in the operators… So lets implement the map operator

The same as the map for array map in Rx transforms each element of the sequence by applying transform function to it and returns new sequence

extension ObservableType {
    public func map<U>(_ transform: @escaping (E) throws -> U) -> Observable<U> {
        return Observable<U> { observer in
            return self.subscribe(observer: Observer { (event) in
                switch event {
                case .next(let element):
                    do {
                        try observer.on(event: .next(transform(element)))
                    } catch {
                        observer.on(event: .error(error))
                    }
                case .error(let e):
                    observer.on(event: .error(e))
                case .completed:
                    observer.on(event: .completed)
                }
            })
        }
    }
}

map

To recap :

Observable  — is a type to which you can pass an Observer by calling the subscribe(observer:) method . And as you saw all magic happens in the subscribe method. This is what make Rx so powerful — that you can transform sequence and pass instances of Observable without worrying that something gonna happened.

Conclusion:

Sure this is naive and not ready for production implementation. But I really hope that it shows that there is no magic behind Rx!

Playground is available here

Where to go from here:


Serg Dort

Written by Serg Dort, who works and lives in London builds useful things. You can follow him on Twitter