There are several ways to learn a new technology:
- Read about it
- Play with it
- Implement something similar
Most tutorials cover the first two. This article takes the third approach: we’ll build a working (if simplified) version of RxSwift from scratch.
Observable
The Observable interface is surprisingly minimal:
public protocol ObservableType {
    associatedtype E
    func subscribe<O: ObserverType>(observer: O) -> Disposable where O.E == E
}
One method: pass an Observer, get back a Disposable for unsubscribing.
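To make that contract concrete, here is a minimal sketch of a type that conforms to the protocol. JustOne, NoOpDisposable and Recorder are hypothetical names invented for this illustration, not part of RxSwift or of the implementation built in this article, and the protocols are repeated so the snippet compiles on its own.

```swift
// Minimal copies of the article's protocols so the sketch is self-contained.
protocol Disposable { func dispose() }

enum Event<T> {
    case next(T)
    case error(Error)
    case completed
}

protocol ObserverType {
    associatedtype E
    func on(event: Event<E>)
}

protocol ObservableType {
    associatedtype E
    func subscribe<O: ObserverType>(observer: O) -> Disposable where O.E == E
}

// Hypothetical helper: a disposable with nothing to clean up.
struct NoOpDisposable: Disposable {
    func dispose() {}
}

// Hypothetical conformer: emits its stored value once, then completes.
struct JustOne<Element>: ObservableType {
    typealias E = Element
    let value: Element

    func subscribe<O: ObserverType>(observer: O) -> Disposable where O.E == E {
        observer.on(event: .next(value))
        observer.on(event: .completed)
        return NoOpDisposable()
    }
}

// Hypothetical observer that records every event it receives as a string.
final class Recorder<Element>: ObserverType {
    typealias E = Element
    private(set) var log: [String] = []

    func on(event: Event<Element>) {
        switch event {
        case .next(let value): log.append("next(\(value))")
        case .error(let error): log.append("error(\(error))")
        case .completed: log.append("completed")
        }
    }
}

let recorder = Recorder<Int>()
let subscription = JustOne(value: 42).subscribe(observer: recorder)
subscription.dispose() // nothing to release in this toy example
print(recorder.log)    // ["next(42)", "completed"]
```

Everything JustOne does happens inside subscribe, which is the same shape the closure-based Observable later in the article follows.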
Observer
An Observer handles events:
public protocol ObserverType {
    associatedtype E
    func on(event: Event<E>)
}
public enum Event<T> {
    case next(T)
    case error(Error)
    case completed
}
Disposable
Disposable has a single responsibility—cleanup. When dispose() is called, it releases resources (network connections, database handles, etc.):
public protocol Disposable {
    func dispose()
}
Building the types
Starting from the bottom:
public final class AnonymousDisposable: Disposable {
    private let disposeClosure: () -> Void
    public init(_ disposeClosure: @escaping () -> Void) {
        self.disposeClosure = disposeClosure
    }
    public func dispose() {
        disposeClosure()
    }
}
public final class CompositeDisposable: Disposable {
    private var isDisposed = false
    private var disposables: [Disposable] = []
    public init() {}
    public func add(disposable: Disposable) {
        // A disposed composite never stores new items; it cleans them up right away.
        if isDisposed {
            disposable.dispose()
            return
        }
        disposables.append(disposable)
    }
    public func dispose() {
        if isDisposed { return }
        // Set the flag first so a re-entrant dispose() or add() sees the final state.
        isDisposed = true
        disposables.forEach { $0.dispose() }
        disposables.removeAll()
    }
}
AnonymousDisposable wraps a cleanup closure. CompositeDisposable aggregates multiple disposables: when disposed, it disposes all of its children. Adding to an already-disposed composite disposes the new item immediately.
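A short sketch of those rules in action follows. The two types are repeated in compact form so the snippet runs on its own; the demo function and the log messages are made up for the illustration.

```swift
protocol Disposable { func dispose() }

final class AnonymousDisposable: Disposable {
    private let disposeClosure: () -> Void
    init(_ disposeClosure: @escaping () -> Void) { self.disposeClosure = disposeClosure }
    func dispose() { disposeClosure() }
}

final class CompositeDisposable: Disposable {
    private var isDisposed = false
    private var disposables: [Disposable] = []

    func add(disposable: Disposable) {
        if isDisposed {
            disposable.dispose()
            return
        }
        disposables.append(disposable)
    }

    func dispose() {
        if isDisposed { return }
        isDisposed = true
        disposables.forEach { $0.dispose() }
    }
}

// Hypothetical demo: records the order in which cleanup closures run.
func demo() -> [String] {
    var log: [String] = []
    let composite = CompositeDisposable()
    composite.add(disposable: AnonymousDisposable { log.append("socket closed") })
    composite.add(disposable: AnonymousDisposable { log.append("file closed") })

    composite.dispose() // disposes both children in insertion order
    composite.dispose() // second call does nothing

    // Added after disposal: cleaned up immediately instead of being stored.
    composite.add(disposable: AnonymousDisposable { log.append("late item closed") })
    return log
}

print(demo()) // ["socket closed", "file closed", "late item closed"]
```

The "late item" case matters later on: an observable can finish synchronously before its own subscription has been added to the composite.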
Observer wraps an event handler:
public final class Observer<E>: ObserverType {
    private let handler: (Event<E>) -> Void
    public init(_ handler: @escaping (Event<E>) -> Void) {
        self.handler = handler
    }
    public func on(event: Event<E>) {
        handler(event)
    }
}
Now for Observable:
public class Observable<Element>: ObservableType {
    public typealias E = Element
    private let subscribeHandler: (Observer<Element>) -> Disposable
    public init(_ subscriptionClosure: @escaping (Observer<Element>) -> Disposable) {
        self.subscribeHandler = subscriptionClosure
    }
    public func subscribe<O: ObserverType>(observer: O) -> Disposable where O.E == E {
        let composite = CompositeDisposable()
        let subscription = subscribeHandler(Observer { event in
            observer.on(event: event)
            switch event {
            case .error, .completed:
                composite.dispose()
            default:
                break
            }
        })
        composite.add(disposable: subscription)
        return composite
    }
}
This works, but subscribe(observer:) is doing too much:
- Managing disposables
- Forwarding events
- Disposing the subscription when a terminal event arrives
Let’s extract this into a Sink—like a kitchen sink, it defines how the “stream” flows and handles cleanup:
final class Sink<O: ObserverType>: Disposable {
    private var disposed = false
    private let forward: O
    private let subscriptionHandler: (Observer<O.E>) -> Disposable
    private let composite = CompositeDisposable()
    init(forward: O, subscriptionHandler: @escaping (Observer<O.E>) -> Disposable) {
        self.forward = forward
        self.subscriptionHandler = subscriptionHandler
    }
    func run() {
        let observer = Observer<O.E>(forwardEvent)
        composite.add(disposable: subscriptionHandler(observer))
    }
    private func forwardEvent(_ event: Event<O.E>) {
        // Drop anything that arrives after disposal or after a terminal event.
        if disposed { return }
        forward.on(event: event)
        switch event {
        case .completed, .error:
            dispose()
        default:
            break
        }
    }
    func dispose() {
        disposed = true
        composite.dispose()
    }
}
Now Observable becomes clean:
public class Observable<Element>: ObservableType {
    public typealias E = Element
    private let subscribeHandler: (Observer<Element>) -> Disposable
    public init(_ subscriptionClosure: @escaping (Observer<Element>) -> Disposable) {
        subscribeHandler = subscriptionClosure
    }
    public func subscribe<O: ObserverType>(observer: O) -> Disposable where O.E == E {
        let sink = Sink(forward: observer, subscriptionHandler: subscribeHandler)
        sink.run()
        return sink
    }
}
Let’s verify it works:
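A small script along these lines can serve as that check. It is a sketch, not the article's playground code: the types are compact copies of the ones built so far (access modifiers dropped), and the demo function, its log array and the emitted values are illustrative choices.

```swift
// Compact copies of the types built so far, so the sketch runs on its own.
protocol Disposable { func dispose() }
enum Event<T> { case next(T), error(Error), completed }
protocol ObserverType {
    associatedtype E
    func on(event: Event<E>)
}
protocol ObservableType {
    associatedtype E
    func subscribe<O: ObserverType>(observer: O) -> Disposable where O.E == E
}

final class AnonymousDisposable: Disposable {
    private let disposeClosure: () -> Void
    init(_ disposeClosure: @escaping () -> Void) { self.disposeClosure = disposeClosure }
    func dispose() { disposeClosure() }
}

final class CompositeDisposable: Disposable {
    private var isDisposed = false
    private var disposables: [Disposable] = []
    func add(disposable: Disposable) {
        if isDisposed { disposable.dispose() } else { disposables.append(disposable) }
    }
    func dispose() {
        if isDisposed { return }
        isDisposed = true
        disposables.forEach { $0.dispose() }
    }
}

final class Observer<E>: ObserverType {
    private let handler: (Event<E>) -> Void
    init(_ handler: @escaping (Event<E>) -> Void) { self.handler = handler }
    func on(event: Event<E>) { handler(event) }
}

final class Sink<O: ObserverType>: Disposable {
    private var disposed = false
    private let forward: O
    private let subscriptionHandler: (Observer<O.E>) -> Disposable
    private let composite = CompositeDisposable()
    init(forward: O, subscriptionHandler: @escaping (Observer<O.E>) -> Disposable) {
        self.forward = forward
        self.subscriptionHandler = subscriptionHandler
    }
    func run() {
        composite.add(disposable: subscriptionHandler(Observer<O.E>(forwardEvent)))
    }
    private func forwardEvent(_ event: Event<O.E>) {
        if disposed { return }
        forward.on(event: event)
        switch event {
        case .completed, .error: dispose()
        default: break
        }
    }
    func dispose() {
        disposed = true
        composite.dispose()
    }
}

final class Observable<Element>: ObservableType {
    typealias E = Element
    private let subscribeHandler: (Observer<Element>) -> Disposable
    init(_ subscriptionClosure: @escaping (Observer<Element>) -> Disposable) {
        subscribeHandler = subscriptionClosure
    }
    func subscribe<O: ObserverType>(observer: O) -> Disposable where O.E == E {
        let sink = Sink(forward: observer, subscriptionHandler: subscribeHandler)
        sink.run()
        return sink
    }
}

// Hypothetical check: emit two values, complete, then try to emit once more.
func demo() -> [String] {
    var log: [String] = []
    let numbers = Observable<Int> { observer in
        observer.on(event: .next(1))
        observer.on(event: .next(2))
        observer.on(event: .completed)
        observer.on(event: .next(3)) // dropped: the sink is already disposed
        return AnonymousDisposable { log.append("disposed") }
    }
    let subscription = numbers.subscribe(observer: Observer<Int> { event in
        switch event {
        case .next(let value): log.append("next(\(value))")
        case .error: log.append("error")
        case .completed: log.append("completed")
        }
    })
    subscription.dispose() // no-op here: completion already disposed everything
    return log
}
print(demo()) // ["next(1)", "next(2)", "completed", "disposed"]
```

Two details are worth noticing. The next(3) sent after completion never reaches the observer, and the cleanup closure still runs even though the sink was disposed before the subscription closure returned, because adding to a disposed composite disposes the new item at once.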

Adding operators
The real power of Rx comes from operators. Let’s implement map:
extension ObservableType {
    public func map<U>(_ transform: @escaping (E) throws -> U) -> Observable<U> {
        return Observable<U> { observer in
            return self.subscribe(observer: Observer { event in
                switch event {
                case .next(let element):
                    do {
                        try observer.on(event: .next(transform(element)))
                    } catch {
                        observer.on(event: .error(error))
                    }
                case .error(let e):
                    observer.on(event: .error(e))
                case .completed:
                    observer.on(event: .completed)
                }
            })
        }
    }
}
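As a usage sketch for map, the snippet below transforms integers into strings and shows a thrown error turning into an error event. To stay short it uses a trimmed-down Observable that skips the Sink, which is an assumption made for this example only; NoOpDisposable, NegativeInput and demo are likewise hypothetical names.

```swift
protocol Disposable { func dispose() }
struct NoOpDisposable: Disposable { func dispose() {} } // hypothetical helper

enum Event<T> { case next(T), error(Error), completed }

protocol ObserverType {
    associatedtype E
    func on(event: Event<E>)
}
protocol ObservableType {
    associatedtype E
    func subscribe<O: ObserverType>(observer: O) -> Disposable where O.E == E
}

final class Observer<E>: ObserverType {
    private let handler: (Event<E>) -> Void
    init(_ handler: @escaping (Event<E>) -> Void) { self.handler = handler }
    func on(event: Event<E>) { handler(event) }
}

// Trimmed-down stand-in for the article's Observable: no Sink, no terminal-event handling.
final class Observable<Element>: ObservableType {
    typealias E = Element
    private let subscribeHandler: (Observer<Element>) -> Disposable
    init(_ subscribeHandler: @escaping (Observer<Element>) -> Disposable) {
        self.subscribeHandler = subscribeHandler
    }
    func subscribe<O: ObserverType>(observer: O) -> Disposable where O.E == E {
        return subscribeHandler(Observer<Element> { observer.on(event: $0) })
    }
}

// The map operator from the article, with the observer type spelled out.
extension ObservableType {
    func map<U>(_ transform: @escaping (E) throws -> U) -> Observable<U> {
        return Observable<U> { observer in
            return self.subscribe(observer: Observer<E> { event in
                switch event {
                case .next(let element):
                    do {
                        try observer.on(event: .next(transform(element)))
                    } catch {
                        observer.on(event: .error(error))
                    }
                case .error(let e):
                    observer.on(event: .error(e))
                case .completed:
                    observer.on(event: .completed)
                }
            })
        }
    }
}

struct NegativeInput: Error {} // hypothetical error type

func demo() -> [String] {
    var log: [String] = []
    let numbers = Observable<Int> { observer in
        observer.on(event: .next(2))
        observer.on(event: .next(-1)) // makes the transform below throw
        return NoOpDisposable()
    }
    let labels = numbers.map { (n: Int) throws -> String in
        if n < 0 { throw NegativeInput() }
        return "value \(n * 10)"
    }
    _ = labels.subscribe(observer: Observer<String> { event in
        switch event {
        case .next(let text): log.append(text)
        case .error: log.append("failed")
        case .completed: log.append("completed")
        }
    })
    return log
}
print(demo()) // ["value 20", "failed"]
```

Note that map never touches the source when it is called. It only returns a new Observable whose subscription closure subscribes upstream.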
Recap
Observable is a type you subscribe to by passing an Observer. The key insight: all the work happens in subscribe. You can chain and transform observables without triggering any side effects—nothing executes until subscription.
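That laziness is easy to observe with a counter. The sketch below again assumes a trimmed-down Observable without the Sink, and the runs counter, NoOpDisposable and demo are hypothetical names for this illustration.

```swift
protocol Disposable { func dispose() }
struct NoOpDisposable: Disposable { func dispose() {} } // hypothetical helper

enum Event<T> { case next(T), error(Error), completed }

protocol ObserverType {
    associatedtype E
    func on(event: Event<E>)
}
protocol ObservableType {
    associatedtype E
    func subscribe<O: ObserverType>(observer: O) -> Disposable where O.E == E
}

final class Observer<E>: ObserverType {
    private let handler: (Event<E>) -> Void
    init(_ handler: @escaping (Event<E>) -> Void) { self.handler = handler }
    func on(event: Event<E>) { handler(event) }
}

// Trimmed-down stand-in for the article's Observable (no Sink).
final class Observable<Element>: ObservableType {
    typealias E = Element
    private let subscribeHandler: (Observer<Element>) -> Disposable
    init(_ subscribeHandler: @escaping (Observer<Element>) -> Disposable) {
        self.subscribeHandler = subscribeHandler
    }
    func subscribe<O: ObserverType>(observer: O) -> Disposable where O.E == E {
        return subscribeHandler(Observer<Element> { observer.on(event: $0) })
    }
}

// Counts how often the subscription closure has run at three points in time.
func demo() -> [Int] {
    var runs = 0
    var counts: [Int] = []
    let source = Observable<String> { observer in
        runs += 1 // the side effect we are watching
        observer.on(event: .next("hello"))
        observer.on(event: .completed)
        return NoOpDisposable()
    }
    counts.append(runs) // still 0: creating the observable ran nothing

    _ = source.subscribe(observer: Observer<String> { _ in })
    counts.append(runs) // 1 after the first subscription

    _ = source.subscribe(observer: Observer<String> { _ in })
    counts.append(runs) // 2: each subscription runs the closure again
    return counts
}
print(demo()) // [0, 1, 2]
```

In this implementation every subscriber gets its own run of the closure, so any side effect inside it happens once per subscription.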
Conclusion
This implementation is naive and not production-ready. But hopefully it demystifies Rx—there’s no magic here, just closures and protocols.
Playground available here.