class Channel {}

Channel 是一个线程安全队列,可帮助您将一系列对象从一个或多个生产者发送到一个或多个消费者。每个对象只会到达由调度程序选择的其中一个消费者。如果只有一个消费者和一个生产者,则保证对象顺序不变。在 Channel 上发送是非阻塞的。

my $c = Channel.new;
await (^10).map: {
    start {
        my $r = rand;
        sleep $r;
        $c.send($r);
    }
}
$c.close;
say $c.list;

可以在 并发页面 中找到更多示例

方法§

方法 send§

method send(Channel:D: \item)

将一项放入 Channel 中。如果 Channel 已关闭,则抛出类型为 X::Channel::SendOnClosed 的异常。此调用不会阻塞,等待消费者获取对象。对可以排队的项目数量没有设置限制,因此应注意防止队列失控。

my $c = Channel.new;
$c.send(1);
$c.send([2345]);
$c.close;
say $c.list# OUTPUT: «(1 [2 3 4 5])␤»

方法 receive§

method receive(Channel:D:)

Channel 中接收并移除一项。如果不存在项目,它将阻塞,等待来自另一个线程的 send

如果 Channel 已关闭,并且最后一项已移除,或者在 receive 等待项目到达时调用了 close,则抛出类型为 X::Channel::ReceiveOnClosed 的异常。

如果 Channel 已使用 fail 方法标记为不稳定,并且最后一项已移除,则将作为异常抛出传递给 fail 的参数。

有关不会抛出异常的非阻塞版本的详细信息,请参见方法 poll

my $c = Channel.new;
$c.send(1);
say $c.receive# OUTPUT: «1␤»

方法 poll§

method poll(Channel:D:)

Channel 中接收并移除一项。如果不存在项目,则返回 Nil,而不是等待。

my $c = Channel.new;
Promise.in(2).then: { $c.close}
^10 .map({ $c.send($_); });
loop {
    if $c.poll -> $item { $item.say };
    if $c.closed  { last };
    sleep 0.1;
}

有关正确响应 Channel 关闭和故障的阻塞版本的详细信息,请参见方法 receive

方法 close§

method close(Channel:D:)

正常关闭 Channel。这会使随后的 send 调用使用 X::Channel::SendOnClosed 终止。随后的 .receive 调用仍可以耗尽之前发送的任何剩余项目,但如果队列为空,则会抛出 X::Channel::ReceiveOnClosed 异常。由于您可以通过使用 @() 将上下文设置为数组或调用 .list 方法从 Channel 生成 Seq,因此这些方法在 Channel 关闭之前不会终止。whenever 块也会在关闭的 Channel 上正确终止。

my $c = Channel.new;
$c.close;
$c.send(1);
CATCH { default { put .^name''.Str } };
# OUTPUT: «X::Channel::SendOnClosed: Cannot send a message on a closed channel␤» 

请注意,抛出的任何异常都可能阻止调用 .close,这可能会挂起接收线程。在这种情况下,请使用 LEAVE 阶段器来强制调用 .close

方法 list§

method list(Channel:D:)

返回基于 Seq 的列表,该列表将迭代队列中的项目,并在迭代时从其中删除每个项目。只有在调用 close 方法后,此操作才能终止。

my $c = Channel.new$c.send(1); $c.send(2);
$c.close;
say $c.list# OUTPUT: «(1 2)␤»

方法 closed§

method closed(Channel:D: --> Promise:D)

返回一个承诺,该承诺将在通过调用方法 close 关闭 Channel 后兑现。

my $c = Channel.new;
$c.closed.then({ say "It's closed!" });
$c.close;
sleep 1;

方法 fail§

method fail(Channel:D: $error)

关闭 Channel(即,使后续 send 调用终止),并将错误排队作为 Channel 中的最后一个元素抛出。方法 receive 将抛出该错误作为异常。如果 Channel 已被关闭或已在其上调用 .fail,则不执行任何操作。

my $c = Channel.new;
$c.fail("Bad error happens!");
$c.receive;
CATCH { default { put .^name''.Str } };
# OUTPUT: «X::AdHoc: Bad error happens!␤»

方法 Capture§

method Capture(Channel:D: --> Capture:D)

等同于在调用者上调用 .List.Capture

方法 Supply§

method Supply(Channel:D:)

这返回一个 on-demand Supply,它为在 Channel 上接收的每个值发出一个值。当 Channel 关闭时,将在 Supply 上调用 done

my $c = Channel.new;
my Supply $s1 = $c.Supply;
my Supply $s2 = $c.Supply;
$s1.tap(-> $v { say "First $v" });
$s2.tap(-> $v { say "Second $v" });
^10 .map({ $c.send($_});
sleep 1;

对该方法的多次调用会产生多个 Supply 实例,这些实例会争用来自 Channel 的值。

子例程 await§

multi await(Channel:D)
multi await(*@)

等待一个或多个 Channel 具有可用值,并返回这些值(它在 Channel 上调用 .receive)。还适用于 Promise

my $c = Channel.new;
Promise.in(1).then({$c.send(1)});
say await $c;

自 6.d 起,它在等待时不再阻塞线程。

类型图§

Channel 的类型关系
raku-type-graph Channel Channel Any Any Channel->Any Mu Mu Any->Mu

展开上方的图表