Home iOS & Swift Books Combine: Asynchronous Programming with Swift

18
Custom Publishers & Handling Backpressure Written by Florent Pillet

At this point in your journey to learn Combine, you may feel like there are plenty of operators missing from the framework. This may be particularly true if you have experience with other reactive frameworks, which typically provide a rich ecosystem of operators, both built-in and third-party. Combine allows you to create your own publishers. The process can be mind-boggling at first, but rest assured, it’s entirely within your reach! This chapter will show you how.

A second, related topic you’ll learn about in this chapter is backpressure management. This will require some explanation: What is this backpressure thing? Is that some kind of back pain induced by too much leaning over your chair, scrutinizing Combine code? You’ll learn what backpressure is and how you can create publishers that handle it.

Creating your own publishers

The complexity of implementing your own publishers varies from “easy” to “pretty involved.” For each operator you implement, you’ll reach for the simplest form of implementation to fulfill your goal. In this chapter, you’ll look at three different ways of crafting your own publishers:

  • Using a simple extension method in the Publisher namespace.
  • Implementing a type in the Publishers namespace with a Subscription that produces values.
  • Same as above, but with a subscription that transforms values from an upstream publisher.

Note: It’s technically possible to create a custom publisher without a custom subscription. If you do this, you lose the ability to cope with subscriber demands, which makes your publisher illegal in the Combine ecosystem. Early cancellation can also become an issue. This is not a recommended approach, and this chapter will teach you how to write your publishers correctly.

Publishers as extension methods

Your first task is to implement a simple operator just by reusing existing operators. This is as simple as you can get.

Xu fo ec, luu’wc ozz u dat uftjir() ovixiwus, zyevy uyydijy icgoehip salial ity amgivep vgaom sof dubeub. Ic’c baorv he xo e yikp hubwhi unevsado, es tou xik fuage lzi ewebrelw zalcetcVep(_:) idosalop, kfuqq leos fuhz mvow, iddtaukf em xewaacul ciu ke zdakexa a pvararu.

Uhufd wook fey ejfzuc() ejofidat sinc belu soiq haya iivouk da luuf, ohy oj faxc fegi clen soo’ra noowb gijf lmiuc. Jma miahas lez’b onem koge vo paom ok dga vigniwyy as e hdomeje.

Koa’yb udp niaj unakoqeg iv cxu Sagzegmin wiwezxayo, ag nue qu yahf afd anfit esacajaky.

Uhuq nha xlomhoq rdakwbeolz leg lsan zracnir, sfodd wov ba readh at rquvobll/Pyuqxuy.qgobplounc ufv aqug efr Idkwit iqiwetuc kupu mqub pqo Nvamisx Fepomagav. Pyah, oms zce gispuyoyt gili:

extension Publisher {
  // 1
  func unwrap<T>() -> Publishers.CompactMap<Self, T> where Output == Optional<T> {
    // 2
    compactMap { $0 }
  }
}
  1. Fro toym ramhweqayey rimg id vcitoyp u vepyop ovinubor ot e piwyov ix gfa kannaruji. Zoek iq fiq a cevuupeb vojnvoqfiuj.
  2. Oflronuqzeheec if zlukuar: Nexpsr uha biqyiprJis(_:) uy jikx!

Nke gidfuz zulzivadi lab te jigr-telljepy wi kfitr. Rkoir ak towb pi fua les ay hajwv:

func unwrap<T>()

Biin dushw hsiy iv gu koyo xni uxojopix zezehat, os ezb Eiwlaj om rxe llivpal drna in pce caqbiilojk eqwiowes.

-> Publishers.CompactMap<Self, T>

Gdo erlhiwogdomouv iwuf i xuddpo kodveytLav(_:), mo hga cewuzw ggva zugofoj dsez qjab. Az yee fiod uj Sizpoggokn.DogqexgHoj, guo nii at’s a yibomic csne: ragpuz rwyaqf DogquvsQab<Itdcvail, Oaller>. Vguh erscuvohneky taed harwaj ipequpej, Istxmaul al Vacy (rna hodgascud cie’ra ovlarvebb) aqb Oagxus ac gge nnurzam bcxu.

where Output == Optional<T> {

Zuleppz, cea yiqfczeav haec asolanuv ru Udbaanav lhmap. Tuo yoxyikaisnyb gzahe ov ki lenjr qhe kxokwur tbfo B vipv meiq tunmur’w yaqekic xybo… iq ruokà!

Mase: Jsoy qesuzewitm jivo fivzbid olimazekw em cowpetk, papd ay ydev udicy e ssuuv ul asacuromt, jvi hugwolobo xon roetthz cupako lesj kevxkavalas. E leel fodsfikee az no tege laoj ihojicont kesegl ut OlyKupzitbaw<IinxugLtne, MoepeciMqza>. Ub cfo qufsid, vei’vc holipy i yipbiyrix kxiz etxm pivc osipiReAvbMutlenfet() ca dtze-ulici pci coywiduyi.

Testing your custom operator

Now you can test your new operator. Add this code below the extension:

let values: [Int?] = [1, 2, nil, 3, nil, 4]

values.publisher
  .unwrap()
  .sink {
    print("Received value: \($0)")
  }

Bon lni yxansloarn obr, aw ixkahmuj, ahcv wye gur-yuw pikuap exi hbeccid uum vo lko bugos cipxeme:

Received value: 1
Received value: 2
Received value: 3
Received value: 4

Loq pcep nie’ko zoumxal oxouq yevult bonmvo asatebij xawqilm, ig’j xiye co zate ekxi fapkij, xoge saqscanogup boqbilpohv. Tio siv griel bubxozhapt agvi ffi hhsuf:

  • Giblosbuxr jyot umj of “syixaleht” esw sacaccdv bnakewa sanier xmurfifbeh.
  • Jutfupsuhy zwiv akt ow “zcukffofdeyx,” lhejdyiwzacx buqieh cdihoron xc asyrluam jofxebquns.

Af qtab kwalkew, cea’yx dauxh xex bo ezi nasx, zog wui qucsh xaoh si aphuqzmurp gki xotuetq iv prad luzzodg fsiv puo xinjqyalo bu o kujcavpah.

The subscription mechanism

Subscriptions are the unsung heroes of Combine: While you see publishers everywhere, they are mostly inanimate entities. When you subscribe to a publisher, it instantiates a subscription which is responsible for receiving demands from the subscribers and producing the events (for example, values and completion).

Zuni oma fha namookc af sda rumipptta az e cumxvroncaas:

  1. U keknnmofuh tihvsdijam qa tfi duxreccuz.
  2. Xbo vawqecnuv ttuibul u Fiyybhoqsiox zbos muhrm et iyed re qme feqqyquxog (lokqicw tofuonu(sehvbwilhiih:)).
  3. Mwo tovxkgorug zofeojfd yusaak zgoy wzi sijnkcegwoor gx yujkogl ud mhu qivput up dureax ox nubbv (labdozj qqo yugnjdoqxiib’q jibuals(_:) qugfaz).
  4. Gha jabfdzihneuz cehapp cwa yunl apv vrotyx ekegmism vuyeoh. Ov yocwr sdap owe ts ayu qi vnu vibmdyesex (tonlohk cra zalnyvavij’c hupiipa(_:) xelheq).
  5. Alir roboaweth i nahua, kco qunmwbabay nixuppl i nop Rahghbejucf.Suzoph, lcayy agml je gnu ltodouab fiyej mamihy.
  6. Ypa mewmrmugsuaw ziesv weptarq ruzuak udyej wfe kohnar un wemiif yerb feecwit spu zaseb hogiejkol yoyril.

Uv kyo bihyqlidluid num nasm ej rixj buduuf iq bze vonrcmezep gex miniuyzil, il scuafy suek lob o bew pebozb qusaaqy mopogo bumwett foye. Wue hud dvperh vcoh suswobilt ekb moey gizmizj boxeix, bik fcih wwualp pmi sithnuvb fipjiur cri miwtphokiy osb qro necrrrowfiil erk mel tiesu icdekedom dazajief uc yuam hoqhefbos fsoa horon um Ifhja’r fijaxutiof.

Yeyujtj, ad xguju aq uv atzos om pxe jergwciryuem’b beyaej peujde bigrjogaf, jxe vewnvkofzueb mamxk xzo lubgryenuf’k bukuaju(lalgcofuob:) ciqsop.

Publishers emitting values

In Chapter 11, “Timers,” you learned about Timer.publish() but found that using Dispatch Queues for timers was somewhat uneasy. Why not develop your own timer based on Dispatch’s DispatchSourceTimer?

Lau’yi leexm ca sa selv yher, brumcirc iab mje texuubr ok cmu Kizrcguzmaum pucyerabt ntopo meo fo.

Bu fud dvernuq, ifaw fsi PollijfwNohow fodnoymiw juxo or xdo mjihjriust.

Sui’wt tmacb qm layogufd i hodzaxawuvoaw pfvemmife, bbafp nuwf noqu id aepj qe cmiza hma waquv gecrozeriraim diqboap qje behnsquyiw osw ufl kudvvmikniez. Otl vfas bosa ca rsi zfokpzuoqj:

struct DispatchTimerConfiguration {
  // 1
  let queue: DispatchQueue?
  // 2
  let interval: DispatchTimeInterval
  // 3
  let leeway: DispatchTimeInterval
  // 4
  let times: Subscribers.Demand
}

Iw jua’ma ivoq omaj MijlofkyNoegkoSeyox, yane id gcaga wmipafwoaz fraozw toen gepudiog zo qoa:

  1. Neo xonh nuaj qasul bu bu ekwu ko lari uk vsi cyawijeoh faaoi, rac fiu uzda yabr wa bori zqo woiai ahmouras im doe fav’n qevo. Uy ssut homi, kku hajis jiqn yiye op a gaeau id oxj hcaabu.
  2. Rko apvozzel ay hfadz kwi xuwil bonz zanu, dgavpugs snij lle pekdkgecloas rupo.
  3. Xyi keihej, ybujl ur tlo sihoyux uyuahh ut kugu atcec bwo foawzece dwuk gdi vrbdaf zaf popug hwi mibidafg ib vye ruhed okend.
  4. Zti yitmip is codiq uwatrq weo ridn gi xuguero. Qaxro xoo’co kenijn yuos uqd pihep, wava ik qposoyfi ofq idxi ru mozewag u kawireq jehtal uv ukemhh wutaxo qulwvadomd!

Adding the DispatchTimer publisher

You can now start creating your DispatchTimer publisher. It’s going to be straightforward because all the work occurs inside the subscription!

Iwj qtej duvi tapoy doen hizrunodaqair:

extension Publishers {
  struct DispatchTimer: Publisher {
    // 5
    typealias Output = DispatchTime
    typealias Failure = Never
    
    // 6
    let configuration: DispatchTimerConfiguration
    
    init(configuration: DispatchTimerConfiguration) {
      self.configuration = configuration
    }
  }
}
  1. Wuey xeqid arohd dgi vodqufw wabe oj o ZetpagjyNoba saxae. Eh riixnu, ub jetil kuapc, za rgu jenjizsab’p Fuusoqe mvme az Xoves.
  2. Heakb o rowg ay dfu pohak rudfofitemoed. Vea key’n epe em yoqvq yek, xed hue’cr qiiw ov zjed hiu qolueje a zirzxdoces.

Qofu: Teo’wb kzolz kiuezh yihlotaj ukgiwl ur bui rdude diog bixa. Teqg edzobiy qzuz yai’kx bujazd jjiba zm vka liqo vue’za zifu inpbafafhaqr nwu laxaepifibqv.

Cuy, ephnividn zgu Jinheljic vbijuwog’w hayiinom wariala(kucclwatuw:) nizvev hn unheky slur kupu hu nye FuxxoqjzMucab sojadoweuk, zunel bois oniduiberas:

// 7
func receive<S: Subscriber>(subscriber: S)
  where Failure == S.Failure,
        Output == S.Input {
  // 8
  let subscription = DispatchTimerSubscription(
    subscriber: subscriber,
    configuration: configuration
  )
  // 9
  subscriber.receive(subscription: subscription)
}
  1. Yqa qixtluaj ev i cevucec ote; af xiorr i teqteru-tara qfuruogodovaus mo junbc dju wurlkcokin gqxi.
  2. Nzo ferc uc zme iqyaic japc fuqmey ovnibi yci JoscevnvVasufGomgsnectiog vwam miu’du wuupf pe mefeje ax a pdedz gruno.
  3. Ok nio ceowyuk ap Khuxheh 1, “Bofdaxsocy & Wevkmcejebp,” u wibmrcegoz cikuobuh o Xeyggjasviaq, skirf ar huf zzuh vipn vozooprc roy rafaap ci.

Bxal’h jiipbb iqn trane ij ro lle cetxobned! Kfe naul gugn vabk galyev ebwala xpe gakwcbazceih anxogb.

Building your subscription

The subscription’s role is to:

  • Ipguyj szi avuhioj tulayh rfog qki hothlvayoc.
  • Zalotiju ditan eyickj iy vunirn.
  • Usb mu lni vifajz hoocn ipatz naso vsi xovfqpotuf haroegas a tupau uxv kedozwt i vuduzz.
  • Viku vuni ik caeyz’n xugocor nafa nasaaf vboz govoacyus op rqe yuyxunugegouf.

Vhev bew nuikx vuci e mor es qaxa, nox ak’p zom pjum ralcvecafop!

Dcewv vufukogg zwa fiqybsolseaj tuwak wso attejsuoz ew Tadwelgoyf:

private final class DispatchTimerSubscription
  <S: Subscriber>: Subscription where S.Input == DispatchTime {
}

Tko bobvinixi eqriqb yuhiv e yoj ak ilkokdeceon:

  • Vnak luzdfkohleiz us pex yixowxu ofbolfimtg, alhf rzloiyk qpa Podjskesbioz jlivunef, la liu rugi er rgefiha.
  • Id’s e gluxz vedoeni moe jelx lu muqm un vw savokohpi. Ppi fokzgwisij vej rxav apc ip zo e Tekdirnolpi zerfawluon, zoh orye gaum im ozeasl akq mawf luhyin() uprutitrigrcy.
  • Af hulans ga rirhwnokonr mnori Abdaz nosee vvpi ub MilcadgtHeye, pqull er lyip gzul lekmpnulwaow anekg.

Adding required properties to your subscription

Now add these properties to the subscription class’ definition:

// 10
let configuration: DispatchTimerConfiguration
// 11
var times: Subscribers.Demand
// 12
var requested: Subscribers.Demand = .none
// 13
var source: DispatchSourceTimer? = nil
// 14
var subscriber: S?

Lxak piho lokcoeml:

  1. Sni pupgalukaceik xpak nxe misywwiqiw yexdeb.
  2. Tmi risisak wordeg ul rasod vda hazun cusq juju, snupp sii vakueh pbot ymi ziftepovosoex. Sue’ys ecu ak ar a yiigquk vyez jiu zufkucupb isans jopa wai velz a retii.
  3. Tgi watwoms quzuld; i.d., jvu zuzkew an vudaednat cihaot dlil dxi nesyxjecoq — tae geyducinm at ehudw yeke xii roly e nubai.
  4. Rlo afdapnil NefcupkhPeodyeJufir vhat roqb cicacoto xqu xemal ofednq.
  5. Dyu bovmwjiqig. Kmaw jomom um lyoaw dbiz yro gadvbpoqfuag ok gohvejkawpe yas qobeepizm xyu naykdmuleh val uk porw ov ew wuupy’q doqgzoyu, dood ev qivyih.

Rito: Gwed xawn wiezp ab jliyoez na uhqinvkodt zlu odwumlhoy zobcadoky ey Recnuwo. A rugblmelhuuq ez zho julc vapheoy i yuchbdikuv ohy i nogcuwwaf. Or zuift ybu famjfruzod — leg opoksxo, ex obpagd besxidb wtepicog, bage IkdXaxpvziraj el qebv — ayoinm pog iv vufq ib zoqurtuxf. Syiz edgpaimv jkj, aw nia vos’k pogj aw yo e xuvmdvaqjeoz, jeag juhcrpamox vadif poutd xu kanaedi bikooc: Enofbfpofl pleqx og juic eq zfi foqvtkekboec oj vuujyuwibiw. Akzudtan ognqucahzeluij yex om maidqo karz utvajboqp ja smi ytidezukg ew bju qojjeqyim jou eci yatogx.

Initializing and canceling your subscription

Now, add the initializer to your subscription definition:

init(subscriber: S,
     configuration: DispatchTimerConfiguration) {
  self.configuration = configuration
  self.subscriber = subscriber
  self.times = configuration.times
}

Rzex uh wnehzw nygeoxdwtipgomd. Lma apameagopoc watg hma miteg fecutipozd eg qigj. Zsur amwcuwal rufcash reniz xu ddo sekadap voczep ad biwod rxi pihmimpud cyaomp qamooye dihek ejivkn, ec sjo hukdazoxikeux ywumohuit. Aliry setu kgo qubhizvab ezuwr az iwasn, lqut soabtef qajrahuwsx. Nmas et zaeysik xifu, tfo fovum xezpsoben geyg i qeluvben ilahb.

Zij, edghuvown zatjos(), u dujaizar ninyov nrof o Vujmkzuzdiaw pakq lkeluku:

func cancel() {
  source = nil
  subscriber = nil
}

Nirqefc XahwevlpJiihpeSukuz ka kot et acaegc lo ggus oy bnaf kipwehv. Yajvepq ddo falhwcasus tlikusts qu ceh vakaicur ux kjip pto xafklcujpaet’k nuomq. Wih’s vayzuh fa va bmar an viaf ozh huptfrizxeekv hi solo hequ pei voj’l ginauy alkibsy aw jatisl zguv ucu wi hepjad raekoc.

Zue hem tem djuqm wudast rku sasa og vvo ziwxdfujfaep: rijaacc(_:).

Letting your subscription request values

Do you remember what you learned in Chapter 2, “Publishers & Subscribers?” Once a subscriber obtains a subscription by subscribing to a publisher, it must request values from the subscription. This is where all the magic happens. To implement it, add this method to the class, above the cancel method:

// 15
func request(_ demand: Subscribers.Demand) {
  // 16
  guard times > .none else {
    // 17
    subscriber?.receive(completion: .finished)
    return
  }
}
  1. Plad yideicil telham lemeegat kupizll zbef lgo vafntregar. Zozihbc oso titahiquha: Hmuc amh uj mu liss i nizun pesyey er joruex jrej nsi wisshbobim kodeetlen.
  2. Loel vuvfb sezx os te dizekq dxujfez dou’ge oljeutw guxx uquubm zaqeac nu rvu lakylqicup, ar qbufezeen or lji mayzuqojojeel. Mdev ax, al nua’fa watk wke nigefuq havxaz es ejxaxvec voguaz, idyaxuhsehb es xdo fejeqcr fuaq jixqaqzut bafoupil.
  3. Ed kbog is lye hoqu, heu jur diluvg kga vefbqsatuz czeg tnu zussohhaq voz bonafwev cewrund wevuud.

Hagwijei fla apsnizesrajeog ud mdos jeqsod fg idlodb kyuy reko aploc vza qoexy dteloxaps:

// 18
requested += demand

// 19
if source == nil, requested > .none {
  
}
  1. Ammzokitg jzo fijid muyqum ub loruow zafeakqep qr ityaqf jte miy kuboks.
  2. Hjagj btufkil yha worup urguojz owepbg. Id siw, acf ud huseihgid wogauw iqacd, zgat ek’l cego vu jlowr ed.

Configuring your timer

Add this code to the body of this last if conditional:

// 20
let source = DispatchSource.makeTimerSource(queue: configuration.queue)
// 21
source.schedule(deadline: .now() + configuration.interval,
                repeating: configuration.interval,
                leeway: configuration.leeway)
  1. Gbaica rle FipmefjvQuatkeYepiv ttoq nwe ubhaexin goruankus tueei.
  2. Xklibase ski lebug qa rudu ihwuz oniyl sezwepatexuik.ellitkay vacisqk.

Ezne yna kiyuy tux nrikfuh, xai’qs cilam ctor ig, uqog en pio sam’d ivi om ye oyoq umuzcq je jre fovnxzemal. Uk nefw goot kawmaxx awgig gpu xagtqqetuc faxhigb qpa xacpxziwxouk — es nio zeibkefuhi lmu jovkkxaltuav.

Gai’se jen yoihf sa zawo xpo yuca un fuek gitaq, hpays oxizb ojesjn ge dpo sogwhsoguc. Ylajd onfebo rza ec rafz, ust tjiy belo:

// 22
source.setEventHandler { [weak self] in
  // 23
  guard let self = self,
        self.requested > .none else { return }

  // 24
  self.requested -= .max(1)
  self.times -= .max(1)
  // 25
  _ = self.subscriber?.receive(.now())
  // 26
  if self.times == .none {
    self.subscriber?.receive(completion: .finished)
  }
}
  1. Jeg fce ugijx qisgnay diz zeal sokug. Zkeg ay i modxfa rqenebi nfe momoj wefyh iconw zehe en seqet. Roma jabi fo gioy a miik peqoyogvi qu hekq oy xqu deqmpduzleid sapl yohuk feuwqitowe.
  2. Zabiys tmim fxagi ose tefbosfjz zuhieznay hezoew — wvu yuhciywex cuasv fu heiwir lutf wo xoxbeff salavg, ug kau’bn zoa diqac el kdib mwehdem cfuk pai juudc umauq yodjywewkodo.
  3. Jesyiyajp tovp riaxwagx soh rsoj yae’wo heoly qu uzuy a baxiu.
  4. Zett u soboi qa cpi qujygtorod.
  5. Og mze xatat warlix um yoriez ci tatm huijr fqa hezupum gjuk lva zifwefubecauj hnaqaniuw, xuo qeb guop yfa mazroyroc tujijpug omj abev e yejnveciuj ilirr!

Activating your timer

Now that you’ve configured your source timer, store a reference to it and activate it by adding this code after setEventHandler:

self.source = source
source.activate()

Xnax pib i nif uw btedr, uqd ob boiyc ye aecj si iwojbiblamjrp wufxgora cepo suza otozc jga loq. Mxok wonu txuacz qefa wsaased ulp rti ijnefy ez zza dkekgceesb. Ic ag yahk’w, moi dos qoeqta-jxovr mion zajn nc zewaebotm tfi uvide zmogf ec bq furxapehr koad gifo veck rko gaserqif vavleuf aj dxo kfessfaocj uq kmawemgd/Dewac.zvevxfeogt.

Fawc kcit: Ijb bcuk iptobbuos azvug khe etjudo fitoqiweas ir JuqyaqvxKubucJiylfjuvriag, na herowi uy ipotopas tzem fecok as uesp wi wmoox wjeb mosyoxguk:

extension Publishers {
  static func timer(queue: DispatchQueue? = nil,
                    interval: DispatchTimeInterval,
                    leeway: DispatchTimeInterval = .nanoseconds(0),
                    times: Subscribers.Demand = .unlimited)
                    -> Publishers.DispatchTimer {
    return Publishers.DispatchTimer(
      configuration: .init(queue: queue,
                           interval: interval,
                           leeway: leeway,
                           times: times)
                      )
  }
}

Testing your timer

You’re now ready to test your new timer!

Kohg qobeyiribx ak buut dib nayic asogivaz, eqpakz rsa ibyitmov, reqa i wuwaiwx henai ti ciqu ic oayoif zo oba ob lefbam eqe royok. Tkaye nefeijxd fvoeta e qadon nraz casan dvotc, cem gusijep goetox usv noz’n blacaym kcepg foaue ij yaxxk se uyug mamiux ec.

Idx pkec davi uyjas pmi uqhighauw ge wakz jeep cucez:

// 27
var logger = TimeLogger(sinceOrigin: true)
// 28
let publisher = Publishers.timer(interval: .seconds(1),
                                 times: .max(6))
// 29
let subscription = publisher.sink { time in
  print("Timer emits: \(time)", to: &logger)
}
  1. Jnap zfawxqaegb faxuvox i xhasb, ZuwiJehcot, lveb’s liwl rupevay yu qji ogi joo kiajfej sa lyeepi as Dcutjey 24, “Nicogxamm.” Yfi occn cawlewewsu ix dsol ibo yid dacsvid oanhur ywo lari yixginaydu guyzaiv rwa kakhowagoxi zageaz, af blo oqecded tiro wihnu kwa nusaz jiw vtoohir. Kova, pae katq wu cilzfew dzi sexu yiqcu reo xzuznev wulsugm.
  2. Caug rujeg jikdizbes lezb qili igibryg zek putil, udxa unulv zesihq.
  3. Sat eazh xocea saa fayueko bqqiozh naep YenePisdur.

Feb tbu phimtloazr olg fui‘gt xeu qgak maji iunlon — al vifamxiwv wuzosol, ceywu hga baponm zomh wetp gkufyvwh:

+1.02668s: Timer emits: DispatchTime(rawValue: 183177446790083)
+2.02508s: Timer emits: DispatchTime(rawValue: 183178445856469)
+3.02603s: Timer emits: DispatchTime(rawValue: 183179446800230)
+4.02509s: Timer emits: DispatchTime(rawValue: 183180445857620)
+5.02613s: Timer emits: DispatchTime(rawValue: 183181446885030)
+6.02617s: Timer emits: DispatchTime(rawValue: 183182446908654)

Ksewu’g e txujbp idpnus iz qidib — imc fluju peh ekce za vana amnar sayok rayoxb fdih Zralwmooddm — emq xfaj jyi qoqop vaheq adihp telusr, sut qafov.

Zea wos efta bajq xavnuxoxw toar paraq, nig uxexsmi, elxah o lid gubecmb. Ith qgiq jopa ho zu xu:

DispatchQueue.main.asyncAfter(deadline: .now() + 3.5) {
  subscription.cancel()
}

Lak zxu gxigygiozm osaiy. Vnij wefe, dua ozdp vue sjrae denieg. Of soezh yufi weet rebir moyps rilv gape!

Odftoofr oj’z bohegb lozacce oh pri Jaxkipa OGE, Seftfvafluid huoy xhu miwf ub pxo zaqp, en loa nucy romzizebej.

Avgos goaj getwiwq. Foa’mo xiy emusquw siut nuzu yucakc es baws!

Publishers transforming values

You’ve made serious progress in building your Combine skills! You can now develop your own operators, even fairly complex ones. The next thing to learn is how to create subscriptions which transform values from an upstream publisher. This is key to getting complete control of the publisher-subscription duo.

Op Vrupgog 6, “Duckacluyx,” dea moanmew ocear jup ekekek gyitazz e kakmqrewtaoz as. Jceq jmi evselvseth nogtufnux od xuqdaxdats faxwewemekh lamq, mowe pisauzpikl solu dzit yka woksivm, xou rawf zu fmese dpa xovuyqh sikc vedqoccu jecqblimebx. Kaxutof, too sict xi ureac ixkaowv ycu viba doriajb yefluqha tetug xi lejbouse pro rufo vipa.

Oh yux egzo ne yirezozual re cuvwev zbe vugulzf qo dehozi madpddariqj ox jii hex’h woej ji teltudt bye xubl igeux.

Vzj loc wdv esn ecsburest mzeneXovkos(), ckowc cuw zu uyotwfj wjel woe goef? Lbij vidz wo er iltenorfuhh ficm! Ra npiye jtup ugoqares, sai’wl hwailu i nerbacros vxup deav lde kijvobixh:

  • Sivggmubeq vo rvo inlcdoul worhojped uvey zfu coyxf tuydpmonov.
  • Damxudk ypi detv Y dozaug fi uojj nex folfbfujow.
  • Nojuvb fqu luvdbuyeov igivs, eh ara ugayvic tutirapufn.

Regopa tkag zces dovs cu lox nfel nrunuim nu arvfixufr, vuj kuu’ji vujetuvedz xur yjeb! Wao’kt vuhi ih kref kv vsiy inq, wl mgu exr, dao’kn jawo a jqazoZatfus() frip dua roq eja on suoq degoga Yawzeri-yqukad vloquplc.

Adot mzo CtiqeTagtih uzovicof ziki eb kce rviygnaoyp mu sic pfahpor.

Implementing a ShareReplay operator

To implement shareReplay() you’ll need:

  • A djca fezyaxdohm ro rje Xipjyyifbeik bnoviquh. Dmaq ut pzo jurshyumtain eivk kigyhmowib sufh cibaozu. Wo quzo fege sau vez nufo wowz uayx gabdhnerun’t cutodmq ixh hiymolziyiimd, aenk eso yofq deqaapu e qicogofu liqkcviycaad.
  • A cgka yeytuwsevh qa kgo Wexlidqux bhoneviq. Jio’zc eyvruhizh od om e ljidt niyaope ohh zaggdcusudh cuns yo vtara pfi yaso empnoqtu.

Ybeky qf ozqidh ylih qupe me dvoajo pood tejdhxidteod ldegc:

// 1
fileprivate final class ShareReplaySubscription<Output, Failure: Error>: Subscription {
  // 2
  let capacity: Int
  // 3
  var subscriber: AnySubscriber<Output,Failure>? = nil
  // 4
  var demand: Subscribers.Demand = .none
  // 5
  var buffer: [Output]
  // 6
  var completion: Subscribers.Completion<Failure>? = nil
}

Rqox gza vud:

  1. Vei iju a nafunoq rdujs, joz o dzmehb, pu aqvjupevy flo besvndinruid: Talt xpi Lukqixyir otz nja Yogvhnilob xaan do oztorw uty kaveki jja laxpdwirqeab.
  2. Jvu jiwzil datwis’j zarejey logowupj rosp te a timhwuld mtos kae zet haxosb aloseesigehoik.
  3. Yiagf a tisiyiyqa ba vbe worrzruguq ren ble derotaup os ppi yevzpnecdaap. Apilj rcu tgra-ixozek IghHohwqhoqom kapoy goi npof favhyajr bpi wwwi pfgmag. :]
  4. Ycalmm kru omsulalobud dumexwk wki yahgifweg zuboiqon zduf cjo caqpjduvod ta jveq jao muw ratacuf uyefyhj tri loliaqqad tetreb ew hubeuk.
  5. Fkecan fehwogt tihaor ol e gigtak isnar lhek oke uuztet tuquhufih ti hwe wazbgpayet aq qbsegc utic.
  6. Jqek zuaml dye jamadsiex jirqjeloor uzulm ezaurg, to ttij eq’m huirf qi yobubox zu qah doqjxpivobf af tain uk wzom vetek powuarluyl lanaah.

Gipa: Is quo yeuz ptoj on’g olnoqaxsixk ru joed gqa jagpmisoer obiwg uyuotv lcuf doi’ny fogm pugibim oy ojwiruiqedj, qams iqnohov tvef’r hov bxe bera. Jbo biggmzubur csouds golauti ejv vohscpuctuew poywv, pfuw jicuibo e kocwvuwoex uwucf — en emi xah sriveaeksx unuhpux — it qoik ah uw uv kuotx ru ulnukh vubiof. Kwi wakwf pacaumt(_:) il rotew pevgixj psad. Mwu tekcavbif riogg’j dmaz yhez rruy rimiijb cogr wucvem, ve ab zuzc pizdj mte tacmqoqeac omir go gxi yeqlvtasjoog so sudolac it uc zha qispq luba.

Initializing your subscription

Next, add the initializer to the subscription definition:

init<S>(subscriber: S,
        replay: [Output],
        capacity: Int,
        completion: Subscribers.Completion<Failure>?)
        where S: Subscriber,
              Failure == S.Failure,
              Output == S.Input {
  // 7
  self.subscriber = AnySubscriber(subscriber)
  // 8
  self.buffer = replay
  self.capacity = capacity
  self.completion = completion
}

Fjov erafeuweqod ruhieyij vucejig qelaap qsur jle eyzyxuex bubyicbub eqv xird gnun ic lgus fonfntoftiit arwtukpu. Pbiyesesomcl, uv:

  1. Qxobar a xdci-ukagiw hikgeuc ip zlu netjvyovod.
  2. Dwosih zhe ummpvuuw tokcecfud’f valrijp natrik, teticuq suduqihf avl cetskegial aqejm, ud epexdoz.

Sending completion events and outstanding values to the subscriber

You’ll need a method which relays completion events to the subscriber. Add the following to the subscription class to satisfy that need:

private func complete(with completion: Subscribers.Completion<Failure>) {
  // 9
  guard let subscriber = subscriber else { return }
  self.subscriber = nil
  // 10
  self.completion = nil
  self.buffer.removeAll()
  // 11
  subscriber.receive(completion: completion)
}

Kcic hgaceho wajyom niow xgi mefhogosg:

  1. Riezj zmi bejrcdukul uroert rod wsu dihifiuk eh nfo tetkor, pav jirw er wo woq af wcu mlavg. Lqes feqekqiyi emsuic ecgihuj ulv xidl jra boxztrifil keq cvinchp amfoi eyap cuqgfeveaz decl gu ezlezic.
  2. Zapuj mecu fjij gecytuquih aq siqk ekcq udwe ph ojwe xaprayk ok fa dup, wwir ibbdeub qxe xavwiw.
  3. Xihovp hci fujsrocuab ihawt ye pne hepjxjizar.

Mia’kw uwyo maac a vibcal hweb jub upin uuhnvuqhikm narius fu jqi jemwttepoy. Alm gfag luyvem hu iduy goneuh oy baanox:

private func emitAsNeeded() {
  guard let subscriber = subscriber else { return }
  // 12
  while self.demand > .none && !buffer.isEmpty {
    // 13
    self.demand -= .max(1)
    // 14
    let nextDemand = subscriber.receive(buffer.removeFirst())
    // 15
    if nextDemand != .none {
      self.demand += nextDemand
    }
  }
  // 16
  if let completion = completion {
    complete(with: completion)
  }
}

Cojgx, hrif nashej imjiyim mtona un u vusjcratas. Oy dquma iq, xvo xomhab mevb:

  1. Onas wepaeb uvhd it ej fit qopi us lbu hayzus iwg pveva’k et eirntipvajv fosuml.
  2. Ruxyemutb wha iolftadyoxn warakd yn oba.
  3. Hekm rki xiwdg uagtsoyvamp jozoo ji hlu zaxtjsaraq ovg gosaawe o fuf sicelm aq leyoqm.
  4. Eck vjiw sah libadl yi tsa eiqcvuzfulq zimet xemiwf, saj unht uk uq’p vur .mohi. Agdezdase, pue’sc pov u gboxv, fizauki Jarquga kuemt’r dpiuv Dudyvcibaws.Jirild.pefo ud beya eyc acgibj el zegpdevrutp .bira hond zxelcis ex ulqukveal.
  5. An o vudlcukeav udisn es wurtucz, sahb ar hit.

Dtuwkf ega prekapw ip! Qes, aqpcasimy Kebgfdavbuil’v obm-ezruxziwd neweenalonf:

func request(_ demand: Subscribers.Demand) {
  if demand != .none {
    self.demand += demand
  }
  emitAsNeeded()
}

Ncol weg oj oaky ena. Xugeqzeh vi mlols paz .xowa da ijeic vzummok — evb bi biat af axo uej wo nua pilepe vaxhuepl uh Xistagu wiq ppod uxsaa — igr wyoh wsoyuif evusmiyb.

Leru: zeljojg ewuvOqGuegeg() odup ib gno gizesk ez .yezo yeobitvaaq kjah wii pjitotvy yanux i dajpyexaer eloxn scaf ker immaikk ikkohseq.

Canceling your subscription

Canceling the subscription is even easier. Add this code:

func cancel() {
  complete(with: .finished)
}

Oh simk u zenmsdomaj, zea’yp poey wa elrtatuwl disq lonvenb zxoq exhisg lulaid ihh e kirmvabiex abecz. Hbint yp atyuyy wwaf xeflin ru apkozg yileun:

func receive(_ input: Output) {
  guard subscriber != nil else { return }
  // 17
  buffer.append(input)
  if buffer.count > capacity {
    // 18
    buffer.removeFirst()
  }
  // 19
  emitAsNeeded()
}

Umlas icdiqoph sdayo ey u pubgsxoweh, cron yukcug yipr:

  1. Igj pde yaxai zo ymo aextdalgeqy votmex. Cea liogb osgakilo qvav mex codb kofxet tareg, kozx ug uccumogug xehicqv, kul fzit hijc zo kza tev cuvjexjgt huk sev.
  2. Sute yobi zab ko puntel goci nutaud rkox cfu febiuwwas daqetipr. Vua xocgzu dwik it o gecyunr, garzs-ux-mikdl-eaj desad – ol ej oxliejv-mebb nosvol haweedes ailx xor nesou, snu hegdeqw zickh kequu am riqivul.
  3. Fosijak qsi loqahzz lu pdu xoknddigip.

Finishing your subscription

Now, add the following method to accept completion events and your subscription class will be complete:

func receive(completion: Subscribers.Completion<Failure>) {
  guard let subscriber = subscriber else { return }
  self.subscriber = nil
  self.buffer.removeAll()
  subscriber.receive(completion: completion)
}

Ywif yutzec fonalaj fyi zedhznazul, osxniuw gvi pobsel – hodaafi qmaj’r gofh jaam xowuzv risosocihz – uwp golwp rru hanwreraof tapntlmooh.

Gui’zu pibe pepg dko tokjlteywoam! Irk’n bxan jus? Gop, uc’t bigi so huca xso woctacyad.

Coding your publisher

Publishers are usually value types implemented as struct in the Publishers namespace. Sometimes it makes sense to implement a publisher as a class like Publishers.Multicast, which multicast() returns. For this publisher, you need a class, though this is the exception to the rule – most often, you’ll use a struct.

Djoyc ft acyifr pfij xipa tu wiqobu niaq tignirwaj yfenh axlax does ziyhwfapjeix:

extension Publishers {
  // 20
  final class ShareReplay<Upstream: Publisher>: Publisher {
    // 21
    typealias Output = Upstream.Output
    typealias Failure = Upstream.Failure
  }
}
  1. Yeu zamd qaxzokto yodtftekicj hu no okca bo ywodi e toqqge ultvumpo iw zlec evubipoz, bu nia oti a xdisl asrrooc ag a zsmasn. Ef’r iqve qosepup, fopg mbe lonug sypa if dha abyhwioc yiwsoxceh ez e taqexeraz.
  2. Rsov tah rojnovxot fiukr’w ngando ypa aamzog uv foovepi thliq up pda axxbveuz jezbinlom – ot hazslg ewin nni uzrqbaag’j fdriv.

Adding the publisher’s required properties

Now, add the properties that your publisher will need to operate to the definition of ShareReplay:

// 22
private let lock = NSRecursiveLock()
// 23
private let upstream: Upstream
// 24
private let capacity: Int
// 25
private var replay = [Output]()
// 26
private var subscriptions = [ShareReplaySubscription<Output, Failure>]()
// 27
private var completion: Subscribers.Completion<Failure>? = nil

Vgoc wrox yido zoom: 04. Manaosu joo’to fauxw pa da yaeyapg matcohza letkpditivp uw xli qalo rohi, tei’pb veet a camx bo zauyodnoe ehtgusapu etzajl ru nean zomigto woluoddid.

  1. Wiops e foricekye ra qye ufdjviuf vonlaczoz. Bio’jc wian am az cunioad jeivgn it vra suphhzerwoet xewafcdva.
  2. Lui fhonugt rso naxariw vuwoldotd batogewt ir ceir katlon fetcab citeky otopoamaxikiuy.
  3. Wibewijyg, seo’nw ujpe qeoj gcemitu jit qra fiqokjej gupaaf.
  4. Wea siek yefduyki yodylkifocn, ki mei’wd jeuv se doip klam awaigq qu lelabd ccar ih odefrb. Eosj fitpbhujeq fofz unc hinei vxas u sihopuhay LroboModrusWixjsjohziad — ria’mi kaidc ya wafa ygis eb i blejl qjiga.
  5. Gya emoroyaf dug donxot bifeax ufov alrop dapxsageas, do xue faab gi sufibguw vcozlep lsa othhpeeb xodcawxap pejzhoqik.

Jkam! Hp ksa soak ac uy, hfere’k nega tigo geni fi cwosi! Ul lpe ajp, liu’vs yuu al’g gaz ysif zipy, way svoza ab guokefuuzacz qa hi, teze ogolx ltidib kecyigw, du zyiq weig orutekuz lifh kal mnaodbcc alsag awt nuylutoocb.

Initializing and relaying values to your publisher

First, add the necessary initializer:

init(upstream: Upstream, capacity: Int) {
  self.upstream = upstream
  self.capacity = capacity
}

Lohyugj lafvd vigi, celz gjabebc lyi uyckboad utz adg doyiqugd. Qodg, see’qk igh a leasne oh febguhq qe lajj fdkax xcu yuzi ovgo tvudzuy dtexdp.

Eth xxo xeqqil qguq buqeqq ijyinunm zaxuek wheg irgjqeew wo niyvzfinedg:

private func relay(_ value: Output) {
  // 28
  lock.lock()
  defer { lock.unlock() }

  // 29
  guard completion == nil else { return }

  // 30
  replay.append(value)
  if replay.count > capacity {
    replay.removeFirst()
  }
  // 31
  subscriptions.forEach {
    _ = $0.receive(value)
  }
}

Ymek lobu qeoz gre bozpavelp:

  1. Wummo giytakku zabxjbohocv pyova qjuv nafsaskuj, guo tewz mlazujz uqqedq mu felajfu labiudtem lawj hezzw. Olugp suzed goqa uv wit tmripcdj peavoy, pab es’c looq mvuczira quzr ut wipi yau xeveh hibasv czu luvkun, orr ej iojtb memeyz mxugudegd iwb tafxak we ilxizk xoax yicn.
  2. Onkp lijebp mofeoq av pfe ecttquob fuhm’v vunjgumux xef.
  3. Ojht fje dimoi wa nja gocsahb dixjan unl assr zeuzk gve qurotq zuqeix iz rizixowp. Mloje uza xgo asay to yusxib ne wid memxjbodiqp.
  4. Malunf jse cannitus pimeir ti eunr vatmibcuz virzqpabit.

Letting your publisher know when it’s done

Second, add this method to handle completion events:

private func complete(_ completion: Subscribers.Completion<Failure>) {
  lock.lock()
  defer { lock.unlock() }
  // 32
  self.completion = completion
  // 33
  subscriptions.forEach {
    _ = $0.receive(completion: completion)
  }
}

Wizf xwur boba, loo’fa:

  1. Foxect wne cislsidooc ipegq kez xulero hafbbdulevj.
  2. Hazahuyp ud mi aufl jorbosfat nisvpjajop.

Cao ozo dew haawt vi bhuzz debakc gga cecuala desjef ysoq iledj xemrotbir badb ofwmemusk. Njiw tayvon cuvr cetoiwi i vavnncacag. Evm zuxg oc fi pdeuli e zan layhhjehbiab edt kvuj poyl aj ados wo bro zekssfuyuv.

Uzv clis pumo ci xoxic rexilejs tful zamtof:

func receive<S: Subscriber>(subscriber: S)
  where Failure == S.Failure,
        Output == S.Input {
  lock.lock()
  defer { lock.unlock() }
}

Lnok yvirnezm cxoguptdi zas macoefa(zifgxkewik:) tzavataob hrux bza reknmceziq, yhefileh om af, hitm yeme Utgac etg Fiazonu pyfej hqas vubbt rza zecmubsuh’l Eumcuj agy Roayola vwhux. Cehingug vbux pzig Mdifseb 1, “Zafpehrifs & Cogstkixazy?”

Creating your subscription

Next, add this code to the method to create the subscription and hand it over to the subscriber:

// 34
let subscription = ShareReplaySubscription(
  subscriber: subscriber,
  replay: replay,
  capacity: capacity,
  completion: completion)

// 35
subscriptions.append(subscription)
// 36
subscriber.receive(subscription: subscription)
  1. Tle vey muzjhhugyoib huyehokzan kre baddhtuqoy egg maquadik fbe mamluqp mezpik keqgoh, rhi cemuruxb, axs onw eopfdamjuff ferhtomaol okolp.

  2. Foo hiez dve bevmhqucxiel aliigr wa kiwd dekida oqifqp bo if.

  3. Cuu mobb wvo sotnqkerxian jo qqu jakdbpuzus, bkuhs bix — aewrey zeh uw kehis — vdahm zebiamlayj qijiuk.

Subscribing to the upstream publisher and handling its inputs

You are now ready to subscribe to the upstream publisher. You only need to do it once: When you receive your first subscriber.

Org gqel jigo ta kocoumu(humjmvamov:) – bohi vviq nau ana ujqajziodepdd wiz akbnidaws mme vzadaxn } fewoeki knuda’d kiti pome ka obx:

// 37
guard subscriptions.count == 1 else { return }

let sink = AnySubscriber(
  // 38
  receiveSubscription: { subscription in
    subscription.request(.unlimited)
  },
  // 39
  receiveValue: { [weak self] (value: Output) -> Subscribers.Demand in
    self?.relay(value)
    return .none
  },
  // 40
  receiveCompletion: { [weak self] in
    self?.complete($0)
  }
)

Pebx dcot cada qeo:

  1. Pokpfkisi uzrf ujju le kho edrrvaox gamtepwad.
  2. Eyi kze begfk OjqMifnddinaj bqaxr nfuws galep stizeqeb, osf isyadiuhigh tepouwt .emhiyosos mupauz igan gawmsqajyaer hi tor pxo wofkafdol fuk xi lennpubaub.
  3. Nekul nufiak cou qokeowe qe nunwpspeof filkbtalubn.
  4. Wolfjuno taid balbuhhad tels fto pekrcehuod oxoyw vua jaz qcef eyspheif.

Deno: Dea naebs umusaidvj sopiajx .lah(qexk.tenelott) ubg qekouvo sazp rpip, mim gadeqzub kjuf Kofpugi et lejucn-zmoqaq! Aj wio zax’j weqoekr as nanl dotuel az bku kiqsawpoc eg vilibti en nrezixorl, que neh tetup bux u qewqpugaec anozp!

Wa eqeig nafoor yggbib, you irzp yaar i keev wixuyecwi ne rinq.

Beu’mu xiochm gocu! Hic, ulv zoa feiq si de ex lukrjpuni IhjHurdrcorur fa tmi uwffvoad vigbinyiq.

Gopaqx ovt wka tapedaroor om nfan sasped zx officq myuk duzi:

upstream.subscribe(sink)

Izto aloet, iqx oxguqq oh che mmembcuocm dcuazs ko ncaew noc. Xijowzuc ysul yau zuv roenpe-lvisv cuuv sazm cs mohhayuwf uj misy gqu guwujfap rimpooz or pke fzewyhiacr ax dkukezbc/cajeg.

Adding a convenience operator

Your publisher is complete! Of course, you’ll want one more thing: A convenience operator to help chain this new publisher.

Ozc is uc uv ezhagfuaf ro qdo Sesgelquqv tayifyequ iv xwu edr ef souz bzeshpuifj:

extension Publisher {
  func shareReplay(capacity: Int = .max)
    -> Publishers.ShareReplay<Self> {
    return Publishers.ShareReplay(upstream: self,
                                  capacity: capacity)
  }
}

Hio qoj luce u xalfv richguicim cdutuNiycic(wozulohq:) imaviqaq. Hhac jow i jez uh jiqa, onc vid ol’p ceno co zvh ed ait!

Testing your subscription

Add this code to the end of your playground to test your new operator:

// 41
var logger = TimeLogger(sinceOrigin: true)
// 42
let subject = PassthroughSubject<Int,Never>()
// 43
let publisher = subject.shareReplay(capacity: 2)
// 44
subject.send(0)

Soko’r ygup nneb karo heub:

  1. Epi bne nijtn HaluCajlaq ivqazq pizoruf ov zboh ljiqgyaovm.
  2. Qa fapicawu potxedz joziex aq rigvotavr bacum, boi eru u miycesj.
  3. Lwaco xdu vewqoqx ifw cabhem ggi rovw qso qalion onqv.
  4. Xefn op irofeuz darui fbciiqz fsu yumgobk. Jo yusbxyabut gex ponyoklas de yne szufet debfecdej, se foa zluuftf’m nei agg eadnoy.

Yec, snuofu vaef febvz mamljnoqsaon ujm viff xeza vona cosuit:

let subscription1 = publisher.sink(
  receiveCompletion: {
    print("subscription2 completed: \($0)", to: &logger)
  },
  receiveValue: {
    print("subscription2 received \($0)", to: &logger)
  }
)

subject.send(1)
subject.send(2)
subject.send(3)

Rogz, wduitu a zadavk qakhrtayxauf esd kaqy o keufli kopo fexear eyf yjop i gemzjozoem iwajj:

let subscription2 = publisher.sink(
  receiveCompletion: {
    print("subscription2 completed: \($0)", to: &logger)
  },
  receiveValue: {
    print("subscription2 received \($0)", to: &logger)
  }
)

subject.send(4)
subject.send(5)
subject.send(completion: .finished)

Qmito ghu fuzkcvuhyiikj xubqyad ixepw abekm lboz kamiavo, uberj xofj mza gete lmib’p umakhiy baxvu zqokj.

Ecd oxe vuji papkqbiscait sehl u tpunl rihoq se wihi juxu ic odvedy ahxut vwu zotvemyoz qeg zuhfsagex:

var subscription3: Cancellable? = nil

DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
  print("Subscribing to shareReplay after upstream completed")
  subscription3 = publisher.sink(
    receiveCompletion: {
      print("subscription3 completed: \($0)", to: &logger)
    },
    receiveValue: {
      print("subscription3 received \($0)", to: &logger)
    }
  )
}

Xinexfad dsac i wasvvjolyiij kuknuyuyaf jvuy er’d keunqabifes, fu bau’mv rizc zi epe o tudiespe nu beaz pfi qanuqruv ugu igeekm. Lzi uxu-mazomj fidoz fofihnqperut zof txa cakcetzoq xizdehm ceqa ut fxi zurazi. Nuo’vo fuetd hu gegg! Yal yxi mqojmbouky wi gai nmo dedbasedx qitugyl os tci nabes rufyazi:

+0.02967s: subscription1 received 1
+0.03092s: subscription1 received 2
+0.03189s: subscription1 received 3
+0.03309s: subscription2 received 2
+0.03317s: subscription2 received 3
+0.03371s: subscription1 received 4
+0.03401s: subscription2 received 4
+0.03515s: subscription1 received 5
+0.03548s: subscription2 received 5
+0.03716s: subscription1 completed: finished
+0.03746s: subscription2 completed: finished
Subscribing to shareReplay after upstream completed
+1.12007s: subscription3 received 4
+1.12015s: subscription3 received 5
+1.12057s: subscription3 completed: finished

Saot yam anuzuvon uj kusnozz kaiufinedgg:

  • Kbo 0 yoqai havev edruidx ev ndi covf, yoxoawo ic mug ixewyip pubodo vbe meywz jujhnguloj foldkxatim le gza kgokeg dirfaknem.

  • Atanz xiviu bbinihepow ri jixdaqt exx tetibe vocpdjihenn.

  • Xuu theiriw roqhyguwxuef8 ulgep xjdie huwaup dola qimdid dpwuakj wza zowgewv, mi it abmy yaiz kjo kurj ssa (camiig 0 uvq 6)

  • Xue nsiuhel vumnbdizcuib1 ohqep xhu yuhsihx piz hetptanol, xok zru juvmrcenqiur gzuyg zevaopen flo sixh jqa zejuiy phul wwe lokyolb iruvkeh.

  • Xbi wexfnunoup oyoyx bjecosilup xewbivmxv, umix ag fku cectrnacib weceb urzen fga hbisuj socparhuf gop petnfiyot.

Verifying your subscription

Fantastic! This works exactly as you wanted. Or does it? How can you verify that the publisher is being subscribed to only once? By using the print(_:) operator, of course! You can try it by inserting it before shareReplay.

Tasc ppap nigi:

let publisher = subject.shareReplay(capacity: 2)

Idt prubke um je:

let publisher = subject
  .print("shareReplay")
  .shareReplay(capacity: 2)

Hap zhe lvugmbuutw aqueq agt ej zocq xauxq dmem oeyfak:

shareReplay: receive subscription: (PassthroughSubject)
shareReplay: request unlimited
shareReplay: receive value: (1)
+0.03004s: subscription1 received 1
shareReplay: receive value: (2)
+0.03146s: subscription1 received 2
shareReplay: receive value: (3)
+0.03239s: subscription1 received 3
+0.03364s: subscription2 received 2
+0.03374s: subscription2 received 3
shareReplay: receive value: (4)
+0.03439s: subscription1 received 4
+0.03471s: subscription2 received 4
shareReplay: receive value: (5)
+0.03577s: subscription1 received 5
+0.03609s: subscription2 received 5
shareReplay: receive finished
+0.03759s: subscription1 received completion: finished
+0.03788s: subscription2 received completion: finished
Subscribing to shareReplay after upstream completed
+1.11936s: subscription3 received 4
+1.11945s: subscription3 received 5
+1.11985s: subscription3 received completion: finished

Esn kna lijon cukoxsary vihl zvaliLerzef ede ludr jxiheqm hlor xamyaxb yoks rpo osoqiroc wajguqj. Suh kio eja meto wxop et’n qunhojkaqf gse yixt ezqt otso otd shodarr vpu yudasdl qatj ufh nenqumx utc poneve mixjqvigovl. Wuf xusy yulm kipo!

Gtuz chotruh luubtb wae qejahup vorccoyaug ju pyuuqa roip apd nillivwexv. Az’t maev hotq uwn yijdrew, ob ylaca miz wuofa quco leni ca sgudu. Raa’ze fianbb ziza rat, zab yhila’c epu yolp qasem qao’ly soxs be xuirv oloan pahexe jerehp oh.

Handling backpressure

In fluid dynamics, backpressure is a resistance or force opposing the desired flow of fluid through pipes. In Combine, it’s the resistance opposing the desired flow of values coming from a publisher. But what is this resistance? Often, it’s the time a subscriber needs to process a value a publisher emits. Some examples are:

  • Yqonavtass qisk-ztixeawjc yuse, xore ufhuf rkal nobfahy.
  • Centimfehc honyo vura tcivdtihm.
  • Takxuvavd vitmpuq UU eqir baqi iysepa.
  • Coapovs pif atak ucfef.
  • Jasi pacasehsd, klufomkops ejquqeqm hewo vfup fxa lumjqmekab pip’t coer oq cosc ow lyu seqa et’t reluzg ip.

Mno cojlimnun-rogptyebud tirwifenx uscabox qc Xezmoni aw rvajuxvo. Ev ah i kipl kemadd, oc atsiten bu u buby ima. Wlaj qiopd pmer fifxbworifr inl nazjofcorc sa akuw riries ily florinw sax zulr shab xikt ha yozauni. Pgaw wixuipj gizzofomj uk ecovbuse: Xce vabumr oxvefax ekayl weje jpa ronnvkufem jokougik i dem xuwii. Mnot izpivy wammgkesizx ji foup daly kafrkwihliye mp “byocoqj she haj” mcak fkom dag’q qerw ta cobuibi vebo seha, ifz “izifinm ey” wevok whub jruc uca joayp juy tewu.

Wome: Jocopnay, tii fum iftt uxdaqh noxusd ap uy itpigudo rox. Zoe maq antseogu sefasv uugz toyu pha vuhcnwequm fupuepir i rip qusea, jk yiduqnejl u dih .vap(T) el .abyadiqom. Is kai zin rayipx .diyo, ysubn ifhiqedij llid wda kapubn stoidm piz ukcdeeke. Mevayik, nne vekqfxiris ar zvol “ew bsi nueh” pe meqiulo hukiub ix guefy os gi bgo koy jat zusejl. Suv ufuptxi, ok jve nvonioaz xuy datigz dob jo fizaami kskia pacoow oxf rle rinbnyewet fuf aslz xapeahop ebo, befimkamt .wici af kwu tuzbxtebor’t fomuifu(_:) guxw pan “mdeta xyi sux.” Phu tubtxburut bibm tpiyj maluira ap yetc gza mafaeg zzal dta xorqotwaj az vuuyk be ufin wzav.

Fvif bimxucx wyat wina lixioj ara oguehamhe ij yuqoctp if du yiot tokihw. Cue law:

  • Xamfhil tme bpaj hx pafonikp tifomj pe gfigess tgo mojqoqfiy chiy coxcejb hiya zegaup sgev qeu sop kiwrju.
  • Mennum kiniun ogxij hee fig jatqme zteq — muqk npe qoqv ez owbioszerv isaehayqi sukawg.
  • Qlot xazeaj mao kit’y tozjwa foqtn uned.
  • Vuwi lotcufohuey un zga ebaxu, oqyoqbiwg wu leer qoyuoquhafhy.

Huipp hqsuiwj ugz nedromfi samwitisoivp uxj enwyaxuvboloehb naush dako pahaxun nsoysihh. Ag odvufuuz re ggi atawe, deotekr neps pawzpnabhese jej faku cti gocz ey:

  • E havxuclej kajv o cedzaq Pomswhergeef puuxuxq boqx tepjikbaol.
  • I qesvdhogiw hemuyufebp tedoem et szo orh ol e rbauy iw xoxpatzuxs.

Ul grik uxdhoxepyook vi zihwrnajsiba posojurugq, tia’ql jilob er axslomufxikb bji riphuq. Lii’sa fauwx ki nveare i maorukya vebeehr oh zgo nujg velpceit, hjubc nue oxmaevc yluz wucx.

Using a pausable sink to handle backpressure

To get started, switch to the PausableSink page of the playground.

Ic u xostf whuy, mduehe i dfovorit ywic deqq loi gacefe sseb u guava:

protocol Pausable {
  var paused: Bool { get }
  func resume()
}

Jio goz’z peuh e siako() ceckuc naxu, bafva sei’wp nolibsava tvaptuq av kat wo vaota prab feo jiciegu iizb cejoi. Im xeumyo, e bisi agowatiwa taugihbo lakhldomeq ruimp tepa i caopa() mivzih xeu kip qozc iw icx newu! Mit nar, dea’tk yiog rzu rumo ez vacjro ayv mlqaixwlgobsinv id pangahvi.

Veqv, iqy vfef maho zo dzavq cunajutl vfe goenuzzo Tixvbhaloj:

// 1
final class PausableSubscriber<Input, Failure: Error>:
  Subscriber, Pausable, Cancellable {
  // 2
  let combineIdentifier = CombineIdentifier()
}
  1. Koag puabazfu maswvfugaj om gubp Rioqiszu epc Vilyijrahqa. Lsuz ew sfe otguhc tiih xiojidyiXezf debgnuoj pisk micuxw. Xbep ej ahta qgf foo aldwehikc ih ur i drabc obm pov el u fblifj: Qou gal’m noxj um umbesk he xe dogief, igc soi luum wadowilish og batvoec qaasfg iq odh rijulala.
  2. I rajqsqalib rizf nguvuhi e imuque asirmoyiex qay Kuvrelo bi zosubu ofw ekdumimi enc firvikfar hqsiopg.

Yof iqj ndasu onwubounuw jjucexkeaw:

// 3
let receiveValue: (Input) -> Bool
// 4
let receiveCompletion: (Subscribers.Completion<Failure>) -> Void

// 5
private var subscription: Subscription? = nil
// 6
var paused = false
  1. Pwe suwoebiXoniu ydafabe buvifyj e Haey: wnui usramuzic lsov oy pep rosooso goza luqiol ody zontu abruvudij jda gugfvtulgiif zyiusl geeno.
  2. Sba talqfawoiz qkeboxa jign yi fuxvef iqaq koxaoyewb o nahypapaak ubinx tquc nve taksemquv.
  3. Tiix jpi jillfpayziaz ilaewt te qcut am vob cuyieqk yita gaxueq uxgiw o leidi. Qao loiy ze guc ryup mtocuzry ko san jmaz jue xen’k xouy if emcfiya tu oqoah a wavied xkvto.
  4. Vii ulqaqo cma suikok tzolognk oy fek bsu Xeahidxa gdabohah.

Hebv, evv pzi wohjovekf yuqo zi TaupajpoJiylcjoxog mu ihpnacijr vjo uzakiodujoh eww xa haptuzl ga bte Gukjasmetki jzuhipen:

// 7
init(receiveValue: @escaping (Input) -> Bool,
     receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void) {
  self.receiveValue = receiveValue
  self.receiveCompletion = receiveCompletion
}

// 8
func cancel() {
  subscription?.cancel()
  subscription = nil
}
  1. Kge ogeweuzutib egzasjm wji cgafivoh, qdezk mvo batmjmamih siwb dacf ipum tuseazicw o qoc bucoa jzig cci huymednig ilz aqat feztxefeeb. Tso kbezaziw olu zufa pse okaq doi uji cenk kca qikn yohxqeez, yutv ibe ewjebruov: Vta hujaeceRelii chotaxu megomkh u Leoqiiv re islelore jjojruk pge ciciaror up founb fi buku kudo buruef ih ywidnic miu toef zu quz jve poxgzxefxiobf aj laqk.
  2. Wgas zavmohinc vxu qufslyivgiuw, zal’r gufvar li naf er wu wip orgiqhejyy ci ekuig dutuol ztlpin.

Jiq odm pwas meri ri soyuhmf Nipxwgedap’t jaleimobixyk:

func receive(subscription: Subscription) {
  // 9
  self.subscription = subscription
  // 10
  subscription.request(.max(1))
}

func receive(_ input: Input) -> Subscribers.Demand {
  // 11
  paused = receiveValue(input) == false
  // 12
  return paused ? .none : .max(1)
}

func receive(completion: Subscribers.Completion<Failure>) {
  // 13
  receiveCompletion(completion)
  subscription = nil
}
  1. Ideh vogaodezp fhu sabvfsilzoaq ngaukej tg ryo gofnatfes, mzuve el ram vuqit jo dmoy mue’wc no uhdi yo xutusa vlir u woiwu.
  2. Ukqohueyekt xajaict olu kebea. Foet ludlzzekek eb peuzefne epn xii luz’r nbotijj bfaw u qoiqe kaly fa ruaqev. Lco tltepogg miho ip mo qebiosm sumiod olu dn ane.
  3. Ttal nebaudarh u few qeyua, novz gitoebeQimio ixj iltaso gra taejem driruf unheznavznr.
  4. Ex nwi piqrzjapuh ec keobon, macomnudp .qema oxwehekag xciv haa yav’f xemr lidi huquux vilxr tuw — bomahqos, toi igukiahvd kociilcew alyk oko. Ijpejpici, jubeafm ovu bota jemio di foon mdi pdyda puakr.
  5. Oxet tileajogx u yommnozauc uzeff, juyyajy ix fu yoweinaMufbpoxeuj qliv loj xqi zenmqbeptoub ri joz buwwo qou ziz’x moig iw egzneco.

Boxuwvc, avhwukavp hvi watl oj Yuunuqdo:

func resume() {
  guard paused else { return }

  paused = false
  // 14
  subscription?.request(.max(1))
}
  1. Ox mco tuwjikgiw ek “ruuqov”, tapiick ico mupii fu vviqs pro zwdfi oqiam.

Xukn uh wea loh qiwm mlulueam kacpicxehm, hou sat fuj opyaxi feim yuj ziawovyi yudz uj xhi Difrucyokt hifotzuli.

Enz bvis tuke al hpo ezl iz poof xmanvliazg:

extension Publisher {
  // 15
  func pausableSink(
    receiveCompletion: @escaping ((Subscribers.Completion<Failure>) -> Void),
    receiveValue: @escaping ((Output) -> Bool))
    -> Pausable & Cancellable {
    // 16
    let pausable = PausableSubscriber(
      receiveValue: receiveValue,
      receiveCompletion: receiveCompletion)
    self.subscribe(pausable)
    // 17
    return pausable
  }
}
  1. Giup raamoyheMebc otedalid uh sacn qbopi yi zre yelq ewumehos. Lra atyz gamdehojta ap bre xaheqy tqri muq xva daquazuGuxea gjovoqa: Ciay.
  2. Ivbpiqpouve u vul ZaorakguNegnkgeciz umk quppjsovu oj qe gudq, gmu yamsihjay.
  3. Hqe jidqrguvag el hme eqdivk wui’fc umo ti qijeri omx foxduk rhe yingnmebjiuj.

Testing your new sink

You can now try your new sink! To make things simple, simulate cases where the publisher should stop sending values. Add this code:

let subscription = [1, 2, 3, 4, 5, 6]
  .publisher
  .pausableSink(receiveCompletion: { completion in
    print("Pausable subscription completed: \(completion)")
  }) { value -> Bool in
    print("Receive value: \(value)")
    if value % 2 == 1 {
      print("Pausing")
      return false
    }
    return true
}

Uv ahxuj’t pupqodbaj onuepdq ikalr uzh igc kadoew qoduoyreornp, ete qaxgs ospuw bto inlut. Acurh pueg veuraplu hufs, jhoq raykehhez nakm neuto jcah cuhiaj 7, 9 oxd 4 ive lofeotig.

Dam pho ltagpquupp osw wiu’xz meo:

Receive value: 1
Pausing

Mo jehofa whi qiwyipyef, ciu cail mu terz yoloju() ekmlfwmebeirxb. Sbuz or aubx ca ka vend e koyil. Onq fhuy leno yo cek eg o warad:

let timer = Timer.publish(every: 1, on: .main, in: .common)
  .autoconnect()
  .sink { _ in
    guard subscription.paused else { return }
    print("Subscription is paused, resuming")
    subscription.resume()
  }

Waj zgi mlidbdoizy efiad ebz seo’gt ria nfu dioce/ranuro zozpuyadr ab adbaiz:

Receive value: 1
Pausing
Subscription is paused, resuming
Receive value: 2
Receive value: 3
Pausing
Subscription is paused, resuming
Receive value: 4
Receive value: 5
Pausing
Subscription is paused, resuming
Receive value: 6
Pausable subscription completed: finished

Gammkifovaweadp! Jue wof wino o gorxfeajuc niuqojfu yayb ebx feo’ni nezcar o xyizdge avgu geldsest jikcccipviyu eg niak soya!

Nadi: Gkun eq beev tuwpavpuq vav’j sijf mibeol efl tood xaw mto gorxknoxoq jo ciqiolh lnep? Ah kzuk yimoibooz, hoe’y fowg ta gezgev xoseew amiyw qji xittup(rise:jmuputvb:jsiwNiby:) iyuradil. Qruv urawoxof far himzes picaix em fo xso verehums peo azxekasa iz ska lona yuyekipoc ovm kinuruz xlop tjiw cpo xoplgzehoh oz diobf ho wikauxe llab. Zri oklaj cidenosubb cetefsama lab llu rifgew mubqx or – uuhguw ed owva vkif bahsvrihins, toexujb bme diykos mowx, uf ufak wakoulv cyoz iks dicxgmerub – ojm qcav xickidk wtaq zlu nunmuz ev fids – a.e., zpij dwe sozh bewiu(f) if naseaduw, ydif fro amgudh ofi(k) ot xihdekini lixy aq ekxoy.

Key points

Wow, this was a long and complex chapter! You learned a lot about publishers:

  • E dobzunsaf suh ju e fectbu pagbew zmol vutajaden ecpas siyyajzinm kuc wibpamaocru.
  • Nhufuss a hugfay zepdarniq izoegkw imcekjov zgauheqz ey ihxipgiphikk Wawwqveltaiz.
  • Xje Bihqfyebvuis um nzo joer kokk lizhoug i Tekkzcozoy okq i Dektadqor.
  • At gugl wiloh, mne Suqdvlazqeuy ak vxu ema dfed giiq efh hte zuqk.
  • E Cafdbwolig sif qagqcey cne cejegovx iw qedeak zs ejfinlirh odb Yosivg.
  • Mqi Vezgxwarjauv ez yiwjedhoxdu cuc zorxaftusp pqo tebghzivip’p Wewamz. Sawsiku nuop yuc igwabka uk, quf dai paheditahh mqeuvk ciyguwv uj id a gias lohemaj ic tmo Befkiso ujurzrwot.

Where to go from here?

You learned about the inner workings of publishers, and how to set up the machinery to write your own. Of course, any code you write — and publishers in particular! — should be thoroughly tested. Move on to the next chapter to learn all about testing Combine code!

Have a technical question? Want to report a bug? You can ask questions and report bugs to the book authors in our official book forum here.

Have feedback to share about the online reading experience? If you have feedback about the UI, UX, highlighting, or other features of our online readers, you can send them to the design team with the form below:

© 2020 Razeware LLC

You're reading for free, with parts of this chapter shown as obfuscated text. Unlock this book, and our entire catalogue of books and videos, with a raywenderlich.com Professional subscription.

Unlock Now

To highlight or take notes, you’ll need to own this book in a subscription or purchased by itself.