does Awaitable
供应是一种线程安全的异步数据流,类似于 Channel
,但它可以有多个订阅者(抽头),所有订阅者都获得流经供应的相同值。
它是 观察者模式 的线程安全实现,并且对于支持 Raku 中的响应式编程至关重要。
有两种类型的供应:live
和 on demand
。当进入 live
供应时,抽头只会看到在创建抽头之后流经供应的值。此类供应通常本质上是无限的,例如鼠标移动。关闭此类抽头不会阻止鼠标事件发生,这只意味着这些值将被忽略。所有抽头者看到相同的值流。
on demand
供应上的抽头将启动值的生成,并且再次抽头供应可能会产生一组新值。例如,Supply.interval
每次抽头时都会生成一个具有适当间隔的新计时器。如果抽头关闭,计时器将停止向该抽头发出值。
live
Supply
从 Supplier
工厂方法 Supply
获得。通过调用 Supplier
对象上的 emit
发出新值。
my = Supplier.new;my = .Supply;.tap(-> );.emit(42); # Will cause the tap to output "42"
live 方法 在实时供应上返回 True
。工厂方法(例如 interval、from-list)将返回按需供应。
可以使用 Supplier::Preserving
创建一个实时 Supply
,该 Supply
会保留值,直到第一次抽头。
可以在 并发页面 中找到更多示例。
返回抽头的方法§
方法 tap§
method tap(Supply: = -> $ ,: = -> ,: = -> ,: = -> $ )
创建新的 tap(某种订阅),除了所有现有 tap。第一个位置参数是当通过 emit
调用获得新值时将调用的代码段。
&done
回调可以在多种情况下调用:如果正在 tap 供应块,则在到达 done
例程时;如果正在 tap 供应块,则当供应块到达末尾时将自动触发;如果在父 Supplier
上调用 done
方法(在供应块的情况下,如果 whenever
引用多个 Supplier
,则必须调用它们的所有 done
方法才能触发 tap 的 &done
回调,因为块随后将到达其末尾)。
如果 tap 位于因错误而退出的供应块上,则调用 &quit
回调。如果在父 Supplier
上调用 quit
方法,也会调用它(在供应块的情况下,任何一个 Supplier
因未捕获的异常而退出都会调用 &quit
回调,因为块将因错误而退出)。错误作为参数传递给回调。
一旦创建 Tap
对象,就会调用 &tap
回调,该对象作为参数传递给回调。回调在 emit
/done
/quit
之前调用,提供获取 Tap
对象的可靠方式。此功能有用的一个情况是当 Supply
开始同步发出值时,因为对 .tap
的调用在发出完成之前不会返回 Tap
对象,从而防止在需要时停止它。
方法 tap
返回类型为 Tap
的对象,可以在其上调用 close
方法以取消订阅。
my = Supply.from-list(0 .. 5);my = .tap(-> , done => );
生成
0 1 2 3 4 5 no more ticks
方法 act§
method act(Supply: , *)
使用给定的代码在给定的供应上创建 tap。与 tap
不同,保证仅由一个线程同时执行给定的代码。
实用方法§
方法 Capture§
method Capture(Supply: --> Capture)
等效于对调用者调用 .List.Capture
。
方法 Channel§
method Channel(Supply: --> Channel)
返回一个 Channel
对象,该对象将接收供应的所有未来值,并且当供应完成时将被 close
,当供应退出时将退出(因错误关闭)。
方法 Promise§
method Promise(Supply: --> Promise)
返回一个 Promise
,当 Supply
为 done
时,该 Promise
将被保留。如果 Supply
还发出任何值,则 Promise
将保留最终值。否则,它将保留 Nil
。如果 Supply
以 quit
而不是 done
结束,则 Promise
将因该异常而中断。
my = Supplier.new;my = .Supply;my = .Promise;.then(-> );.emit('cha'); # not output yet.done(); # got cha
当处理往往只产生一个值、仅对最终值感兴趣或仅与完成(成功或不成功)相关联的供应时,Promise
方法最为有用。
方法 live§
method live(Supply: --> Bool)
如果供应“处于活动状态”,即值在到达后立即发送到抽头,则返回 True
。在默认 Supply
中始终返回 True
(但例如在从 Supply.from-list
返回的供应中,它为 False
)。
say Supplier.new.Supply.live; # OUTPUT: «True»
方法 schedule-on§
method schedule-on(Supply: Scheduler )
在指定调度程序上运行 emit、done 和 quit 回调。
这对于需要从 GUI 线程运行某些操作的 GUI 工具包非常有用。
等待供应完成的方法§
方法 wait§
method wait(Supply:)
抽取其被调用的 Supply
,并阻塞执行,直到供应为 done
(在这种情况下,它将计算为在 Supply
上发出的最终值,或者如果没有发出值则为 Nil
)或 quit
(在这种情况下,它将抛出传递给 quit
的异常)。
my = Supplier.new;start.Supply.wait;say "Two seconds: done";
方法 list§
multi method list(Supply:)
抽取其被调用的 Supply
,并返回一个惰性列表,该列表将随着 Supply
发出值而具体化。一旦 Supply
为 done
,列表将终止。如果 Supply
为 quit
,则一旦达到惰性列表中的该点,就会抛出异常。
方法 Seq§
method Seq(Supply:)
返回一个 Seq
,其中包含一个迭代器,该迭代器包含 Supply
包含的值。
方法 grab§
method grab(Supply: --> Supply)
抽取其被调用的 Supply
。当它为 done
时,调用 &when-done
,然后在结果 Supply
上发出它返回的值列表。如果原始 Supply
为 quit
,则异常将立即传达给返回 Supply
。
my = Supply.from-list(4, 10, 3, 2);my = .grab();.tap(); # OUTPUT: «19»
方法 reverse§
method reverse(Supply: --> Supply)
抽取其被调用的 Supply
。一旦该 Supply
发出 done
,它发出的所有值都将按相反的顺序在返回的 Supply
上发出。如果原始 Supply
为 quit
,则异常将立即传达给返回 Supply
。
my = Supply.from-list(1, 2, 3);my = .reverse;.tap(); # OUTPUT: «321»
方法 sort§
method sort(Supply: ? --> Supply)
点击它被调用的Supply
。一旦该Supply
发出done
,它发出的所有值都将被排序,并且结果将按排序顺序在返回的Supply
上发出。可以选择接受比较器 Block
。如果原始Supply
quit
,则异常将立即传达给返回的Supply
。
my = Supply.from-list(4, 10, 3, 2);my = .sort();.tap(); # OUTPUT: «23410»
方法 collate§
method collate(Supply:)
点击它被调用的Supply
。一旦该Supply
发出done
,它发出的所有值都将被排序,同时考虑 Unicode 音节特征。将返回一个新的Supply
,其中发出已排序的值。有关整理排序的更多详细信息,请参见 Any.collate。
my = Supply.from-list(<ä a o ö>);my = .collate();.tap(); # OUTPUT: «aäoö»
方法 reduce§
method reduce(Supply: --> Supply)
创建一个“缩减”供应,它将发出一个具有与 List.reduce 相同语义的单一值。
my = Supply.from-list(1..5).reduce();.tap(-> ); # OUTPUT: «15»
返回另一个 Supply 的方法§
方法 from-list§
method from-list(Supply: + --> Supply)
根据传递给此方法的值创建一个按需供应。
my = Supply.from-list(1, 2, 3);.tap(); # OUTPUT: «123»
方法 share§
method share(Supply: --> Supply)
从按需供应创建实时供应,从而可以在多个点击中共享按需供应的值,而不是每个点击都看到按需供应的所有值的副本。
# this says in turn: "first 1" "first 2" "second 2" "first 3" "second 3"my = Supply.interval(1).share;.tap: ;sleep 1.1;.tap: ;sleep 2
方法 flat§
method flat(Supply: --> Supply)
创建一个供应,在该供应中,在再次发出之前,会展平在给定供应中看到的所有值。
方法 do§
method do(Supply: --> Supply)
创建一个供应,在该供应中,在给定供应中看到的所有值都会再次发出。仅为其副作用执行的给定代码保证一次仅由一个线程执行。
方法 on-close§
method on-close(Supply: --> Supply)
返回一个新的Supply
,该Supply
将在该Supply
的 Tap
关闭时运行&on-close
。这包括对Supply
链接的进一步操作。(例如,$supply.on-close(&on-close).map(*.uc)
)。当使用react
或supply
块时,通常使用 CLOSE 相位器是一个更好的选择。
my = Supplier.new;my = .Supply.on-close().tap(-> ,done => ,quit => -> ,);.emit('Raku');.close; # OUTPUT: «Tap closed»
方法 interval§
method interval(Supply: , = 0, : = --> Supply)
创建一个供应,该供应从调用开始每$interval
秒发出一个值,从$delay
秒开始。发出的值是一个整数,从 0 开始,并且对于发出的每个值都会递增 1。
实现可能会将过小和负值视为它们支持的最低分辨率,可能在这种情况下发出警告;例如,将0.0001
视为0.001
。对于 6.d 语言版本,指定的最大值是0.001
。
方法 grep§
method grep(Supply: Mu --> Supply)
创建一个新供应,它仅发出与 $test
智能匹配的原始供应中的那些值。
my = Supplier.new;my = .Supply;my = .grep(Int);.tap();.emit() for 1, 'a string', 3.14159; # prints only 1
方法 map§
method map(Supply: --> Supply)
返回一个新供应,它通过 &mapper
映射给定供应的每个值,并将其发出到新供应。
my = Supplier.new;my = .Supply;my = .map(-> );.tap();.emit(4); # OUTPUT: «8»
方法 batch§
method batch(Supply: :, : --> Supply)
创建一个新供应,它按批次处理给定供应的值,按批次中的元素数量(使用 :elems
)或持续时间(使用 :seconds
)或两者。当供应完成时,任何剩余的值都会在最后一个批次中发出。
注意:自 Rakudo 2020.12 版本以来,:seconds
参数具有毫秒粒度:例如,1 毫秒持续时间可以指定为 :seconds(0.001)
。在 Rakudo 2020.12 版本之前,:seconds
参数具有秒粒度。
方法 elems§
method elems(Supply: ? --> Supply)
创建一个新供应,其中会发出对所见值数量的更改。如果你只想每隔几秒更新一次,它还可以选择一个间隔(以秒为单位)。
方法 head§
multi method head(Supply:)multi method head(Supply: Callable )multi method head(Supply: \limit)
创建一个具有与 List.head 相同语义的“头”供应。
my = Supply.from-list(4, 10, 3, 2);my = .head(2);.tap(); # OUTPUT: «410»
自 2020.07 版本以来,WhateverCode
也可以使用,同样具有与 List.head
相同的语义
my = Supply.from-list(4, 10, 3, 2, 1);my = .head( * - 2);.tap(); # OUTPUT: «4103»
方法 tail§
multi method tail(Supply:)multi method tail(Supply: Callable )multi method tail(Supply: \limit)
创建一个具有与 List.tail 相同语义的“尾”供应。
my = Supply.from-list(4, 10, 3, 2);my = .tail(2);.tap(); # OUTPUT: «32»
你可以使用 Whatever
或 Inf
调用 .tail
;它将返回一个等效于初始供应的新供应。使用 WhateverCode
调用它等效于跳过直到该数字。
my = Supply.from-list(4, 10, 3, 2);my = .tail( * - 2 );.tap(); # OUTPUT: «32»
此功能仅在 Raku 的 2020.07 版本中可用。
方法 first§
method first(Supply: :, |c)
此方法创建一个供应,其中包含第一个元素,或者如果可选命名参数 :end
为真,则包含最后一个元素,该供应是通过对调用者调用 grep
方法(任何剩余参数作为参数)创建的。如果没有剩余参数,则此方法等效于在调用者上调用 head
或 tail
方法(没有参数),具体取决于命名参数 :end
。
my = supply ;my = .first: ;# output the first prime from the endless random number supply $rand,# then the $first-prime supply reaches its end.tap: ;
方法 split§
multi method split(Supply: \delimiter)multi method split(Supply: \delimiter, \limit)
此方法创建一个供应,其中包含从调用者收集的字符串上调用的 Str.split
方法返回的值。有关 \delimiter
参数以及可用的额外命名参数的详细信息,请参阅 Str.split
。创建的供应可以通过 \limit
参数进行限制,请参阅 .head
。
my = Supply.from-list(<Hello World From Raku!>);my = .split(/ /, 2, :skip-empty);.tap(); # OUTPUT: «HelloWorld»
方法 rotate§
method rotate(Supply: = 1)
当 $rotate
为正数时,创建一个元素向左旋转的供应;否则向右旋转,在这种情况下,在返回新供应之前,会先轻触调用者。
my = Supply.from-list(<a b c d e>).rotate(2);.tap(); # OUTPUT: «cdeab»
注意:自 Rakudo 2020.06 起可用。
方法 rotor§
method rotor(Supply: --> Supply)
创建一个“旋转”供应,其语义与 List.rotor 相同。
方法 delayed§
method delayed(Supply: , : = --> Supply)
创建一个新供应,其中通过给定供应的所有值都会被发出,但延迟给定的秒数。
方法 throttle§
multi method throttle(Supply:Int() ,Real() ,Real() = 0,: = ,:,:,:,:,)
multi method throttle(Supply:Int() ,Callable ,Real() = 0,: = ,:,:,:,:,)
.throttle
的参数定义如下
参数 | 含义 |
---|---|
$limit, | 值/时间或同时处理 |
$seconds 或 $process | 时间单位/同时处理的代码 |
$delay = 0, | 开始前的初始延迟(以秒为单位) |
:$control, | 用于发出控制消息的供应(可选) |
:$status, | 用于轻触状态消息的供应(可选) |
:$bleed, | 用于泄漏消息的供应(可选) |
:$vent-at, | 当缓冲过多时泄漏(可选) |
:$scheduler, | 要使用的计划程序,默认情况下为 $*SCHEDULER |
此方法从给定的供应生成一个 Supply
,但确保通过的消息数量受到限制。
它有两种操作模式:按时间单位或按代码块的最大执行次数:这由第二个位置参数的类型决定。
第一个位置参数指定应应用的限制。
如果第二个位置参数是 Callable
,则限制表示执行 Callable 的并行进程的最大数量,Callable 会收到接收到的值。在这种情况下,发出的值将是通过启动 Callable
获得的 Promise
。
如果第二个位置参数是一个实数,则将其解释为时间单位(以秒为单位)。如果您将 .1 指定为值,则它将确保您不会超过每十分之一秒的限制。
如果超过限制,则传入的消息将被缓冲,直到有空间再次传递/执行 Callable。
第三个位置参数是可选的:它表示在传递任何值之前,节流将等待的秒数。
:control
命名参数可选择指定一个 Supply,你可以在操作过程中使用它来控制节流。可以发送的消息是“键:值”形式的字符串。请参阅以下内容,了解你可以发送哪些类型的消息来控制节流。
:status
命名参数可选择指定一个 Supply,它将接收任何状态消息。如果指定,它至少会在原始 Supply 耗尽后发送一条状态消息。请参见下面的 状态消息。
:bleed
命名参数可选择指定一个 Supply,它将接收任何显式泄漏(带有 bleed 控制消息)或自动泄漏(如果 vent-at 处于活动状态)的值。
:vent-at
命名参数指示在将任何其他值路由到 :bleed
Supply 之前可以缓冲的值的数量。如果未指定,则默认为 0(导致不发生自动泄漏)。仅在还指定了 :bleed
Supply 时才有意义。
:scheduler
命名参数指示要使用的调度程序。默认为 $*SCHEDULER
。
控制消息§
这些消息可以发送到 :control
Supply。控制消息由“键: 值”形式的字符串组成,例如“limit: 4”。
limit
将消息数量(最初在第一个位置中给出)更改为给定的值。
bleed
将给定的缓冲消息数量路由到 :bleed
Supply。
vent-at
更改在发生自动泄漏之前缓冲的最大值数量。如果该值低于之前的值,将导致立即重新路由缓冲值以匹配新的最大值。
status
使用给定的 ID 向 :status
Supply 发送状态消息。
状态消息§
状态返回消息是一个哈希,具有以下键
allowed
仍然允许传递/执行的消息/可调用项的当前数量。
bled
路由到 :bleed
Supply 的消息数量。
buffered
由于溢出而当前缓冲的消息数量。
emitted
已发送(通过)的消息数量。
id
此状态消息的 ID(单调递增的数字)。如果你想记录状态消息,这很方便。
limit
limit
vent-at
正在应用的当前限制。
vent-at
在自动重新路由到 :bleed
Supply 之前,可以缓冲的最大消息数量。
my = Supply.from-list(^6); # set up supplymy = .throttle: 3, # only allow 3 at a time# don't need ; because } at end of line.wait; # wait for the supply to be done
running 0 running 1 running 2 done 2 running 3 done 1 running 4 done 4 running 5 done 0 done 3 done 5
方法 stable§
method stable(Supply: , : = --> Supply)
创建一个新的供应,它只传递给定供应中流经的值,如果它没有被给定 $time
(以秒为单位)中的另一个值取代。可以选择使用另一个调度程序,而不是默认调度程序,使用 :scheduler
参数。
为了澄清上述内容,如果在超时 $time
期间,向 Supplier
发射了其他值,除了最后一个值之外的所有值都将被丢弃。在超时期间,每次向 Supplier
发射一个其他值时,都会重置 $time
。
在处理 UI 输入时,此方法非常有用,在处理 UI 输入时,不希望在用户停止键入一段时间后执行操作,而不是在每次击键时执行操作。
my = Supplier.new;my = .Supply;.tap(-> );.emit(42);my Supply = .stable(5);.tap(-> );sleep(3);.emit(43); # will not be seen by $supply2 but will reset $time.emit(44);sleep(10);# OUTPUT: «Supply1 got: 42Supply1 got: 43Supply1 got: 44Supply2 got: 44»
如上所示,$supply1
接收了向 Supplier
发射的所有值,而 $supply2
只接收了一个值。43 被丢弃,因为它后面跟着另一个“最后一个”值 44,该值被保留并在大约八秒后发送到 $supply2
,这是因为超时 $time
在三秒后被重置。
方法 produce§
method produce(Supply: --> Supply)
创建一个“生产”供应,其语义与 List.produce 相同。
my = Supply.from-list(1..5).produce();.tap(-> ); # OUTPUT: «1361015»
方法 lines§
method lines(Supply: : = True --> Supply)
创建一个供应,它将按行发出来自通常由某些异步 I/O 操作创建的供应的字符。可选的 :chomp
参数指示是否删除行分隔符:默认值为 True
。
方法 words§
method words(Supply: --> Supply)
创建一个供应,它将按单词发出来自通常由某些异步 I/O 操作创建的供应的字符。
my = Supply.from-list("Hello Word!".comb);my = .words;.tap(); # OUTPUT: «HelloWord!»
方法 unique§
method unique(Supply: :, :, : --> Supply)
创建一个只提供唯一值的供应,如可选的 :as
和 :with
参数所定义(与 unique
相同)。可选的 :expires
参数表示在“重置”之前等待多长时间(以秒为单位),并且不考虑某个值已被看到,即使它与旧值相同。
方法 repeated§
method repeated(Supply: :, :)
创建一个只提供重复值的供应,如可选的 :as
和 :with
参数所定义(与 unique
相同)。
my = Supply.from-list(<a A B b c b C>).repeated(:as());.tap(); # OUTPUT: «AbbC»
有关使用其子表单的更多示例,请参见 repeated
。
注意:自版本 6.e (Rakudo 2020.01 及更高版本) 起可用。
方法 squish§
method squish(Supply: :, : --> Supply)
创建一个只提供唯一值的供应,如可选的 :as
和 :with
参数所定义(与 squish
相同)。
方法 max§
method max(Supply: = :<cmp> --> Supply)
创建一个供应,仅当给定供应中的值大于之前看到的任何值时才发出这些值。换句话说,它将从一个持续上升的供应中发出所有值。从一个持续下降的供应中,它只会发出第一个值。可选参数指定比较器,就像使用 Any.max 一样。
方法 min§
method min(Supply: = :<cmp> --> Supply)
创建一个供应,仅当给定供应中的值小于之前看到的任何值时才发出这些值。换句话说,它将从一个持续下降的供应中发出所有值。从一个持续上升的供应中,它只会发出第一个值。可选参数指定比较器,就像使用 Any.min 一样。
方法 minmax§
method minmax(Supply: = :<cmp> --> Supply)
创建一个供应,每当从给定的供应中看到新的最小值或最大值时,它都会发出一个范围。可选参数指定比较器,就像使用 Any.minmax 一样。
方法 skip§
method skip(Supply: Int(Cool) = 1 --> Supply)
返回一个新的 Supply
,它将发出给定 Supply
中的所有值,除了前 $number
个值,这些值将被丢弃。
my = Supplier.new;my = .Supply;= .skip(3);.tap();.emit() for 1..10; # OUTPUT: «45678910»
方法 start§
method start(Supply: --> Supply)
创建一个供应的供应。对于原始供应中的每个值,代码对象都会在另一个线程上安排,并返回一个供应,其中包含单个值(如果代码成功)或一个退出且没有值的供应(如果代码失败)。
这对于异步启动您不阻塞的工作非常有用。
使用 migrate
将值再次合并到一个供应中。
方法 migrate§
method migrate(Supply: --> Supply)
将自身具有类型为 Supply
的值的 Supply
作为输入。每次外部 Supply
发出一个新的 Supply
时,它都会被点击,并发出其值。任何先前点击的 Supply
都将被关闭。这对于在不同的数据源之间迁移并仅关注最新数据源非常有用。
例如,想象一个用户可以在不同股票之间切换的应用程序。当他们切换到一个新的股票时,将建立到 Web 套接字的连接以获取最新值,并且应关闭任何先前的连接。通过 Web 套接字传入的每个值流都将表示为一个供应,这些供应本身会发出到要监视的最新数据源的供应中。migrate
方法可用于将此供应的供应平展为用户关心的当前值的单个供应。
以下是此类程序的简单模拟
my Supplier .= new;sub watch-stock().Supply.migrate.tap: *.say;watch-stock('GOOG');sleep 3;watch-stock('AAPL');sleep 3;
它会生成类似这样的输出
Starting to watch GOOG GOOG: 111.67 GOOG: 111.20 GOOG: 111.37 Lost interest in GOOG Starting to watch AAPL AAPL: 111.55 AAPL: 111.6 AAPL: 111.6
组合供应的方法§
方法 merge§
method merge(Supply --> Supply)
创建一个供应,其中会发出从给定供应中看到的任何值。只有在所有给定供应都完成时,结果供应才会完成。也可以作为类方法调用。
方法 zip§
method zip(**, :)
创建一个供应,只要在所有供应上看到新值,就会立即发出组合值。默认情况下,将创建List
,但可以通过使用 :with
参数指定自己的组合器来更改此设置。只要任何给定供应完成,结果供应就会完成。也可以作为类方法调用。
这也可以用作类方法;如果将其用作对象方法,则相应的供应将是组合的供应之一(没有特殊处理)。
方法 zip-latest§
method zip-latest(**, :, : )
创建一个供应,只要在任何供应上看到新值,就会立即发出组合值。默认情况下,将创建List
,但可以通过使用 :with
参数指定自己的组合器来更改此设置。可选的 :initial 参数可用于指示组合值的初始状态。默认情况下,在结果供应上发出第一个组合值之前,所有供应都必须至少发出一个值。只要任何给定供应完成,结果供应就会完成。也可以作为类方法调用。
作为供应公开的 I/O 功能§
子例程 signal§
sub signal(*, : = )
为指定的 Signal 枚举(例如 SIGINT)创建一个供应,以及一个可选的 :scheduler
参数。收到的任何信号都将在供应上发出。例如
signal(SIGINT).tap( );
将捕获 Control-C,表示感谢,然后退出。
要从信号号转到Signal,可以执行类似以下操作
signal(Signal(2)).tap( -> );
可以通过检查 Signal::.keys
(就像检查任何枚举一样)来找到受支持信号的列表。有关枚举工作原理的更多详细信息,请参见枚举。
注意:Rakudo 2018.05 及更早版本存在一个错误,导致在某些系统上信号的数值不正确。例如,Signal(10)
返回 SIGBUS
,即使它在特定系统上实际上是 SIGUSR1
。也就是说,在 2018.04、2018.04.1 和 2018.05 以外的所有 Rakudo 版本上,使用 signal(SIGUSR1)
都能按预期工作,在这些版本中,可以通过使用 signal(SIGBUS)
来实现预期行为。这些问题已在 2018.05 之后的 Rakudo 版本中得到解决。
方法 IO::Notification.watch-path§
method watch-path( --> Supply)
创建一个供应,OS 将向其发出值以指示给定路径的文件系统上的更改。此外,还具有一个快捷方式,即 IO 对象上的 watch
方法,如下所示
IO::Notification.watch-path(".").act( );".".IO.watch.act( ); # same