与大多数现代编程语言一样,Raku 旨在支持并行、异步和 并发。并行是指同时执行多项任务。异步编程,有时称为事件驱动或反应式编程,是指支持由程序其他地方触发的事件引起的程序流变化。最后,并发是指协调对某些共享资源的访问和修改。
Raku 并发设计的目标是提供一个高级、可组合且一致的接口,无论虚拟机如何为特定操作系统实现它,都通过以下描述的设施层实现。
此外,某些 Raku 特性可能以异步方式隐式运行,因此为了确保与这些特性的可预测互操作性,用户代码应尽可能避免使用低级并发 API(例如,Thread
和 Scheduler
)并使用高级接口。
高级 API§
承诺§
一个 Promise
(在其他编程环境中也称为future)封装了计算结果,该计算可能在获取承诺时尚未完成甚至尚未开始。一个 Promise
从 Planned
状态开始,可以导致 Kept
状态(表示承诺已成功完成)或 Broken
状态(表示承诺已失败)。通常,这是用户代码以并发或异步方式运行所需的大部分功能。
my = Promise.new;say .status; # OUTPUT: «Planned».keep('Result');say .status; # OUTPUT: «Kept»say .result; # OUTPUT: «Result»# (since it has been kept, a result is available!)my = Promise.new;.break('oh no');say .status; # OUTPUT: «Broken»say .result; # dies, because the promise has been brokenCATCH ;# OUTPUT: «X::AdHoc+{X::Promise::Broken}: oh no»
承诺通过可组合性获得了强大的功能,例如通过链接,通常通过 then 方法
my = Promise.new();my = .then(->);.keep("First Result");say .result; # OUTPUT: «First ResultSecond Result»
这里,then 方法安排代码在第一个 Promise
保持或破坏时执行,它本身返回一个新的 Promise
,该承诺将在代码执行时(或如果代码失败则破坏)与代码的结果保持一致。keep
将承诺的状态更改为 Kept
,并将结果设置为位置参数。result
阻塞当前执行线程,直到承诺保持或破坏,如果保持,则将返回结果(即传递给 keep
的值),否则将根据传递给 break
的值抛出异常。后一种行为用以下示例说明
my = Promise.new();my = .then(-> );.break("First Result");try .result;say .cause; # OUTPUT: «Handled but : First Result»
这里,break
将导致 then
的代码块在调用传递为参数的原始承诺上的 result
方法时抛出异常,这将随后导致第二个承诺被破坏,从而在获取其结果时引发异常。实际的 Exception
对象将随后从 cause
中获得。如果承诺没有被破坏,cause
将引发 X::Promise::CauseOnlyValidOnBroken
异常。
一个 Promise
也可以安排在将来某个时间自动保持
my = Promise.in(5);my = .then(-> );say .result;
method in 创建一个新的承诺并安排一个新的任务,该任务在不早于提供的秒数时调用 keep
,并返回新的 Promise
对象。
承诺的非常常见的用法是运行一段代码,并在代码成功返回时保持承诺,或者在代码死亡时破坏承诺。 start 方法 为此提供了一个快捷方式
my = Promise.start();say .result; # OUTPUT: «55»
这里,返回的承诺的 result
是代码返回的值。类似地,如果代码失败(因此承诺被破坏),则 cause
将是抛出的 Exception
对象
my = Promise.start();try .result;say .cause;
这被认为是一种如此常见的模式,以至于它也被提供为关键字
my = startmy = await ;say ;
子例程 await 几乎等同于调用 start
返回的承诺对象上的 result
,但它也会接受一个承诺列表并返回每个承诺的结果
my = start;my = start;my = await , ;say ; # OUTPUT: «[55 -55]»
除了 await
之外,还有两个类方法将多个 Promise
对象组合成一个新的承诺:allof
返回一个承诺,该承诺在所有原始承诺都保持或破坏时保持
my = Promise.allof(Promise.in(2),Promise.in(3));await ;say "All done"; # Should be not much more than three seconds later
而 anyof
返回一个新的承诺,该承诺在任何一个原始承诺保持或破坏时将保持
my = Promise.anyof(Promise.in(3),Promise.in(8600));await ;say "All done"; # Should be about 3 seconds later
然而,与 await
不同的是,原始保持承诺的结果无法获得,除非引用原始承诺,因此当任务的完成与否对消费者比实际结果更重要,或者当结果已通过其他方式收集时,这些方法更有用。例如,您可能希望创建一个依赖承诺,该承诺将检查每个原始承诺
my ;for 1..5 ->say await Promise.allof().then();
如果所有承诺都保持为 True,则将返回 True,否则返回 False。
如果您正在创建一个打算自己保持或破坏的承诺,那么您可能不希望任何可能接收该承诺的代码在您之前无意中(或以其他方式)保持或破坏该承诺。为此,存在 method vow,它返回一个 Vow 对象,该对象成为保持或破坏承诺的唯一机制。如果尝试直接保持或破坏承诺,则将抛出异常 X::Promise::Vowed
,只要 vow 对象保持私有,承诺的状态就是安全的
sub get_promisemy = get_promise();# Will throw an exception# "Access denied to keep/break this Promise; already vowed".keep;CATCH ;# OUTPUT: «X::Promise::Vowed: Access denied to keep/break this Promise; already vowed»
像 `in` 或 `start` 这样的自动履行或违反承诺的方法会这样做,因此无需为这些方法执行此操作。
供应§
一个 Supply
是一种异步数据流机制,可以被一个或多个消费者同时使用,类似于其他编程语言中的“事件”,可以看作是启用事件驱动或反应式设计。
最简单地说,一个 Supply
是一个消息流,可以使用 `tap` 方法创建多个订阅者,可以使用 `emit` 将数据项放置到这些订阅者中。
该 Supply
可以是 `live` 或 `on-demand`。`live` 供应就像电视广播:那些收看的人不会收到之前发出的值。`on-demand` 广播就像 Netflix:每个开始流式传输电影(点击供应)的人,总是从头开始(获取所有值),无论现在有多少人在观看它。请注意,`on-demand` 供应没有保留历史记录,而是为每次点击供应运行 `supply` 块。
一个 `live` Supply
由 Supplier
工厂创建,每个发出的值都会传递给所有活动点击者,因为它们被添加了
my = Supplier.new;my = .Supply;.tap( -> );for 1 .. 10
请注意,`tap` 被调用在一个由 Supplier
创建的 Supply
对象上,新值在 Supplier
上发出。
一个 `on-demand` Supply
由 `supply` 关键字创建
my = supply.tap( -> );
在这种情况下,供应块中的代码在每次由 `supply` 返回的 Supply
被点击时执行,如以下示例所示
my = supply.tap( -> );.tap( -> );
该 `tap` 方法返回一个 Tap
对象,该对象可用于获取有关点击的信息,以及在不再对事件感兴趣时将其关闭
my = Supplier.new;my = .Supply;my = .tap( -> );.emit("OK");.close;.emit("Won't trigger the tap");
在供应对象上调用 `done` 会调用 `done` 回调,该回调可能为任何点击指定,但不会阻止任何进一步的事件被发送到流中,或者点击接收它们。
该 `interval` 方法返回一个新的 `on-demand` 供应,该供应会定期以指定的时间间隔发出一个新事件。发出的数据是一个从 0 开始的整数,它会为每个事件递增。以下代码输出 0 .. 5
my = Supply.interval(2);.tap(-> );sleep 10;
可以向 `interval` 提供第二个参数,该参数指定第一个事件触发之前的延迟时间(以秒为单位)。由 `interval` 创建的每个供应点击都有自己的从 0 开始的序列,如下所示
my = Supply.interval(2);.tap(-> );sleep 6;.tap(-> );sleep 10;
一个 live Supply
,它保留值直到第一次点击,可以使用 Supplier::Preserving
创建。
whenever
§
该 `whenever` 关键字可以在供应块或反应块中使用。从 6.d 版本开始,它需要在它们的词法范围内使用。它引入了一个代码块,该代码块将在异步事件的提示下运行,它指定了该事件 - 这可能是一个 Supply
、一个 Channel
、一个 Promise
或一个 Iterable
。
请注意,应尽可能使 `whenever` 内部的代码保持简短,因为任何时候都只执行一个 `whenever` 块。可以在 `whenever` 块内使用 `start` 块来运行运行时间更长的代码。
在这个例子中,我们正在观察两个供应。
my = Supplier.new;my = Supplier.new;my = supply.tap( -> );.emit("Radish"); # OUTPUT: «We've got a vegetable: Radish».emit("Thick sliced"); # OUTPUT: «We've got bread: Thick sliced».emit("Lettuce"); # OUTPUT: «We've got a vegetable: Lettuce»
react
§
该 `react` 关键字引入了一个代码块,该代码块包含一个或多个 `whenever` 关键字来观察异步事件。供应块和反应块的主要区别在于,反应块中的代码在代码流中出现的地方运行,而供应块必须被点击才能执行任何操作。
另一个区别是,供应块可以在没有 `whenever` 关键字的情况下使用,但反应块至少需要一个 `whenever` 才能真正有用。
react
这里,whenever
关键字使用 .act
从提供的代码块创建对 Supply
的点击。当在其中一个点击中调用 done()
时,react
代码块将退出。使用 last
退出代码块会导致错误,表明它实际上不是循环结构。
也可以从将依次发出的一系列值创建 on-demand
Supply
,因此第一个 on-demand
示例可以写成
react
转换供应§
可以使用 grep
和 map
方法分别过滤或转换现有的供应对象,以类似于同名列表方法的方式创建新的供应:grep
返回一个供应,使得仅在源流上发出的满足 grep
条件的事件才会在第二个供应上发出
my = Supplier.new;my = .Supply;.tap(-> );my = .grep();.tap(-> );my = .grep();.tap(-> );for 0 .. 10
map
返回一个新的供应,使得对于发送到原始供应的每个项目,都会发出一个新项目,该项目是传递给 map
的表达式的结果
my = Supplier.new;my = .Supply;.tap(-> );my = .map();.tap(-> );for 0 .. 10
结束供应§
如果您需要在供应完成时运行的操作,可以通过在调用 tap
时设置 done
和 quit
选项来实现
.tap: ,done => ,quit =>;
quit
代码块的工作原理与 CATCH
非常相似。如果异常被 when
或 default
代码块标记为已查看,则该异常将被捕获并处理。否则,该异常将继续向上调用树(即,与未设置 quit
时相同的行为)。
供应或反应块中的相位器§
如果您在 whenever
中使用 react
或 supply
代码块语法,可以在 whenever
代码块中添加相位器来处理来自点击供应的 done
和 quit
消息
react
此处的行为与在 tap
上设置 done
和 quit
相同。
通道§
Channel
是一个线程安全的队列,可以有多个读写器,可以认为其操作类似于“fifo”或命名管道,但它不启用进程间通信。需要注意的是,作为一个真正的队列,发送到 Channel
的每个值仅对第一个读取的单个读取器可用,先到先得:如果您希望多个读取器能够接收发送的每个项目,您可能需要考虑使用 Supply
。
使用 方法 send 将项目排队到 Channel
,而 方法 receive 从队列中删除一个项目并返回它,如果队列为空,则阻塞直到发送新项目
my = Channel.new;.send('Channel One');say .receive; # OUTPUT: «Channel One»
如果通道已使用 方法 close 关闭,则任何 send
都会导致异常 X::Channel::SendOnClosed
被抛出,而 receive
会抛出 X::Channel::ReceiveOnClosed
。
方法 list 返回 Channel
上的所有项目,并将阻塞直到排队更多项目,除非通道已关闭
my = Channel.new;await (^10).map: ->.close;for .list ->
还有一个非阻塞的 方法 poll,它从通道返回一个可用项目,或者如果通道为空或已关闭,则返回 Nil
。这意味着必须检查通道以确定它是否已关闭
my = Channel.new;# Start three Promises that sleep for 1..3 seconds, and then# send a value to our Channel^3 .map: -># Wait 3 seconds before closing the channelPromise.in(3).then:# Continuously loop and poll the channel, until it's closedmy = .closed;loop# Doing some unrelated things...# Doing some unrelated things...# 2 from thread 5 received after 1.2063182 seconds# Doing some unrelated things...# Doing some unrelated things...# 1 from thread 4 received after 2.41117376 seconds# Doing some unrelated things...# 0 from thread 3 received after 3.01364461 seconds# Doing some unrelated things...
方法 closed 返回一个 Promise
,当通道关闭时,该承诺将被兑现(因此在布尔上下文中将评估为 True)。
.poll
方法可以与 .receive
方法结合使用,作为一种缓存机制,其中 .poll
返回的值为空是需要获取更多值并将其加载到通道中的信号
sub get-valuesub replenish-cache
通道可以在前面描述的 react
代码块的 whenever
中代替 Supply
使用
my = Channel.new;my = startawait (^10).map: ->.close;await ;
也可以从 Supply
使用 Channel 方法 获取 Channel
,该方法返回一个由 Supply
上的 tap
馈送的 Channel
my = Supplier.new;my = .Supply;my = .Channel;my = startawait (^10).map: ->.done;await ;
Channel
每次调用时将返回一个不同的 Channel
,该 Channel
使用相同的数据馈送。例如,这可以用于将 Supply
扇出到一个或多个 Channel
,以提供程序中的不同接口。
Proc::Async§
Proc::Async
基于所描述的设施来异步运行和交互外部程序
my = Proc::Async.new('echo', 'foo', 'bar');.stdout.tap(-> );.stderr.tap(-> );say "Starting...";my = .start;await ;say "Done.";# Output:# Starting...# Output: foo bar# Done.
命令的路径以及命令的任何参数都提供给构造函数。命令不会在调用 start 之前执行,该方法将返回一个 Promise
,该 Promise
在程序退出时将被保留。程序的标准输出和标准错误作为 Supply
对象从方法 stdout 和 stderr 分别提供,可以根据需要进行抽取。
如果要写入程序的标准输入,可以将 :w
副词提供给构造函数,并在程序启动后使用方法 write、print 或 say 写入打开的管道
my = Proc::Async.new(:w, 'grep', 'foo');.stdout.tap(-> );say "Starting...";my = .start;.say("this line has foo");.say("this one doesn't");.close-stdin;await ;say "Done.";# Output:# Starting...# Output: this line has foo# Done.
某些程序(例如,在本例中没有文件参数的 grep
)在标准输入关闭之前不会退出,因此当您完成写入时,可以调用 close-stdin 以允许 start
返回的 Promise
被保留。
低级 API§
线程§
并发性的最低级接口由 Thread
提供。线程可以被认为是一段代码,最终可能在处理器上运行,其安排几乎完全由虚拟机和/或操作系统完成。线程应该被认为,就所有意图而言,基本上是未管理的,并且应该避免在用户代码中直接使用它们。
线程可以创建,然后在稍后实际运行
my = Thread.new(code => );# ....run;
或者可以在一次调用中创建和运行
my = Thread.start();
在这两种情况下,都可以使用 finish
方法等待 Thread
对象封装的代码完成,该方法将在线程完成之前阻塞
.finish;
除此之外,没有其他用于同步或资源共享的设施,这在很大程度上是为什么应该强调线程不太可能在用户代码中直接使用。
调度器§
并发 API 的下一级由实现角色 Scheduler
定义的接口的类提供。调度器接口的目的是提供一种机制来确定使用哪些资源来运行特定任务以及何时运行它。大多数高级并发 API 都建立在调度器之上,用户代码可能根本不需要使用它们,尽管某些方法(例如在 Proc::Async
、Promise
和 Supply
中找到的方法)允许您显式地提供调度器。
当前默认全局调度器在变量 $*SCHEDULER
中可用。
调度器的主要接口(实际上是 Scheduler
接口所需的唯一方法)是 cue
方法
method cue(:, Instant :, :, :, : = 1; :)
这将调度 &code
中的 Callable
,以使用调度器实现的执行方案,以副词(如 Scheduler
中所述)确定的方式执行。例如
my = 0;my = .cue(, every => 2 );sleep 20;
假设 $*SCHEDULER
没有从默认值更改,将大约(即,在操作系统调度容差范围内)每两秒打印一次数字 0 到 10。在这种情况下,代码将被调度运行,直到程序正常结束,但是该方法返回一个 Cancellation
对象,该对象可用于在正常完成之前取消已安排的执行
my = 0;my = .cue(, every => 2 );sleep 10;.cancel;sleep 10;
应该只输出 0 到 5。
尽管 Scheduler
接口相对于 Thread
具有明显的优势,但所有功能都可通过更高级别的接口获得,因此没有必要直接使用调度器,除非是在上面提到的情况下,您可以在其中显式地将调度器提供给某些方法。
如果库有特殊需求,它可能希望提供一个替代的调度器实现,例如,UI 库可能希望所有代码都在单个 UI 线程中运行,或者可能需要一些自定义优先级机制,但是下面描述的标准提供的实现应该足以满足大多数用户代码。
ThreadPoolScheduler§
ThreadPoolScheduler
是默认的调度器,它维护一个线程池,这些线程按需分配,并在需要时创建新的线程,直到达到创建调度器对象时作为参数给出的最大数量(默认值为 16)。如果超过最大值,则 cue
可能会将代码排队,直到有线程可用。
Rakudo 允许在程序启动时通过环境变量 RAKUDO_MAX_THREADS
设置默认调度器中允许的最大线程数。
CurrentThreadScheduler§
CurrentThreadScheduler
是一个非常简单的调度器,它总是将代码调度到当前线程上立即运行。这意味着在这个调度器上的 cue
将阻塞,直到代码执行完毕,这限制了它在某些特殊情况下的实用性,例如测试。
锁§
类 Lock
提供了在并发环境中保护共享数据的低级机制,因此它是支持高级 API 中线程安全性的关键,这在其他编程语言中有时被称为“互斥锁”。由于更高级别的类(Promise
、Supply
和 Channel
)在需要时使用 Lock
,因此用户代码不太可能需要直接使用 Lock
。
Lock
的主要接口是方法 protect,它确保代码块(通常称为“临界区”)一次只在一个线程中执行。
my = Lock.new;my = 0;await (^10).map:say ; # OUTPUT: «10»
protect
返回代码块返回的任何内容。
由于 protect
会阻塞任何等待执行临界区的线程,因此代码应该尽可能快。
安全问题§
一些共享数据并发问题不像其他问题那么明显。有关此主题的良好一般性文章,请参阅此 博客文章。
一个需要注意的特殊问题是容器自动生成或扩展发生时。当 Array
或 Hash
条目最初被分配时,底层结构会发生改变,并且该操作不是异步安全的。例如,在此代码中
my ;my := [20];= 'foo';
第三行是临界区,因为这是数组扩展的地方。最简单的解决方法是使用 Lock
来保护临界区。一个可能更好的解决方法是重构代码,以便不需要共享容器。