class Supply does Awaitable {}

A supply is a thread-safe, asynchronous data stream like a Channel, but it can have multiple subscribers (taps) that all get the same values flowing through the supply.

It is a thread-safe implementation of the Observer Pattern, and central to supporting reactive programming in Raku.

There are two types of Supplies: live and on demand. When tapping into a live supply, the tap will only see values that are flowing through the supply after the tap has been created. Such supplies are normally infinite in nature, such as mouse movements. Closing such a tap does not stop mouse events from occurring, it just means that the values will go by unseen. All tappers see the same flow of values.

A tap on an on demand supply will initiate the production of values, and tapping the supply again may result in a new set of values. For example, Supply.interval produces a fresh timer with the appropriate interval each time it is tapped. If the tap is closed, the timer simply stops emitting values to that tap.

A live Supply is obtained from the Supplier factory method Supply. New values are emitted by calling emit on the Supplier object.

my $supplier = Supplier.new;
my $supply = $supplier.Supply;
$supply.tap(-> $v { say "$v" });
$supplier.emit(42); # Will cause the tap to output "42"

The live method returns True on live supplies. Factory methods such as interval, from-list will return on demand supplies.

A live Supply that keeps values until tapped the first time can be created with Supplier::Preserving.

Further examples can be found in the concurrency page.

Methods that return Taps§

method tap§

method tap(Supply:D: &emit = -> $ { },
        :&done = -> {},
        :&quit = -> $ex { $ex.throw },
        :&tap = -> $ {} )

Creates a new tap (a kind of subscription if you will), in addition to all existing taps. The first positional argument is a piece of code that will be called when a new value becomes available through the emit call.

The &done callback can be called in a number of cases: if a supply block is being tapped, when a done routine is reached; if a supply block is being tapped, it will be automatically triggered if the supply block reaches the end; if the done method is called on the parent Supplier (in the case of a supply block, if there are multiple Suppliers referenced by whenever, they must all have their done method invoked for this to trigger the &done callback of the tap as the block will then reach its end).

The &quit callback is called if the tap is on a supply block which exits with an error. It is also called if the quit method is invoked on the parent Supplier (in the case of a supply block any one Supplier quitting with an uncaught exception will call the &quit callback as the block will exit with an error). The error is passed as a parameter to the callback.

The &tap callback is called once the Tap object is created, which is passed as a parameter to the callback. The callback is called ahead of emit/done/quit, providing a reliable way to get the Tap object. One case where this is useful is when the Supply begins emitting values synchronously, since the call to .tap won't return the Tap object until it is done emitting, preventing it from being stopped if needed.

Method tap returns an object of type Tap, on which you can call the close method to cancel the subscription.

my $s = Supply.from-list(0 .. 5);
my $t = $s.tap(-> $v { say $v }, done => { say "no more ticks" });

Produces:

0
1
2
3
4
5
no more ticks

method act§

method act(Supply:D: &actor, *%others)

Creates a tap on the given supply with the given code. Differently from tap, the given code is guaranteed to be executed by only one thread at a time.

Utility methods§

method Capture§

method Capture(Supply:D: --> Capture:D)

Equivalent to calling .List.Capture on the invocant.

method Channel§

method Channel(Supply:D: --> Channel:D)

Returns a Channel object that will receive all future values from the supply, and will be closed when the Supply is done, and quit (shut down with error) when the supply is quit.

method Promise§

method Promise(Supply:D: --> Promise:D)

Returns a Promise that will be kept when the Supply is done. If the Supply also emits any values, then the Promise will be kept with the final value. Otherwise, it will be kept with Nil. If the Supply ends with a quit instead of a done, then the Promise will be broken with that exception.

my $supplier = Supplier.new;
my $s = $supplier.Supply;
my $p = $s.Promise;
$p.then(-> $v { say "got $v.result()" });
$supplier.emit('cha');         # not output yet
$supplier.done();              # got cha

The Promise method is most useful when dealing with supplies that will tend to produce just one value, when only the final value is of interest, or when only completion (successful or not) is relevant.

method live§

method live(Supply:D: --> Bool:D)

Returns True if the supply is "live", that is, values are emitted to taps as soon as they arrive. Always returns True in the default Supply (but for example on the supply returned from Supply.from-list it's False).

say Supplier.new.Supply.live;    # OUTPUT: «True␤»

method schedule-on§

method schedule-on(Supply:D: Scheduler $scheduler)

Runs the emit, done and quit callbacks on the specified scheduler.

This is useful for GUI toolkits that require certain actions to be run from the GUI thread.

Methods that wait until the supply is done§

method wait§

method wait(Supply:D:)

Taps the Supply it is called on, and blocks execution until the either the supply is done (in which case it evaluates to the final value that was emitted on the Supply, or Nil if not value was emitted) or quit (in which case it will throw the exception that was passed to quit).

my $s = Supplier.new;
start {
  sleep 1;
  say "One second: running.";
  sleep 1;
  $s.emit(42);
  $s.done;
}
$s.Supply.wait;
say "Two seconds: done";

method list§

multi method list(Supply:D:)

Taps the Supply it is called on, and returns a lazy list that will be reified as the Supply emits values. The list will be terminated once the Supply is done. If the Supply quits, then an exception will be thrown once that point in the lazy list is reached.

method Seq§

method Seq(Supply:D:)

Returns a Seq with an iterator containing the values that the Supply contains.

method grab§

method grab(Supply:D: &when-done --> Supply:D)

Taps the Supply it is called on. When it is done, calls &when-done and then emits the list of values that it returns on the result Supply. If the original Supply quits, then the exception is immediately conveyed on the return Supply.

my $s = Supply.from-list(4, 10, 3, 2);
my $t = $s.grab(&sum);
$t.tap(&say);           # OUTPUT: «19␤»

method reverse§

method reverse(Supply:D: --> Supply:D)

Taps the Supply it is called on. Once that Supply emits done, all of the values it emitted will be emitted on the returned Supply in reverse order. If the original Supply quits, then the exception is immediately conveyed on the return Supply.

my $s = Supply.from-list(1, 2, 3);
my $t = $s.reverse;
$t.tap(&say);           # OUTPUT: «3␤2␤1␤»

method sort§

method sort(Supply:D: &custom-routine-to-use? --> Supply:D)

Taps the Supply it is called on. Once that Supply emits done, all of the values that it emitted will be sorted, and the results emitted on the returned Supply in the sorted order. Optionally accepts a comparator Block. If the original Supply quits, then the exception is immediately conveyed on the return Supply.

my $s = Supply.from-list(4, 10, 3, 2);
my $t = $s.sort();
$t.tap(&say);           # OUTPUT: «2␤3␤4␤10␤»

method collate§

method collate(Supply:D:)

Taps the Supply it is called on. Once that Supply emits done, all of the values that it emitted will be sorted taking into account Unicode grapheme characteristics. A new Supply is returned with the sorted values emitted. See Any.collate for more details on the collated sort.

my $s = Supply.from-list(<ä a o ö>);
my $t = $s.collate();
$t.tap(&say);           # OUTPUT: «a␤ä␤o␤ö␤»

method reduce§

method reduce(Supply:D: &with --> Supply:D)

Creates a "reducing" supply, which will emit a single value with the same semantics as List.reduce.

my $supply = Supply.from-list(1..5).reduce({$^a + $^b});
$supply.tap(-> $v { say "$v" }); # OUTPUT: «15␤»

Methods that return another Supply§

method from-list§

method from-list(Supply:U: +@values --> Supply:D)

Creates an on-demand supply from the values passed to this method.

my $s = Supply.from-list(1, 2, 3);
$s.tap(&say);           # OUTPUT: «1␤2␤3␤»

method share§

method share(Supply:D: --> Supply:D)

Creates a live supply from an on-demand supply, thus making it possible to share the values of the on-demand supply on multiple taps, instead of each tap seeing its own copy of all values from the on-demand supply.

# this says in turn: "first 1" "first 2" "second 2" "first 3" "second 3"
my $s = Supply.interval(1).share;
$s.tap: { "first $_".say };
sleep 1.1;
$s.tap: { "second $_".say };
sleep 2

method flat§

method flat(Supply:D: --> Supply:D)

Creates a supply on which all of the values seen in the given supply are flattened before being emitted again.

method do§

method do(Supply:D: &do --> Supply:D)

Creates a supply to which all values seen in the given supply, are emitted again. The given code, executed for its side-effects only, is guaranteed to be only executed by one thread at a time.

method on-close§

method on-close(Supply:D: &on-close --> Supply:D)

Returns a new Supply which will run &on-close whenever a Tap of that Supply is closed. This includes if further operations are chained on to the Supply. (for example, $supply.on-close(&on-close).map(*.uc)). When using a react or supply block, using the CLOSE phaser is usually a better choice.

my $s = Supplier.new;
my $tap = $s.Supply.on-close({ say "Tap closed" }).tap(
    -> $v { say "the value is $v" },
    done    => { say "Supply is done" },
    quit    => -> $ex { say "Supply finished with error $ex" },
);

$s.emit('Raku');
$tap.close;        # OUTPUT: «Tap closed␤»

method interval§

method interval(Supply:U: $interval, $delay = 0, :$scheduler = $*SCHEDULER --> Supply:D)

Creates a supply that emits a value every $interval seconds, starting $delay seconds from the call. The emitted value is an integer, starting from 0, and is incremented by one for each value emitted.

Implementations may treat too-small and negative values as lowest resolution they support, possibly warning in such situations; e.g. treating 0.0001 as 0.001. For 6.d language version, the minimal value specified is 0.001.

method grep§

method grep(Supply:D: Mu $test --> Supply:D)

Creates a new supply that only emits those values from the original supply that smartmatch against $test.

my $supplier = Supplier.new;
my $all      = $supplier.Supply;
my $ints     = $all.grep(Int);
$ints.tap(&say);
$supplier.emit($_) for 1, 'a string', 3.14159;   # prints only 1

method map§

method map(Supply:D: &mapper --> Supply:D)

Returns a new supply that maps each value of the given supply through &mapper and emits it to the new supply.

my $supplier = Supplier.new;
my $all      = $supplier.Supply;
my $double   = $all.map(-> $value { $value * 2 });
$double.tap(&say);
$supplier.emit(4);           # OUTPUT: «8»

method batch§

method batch(Supply:D: :$elems, :$seconds --> Supply:D)

Creates a new supply that batches the values of the given supply by either the number of elements in the batch (using :elems) or a duration (using :seconds) or both. Any remaining values are emitted in a final batch when the supply is done.

Note: Since Rakudo version 2020.12, the :seconds parameter has a millisecond granularity: for example a 1 millisecond duration could be specified as :seconds(0.001). Before Rakudo version 2020.12, the :seconds parameter had a second granularity.

method elems§

method elems(Supply:D: $seconds? --> Supply:D)

Creates a new supply in which changes to the number of values seen are emitted. It optionally also takes an interval (in seconds) if you only want to be updated every so many seconds.

method head§

multi method head(Supply:D:)
multi method head(Supply:D: Callable:D $limit)
multi method head(Supply:D: \limit)

Creates a "head" supply with the same semantics as List.head.

my $s = Supply.from-list(4, 10, 3, 2);
my $hs = $s.head(2);
$hs.tap(&say);           # OUTPUT: «4␤10␤»

Since release 2020.07, A WhateverCode can be used also, again with the same semantics as List.head

my $s = Supply.from-list(4, 10, 3, 2, 1);
my $hs = $s.head( * - 2);
$hs.tap(&say);           # OUTPUT: «4␤10␤3␤»

method tail§

multi method tail(Supply:D:)
multi method tail(Supply:D: Callable:D $limit)
multi method tail(Supply:D: \limit)

Creates a "tail" supply with the same semantics as List.tail.

my $s = Supply.from-list(4, 10, 3, 2);
my $ts = $s.tail(2);
$ts.tap(&say);           # OUTPUT: «3␤2␤»

You can call .tail with Whatever or Inf; which will return a new supply equivalent to the initial one. Calling it with a WhateverCode will be equivalent to skipping until that number.

my $s = Supply.from-list(4, 10, 3, 2);
my $ts = $s.tail( * - 2 );
$ts.tap(&say);           # OUTPUT: «3␤2␤»

This feature is only available from the 2020.07 release of Raku.

method first§

method first(Supply:D: :$end, |c)

This method creates a supply of the first element, or the last element if the optional named parameter :end is truthy, from a supply created by calling the grep method on the invocant, with any remaining arguments as parameters. If there is no remaining argument, this method is equivalent to calling on the invocant, without parameter, the head or the tail method, according to named parameter :end.

my $rand = supply { emit (rand × 100).floor for ^∞ };
my $first-prime = $rand.first: &is-prime;
# output the first prime from the endless random number supply $rand,
# then the $first-prime supply reaches its end
$first-prime.tap: &say;

method split§

multi method split(Supply:D: \delimiter)
multi method split(Supply:D: \delimiter, \limit)

This method creates a supply of the values returned by the Str.split method called on the string collected from the invocant. See Str.split for details on the \delimiter argument as well as available extra named parameters. The created supply can be limited with the \limit argument, see .head.

my $words = Supply.from-list(<Hello World From Raku!>);
my $s = $words.split(/ <?upper> /, 2, :skip-empty);
$s.tap(&say); # OUTPUT: «Hello␤World␤»

method rotate§

method rotate(Supply:D: $rotate = 1)

Creates a supply with elements rotated to the left when $rotate is positive or to the right otherwise, in which case the invocant is tapped on before a new supply is returned.

my $supply = Supply.from-list(<a b c d e>).rotate(2);
$supply.tap(&say); # OUTPUT: «c␤d␤e␤a␤b␤»

Note: Available since Rakudo 2020.06.

method rotor§

method rotor(Supply:D: @cycle --> Supply:D)

Creates a "rotoring" supply with the same semantics as List.rotor.

method delayed§

method delayed(Supply:D: $seconds, :$scheduler = $*SCHEDULER --> Supply:D)

Creates a new supply in which all values flowing through the given supply are emitted, but with the given delay in seconds.

method throttle§

multi method throttle(Supply:D:
      Int()  $elems,
      Real() $seconds,
      Real() $delay  = 0,
      :$scheduler    = $*SCHEDULER,
      :$control,
      :$status,
      :$bleed,
      :$vent-at,
    )
multi method throttle(Supply:D:
      Int()  $elems,
      Callable:D $process,
      Real() $delay = 0,
      :$scheduler   = $*SCHEDULER,
      :$control,
      :$status,
      :$bleed,
      :$vent-at,
    )

Arguments to .throttle are defined as follows:

ArgumentMeaning
$limit,values / time or simultaneous processing
$seconds or $processtime-unit / code to process simultaneously
$delay = 0,initial delay before starting, in seconds
:$control,supply to emit control messages on (optional)
:$status,supply to tap status messages from (optional)
:$bleed,supply to bleed messages to (optional)
:$vent-at,bleed when so many buffered (optional)
:$scheduler,scheduler to use, default $*SCHEDULER

This method produces a Supply from a given one, but makes sure the number of messages passed through is limited.

It has two modes of operation: per time-unit or by maximum number of executions of a block of code: this is determined by the type of the second positional parameter.

The first positional parameter specifies the limit that should be applied.

If the second positional parameter is a Callable, then the limit indicates the maximum number of parallel processes executing the Callable, which is given the value that was received. The emitted values in this case will be the Promises that were obtained from starting the Callable.

If the second positional parameter is a real number, it is interpreted as the time-unit (in seconds). If you specify .1 as the value, then it makes sure you don't exceed the limit for every tenth of a second.

If the limit is exceeded, then incoming messages are buffered until there is room to pass on / execute the Callable again.

The third positional parameter is optional: it indicates the number of seconds the throttle will wait before passing on any values.

The :control named parameter optionally specifies a Supply that you can use to control the throttle while it is in operation. Messages that can be sent, are strings in the form of "key:value". Please see below for the types of messages that you can send to control the throttle.

The :status named parameter optionally specifies a Supply that will receive any status messages. If specified, it will at least send one status message after the original Supply is exhausted. See status message below.

The :bleed named parameter optionally specifies a Supply that will receive any values that were either explicitly bled (with the bleed control message), or automatically bled (if there's a vent-at active).

The :vent-at named parameter indicates the number of values that may be buffered before any additional value will be routed to the :bleed Supply. Defaults to 0 if not specified (causing no automatic bleeding to happen). Only makes sense if a :bleed Supply has also been specified.

The :scheduler named parameter indicates the scheduler to be used. Defaults to $*SCHEDULER.

control messages§

These messages can be sent to the :control Supply. A control message consists of a string of the form "key: value", e.g. "limit: 4".

  • limit

Change the number of messages (as initially given in the first positional) to the value given.

  • bleed

Route the given number of buffered messages to the :bleed Supply.

  • vent-at

Change the maximum number of buffered values before automatic bleeding takes place. If the value is lower than before, will cause immediate rerouting of buffered values to match the new maximum.

  • status

Send a status message to the :status Supply with the given id.

status message§

The status return message is a hash with the following keys:

  • allowed

The current number of messages / callables that is still allowed to be passed / executed.

  • bled

The number of messages routed to the :bleed Supply.

  • buffered

The number of messages currently buffered because of overflow.

  • emitted

The number of messages emitted (passed through).

  • id

The id of this status message (a monotonically increasing number). Handy if you want to log status messages.

  • limit

The current limit that is being applied.

  • vent-at

The maximum number of messages that may be buffered before they're automatically re-routed to the :bleed Supply.

Examples§

Have a simple piece of code announce when it starts running asynchronously, wait a random amount of time, then announce when it is done. Do this 6 times, but don't let more than 3 of them run simultaneously.

my $s = Supply.from-list(^6);  # set up supply
my $t = $s.throttle: 3,        # only allow 3 at a time
{                              # code block to run
    say "running $_";          # announce we've started
    sleep rand;                # wait some random time
    say "done $_"              # announce we're done
}                              # don't need ; because } at end of line
$t.wait;                       # wait for the supply to be done

and the result of one run will be:

running 0
running 1
running 2
done 2
running 3
done 1
running 4
done 4
running 5
done 0
done 3
done 5

method stable§

method stable(Supply:D: $time, :$scheduler = $*SCHEDULER --> Supply:D)

Creates a new supply that only passes on a value flowing through the given supply if it wasn't superseded by another value in the given $time (in seconds). Optionally uses another scheduler than the default scheduler, using the :scheduler parameter.

To clarify the above, if, during the timeout $time, additional values are emitted to the Supplier all but the last one will be thrown away. Each time an additional value is emitted to the Supplier, during the timeout, $time is reset.

This method can be quite useful when handling UI input, where it is not desired to perform an operation until the user has stopped typing for a while rather than on every keystroke.

my $supplier = Supplier.new;
my $supply1 = $supplier.Supply;
$supply1.tap(-> $v { say "Supply1 got: $v" });
$supplier.emit(42);

my Supply $supply2 = $supply1.stable(5);
$supply2.tap(-> $v { say "Supply2 got: $v" });
sleep(3);
$supplier.emit(43);  # will not be seen by $supply2 but will reset $time
$supplier.emit(44);
sleep(10);
# OUTPUT: «Supply1 got: 42␤Supply1 got: 43␤Supply1 got: 44␤Supply2 got: 44␤»

As can be seen above, $supply1 received all values emitted to the Supplier while $supply2 only received one value. The 43 was thrown away because it was followed by another 'last' value 44 which was retained and sent to $supply2 after approximately eight seconds, this due to the fact that the timeout $time was reset after three seconds.

method produce§

method produce(Supply:D: &with --> Supply:D)

Creates a "producing" supply with the same semantics as List.produce.

my $supply = Supply.from-list(1..5).produce({$^a + $^b});
$supply.tap(-> $v { say "$v" }); # OUTPUT: «1␤3␤6␤10␤15␤»

method lines§

method lines(Supply:D: :$chomp = True --> Supply:D)

Creates a supply that will emit the characters coming in line by line from a supply that's usually created by some asynchronous I/O operation. The optional :chomp parameter indicates whether to remove line separators: the default is True.

method words§

method words(Supply:D: --> Supply:D)

Creates a supply that will emit the characters coming in word for word from a supply that's usually created by some asynchronous I/O operation.

my $s = Supply.from-list("Hello Word!".comb);
my $ws = $s.words;
$ws.tap(&say);           # OUTPUT: «Hello␤Word!␤»

method unique§

method unique(Supply:D: :$as, :$with, :$expires --> Supply:D)

Creates a supply that only provides unique values, as defined by the optional :as and :with parameters (same as with unique). The optional :expires parameter how long to wait (in seconds) before "resetting" and not considering a value to have been seen, even if it's the same as an old value.

method repeated§

method repeated(Supply:D: :&as, :&with)

Creates a supply that only provides repeated values, as defined by the optional :as and :with parameters (same as with unique).

my $supply = Supply.from-list(<a A B b c b C>).repeated(:as(&lc));
$supply.tap(&say);           # OUTPUT: «A␤b␤b␤C␤»

See repeated for more examples that use its sub form.

Note: Available since version 6.e (Rakudo 2020.01 and later).

method squish§

method squish(Supply:D: :$as, :$with --> Supply:D)

Creates a supply that only provides unique values, as defined by the optional :as and :with parameters (same as with squish).

method max§

method max(Supply:D: &custom-routine-to-use = &infix:<cmp> --> Supply:D)

Creates a supply that only emits values from the given supply if they are larger than any value seen before. In other words, from a continuously ascending supply it will emit all the values. From a continuously descending supply it will only emit the first value. The optional parameter specifies the comparator, just as with Any.max.

method min§

method min(Supply:D: &custom-routine-to-use = &infix:<cmp> --> Supply:D)

Creates a supply that only emits values from the given supply if they are smaller than any value seen before. In other words, from a continuously descending supply it will emit all the values. From a continuously ascending supply it will only emit the first value. The optional parameter specifies the comparator, just as with Any.min.

method minmax§

method minmax(Supply:D: &custom-routine-to-use = &infix:<cmp> --> Supply:D)

Creates a supply that emits a Range every time a new minimum or maximum values is seen from the given supply. The optional parameter specifies the comparator, just as with Any.minmax.

method skip§

method skip(Supply:D: Int(Cool) $number = 1 --> Supply:D)

Returns a new Supply which will emit all values from the given Supply except for the first $number values, which will be thrown away.

my $supplier = Supplier.new;
my $supply = $supplier.Supply;
$supply = $supply.skip(3);
$supply.tap({ say $_ });
$supplier.emit($_) for 1..10; # OUTPUT: «4␤5␤6␤7␤8␤9␤10␤»

method start§

method start(Supply:D: &startee --> Supply:D)

Creates a supply of supplies. For each value in the original supply, the code object is scheduled on another thread, and returns a supply either of a single value (if the code succeeds), or one that quits without a value (if the code fails).

This is useful for asynchronously starting work that you don't block on.

Use migrate to join the values into a single supply again.

method migrate§

method migrate(Supply:D: --> Supply:D)

Takes a Supply which itself has values that are of type Supply as input. Each time the outer Supply emits a new Supply, this will be tapped and its values emitted. Any previously tapped Supply will be closed. This is useful for migrating between different data sources, and only paying attention to the latest one.

For example, imagine an application where the user can switch between different stocks. When they switch to a new one, a connection is established to a web socket to get the latest values, and any previous connection should be closed. Each stream of values coming over the web socket would be represented as a Supply, which themselves are emitted into a Supply of latest data sources to watch. The migrate method could be used to flatten this supply of supplies into a single Supply of the current values that the user cares about.

Here is a simple simulation of such a program:

my Supplier $stock-sources .= new;

sub watch-stock($symbol) {
    $stock-sources.emit: supply {
        say "Starting to watch $symbol";
        whenever Supply.interval(1) {
            emit "$symbol: 111." ~ 99.rand.Int;
        }
        CLOSE say "Lost interest in $symbol";
    }
}

$stock-sources.Supply.migrate.tap: *.say;

watch-stock('GOOG');
sleep 3;
watch-stock('AAPL');
sleep 3;

Which produces output like:

Starting to watch GOOG
GOOG: 111.67
GOOG: 111.20
GOOG: 111.37
Lost interest in GOOG
Starting to watch AAPL
AAPL: 111.55
AAPL: 111.6
AAPL: 111.6

Methods that combine supplies§

method merge§

method merge(Supply @*supplies --> Supply:D)

Creates a supply to which any value seen from the given supplies, is emitted. The resulting supply is done only when all given supplies are done. Can also be called as a class method.

method zip§

method zip(**@s, :&with)

Creates a supply that emits combined values as soon as there is a new value seen on all of the supplies. By default, Lists are created, but this can be changed by specifying your own combiner with the :with parameter. The resulting supply is done as soon as any of the given supplies are done. Can also be called as a class method.

This can also be used as a class method; in case it's used as an object method the corresponding supply will be one of the supplies combined (with no special treatment).

method zip-latest§

method zip-latest(**@s, :&with, :$initial )

Creates a supply that emits combined values as soon as there is a new value seen on any of the supplies. By default, Lists are created, but this can be changed by specifying your own combiner with the :with parameter. The optional :initial parameter can be used to indicate the initial state of the combined values. By default, all supplies have to have at least one value emitted on them before the first combined values is emitted on the resulting supply. The resulting supply is done as soon as any of the given supplies are done. Can also be called as a class method.

I/O features exposed as supplies§

sub signal§

sub signal(*@signals, :$scheduler = $*SCHEDULER)

Creates a supply for the Signal enums (such as SIGINT) specified, and an optional :scheduler parameter. Any signals received, will be emitted on the supply. For example:

signal(SIGINT).tap( { say "Thank you for your attention"; exit 0 } );

would catch Control-C, thank you, and then exit.

To go from a signal number to a Signal, you can do something like this:

signal(Signal(2)).tap( -> $sig { say "Received signal: $sig" } );

The list of supported signals can be found by checking Signal::.keys (as you would any enum). For more details on how enums work see enum.

Note: Rakudo versions up to 2018.05 had a bug due to which numeric values of signals were incorrect on some systems. For example, Signal(10) was returning SIGBUS even if it was actually SIGUSR1 on a particular system. That being said, using signal(SIGUSR1) was working as expected on all Rakudo versions except 2018.04, 2018.04.1 and 2018.05, where the intended behavior can be achieved by using signal(SIGBUS) instead. These issues are resolved in Rakudo releases after 2018.05.

method IO::Notification.watch-path§

method watch-path($path --> Supply:D)

Creates a supply to which the OS will emit values to indicate changes on the filesystem for the given path. Also has a shortcut with the watch method on an IO object, like this:

IO::Notification.watch-path(".").act( { say "$^file changed" } );
".".IO.watch.act(                     { say "$^file changed" } );   # same

Typegraph§

Type relations for Supply
raku-type-graph Supply Supply Any Any Supply->Any Mu Mu Any->Mu

Expand chart above