Redesign Crystal's MT (multi-threading) support for efficiency: for example, never block a fiber while there are sleeping threads, and allow developers to manually create and select different execution contexts in which to run fibers.
This feature takes inspiration from Go and Kotlin.
This RFC is mainly about parallelism; here is only a quick summary of concurrency.
Fibers (called coroutines, lightweight threads, or user threads in other languages) are the foundation of concurrency in Crystal.
A fiber is always started with a block; the code in the block runs in that fiber and does not return a value (unlike Kotlin). When an operation inside a fiber blocks, the fiber is suspended; later, once the blocking operation completes, the fiber can be resumed.
The key point to understand: when fiber A suspends, another fiber B is automatically run; once B blocks or finishes, the event loop decides whether to run the next fiber C, or to resume the previous fiber or some other one.
Under concurrency, a single OS thread can host many fibers, which means that at any given moment some fiber may be running: that is concurrency.
Take the common case of IO: suppose we read from a socket. The fast processor has to wait for the slow IO to return data, so from the developer's point of view the operation blocks. If the CPU did nothing but wait for the IO to complete, that would be a pure waste of CPU resources. Under concurrency, the fiber performing the IO is immediately suspended, freeing the OS thread for other work; once the IO data arrives, the scheduler switches back and the fiber continues.
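A minimal sketch of that behavior with today's stable API (the host and request are only for illustration):

```crystal
require "socket"

spawn do
  socket = TCPSocket.new("example.com", 80) # connecting suspends the fiber
  socket << "GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n"
  puts socket.gets_to_end.lines.first? # suspends again while waiting for data
end

spawn do
  puts "this fiber runs while the first one waits on IO"
end

sleep 1.second # crude way to give both fibers time to run
```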
Fibers are cooperative: they cannot be interrupted or preempted from outside the program. The thread a fiber runs on, however, may itself be interrupted or preempted by the OS, which pauses all fibers running on that thread.
That said, "fibers cannot be preempted" is not part of the concurrency model. A future evolution could preempt fibers that run for too long from inside the program, or require them to yield at preemptible points.
Creating and switching fibers is very cheap.
A newly spawned fiber can optionally be spawned on the current thread, to group fibers together, or is dispatched in a simple round-robin-ish fashion (with no manual choice) to one of the running threads.
In the current implementation, a fiber can only be resumed on the thread where it previously suspended; resuming it on another thread is impossible.
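For illustration, this is how grouping and dispatch look today under -Dpreview_mt:

```crystal
# compile with -Dpreview_mt
spawn(same_thread: true) do
  # pinned to the spawning fiber's thread (grouping)
end

spawn do
  # dispatched round-robin to some worker thread; once started, it will
  # only ever be resumed on that same thread
end
```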
Thread <=> scheduler <=> event loop (queue): in the current implementation these are all strict one-to-one relationships.
Logically, a fiber belongs to a scheduler and shouldn't care which thread it actually runs on. A future implementation might not map threads to schedulers one-to-one, but use an M:N relationship as in Go's scheduler: when a fiber resumes, the scheduler could pick the most idle thread, which may not be the thread the fiber suspended on.
Fibers running on different threads run in parallel; a group of fibers running on the same thread runs concurrently, but never in parallel.
When a group of fibers on the same thread operates on the same data, that data stays warm in the CPU cache, which helps performance.
If the number of threads exceeds the number of available hardware threads, or the OS is very busy and thread switches pile up, performance gets worse, not better.
Fibers running on the same thread can never run in parallel, which greatly simplifies the logic and is faster: there is no need for atomic synchronization between fibers of the same group.
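A small sketch of the difference (grouping via the current same_thread option; the trailing Fiber.yield is toy synchronization):

```crystal
counter = 0
10.times do
  # same thread: the plain increment is safe, because fibers only
  # switch at suspension points
  spawn(same_thread: true) { counter += 1 }
end

atomic = Atomic(Int32).new(0)
10.times do
  # possibly different threads: the increment must be atomic
  spawn { atomic.add(1) }
end

Fiber.yield
```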
A newly spawned fiber picks a thread by simple round robin, which has a problem: it doesn't account for busy threads. A fiber may be dispatched to a busy thread while another thread sits idle.
Each scheduler waits for events on its event loop (queue); when the queue is empty, the starving thread immediately goes to sleep, even while other threads are very busy, and new fibers may keep being enqueued onto the busy threads' queues.
If the current fiber is CPU-bound, it may never hit a preemptible point that would let other fibers of the same group run; those blocked fibers have no chance to run on other threads either.
In the worst case, the whole application waits for one CPU-bound fiber to finish while other fibers with pending work stay blocked, limiting parallelism. (Ideally, runnable fibers would be spread across as many threads as possible.)
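For example:

```crystal
spawn do
  # CPU-bound: no IO, no explicit Fiber.yield, hence no suspension point;
  # nothing else scheduled on this thread can run until it finishes
  1_000_000_000.times { |i| i &* i }
end

spawn do
  puts "starved until the loop above completes"
end
```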
There is no API to set the initial number of threads/schedulers, and no way to resize. Growing isn't really a problem, but shrinking has to wait for a queue to become empty before its thread can stop. If a fiber runs a long-running operation, such as a signal-handling loop or a logger, its thread can never be stopped.
Creating a thread (e.g. via the undisclosed Thread.new, similar to Ruby's) also automatically creates a scheduler/event loop for it, which leads to a number of complex potential problems.
As the proposal itself puts it:
Technically we can (Thread.new is undisclosed API but can be called), yet calling anything remotely related to fibers or the event-loop is dangerous as it will immediately create a scheduler for the thread, and/or an event-loop for that thread, and possibly block the thread from doing any progress if the thread puts itself to sleep waiting for an event, or other issues.
A possible solution is to allow creating bare threads: spawning a fiber from such a thread would either send the new fiber to another thread, or raise.
Sometimes we need a fiber to run exclusively on a thread for a while, or forever, with no other fiber being scheduled onto that thread in the meantime (e.g. a GUI main loop or a game loop).
The current model cannot do this. We can't dedicate the current thread to one specific fiber while moving the scheduler and its other fibers to a new thread; if we did, the dedicated fiber and the moved fibers would run in parallel, breaking the promise that those fibers only ever run concurrently.
Weighing the advantage of the current design (locality) against its drawback (blocked fibers), I propose to break the rule that a fiber can only be resumed by the thread that suspended it: instead, a fiber may be resumed on any thread.
This opens the door to more optimizations, for example work stealing as found in modern multithreaded runtimes: idle threads steal tasks from other threads' run queues, which effectively fixes the imbalance of one thread sitting idle while others are overloaded.
This doesn't have to be a single MT environment with work stealing, as in Go's model; instead, new environments can be created dynamically at runtime.
An execution context creates and manages a dedicated pool of one or more threads. The context is in charge of how fibers are run, suspended, and swapped across its internal threads.
An application can create as many execution contexts as it wants, running in parallel. They are isolated from one another, yet can still communicate through the usual thread-safe synchronization primitives, such as Channel and Mutex.
Put another way: an execution context manages a group of fibers and is a higher-level abstraction than a thread. Fibers are associated with execution contexts and decoupled from the particular thread they run on. When a fiber is spawned, it is enqueued by default into the execution context of the currently running fiber, so child fibers run in their parent's execution context (unless another context is explicitly specified).
We can send a fiber to another execution context to be spawned, but once spawned, a fiber that is re-enqueued to be resumed always goes back to its original execution context.
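Under the proposed API (context types and names as sketched later in this RFC), that could look like:

```crystal
# a separate context with its own thread (proposed API, not final)
ctx = Fiber::ExecutionContext::SingleThreaded.new("WORKER")

spawn(name: "worker", execution_context: ctx) do
  # spawned into WORKER; whenever it suspends and is re-enqueued,
  # it resumes inside WORKER, never in the default context
end
```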
The standard library would ship the following execution contexts:
Single-threaded context
Fibers never run in parallel; they follow the usual concurrency primitives we already know. Drawback: a blocking fiber blocks the thread, and no other fiber can make progress.
Multi-threaded context
Fibers run in parallel across threads and, once suspended, can be resumed on any of them. Schedulers and threads can grow and shrink dynamically, a scheduler can move to another thread (an M:N model), and threads can steal fibers from each other.
The advantage: as long as OS thread resources are available, a runnable fiber will always run on some thread.
Isolated context
Only one fiber is allowed to run on such a thread, with exclusive use of all its resources: no race conditions and no need for concurrency. Examples: a GUI main loop (Gtk.main), a game loop, or a CPU-heavy computation. An isolated context keeps such tasks from being disturbed.
When creating an isolated context, you can specify the context to which fibers it spawns will be sent. The default is Fiber::ExecutionContext.default.
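For example (hypothetical usage of the Isolated context sketched further below; Gtk.main and run_solver are placeholders):

```crystal
# the GUI loop runs alone on its own thread; fibers spawned from inside
# it go to the default context
gui = Fiber::ExecutionContext::Isolated.new("GTK") { Gtk.main }

# an isolated solver whose spawns are sent to a chosen context instead:
workers = Fiber::ExecutionContext::MultiThreaded.new("WORKERS", size: 4)
solver = Fiber::ExecutionContext::Isolated.new("SOLVER", spawn_context: workers) do
  run_solver
end
```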
Some details:
- The list above is not exclusive: you can create a context that follows different rules, e.g. multi-threaded but without work stealing.
- An application can start as many contexts as it wants, running in parallel.
- Contexts should be wrappable, so developers can add conveniences on top of an existing context, e.g. a monitor that watches fibers and automatically shuts the context down once they have finished.
By default, Crystal starts a multi-threaded (MT) context with work stealing, and that is where fibers are spawned.
This default context provides an environment that fits most use cases: apart from taking care around concurrent access to external resources, or simply communicating through the preferred Channel, developers get good use of multiple cores without having to think much about how.
The default execution context can be configured to use a single thread, which turns off parallelism inside it. It can still run in parallel with other contexts, though!
Until Crystal 2.0, the default execution context would remain single-threaded (ST); a compile-time flag, e.g. -Dmt, would switch the default context to multi-threaded.
Applications can create other execution contexts in addition to the default one.
These contexts can be configured to behave differently: ST mode, isolated contexts, even tweaking thread priorities and CPU affinities for a better distribution across CPU cores.
Ideally, developers could implement a custom context, or wrap an existing one to enhance it.
- We could set up an execution context dedicated to a UI or game loop, and let the default context handle heavy computations or web requests; neither disturbs the other, and the UI never freezes.
- We could create a separate MT execution context dedicated to CPU-heavy algorithms (e.g. BCrypt). These block their threads, but the OS can preempt those threads, so the default context (e.g. a web application's login flow) won't be blocked when thousands of users try to log in at once, each login verifying a password with BCrypt. See the sketch after this list.
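A sketch of the BCrypt case under the proposed API (context name and size are arbitrary; Crypto::Bcrypt::Password is stdlib):

```crystal
require "crypto/bcrypt/password"

# a dedicated context, so BCrypt blocks its own threads rather than the
# default context's
bcrypt = Fiber::ExecutionContext::MultiThreaded.new("BCRYPT", size: 4)

def verify(bcrypt, digest : String, password : String) : Bool
  result = Channel(Bool).new
  bcrypt.spawn("verify") do
    result.send Crypto::Bcrypt::Password.new(digest).verify(password)
  end
  result.receive # the calling fiber suspends instead of blocking its thread
end
```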
- The Crystal compiler doesn't need MT during the parsing and semantic phases, so we could run those phases in an execution context with a single dedicated thread, then switch to an MT-capable context for the codegen phase, spawning fibers across as many threads and CPUs as possible.
- Different contexts could have different priorities and affinities, so the OS can distribute threads more efficiently on heterogeneous architectures (e.g. ARM big.LITTLE).
An execution context shall provide:

- configuration (e.g. number of threads, …);
- methods to spawn, enqueue, yield and reschedule fibers within its premises;
- a scheduler to run the fibers (or many schedulers for a MT context);
- an event loop (IO & timers):
  - this might be complex: I don't think we can share a libevent across event bases? We already need to have a "thread local" libevent object for IO objects as well as for PCRE2 (though this is an optimization);
  - we might want to move to our own primitives on top of epoll (Linux) and kqueue (BSDs), since we already wrap IOCP (Win32) and have a PR for io_uring (Linux), so we don't have to stick to libevent2 limitations (e.g. we may be able to allocate events on the stack since we always suspend the fiber).
Ideally developers would be able to create custom execution contexts. That means we must have public APIs for at least the EventLoop and maybe the Scheduler (at least its resume method), which sounds like a good idea.
In addition, synchronization primitives, such as Channel(T) or Mutex, must allow communication and synchronization across execution contexts, and thus be thread-safe.
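For instance, a plain Channel should work across contexts (a minimal sketch against the proposed API):

```crystal
workers = Fiber::ExecutionContext::MultiThreaded.new("WORKERS", size: 2)
results = Channel(Int32).new

workers.spawn("producer") do
  results.send(42) # wakes the receiver in *its* own context
end

puts results.receive # received by a fiber in the default context
```

The proposed changes to Thread and Fiber then look as follows: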
```crystal
class Thread
  # reference to the current execution context
  property! execution_context : Fiber::ExecutionContext

  # reference to the current MT scheduler (only present for MT contexts)
  property! execution_context_scheduler : Fiber::ExecutionContext::Scheduler

  # reference to the currently running fiber (simpler access + support scenarios
  # where a whole scheduler is moved to another thread when a fiber has blocked
  # for too long: the fiber would still need to access `Fiber.current`).
  property! current_fiber : Fiber
end

class Fiber
  def self.current : Fiber
    Thread.current.current_fiber
  end

  def self.yield : Nil
    ::sleep(0.seconds)
  end

  property execution_context : Fiber::ExecutionContext

  def initialize(@name : String?, @execution_context : Fiber::ExecutionContext)
  end

  def enqueue : Nil
    @execution_context.enqueue(self)
  end

  @[Deprecated("Use Fiber#enqueue instead")]
  def resume : Nil
    # can't call Fiber::ExecutionContext#resume directly (it's protected)
    Fiber::ExecutionContext.resume(self)
  end
end

def spawn(*, name : String?, execution_context : Fiber::ExecutionContext = Fiber::ExecutionContext.current, &block) : Fiber
  Fiber.new(name, execution_context, &block)
end

def sleep : Nil
  Fiber::ExecutionContext.reschedule
end

def sleep(time : Time::Span) : Nil
  Fiber.current.resume_event.add(time)
  Fiber::ExecutionContext.reschedule
end
```
And the proposed API. There are two distinct modules, each handling a specific part:
Fiber::ExecutionContext is the module aiming to implement the public-facing API, for context creation and cross-context communication; there can only be one instance object per execution context at a time.
Fiber::ExecutionContext::Scheduler is the module aiming to implement the internal API for each scheduler; there should be one scheduler per thread, and there may be one or more schedulers at a time for a single execution context (e.g. MT).
There is some overlap between the two modules, especially around spawning and enqueueing fibers, but the contexts they are expected to run in differ: the former needs thread-safe methods (i.e. cross-context enqueues), while the latter can assume thread-local safety.
```crystal
module Fiber::ExecutionContext
  # the default execution context (always started)
  class_getter default = MultiThreaded.new("DEFAULT", size: System.cpu_count.to_i)

  def self.current : ExecutionContext
    Thread.current.execution_context
  end

  # the following methods delegate to the current execution context, they expose
  # the otherwise protected instance methods which are only safe to call on the
  # current execution context:

  # Suspends the current fiber and resumes the next runnable fiber.
  def self.reschedule : Nil
    Scheduler.current.reschedule
  end

  # Resumes `fiber` in the execution context. Raises if the fiber
  # doesn't belong to the context.
  def self.resume(fiber : Fiber) : Nil
    if fiber.execution_context == current
      Scheduler.current.resume(fiber)
    else
      raise RuntimeError.new
    end
  end

  # the following methods can be called from whatever context and must be thread
  # safe (even ST):

  abstract def spawn(name : String?, &block) : Fiber
  abstract def spawn(name : String?, same_thread : Bool, &block) : Fiber
  abstract def enqueue(fiber : Fiber) : Nil

  # the following accessors don't have to be protected, but their implementation
  # must be thread safe (even ST):

  abstract def stack_pool : Fiber::StackPool
  abstract def event_loop : Crystal::EventLoop
end

module Fiber::ExecutionContext::Scheduler
  def self.current : Scheduler
    Thread.current.execution_context_scheduler
  end

  abstract def thread : Thread
  abstract def execution_context : ExecutionContext

  # the following methods are expected to only be called from the current
  # execution context scheduler (aka current thread):

  abstract def spawn(name : String?, &block) : Fiber
  abstract def spawn(name : String?, same_thread : Bool, &block) : Fiber
  abstract def enqueue(fiber : Fiber) : Nil

  # the following methods must only be called on the current execution context
  # scheduler, otherwise we could resume or suspend a fiber on whatever context:

  protected abstract def reschedule : Nil
  protected abstract def resume(fiber : Fiber) : Nil

  # General wrapper of fiber context switch that takes care of the gc rwlock,
  # releasing the stack of dead fibers safely, ...
  protected def swapcontext(fiber : Fiber) : Nil
  end
end
```
Then we can implement a number of default execution contexts. A single-threaded context, for example, might implement both modules as a single type, taking advantage of only having to deal with one thread:
```crystal
class SingleThreaded
  include Fiber::ExecutionContext
  include Fiber::ExecutionContext::Scheduler

  def initialize(name : String)
    # todo: start one thread
  end

  # todo: implement abstract methods
end
```
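As a hint of what one of those abstract methods could look like, a minimal sketch assuming a mutex-protected Deque as the run queue (hypothetical internals):

```crystal
class SingleThreaded
  @runnables = Deque(Fiber).new
  @mutex = Thread::Mutex.new

  # must be thread safe: other contexts may enqueue fibers into this one
  def enqueue(fiber : Fiber) : Nil
    @mutex.synchronize { @runnables.push(fiber) }
  end
end
```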
A multi-threaded context should implement the two modules as different types, since there will be a single execution context but many threads, each needing its own scheduler. For example:
```crystal
class MultiThreaded
  include Fiber::ExecutionContext

  class Scheduler
    include Fiber::ExecutionContext::Scheduler

    # todo: implement abstract methods
  end

  getter name : String

  def initialize(name : String, @size : Int32)
    # todo: start @size threads
  end

  def spawn(name : String?, same_thread : Bool, &block) : Fiber
    raise RuntimeError.new if same_thread
    self.spawn(name, &block)
  end

  # todo: implement abstract methods
end
```
Finally, an isolated context could extend the single-threaded context and reuse its machinery. In practice we might want a distinct implementation, since it only has to deal with two fibers (one isolated fiber plus one main loop fiber for its event loop).
```crystal
class Isolated < SingleThreaded
  def initialize(name : String, @spawn_context = Fiber::ExecutionContext.default, &@func : ->)
    super name
    @fiber = Fiber.new(name: name, &@func)
    enqueue @fiber
  end

  def spawn(name : String?, &block) : Fiber
    @spawn_context.spawn(name, &block)
  end

  def spawn(name : String?, same_thread : Bool, &block) : Fiber
    raise RuntimeError.new if same_thread
    @spawn_context.spawn(name, &block)
  end

  # todo: prevent enqueue/resume of anything but @fiber
end
```
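As a usage example, here is how the Crystal compiler's codegen phase could drive a dedicated MT context: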
```crystal
# (main fiber runs in the default context)
# shrink the main context to a single thread:
Fiber::ExecutionContext.default.resize(maximum: 1)

# create a dedicated context with N threads:
ncpu = System.cpu_count
codegen = Fiber::ExecutionContext::MultiThreaded.new(name: "CODEGEN", minimum: ncpu, maximum: ncpu)
channel = Channel(CompilationUnit).new(ncpu * 2)
group = WaitGroup.new(ncpu)

spawn do
  # (runs in the default context)
  units.each do |unit|
    channel.send(unit)
  end
  channel.close # so receive? returns nil once drained and the workers exit
end

ncpu.times do
  codegen.spawn do
    # (runs in the codegen context)
    begin
      while unit = channel.receive?
        unit.compile
      end
    ensure
      group.done
    end
  end
end

group.wait
```
With the default execution context moving from "a fiber is always resumed on the same thread" to the more lenient "a fiber can be resumed by any thread", this proposal introduces a couple of breaking changes over preview_mt:
Drop the "one fiber will always be resumed on the same thread" assumption, instead:
Drop the same_thread option on spawn, instead:
Drop the preview_mt flag and make execution contexts the only compilation mode (at worst introduce without_mt):
The Fiber#resume public method is deprecated because a fiber can't be resumed into any execution context:
[!NOTE] The breaking changes can be postponed to Crystal 2 by making the default execution context be ST, keep supporting same_thread: true for ST while MT would raise, and same_thread: false would be a NOOP. A compilation flag can be introduced to change the default context to be MT in Crystal 1 (e.g. keep -Dpreview_mt but consider just -Dmt). Crystal 2 would drop the same_thread argument, make the default context MT:N, and introduce a -Dwithout_mt compilation flag to return to ST to ease the transition.
Alternatively, since the -Dpreview_mt compilation flag denotes an experimental feature, we could deprecate same_thread in a Crystal 1.x release, then make it a NOOP and set the default context to MT:1 in a further Crystal 1.y release. Crystal 2 would then drop the same_thread argument and change the default context to MT:N.
Usages of the Crystal::ThreadLocalValue helper class might break with fibers moving across threads. We use this because the GC can't access the Thread Local Storage space of threads. Some usages might be replaceable with direct accessors on Thread.current but some usages link a particular object instance to a particular thread; those cases will need to be refactored.
[!CAUTION] This might actually be a bug even with ST: the thread could switch to another fiber while checking for recursion. I assume current usages are cpu-bound and never reach a preemptible point, but still, there are no guarantees.
- Regex (PCRE2): not affected (no fiber preemption within usages of JIT stack and match data objects);
- IO::Evented: will need an overhaul depending on the event loop details (still one per thread? or one per context?);
[!WARNING] The event loop will very likely need an interface to fully abstract OS specific details around nonblocking calls (epoll, kqueue, IOCP, io_uring, …). See #10766.
The rationale is to have efficient parallelism that makes the most of the CPU cores, providing an environment that should usually just work, while not limiting the choices of developers who want to squeeze out the last remaining drops of potential parallelism.
The most obvious alternative would be to keep the current MT model. Maybe we could fix a few shortcomings, for example shrink/resize, starting a thread without a scheduler, allowing fibers not explicitly marked as same_thread to be moved across threads (though it might be hard to efficiently steal fibers), ...
Another alternative would be to implement Go's model and make it the only possible environment. This might not be the ideal scenario: since Crystal can't preempt a fiber (unlike Go), having specific threads might be a better solution for running CPU-heavy computations without blocking anything (if possible).
One last, less obvious solution could be to drop MT completely and assume that Crystal applications only ever live on a single thread. That would heavily simplify synchronization primitives. Parallelism could be achieved with multiple processes and IPC (Inter-Process Communication), allowing an application to be distributed inside a cluster.
As mentioned in the introduction, this proposal takes inspiration from Go and Kotlin (coroutines guide).
Both languages have coroutines and parallelism built right into the language. Both declare that coroutines may be resumed by whatever thread, and both runtimes expose an environment where multiple threads can resume the coroutines.
Go provides one execution context: their MT optimized solution. Every coroutine runs within that global context.
Kotlin, on the other hand, provides a similar environment to Go by default, but allows creating additional execution contexts with one or more threads. Coroutines then live inside their designated context, which may each be ST or MT.
The Tokio crate for Rust provides a single-threaded scheduler or a multi-threaded scheduler (with work stealing), but it must be chosen at compilation time. Apparently the MT scheduler is selected by default (to be verified).
The feature will likely require both the preview_mt and execution_contexts flags for the initial implementations.
Since it is a breaking change from stable Crystal, unrelated to preview_mt, MT and execution contexts may only become default with a Crystal major release?
Maybe MT could be limited to 1 thread by default, and spawn(same_thread) be deprecated, but what to do with spawn(same_thread: true)?
This proposal doesn't solve the inherent problem of how applications can configure the default context at runtime (e.g. the number of MT schedulers), since we create the context before the application's main can start.
We could maybe have sensible defaults + lazily started schedulers & threads, which means that the default context would only start 1 thread to run 1 scheduler, then start up to System.cpu_count on demand (using some heuristic).
The application would then be able to scale the default context programmatically, for example to a minimum of 4 schedulers which will immediately start the additional schedulers & threads.
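With the resize API sketched earlier (the minimum keyword is a hypothetical counterpart to maximum), that could be as simple as:

```crystal
# immediately start additional schedulers & threads up to 4:
Fiber::ExecutionContext.default.resize(minimum: 4)
```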
MT execution contexts could eventually have a dynamic number of threads, adding new threads to a context when it has enough work, and removing threads when threads are starving (always within min..max limits). Ideally the threads could return to a thread pool (up to a total limit, like GOMAXPROCS in Go) and be reused by other contexts.