构建多线程 Web 服务器
目前的单线程版本只能依次处理用户的请求:一时间只能处理一个请求连接。随着用户的请求数增多,可以预料的是排在后面的用户可能要等待数十秒甚至超时!
本章我们将解决这个问题,但是首先来模拟一个慢请求场景,看看单线程是否真的如此糟糕。
基于单线程模拟慢请求
下面的代码中,使用 sleep 的方式让每次请求持续 5 秒,模拟真实的慢请求:
#![allow(unused)] fn main() { // in main.rs use std::{ fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration, }; // --snip-- fn handle_connection(mut stream: TcpStream) { // --snip-- let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; // --snip-- } }
由于增加了新的请求路径 /sleep
,之前的 if else
被修改为 match
,需要注意的是,由于 match
不会像方法那样自动做引用或者解引用,因此我们需要显式调用: match &request_line[..]
,来获取所需的 &str
类型。
可以看出,当用户访问 /sleep
时,请求会持续 5 秒后才返回,下面来试试,启动服务器后,打开你的浏览器,这次要分别打开两个页面(tab页): http://127.0.0.1:7878/
和 http://127.0.0.1:7878/sleep
。
此时,如果我们连续访问 /
路径,那效果跟之前一样:立刻看到请求的页面。但假如先访问 /sleep
,接着在另一个页面访问 /
,就会看到 /
的页面直到 5 秒后才会刷出来,验证了请求排队这个糟糕的事实。
至于如何解决,其实办法不少,本章我们来看看一个经典解决方案:线程池。
使用线程池改善吞吐
线程池包含一组已生成的线程,它们时刻等待着接收并处理新的任务。当程序接收到新任务时,它会将线程池中的一个线程指派给该任务,在该线程忙着处理时,新来的任务会交给池中剩余的线程进行处理。最终,当执行任务的线程处理完后,它会被重新放入到线程池中,准备处理新任务。
假设线程池中包含 N 个线程,那么可以推断出,服务器将拥有并发处理 N 个请求连接的能力,从而增加服务器的吞吐量。
同时,我们将限制线程池中的线程数量,以保护服务器免受拒绝服务攻击(DoS)的影响:如果针对每个请求创建一个新线程,那么一个人向我们的服务器发出1000万个请求,会直接耗尽资源,导致后续用户的请求无法被处理,这也是拒绝服务名称的来源。
因此,还需对线程池进行一定的架构设计,首先是设定最大线程数的上限,其次维护一个请求队列。池中的线程去队列中依次弹出请求并处理。这样就可以同时并发处理 N 个请求,其中 N 是线程数。
但聪明的读者可能会想到,假如每个请求依然耗时很长,那请求队列依然会堆积,后续的用户请求还是需要等待较长的时间,毕竟你也就 N 个线程,但总归比单线程要强 N 倍吧 :D
当然,线程池依然是较为传统的提升吞吐方法,比较新的有:单线程异步 IO,例如 redis;多线程异步 IO,例如 Rust 的主流 web 框架。事实上,大家在下一个实战项目中,会看到相关技术的应用。
为每个请求生成一个线程
这显然不是我们的最终方案,原因在于它会生成无上限的线程数,最终导致资源耗尽。但它确实是一个好的起点:
fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { handle_connection(stream); }); } }
这种实现下,依次访问 /sleep
和 /
就无需再等待,不错的开始。
限制创建线程的数量
原则上,我们希望在上面代码的基础上,尽量少的去修改,下面是一个假想的线程池 API 实现:
fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); for stream in listener.incoming() { let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream); }); } }
代码跟之前的类似,也非常简洁明了, ThreadPool::new(4)
创建一个包含 4 个线程的线程池,接着通过 pool.execute
去分发执行请求。
显然,上面的代码无法编译,下面来逐步实现。
使用编译器驱动的方式开发 ThreadPool
你可能听说过测试驱动开发,但听过编译器驱动开发吗?来见识下 Rust 中的绝招吧。
检查之前的代码,看看报什么错:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
--> src/main.rs:11:16
|
11 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^ use of undeclared type `ThreadPool`
For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` due to previous error
俗话说,不怕敌人很强,就怕他们不犯错,很好,编译器漏出了破绽。看起来我们需要实现 ThreadPool
类型。看起来,还需要添加一个库包,未来线程池的代码都将在这个独立的包中完成,甚至于未来你要实现其它的服务,也可以复用这个多线程库包。
创建 src/lib.rs
文件并写入如下代码:
#![allow(unused)] fn main() { pub struct ThreadPool; }
接着在 main.rs
中引入:
#![allow(unused)] fn main() { // main.rs use hello::ThreadPool; }
编译后依然报错:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
--> src/main.rs:12:28
|
12 | let pool = ThreadPool::new(4);
| ^^^ function or associated item not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` due to previous error
好,继续实现 new
函数 :
#![allow(unused)] fn main() { pub struct ThreadPool; impl ThreadPool { pub fn new(size: usize) -> ThreadPool { ThreadPool } } }
继续检查:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
--> src/main.rs:17:14
|
17 | pool.execute(|| {
| ^^^^^^^ method not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` due to previous error
这个方法类似于 thread::spawn
,用于将闭包中的任务交给某个空闲的线程去执行。
其实这里有一个小难点:execute
的参数是一个闭包,回忆下之前学过的内容,闭包作为参数时可以由三个特征进行约束: Fn
、FnMut
和 FnOnce
,选哪个就成为一个问题。由于 execute
在实现上类似 thread::spawn
,我们可以参考下后者的签名如何声明。
#![allow(unused)] fn main() { pub fn spawn<F, T>(f: F) -> JoinHandle<T> where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static, }
可以看出,spawn
选择 FnOnce
作为 F
闭包的特征约束,原因是闭包作为任务只需被线程执行一次即可。
F
还有一个特征约束 Send
,也可以照抄过来,毕竟闭包需要从一个线程传递到另一个线程,至于生命周期约束 'static
,是因为我们并不知道线程需要多久时间来执行该任务。
#![allow(unused)] fn main() { impl ThreadPool { // --snip-- pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { } } }
在理解 spawn
后,就可以轻松写出如上的 execute
实现,注意这里的 FnOnce()
跟 spawn
有所不同,原因是要 execute
传入的闭包没有参数也没有返回值。
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
Finished dev [unoptimized + debuginfo] target(s) in 0.24s
成功编译,但在浏览器访问依然会报之前类似的错误,下面来实现 execute
。
new
还是 build
关于 ThreadPool
的构造函数,存在两个选择 new
和 build
。
new
往往用于简单初始化一个实例,而 build
往往会完成更加复杂的构建工作,例如入门实战中的 Config::build
。
在这个项目中,我们并不需要在初始化线程池的同时创建相应的线程,因此 new
是更适合的选择:
#![allow(unused)] fn main() { impl ThreadPool { /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); ThreadPool } // --snip-- } }
这里有两点值得注意:
usize
类型包含0
,但是创建没有任何线程的线程池显然是无意义的,因此做一下assert!
验证ThreadPool
拥有不错的文档注释,甚至包含了可能panic
的情况,通过cargo doc --open
可以访问文档注释
存储线程
创建 ThreadPool
后,下一步就是存储具体的线程,既然要存放线程,一个绕不过去的问题就是:用什么类型来存放,例如假如使用 Vec<T>
来存储,那这个 T
应该是什么?
估计还得探索下 thread::spawn
的签名,毕竟它生成并返回一个线程:
#![allow(unused)] fn main() { pub fn spawn<F, T>(f: F) -> JoinHandle<T> where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static, }
看起来 JoinHandle<T>
是我们需要的,这里的 T
是传入的闭包任务所返回的,我们的任务无需任何返回,因此 T
直接使用 ()
即可。
#![allow(unused)] fn main() { use std::thread; pub struct ThreadPool { threads: Vec<thread::JoinHandle<()>>, } impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut threads = Vec::with_capacity(size); for _ in 0..size { // create some threads and store them in the vector } ThreadPool { threads } } // --snip-- } }
如上所示,最终我们使用 Vec<thread::JoinHandle<()>>
来存储线程,同时设定了容量上限 with_capacity(size)
,该方法还可以提前分配好内存空间,比 Vec::new
的性能要更好一点。
将代码从 ThreadPool 发送到线程中
上面的代码留下一个未实现的 for
循环,用于创建和存储线程。
学过多线程一章后,大家应该知道 thread::spawn
虽然是生成线程最好的方式,但是它会立即执行传入的任务,然而,在我们的使用场景中,创建线程和执行任务明显是要分离的,因此标准库看起来不再适合。
可以考虑创建一个 Worker
结构体,作为 ThreadPool
和任务线程联系的桥梁,它的任务是获得将要执行的代码,然后在具体的线程中去执行。想象一个场景:一个餐馆,Worker
等待顾客的点餐,然后将具体的点餐信息传递给厨房,感觉类似服务员?
引入 Worker
后,就无需再存储 JoinHandle<()>
实例,直接存储 Worker
实例:该实例内部会存储 JoinHandle<()>
。下面是新的线程池创建流程:
#![allow(unused)] fn main() { use std::thread; pub struct ThreadPool { workers: Vec<Worker>, } impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id)); } ThreadPool { workers } } // --snip-- } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize) -> Worker { // 尚未实现.. let thread = thread::spawn(|| {}); // 每个 `Worker` 都拥有自己的唯一 id Worker { id, thread } } } }
由于外部调用者无需知道 Worker
的存在,因此这里使用了私有的声明。
大家可以编译下代码,如果出错了,请仔细检查下,是否遗漏了什么,截止目前,代码是完全可以通过编译的,但是任务该怎么执行依然还没有实现。
将请求发送给线程
在上面的代码中, thread::spawn(|| {})
还没有给予实质性的内容,现在一起来完善下。
首先 Worker
结构体需要从线程池 TreadPool
的队列中获取待执行的代码,对于这类场景,消息传递非常适合:我们将使用消息通道( channel )作为任务队列。
#![allow(unused)] fn main() { use std::{sync::mpsc, thread}; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } struct Job; impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id)); } ThreadPool { workers, sender } } // --snip-- } }
阅读过之前内容的同学应该知道,消息通道有发送端和接收端,其中线程池 ThreadPool
持有发送端,通过 execute
方法来发送任务。那么问题来了,谁持有接收端呢?答案是 Worker
,它的内部线程将接收任务,然后进行处理。
#![allow(unused)] fn main() { impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, receiver)); } ThreadPool { workers, sender } } // --snip-- } // --snip-- impl Worker { fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker { let thread = thread::spawn(|| { receiver; }); Worker { id, thread } } } }
看起来很美好,但是很不幸,它会报错:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
--> src/lib.rs:26:42
|
21 | let (sender, receiver) = mpsc::channel();
| -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
26 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here, in previous iteration of loop
For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` due to previous error
原因也很简单,receiver
并没有实现 Copy
,因此它的所有权在第一次循环中,就被传入到第一个 Worker
实例中,后续自然无法再使用。
报错就解决呗,但 Rust 中的 channel 实现是 mpsc,即多生产者单消费者,因此我们无法通过克隆消费者的方式来修复这个错误。当然,发送多条消息给多个接收者也不在考虑范畴,该怎么办?似乎陷入了绝境。
雪上加霜的是,就算 receiver
可以克隆,但是你得保证同一个时间只有一个receiver
能接收消息,否则一个任务可能同时被多个 Worker
执行,因此多个线程需要安全的共享和使用 receiver
,等等,安全的共享?听上去 Arc
这个多所有权结构非常适合,互斥使用?貌似 Mutex
很适合,结合一下,Arc<Mutex<T>>
,这不就是我们之前见过多次的线程安全类型吗?
总之,Arc
允许多个 Worker
同时持有 receiver
,而 Mutex
可以确保一次只有一个 Worker
能从 receiver
接收消息。
#![allow(unused)] fn main() { use std::{ sync::{mpsc, Arc, Mutex}, thread, }; // --snip-- impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } // --snip-- } // --snip-- impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { // --snip-- } } }
修改后,每一个 Worker 都可以安全的持有 receiver
,同时不必担心一个任务会被重复执行多次,完美!
实现 execute 方法
首先,需要为一个很长的类型创建一个别名, 有多长呢?
#![allow(unused)] fn main() { // --snip-- type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool { // --snip-- pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } // --snip-- }
创建别名的威力暂时还看不到,敬请期待。总之,这里的工作很简单,将传入的任务包装成 Job
类型后,发送出去。
但是还没完,接收的代码也要完善下:
#![allow(unused)] fn main() { // --snip-- impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {id} got a job; executing."); job(); }); Worker { id, thread } } } }
修改后,就可以不停地循环去接收任务,最后进行执行。还可以看到因为之前 Job
别名的引入, new
函数的签名才没有过度复杂,否则你将看到的是 fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Box<dyn FnOnce() + Send + 'static>>>>) -> Worker
,感受下类型别名的威力吧 :D
lock()
方法可以获得一个 Mutex
锁,至于为何使用 unwrap
,难道获取锁还能失败?没错,假如当前持有锁的线程 panic
了,那么这些等待锁的线程就会获取一个错误,因此 通过 unwrap
来让当前等待的线程 panic
是一个不错的解决方案,当然你还可以换成 expect
。
一旦获取到锁里的内容 mpsc::Receiver<Job>>
后,就可以调用其上的 recv
方法来接收消息,依然是一个 unwrap
,原因在于持有发送端的线程可能会被关闭,这种情况下直接 panic
也是不错的。
recv
的调用过程是阻塞的,意味着若没有任何任务,那当前的调用线程将一直等待,直到接收到新的任务。Mutex<T>
可以同一个任务只会被一个 Worker 获取,不会被重复执行。
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never read: `workers`
--> src/lib.rs:7:5
|
7 | workers: Vec<Worker>,
| ^^^^^^^^^^^^^^^^^^^^
|
= note: `#[warn(dead_code)]` on by default
warning: field is never read: `id`
--> src/lib.rs:48:5
|
48 | id: usize,
| ^^^^^^^^^
warning: field is never read: `thread`
--> src/lib.rs:49:5
|
49 | thread: thread::JoinHandle<()>,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
warning: `hello` (lib) generated 3 warnings
Finished dev [unoptimized + debuginfo] target(s) in 1.40s
Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
终于,程序如愿运行起来,我们的线程池可以并发处理任务了!从打印的数字可以看到,只有 4 个线程去执行任务,符合我们对线程池的要求,这样再也不用担心系统的线程资源会被消耗殆尽了!
注意: 处于缓存的考虑,有些浏览器会对多次同样的请求进行顺序的执行,因此你可能还是会遇到访问
/sleep
后,就无法访问另一个/sleep
的问题 :(
while let 的巨大陷阱
还有一个问题,为啥之前我们不用 while let
来循环?例如:
#![allow(unused)] fn main() { // --snip-- impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { while let Ok(job) = receiver.lock().unwrap().recv() { println!("Worker {id} got a job; executing."); job(); } }); Worker { id, thread } } } }
这段代码编译起来没问题,但是并不会产生我们预期的结果:后续请求依然需要等待慢请求的处理完成后,才能被处理。奇怪吧,仅仅是从 let
改成 while let
就会变成这样?大家可以思考下为什么会这样,具体答案会在下一章节末尾给出,这里先出给一个小提示:Mutex
获取的锁在作用域结束后才会被释放。