Chapters

Hide chapters

Combine: Asynchronous Programming with Swift

Third Edition · iOS 15 · Swift 5.5 · Xcode 13

17. Schedulers
Written by Florent Pillet

Heads up... You're reading this book for free, with parts of this chapter shown beyond this point as scrambled text.

As you’ve been progressing through this book, you’ve read about this or that operator taking a scheduler as a parameter. Most often you’d simply use DispatchQueue.main because it’s convenient, well understood and brings a reassuring feeling of safety. This is the comfort zone!

As a developer, you have at least a general idea of what a DispatchQueue is. Besides DispatchQueue.main, you most certainly already used either one of the global, concurrent queues, or created a serial dispatch queue to run actions serially on. Don’t worry if you haven’t or don’t remember the details. You’ll re-assess some important information about dispatch queues throughout this chapter.

But then, why does Combine need a new similar concept? It is now time for you to dive into the real nature, meaning and purpose of Combine schedulers!

In this chapter, you’ll learn why the concept of schedulers came about. You’ll explore how Combine makes asynchronous events and actions easy to work with and, of course, you’ll get to experiment with all the schedulers that Combine provides.

An introduction to schedulers

Per Apple’s documentation, a scheduler is a protocol that defines when and how to execute a closure. Although the definition is correct, it’s only part of the story.

A scheduler provides the context to execute a future action, either as soon as possible or at a future date. The action is a closure as defined in the protocol itself. But the term closure can also hide the delivery of some value by a Publisher, performed on a particular scheduler.

Did you notice that this definition purposely avoids any reference to threading? This is because the concrete implementation is the one that defines where the “context” provided by the scheduler protocol executes!

The exact details of which thread your code will execute on therefore depends on the scheduler you pick.

Remember this important concept: A scheduler is not equal to a thread. You’ll get into the details of what this means for each scheduler later in this chapter.

Let’s look at the concept of schedulers from an event flow standpoint:

Main thread Observe button press Perform computation Background scheduler Main thread Update UI Button

What you see in the figure above:

  • A user action (button press) occurs on the main (UI) thread.
  • It triggers some work to process on a background scheduler.
  • Final data to display is delivered to subscribers on the main thread, so subscribers can update the app‘s UI.

You can see how the notion of scheduler is deeply rooted in the notions of foreground/background execution. Moreover, depending on the implementation you pick, work can be serialized or parallelized.

Therefore, to fully understand schedulers, you need to look at which classes conform to the Scheduler protocol.

But first, you need to learn about two important operators related to schedulers!

Note: In the next section, you’ll primarily use DispatchQueue which conforms to Combine‘s Scheduler protocol.

Operators for scheduling

The Combine framework provides two fundamental operators to work with schedulers:

Introducing subscribe(on:)

Remember — a publisher is an inanimate entity until you subscribe to it. But what happens when you subscribe to a publisher? Several steps take place:

Uvitumedq wbubvyubh nesuec 8 4 1 Lorlixdom yxatzg regf Hipcafzuf xkioqip ruzwyvovloir 1 Judgojpux etosx moxauc gutcryamu(ag:) 9 hedfxlobuv Nusnclovom 4 taheomuw dofiab Garmqduduk wuwuori(uq:) tiveavi(od:)

// 1
let computationPublisher = Publishers.ExpensiveComputation(duration: 3)

// 2
let queue = DispatchQueue(label: "serial queue")

// 3
let currentThread = Thread.current.number
print("Start computation publisher on thread \(currentThread)")
let subscription = computationPublisher
  .sink { value in
    let thread = Thread.current.number
    print("Received computation result on thread \(thread): '\(value)'")
  }
Start computation publisher on thread 1
ExpensiveComputation subscriber received on thread 1
Beginning expensive computation on thread 1
Completed expensive computation on thread 1
Received computation result on thread 1 'Computation complete'
let subscription = computationPublisher
  .subscribe(on: queue)
  .sink { value in...
Start computation publisher on thread 1
ExpensiveComputation subscriber received on thread 5
Beginning expensive computation from thread 5
Completed expensive computation on thread 5
Received computation result on thread 5 'Computation complete'

Introducing receive(on:)

The second important operator you want to know about is receive(on:). It lets you specify which scheduler should be used to deliver values to subscribers. But what does this mean?

let subscription = computationPublisher
  .subscribe(on: queue)
  .receive(on: DispatchQueue.main)
  .sink { value in
Start computation publisher on thread 1
ExpensiveComputation subscriber received on thread 4
Beginning expensive computation from thread 4
Completed expensive computation on thread 4
Received computation result on thread 1 'Computation complete'

Scheduler implementations

Apple provides several concrete implementations of the Scheduler protocol:

ImmediateScheduler

The easiest entry in the scheduler category is also the simplest one the Combine framework provides: ImmediateScheduler. The name already spoils the details, so have a look at what it does!

let source = Timer
  .publish(every: 1.0, on: .main, in: .common)
  .autoconnect()
  .scan(0) { counter, _ in counter + 1 }
// 1
let setupPublisher = { recorder in
  source
    // 2
    .recordThread(using: recorder)
    // 3
    .receive(on: ImmediateScheduler.shared)
    // 4
    .recordThread(using: recorder)
    // 5
    .eraseToAnyPublisher()
}

// 6
let view = ThreadRecorderView(title: "Using ImmediateScheduler", setup: setupPublisher)
PlaygroundPage.current.liveView = UIHostingController(rootView: view)

.receive(on: DispatchQueue.global())

ImmediateScheduler options

With most of the operators accepting a Scheduler in their arguments, you can also find an options argument which accepts a SchedulerOptions value. In the case of ImmediateScheduler, this type is defined as Never so when using ImmediateScheduler, you should never pass a value for the options parameter of the operator.

ImmediateScheduler pitfalls

One specific thing about ImmediateScheduler is that it is immediate. You won’t be able to use any of the schedule(after:) variants of the Scheduler protocol, because the SchedulerTimeType you need to specify a delay has no public initializer and is meaningless for immediate scheduling.

RunLoop scheduler

Long-time iOS and macOS developers are familiar with RunLoop. Predating DispatchQueue, it is a way to manage input sources at the thread level, including in the Main (UI) thread. Your application’s main thread still has an associated RunLoop. You can also obtain one for any Foundation Thread by calling RunLoop.current from the current thread.

let setupPublisher = { recorder in
  source
    // 1
    .receive(on: DispatchQueue.global())
    .recordThread(using: recorder)
    // 2
    .receive(on: RunLoop.current)
    .recordThread(using: recorder)
    .eraseToAnyPublisher()
}

let view = ThreadRecorderView(title: "Using RunLoop", setup: setupPublisher)
PlaygroundPage.current.liveView = UIHostingController(rootView: view)

A little comprehension challenge

What would happen if you had used subscribe(on: DispatchQueue.global()) instead of receive(on:) the first time? Try it!

Scheduling code execution with RunLoop

The Scheduler lets you schedule code that executes as soon as possible, or after a future date. While it was not possible to use the latter form with ImmediateScheduler, RunLoop is perfectly capable of deferred execution.

var threadRecorder: ThreadRecorder? = nil
.handleEvents(receiveSubscription: { _ in threadRecorder = recorder })
RunLoop.current.schedule(
  after: .init(Date(timeIntervalSinceNow: 4.5)),
  tolerance: .milliseconds(500)) {
    threadRecorder?.subscription?.cancel()
  }

RunLoop options

Like ImmediateScheduler, RunLoop does not offer any suitable options for the calls which take a SchedulerOptions parameter.

RunLoop pitfalls

Usages of RunLoop should be restricted to the main thread’s run loop, and to the RunLoop available in Foundation threads that you control if needed. That is, anything you started yourself using a Thread object.

DispatchQueue scheduler

Throughout this chapter and previous chapters, you’ve been using DispatchQueue in various situations. It comes as no surprise that DispatchQueue conforms to the Scheduler protocol and is fully usable with all operators that take a Scheduler as a parameter.

Queues and threads

The most familiar queue you work with all the time is DispatchQueue.main. It directly maps to the main (UI) thread, and all operations executing on this queue can freely update the user interface. UI updates are only permitted from the main thread.

Using DispatchQueue as a scheduler

It’s time for you to experiment! As usual, you’re going to use a timer to emit values and watch them migrate across schedulers. But this time around, you’re going to create the timer using a Dispatch Queue timer.

let serialQueue = DispatchQueue(label: "Serial queue")
let sourceQueue = DispatchQueue.main
// 1
let source = PassthroughSubject<Void, Never>()

// 2
let subscription = sourceQueue.schedule(after: sourceQueue.now,
                                        interval: .seconds(1)) {
  source.send()
}
let setupPublisher = { recorder in
  source
    .recordThread(using: recorder)
    .receive(on: serialQueue)
    .recordThread(using: recorder)
    .eraseToAnyPublisher()
}
let view = ThreadRecorderView(title: "Using DispatchQueue",
                              setup: setupPublisher)
PlaygroundPage.current.liveView = UIHostingController(rootView: view)

let sourceQueue = serialQueue

DispatchQueue options

DispatchQueue is the only scheduler providing a set of options you can pass when operators take a SchedulerOptions argument. These options mainly revolve around specifying QoS (Quality of Service) values independently of those already set on the DispatchQueue. There are some additional flags for work items, but you won’t need them in the vast majority of situations.

.receive(
  on: serialQueue,
  options: DispatchQueue.SchedulerOptions(qos: .userInteractive)
)

OperationQueue

The last scheduler you will learn about in this chapter is OperationQueue. The documentation describes it as a queue that regulates the execution of operations. It is a rich regulation mechanism that lets you create advanced operations with dependencies. But in the context of Combine, you will use none of these mechanisms.

let queue = OperationQueue()

let subscription = (1...10).publisher
  .receive(on: queue)
  .sink { value in
    print("Received \(value)")
  }
Received 4
Received 3
Received 2
Received 7
Received 5
Received 10
Received 6
Received 9
Received 1
Received 8
print("Received \(value) on thread \(Thread.current.number)")
Received 1 on thread 5
Received 2 on thread 4
Received 4 on thread 7
Received 7 on thread 8
Received 6 on thread 9
Received 10 on thread 10
Received 5 on thread 11
Received 9 on thread 12
Received 3 on thread 13
Received 8 on thread 14
queue.maxConcurrentOperationCount = 1
Received 1 on thread 3
Received 2 on thread 3
Received 3 on thread 3
Received 4 on thread 3
Received 5 on thread 4
Received 6 on thread 3
Received 7 on thread 3
Received 8 on thread 3
Received 9 on thread 3
Received 10 on thread 3

OperationQueue options

There is no usable SchedulerOptions for OperationQueue. It’s actually type aliased to RunLoop.SchedulerOptions, which itself provides no option.

OperationQueue pitfalls

You just saw that OperationQueue executes operations concurrently by default. You need to be very aware of this as it can cause you trouble: By default, an OperationQueue behaves like a concurrent DispatchQueue.

Challenges

Phew, this was a long and complex chapter! Congratulations on making it so far! Have some brainpower left for a couple of challenges? Let’s do it!

Challenge 1: Stop the timer

This is an easy one. In this chapter’s section about DispatchQueue you created a cancellable timer to feed your source publisher with values.

Challenge 2: Discover optimization

Earlier in this chapter, you read about an intriguing question: Is Combine optimizing when you’re using the same scheduler in successive receive(on:) calls, or is it a Dispatch framework optimization?

Key points

  • A Scheduler defines the execution context for a piece of work.
  • Apple‘s operating systems offer a rich variety of tools to help you schedule code execution.
  • Combine tops these schedulers with the Scheduler protocol to help you pick the best one for the job in any given situation.
  • Every time you use receive(on:), further operators in your publisher execute on the specified scheduler. That is, unless they themselves take a Scheduler parameter!

Where to go from here?

You‘ve learned a lot, and your brain must be melting with all this information! The next chapter is even more involved as it teaches you about creating your own publishers and dealing with backpressure. Make sure you schedule a much-deserved break now, and come back refreshed for the next chapter!

Have a technical question? Want to report a bug? You can ask questions and report bugs to the book authors in our official book forum here.
© 2024 Kodeco Inc.

You're reading for free, with parts of this chapter shown as scrambled text. Unlock this book, and our entire catalogue of books and videos, with a Kodeco Personal Plan.

Unlock now