does Awaitable
供应是一种线程安全的异步数据流,类似于 Channel,但它可以有多个订阅者(抽头),所有订阅者都获得流经供应的相同值。
它是 观察者模式 的线程安全实现,并且对于支持 Raku 中的响应式编程至关重要。
有两种类型的供应:live 和 on demand。当进入 live 供应时,抽头只会看到在创建抽头之后流经供应的值。此类供应通常本质上是无限的,例如鼠标移动。关闭此类抽头不会阻止鼠标事件发生,这只意味着这些值将被忽略。所有抽头者看到相同的值流。
on demand 供应上的抽头将启动值的生成,并且再次抽头供应可能会产生一组新值。例如,Supply.interval 每次抽头时都会生成一个具有适当间隔的新计时器。如果抽头关闭,计时器将停止向该抽头发出值。
live Supply 从 Supplier 工厂方法 Supply 获得。通过调用 Supplier 对象上的 emit 发出新值。
my = Supplier.new;my = .Supply;.tap(-> );.emit(42); # Will cause the tap to output "42"
live 方法 在实时供应上返回 True。工厂方法(例如 interval、from-list)将返回按需供应。
可以使用 Supplier::Preserving 创建一个实时 Supply,该 Supply 会保留值,直到第一次抽头。
可以在 并发页面 中找到更多示例。
返回抽头的方法§
方法 tap§
method tap(Supply: = -> $ ,: = -> ,: = -> ,: = -> $ )
创建新的 tap(某种订阅),除了所有现有 tap。第一个位置参数是当通过 emit 调用获得新值时将调用的代码段。
&done 回调可以在多种情况下调用:如果正在 tap 供应块,则在到达 done 例程时;如果正在 tap 供应块,则当供应块到达末尾时将自动触发;如果在父 Supplier 上调用 done 方法(在供应块的情况下,如果 whenever 引用多个 Supplier,则必须调用它们的所有 done 方法才能触发 tap 的 &done 回调,因为块随后将到达其末尾)。
如果 tap 位于因错误而退出的供应块上,则调用 &quit 回调。如果在父 Supplier 上调用 quit 方法,也会调用它(在供应块的情况下,任何一个 Supplier 因未捕获的异常而退出都会调用 &quit 回调,因为块将因错误而退出)。错误作为参数传递给回调。
一旦创建 Tap 对象,就会调用 &tap 回调,该对象作为参数传递给回调。回调在 emit/done/quit 之前调用,提供获取 Tap 对象的可靠方式。此功能有用的一个情况是当 Supply 开始同步发出值时,因为对 .tap 的调用在发出完成之前不会返回 Tap 对象,从而防止在需要时停止它。
方法 tap 返回类型为 Tap 的对象,可以在其上调用 close 方法以取消订阅。
my = Supply.from-list(0 .. 5);my = .tap(-> , done => );
生成
0 1 2 3 4 5 no more ticks
方法 act§
method act(Supply: , *)
使用给定的代码在给定的供应上创建 tap。与 tap 不同,保证仅由一个线程同时执行给定的代码。
实用方法§
方法 Capture§
method Capture(Supply: --> Capture)
等效于对调用者调用 .List.Capture。
方法 Channel§
method Channel(Supply: --> Channel)
返回一个 Channel 对象,该对象将接收供应的所有未来值,并且当供应完成时将被 close,当供应退出时将退出(因错误关闭)。
方法 Promise§
method Promise(Supply: --> Promise)
返回一个 Promise,当 Supply 为 done 时,该 Promise 将被保留。如果 Supply 还发出任何值,则 Promise 将保留最终值。否则,它将保留 Nil。如果 Supply 以 quit 而不是 done 结束,则 Promise 将因该异常而中断。
my = Supplier.new;my = .Supply;my = .Promise;.then(-> );.emit('cha'); # not output yet.done(); # got cha
当处理往往只产生一个值、仅对最终值感兴趣或仅与完成(成功或不成功)相关联的供应时,Promise 方法最为有用。
方法 live§
method live(Supply: --> Bool)
如果供应“处于活动状态”,即值在到达后立即发送到抽头,则返回 True。在默认 Supply 中始终返回 True(但例如在从 Supply.from-list 返回的供应中,它为 False)。
say Supplier.new.Supply.live; # OUTPUT: «True»
方法 schedule-on§
method schedule-on(Supply: Scheduler )
在指定调度程序上运行 emit、done 和 quit 回调。
这对于需要从 GUI 线程运行某些操作的 GUI 工具包非常有用。
等待供应完成的方法§
方法 wait§
method wait(Supply:)
抽取其被调用的 Supply,并阻塞执行,直到供应为 done(在这种情况下,它将计算为在 Supply 上发出的最终值,或者如果没有发出值则为 Nil)或 quit(在这种情况下,它将抛出传递给 quit 的异常)。
my = Supplier.new;start.Supply.wait;say "Two seconds: done";
方法 list§
multi method list(Supply:)
抽取其被调用的 Supply,并返回一个惰性列表,该列表将随着 Supply 发出值而具体化。一旦 Supply 为 done,列表将终止。如果 Supply 为 quit,则一旦达到惰性列表中的该点,就会抛出异常。
方法 Seq§
method Seq(Supply:)
返回一个 Seq,其中包含一个迭代器,该迭代器包含 Supply 包含的值。
方法 grab§
method grab(Supply: --> Supply)
抽取其被调用的 Supply。当它为 done 时,调用 &when-done,然后在结果 Supply 上发出它返回的值列表。如果原始 Supply 为 quit,则异常将立即传达给返回 Supply。
my = Supply.from-list(4, 10, 3, 2);my = .grab();.tap(); # OUTPUT: «19»
方法 reverse§
method reverse(Supply: --> Supply)
抽取其被调用的 Supply。一旦该 Supply 发出 done,它发出的所有值都将按相反的顺序在返回的 Supply 上发出。如果原始 Supply 为 quit,则异常将立即传达给返回 Supply。
my = Supply.from-list(1, 2, 3);my = .reverse;.tap(); # OUTPUT: «321»
方法 sort§
method sort(Supply: ? --> Supply)
点击它被调用的Supply。一旦该Supply发出done,它发出的所有值都将被排序,并且结果将按排序顺序在返回的Supply上发出。可以选择接受比较器 Block。如果原始Supplyquit,则异常将立即传达给返回的Supply。
my = Supply.from-list(4, 10, 3, 2);my = .sort();.tap(); # OUTPUT: «23410»
方法 collate§
method collate(Supply:)
点击它被调用的Supply。一旦该Supply发出done,它发出的所有值都将被排序,同时考虑 Unicode 音节特征。将返回一个新的Supply,其中发出已排序的值。有关整理排序的更多详细信息,请参见 Any.collate。
my = Supply.from-list(<ä a o ö>);my = .collate();.tap(); # OUTPUT: «aäoö»
方法 reduce§
method reduce(Supply: --> Supply)
创建一个“缩减”供应,它将发出一个具有与 List.reduce 相同语义的单一值。
my = Supply.from-list(1..5).reduce();.tap(-> ); # OUTPUT: «15»
返回另一个 Supply 的方法§
方法 from-list§
method from-list(Supply: + --> Supply)
根据传递给此方法的值创建一个按需供应。
my = Supply.from-list(1, 2, 3);.tap(); # OUTPUT: «123»
方法 share§
method share(Supply: --> Supply)
从按需供应创建实时供应,从而可以在多个点击中共享按需供应的值,而不是每个点击都看到按需供应的所有值的副本。
# this says in turn: "first 1" "first 2" "second 2" "first 3" "second 3"my = Supply.interval(1).share;.tap: ;sleep 1.1;.tap: ;sleep 2
方法 flat§
method flat(Supply: --> Supply)
创建一个供应,在该供应中,在再次发出之前,会展平在给定供应中看到的所有值。
方法 do§
method do(Supply: --> Supply)
创建一个供应,在该供应中,在给定供应中看到的所有值都会再次发出。仅为其副作用执行的给定代码保证一次仅由一个线程执行。
方法 on-close§
method on-close(Supply: --> Supply)
返回一个新的Supply,该Supply将在该Supply的 Tap 关闭时运行&on-close。这包括对Supply链接的进一步操作。(例如,$supply.on-close(&on-close).map(*.uc))。当使用react或supply块时,通常使用 CLOSE 相位器是一个更好的选择。
my = Supplier.new;my = .Supply.on-close().tap(-> ,done => ,quit => -> ,);.emit('Raku');.close; # OUTPUT: «Tap closed»
方法 interval§
method interval(Supply: , = 0, : = --> Supply)
创建一个供应,该供应从调用开始每$interval秒发出一个值,从$delay秒开始。发出的值是一个整数,从 0 开始,并且对于发出的每个值都会递增 1。
实现可能会将过小和负值视为它们支持的最低分辨率,可能在这种情况下发出警告;例如,将0.0001视为0.001。对于 6.d 语言版本,指定的最大值是0.001。
方法 grep§
method grep(Supply: Mu --> Supply)
创建一个新供应,它仅发出与 $test 智能匹配的原始供应中的那些值。
my = Supplier.new;my = .Supply;my = .grep(Int);.tap();.emit() for 1, 'a string', 3.14159; # prints only 1
方法 map§
method map(Supply: --> Supply)
返回一个新供应,它通过 &mapper 映射给定供应的每个值,并将其发出到新供应。
my = Supplier.new;my = .Supply;my = .map(-> );.tap();.emit(4); # OUTPUT: «8»
方法 batch§
method batch(Supply: :, : --> Supply)
创建一个新供应,它按批次处理给定供应的值,按批次中的元素数量(使用 :elems)或持续时间(使用 :seconds)或两者。当供应完成时,任何剩余的值都会在最后一个批次中发出。
注意:自 Rakudo 2020.12 版本以来,:seconds 参数具有毫秒粒度:例如,1 毫秒持续时间可以指定为 :seconds(0.001)。在 Rakudo 2020.12 版本之前,:seconds 参数具有秒粒度。
方法 elems§
method elems(Supply: ? --> Supply)
创建一个新供应,其中会发出对所见值数量的更改。如果你只想每隔几秒更新一次,它还可以选择一个间隔(以秒为单位)。
方法 head§
multi method head(Supply:)multi method head(Supply: Callable )multi method head(Supply: \limit)
创建一个具有与 List.head 相同语义的“头”供应。
my = Supply.from-list(4, 10, 3, 2);my = .head(2);.tap(); # OUTPUT: «410»
自 2020.07 版本以来,WhateverCode 也可以使用,同样具有与 List.head 相同的语义
my = Supply.from-list(4, 10, 3, 2, 1);my = .head( * - 2);.tap(); # OUTPUT: «4103»
方法 tail§
multi method tail(Supply:)multi method tail(Supply: Callable )multi method tail(Supply: \limit)
创建一个具有与 List.tail 相同语义的“尾”供应。
my = Supply.from-list(4, 10, 3, 2);my = .tail(2);.tap(); # OUTPUT: «32»
你可以使用 Whatever 或 Inf 调用 .tail;它将返回一个等效于初始供应的新供应。使用 WhateverCode 调用它等效于跳过直到该数字。
my = Supply.from-list(4, 10, 3, 2);my = .tail( * - 2 );.tap(); # OUTPUT: «32»
此功能仅在 Raku 的 2020.07 版本中可用。
方法 first§
method first(Supply: :, |c)
此方法创建一个供应,其中包含第一个元素,或者如果可选命名参数 :end 为真,则包含最后一个元素,该供应是通过对调用者调用 grep 方法(任何剩余参数作为参数)创建的。如果没有剩余参数,则此方法等效于在调用者上调用 head 或 tail 方法(没有参数),具体取决于命名参数 :end。
my = supply ;my = .first: ;# output the first prime from the endless random number supply $rand,# then the $first-prime supply reaches its end.tap: ;
方法 split§
multi method split(Supply: \delimiter)multi method split(Supply: \delimiter, \limit)
此方法创建一个供应,其中包含从调用者收集的字符串上调用的 Str.split 方法返回的值。有关 \delimiter 参数以及可用的额外命名参数的详细信息,请参阅 Str.split。创建的供应可以通过 \limit 参数进行限制,请参阅 .head。
my = Supply.from-list(<Hello World From Raku!>);my = .split(/ /, 2, :skip-empty);.tap(); # OUTPUT: «HelloWorld»
方法 rotate§
method rotate(Supply: = 1)
当 $rotate 为正数时,创建一个元素向左旋转的供应;否则向右旋转,在这种情况下,在返回新供应之前,会先轻触调用者。
my = Supply.from-list(<a b c d e>).rotate(2);.tap(); # OUTPUT: «cdeab»
注意:自 Rakudo 2020.06 起可用。
方法 rotor§
method rotor(Supply: --> Supply)
创建一个“旋转”供应,其语义与 List.rotor 相同。
方法 delayed§
method delayed(Supply: , : = --> Supply)
创建一个新供应,其中通过给定供应的所有值都会被发出,但延迟给定的秒数。
方法 throttle§
multi method throttle(Supply:Int() ,Real() ,Real() = 0,: = ,:,:,:,:,)
multi method throttle(Supply:Int() ,Callable ,Real() = 0,: = ,:,:,:,:,)
.throttle 的参数定义如下
| 参数 | 含义 |
|---|---|
| $limit, | 值/时间或同时处理 |
| $seconds 或 $process | 时间单位/同时处理的代码 |
| $delay = 0, | 开始前的初始延迟(以秒为单位) |
| :$control, | 用于发出控制消息的供应(可选) |
| :$status, | 用于轻触状态消息的供应(可选) |
| :$bleed, | 用于泄漏消息的供应(可选) |
| :$vent-at, | 当缓冲过多时泄漏(可选) |
| :$scheduler, | 要使用的计划程序,默认情况下为 $*SCHEDULER |
此方法从给定的供应生成一个 Supply,但确保通过的消息数量受到限制。
它有两种操作模式:按时间单位或按代码块的最大执行次数:这由第二个位置参数的类型决定。
第一个位置参数指定应应用的限制。
如果第二个位置参数是 Callable,则限制表示执行 Callable 的并行进程的最大数量,Callable 会收到接收到的值。在这种情况下,发出的值将是通过启动 Callable 获得的 Promise。
如果第二个位置参数是一个实数,则将其解释为时间单位(以秒为单位)。如果您将 .1 指定为值,则它将确保您不会超过每十分之一秒的限制。
如果超过限制,则传入的消息将被缓冲,直到有空间再次传递/执行 Callable。
第三个位置参数是可选的:它表示在传递任何值之前,节流将等待的秒数。
:control 命名参数可选择指定一个 Supply,你可以在操作过程中使用它来控制节流。可以发送的消息是“键:值”形式的字符串。请参阅以下内容,了解你可以发送哪些类型的消息来控制节流。
:status 命名参数可选择指定一个 Supply,它将接收任何状态消息。如果指定,它至少会在原始 Supply 耗尽后发送一条状态消息。请参见下面的 状态消息。
:bleed 命名参数可选择指定一个 Supply,它将接收任何显式泄漏(带有 bleed 控制消息)或自动泄漏(如果 vent-at 处于活动状态)的值。
:vent-at 命名参数指示在将任何其他值路由到 :bleed Supply 之前可以缓冲的值的数量。如果未指定,则默认为 0(导致不发生自动泄漏)。仅在还指定了 :bleed Supply 时才有意义。
:scheduler 命名参数指示要使用的调度程序。默认为 $*SCHEDULER。
控制消息§
这些消息可以发送到 :control Supply。控制消息由“键: 值”形式的字符串组成,例如“limit: 4”。
limit
将消息数量(最初在第一个位置中给出)更改为给定的值。
bleed
将给定的缓冲消息数量路由到 :bleed Supply。
vent-at
更改在发生自动泄漏之前缓冲的最大值数量。如果该值低于之前的值,将导致立即重新路由缓冲值以匹配新的最大值。
status
使用给定的 ID 向 :status Supply 发送状态消息。
状态消息§
状态返回消息是一个哈希,具有以下键
allowed
仍然允许传递/执行的消息/可调用项的当前数量。
bled
路由到 :bleed Supply 的消息数量。
buffered
由于溢出而当前缓冲的消息数量。
emitted
已发送(通过)的消息数量。
id
此状态消息的 ID(单调递增的数字)。如果你想记录状态消息,这很方便。
limit
limit
vent-at
正在应用的当前限制。
vent-at
在自动重新路由到 :bleed Supply 之前,可以缓冲的最大消息数量。
my = Supply.from-list(^6); # set up supplymy = .throttle: 3, # only allow 3 at a time# don't need ; because } at end of line.wait; # wait for the supply to be done
running 0 running 1 running 2 done 2 running 3 done 1 running 4 done 4 running 5 done 0 done 3 done 5
方法 stable§
method stable(Supply: , : = --> Supply)
创建一个新的供应,它只传递给定供应中流经的值,如果它没有被给定 $time(以秒为单位)中的另一个值取代。可以选择使用另一个调度程序,而不是默认调度程序,使用 :scheduler 参数。
为了澄清上述内容,如果在超时 $time 期间,向 Supplier 发射了其他值,除了最后一个值之外的所有值都将被丢弃。在超时期间,每次向 Supplier 发射一个其他值时,都会重置 $time。
在处理 UI 输入时,此方法非常有用,在处理 UI 输入时,不希望在用户停止键入一段时间后执行操作,而不是在每次击键时执行操作。
my = Supplier.new;my = .Supply;.tap(-> );.emit(42);my Supply = .stable(5);.tap(-> );sleep(3);.emit(43); # will not be seen by $supply2 but will reset $time.emit(44);sleep(10);# OUTPUT: «Supply1 got: 42Supply1 got: 43Supply1 got: 44Supply2 got: 44»
如上所示,$supply1 接收了向 Supplier 发射的所有值,而 $supply2 只接收了一个值。43 被丢弃,因为它后面跟着另一个“最后一个”值 44,该值被保留并在大约八秒后发送到 $supply2,这是因为超时 $time 在三秒后被重置。
方法 produce§
method produce(Supply: --> Supply)
创建一个“生产”供应,其语义与 List.produce 相同。
my = Supply.from-list(1..5).produce();.tap(-> ); # OUTPUT: «1361015»
方法 lines§
method lines(Supply: : = True --> Supply)
创建一个供应,它将按行发出来自通常由某些异步 I/O 操作创建的供应的字符。可选的 :chomp 参数指示是否删除行分隔符:默认值为 True。
方法 words§
method words(Supply: --> Supply)
创建一个供应,它将按单词发出来自通常由某些异步 I/O 操作创建的供应的字符。
my = Supply.from-list("Hello Word!".comb);my = .words;.tap(); # OUTPUT: «HelloWord!»
方法 unique§
method unique(Supply: :, :, : --> Supply)
创建一个只提供唯一值的供应,如可选的 :as 和 :with 参数所定义(与 unique 相同)。可选的 :expires 参数表示在“重置”之前等待多长时间(以秒为单位),并且不考虑某个值已被看到,即使它与旧值相同。
方法 repeated§
method repeated(Supply: :, :)
创建一个只提供重复值的供应,如可选的 :as 和 :with 参数所定义(与 unique 相同)。
my = Supply.from-list(<a A B b c b C>).repeated(:as());.tap(); # OUTPUT: «AbbC»
有关使用其子表单的更多示例,请参见 repeated。
注意:自版本 6.e (Rakudo 2020.01 及更高版本) 起可用。
方法 squish§
method squish(Supply: :, : --> Supply)
创建一个只提供唯一值的供应,如可选的 :as 和 :with 参数所定义(与 squish 相同)。
方法 max§
method max(Supply: = :<cmp> --> Supply)
创建一个供应,仅当给定供应中的值大于之前看到的任何值时才发出这些值。换句话说,它将从一个持续上升的供应中发出所有值。从一个持续下降的供应中,它只会发出第一个值。可选参数指定比较器,就像使用 Any.max 一样。
方法 min§
method min(Supply: = :<cmp> --> Supply)
创建一个供应,仅当给定供应中的值小于之前看到的任何值时才发出这些值。换句话说,它将从一个持续下降的供应中发出所有值。从一个持续上升的供应中,它只会发出第一个值。可选参数指定比较器,就像使用 Any.min 一样。
方法 minmax§
method minmax(Supply: = :<cmp> --> Supply)
创建一个供应,每当从给定的供应中看到新的最小值或最大值时,它都会发出一个范围。可选参数指定比较器,就像使用 Any.minmax 一样。
方法 skip§
method skip(Supply: Int(Cool) = 1 --> Supply)
返回一个新的 Supply,它将发出给定 Supply 中的所有值,除了前 $number 个值,这些值将被丢弃。
my = Supplier.new;my = .Supply;= .skip(3);.tap();.emit() for 1..10; # OUTPUT: «45678910»
方法 start§
method start(Supply: --> Supply)
创建一个供应的供应。对于原始供应中的每个值,代码对象都会在另一个线程上安排,并返回一个供应,其中包含单个值(如果代码成功)或一个退出且没有值的供应(如果代码失败)。
这对于异步启动您不阻塞的工作非常有用。
使用 migrate 将值再次合并到一个供应中。
方法 migrate§
method migrate(Supply: --> Supply)
将自身具有类型为 Supply 的值的 Supply 作为输入。每次外部 Supply 发出一个新的 Supply 时,它都会被点击,并发出其值。任何先前点击的 Supply 都将被关闭。这对于在不同的数据源之间迁移并仅关注最新数据源非常有用。
例如,想象一个用户可以在不同股票之间切换的应用程序。当他们切换到一个新的股票时,将建立到 Web 套接字的连接以获取最新值,并且应关闭任何先前的连接。通过 Web 套接字传入的每个值流都将表示为一个供应,这些供应本身会发出到要监视的最新数据源的供应中。migrate 方法可用于将此供应的供应平展为用户关心的当前值的单个供应。
以下是此类程序的简单模拟
my Supplier .= new;sub watch-stock().Supply.migrate.tap: *.say;watch-stock('GOOG');sleep 3;watch-stock('AAPL');sleep 3;
它会生成类似这样的输出
Starting to watch GOOG GOOG: 111.67 GOOG: 111.20 GOOG: 111.37 Lost interest in GOOG Starting to watch AAPL AAPL: 111.55 AAPL: 111.6 AAPL: 111.6
组合供应的方法§
方法 merge§
method merge(Supply --> Supply)
创建一个供应,其中会发出从给定供应中看到的任何值。只有在所有给定供应都完成时,结果供应才会完成。也可以作为类方法调用。
方法 zip§
method zip(**, :)
创建一个供应,只要在所有供应上看到新值,就会立即发出组合值。默认情况下,将创建List,但可以通过使用 :with 参数指定自己的组合器来更改此设置。只要任何给定供应完成,结果供应就会完成。也可以作为类方法调用。
这也可以用作类方法;如果将其用作对象方法,则相应的供应将是组合的供应之一(没有特殊处理)。
方法 zip-latest§
method zip-latest(**, :, : )
创建一个供应,只要在任何供应上看到新值,就会立即发出组合值。默认情况下,将创建List,但可以通过使用 :with 参数指定自己的组合器来更改此设置。可选的 :initial 参数可用于指示组合值的初始状态。默认情况下,在结果供应上发出第一个组合值之前,所有供应都必须至少发出一个值。只要任何给定供应完成,结果供应就会完成。也可以作为类方法调用。
作为供应公开的 I/O 功能§
子例程 signal§
sub signal(*, : = )
为指定的 Signal 枚举(例如 SIGINT)创建一个供应,以及一个可选的 :scheduler 参数。收到的任何信号都将在供应上发出。例如
signal(SIGINT).tap( );
将捕获 Control-C,表示感谢,然后退出。
要从信号号转到Signal,可以执行类似以下操作
signal(Signal(2)).tap( -> );
可以通过检查 Signal::.keys(就像检查任何枚举一样)来找到受支持信号的列表。有关枚举工作原理的更多详细信息,请参见枚举。
注意:Rakudo 2018.05 及更早版本存在一个错误,导致在某些系统上信号的数值不正确。例如,Signal(10) 返回 SIGBUS,即使它在特定系统上实际上是 SIGUSR1。也就是说,在 2018.04、2018.04.1 和 2018.05 以外的所有 Rakudo 版本上,使用 signal(SIGUSR1) 都能按预期工作,在这些版本中,可以通过使用 signal(SIGBUS) 来实现预期行为。这些问题已在 2018.05 之后的 Rakudo 版本中得到解决。
方法 IO::Notification.watch-path§
method watch-path( --> Supply)
创建一个供应,OS 将向其发出值以指示给定路径的文件系统上的更改。此外,还具有一个快捷方式,即 IO 对象上的 watch 方法,如下所示
IO::Notification.watch-path(".").act( );".".IO.watch.act( ); # same
Typegraph§
Supply 的类型关系