class Lock::Async {}

一个 Lock::Async 实例提供了一个互斥机制:当锁被持有时,任何其他想要 lock 的代码都必须等到持有者在它上面调用 unlock,这有助于防止因数据从不同线程同时读取和修改而导致的各种问题。

与提供传统操作系统支持的互斥机制的 Lock 不同,Lock::Async 与 Raku 的高级并发特性一起工作。lock 方法返回一个 Promise,它将在锁可用时被保留。这个 Promise 可以与非阻塞 await 一起使用。这意味着不必在等待 Lock::Async 可用时消耗线程池中的线程,并且一旦锁可用,尝试获取锁的代码将恢复。

结果是完全有可能有数千个未完成的 Lock::Async 锁请求,但线程池中只有少量线程。尝试使用传统的 Lock 不会那么顺利!

没有要求 Lock::Async 被同一个物理线程锁定和解锁,这意味着在持有锁时可以执行非阻塞 await。反过来说,Lock::Async 不可重入。

虽然 Lock::Async 根据高级 Raku 并发机制工作,但它应该被视为一个构建块。事实上,它位于 Supply 并发模型的核心。更喜欢构建程序,以便它们传达结果而不是改变共享数据结构,使用 PromiseChannelSupply 等机制。

方法§

方法 lock§

method lock(Lock::Async:D: --> Promise:D)

返回一个 Promise,它将在锁可用时被保留。如果锁已经可用,则将返回一个已经保留的 Promise。使用 await 以非阻塞方式等待锁可用。

my $l = Lock::Async.new;
await $l.lock;

更喜欢使用 protect 而不是显式调用 lockunlock

方法 unlock§

method unlock(Lock::Async:D: --> Nil)

释放锁。如果有任何未完成的 lock Promise,队列首部的那个将被保留,并且可能在线程池上调度代码(因此调用 unlock 的成本仅限于调度另一段想要获取锁的代码所需的工作,但不会执行该代码)。

my $l = Lock::Async.new;
await $l.lock;
$l.unlock;

更喜欢使用 protect 而不是显式调用 lockunlock。但是,如果希望分别使用这些方法,明智的做法是使用 LEAVE 块来确保可靠地调用 unlock。未能 unlock 意味着没有人能够再次 lock 这个特定的 Lock::Async 实例。

my $l = Lock::Async.new;
{
    await $l.lock;
    LEAVE $l.unlock;
}

方法 protect§

method protect(Lock::Async:D: &code)

此方法可靠地将传递给 &code 参数的代码用锁包裹起来,该锁就是其调用的锁。它调用 lock,执行 await 以等待锁可用,然后即使代码抛出异常,也会可靠地调用 unlock

请注意,Lock::Async 本身需要在代码的线程化部分外部创建,并且需要保护该部分。在下面的第一个示例中,首先创建 Lock::Async 并将其分配给 $lock,然后在 Promise 代码内部使用它来保护敏感代码。在第二个示例中,出现了一个错误,Lock::Async 直接在 Promise 内部创建,因此代码最终会产生一堆不同的锁,这些锁在不同的线程中创建,因此它们实际上并没有保护我们想要保护的代码。在第二个示例中,从不同同时修改一个数组是不安全的,并且会导致内存错误。

# Compute how many prime numbers there are in first 10 000 of them 
# using 50 threads 
my @primes = 0 .. 10_000;
my @results;
my @threads;
 
# Right: $lock is instantiated outside the portion of the 
# code that will get threaded and be in need of protection, 
# so all threads share the lock 
my $lock = Lock::Async.new;
for ^50 -> $thread {
    @threads.push: start {
        $lock.protect: {
            my $from = $thread * 200;
            my $to = ($thread + 1* 200;
            @results.append: @primes[$from..$to].map(*.is-prime);
        }
    }
}
 
# await for all threads to finish calculation 
await Promise.allof(@writers);
# say how many prime numbers we found 
say "We found " ~ @results.grep(*.value).elems ~ " prime numbers";

下面的示例演示了错误的方法:如果没有适当的锁定,此代码在大多数情况下都能正常工作,但偶尔会导致错误的错误消息或低级内存错误

# !!! WRONG !!! Lock::Async is instantiated inside threaded area, 
# so all the 20 threads use 20 different locks, not syncing with 
# each other 
for ^50 -> $thread {
    @threads.push: start {
        my $lock = Lock::Async.new;
        $lock.protect: {
            my $from = $thread * 200;
            my $to = ($thread + 1* 200;
            @results.append: @primes[$from..$to].map(*.is-prime);
        }
    }
}

方法 protect-or-queue-on-recursion§

method protect-or-queue-on-recursion(Lock::Async:D: &code)

当对已锁定的 Lock::Async 实例调用 protect 时,该方法将被迫阻塞,直到锁被解锁。protect-or-queue-on-recursion 通过以下方式避免了这个问题:如果锁已解锁或锁被调用者链外部的东西锁定了,则其行为与 protect 相同,返回 Nil,或者将对 &code 的调用排队并返回 Promise(如果锁已在调用者链中的另一点被锁定)。

my Lock::Async $lock .= new;
my Int         $count = 0;
 
# The lock is unlocked, so the code runs instantly. 
$lock.protect-or-queue-on-recursion({
    $count++
});
 
# Here, we have caller recursion. The outer call only returns a Promise 
# because the inner one does. If we try to await the inner call's Promise 
# from the outer call, the two calls will block forever since the inner 
# caller's Promise return value is just the outer's with a then block. 
$lock.protect-or-queue-on-recursion({
    $lock.protect-or-queue-on-recursion({
        $count++
    }).then({
        $count++
    })
});
 
# Here, the lock is locked, but not by anything else on the caller chain. 
# This behaves just like calling protect would in this scenario. 
for 0..^2 {
    $lock.protect-or-queue-on-recursion({
        $count++;
    });
}
 
say $count# OUTPUT: 5

方法 with-lock-hidden-from-recursion-check§

method with-lock-hidden-from-recursion-check(&code)

临时重置 Lock::Async 递归列表,以便其不再包含此方法调用的锁,如果在调用者链中已调用 protect-or-queue-on-recursion 并且已将锁置于递归列表中,则立即运行给定的 &code

my Lock::Async $lock .= new;
my Int         $count = 0;
 
$lock.protect-or-queue-on-recursion({
    my Int $count = 0;
 
    # Runs instantly. 
    $lock.with-lock-hidden-from-recursion-check({
        $count++;
    });
 
    # Runs after the outer caller's protect-or-queue-on-recursion call has 
    # finished running. 
    $lock.protect-or-queue-on-recursion({
        $count++;
    }).then({
        say $count# OUTPUT: 2 
    });
 
    say $count# OUTPUT: 1 
});