class Supply does Awaitable {}

供应是一种线程安全的异步数据流,类似于 Channel,但它可以有多个订阅者(抽头),所有订阅者都获得流经供应的相同值。

它是 观察者模式 的线程安全实现,并且对于支持 Raku 中的响应式编程至关重要。

有两种类型的供应:liveon demand。当进入 live 供应时,抽头只会看到在创建抽头之后流经供应的值。此类供应通常本质上是无限的,例如鼠标移动。关闭此类抽头不会阻止鼠标事件发生,这只意味着这些值将被忽略。所有抽头者看到相同的值流。

on demand 供应上的抽头将启动值的生成,并且再次抽头供应可能会产生一组新值。例如,Supply.interval 每次抽头时都会生成一个具有适当间隔的新计时器。如果抽头关闭,计时器将停止向该抽头发出值。

live SupplySupplier 工厂方法 Supply 获得。通过调用 Supplier 对象上的 emit 发出新值。

my $supplier = Supplier.new;
my $supply = $supplier.Supply;
$supply.tap(-> $v { say "$v" });
$supplier.emit(42); # Will cause the tap to output "42"

live 方法 在实时供应上返回 True。工厂方法(例如 intervalfrom-list)将返回按需供应。

可以使用 Supplier::Preserving 创建一个实时 Supply,该 Supply 会保留值,直到第一次抽头。

可以在 并发页面 中找到更多示例。

返回抽头的方法§

方法 tap§

method tap(Supply:D: &emit = -> $ { },
        :&done = -> {},
        :&quit = -> $ex { $ex.throw },
        :&tap = -> $ {} )

创建新的 tap(某种订阅),除了所有现有 tap。第一个位置参数是当通过 emit 调用获得新值时将调用的代码段。

&done 回调可以在多种情况下调用:如果正在 tap 供应块,则在到达 done 例程时;如果正在 tap 供应块,则当供应块到达末尾时将自动触发;如果在父 Supplier 上调用 done 方法(在供应块的情况下,如果 whenever 引用多个 Supplier,则必须调用它们的所有 done 方法才能触发 tap 的 &done 回调,因为块随后将到达其末尾)。

如果 tap 位于因错误而退出的供应块上,则调用 &quit 回调。如果在父 Supplier 上调用 quit 方法,也会调用它(在供应块的情况下,任何一个 Supplier 因未捕获的异常而退出都会调用 &quit 回调,因为块将因错误而退出)。错误作为参数传递给回调。

一旦创建 Tap 对象,就会调用 &tap 回调,该对象作为参数传递给回调。回调在 emit/done/quit 之前调用,提供获取 Tap 对象的可靠方式。此功能有用的一个情况是当 Supply 开始同步发出值时,因为对 .tap 的调用在发出完成之前不会返回 Tap 对象,从而防止在需要时停止它。

方法 tap 返回类型为 Tap 的对象,可以在其上调用 close 方法以取消订阅。

my $s = Supply.from-list(0 .. 5);
my $t = $s.tap(-> $v { say $v }done => { say "no more ticks" });

生成

0
1
2
3
4
5
no more ticks

方法 act§

method act(Supply:D: &actor*%others)

使用给定的代码在给定的供应上创建 tap。与 tap 不同,保证仅由一个线程同时执行给定的代码。

实用方法§

方法 Capture§

method Capture(Supply:D: --> Capture:D)

等效于对调用者调用 .List.Capture

方法 Channel§

method Channel(Supply:D: --> Channel:D)

返回一个 Channel 对象,该对象将接收供应的所有未来值,并且当供应完成时将被 close,当供应退出时将退出(因错误关闭)。

方法 Promise§

method Promise(Supply:D: --> Promise:D)

返回一个 Promise,当 Supplydone 时,该 Promise 将被保留。如果 Supply 还发出任何值,则 Promise 将保留最终值。否则,它将保留 Nil。如果 Supplyquit 而不是 done 结束,则 Promise 将因该异常而中断。

my $supplier = Supplier.new;
my $s = $supplier.Supply;
my $p = $s.Promise;
$p.then(-> $v { say "got $v.result()" });
$supplier.emit('cha');         # not output yet 
$supplier.done();              # got cha

当处理往往只产生一个值、仅对最终值感兴趣或仅与完成(成功或不成功)相关联的供应时,Promise 方法最为有用。

方法 live§

method live(Supply:D: --> Bool:D)

如果供应“处于活动状态”,即值在到达后立即发送到抽头,则返回 True。在默认 Supply 中始终返回 True(但例如在从 Supply.from-list 返回的供应中,它为 False)。

say Supplier.new.Supply.live;    # OUTPUT: «True␤»

方法 schedule-on§

method schedule-on(Supply:D: Scheduler $scheduler)

在指定调度程序上运行 emit、done 和 quit 回调。

这对于需要从 GUI 线程运行某些操作的 GUI 工具包非常有用。

等待供应完成的方法§

方法 wait§

method wait(Supply:D:)

抽取其被调用的 Supply,并阻塞执行,直到供应为 done(在这种情况下,它将计算为在 Supply 上发出的最终值,或者如果没有发出值则为 Nil)或 quit(在这种情况下,它将抛出传递给 quit 的异常)。

my $s = Supplier.new;
start {
  sleep 1;
  say "One second: running.";
  sleep 1;
  $s.emit(42);
  $s.done;
}
$s.Supply.wait;
say "Two seconds: done";

方法 list§

multi method list(Supply:D:)

抽取其被调用的 Supply,并返回一个惰性列表,该列表将随着 Supply 发出值而具体化。一旦 Supplydone,列表将终止。如果 Supplyquit,则一旦达到惰性列表中的该点,就会抛出异常。

方法 Seq§

method Seq(Supply:D:)

返回一个 Seq,其中包含一个迭代器,该迭代器包含 Supply 包含的值。

方法 grab§

method grab(Supply:D: &when-done --> Supply:D)

抽取其被调用的 Supply。当它为 done 时,调用 &when-done,然后在结果 Supply 上发出它返回的值列表。如果原始 Supplyquit,则异常将立即传达给返回 Supply

my $s = Supply.from-list(41032);
my $t = $s.grab(&sum);
$t.tap(&say);           # OUTPUT: «19␤»

方法 reverse§

method reverse(Supply:D: --> Supply:D)

抽取其被调用的 Supply。一旦该 Supply 发出 done,它发出的所有值都将按相反的顺序在返回的 Supply 上发出。如果原始 Supplyquit,则异常将立即传达给返回 Supply

my $s = Supply.from-list(123);
my $t = $s.reverse;
$t.tap(&say);           # OUTPUT: «3␤2␤1␤»

方法 sort§

method sort(Supply:D: &custom-routine-to-use? --> Supply:D)

点击它被调用的Supply。一旦该Supply发出done,它发出的所有值都将被排序,并且结果将按排序顺序在返回的Supply上发出。可以选择接受比较器 Block。如果原始Supplyquit,则异常将立即传达给返回的Supply

my $s = Supply.from-list(41032);
my $t = $s.sort();
$t.tap(&say);           # OUTPUT: «2␤3␤4␤10␤»

方法 collate§

method collate(Supply:D:)

点击它被调用的Supply。一旦该Supply发出done,它发出的所有值都将被排序,同时考虑 Unicode 音节特征。将返回一个新的Supply,其中发出已排序的值。有关整理排序的更多详细信息,请参见 Any.collate

my $s = Supply.from-list(<ä a o ö>);
my $t = $s.collate();
$t.tap(&say);           # OUTPUT: «a␤ä␤o␤ö␤»

方法 reduce§

method reduce(Supply:D: &with --> Supply:D)

创建一个“缩减”供应,它将发出一个具有与 List.reduce 相同语义的单一值。

my $supply = Supply.from-list(1..5).reduce({$^a + $^b});
$supply.tap(-> $v { say "$v" }); # OUTPUT: «15␤»

返回另一个 Supply 的方法§

方法 from-list§

method from-list(Supply:U: +@values --> Supply:D)

根据传递给此方法的值创建一个按需供应。

my $s = Supply.from-list(123);
$s.tap(&say);           # OUTPUT: «1␤2␤3␤»

方法 share§

method share(Supply:D: --> Supply:D)

从按需供应创建实时供应,从而可以在多个点击中共享按需供应的值,而不是每个点击都看到按需供应的所有值的副本。

# this says in turn: "first 1" "first 2" "second 2" "first 3" "second 3" 
my $s = Supply.interval(1).share;
$s.tap: { "first $_".say };
sleep 1.1;
$s.tap: { "second $_".say };
sleep 2

方法 flat§

method flat(Supply:D: --> Supply:D)

创建一个供应,在该供应中,在再次发出之前,会展平在给定供应中看到的所有值。

方法 do§

method do(Supply:D: &do --> Supply:D)

创建一个供应,在该供应中,在给定供应中看到的所有值都会再次发出。仅为其副作用执行的给定代码保证一次仅由一个线程执行。

方法 on-close§

method on-close(Supply:D: &on-close --> Supply:D)

返回一个新的Supply,该Supply将在该SupplyTap 关闭时运行&on-close。这包括对Supply链接的进一步操作。(例如,$supply.on-close(&on-close).map(*.uc))。当使用reactsupply块时,通常使用 CLOSE 相位器是一个更好的选择。

my $s = Supplier.new;
my $tap = $s.Supply.on-close({ say "Tap closed" }).tap(
    -> $v { say "the value is $v" },
    done    => { say "Supply is done" },
    quit    => -> $ex { say "Supply finished with error $ex" },
);
 
$s.emit('Raku');
$tap.close;        # OUTPUT: «Tap closed␤»

方法 interval§

method interval(Supply:U: $interval$delay = 0:$scheduler = $*SCHEDULER --> Supply:D)

创建一个供应,该供应从调用开始每$interval秒发出一个值,从$delay秒开始。发出的值是一个整数,从 0 开始,并且对于发出的每个值都会递增 1。

实现可能会将过小和负值视为它们支持的最低分辨率,可能在这种情况下发出警告;例如,将0.0001视为0.001。对于 6.d 语言版本,指定的最大值是0.001

方法 grep§

method grep(Supply:D: Mu $test --> Supply:D)

创建一个新供应,它仅发出与 $test 智能匹配的原始供应中的那些值。

my $supplier = Supplier.new;
my $all      = $supplier.Supply;
my $ints     = $all.grep(Int);
$ints.tap(&say);
$supplier.emit($_for 1'a string'3.14159;   # prints only 1

方法 map§

method map(Supply:D: &mapper --> Supply:D)

返回一个新供应,它通过 &mapper 映射给定供应的每个值,并将其发出到新供应。

my $supplier = Supplier.new;
my $all      = $supplier.Supply;
my $double   = $all.map(-> $value { $value * 2 });
$double.tap(&say);
$supplier.emit(4);           # OUTPUT: «8»

方法 batch§

method batch(Supply:D: :$elems:$seconds --> Supply:D)

创建一个新供应,它按批次处理给定供应的值,按批次中的元素数量(使用 :elems)或持续时间(使用 :seconds)或两者。当供应完成时,任何剩余的值都会在最后一个批次中发出。

注意:自 Rakudo 2020.12 版本以来,:seconds 参数具有毫秒粒度:例如,1 毫秒持续时间可以指定为 :seconds(0.001)。在 Rakudo 2020.12 版本之前,:seconds 参数具有秒粒度。

方法 elems§

method elems(Supply:D: $seconds? --> Supply:D)

创建一个新供应,其中会发出对所见值数量的更改。如果你只想每隔几秒更新一次,它还可以选择一个间隔(以秒为单位)。

方法 head§

multi method head(Supply:D:)
multi method head(Supply:D: Callable:D $limit)
multi method head(Supply:D: \limit)

创建一个具有与 List.head 相同语义的“头”供应。

my $s = Supply.from-list(41032);
my $hs = $s.head(2);
$hs.tap(&say);           # OUTPUT: «4␤10␤»

自 2020.07 版本以来,WhateverCode 也可以使用,同样具有与 List.head 相同的语义

my $s = Supply.from-list(410321);
my $hs = $s.head* - 2);
$hs.tap(&say);           # OUTPUT: «4␤10␤3␤»

方法 tail§

multi method tail(Supply:D:)
multi method tail(Supply:D: Callable:D $limit)
multi method tail(Supply:D: \limit)

创建一个具有与 List.tail 相同语义的“尾”供应。

my $s = Supply.from-list(41032);
my $ts = $s.tail(2);
$ts.tap(&say);           # OUTPUT: «3␤2␤»

你可以使用 WhateverInf 调用 .tail;它将返回一个等效于初始供应的新供应。使用 WhateverCode 调用它等效于跳过直到该数字。

my $s = Supply.from-list(41032);
my $ts = $s.tail* - 2 );
$ts.tap(&say);           # OUTPUT: «3␤2␤»

此功能仅在 Raku 的 2020.07 版本中可用。

方法 first§

method first(Supply:D: :$end|c)

此方法创建一个供应,其中包含第一个元素,或者如果可选命名参数 :end 为真,则包含最后一个元素,该供应是通过对调用者调用 grep 方法(任何剩余参数作为参数)创建的。如果没有剩余参数,则此方法等效于在调用者上调用 headtail 方法(没有参数),具体取决于命名参数 :end

my $rand = supply { emit (rand × 100).floor for ^∞ };
my $first-prime = $rand.first: &is-prime;
# output the first prime from the endless random number supply $rand, 
# then the $first-prime supply reaches its end 
$first-prime.tap: &say;

方法 split§

multi method split(Supply:D: \delimiter)
multi method split(Supply:D: \delimiter, \limit)

此方法创建一个供应,其中包含从调用者收集的字符串上调用的 Str.split 方法返回的值。有关 \delimiter 参数以及可用的额外命名参数的详细信息,请参阅 Str.split。创建的供应可以通过 \limit 参数进行限制,请参阅 .head

my $words = Supply.from-list(<Hello World From Raku!>);
my $s = $words.split(/ <?upper> /2:skip-empty);
$s.tap(&say); # OUTPUT: «Hello␤World␤»

方法 rotate§

method rotate(Supply:D: $rotate = 1)

$rotate 为正数时,创建一个元素向左旋转的供应;否则向右旋转,在这种情况下,在返回新供应之前,会先轻触调用者。

my $supply = Supply.from-list(<a b c d e>).rotate(2);
$supply.tap(&say); # OUTPUT: «c␤d␤e␤a␤b␤»

注意:自 Rakudo 2020.06 起可用。

方法 rotor§

method rotor(Supply:D: @cycle --> Supply:D)

创建一个“旋转”供应,其语义与 List.rotor 相同。

方法 delayed§

method delayed(Supply:D: $seconds:$scheduler = $*SCHEDULER --> Supply:D)

创建一个新供应,其中通过给定供应的所有值都会被发出,但延迟给定的秒数。

方法 throttle§

multi method throttle(Supply:D:
      Int()  $elems,
      Real() $seconds,
      Real() $delay  = 0,
      :$scheduler    = $*SCHEDULER,
      :$control,
      :$status,
      :$bleed,
      :$vent-at,
    )
multi method throttle(Supply:D:
      Int()  $elems,
      Callable:D $process,
      Real() $delay = 0,
      :$scheduler   = $*SCHEDULER,
      :$control,
      :$status,
      :$bleed,
      :$vent-at,
    )

.throttle 的参数定义如下

参数含义
$limit,值/时间或同时处理
$seconds 或 $process时间单位/同时处理的代码
$delay = 0,开始前的初始延迟(以秒为单位)
:$control,用于发出控制消息的供应(可选)
:$status,用于轻触状态消息的供应(可选)
:$bleed,用于泄漏消息的供应(可选)
:$vent-at,当缓冲过多时泄漏(可选)
:$scheduler,要使用的计划程序,默认情况下为 $*SCHEDULER

此方法从给定的供应生成一个 Supply,但确保通过的消息数量受到限制。

它有两种操作模式:按时间单位或按代码块的最大执行次数:这由第二个位置参数的类型决定。

第一个位置参数指定应应用的限制。

如果第二个位置参数是 Callable,则限制表示执行 Callable 的并行进程的最大数量,Callable 会收到接收到的值。在这种情况下,发出的值将是通过启动 Callable 获得的 Promise

如果第二个位置参数是一个实数,则将其解释为时间单位(以秒为单位)。如果您将 .1 指定为值,则它将确保您不会超过每十分之一秒的限制。

如果超过限制,则传入的消息将被缓冲,直到有空间再次传递/执行 Callable。

第三个位置参数是可选的:它表示在传递任何值之前,节流将等待的秒数。

:control 命名参数可选择指定一个 Supply,你可以在操作过程中使用它来控制节流。可以发送的消息是“键:值”形式的字符串。请参阅以下内容,了解你可以发送哪些类型的消息来控制节流。

:status 命名参数可选择指定一个 Supply,它将接收任何状态消息。如果指定,它至少会在原始 Supply 耗尽后发送一条状态消息。请参见下面的 状态消息

:bleed 命名参数可选择指定一个 Supply,它将接收任何显式泄漏(带有 bleed 控制消息)或自动泄漏(如果 vent-at 处于活动状态)的值。

:vent-at 命名参数指示在将任何其他值路由到 :bleed Supply 之前可以缓冲的值的数量。如果未指定,则默认为 0(导致不发生自动泄漏)。仅在还指定了 :bleed Supply 时才有意义。

:scheduler 命名参数指示要使用的调度程序。默认为 $*SCHEDULER

控制消息§

这些消息可以发送到 :control Supply。控制消息由“键: 值”形式的字符串组成,例如“limit: 4”。

  • limit

将消息数量(最初在第一个位置中给出)更改为给定的值。

  • bleed

将给定的缓冲消息数量路由到 :bleed Supply。

  • vent-at

更改在发生自动泄漏之前缓冲的最大值数量。如果该值低于之前的值,将导致立即重新路由缓冲值以匹配新的最大值。

  • status

使用给定的 ID 向 :status Supply 发送状态消息。

状态消息§

状态返回消息是一个哈希,具有以下键

  • allowed

仍然允许传递/执行的消息/可调用项的当前数量。

  • bled

路由到 :bleed Supply 的消息数量。

  • buffered

由于溢出而当前缓冲的消息数量。

  • emitted

已发送(通过)的消息数量。

  • id

此状态消息的 ID(单调递增的数字)。如果你想记录状态消息,这很方便。

  • limit

limit

  • vent-at

正在应用的当前限制。

vent-at

在自动重新路由到 :bleed Supply 之前,可以缓冲的最大消息数量。

my $s = Supply.from-list(^6);  # set up supply 
my $t = $s.throttle: 3,        # only allow 3 at a time 
{                              # code block to run 
    say "running $_";          # announce we've started 
    sleep rand;                # wait some random time 
    say "done $_"              # announce we're done 
}                              # don't need ; because } at end of line 
$t.wait;                       # wait for the supply to be done

示例§

running 0
running 1
running 2
done 2
running 3
done 1
running 4
done 4
running 5
done 0
done 3
done 5

方法 stable§

method stable(Supply:D: $time:$scheduler = $*SCHEDULER --> Supply:D)

创建一个新的供应,它只传递给定供应中流经的值,如果它没有被给定 $time(以秒为单位)中的另一个值取代。可以选择使用另一个调度程序,而不是默认调度程序,使用 :scheduler 参数。

为了澄清上述内容,如果在超时 $time 期间,向 Supplier 发射了其他值,除了最后一个值之外的所有值都将被丢弃。在超时期间,每次向 Supplier 发射一个其他值时,都会重置 $time

在处理 UI 输入时,此方法非常有用,在处理 UI 输入时,不希望在用户停止键入一段时间后执行操作,而不是在每次击键时执行操作。

my $supplier = Supplier.new;
my $supply1 = $supplier.Supply;
$supply1.tap(-> $v { say "Supply1 got: $v" });
$supplier.emit(42);
 
my Supply $supply2 = $supply1.stable(5);
$supply2.tap(-> $v { say "Supply2 got: $v" });
sleep(3);
$supplier.emit(43);  # will not be seen by $supply2 but will reset $time 
$supplier.emit(44);
sleep(10);
# OUTPUT: «Supply1 got: 42␤Supply1 got: 43␤Supply1 got: 44␤Supply2 got: 44␤» 

如上所示,$supply1 接收了向 Supplier 发射的所有值,而 $supply2 只接收了一个值。43 被丢弃,因为它后面跟着另一个“最后一个”值 44,该值被保留并在大约八秒后发送到 $supply2,这是因为超时 $time 在三秒后被重置。

方法 produce§

method produce(Supply:D: &with --> Supply:D)

创建一个“生产”供应,其语义与 List.produce 相同。

my $supply = Supply.from-list(1..5).produce({$^a + $^b});
$supply.tap(-> $v { say "$v" }); # OUTPUT: «1␤3␤6␤10␤15␤»

方法 lines§

method lines(Supply:D: :$chomp = True --> Supply:D)

创建一个供应,它将按行发出来自通常由某些异步 I/O 操作创建的供应的字符。可选的 :chomp 参数指示是否删除行分隔符:默认值为 True

方法 words§

method words(Supply:D: --> Supply:D)

创建一个供应,它将按单词发出来自通常由某些异步 I/O 操作创建的供应的字符。

my $s = Supply.from-list("Hello Word!".comb);
my $ws = $s.words;
$ws.tap(&say);           # OUTPUT: «Hello␤Word!␤»

方法 unique§

method unique(Supply:D: :$as:$with:$expires --> Supply:D)

创建一个只提供唯一值的供应,如可选的 :as:with 参数所定义(与 unique 相同)。可选的 :expires 参数表示在“重置”之前等待多长时间(以秒为单位),并且不考虑某个值已被看到,即使它与旧值相同。

方法 repeated§

method repeated(Supply:D: :&as:&with)

创建一个只提供重复值的供应,如可选的 :as:with 参数所定义(与 unique 相同)。

my $supply = Supply.from-list(<a A B b c b C>).repeated(:as(&lc));
$supply.tap(&say);           # OUTPUT: «A␤b␤b␤C␤»

有关使用其子表单的更多示例,请参见 repeated

注意:自版本 6.e (Rakudo 2020.01 及更高版本) 起可用。

方法 squish§

method squish(Supply:D: :$as:$with --> Supply:D)

创建一个只提供唯一值的供应,如可选的 :as:with 参数所定义(与 squish 相同)。

方法 max§

method max(Supply:D: &custom-routine-to-use = &infix:<cmp> --> Supply:D)

创建一个供应,仅当给定供应中的值大于之前看到的任何值时才发出这些值。换句话说,它将从一个持续上升的供应中发出所有值。从一个持续下降的供应中,它只会发出第一个值。可选参数指定比较器,就像使用 Any.max 一样。

方法 min§

method min(Supply:D: &custom-routine-to-use = &infix:<cmp> --> Supply:D)

创建一个供应,仅当给定供应中的值小于之前看到的任何值时才发出这些值。换句话说,它将从一个持续下降的供应中发出所有值。从一个持续上升的供应中,它只会发出第一个值。可选参数指定比较器,就像使用 Any.min 一样。

方法 minmax§

method minmax(Supply:D: &custom-routine-to-use = &infix:<cmp> --> Supply:D)

创建一个供应,每当从给定的供应中看到新的最小值或最大值时,它都会发出一个范围。可选参数指定比较器,就像使用 Any.minmax 一样。

方法 skip§

method skip(Supply:D: Int(Cool$number = 1 --> Supply:D)

返回一个新的 Supply,它将发出给定 Supply 中的所有值,除了前 $number 个值,这些值将被丢弃。

my $supplier = Supplier.new;
my $supply = $supplier.Supply;
$supply = $supply.skip(3);
$supply.tap({ say $_ });
$supplier.emit($_for 1..10# OUTPUT: «4␤5␤6␤7␤8␤9␤10␤» 

方法 start§

method start(Supply:D: &startee --> Supply:D)

创建一个供应的供应。对于原始供应中的每个值,代码对象都会在另一个线程上安排,并返回一个供应,其中包含单个值(如果代码成功)或一个退出且没有值的供应(如果代码失败)。

这对于异步启动您不阻塞的工作非常有用。

使用 migrate 将值再次合并到一个供应中。

方法 migrate§

method migrate(Supply:D: --> Supply:D)

将自身具有类型为 Supply 的值的 Supply 作为输入。每次外部 Supply 发出一个新的 Supply 时,它都会被点击,并发出其值。任何先前点击的 Supply 都将被关闭。这对于在不同的数据源之间迁移并仅关注最新数据源非常有用。

例如,想象一个用户可以在不同股票之间切换的应用程序。当他们切换到一个新的股票时,将建立到 Web 套接字的连接以获取最新值,并且应关闭任何先前的连接。通过 Web 套接字传入的每个值流都将表示为一个供应,这些供应本身会发出到要监视的最新数据源的供应中。migrate 方法可用于将此供应的供应平展为用户关心的当前值的单个供应。

以下是此类程序的简单模拟

my Supplier $stock-sources .= new;
 
sub watch-stock($symbol{
    $stock-sources.emit: supply {
        say "Starting to watch $symbol";
        whenever Supply.interval(1{
            emit "$symbol: 111." ~ 99.rand.Int;
        }
        CLOSE say "Lost interest in $symbol";
    }
}
 
$stock-sources.Supply.migrate.tap: *.say;
 
watch-stock('GOOG');
sleep 3;
watch-stock('AAPL');
sleep 3;

它会生成类似这样的输出

Starting to watch GOOG
GOOG: 111.67
GOOG: 111.20
GOOG: 111.37
Lost interest in GOOG
Starting to watch AAPL
AAPL: 111.55
AAPL: 111.6
AAPL: 111.6

组合供应的方法§

方法 merge§

method merge(Supply @*supplies --> Supply:D)

创建一个供应,其中会发出从给定供应中看到的任何值。只有在所有给定供应都完成时,结果供应才会完成。也可以作为类方法调用。

方法 zip§

method zip(**@s:&with)

创建一个供应,只要在所有供应上看到新值,就会立即发出组合值。默认情况下,将创建List,但可以通过使用 :with 参数指定自己的组合器来更改此设置。只要任何给定供应完成,结果供应就会完成。也可以作为类方法调用。

这也可以用作类方法;如果将其用作对象方法,则相应的供应将是组合的供应之一(没有特殊处理)。

方法 zip-latest§

method zip-latest(**@s:&with:$initial )

创建一个供应,只要在任何供应上看到新值,就会立即发出组合值。默认情况下,将创建List,但可以通过使用 :with 参数指定自己的组合器来更改此设置。可选的 :initial 参数可用于指示组合值的初始状态。默认情况下,在结果供应上发出第一个组合值之前,所有供应都必须至少发出一个值。只要任何给定供应完成,结果供应就会完成。也可以作为类方法调用。

作为供应公开的 I/O 功能§

子例程 signal§

sub signal(*@signals:$scheduler = $*SCHEDULER)

为指定的 Signal 枚举(例如 SIGINT)创建一个供应,以及一个可选的 :scheduler 参数。收到的任何信号都将在供应上发出。例如

signal(SIGINT).tap{ say "Thank you for your attention"exit 0 } );

将捕获 Control-C,表示感谢,然后退出。

要从信号号转到Signal,可以执行类似以下操作

signal(Signal(2)).tap-> $sig { say "Received signal: $sig" } );

可以通过检查 Signal::.keys(就像检查任何枚举一样)来找到受支持信号的列表。有关枚举工作原理的更多详细信息,请参见枚举

注意:Rakudo 2018.05 及更早版本存在一个错误,导致在某些系统上信号的数值不正确。例如,Signal(10) 返回 SIGBUS,即使它在特定系统上实际上是 SIGUSR1。也就是说,在 2018.04、2018.04.1 和 2018.05 以外的所有 Rakudo 版本上,使用 signal(SIGUSR1) 都能按预期工作,在这些版本中,可以通过使用 signal(SIGBUS) 来实现预期行为。这些问题已在 2018.05 之后的 Rakudo 版本中得到解决。

方法 IO::Notification.watch-path§

method watch-path($path --> Supply:D)

创建一个供应,OS 将向其发出值以指示给定路径的文件系统上的更改。此外,还具有一个快捷方式,即 IO 对象上的 watch 方法,如下所示

IO::Notification.watch-path(".").act{ say "$^file changed" } );
".".IO.watch.act(                     { say "$^file changed" } );   # same 

Typegraph§

Supply 的类型关系
raku-type-graph Supply Supply Any Any Supply->Any Mu Mu Any->Mu

展开上面的图表