This repository is no longer maintained.
A set of types and protocols that build from atomicity (Atomic) to futures (Future) to Reactive Streams. Futures are built on top of Atomic and Reactive Streams on top of futures. All three are useful when writing concurrent programs, and the art of successfully writing concurrent programs is choosing the most suitable abstraction for the problem. When writing a new concurrent program it is suggested that you start with Reactive Streams, since these are the easiest to use, and consider the other abstractions only if there are problems.
The easiest way to use these types and protocols in your own code is to clone this project from GitHub inside Xcode 9 and then drag the relevant Swift files into your project. The files are small, so having a copy shouldn't be an issue (however, you will need to update manually). If you just want atomicity then you just need Atomic.swift; for futures, Future.swift and Atomic.swift; and for reactive collections you need ReactiveCollection.swift, ReactiveCollectionBaseClasses.swift, ReactiveStream.swift, Future.swift, and Atomic.swift.
The file ReactiveStream.swift just contains the protocols etc. that define a Reactive Stream and can be used to build an implementation of Reactive Streams; one such implementation is ReactiveCollection.swift.
Provides atomic access to a value; you can get, set, update, and mutate the value. To update a value you supply a closure to update that accepts the old value and returns the new value; the update method also returns the new value from the closure. To mutate a value in place, rather than replace it (which update does), you supply a closure to mutate that accepts an inout value. Calls to get, set, update, and mutate can occur in any order and from any thread and are guaranteed not to expose partially updated values. Calls to get, set, update, and mutate block until they have completed.
mutate can be used as a lock as well as providing atomicity, e.g.:
let lock = Atomic<Void>(()) // `()` is `Void`'s literal.
…
// In thread 1
lock.mutate { _ in
…
}
…
// In thread 2
lock.mutate { _ in
…
}
Each thread will block until the other has finished because they share lock.
get and set can be used to ensure visibility across threads (the volatile keyword in other C-like languages), e.g.:
let volatile = Atomic<T>(initialValue)
…
// In thread 1
volatile.value = … // Write to volatile.
…
// In thread 2
let value = volatile.value // Read from volatile.
Thread 2 will see the changes made by thread 1.
Atomic is a class and therefore instances would normally be declared using let (which can seem odd since they obviously mutate!).
See AtomicTests.swift for examples.
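The get, set, update, and mutate behaviour described above can be sketched with a small stand-in type. This is not the code in Atomic.swift: the name AtomicSketch and the use of NSLock are assumptions made purely for illustration.

```swift
import Foundation

/// Hypothetical stand-in for illustration only; not the code in Atomic.swift.
final class AtomicSketch<T> {
    private let lock = NSLock() // Assumption: a plain lock guards every access.
    private var stored: T

    init(_ initialValue: T) {
        stored = initialValue
    }

    /// Atomic get and set.
    var value: T {
        get {
            lock.lock(); defer { lock.unlock() }
            return stored
        }
        set {
            lock.lock(); defer { lock.unlock() }
            stored = newValue
        }
    }

    /// Replaces the value with the closure's result and returns the new value.
    func update(_ updater: (T) -> T) -> T {
        lock.lock(); defer { lock.unlock() }
        stored = updater(stored)
        return stored
    }

    /// Mutates the value in place through an inout parameter.
    func mutate(_ mutator: (inout T) -> Void) {
        lock.lock(); defer { lock.unlock() }
        mutator(&stored)
    }
}

let counter = AtomicSketch(0)
DispatchQueue.concurrentPerform(iterations: 1_000) { _ in
    counter.mutate { $0 += 1 } // 1,000 increments from several threads.
}
let doubled = counter.update { $0 * 2 }
print(counter.value, doubled) // prints "2000 2000"
```

Because every access takes the same lock, no increment is lost even though the iterations run on several threads.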
A future allows control and monitoring of a background task that returns a value (though the value may be Void, i.e. ()). You obtain the value of a future using wait (which will time out), you cancel a future with cancel, and you can find out its status using status (which is primarily for debugging and is one of .running, .completed(result: T), or .threw(error: Error)).
You typically type function arguments and returns as Future, so that any of the more specific types of future can be supplied. Most commonly you create an AsynchronousFuture, which is a future that evaluates asynchronously on a specified queue (defaults to the global default queue, which is concurrent) and with a specified timeout (defaults to 1 second). An AsynchronousFuture is given a closure to execute in the background that accepts a termination test argument (try testTermination()), can throw, and returns the future's value. testTermination() is automatically tested before and after the closure is run, but it is up to the programmer to test whilst the closure is running.
The future's timeout limits how long wait will wait (block) from when the future was created and therefore breaks and/or detects deadlocks. If the future times out, is cancelled, or throws then wait will return nil, otherwise it will return the future's value. The future's status is only updated after the closure has run; however, wait reflects timeout and cancel whether the closure is still running or not. wait returning nil can be used to provide a default value using the nil-coalescing operator (??).
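The timeout and nil-on-failure behaviour of wait can be sketched as follows. FutureSketch is a hypothetical name, and the semaphore-based design is an assumption for illustration; the library's futures also support cancel, status, and throwing closures, which this sketch leaves out.

```swift
import Foundation

/// Hypothetical illustration only; not the library's AsynchronousFuture.
final class FutureSketch<T> {
    private let done = DispatchSemaphore(value: 0)
    private let lock = NSLock()
    private var result: T?
    private let deadline: DispatchTime

    init(timeout: DispatchTimeInterval = .seconds(1), _ calculation: @escaping () -> T) {
        deadline = .now() + timeout // The timeout is measured from creation.
        DispatchQueue.global().async {
            let value = calculation()
            self.lock.lock()
            self.result = value
            self.lock.unlock()
            self.done.signal()
        }
    }

    /// Blocks until the value is ready or the deadline passes; nil means not ready in time.
    var wait: T? {
        if done.wait(timeout: deadline) == .success {
            done.signal() // Let later calls to wait succeed too.
        }
        lock.lock(); defer { lock.unlock() }
        return result
    }
}

let fast = FutureSketch { 6 * 7 }
let slow = FutureSketch(timeout: .milliseconds(50)) { () -> Int in
    Thread.sleep(forTimeInterval: 0.5) // Deliberately slower than the timeout.
    return 1
}
let fastValue = fast.wait ?? -1 // The calculation finishes well inside 1 second.
let slowValue = slow.wait ?? -1 // Times out, so the default is used.
print(fastValue, slowValue)
```

The default supplied after ?? is what the caller sees whenever the sketch's wait gives up.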
The futures framework also includes an extension on Thread to allow easy running on the main thread for UI updates, e.g.:
func getFromWeb(url: URL) -> Future<String> { ... }
func updateUI(_ s1: String, _ s2: String) { ... }
...
let cancellableUIAction = AsynchronousFuture { isTerminated -> Void in
    let future1 = getFromWeb(url: address1) // These two lines run in parallel (if > 1 core).
    let future2 = getFromWeb(url: address2)
    let s1 = future1.wait ?? defaultS1 // `wait` returns `nil` on error/timeout/cancel, hence default.
    let s2 = future2.wait ?? defaultS2
    try isTerminated() // If cancelled there is no update to do.
    Thread.executeOnMain {
        updateUI(s1, s2) // Does not block main thread because all waiting in background.
    }
}
...
cancellableUIAction.cancel() // Wired to cancel button on UI.
An existing API that uses a completion handler (common in Cocoa) can easily be converted into using a Future. Suppose in the above example getFromWeb was written using a completion handler:
func getFromWeb(url: URL, completion: (_ result: String?, _ error: Error?) -> Void) { ... }
Then this can easily be converted into a Future:
func getFromWeb(url: URL) -> Future<String> {
return AsynchronousFuture { () -> (String?, Error?) in
var resultError: (String?, Error?)
getFromWeb(url: url) { // Call the original completion handler version.
resultError = ($0, $1) // Store its result.
}
return resultError
}
}
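When the completion handler fires later on another thread, the background closure has to wait for it before returning. One way to do that, sketched here with a DispatchSemaphore, is shown below. fetchText and fetchTextBlocking are hypothetical names standing in for the completion-handler API, and this is not necessarily how the library bridges such calls.

```swift
import Foundation

// Hypothetical completion-handler API standing in for getFromWeb(url:completion:).
func fetchText(completion: @escaping (_ result: String?, _ error: Error?) -> Void) {
    DispatchQueue.global().async {
        completion("payload", nil) // Calls back later, on another thread.
    }
}

/// Blocks the calling (background) thread until the handler fires or the timeout expires.
func fetchTextBlocking(timeout: DispatchTimeInterval = .seconds(1)) -> String? {
    let finished = DispatchSemaphore(value: 0)
    let lock = NSLock()
    var result: String?
    fetchText { text, _ in
        lock.lock()
        result = text // Store the handler's result.
        lock.unlock()
        finished.signal()
    }
    guard finished.wait(timeout: .now() + timeout) == .success else {
        return nil // The handler did not fire in time.
    }
    lock.lock(); defer { lock.unlock() }
    return result
}

let text = fetchTextBlocking() ?? "default"
print(text) // prints "payload"
```

A blocking wrapper like this should only be called from a background thread, never from the main thread.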
A further feature of Futures is that they only time out when wait is called. A design pattern used with futures is the completable future; this is accomplished using Future via a zero timeout. A completable future lets you override the result if the future hasn't completed, regardless of how long it has had to complete. A typical use case might be:
let f = AsynchronousFuture(timeout: .seconds(0)) { isTerminated -> String in // Note zero timeout.
// Get or calculate text.
}
// Stuff that would take some time goes here.
let s = f.wait ?? defaultText // Because timeout is zero `wait` never waits.
The above is a completable future because wait returns instantly, either with nil if the future hasn't completed or threw, or with the value if the future completed; therefore, without waiting, you receive either the completed value or the default.
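The zero-timeout idea can be tried out with Grand Central Dispatch alone. This sketch assumes a DispatchGroup as the completion signal and uses a hypothetical helper, currentOrDefault; it illustrates the pattern, not the library's implementation.

```swift
import Foundation

let group = DispatchGroup()
let lock = NSLock()
var computed: String?

DispatchQueue.global().async(group: group) {
    Thread.sleep(forTimeInterval: 0.2) // Stand-in for getting or calculating text.
    lock.lock()
    computed = "calculated text"
    lock.unlock()
}

/// Hypothetical helper: polls once with a zero timeout, so it never blocks.
func currentOrDefault() -> String {
    guard group.wait(timeout: .now()) == .success else {
        return "default text" // Not finished yet.
    }
    lock.lock(); defer { lock.unlock() }
    return computed ?? "default text"
}

let early = currentOrDefault() // The work is still running at this point.
group.wait()                   // Only for the demonstration: let the work finish.
let late = currentOrDefault()
print(early, "/", late)
```

The first call returns the default because the work has not finished; the second returns the calculated text.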
A future may be a continually running background task and therefore have no value, in which case wait would not be called and hence the timeout would be ignored; it can, however, still be cancelled, e.g.:
let backgroundTask = AsynchronousFuture { isTerminated -> Void in
    while true { // Runs until cancelled
        try isTerminated() // Test for cancellation
        ... // Background task
    }
}
...
backgroundTask.cancel() // Background task runs until it is cancelled.
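The cooperative cancellation used above, where the task itself tests for termination, can be sketched without the library. CancelFlag and Cancelled are hypothetical names, and the flag plays the role that isTerminated plays in the example above.

```swift
import Foundation

struct Cancelled: Error {} // Hypothetical error type for this sketch.

/// Hypothetical thread-safe flag; test() throws once cancel() has been called.
final class CancelFlag {
    private let lock = NSLock()
    private var cancelled = false

    func cancel() {
        lock.lock()
        cancelled = true
        lock.unlock()
    }

    func test() throws {
        lock.lock(); defer { lock.unlock() }
        if cancelled { throw Cancelled() }
    }
}

let flag = CancelFlag()
let stopped = DispatchSemaphore(value: 0)

DispatchQueue.global().async {
    do {
        while true {
            try flag.test()                     // Test for cancellation on every pass.
            Thread.sleep(forTimeInterval: 0.01) // Stand-in for the background task.
        }
    } catch {
        stopped.signal() // The loop exited because of cancellation.
    }
}

flag.cancel()
let exited = stopped.wait(timeout: .now() + .seconds(2)) == .success
print(exited) // prints "true"
```

The loop only stops at the points where it tests the flag, which is why the task must test regularly while it runs.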
Futures are classes and therefore instances would normally be declared using let (which might seem odd because they mutate); they are also thread safe and therefore can be shared between threads. Futures are also used to extract values from Reactive Collection classes, which execute in the background; see Reactive Collection below.
See FutureTests.swift for examples.
Reactive Streams are a standardised, Actor-like way to transfer items between asynchronous tasks; they are widely supported in many languages and frameworks and therefore both general and detailed descriptions are available.
Reactive Streams are dynamic (can be reconfigured on the fly), can signal errors, can be cancelled, and use pull requests to control the number of items transferred. The terminology used is:
- Back pressure: A subscriber controls the flow of items from a producer by requesting a number of items from the producer; if it stops requesting items then the producer must stop producing items (this is termed back pressure).
- Complete: When a flow finishes normally (without cancellation or errors).
- Cancel: When a flow is cancelled.
- Error: When a flow finishes abnormally and signals an error.
- Item: What is transferred from a producer, optionally via a processor, to a subscriber.
- Processor: Represents a processing stage, which obtains items from an upstream producer, processes these items, and supplies the processed items to a downstream subscriber (i.e. a processor is both a producer, for downstream subscribers, and a subscriber to upstream producers).
- Producer: Provider of a potentially unbounded number of sequenced items, producing them according to the demand received from its subscriber(s).
- Request: Made by a subscriber, via its subscription, to a producer to produce n more items.
- Subscriber: Subscribes to a producer and receives from the producer a subscription; using this subscription the subscriber controls the flow of items from the producer to the subscriber.
- Subscription: 'Contract' between a producer and subscriber for the supply of items; in particular the subscription regulates the rate of flow of items and signals completion, errors, and cancellation.
- Terminate: Any of: completion, cancellation, or error.
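The terms request, back pressure, and complete can be made concrete with a deliberately synchronous toy. SubscriptionSketch is a hypothetical name and does not correspond to the library's protocols, which are asynchronous and follow the Reactive Streams specification.

```swift
/// Hypothetical, synchronous toy; the names are not the library's.
final class SubscriptionSketch {
    private var items: ArraySlice<Int>
    private var finished = false
    private let onNext: (Int) -> Void
    private let onComplete: () -> Void

    init(items: [Int], onNext: @escaping (Int) -> Void, onComplete: @escaping () -> Void) {
        self.items = items[...]
        self.onNext = onNext
        self.onComplete = onComplete
    }

    /// Request: the subscriber asks for n more items; no more than n are delivered.
    func request(_ n: Int) {
        guard !finished else { return }
        let batch = items.prefix(n)
        items = items.dropFirst(batch.count)
        batch.forEach(onNext)
        if items.isEmpty {
            finished = true
            onComplete() // Complete: the flow finished normally.
        }
    }

    /// Cancel: no further items are delivered.
    func cancel() {
        finished = true
    }
}

var received: [Int] = []
var completed = false
let subscription = SubscriptionSketch(
    items: Array(1...5),
    onNext: { received.append($0) },
    onComplete: { completed = true }
)

subscription.request(2)
let afterFirstRequest = received // Only two items arrive: back pressure.
let completedEarly = completed
subscription.request(10)         // The remaining three arrive, then completion.
print(afterFirstRequest, received, completed) // prints "[1, 2] [1, 2, 3, 4, 5] true"
```

Nothing is delivered unless the subscriber asks for it, which is the essence of back pressure.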
The Reactive Stream standard defines just four protocols: Processor, Producer, Subscriber, and Subscription, which in turn define just seven methods. The methods are described in the Reactive Streams Specification. A summary of how the protocols and methods interact and in what sequence is shown diagrammatically:
In practice Reactive Streams are easy to use because their action is largely automated; the programmer declares instances of producers, processors, and subscribers and then arranges for the processors to subscribe to the producers and the subscribers to subscribe to the processors. Using the Reactive Collection Library, described below, the overview diagram referenced above is:
producer ~~> processor ~~> subscriber
The interactions shown in the diagram occur automatically once subscriptions are established via the ~~> operator.
The implementation in Swift of these protocols faithfully follows the Java specification, but with Swift naming and type conventions, e.g. in Java the standard specifies:
interface Subscriber<T> {
void onError(Throwable t);
...
}
and in Swift this becomes:
protocol Subscriber {
associatedtype InputT
func on(error: Error)
...
}
This is similar to how Objective-C and C APIs are 'translated' when imported into Swift.
On top of the specification's protocols the library provides implementations of processors, producers, and subscribers with their associated subscriptions. The reactive collections run asynchronously and provide locking and background threads without the need for the user of the library to deal with threads/locking manually; but they are not themselves thread safe since it makes no sense to share them between threads; you use them instead of threads! These implementations are styled after the standard Swift Collection Library, in particular Sequence; for example there is a ForEachProducer and a ReduceFutureSubscriber:
- The arguments when creating instances of these classes mimic the arguments to the methods from Sequence, e.g. ReduceFutureSubscriber accepts an into argument into which the reduction happens and a reduction closure that reduces the stream of items to a single item.
- Like the Swift Collection Library, the action of these classes is specified using a trailing closure, e.g. ReduceFutureSubscriber's trailing closure accumulates the results.
- There is a logical naming convention going from most important to least important part of the name, left to right, of [<SwiftCollectionName> | <Other>][Future]?[Seeded]?[Producer | Processor | Subscriber | Forker | Joiner], where:
  - <SwiftCollectionName> | <Other>: The method/protocol name of the nearest equivalent in the Swift Collection Library (e.g. forEach from Sequence for ForEachProducer) or another name if nothing is suitable (e.g. ItemTimeoutProcessor).
  - Future: If the class is also a Future; e.g. ReduceFutureSubscriber, which gives access to the result of the reduction using future's interface, in particular wait.
  - Seeded: Is appended to the name if the constructor has an initialSeed argument that is not present in the equivalent Swift Collection method. The seed is used as working storage for the trailing closure and is passed in as an inout parameter. E.g. IteratorSeededPublisher passes its seed to its nextItem closure; the equivalent in Swift's Collection Library is IteratorProtocol, which would be implemented in a struct/class and the implementation would provide the storage instead of a seed. 'Seeded' is styled after the Swift Collection reduce(into: initialResult) { ... } method, where the into argument is in this case both the seed and the final result.
  - Producer | Processor | Subscriber | Forker | Joiner: Describes the role of the class. Producers are at the start of a flow, processors in the middle, and subscribers at the end. Typical usage is producer ~~> processor ~~> subscriber. Forkers are processors that can have multiple output subscriptions. Joiners are the reverse of forkers and join multiple streams into one; they are publishers that have multiple input subscribers.
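The difference between storage held by an IteratorProtocol implementation and storage passed in as a seed can be seen using only the Swift standard library. Countdown is a hypothetical type for this sketch, and sequence(state:next:) is the standard library's closest analogue to a seeded closure; neither is part of this library.

```swift
/// The implementation provides the storage (the IteratorProtocol style).
struct Countdown: IteratorProtocol {
    var current: Int

    mutating func next() -> Int? {
        guard current > 0 else { return nil }
        defer { current -= 1 }
        return current
    }
}

var iterator = Countdown(current: 3)
var fromStruct: [Int] = []
while let n = iterator.next() {
    fromStruct.append(n)
}

// The seed is the storage and is handed to the closure as an inout parameter (the seeded style).
let fromSeed = Array(sequence(state: 3) { (seed: inout Int) -> Int? in
    guard seed > 0 else { return nil }
    defer { seed -= 1 }
    return seed
})

print(fromStruct, fromSeed) // prints "[3, 2, 1] [3, 2, 1]"
```

Both forms produce the same items; the seeded form avoids declaring a separate type just to hold the working state.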
Reactive Collections are easy to use, since the client programmer makes instances of the Reactive Collection classes and then joins these instances together using the subscribe method or ~~> operator (see below). The other methods defined by the Reactive Stream API are not used by the client programmer, but by the library automatically. Some Subscribers also extend Future, and the wait, cancel, and status methods from Future provide client interaction. In particular wait gives access to the value of the subscriber, if any, and waits for the subscriber to complete.
To simplify connecting producers to processors to subscribers, the operator ~~> is defined; that is two tildes (not minus) followed by greater-than. This was chosen because the tilde looks like an 's' on its side and the operator establishes a subscription, because the tilde is wavy and therefore represents dynamic flow, and because the greater-than indicates the direction of flow. The operator ~~> is preferred over the method subscribe because it habituates the programmer away from method calls, which as stated above are mainly not for programmer use.
Hello World using this library is:
let helloWorldPublisher = ForEachPublisher(sequence: "Hello, world!")
let helloWorldSubscriber = ReduceFutureSubscriber(into: "") { (result: inout String, next: Character) in
result.append(next)
}
helloWorldPublisher ~~> helloWorldSubscriber
let helloWorldResult = helloWorldSubscriber.wait ?? "Failed!"
Note how the arguments to ForEachProducer and ReduceFutureSubscriber mimic those to similarly named methods in Swift's Sequence protocol, how Subscriber's ~~> is evocative of the process that is occurring, and how Future's wait method clearly marks where the concurrent processing stops.
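For comparison, the sequential standard-library counterpart that the subscriber's arguments mimic is reduce(into:), shown here with the same closure body as the example above:

```swift
// Same accumulation as the reactive example, but run sequentially on the calling thread.
let hello = "Hello, world!".reduce(into: "") { (result: inout String, next: Character) in
    result.append(next)
}
print(hello) // prints "Hello, world!"
```

The closure has the same shape in both cases; only where and when it runs differs.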
Publishers produce items in the background via Grand Central Dispatch (GCD) queues and the items are passed to and processed by subsequent stages in the thread processing the queued production task, i.e. helloWorldPublisher ~~> helloWorldSubscriber runs asynchronously. The production of items per task can be specified by the subscriber's requestSize argument, since it is the subscriber that requests items to be produced. The queue that a producer is to use can be specified when constructing the producer.
Typically you would have intermediate stages in a calculation, Processors that take an input and produce an output (these are similar to Sequence's map and filter methods). The Monte Carlo method of approximating Pi estimates the ratio of the area under an arc (a quarter circle) to the area of the square that contains it. Consider a square piece of paper 1 by 1, i.e. both x and y ordinates run from 0 to 1, with an arc drawn with centre at (0, 0) from (0, 1) to (1, 0), i.e. it has a radius of 1. If darts are randomly thrown at the paper then the ratio of arc area / square area is approximately the number of darts inside the arc / total number of darts. Since the area under the arc is Pi / 4 and the area of the square is 1, Pi can be approximated as 4 times the area ratio. Using the Reactive Collection Library this is:
let maxRandom = Double(UInt32.max)
let randomCoordinatePublisher = IteratorSeededPublisher(initialSeed: ()) { _ in
return (Double(arc4random()) / maxRandom, Double(arc4random()) / maxRandom)
}
let piEstimatorProcesssor = MapSeededProcessor(initialSeed: (0, 0)) { (seed: inout (Int, Int), coordinate: (Double, Double)) -> Double in
var (total, inside) = seed
total += 1
let (x, y) = coordinate
if x * x + y * y <= 1.0 {
inside += 1
}
guard total < 14_000 && inside < 11_000 else {
throw SubscriberSignal.cancelInputSubscriptionAndComplete
}
seed = (total, inside)
return 4.0 * Double(inside) / Double(total)
}
let lastValueSubscriber = ReduceFutureSubscriber(into: 0.0) { (old: inout Double, new: Double) in
old = new
}
randomCoordinatePublisher ~~> piEstimatorProcesssor ~~> lastValueSubscriber
let estimatedPi = lastValueSubscriber.wait ?? Double.nan
Note how the processor, piEstimatorProcesssor, sits between the publisher, randomCoordinatePublisher, and the subscriber, lastValueSubscriber. The publisher generates an infinite stream of random coordinates. The subscriber memorizes the last value, ad infinitum. The interesting code is in the processor, which:
- Estimates Pi as described above.
- Is a map much like Sequence's map; however, it is also seeded, which allows it to keep track of the total number of coordinates and the number inside the arc between each call to its mapping closure.
- Terminates the estimation when a sufficient number of total points or points inside the arc have occurred (both the publisher and the subscriber run indefinitely).
Termination can be achieved by a subscriber or, as in this case, a processor by throwing SubscriberSignal.cancelInputSubscriptionAndComplete, which as the name suggests terminates the input stream by cancellation and the output stream by completion. Publishers, like helloWorldPublisher from the 1st example, terminate streams by calling the onComplete method of their subscriber (which for ForEachPublisher occurs when the sequence's iterator's next method returns nil). (The Reactive Stream specification only allows input subscriptions to be cancelled and output subscribers to be notified of completion; this throwing of SubscriberSignal.cancelInputSubscriptionAndComplete to allow subscribers and hence processors to signal completion is an extension provided by the Reactive Collection Library.)
See ReactiveCollectionTests.swift for examples.
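As a point of comparison for the Pi example, the same dart-throwing estimate can be sketched sequentially with the standard library alone. estimatePi is a hypothetical helper, and it uses a fixed sample count rather than the termination test used by the processor above.

```swift
/// Estimates Pi by sampling points uniformly in the unit square.
func estimatePi(samples: Int) -> Double {
    var inside = 0
    for _ in 0 ..< samples {
        let x = Double.random(in: 0 ... 1)
        let y = Double.random(in: 0 ... 1)
        if x * x + y * y <= 1.0 { // The dart landed inside the arc.
            inside += 1
        }
    }
    // Area under the arc / area of the square is roughly inside / samples, and the arc area is Pi / 4.
    return 4.0 * Double(inside) / Double(samples)
}

let estimate = estimatePi(samples: 100_000)
print(estimate) // Varies from run to run; typically close to 3.14.
```

The reactive version splits the same calculation into a coordinate producer, an estimating processor, and a subscriber that keeps the last estimate.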
Typically you use the sequence-like classes, IteratorSeededPublisher, ForEachPublisher, ReduceFutureSubscriber, etc., from the Reactive Collection Library. However, an alternative to these are the base protocols/classes provided by the Reactive Collection Bases Library. The protocols etc. in this file deliberately have internal access because they are subject to the most change and, unlike ReactiveCollection, there is no intention of a stable ABI.
These protocols/classes:
- Simplify writing your own Reactive Stream implementations.
- Can be used as an alternative to the sequence-like classes by inheriting/subclassing.
- Are the base protocols/classes for the classes provided by the Reactive Collection Library described above.
- Are abstract protocols/classes and require implementing/subclassing; see the description of each protocol/class to see which methods require implementing/overriding. Swift 4 does not have the concept of an abstract class and therefore default implementations for classes throw a fatal error. Also, there is no concept in Swift 4 of protected access; therefore the methods to override in classes have open access. When using a subclass of these base classes it is safer to use the operator ~~> and not call the methods directly, since this prevents accidentally calling a method that ideally would be protected.
- There is a logical naming convention going from most important to least important part of the name, left to right, similar to that of the Reactive Collection Library described above, of [<SwiftCollectionName>]?[Producer | Processor | Subscriber | Forker | Joiner][Future]?[Class]?[Base], where:
  - <SwiftCollectionName>: The method/protocol name of the nearest equivalent in the Swift Collection Library (e.g. Iterator from IteratorProtocol for IteratorProducerBase), or nothing if the class is not specialised in any way (e.g. SubscriberBase).
  - Producer | Processor | Subscriber | Forker | Joiner: Describes the role of the class. Producers are at the start of a flow, processors in the middle, and subscribers at the end. Typical usage is producer ~~> processor ~~> subscriber. Forkers are processors that fork a single input stream into multiple output streams and Joiners are the reverse, processors that join multiple input streams into one output stream.
  - Future: If the class is also a Future; e.g. FutureSubscriberBase, which gives access to the result of the subscription using future's interface, in particular wait.
  - Class: If the base is a class rather than a protocol then the name has Class as its 2nd last element, e.g. IteratorProducerClassBase. (These would be abstract classes in other languages, but Swift doesn't have abstract classes. Similarly, default implementations of methods that must be overridden throw a fatal exception because there are no abstract methods in Swift.)
  - Base: All the names end in Base to indicate that the protocol/class is 'abstract' and requires implementing/subclassing.
Where these protocols/classes introduce new methods/properties/types their names begin with _; treat these as protected, i.e. do not call/assign them - they are part of the library. (Swift doesn't have the concept of a protected method/property/type. Similarly, if the method is in a protocol there is no way to mark it as final; therefore read the documentation carefully to decide if it is suitable for overriding. Conversely, if the method is defined in a class there is no way to mark it as abstract and so methods that would be abstract throw a fatal error.)
Methods whose name begins _handle... are often overridden in other derived protocols and classes. The marking _handle is used to indicate that a derived type should consider overriding; like all _... methods they should be considered protected.
- If a subscriber cancels its subscription, the producer keeps producing, and the subscriber subscribes to another producer whilst the 1st is still producing, then it will receive items from both producers! (The Reactive Stream Specification allows producers to keep producing post cancellation.) Whilst it would be possible to fix this, it would be a noticeable performance overhead and therefore this option of items from multiple subscriptions is chosen as the 'lesser of the evils'! See testKeepProducingRequestSizeItemsAfterCancel in ReactiveCollectionTests.swift for an example.
- Add AllItemsForker and AllItemsJoiner to enable:
randomCoordinate ~~> forker
forker ~~> countTotal ~~> joiner
forker ~~> filterInside ~~> countInside ~~> joiner
joiner ~~> piEstimator ~~> rememberLast
forker.fork()
joiner.join()
let piEstimate = rememberLast.wait ?? Double.nan
- Tagged request reply with timeout, TaggedRequestReplyProcessor - name? How does it fit with the naming convention? Matches a tagged request to a (tagged) reply and, if there is no reply within the timeout, supplies a default reply and discards any subsequent reply. Handles multiple 'in-flight' requests by keeping track of tags. Tags must be unique.
- Bidirectional streams/flows, e.g. simulating international credit card transactions at Point Of Sale (POS) terminals:
let kyd = Currency(oneUSDIs: 0.82)
let pab = Currency(oneUSDIs: 1.00)
let chf = Currency(oneUSDIs: 0.97)
let currencies = [kyd, pab, chf]
let masterCard = CardAssociation(currencies)
let visaCard = CardAssociation(currencies)
let caymanBank = Bank(currency: kyd, association: masterCard)
let panamaBank = Bank(currency: pab, association: visaCard)
let swissBank = Bank(currency: chf, association: masterCard)
let banks = [caymanBank, panamaBank, swissBank]
let donaldsCard = Card(limit: 1_000_000, bank: caymanBank)
let sarahsCard = Card(limit: 1_000, bank: panamaBank)
let vladimirsCard = Card(limit: 1_000_000_000, bank: swissBank)
let cards = [donaldsCard, sarahsCard, vladimirsCard]
let posTerminals = (0 ..< 2 * banks.count).map { _ in // Two POS terminals per bank.
    POSTerminal(cards)
}
var posTerminal = posTerminals.makeIterator()
for bank in banks {
    [posTerminal.next(), posTerminal.next()] <~~> bank.posTerminal // Two terminals each bank.
    bank.cardAssociation <~~> [masterCard, visaCard] // All associations each bank.
    bank.otherIssuer <~~> banks.filter { $0 != bank }.map { $0.transactionValidation } // All other banks each bank.
}
for posTerminal in posTerminals {
    posTerminal.wait // Wait for each POS terminal simulation to finish.
}
- See if DispatchWorkItem would be a better implementation for Future?
- Is it worth providing non-seeded versions of map etc.?
- Is it worth having a flatMapSequenceSeededProcessor, whose closure returns a sequence that is then flattened?
- Is it worth providing a periodic iterator?
- Reactive UI interfaces?
- Reactive HTTP interfaces?
- Reactive Codable interfaces?
Copyright © 2017 Howard Lovatt. Creative Commons Attribution 4.0 International License.
The full licence is here and an easy to follow summary here.