This page was generated by Text::SmartLinks v0.01 at 2014-11-24 17:01:06 GMT.
(syn 780f49e)

TITLE

Synopsis 17: Concurrency

VERSION

    Created: 3 Nov 2013
    Last Modified: 29 October 2014
    Version: 23

This synopsis is based around the concurrency primitives and tools currently being implemented in Rakudo on MoarVM and the JVM. It covers both things that are already implemented today and things expected to be implemented in the near future (where "near" means O(months)).

Design Philosophy

Focus on composability

Perl 6 generally prefers constructs that compose well, enabling large problems to be solved by putting together solutions for lots of smaller problems. This also helps make it easier to extend and refactor code.

Many common language features related to parallel and asynchronous programming lack composability. For example:

In Perl 6, concurrency features aimed at typical language users should have good composability properties, both with themselves and also with other language features.

Boundaries between synchronous and asynchronous should be explicit

Asynchrony happens when we initiate an operation, then continue running our own idea of "next thing" without waiting for the operation to complete. This differs from synchronous programming, where calling a sub or method causes the caller to wait for a result before continuing.

The vast majority of programmers are much more comfortable with synchrony, as in many senses it's the "normal thing". As soon as we have things taking place asynchronously, there is a need to coordinate the work, and doing so tends to be domain specific. Therefore, placing the programmer in an asynchronous situation when they didn't ask for it is likely to lead to confusion and bugs. We should try to make places where asynchrony happens clear.

It's also worthwhile trying to make it easy to keep asynchronous things flowing asynchronously. While synchronous code is pull-y (for example, eating its way through iterable things, blocking for results), asynchronous code is push-y (results get pushed to things that know what to do next).

Places where we go from synchronous to asynchronous, or from asynchronous to synchronous, are higher risk areas for bugs and potential bottlenecks. Thus, Perl 6 should try to provide features that help minimize the need to make such transitions.

Implicit parallelism is OK

Parallelism is primarily about taking something we could do serially and using multiple CPU cores in order to get to a result more quickly. This leads to a very nice property: a parallel solution to a problem should give the same answer as a serial solution.

While under the hood there is asynchrony and the inherent coordination it requires, on the outside a problem solved using parallel programming is still, when taken as a whole, a single, synchronous operation.

Elsewhere in the specification, Perl 6 provides several features that allow the programmer to indicate that parallelizing an operation will produce the same result as evaluating it serially:

Make the hard things possible

The easy things should be easy, and able to be built out of primitives that compose nicely. However, such things have to be built out of what VMs and operating systems provide: threads, atomic instructions (such as CAS), and concurrency control constructs such as mutexes and semaphores. Perl 6 is meant to last for decades, and the coming decades will doubtless bring new ways to do parallel and asynchronous programming that we do not have today. They will still, however, almost certainly need to be built out of what is available.

Thus, the primitive things should be provided for those who need to work on such hard things. Perl 6 should not hide the existence of OS-level threads, or fail to provide access to lower level concurrency control constructs. However, they should be clearly documented as not the way to solve the majority of problems.

Schedulers

Schedulers lie at the heart of all concurrency in Perl 6. While most users are unlikely to immediately encounter schedulers when starting to use Perl 6's concurrency features, many of those features are implemented in terms of schedulers. Thus, schedulers are described first here to avoid lots of forward references.

A scheduler is something that does the Scheduler role. Its responsibility is taking code objects representing tasks that need to be performed and making sure they get run, as well as handling any time-related operations (such as, "run this code every second").

The current default scheduler is available as $*SCHEDULER. If no such dynamic variable has been declared, then $PROCESS::SCHEDULER is used. This defaults to an instance of ThreadPoolScheduler, which maintains a pool of threads and distributes scheduled work amongst them. Since the scheduler is dynamically scoped, this means that test scheduler modules can be developed that poke a $*SCHEDULER into EXPORT, and then provide the test writer with control over time.

The cue method takes a Callable object and schedules it.

    $*SCHEDULER.cue: { say "Golly, I got scheduled!" }

Various options may be supplied as named arguments. (All references to time are taken to be in seconds, which may be fractional.) You may schedule an event to fire off after some number of seconds:

    $*SCHEDULER.cue: in=>10, { say "10s later" }

or at a given absolute time, specified as an Instant:

    $*SCHEDULER.cue: at=>$instant, { say "at $instant" }

If a scheduled item dies, the scheduler will catch this exception and pass it to a handle_uncaught method, a default implementation of which is provided by the Scheduler role. This by default will report the exception and cause the entire application to terminate. However, it is possible to replace this:

    $*SCHEDULER.uncaught_handler = sub ($exception) {
        $logger.log_error($exception);
    }

For more fine-grained handling, it is possible to schedule code along with a code object to be invoked with the thrown exception if it dies:

    $*SCHEDULER.cue:
        { upload_progress($stuff) },
        quit => -> $ex { warn "Could not upload latest progress" }

Use :every to schedule a task to run at a fixed interval, possibly with a delay before the first scheduling.

    # Every second, from now
    $*SCHEDULER.cue: :every(1), { say "Oh wow, a kangaroo!" };
    # Every 0.5s, but don't start for 2s.
    $*SCHEDULER.cue: { say "Kenya believe it?" }, :every(0.5), :in(2);

Since this will cause the given task to be executed at the given interval ad infinitum, there are two ways to make sure the scheduling of the task is halted at a future time. The first is provided by specifying the :limit parameter in the .cue:

    # Every second, from now, but only 42 times
    $*SCHEDULER.cue: :every(1), :limit(42), { say "Oh wow, a kangaroo!" };

The second is by specifying code that will be checked at the end of each interval. The task will be stopped as soon as that code returns a True value. You can do this with the :stop parameter.

    # Every second, from now, until stopped
    my $stop;
    $*SCHEDULER.cue: :every(1), :stop({$stop}), { say "Oh wow, a kangaroo!" };
    sleep 10;
    $stop = True;  # task stopped after 10 seconds

The .cue method returns a Cancellation object, which can also be used to stop a repeating cue:

    my $c = $*SCHEDULER.cue: :every(1), { say "Oh wow, a kangaroo!" };
    sleep 10;
    $c.cancel;  # task stopped after 10 seconds

Schedulers also provide counts of the number of operations in various states:

    say $*SCHEDULER.loads;

This returns, in order, the number of cues that are not yet runnable due to delays, the number of cues that are runnable but not yet assigned to a thread, and the number of cues that are now assigned to a thread (and presumably running). [Conjecture: perhaps these should be separate methods.]

Schedulers may optionally provide further introspection in order to support tools such as debuggers.

There is also a CurrentThreadScheduler, which always schedules things on the current thread. It provides the same methods, just no concurrency, and any exceptions are thrown immediately. This is mostly useful for forcing synchrony in places that default to asynchrony. (Note that .loads can never return anything but 0 for the currently running cues, since they're waiting on the current thread to stop scheduling first!)
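
As a minimal sketch of forcing synchrony, and assuming that a locally declared $*SCHEDULER is picked up in the same way as the default one, the cued block below runs on the calling thread before the next statement is reached:

    {
        # Replace the scheduler for this dynamic scope only
        my $*SCHEDULER = CurrentThreadScheduler.new;
        $*SCHEDULER.cue: { say "cued work runs first" };
        say "then the caller continues";
    }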

Promises

A Promise is a synchronization primitive for an asynchronous piece of work that will produce a single result (thus keeping the promise) or fail in some way (thus breaking the promise).

The simplest way to use a Promise is to create one:

    my $promise = Promise.new;

And then later keep it:

    $promise.keep;      # True
    $promise.keep(42);  # a specific return value for kept Promise

Or break it:

    $promise.break;                             # False
    $promise.break(X::Some::Problem.new);       # With exception
    $promise.break("I just couldn't do it");    # With message

The current status of a Promise is available through the status method, which returns an element from the PromiseStatus enumeration.

    enum PromiseStatus (:Planned(0), :Kept(1), :Broken(2));

The result itself can be obtained by calling result. If the Promise was already kept, the result is immediately returned. If the Promise was broken then the exception that it was broken with is thrown. If the Promise is not yet kept or broken, then the caller will block until this happens.

A Promise will boolify to whether the Promise is already kept or broken. There is also an excuse method for extracting the exception from a Broken Promise rather than having it thrown.

    if $promise {
        if $promise.status == Kept {
            say "Kept, result = " ~ $promise.result;
        }
        else {
            say "Broken because " ~ $promise.excuse;
        }
    }
    else {
        say "Still working!";
    }

You can also simply use a switch:

    given $promise.status {
        when Planned { say "Still working!" }
        when Kept    { say "Kept, result = ", $promise.result }
        when Broken  { say "Broken because ", $promise.excuse }
    }

There are various convenient "factory" methods on Promise. The most common is start.

    my $p = Promise.start(&do_hard_calculation);

This creates a Promise that runs the supplied code, and calls keep with its result. If the code throws an exception, then break is called with the Exception. Most of the time, however, the above is simply written as:

    my $p = start {
        # code here
    }

Which is implemented by calling Promise.start.

There is also a method to create a Promise that is kept after a number of seconds, or at a specific time:

    my $kept_in_10s      = Promise.in(10);
    my $kept_in_duration = Promise.in($duration);
    my $kept_at_instant  = Promise.at($instant);

The result is always True and such a Promise can never be broken. It is mostly useful for combining with other promises.

There are also a couple of Promise combinators. The anyof combinator creates a Promise that is kept whenever any of the specified Promises are kept. If the first promise to produce a result is instead broken, then the resulting Promise is also broken. The excuse is passed along. When the Promise is kept, it has a True result.

    my $calc     = start { ... }
    my $timeout  = Promise.in(10);
    my $timecalc = Promise.anyof($calc, $timeout);
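
Since the combined Promise only ever carries True, the outcome has to be read from the original Promises. The following sketch shows one way to do so; the sleep and the value 42 are stand-ins for real work and are not part of the specification:

    my $calc    = start { sleep 1; 42 };   # stand-in for a real calculation
    my $timeout = Promise.in(10);

    await Promise.anyof($calc, $timeout);
    if $calc {
        # $calc has a result, so calling .result will not block
        say "Finished in time: " ~ $calc.result;
    }
    else {
        say "Timed out after 10 seconds";
    }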

There is also an allof combinator, which creates a Promise that will be kept when all of the specified Promises are kept, or broken if any of them are broken.
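
A short sketch of allof, assuming a handful of independent calculations whose results are only needed once all of them have finished (the squaring is purely illustrative):

    my @jobs = (1..3).map: -> $n { start { $n * $n } };

    # Blocks until every job is kept, or throws if any of them is broken
    await Promise.allof(@jobs);

    # All results are available now, so .result returns immediately
    say .result for @jobs;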

[Conjecture: there should be infix operators for these resembling the junctionals.]

The then method on a Promise is used to request that a certain piece of code should be run, receiving the Promise as an argument, when the Promise is kept or broken. If the Promise is already kept or broken, the code is scheduled immediately. It is possible to call then more than once, and each time it returns a Promise representing the completion of both the original Promise as well as the code specified in then.

    my $feedback_promise = $download_promise.then(-> $res {
        given $res.status {
            when Kept   { say "File $res.result().name() downloaded" }
            when Broken { say "FAIL: $res.excuse()"                   }
        }
    });

[Conjecture: this needs better syntax to separate the "then" policies from the "else" policies (and from "catch" policies?), and to avoid a bunch of switch boilerplate. We already know the givens here...]

One risk when working with Promises is that another piece of code will sneak in and keep or break a Promise it should not. The notion of a promise is user-facing. To instead represent the promise from the viewpoint of the promiser, the various built-in Promise factory methods and combinators use Promise::Vow objects to represent that internal resolve to fulfill the promise. ("I have vowed to keep my promise to you.") The vow method on a Promise returns an object with keep and break methods. It can only be called once during a Promise object's lifetime. Since keep and break on the Promise itself just delegate to self.vow.keep(...) or self.vow.break(...), obtaining the vow before letting the Promise escape to the outside world is a way to take ownership of the right to keep or break it. For example, here is how the Promise.in factory is implemented:

    method in(Promise:U: $seconds, :$scheduler = $*SCHEDULER) {
        my $p = Promise.new(:$scheduler);
        my $v = $p.vow;
        $scheduler.cue: { $v.keep(True) }, :in($seconds);
        $p;
    }
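
The same pattern can be used in ordinary code. In this sketch, fetch_later and its "payload" value are hypothetical; the point is only that the vow is taken before the Promise is handed out:

    sub fetch_later() {                  # hypothetical producer
        my $p = Promise.new;
        my $v = $p.vow;                  # take ownership before $p escapes
        $*SCHEDULER.cue: { $v.keep("payload") }, :in(1);
        $p;
    }

    my $p = fetch_later();
    # Calling $p.keep(...) or $p.break(...) here would fail, because the
    # vow has already been taken inside fetch_later.
    say $p.result;                       # blocks until the vow is kept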

The await function is used to wait for one or more Promises to produce a result.

    my ($a, $b) = await $p1, $p2;

This simply calls result on each of the Promises, so any exception will be thrown.
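
Because a broken Promise causes await to throw, callers that want to carry on need to trap the exception. A minimal sketch, using try and $! as one possible way of doing so:

    my $ok  = start { 6 * 7 };
    my $bad = start { die "no luck" };

    say await $ok;                       # the result of the kept Promise
    try await $bad;                      # the exception ends up in $!
    say "Second task failed: $!" if $!;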

Channels

A Channel is essentially a concurrent queue. One or more threads can put values into the Channel using send:

    my $c = Channel.new;
    $c.send($msg);

Meanwhile, others can receive them:

    my $msg = $c.receive;

Channels are ideal for producer/consumer scenarios, and since there can be many senders and many receivers, they adapt well to scaling certain pipeline stages out over multiple workers also. [Conjectural: The two feed operators ==> and <== are implemented using Channel to connect each of the stages.]

A Channel may be "forever", but it is possible to close it to further sends by telling it to close:

    $c.close();

Trying to send any further messages on a closed channel will throw the X::Channel::SendOnDone exception. Closing a channel has no effect on the receiving end until all sent values have been received. At that point, any further calls to receive will throw X::Channel::ReceiveOnDone. The done method returns a Promise that is kept when a sender has called close and all sent messages have been received. Note that multiple calls to done on the same channel return the same Promise, not a new one.

While receive blocks until it can read, poll takes a message from the channel if one is there or immediately returns Nil if nothing is there.

There is also an earliest statement:

    earliest * {
        more $c1 { say "First channel got a value" }
        more $c2 { say "Second channel got a value" }
    }

That will invoke the closure associated with the first channel that receives a value.

It's possible to add a timer using the keyword wait followed by the number of seconds to wait (which may be fractional). As a degenerate case, in order to avoid blocking at all you may use a wait 0. The timeout is always checked last, to guarantee that the other entries are all tried at least once before timing out.

    my $gotone = earliest * {
        more $c1 { say "First channel got a value" }
        more $c2 { say "Second channel got a value" }
        wait 0   { say "Not done yet"; Nil }
    }

The construct as a whole returns the result of whichever block was selected.

It's also possible to process a variadic list of channels together, using generic code that works over some set of the channels (use * to represent any of them). The index and the received value are passed to the code as named arguments $:k and $:v (possibly via priming if the code is instantiated ahead of time).

    earliest * {
        more @channels { say "Channel $:k received, result was: ", $:v }
    }

In this case $:k returns the index of the channel, base 0. Likewise $:v returns the value.

The earliest construct also automatically checks the .done promise corresponding to the channel, so it can also be used in order to write a loop to receive from a channel until it is closed:

    gather loop {
        earliest $channel {
            more * { take $_ }
            done * { last }
        }
    }

This is such a common pattern that we make a channel in list context behave that way:

    for @$channel -> $val { ... }
    for $channel.list -> $val { ... }

(Note that this is not a combinator, but a means for transferring data from the reactive realm to the lazy realm. Some reasonable amount of buffering is assumed between the two.)
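
Putting the pieces together, a producer/consumer pair might be sketched as follows. The values sent are arbitrary; the loop relies on the list-context behaviour just described and ends once the channel has been closed and drained:

    my $c = Channel.new;

    # Producer: send five values on another thread, then close
    my $producer = start {
        $c.send($_) for 1..5;
        $c.close;
    };

    # Consumer: iterate until the channel is done
    for @$c -> $val {
        say "Received $val";
    }
    await $producer;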

Supplies

Channels are good for producer/consumer scenarios, but because each worker blocks on receive, it is not such an ideal construct for doing fine-grained processing of asynchronously produced streams of values. Additionally, there can only be one receiver for each value. Supplies exist to address both of these issues.

A Supply pushes or pumps values to one or more receivers who have registered their interest. There are two types of Supplies: live and on demand. When tapping into a live supply, the tap will only see values that are pumped after the tap has been created. Such supplies are normally infinite in nature, such as mouse movements. Closing the tap does not stop events from occurring, it just means nobody is listening. All tappers see the same stream. A tap on an on demand supply will initiate the production of values, and tapping the supply again may result in a new set of values. For example, Supply.interval produces a fresh timer with the appropriate interval each time it is tapped. If the tap is closed, the timer stops pushing out new values.

Anything that does the Supply role can be tapped (that is, subscribed to) by calling the tap method on it. This takes up to three callables as arguments, the optional ones expressed as named arguments:

    $supply.tap: -> $value { say "Got a $value" },
        done => { say "Reached the end" },
        quit => {
            when X::FooBar { die "Major oopsie" };
            default        { warn "Supply shut down early: $_" }
        }

The first, known as the emit closure, is invoked whenever a value is emitted into the thing that has been tapped. The optional named parameter done specifies the code to be invoked when all expected values have been produced and no more values will be emitted. The optional named parameter quit specifies the code to be invoked if there is an error. This also means there will be no further values.

The simplest Supply is a Supply class, which is punned from the role. It creates a live supply. On the "pumping" end, this has corresponding methods emit, done, and quit, which notify all current taps.

    my $s = Supply.new;
    my $t1 = $s.tap({ say $_ });
    $s.emit(1);                              # 1\n
    $s.emit(2);                              # 2\n
    my $t2 = $s.tap({ say 2 * $_ },
                    done => { say "End" });
    $s.emit(3);                              # 3\n6\n

The object returned by tap represents the subscription. To stop subscribing, call close on it.

    $t1.close;
    $s.emit(4);                              # 8\n
    $s.done;                                 # End\n

This doesn't introduce any asynchrony directly. However, it is possible for values to be pumped into a Supply from an asynchronous worker. In fact, it is possible for many threads to safely pump values into a supply. In the event this happens, the callback of the tap may be executed on many threads at the same time.

The Supply class has various methods that produce more interesting kinds of Supply. These default to working asynchronously.

Supply.for takes a (potentially lazy) list of values, and returns an on demand Supply that, when tapped, will iterate over the values and invoke the emit callable for each of them, and any done callable at the end. If the iteration at some point produces an exception, then the quit callable will be invoked to pass along the exception.

Supply.interval produces an on demand Supply that, when tapped, will produce an ascending value at a regular time interval.

    Supply.interval(1).tap(&say);     # Once a second, starting now
    Supply.interval(5, 10).tap(&say); # Each 5 seconds, starting in 10 seconds

Take the returned tap object and close it to stop the ticks:

    my $t = Supply.interval(1).tap(&say);
    # ...later...
    $t.close();

[TODO: many more of these.]

Supplies are mathematically dual to iterators, and so it is possible to define the same set of operations on them as are available on lazy lists. The key difference is that, while grep on a lazy list pulls a value to process, working synchronously, grep on a Supply has values pushed through it, and pushes those that match the filter onwards to anything that taps it.
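
As a small illustration of the push-based style, and assuming the live Supply.new and emit API described earlier in this section, the filter and the transformation below are set up before any value exists. Values then flow through both stages as they are emitted:

    my $s = Supply.new;

    # Only even values, multiplied by ten, reach the tap
    $s.grep(* %% 2).map(* * 10).tap(-> $v { say "Got $v" });

    $s.emit($_) for 1..6;
    $s.done;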

The following methods are available on an instantiated Supply ($s in these examples):

list
  my @l := $s.list;

Produces a lazy List with the values of the Supply.

wait
  $s.wait;

Waits until the specified Supply is done or quit.

Channel
  my $c = $s.Channel;

Produces a Channel of the values of the given Supply.

Promise
  my $p = $s.Promise;

Produces a Promise that will be kept for the next value of the given Supply, or will be broken when the Supply is done before a value is produced.

last
  my $l = $s.last(42);  # default: 1

Produces a Supply that will only emit the last N values of the given Supply when it is done. Default is the final value.

grab
  my $g = $s.grab( { .sort } ); # sort the values of a Supply

Produces a Supply that will grab all values emitted by the given Supply until it is done. It will then call the given closure, emit each of the return values of the closure, and then done the Supply that was produced.

flat
  my $f = $s.flat;

Produces a Supply in which all values of the original supply are flattened.

do
  my $seen;
  my $d = $s.do( {$seen++} );

Produces a Supply that is identical to the original supply, but will execute the given code for its side-effects. It promises that only one thread will ever be executing the code object passed to it at a time; others will block behind it.

act
  my $seen;
  $s.act( {$seen++} );

A special case of Supply.do, that will also create a tap on the given Supply, so that you only need to worry about writing the side-effect code.

grep
  my $g = $s.grep( * > 5 );
  my $g = $s.grep(Int);

Produces a Supply that only provides values that you want. Takes either a Callable (which is supposed to return a True value to pass on emitted values) or a value to be smartmatched against.

map
  my $m = $s.map( * * 5 );

Produces a Supply that provides the original Supply's values multiplied by 5.

  my $m2 = $s.map( { $_ xx 2 } );

Produces a Supply that provides each of the original Supply's values twice.

unique
  my $u = $s.unique( :as( {$_} ), :with( &[===] ), :expires(1) );

Produces a Supply that only provides unique values, as defined by the optional as and with named parameters (same as List.unique). The optional expires parameter specifies how long to wait (in seconds) before "resetting" and not considering a value to have been seen, even if it's the same as an old value.

squish
  my $q = $s.squish( :as( {$_} ), :with( &[===] ), :expires(1) );

Produces a Supply that only provides sequentially different values, as defined by the optional as and with named parameters (same as List.squish). The optional expires parameter specifies how long to wait (in seconds) before "resetting" and not squishing a new value with an old one, even if they are the same.

max
  my $a = $s.max(&by); # default &infix:<cmp>

Produces a Supply that produces the maximum values of the specified Supply. In other words, from a continuously ascending Supply it will produce all the values. From a continuously descending Supply it will only produce the first value. The optional parameter specifies the comparator, just as with Any.max.

min
  my $i = $s.min(&by); # default &infix:<cmp>

Produces a Supply that produces the minimum values of the specified Supply. In other words, from a continuously descending Supply it will produce all the values. From a continuously ascending Supply it will only produce the first value. The optional parameter specifies the comparator, just as with Any.min.

minmax
  my $m = $s.minmax(&by); # default &infix:<cmp>

Produces a Supply that produces the Ranges with the minimum and maximum values of the specified Supply. The optional parameter specifies the comparator, just as with Any.minmax.

batch
  my $b = $s.batch( :elems(100), :seconds(1) );

Produces a Supply that batches the values of the given Supply by either the number of elements (using the elems named parameter) or the maximum number of seconds (using the seconds named parameter) or both. Values are grouped in a single array element when flushed.

elems
  my $e = $s.elems($seconds?); # default: see all

Produces a Supply that produces the number of elements seen in the given Supply. You can also specify an interval to only see the number of elements seen once per that interval.

rotor
  my $b = $s.rotor( $elems, $overlap );
  my $b = $s.rotor;    # elems = 2, overlap = 1

Produces a "rotoring" Supply where every elems number of elements are combined, and the last overlap elements of such a combination become the initial elements of the next combination. This can e.g. be used to convert a Supply of coordinates into a Supply of begin/end points.

delayed
  my $d = $s.delayed( 3.5 );  # delay supply 3.5 seconds

Produces a Supply that passes on the values of the given Supply with the given delay (in seconds).

stable
  my $u = $s.stable( $seconds, :$scheduler );

Produces a Supply that only passes on a value if it wasn't superseded by another value in the given time (in seconds). Optionally uses another scheduler than the default scheduler, using the scheduler named parameter.

start
  my $t = $s.start( {...} );

Takes a closure and, for each supplied value, schedules the closure to run on another thread. It then emits a Supply (resulting in us having a supply of supplies) that will either have a single value emitted and then be done if the async work completes successfully, or quit if the work fails. Useful for kicking off work on the thread pool if you do not want to block up the thread pushing values at you (perhaps because you are reacting to UI events, but have some long-running work to kick off). Usually used in combination with migrate.

migrate
  my $m = $t.migrate;

Produces a continuous Supply from a Supply, in which each value is a Supply. As soon as a new Supply appears, it will close the current Supply and provide values from the new Supply. Can be used in combination with schedule_on.

schedule_on
  my $o = $m.schedule_on( $scheduler );

This allows a Supply's emit/done/quit to be scheduled on another scheduler. Useful in GUI situations, for example, where the final stage of some work needs to be done on some UI scheduler in order to have UI updates run on the UI thread.

reduce
  my $r = $s.reduce( {...} );

Produces a Supply that will emit each reduction from the given Supply, just like reduce on Lists.

lines
  my $l = $s.lines;              # chomp lines
  my $l = $s.lines( :!chomp );   # do *not* chomp lines

Produces a Supply that will emit the characters coming in line by line from a Supply that's usually created by some asynchronous I/O operation. The optional :chomp named parameter indicates whether to remove line separators: the default is True.

words
  my $w = $s.words;

Produces a Supply that will emit the characters coming in word by word from a Supply that's usually created by some asynchronous I/O operation.

classify
  my $c = $s.classify( {.WHAT} );  # one Supply per type of value
  my $h = $s.classify( %mapper );
  my $a = $s.classify( @mapper );

Produces a Supply in which the emitted values are Pairs consisting of the classification value and the Supply to which values of the given Supply will be emitted. Similar to List.classify, but does not support multi-level classification.

categorize
  my $c = $s.categorize( {@categories} );
  my $h = $s.categorize( %mapper );
  my $a = $s.categorize( @mapper );

Produces a Supply in which the emitted values are Pairs consisting of zero or more classification values and the Supply to which values of the given Supply will be emitted. Similar to List.categorize.

reverse
  my $r = $s.reverse;

Produces a Supply that emits the values of the given Supply in reverse order. Please note that this Supply will only start delivering values when the given Supply is done.

sort
  my $o = $s.sort(&by);  # default &infix:<cmp>

Produces a Supply that emits the values of the given Supply in sorted order. Please note that this Supply will only start delivering values when the given Supply is done. Optionally accepts a comparator Block.

There are some combinators that deal with bringing multiple supplies together:

merge
  my $m = $s1.merge($s2);
  my $m = Supply.merge(@s);  # also as class method

Produces a Supply containing the values produced by the given and the specified supply or supplies, and triggering done once all of the supplies have done so.

zip
  my $z = $s1.zip($s2);                   # defaults to :with( &[,] )
  my $z = Supply.zip(@s, :with( &[,] ));  # also as class method

Produces a Supply that pairs together items from the given and the specified supply or supplies, using infix:<,> by default or any other user-supplied function with the with named parameter.

zip-latest
  my $z = $s1.zip-latest($s2);                  # like zip, defaults to :with( &[,] )
  my $z = Supply.zip-latest(@s, :with( &[,] )); # also a method on Supply.
  my $z = Supply.zip-latest( @s, :initial(42,63) ); # initial state

Produces a Supply that will emit tuples of values as soon as any combined Supply produces a value. Before any tuples are emitted, all supplies have to have produced at least one value. By default, it uses infix:<,> to produce the tuples, but the named parameter with can override that.

The named parameter initial can optionally be used to indicate the initial state of the values to be emitted.

[TODO: plenty more of these: while, until...]

These combinators that involve multiple supplies need care in their implementation, since values may arrive at any point on each, and possibly at the same time. To help write such combinators, the on meta-combinator is useful. on taps many supplies, and ensures that only one callback will be running at a time, freeing the combinator writer from worrying about synchronization issues.

The on combinator takes a block that receives the Supply it will generate (and return) as the parameter. That block is supposed to return a list of Pairs, in which the keys are one or more Supplies and the values are either a Block (to be called for each value for that Supply), or a hash with Pairs for emit, done and quit.

A simple combinator for Pairing values from two Supplies ($a and $b), would look like this:

  my $result = on -> $res {
      my @as;
      my @bs;
      $a => sub ($val) {
          @as.push($val);
          if @as && @bs {
              $res.emit( @as.shift => @bs.shift );
          }
      },
      $b => sub ($val) {
          @bs.push($val);
          if @as && @bs {
              $res.emit( @as.shift => @bs.shift );
          }
      }
  }

Thus there are never any races or other thread-safety problems with mutating @as and @bs. The default behaviour, if a Callable is specified along with the supply, is to use it for emit and provide a default done and quit. The default done triggers done on the result Supply.

Note that the code blocks for the two Supplies are nearly identical. There must be a better way of doing this, and indeed there is: you can also specify more than one Supply per block. The same combinator implemented that way:

  my $result = on -> $res {
      my @values = ([],[]);
      ($a,$b) => sub ($val,$index) {
          @values[$index].push($val);
          if all(@values) {
              $res.emit( (@values>>.shift) );
          }
      }
  }

Note that the block that is being called for each value from any of the Supplies also receives an index value to be able to group the values received. By default, any done or quit will be immediately propagated. This is basically how zip is implemented.

Sometimes, we want the resulting Supply to be done only when all specified Supplies are done. This is possible by specifying a hash with keys for emit, done and/or quit, instead of just a Callable. Given an array @s with Supplies:

  my $done = 0;
  my $result = on -> $res {
      @s => {
          emit => -> \val { $res.emit(val) },
          done => { $res.done if ++$done == +@s }
      }
  }

This is essentially how merge is implemented. Note that if we don't need the index (as indicated by its absence in the signature of the Callables), it will not be passed.

A quit handler can be provided in a similar way, although the default - convey the failure to the result supply - is normally what is wanted. The exception is writing combinators related to error handling.

System events exposed as Supplies

System events, such as signals, or mouse events, can be exposed as Supplies. Because of lack of portability, these will most likely be implemented as third-party modules.

Basic signal support is offered by the signal function, which takes one or more Signal enums, and an optional scheduler named parameter. It produces a Supply which, when tapped, will emit any signal coming in. For example:

  signal(SIGINT).tap( { say "Thank you for your attention"; exit 0 } );

would catch Control-C, thank you, and then exit. Of course, you don't need to exit immediately. Here's an example of how you would make sure that an iteration in a loop is completed before exiting:

  for @todo {
      state $quitting;
      state $tap = signal(SIGINT).tap( { $quitting = True } );
      LAST  $tap.close;
      LEAVE exit(0) if $quitting;
      ... # code to protect
  }

This probably could use some syntactic sugar.

The list of supported Signals can be found by checking Signal::.keys, as you would any enum.

I/O features exposed as Supplies

Various I/O-related things are also exposed as supplies. For example, it is possible to get notifications on changes to a file, or to files directly inside a directory, using:

    IO::Notification.watch_path(".").tap(-> $file {
        say "$file changed";
    });

This is quite a mouthful, so there is a shortcut available with the IO coercer and the watch method:

    ".".IO.watch.tap: -> $file { say "$file changed" };

Note that since I/O callbacks are, by default, scheduled on the thread pool, it's possible that your callback will be executing on two threads at the same time. One way to cope is with do, and then a tap at the end:

    ".".IO.watch.do(-> $file {
        state %changes;
        say "$file changed (change {++%changes{$file}})";
    }).tap();

Here, we are tapping it purely for the side-effects, and do promises we will only be in that code block one thread at a time. To make this more convenient, there is also a shortcut with the act method:

    ".".IO.watch.act(-> $file {
        state %changes;
        say "$file changed (change {++%changes{$file}})";
    });

It can also take done and quit named parameters; these go to the tap, while the emit closure is put in a do. A Tap is returned, which may be closed in the usual way. (Note that the name act is also a subtle reference to actor semantics.)

Inter-Process Communication exposed as Promises and Supplies

Starting external processes is rather easy: shell(), run() and qx//. Having external processes run asynchronously is slightly more involved, but not much. The workhorse of asynchronous IPC in Perl 6 is Proc::Async:

    my $proc = Proc::Async.new( $path, @args );

If you would like to send data to the process, you need to open it with the :w named parameter.

    my $proc = Proc::Async.new( $path, @args, :w );

By default, the current environment (as available in %*ENV) will be set for the external process. You can override this with the :ENV named parameter:

    my $proc = Proc::Async.new( $path, @args, :ENV(%hash) );

The returned object can then be used whenever needed to start the external process. However, before you do that, you need to be clear about what to do with the output of the external process. Information from the external process's STDOUT or STDERR is delivered by a Supply that emits either characters or bytes.

    $proc.stdout.act(&say);   # simply pass it on to our $*OUT as chars
    $proc.stderr.act(&note);  # and $*ERR as chars, but could be any code

or:

    $proc.stdout(:bin).act: { ... };  # process STDOUT bytes
    $proc.stderr(:bin).act: { ... };  # process STDERR bytes

So, to make sure no information will be lost, you need to create and tap the supplies before the process is started.

To start the external process, you need to call the .start method. It returns a Promise that becomes Kept (and True) if the process concludes successfully, or Broken (and False) if the process failed for some reason.

    my $done = $proc.start( :scheduler($*SCHEDULER) );  # :scheduler defaults to $*SCHEDULER

To send data to the running process, you can use the .print, .say and .write methods on the Proc::Async object:

    # :scheduler is optional; it defaults to $*SCHEDULER
    my $printed = $proc.print( "Hello world\n", :scheduler($*SCHEDULER) );
    my $said    = $proc.say(   "Hello world",   :scheduler($*SCHEDULER) );
    my $written = $proc.write( $buffer,         :scheduler($*SCHEDULER) );

Each of them also returns a Promise that is Kept when communication with the process was successful.

Some programs expect their STDIN to be closed to signify the end of their processing. This can be achieved with the .close-stdin method:

    $proc.close-stdin;

Finally, if your process is going awry, you can stop it with the .kill method:

    $proc.kill;            # sends HUP signal to process
    $proc.kill("SIGINT");  # send INT signal
    $proc.kill(1);         # if you just know the signal number on your system

The parameter should be something that is acceptable to the Kernel.signal method.
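
The steps above can be put together into one small end-to-end sketch. It assumes that an echo executable is available on the PATH (an assumption made only for illustration) and that awaiting the Promise returned by .start is enough to know that all output has been delivered. The variable names are hypothetical.

```perl6
# Assumption: an `echo` program exists on the PATH.
my $proc   = Proc::Async.new('echo', 'Hello world');
my $output = '';

# Tap STDOUT before starting, so that no output is lost.
$proc.stdout.act({ $output ~= $_ });

# Start the process and wait for it to conclude.
my $done = $proc.start;
await $done;

print $output;
```

Collecting the output in a variable rather than printing it directly from the callback keeps the example easy to check; in real code the act block could be any code.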

The Event Loop

There is no event loop. Previous versions of this synopsis mentioned an event loop that would be underlying all concurrency. In this version, this is not the case.

Threads

VM-level threads, which typically correspond to OS-level threads, are exposed through the Thread class. Whatever underlies it, a Thread should always be backed by something that is capable of being scheduled on a CPU core (that is, it may not be a "green thread" or similar). Most users will not need to work with Threads directly. However, those building their own schedulers may well need to do so, and there may be other exceptional circumstances that demand such low-level control.

The easiest way to start a thread is with the start method, which takes a Callable and runs it on a new thread:

    my $thread = Thread.start({
        say "Gosh, I'm in a thread!";
    });

It is also possible to create a thread object, and set it running later:

    my $thread = Thread.new(code => {
        say "A thread, you say?";
    });
    # later...
    $thread.run();

Both approaches result in $thread containing a Thread object. At some point, finish should be called on the thread, from the thread that started it. This blocks until the thread has completed.

    say "Certainly before the thread is started";
    my $thread = Thread.start({ say "In the thread" });
    say "This could come before or after the thread's output";
    $thread.finish();
    say "Certainly after all the above output";

As an alternative to finish, it is possible to create a thread whose lifetime is bounded by that of the overall application. Such threads are automatically terminated when the application exits. In a scenario where the initial thread creates an application lifetime thread and no others, the exit of the initial thread will cause termination of the overall program. Such a thread is created either by passing :app_lifetime to new:

    my $thread = Thread.new(:code({ ... }), :app_lifetime);

Or by passing it to the start method:

    my $thread = Thread.start({ ... }, :app_lifetime);

The property can be introspected:

    say $thread.app_lifetime; # True/False

Each thread also has a unique ID, which can be obtained by the id property.

    say $thread.id;

This should be treated as an opaque number. It cannot be assumed to map to any particular operating system's idea of thread ID, for example. For that, use something that lets you get at OS-level identifiers (such as calling an OS API using NativeCall).

A thread may also be given a name.

    my $thread = Thread.start({ ... }, :name<Background CPU Eater>);

This can be useful for understanding its usage. Uniqueness is not enforced; indeed, the default is "<anon>".

A thread stringifies to something of the form:

    Thread<id>(name)

For example:

    Thread<1234>(<anon>)

The currently executing thread is available through $*THREAD. This is even available in the initial thread of the program, in this case by falling back to $PROCESS::THREAD, which is the initial thread of the process.

Finally, the yield method can be called on Thread (not on any particular thread) to hint to the OS that the thread has nothing useful to do for the moment, and so another thread should run instead.
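
As a minimal sketch of how the pieces above fit together, the following starts a named thread, records the ID that $*THREAD reports from inside it, and compares that with the Thread object seen by the starter. The variable names are hypothetical.

```perl6
my $main-id = $*THREAD.id;      # ID of the initial thread
my $worker-id;

my $worker = Thread.start({
    $worker-id = $*THREAD.id;   # $*THREAD is the worker thread here
}, :name<worker>);

$worker.finish;                 # block until the worker has completed

say $worker.name;               # worker
say $worker-id == $worker.id;   # True
say $worker-id != $main-id;     # True
```

Reading $worker-id only after finish has returned avoids racing with the worker's assignment.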

Atomic Compare and Swap

The Atomic Compare and Swap (CAS) primitive is directly supported by most modern hardware. It has been shown that it can be used to build a whole range of concurrency control mechanisms (such as mutexes and semaphores). It can also be used to implement lock-free data structures. It is decidedly a primitive, and not truly composable due to risk of livelock. However, since so much can be built out of it, Perl 6 provides it directly.

A Perl 6 implementation of CAS would look something like this:

    sub cas($ref is rw, $expected, $new) {
        my $seen = $ref;
        if $ref === $expected {
            $ref = $new;
        }
        return $seen;
    }

The difference is that the real cas happens atomically. For example, a crappy non-reentrant mutex could be implemented as:

    class CrappyMutex {
        has $!locked = 0;
        method lock() {
            loop {
                return if cas($!locked, 0, 1) == 0;
            }
        }
        method unlock() {
            $!locked = 0;
        }
    }

Another common use of CAS is in providing lock-free data structures. Any data structure can be made lock-free as long as you're willing to never mutate it, but build a fresh one each time. To support this, there is another &cas candidate that takes a scalar and a block. It calls the block with the seen initial value. The block returns the new, updated value. If nothing else updated the value in the meantime, the reference will be updated. If the CAS fails because another update got in first, the block will be run again, passing in the latest value.

So, atomically incrementing a variable is done thusly:

    cas $a, { $_.succ };    # $a++

or more generally for all assignment meta-operators:

    cas $a, { $_ * 5 };     # $a *= 5

As another example, a top-5 news headlines list that can be accessed and updated without ever locking could be implemented as:

    class TopHeadlines {
        has $!headlines = [];   # Scalar holding array, as CAS needs
        method headlines() {
            $!headlines
        }
        method add_headline($headline) {
            cas($!headlines, -> @current {
                my @new = $headline, @current;
                @new.pop while @new.elems > 5;
                @new
            });
        }
    }

It's the programmer's duty to ensure that the original data structure is never mutated and that the block has no side-effects (since it may be run any number of times).
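
To illustrate the retry behaviour of the block form of cas, here is a small sketch in which several threads increment a shared counter. The thread count and iteration count are arbitrary choices for illustration. Because a failed update causes the block to be run again with the latest value, no increment is lost.

```perl6
my $counter = 0;

# Four threads each apply 1000 atomic increments.
my @threads = (^4).map({
    Thread.start({
        for ^1000 {
            # The block receives the value seen and returns the new one.
            cas $counter, -> $seen { $seen + 1 };
        }
    })
});
.finish for @threads;

say $counter;   # 4000
```

The block has no side-effects, so it is safe for it to be run more than once per update.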

Low-level primitives

Perl 6 offers high-level concurrency constructs, but in extreme cases, such as when you need to implement a fundamentally different mechanism, these primitives are available.

Locks

Locks are unpleasant to work with, and users are pushed towards higher level synchronization primitives. However, those need to be implemented via lower level constructs for efficiency. As such, a simple lock mechanism - as close to what the execution environment offers as possible - is provided by the Lock class. Note that it is erroneous to rely on the exact representation of an instance of this type (for example, don't assume it can be mixed into). Put another way, treat Lock like a native type.

A Lock is instantiated with new:

    $!lock = Lock.new;

The best way to use it is:

    $!lock.protect: {
        # code to run with the lock held
    }

This acquires the lock, runs the code passed, and then releases the lock. It ensures the lock will be released even if an exception is thrown. It is also possible to do:

    {
        $!lock.lock();
        # do stuff
        LEAVE $!lock.unlock()
    }

When using the lock and unlock methods, the programmer must ensure that the lock is unlocked. Lock is reentrant. Naturally, it's easy to introduce deadlocks. Again, this is a last resort, intended for those who are building first resorts.
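
As a sketch of protect in use, the following guards a shared counter that several threads update. The thread and iteration counts are arbitrary. A plain $count++ is a read followed by a write, so without the lock some updates could be lost.

```perl6
my $lock  = Lock.new;
my $count = 0;

# Four threads each perform 1000 guarded increments.
my @threads = (^4).map({
    Thread.start({
        for ^1000 {
            $lock.protect: { $count++ };
        }
    })
});
.finish for @threads;

say $count;   # 4000
```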

Semaphore

The Semaphore class implements traditional semaphores, which are initialized with a fixed number of permits. It offers three operations. acquire blocks until a positive number of permits is available and then reduces that number by one. tryacquire tries to acquire a permit, but returns False instead of blocking if no permit is available yet. release increases the number of permits by one.

The initial number of permits may be negative, positive or 0.

Some implementations allow for race-free acquisition and release of multiple permits at once, but this primitive does not offer that capability.
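
A typical use is to bound how many threads may be inside a region at once. The sketch below assumes a limit of two permits and six worker threads (both arbitrary), and uses a Lock only to keep the bookkeeping counters consistent. It uses just acquire and release.

```perl6
my $permits = Semaphore.new(2);   # at most two holders at a time
my $lock    = Lock.new;           # guards the bookkeeping below
my $active  = 0;
my $peak    = 0;

my @threads = (^6).map({
    Thread.start({
        $permits.acquire;
        $lock.protect: { $active++; $peak = $active if $active > $peak };
        sleep 0.05;               # stand-in for real work
        $lock.protect: { $active-- };
        $permits.release;
    })
});
.finish for @threads;

say $peak <= 2;   # True
```

The highest number of simultaneously active workers never exceeds the number of permits, whatever order the threads happen to run in.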

AUTHORS

    Jonathan Worthington <jnthn@jnthn.net>
    Elizabeth Mattijsen <liz@dijkmat.nl>