与大多数现代编程语言一样,Raku 旨在支持并行、异步和 并发。并行是指同时执行多项任务。异步编程,有时称为事件驱动或反应式编程,是指支持由程序其他地方触发的事件引起的程序流变化。最后,并发是指协调对某些共享资源的访问和修改。

Raku 并发设计的目标是提供一个高级、可组合且一致的接口,无论虚拟机如何为特定操作系统实现它,都通过以下描述的设施层实现。

此外,某些 Raku 特性可能以异步方式隐式运行,因此为了确保与这些特性的可预测互操作性,用户代码应尽可能避免使用低级并发 API(例如,ThreadScheduler)并使用高级接口。

高级 API§

承诺§

一个 Promise(在其他编程环境中也称为future)封装了计算结果,该计算可能在获取承诺时尚未完成甚至尚未开始。一个 PromisePlanned 状态开始,可以导致 Kept 状态(表示承诺已成功完成)或 Broken 状态(表示承诺已失败)。通常,这是用户代码以并发或异步方式运行所需的大部分功能。

my $p1 = Promise.new;
say $p1.status;         # OUTPUT: «Planned␤» 
$p1.keep('Result');
say $p1.status;         # OUTPUT: «Kept␤» 
say $p1.result;         # OUTPUT: «Result␤» 
                        # (since it has been kept, a result is available!) 
 
my $p2 = Promise.new;
$p2.break('oh no');
say $p2.status;         # OUTPUT: «Broken␤» 
say $p2.result;         # dies, because the promise has been broken 
CATCH { default { say .^name''.Str } };
# OUTPUT: «X::AdHoc+{X::Promise::Broken}: oh no␤» 

承诺通过可组合性获得了强大的功能,例如通过链接,通常通过 then 方法

my $promise1 = Promise.new();
my $promise2 = $promise1.then(
    -> $v { say $v.result"Second Result" }
);
$promise1.keep("First Result");
say $promise2.result;   # OUTPUT: «First Result␤Second Result␤»

这里,then 方法安排代码在第一个 Promise 保持或破坏时执行,它本身返回一个新的 Promise,该承诺将在代码执行时(或如果代码失败则破坏)与代码的结果保持一致。keep 将承诺的状态更改为 Kept,并将结果设置为位置参数。result 阻塞当前执行线程,直到承诺保持或破坏,如果保持,则将返回结果(即传递给 keep 的值),否则将根据传递给 break 的值抛出异常。后一种行为用以下示例说明

my $promise1 = Promise.new();
my $promise2 = $promise1.then(-> $v { say "Handled but : "say $v.result});
$promise1.break("First Result");
try $promise2.result;
say $promise2.cause;        # OUTPUT: «Handled but : ␤First Result␤»

这里,break 将导致 then 的代码块在调用传递为参数的原始承诺上的 result 方法时抛出异常,这将随后导致第二个承诺被破坏,从而在获取其结果时引发异常。实际的 Exception 对象将随后从 cause 中获得。如果承诺没有被破坏,cause 将引发 X::Promise::CauseOnlyValidOnBroken 异常。

一个 Promise 也可以安排在将来某个时间自动保持

my $promise1 = Promise.in(5);
my $promise2 = $promise1.then(-> $v { say $v.status'Second Result' });
say $promise2.result;

method in 创建一个新的承诺并安排一个新的任务,该任务在不早于提供的秒数时调用 keep,并返回新的 Promise 对象。

承诺的非常常见的用法是运行一段代码,并在代码成功返回时保持承诺,或者在代码死亡时破坏承诺。 start 方法 为此提供了一个快捷方式

my $promise = Promise.start(
    { my $i = 0for 1 .. 10 { $i += $_ }$i}
);
say $promise.result;    # OUTPUT: «55␤»

这里,返回的承诺的 result 是代码返回的值。类似地,如果代码失败(因此承诺被破坏),则 cause 将是抛出的 Exception 对象

my $promise = Promise.start({ die "Broken Promise" });
try $promise.result;
say $promise.cause;

这被认为是一种如此常见的模式,以至于它也被提供为关键字

my $promise = start {
    my $i = 0;
    for 1 .. 10 {
        $i += $_
    }
    $i
}
my $result = await $promise;
say $result;

子例程 await 几乎等同于调用 start 返回的承诺对象上的 result,但它也会接受一个承诺列表并返回每个承诺的结果

my $p1 = start {
    my $i = 0;
    for 1 .. 10 {
        $i += $_
    }
    $i
};
my $p2 = start {
    my $i = 0;
    for 1 .. 10 {
        $i -= $_
    }
    $i
};
my @result = await $p1$p2;
say @result;            # OUTPUT: «[55 -55]␤»

除了 await 之外,还有两个类方法将多个 Promise 对象组合成一个新的承诺:allof 返回一个承诺,该承诺在所有原始承诺都保持或破坏时保持

my $promise = Promise.allof(
    Promise.in(2),
    Promise.in(3)
);
 
await $promise;
say "All done"# Should be not much more than three seconds later

anyof 返回一个新的承诺,该承诺在任何一个原始承诺保持或破坏时将保持

my $promise = Promise.anyof(
    Promise.in(3),
    Promise.in(8600)
);
 
await $promise;
say "All done"# Should be about 3 seconds later

然而,与 await 不同的是,原始保持承诺的结果无法获得,除非引用原始承诺,因此当任务的完成与否对消费者比实际结果更重要,或者当结果已通过其他方式收集时,这些方法更有用。例如,您可能希望创建一个依赖承诺,该承诺将检查每个原始承诺

my @promises;
for 1..5 -> $t {
    push @promisesstart {
        sleep $t;
        Bool.pick;
    };
}
say await Promise.allof(@promises).then({ so all(@promises>>.result});

如果所有承诺都保持为 True,则将返回 True,否则返回 False。

如果您正在创建一个打算自己保持或破坏的承诺,那么您可能不希望任何可能接收该承诺的代码在您之前无意中(或以其他方式)保持或破坏该承诺。为此,存在 method vow,它返回一个 Vow 对象,该对象成为保持或破坏承诺的唯一机制。如果尝试直接保持或破坏承诺,则将抛出异常 X::Promise::Vowed,只要 vow 对象保持私有,承诺的状态就是安全的

sub get_promise {
    my $promise = Promise.new;
    my $vow = $promise.vow;
    Promise.in(10).then({$vow.keep});
    $promise;
}
 
my $promise = get_promise();
 
# Will throw an exception 
# "Access denied to keep/break this Promise; already vowed" 
$promise.keep;
CATCH { default { say .^name''.Str } };
# OUTPUT: «X::Promise::Vowed: Access denied to keep/break this Promise; already vowed␤»

像 `in` 或 `start` 这样的自动履行或违反承诺的方法会这样做,因此无需为这些方法执行此操作。

供应§

一个 Supply 是一种异步数据流机制,可以被一个或多个消费者同时使用,类似于其他编程语言中的“事件”,可以看作是启用事件驱动或反应式设计。

最简单地说,一个 Supply 是一个消息流,可以使用 `tap` 方法创建多个订阅者,可以使用 `emit` 将数据项放置到这些订阅者中。

Supply 可以是 `live` 或 `on-demand`。`live` 供应就像电视广播:那些收看的人不会收到之前发出的值。`on-demand` 广播就像 Netflix:每个开始流式传输电影(点击供应)的人,总是从头开始(获取所有值),无论现在有多少人在观看它。请注意,`on-demand` 供应没有保留历史记录,而是为每次点击供应运行 `supply` 块。

一个 `live` SupplySupplier 工厂创建,每个发出的值都会传递给所有活动点击者,因为它们被添加了

my $supplier = Supplier.new;
my $supply   = $supplier.Supply;
 
$supply.tap-> $v { say $v });
 
for 1 .. 10 {
    $supplier.emit($_);
}

请注意,`tap` 被调用在一个由 Supplier 创建的 Supply 对象上,新值在 Supplier 上发出。

一个 `on-demand` Supply 由 `supply` 关键字创建

my $supply = supply {
    for 1 .. 10 {
        emit($_);
    }
}
$supply.tap-> $v { say $v });

在这种情况下,供应块中的代码在每次由 `supply` 返回的 Supply 被点击时执行,如以下示例所示

my $supply = supply {
    for 1 .. 10 {
        emit($_);
    }
}
$supply.tap-> $v { say "First : $v" });
$supply.tap-> $v { say "Second : $v" });

该 `tap` 方法返回一个 Tap 对象,该对象可用于获取有关点击的信息,以及在不再对事件感兴趣时将其关闭

my $supplier = Supplier.new;
my $supply   = $supplier.Supply;
 
my $tap = $supply.tap-> $v { say $v });
 
$supplier.emit("OK");
$tap.close;
$supplier.emit("Won't trigger the tap");

在供应对象上调用 `done` 会调用 `done` 回调,该回调可能为任何点击指定,但不会阻止任何进一步的事件被发送到流中,或者点击接收它们。

该 `interval` 方法返回一个新的 `on-demand` 供应,该供应会定期以指定的时间间隔发出一个新事件。发出的数据是一个从 0 开始的整数,它会为每个事件递增。以下代码输出 0 .. 5

my $supply = Supply.interval(2);
$supply.tap(-> $v { say $v });
sleep 10;

可以向 `interval` 提供第二个参数,该参数指定第一个事件触发之前的延迟时间(以秒为单位)。由 `interval` 创建的每个供应点击都有自己的从 0 开始的序列,如下所示

my $supply = Supply.interval(2);
$supply.tap(-> $v { say "First $v" });
sleep 6;
$supply.tap(-> $v { say "Second $v"});
sleep 10;

一个 live Supply,它保留值直到第一次点击,可以使用 Supplier::Preserving 创建。

whenever§

该 `whenever` 关键字可以在供应块或反应块中使用。从 6.d 版本开始,它需要在它们的词法范围内使用。它引入了一个代码块,该代码块将在异步事件的提示下运行,它指定了该事件 - 这可能是一个 Supply、一个 Channel、一个 Promise 或一个 Iterable

请注意,应尽可能使 `whenever` 内部的代码保持简短,因为任何时候都只执行一个 `whenever` 块。可以在 `whenever` 块内使用 `start` 块来运行运行时间更长的代码。

在这个例子中,我们正在观察两个供应。

my $bread-supplier = Supplier.new;
my $vegetable-supplier = Supplier.new;
 
my $supply = supply {
    whenever $bread-supplier.Supply {
        emit("We've got bread: " ~ $_);
    };
    whenever $vegetable-supplier.Supply {
        emit("We've got a vegetable: " ~ $_);
    };
}
$supply.tap-> $v { say "$v" });
 
$vegetable-supplier.emit("Radish");   # OUTPUT: «We've got a vegetable: Radish␤» 
$bread-supplier.emit("Thick sliced"); # OUTPUT: «We've got bread: Thick sliced␤» 
$vegetable-supplier.emit("Lettuce");  # OUTPUT: «We've got a vegetable: Lettuce␤» 

react§

该 `react` 关键字引入了一个代码块,该代码块包含一个或多个 `whenever` 关键字来观察异步事件。供应块和反应块的主要区别在于,反应块中的代码在代码流中出现的地方运行,而供应块必须被点击才能执行任何操作。

另一个区别是,供应块可以在没有 `whenever` 关键字的情况下使用,但反应块至少需要一个 `whenever` 才能真正有用。

react {
    whenever Supply.interval(2-> $v {
        say $v;
        done() if $v == 4;
    }
}

这里,whenever 关键字使用 .act 从提供的代码块创建对 Supply 的点击。当在其中一个点击中调用 done() 时,react 代码块将退出。使用 last 退出代码块会导致错误,表明它实际上不是循环结构。

也可以从将依次发出的一系列值创建 on-demand Supply,因此第一个 on-demand 示例可以写成

react {
    whenever Supply.from-list(1..10-> $v {
        say $v;
    }
}

转换供应§

可以使用 grepmap 方法分别过滤或转换现有的供应对象,以类似于同名列表方法的方式创建新的供应:grep 返回一个供应,使得仅在源流上发出的满足 grep 条件的事件才会在第二个供应上发出

my $supplier = Supplier.new;
my $supply = $supplier.Supply;
$supply.tap(-> $v { say "Original : $v" });
my $odd_supply = $supply.grep({ $_ % 2 });
$odd_supply.tap(-> $v { say "Odd : $v" });
my $even_supply = $supply.grep({ not $_ % 2 });
$even_supply.tap(-> $v { say "Even : $v" });
for 0 .. 10 {
    $supplier.emit($_);
}

map 返回一个新的供应,使得对于发送到原始供应的每个项目,都会发出一个新项目,该项目是传递给 map 的表达式的结果

my $supplier = Supplier.new;
my $supply = $supplier.Supply;
$supply.tap(-> $v { say "Original : $v" });
my $half_supply = $supply.map({ $_ / 2 });
$half_supply.tap(-> $v { say "Half : $v" });
for 0 .. 10 {
    $supplier.emit($_);
}

结束供应§

如果您需要在供应完成时运行的操作,可以通过在调用 tap 时设置 donequit 选项来实现

$supply.tap: { ... },
    done => { say 'Job is done.' },
    quit => {
        when X::MyApp::Error { say "App Error: "$_.message }
    };

quit 代码块的工作原理与 CATCH 非常相似。如果异常被 whendefault 代码块标记为已查看,则该异常将被捕获并处理。否则,该异常将继续向上调用树(即,与未设置 quit 时相同的行为)。

供应或反应块中的相位器§

如果您在 whenever 中使用 reactsupply 代码块语法,可以在 whenever 代码块中添加相位器来处理来自点击供应的 donequit 消息

react {
    whenever $supply {
        ...# your usual supply tap code here 
        LAST { say 'Job is done.' }
        QUIT { when X::MyApp::Error { say "App Error: "$_.message } }
    }
}

此处的行为与在 tap 上设置 donequit 相同。

通道§

Channel 是一个线程安全的队列,可以有多个读写器,可以认为其操作类似于“fifo”或命名管道,但它不启用进程间通信。需要注意的是,作为一个真正的队列,发送到 Channel 的每个值仅对第一个读取的单个读取器可用,先到先得:如果您希望多个读取器能够接收发送的每个项目,您可能需要考虑使用 Supply

使用 方法 send 将项目排队到 Channel,而 方法 receive 从队列中删除一个项目并返回它,如果队列为空,则阻塞直到发送新项目

my $channel = Channel.new;
$channel.send('Channel One');
say $channel.receive;  # OUTPUT: «Channel One␤»

如果通道已使用 方法 close 关闭,则任何 send 都会导致异常 X::Channel::SendOnClosed 被抛出,而 receive 会抛出 X::Channel::ReceiveOnClosed

方法 list 返回 Channel 上的所有项目,并将阻塞直到排队更多项目,除非通道已关闭

my $channel = Channel.new;
await (^10).map: -> $r {
    start {
        sleep $r;
        $channel.send($r);
    }
}
$channel.close;
for $channel.list -> $r {
    say $r;
}

还有一个非阻塞的 方法 poll,它从通道返回一个可用项目,或者如果通道为空或已关闭,则返回 Nil。这意味着必须检查通道以确定它是否已关闭

my $c = Channel.new;
 
# Start three Promises that sleep for 1..3 seconds, and then 
# send a value to our Channel 
^3 .map: -> $v {
    start {
        sleep 3 - $v;
        $c.send: "$v from thread {$*THREAD.id}";
    }
}
 
# Wait 3 seconds before closing the channel 
Promise.in(3).then: { $c.close }
 
# Continuously loop and poll the channel, until it's closed 
my $is-closed = $c.closed;
loop {
    if $c.poll -> $item {
        say "$item received after {now - INIT now} seconds";
    }
    elsif $is-closed {
        last;
    }
 
    say 'Doing some unrelated things...';
    sleep .6;
}
 
# Doing some unrelated things... 
# Doing some unrelated things... 
# 2 from thread 5 received after 1.2063182 seconds 
# Doing some unrelated things... 
# Doing some unrelated things... 
# 1 from thread 4 received after 2.41117376 seconds 
# Doing some unrelated things... 
# 0 from thread 3 received after 3.01364461 seconds 
# Doing some unrelated things...

方法 closed 返回一个 Promise,当通道关闭时,该承诺将被兑现(因此在布尔上下文中将评估为 True)。

.poll 方法可以与 .receive 方法结合使用,作为一种缓存机制,其中 .poll 返回的值为空是需要获取更多值并将其加载到通道中的信号

sub get-value {
    return $c.poll // do { start replenish-cache$c.receive };
}
 
sub replenish-cache {
    for ^20 {
        $c.send: $_ for slowly-fetch-a-thing();
    }
}

通道可以在前面描述的 react 代码块的 whenever 中代替 Supply 使用

my $channel = Channel.new;
my $p = start {
    react {
        whenever $channel {
            say $_;
        }
    }
}
 
await (^10).map: -> $r {
    start {
        sleep $r;
        $channel.send($r);
    }
}
 
$channel.close;
await $p;

也可以从 Supply 使用 Channel 方法 获取 Channel,该方法返回一个由 Supply 上的 tap 馈送的 Channel

my $supplier = Supplier.new;
my $supply   = $supplier.Supply;
my $channel = $supply.Channel;
 
my $p = start {
    react  {
        whenever $channel -> $item {
            say "via Channel: $item";
        }
    }
}
 
await (^10).map: -> $r {
    start {
        sleep $r;
        $supplier.emit($r);
    }
}
 
$supplier.done;
await $p;

Channel 每次调用时将返回一个不同的 Channel,该 Channel 使用相同的数据馈送。例如,这可以用于将 Supply 扇出到一个或多个 Channel,以提供程序中的不同接口。

Proc::Async§

Proc::Async 基于所描述的设施来异步运行和交互外部程序

my $proc = Proc::Async.new('echo''foo''bar');
 
$proc.stdout.tap(-> $v { print "Output: $v" });
$proc.stderr.tap(-> $v { print "Error:  $v" });
 
say "Starting...";
my $promise = $proc.start;
 
await $promise;
say "Done.";
 
# Output: 
# Starting... 
# Output: foo bar 
# Done.

命令的路径以及命令的任何参数都提供给构造函数。命令不会在调用 start 之前执行,该方法将返回一个 Promise,该 Promise 在程序退出时将被保留。程序的标准输出和标准错误作为 Supply 对象从方法 stdoutstderr 分别提供,可以根据需要进行抽取。

如果要写入程序的标准输入,可以将 :w 副词提供给构造函数,并在程序启动后使用方法 writeprintsay 写入打开的管道

my $proc = Proc::Async.new(:w'grep''foo');
 
$proc.stdout.tap(-> $v { print "Output: $v" });
 
say "Starting...";
my $promise = $proc.start;
 
$proc.say("this line has foo");
$proc.say("this one doesn't");
 
$proc.close-stdin;
await $promise;
say "Done.";
 
# Output: 
# Starting... 
# Output: this line has foo 
# Done.

某些程序(例如,在本例中没有文件参数的 grep)在标准输入关闭之前不会退出,因此当您完成写入时,可以调用 close-stdin 以允许 start 返回的 Promise 被保留。

低级 API§

线程§

并发性的最低级接口由 Thread 提供。线程可以被认为是一段代码,最终可能在处理器上运行,其安排几乎完全由虚拟机和/或操作系统完成。线程应该被认为,就所有意图而言,基本上是未管理的,并且应该避免在用户代码中直接使用它们。

线程可以创建,然后在稍后实际运行

my $thread = Thread.new(code => { for  1 .. 10  -> $v { say $v }});
# ... 
$thread.run;

或者可以在一次调用中创建和运行

my $thread = Thread.start({ for  1 .. 10  -> $v { say $v }});

在这两种情况下,都可以使用 finish 方法等待 Thread 对象封装的代码完成,该方法将在线程完成之前阻塞

$thread.finish;

除此之外,没有其他用于同步或资源共享的设施,这在很大程度上是为什么应该强调线程不太可能在用户代码中直接使用。

调度器§

并发 API 的下一级由实现角色 Scheduler 定义的接口的类提供。调度器接口的目的是提供一种机制来确定使用哪些资源来运行特定任务以及何时运行它。大多数高级并发 API 都建立在调度器之上,用户代码可能根本不需要使用它们,尽管某些方法(例如在 Proc::AsyncPromiseSupply 中找到的方法)允许您显式地提供调度器。

当前默认全局调度器在变量 $*SCHEDULER 中可用。

调度器的主要接口(实际上是 Scheduler 接口所需的唯一方法)是 cue 方法

method cue(:&codeInstant :$at:$in:$every:$times = 1:&catch)

这将调度 &code 中的 Callable,以使用调度器实现的执行方案,以副词(如 Scheduler 中所述)确定的方式执行。例如

my $i = 0;
my $cancellation = $*SCHEDULER.cue({ say $i++}every => 2 );
sleep 20;

假设 $*SCHEDULER 没有从默认值更改,将大约(即,在操作系统调度容差范围内)每两秒打印一次数字 0 到 10。在这种情况下,代码将被调度运行,直到程序正常结束,但是该方法返回一个 Cancellation 对象,该对象可用于在正常完成之前取消已安排的执行

my $i = 0;
my $cancellation = $*SCHEDULER.cue({ say $i++}every => 2 );
sleep 10;
$cancellation.cancel;
sleep 10;

应该只输出 0 到 5。

尽管 Scheduler 接口相对于 Thread 具有明显的优势,但所有功能都可通过更高级别的接口获得,因此没有必要直接使用调度器,除非是在上面提到的情况下,您可以在其中显式地将调度器提供给某些方法。

如果库有特殊需求,它可能希望提供一个替代的调度器实现,例如,UI 库可能希望所有代码都在单个 UI 线程中运行,或者可能需要一些自定义优先级机制,但是下面描述的标准提供的实现应该足以满足大多数用户代码。

ThreadPoolScheduler§

ThreadPoolScheduler 是默认的调度器,它维护一个线程池,这些线程按需分配,并在需要时创建新的线程,直到达到创建调度器对象时作为参数给出的最大数量(默认值为 16)。如果超过最大值,则 cue 可能会将代码排队,直到有线程可用。

Rakudo 允许在程序启动时通过环境变量 RAKUDO_MAX_THREADS 设置默认调度器中允许的最大线程数。

CurrentThreadScheduler§

CurrentThreadScheduler 是一个非常简单的调度器,它总是将代码调度到当前线程上立即运行。这意味着在这个调度器上的 cue 将阻塞,直到代码执行完毕,这限制了它在某些特殊情况下的实用性,例如测试。

§

Lock 提供了在并发环境中保护共享数据的低级机制,因此它是支持高级 API 中线程安全性的关键,这在其他编程语言中有时被称为“互斥锁”。由于更高级别的类(PromiseSupplyChannel)在需要时使用 Lock,因此用户代码不太可能需要直接使用 Lock

Lock 的主要接口是方法 protect,它确保代码块(通常称为“临界区”)一次只在一个线程中执行。

my $lock = Lock.new;
 
my $a = 0;
 
await (^10).map: {
    start {
        $lock.protect({
            my $r = rand;
            sleep $r;
            $a++;
        });
    }
}
 
say $a# OUTPUT: «10␤»

protect 返回代码块返回的任何内容。

由于 protect 会阻塞任何等待执行临界区的线程,因此代码应该尽可能快。

安全问题§

一些共享数据并发问题不像其他问题那么明显。有关此主题的良好一般性文章,请参阅此 博客文章

一个需要注意的特殊问题是容器自动生成或扩展发生时。当 ArrayHash 条目最初被分配时,底层结构会发生改变,并且该操作不是异步安全的。例如,在此代码中

my @array;
my $slot := @array[20];
$slot = 'foo';

第三行是临界区,因为这是数组扩展的地方。最简单的解决方法是使用 Lock 来保护临界区。一个可能更好的解决方法是重构代码,以便不需要共享容器。