Daniel Tull: Blog

Combine Latest Collection

Sunday, 04 August 2019

In this article, I will explain how I made a publisher that takes an array of publishers and combines them into an array output.

The code for the resulting publisher is on GitLab and is up-to-date with Xcode 11 beta 5.

Prelude

During a workshop run by Antoine van der Lee at Swift Island this year, we were tasked with using Combine to add checks to a simple four page form. Each page progressively added more complexities. He has put the workshop on GitHub.

On the second page, we were tasked with making the continue button only be enabled when a series of three switches were all turned on.

I set up three IBOutlets, one for each of the switches and used two publishers to combine their values and update the state of the button.

The first is a custom publisher by Antoine which provides the control when it receives a new event. I use UIControl.Event.valueChanged to be alerted to any changes each switch’s value.

The second is Apple’s own CombineLatest3 which takes three publishers and provides their values in a tuple as its output.

let publisher1 = switch1.publisher(for: .valueChanged)
let publisher2 = switch2.publisher(for: .valueChanged)
let publisher3 = switch3.publisher(for: .valueChanged)

cancellable = Publishers
    .CombineLatest3(publisher1, publisher2, publisher3)
    .map { $0.0.isOn && $0.1.isOn && $0.2.isOn }
    .assign(to: \.isEnabled, on: nextButton)

This code shows:

For this scenario I thought it would be ideal if I could have a single outlet collection to all the switches and map over them to create an array of publishers, one for each switch, combining these to one single publisher and map the array to a boolean by checking if all the switches are on.

However, the Combine framework doesn’t have a publisher which can combine the latest value from an array of source publishers.

The reason Apple’s trio of CombineLatest publishers work in this way is so that you can combine publishers of different output types together and the type information can be carried into the output tuple. In this example though, each publisher’s output is the same, a UISwitch.

In our case the array contains the switch publishers and the transform returns true if all values in the array are true.

Fortunately for us, Apple have taken cues from other parts of the Swift Standard Library and designed Combine to be fully extendable. Publisher is a protocol which we can implement and our custom publishers will be able to interact with all the other features of the Combine framework, including gaining all of the standard Publisher methods when they apply.

Understanding what kind of Publisher is needed

I find it’s often good to write the calling code first, assuming the underlying publisher is already written, as this provides guidance on what we expect the publisher to provide. It also allows us to experiment with names and make sure that the result is actually what we desire before taking the far more complicated step of actually writing the publisher!

cancellable = switches
    .map { $0.publisher(for: .valueChanged) }
    .combineLatest
    .map { $0.allSatisfy { $0.isOn } }
    .assign(to: \.isEnabled, on: nextButton)

Here we see each step:

Note: It may seem quite strange that we take an array of switches, turn them into an array of publishers only to get a publisher with an output of an array of switches. However it is important to realise that what were are creating is a publisher of the array of switches not the switches themselves. This allows us to create a chain for when they change, mutating and massaging the output to our desired value of a boolean for the isEnabled property of our button.

Creating the Publisher

public struct CombineLatestCollection<Publishers>
    where
    Publishers: Collection,
    Publishers.Element: Publisher
{
    public typealias Output = [Publishers.Element.Output]
    public typealias Failure = Publishers.Element.Failure

    private let publishers: Publishers
    public init(_ publishers: Publishers) {
        self.publishers = publishers
    }
}

Our first step is creating the publisher which will be generic over a collection whose elements are a publisher, which I have called Publishers. The publisher itself won’t do any logic, it will just be a container for our array of publishers.

We also have two typealiases:

What should be noted is that all the publishers need to be of the same type, which they would need to be to be contained in an Array anyway, but it’s still worth thinking about. If your publishers came from different sources, there are still options to changing them to be the same type. The first would be to erase their complex type with eraseToAnyPublisher() which will give you an AnyPublisher<Output, Failure>. If either the output or failures are different, you can map and mapError to adjust those types.

Create the convenience property

extension Collection where Element: Publisher {

    public var combineLatest: CombineLatestCollection<Self> {
        CombineLatestCollection(self)
    }
}

The convenience properties will allow us to chain together functions and will make the code more readable as it saves us having to read a set of initializers from the inside out and rather allows us to read a logic set of calls one after the other.

Conforming to the Publisher protocol

So now we have a publisher and a convenient way to create it from an array of publishers, we haven’t actually made CombineLatestCollection conform to the Publisher protocol. Without this, we can’t actually use this as a publisher at all!

public protocol Publisher {

    /// The kind of values published by this publisher.
    associatedtype Output

    /// The kind of errors this publisher might publish.
    ///
    /// Use `Never` if this `Publisher` does not publish errors.
    associatedtype Failure : Error

    /// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)`
    ///
    /// - SeeAlso: `subscribe(_:)`
    /// - Parameters:
    ///     - subscriber: The subscriber to attach to this `Publisher`.
    ///                   once attached it can begin to receive values.
    func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
}

Looking at the generated interface for Publisher, we can see that we need to implement one method and to define the Output and Failure.

extension CombineLatestCollection: Publisher {

    public func receive<S>(subscriber: S)
        where
        S: Subscriber,
        S.Failure == Failure,
        S.Input == Output
    {
        let subscription = Subscription(publishers: publishers,
                                        subscriber: subscriber)
        subscriber.receive(subscription: subscription)
    }
}

The implementation of the receive method needs to provide the subscriber with a subscription that will.

This may seem like we are delaying the writing of the actual combine latest logic until the bitter end; You would be correct!

Creating the Subscription

extension CombineLatestCollection {

    public final class Subscription<Subscriber>: Combine.Subscription
        where
        Subscriber: Combine.Subscriber,
        Subscriber.Failure == Failure,
        Subscriber.Input == Output
    {

        private let subscribers: [AnyCancellable]

        fileprivate init(publishers: Publishers, subscriber: Subscriber) {

            var values: [Publishers.Element.Output?] = Array(repeating: nil, count: publishers.count)
            var completions = 0
            var hasCompleted = false
            var lock = pthread_mutex_t()

            subscribers = publishers.enumerated().map { index, publisher in

                publisher
                    .sink(receiveCompletion: { completion in

                        pthread_mutex_lock(&lock)
                        defer { pthread_mutex_unlock(&lock) }

                        guard case .finished = completion else {
                            // One failure in any of the publishers cause a
                            // failure for this subscription.
                            subscriber.receive(completion: completion)
                            hasCompleted = true
                            return
                        }

                        completions += 1

                        if completions == publishers.count {
                            subscriber.receive(completion: completion)
                            hasCompleted = true
                        }

                    }, receiveValue: { value in

                        pthread_mutex_lock(&lock)
                        defer { pthread_mutex_unlock(&lock) }

                        guard !hasCompleted else { return }

                        values[index] = value

                        // Get non-optional array of values and make sure we
                        // have a full array of values.
                        let current = values.compactMap { $0 }
                        if current.count == publishers.count {
                            _ = subscriber.receive(current)
                        }
                    })
            }
        }

        public func request(_ demand: Subscribers.Demand) {}

        public func cancel() {
            subscribers.forEach { $0.cancel() }
        }
    }
}

Note: I nest the Subscription type inside the CombineLatestCollection publisher we have made for a couple of reasons. Inside CombineLatestCollection, it allows us to reference the subscription with a short hand “Subscription” but importantly, it gains all of the generic parameters and typealiases from CombineLatestCollection itself, so we don’t need to redefine Publishers, Output, Value and Failure. Not having this duplication allows easier modifications and removes any doubt about which Output you may be talking about if Output were defined on the Subscription implementation too.

This subscription has three things to do: send value changes, determine completion and handle cancellation.

Sending Value Changes

The first is to notify the subscriber of changes to the value. Our output is an array of the outputs of each publisher, so we create an array of those outputs and set a value of nil for each publisher. Only once the array has values from each publisher does this start outputting updates.

Enumerating through the publishers and subscribing to each allows the receiveValue block to set the new value in the values array by the publisher’s index. This way the index of the value will match the publisher.

I use compactMap to get an array of non-optional values, which is the output we want. If this array has the same number of elements as the publishers count, we know that we have values for each publisher and can send the result.

Determining Completion

Completion happens in this case when any one publisher fails or when all publishers succeed. Because we don’t want to send values after the completion, I have a hasCompleted boolean to set to true the subscription has sent the completion to the subscriber. Now I can use this as a guard when receiving values.

I keep track of how many successful completions I have with an integer completions count. Because publishers only ever send one completion you can be sure than once this reaches the number of publishers, all publishers have completed.

Handling Cancellation

If the subscription is cancelled, we need to cancel each of the subscriptions to the publishers we have set up. This is done simply by storing each of those in an array and cancelling each one when receive a call to cancel.

A note on mutex locking

I initially used a DispatchQueue to receive values on, but this gave the problem that CombineLatestCollection would receive on that queue, so would need a receive on the main queue to handle UI changes. When I investigated Apple’s ComineLatest3 publisher, I found that it didn’t require this because presumably they’re not using queues internally, but are locking instead.

Conclusion

If you’ve ever written a custom Sequence or Collection type, then this kind of code should feel quite familiar; to me a Publisher is to Sequence as Subscription is to Iterator.

If not, then hopefully this article has shown that creating custom publishers is within grasp of us all!