非阻塞I/O
本节介绍Tokio提供的网络资源和drivers。 这个组件提供Tokio的主要功能之一:非阻塞,事件驱动,由适当的操作系统原语提供的网络(epoll,kqueue,IOCP,…)。 它以资源和drivers模式为模型在上一节中描述。
网络drivers使用mio构建,网络资源由后备实现Evented的类型。
本指南将重点介绍TCP类型。 其他网络资源(UDP,unix插座,管道等)遵循相同的模式。
网络资源。
网络资源是由网络句柄和对为资源供电的driver的引用组成的类型,例如TcpListener和TcpStream。 最初,在首次创建资源时,driver指针可能是None:
let listener = TcpListener::bind(&addr).unwrap();
在这种情况下,尚未设置对driver的引用。 但是,如果使用带有Handle引用的构造函数,则driver引用将设置为给定句柄表示的driver:
let listener = TcpListener::from_std(std_listener, &my_reactor_handle);
一旦driver与资源相关联,就会将其设置为该资源的生命周期,不能改变。 相关的driver负责接收网络资源的操作系统事件并通知对该资源表示兴趣的任务。
使用资源
资源类型包括以poll_为前缀和在返回类型中包含Async的非阻塞函数。 这些函数与任务系统关联,应该从任务中使用,并作为[Future]实现一部分使用。 例如,TcpStream提供[poll_read]和[poll_write]。 TcpListener提供[poll_accept]。
这里有一个使用[poll_accept]]接受来自侦听器的入站套接字并通过生成新任务来处理它们的任务:
struct Acceptor {
listener: TcpListener,
}
impl Future for Acceptor {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
let (socket, _) = try_ready!(self.listener.poll_accept());
// Spawn a task to process the socket
tokio::spawn(process(socket));
}
}
}
资源类型还可以包括返回 future的函数。 这些是使用poll_函数提供附加功能的帮助程序。 例如,TcpStream提供了一个返回 future的[connect]函数。一旦TcpStream与对等方建立了连接(或未能成功),这个 future就会完成。
使用组合器连接TcpStream:
tokio::spawn({
let connect_future = TcpStream::connect(&addr);
connect_future
.and_then(|socket| process(socket))
.map_err(|_| panic!())
});
future也可以直接用于其他future的实现:
struct ConnectAndProcess {
connect: ConnectFuture,
}
impl Future for ConnectAndProcess {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let socket = try_ready!(self.connect.poll());
tokio::spawn(process(socket));
Ok(Async::Ready(()))
}
}
使用driver注册资源
当使用TcpListener :: poll_accept(或任何poll_ 函数)时,如果资源已准备好立即返回,那么它将会这样做。在这种情况下poll_accept,准备就绪意味着有一个套接字在队列中等待被接受。如果资源*没有准备就绪,即没有待接受的套接字,然后资源要求driver一旦准备好就通知当前任务。
第一次NotReady由资源返回,如果资源没有明确地使用Handle参数分配一个driver,则资源将使用driver实例注册自身。这是通过查看与当前执行上下文关联的网络driver来完成的。
执行上下文的默认driver使用本地线程存储,使用with_default设置,并使用[Handle :: current]访问。运行时负责确保,从闭包内传递到with_default过程轮询任务。调用[Handle :: current]访问本地线程由with_default设置,以便将句柄返回给当前执行上下文的driver。
Handle :: current vsHandle :: default
Handle :: current和Handle :: default都返回一个Handle实例。然而,它们略有不同。大多数情况下,Handle :: default就是期望的行为。
Handle :: current为当前driver 立即读取存储在driver中的线程局部变量。这意味着Handle :: current必须从设置默认driver的执行上下文中调用。 Handle :: current当句柄将被发送到不同的执行上下文使用并且用户希望使用特定的反应器(reactor)时使用(参见下面的示例)。
另一方面,[Handle :: default]懒惰地读取线程局部变量。这允许从执行上下文之外获取Handle实例。使用资源时,句柄将访问线程局部变量,如上一节中所述。
例如:
fn main() {
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let std_listener = ::std::net::TcpListener::bind(&addr).unwrap();
let listener = TcpListener::from_std(std_listener, &Handle::default()).unwrap();
tokio::run({
listener.incoming().for_each(|socket| {
tokio::spawn(process(socket));
Ok(())
})
.map_err(|_| panic!("error"))
});
}
在这个例子中,incoming()返回通过调用 poll_accept实现的一个 future。 该future产生于具有网络driver配置作为执行上下文的一部分的运行之上。 当在执行上下文中调用poll_accept时,即当读取线程本地driver与TcpListener实例相关联。
但是,如果直接使用tokio-threadpool,那么产生threadpool executor之上的任务就会将无法访问reactor:
let pool = ThreadPool::new();
let listener = TcpListener::bind(&addr).unwrap();
pool.spawn({
listener.incoming().for_each(|socket| {
// This will never get called due to the listener not being able to
// function.
unreachable!();
})
.map_err(|_| panic!("error"))
});
为了使上面的示例工作,必须为线程池的执行上下文设置反应器(reactor)。有关更多信息,请参阅building a runtime细节。 或者,可以使用[Handle :: current]获得的Handle:
let pool = ThreadPool::new();
// This does not run on the pool.
tokio::run(future::lazy(move || {
// Get the handle
let handle = Handle::current();
let std_listener = std::net::TcpListener::bind(&addr).unwrap();
// This eagerly links the listener with the handle for the current reactor.
let listener = TcpListener::from_std(std_listener, &handle).unwrap();
pool.spawn({
listener.incoming().for_each(|socket| {
// Do something with the socket
Ok(())
})
.map_err(|_| panic!())
});
Ok(())
}));
网络driver
为所有Tokio的网络类型提供动力的driver是[Reactor]Crate中的[tokio-reactor]类型。 它是使用mio实现的。 调用[Reactor :: turn]使用[mio :: Poll :: poll]获取已注册网络资源的操作系统事件。 然后它使用[task system]通知每个网络资源已注册的任务。 任务被调度为在其关联的executor上运行,然后任务将网络资源视为就绪并且调用poll_ *函数返回Async :: Ready。
将driver与资源链接
driver必须跟踪向其注册的每个资源。 虽然实际实现更复杂,但可以将其视为对单元共享状态的共享引用,类似于:
struct Registration {
// The registration needs to know its ID. This allows it to remove state
// from the reactor when it is dropped.
id: Id,
// The task that owns the resource and is registered to receive readiness
// notifications from the driver.
//
// If `task` is `Some`, we **definitely** know that the resource
// is not ready because we have not yet received an operating system event.
// This allows avoiding syscalls that will return `NotReady`.
//
// If `task` is `None`, then the resource **might** be ready. We can try the
// syscall, but it might still return `NotReady`.
task: Option<task::Task>,
}
struct TcpListener {
mio_listener: mio::TcpListener,
registration: Option<Arc<Mutex<Registration>>>,
}
struct Reactor {
poll: mio::Poll,
resources: HashMap<Id, Arc<Mutex<Registration>>>,
}
这不是真正的实现,而是用于演示行为的简化版本。在实践中,没有Mutex,每个资源实例没有分配单元,并且reactor不使用HashMap。 真正的实现在here
首次使用资源时,它会向driver注册:
impl TcpListener {
fn poll_accept(&mut self) -> Poll<TcpStream, io::Error> {
// If the registration is not set, this will associate the `TcpListener`
// with the current execution context's reactor.
let registration = self.registration.get_or_insert_with(|| {
// Access the thread-local variable that tracks the reactor.
Reactor::with_current(|reactor| {
// Registers the listener, which implements `mio::Evented`.
// `register` returns the registration instance for the resource.
reactor.register(&self.mio_listener)
})
});
if registration.task.is_none() {
// The task is `None`, this means the resource **might** be ready.
match self.mio_listener.accept() {
Ok(socket) => {
let socket = mio_socket_to_tokio(socket);
return Ok(Async::Ready(socket));
}
Err(ref e) if e.kind() == WouldBlock => {
// The resource is not ready, fall through to task registration
}
Err(e) => {
// All other errors are returned to the caller
return Err(e);
}
}
}
// The task is set even if it is already `Some`, this handles the case where
// the resource is moved to a different task than the one stored in
// `self.task`.
registration.task = Some(task::current());
Ok(Async::NotReady)
}
}
请注意,每个资源只有一个task字段。其含义是资源一次只能从一个任务中使用。如果TcpListener :: poll_accept返回NotReady,注册当前任务和将监听器发送到另一个调用poll_accept的任务并视为NotReady,然后第二个任务是唯一一个在套接字准备好被接受后将接收通知的任务。资源可能会支持跟踪不同操作的不同任务。例如,TcpStream内部有两个任务字段:一个用于通知read准备好了,另一个用于通知write准备好了。这允许从不同的任务调用TcpStream :: poll_read和TcpStream :: poll_write。
[mio :: Poll]作为register上面使用的函数的一部分,将事件类型注册到驱动程序的实例中。。同样,本指南使用了简化的实现与实际tokio-reactor的实现不匹配,但足以理解tokio-reactor的行为方式。
impl Reactor {
fn register<T: mio::Evented>(&mut self, evented: &T) -> Arc<Mutex<Registration>> {
// Generate a unique identifier for this registration. This identifier
// can be converted to and from a Mio Token.
let id = generate_unique_identifier();
// Register the I/O type with Mio
self.poll.register(
evented, id.into_token(),
mio::Ready::all(),
mio::PollOpt::edge());
let registration = Arc::new(Mutex::new(Registration {
id,
task: None,
}));
self.resources.insert(id, registration.clone());
registration
}
}
运行driver
driver需要运行才能使其相关资源正常工作。如果driver无法运行,资源永远不会准备就绪。使用Runtime时会自动处理运行driver,但了解它是如何工作的很有用。如果你对真正的实现感兴趣,那么[tokio-reactor] real-impl源码是最好的参考。
当资源注册到driver时,它们也会注册Mio,运行driver在循环中执行以下步骤:
1)调用[Poll :: poll]来获取操作系统事件。
2)发送所有事件到适当的注册过的资源。
上面的步骤是通过调用Reactor :: turn来完成的。循环部分是取决于我们。这通常在后台线程中完成或嵌入executor中作为一个Park实现。有关详细信息,请参阅runtime guide。
loop {
// `None` means never timeout, blocking until we receive an operating system
// event.
reactor.turn(None);
}
turn的实现执行以下操作:
fn turn(&mut self) {
// Create storage for operating system events. This shouldn't be created
// each time `turn` is called, but doing so does not impact behavior.
let mut events = mio::Events::with_capacity(1024);
self.poll.poll(&mut events, timeout);
for event in &events {
let id = Id::from_token(event.token());
if let Some(registration) = self.resources.get(&id) {
if let Some(task) = registration.lock().unwrap().task.take() {
task.notify();
}
}
}
}
任务在其executor上进行调度会通知任务会结果。 当任务再次运行,它将再次调用poll_accept函数。 这次,task插槽将是None。 这意味着应该尝试系统调用,并且这次poll_accept将返回一个被接受的套接字(可能允许虚假事件)。
运行时模型
使用Tokio编写的应用程序组织在大量小的非阻塞任务中。 Tokio任务类似于goroutine或者Erlang进程,但是是非阻塞的。它们设计为轻量级,可以快速生成,并保持较低的调度开销。它们也是非阻塞的,因为无法立即完成的此类操作必须立即返回。它们返回一个表示操作正在进行的值,而不是返回操作的结果,表明操作正在进行中。
非阻塞执行
使用Future trait实现Tokio任务:
struct MyTask {
my_resource: MyResource,
}
impl Future for MyTask {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.my_resource.poll() {
Ok(Async::Ready(value)) => {
self.process(value);
Ok(Async::Ready(()))
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => {
self.process_err(err);
Ok(Async::Ready(()))
}
}
}
}
使用tokio :: spawn或通过调用executor对象上的Spawn方法将任务提交给 executor。 poll函数驱动任务。没有调用poll就什么都不做。在任务上调用poll直到Ready(())返回是 executor的工作。
MyTask将从my_resource接收一个值并处理它。一旦值处理完毕,任务就完成了他的逻辑并结束。这会返回Ok(Async :: Ready(()))。
为了完成处理,任务取决于my_resource提供的值。鉴于my_resource是一个非阻塞任务,它在调用my_resource.poll()时,可能准备好或者还没准备好提供值。如果它准备就绪,它返回Ok(Async :: Ready(value))。如果没有准备好,它会返回Ok(Async::NotReady)。
当资源未准备好提供值时,这意味着该任务本身还没准备好完成,任务的poll函数也返回NotReady。
在未来的某个时刻,资源将随时准备提供值。资源使用任务系统向 executor发信号给executor通知它已准备好。 executor安排任务,导致MyTask :: poll又叫了一遍。这一次,假设my_resource准备就绪,那么值就是从my_resource.poll()返回并且任务完成。
协作调度
协作调度用于在 executor上调度任务。单个 executor将通过一小组线程管理许多任务。将有比线程更多的任务。这也没有抢占。这个意味着当任务被安排执行时,它会阻止当前线程直到poll函数返回。
因此,实现poll在很短的时间内执行才是重要的。对于I / O绑定的应用程序,通常会发生这种情况。但是,如果任务预计必须长时间运行,则应该推迟工作到blocking pool或将计算分解为更小的块和在每个块执行之后yield回来。
任务系统
任务系统是资源通知executor准备就绪的系统。 任务由消耗资源的非阻塞逻辑组成。 在上面的示例中,MyTask使用单个资源my_resource,但没有限制任务可以使用的资源数量。
当任务正在执行并尝试使用未准备好的资源时,它在该资源上被逻辑阻塞,即任务无法进一步处理,直到资源准备就绪。 Tokio跟踪阻塞当前任务的资源以进行推进。当一个依赖资源准备就绪, executor安排任务。这是通过跟踪当任务在资源中表现兴趣完成。
当MyTask执行,尝试使用my_resource和my_resource返回NotReady时,MyTask隐含表示对my_resource资源感兴趣。对此,任务和资源是连接的。什么时候资源准备就绪,任务再次被安排。
task :: current和Task :: notify
通过两个API完成跟踪兴趣并通知准备情况的变化:
- task::current
- Task::notify
当调用my_resource.poll()时,如果资源准备就绪,则立即返回值而不使用任务系统。如果资源没有准备好,通过调用task::current() -> Task 来获取当前任务的句柄。这是通过读取executor设置的线程局部变量集获得此句柄。
一些外部事件(在网络上接收的数据,后台线程完成计算等…)将导致my_resource准备好生成它的值。那时,准备好my_resource的逻辑将调用从task :: current获得的任务句柄上的notify。这个表示准备就绪会改变 executor, executor随后安排任务执行。
如果多个任务表示对资源感兴趣,则只有last任务这样做会得到通知。资源旨在从单一任务使用。
Async :: NotReady
任何返回Async的函数都必须遵守contract(契约)。 当返回NotReady,当前任务必须已经注册准备就绪的变更通知。 以上部分讨论了资源的含义。 对于任务逻辑,这意味着无法返回NotReady除非资源已返回“NotReady”。 通过这样做,contract得到了传承。 当前任务已注册通知,因为已从资源收到NotReady。
必须非常小心避免在没有从资源收到NotReady的情况下返回NotReady。 例如,以下任务中,任务实现结果永远不会完成。
use futures::{Future, Poll, Async};
enum BadTask {
First(Resource1),
Second(Resource2),
}
impl Future for BadTask {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
use BadTask::*;
let value = match *self {
First(ref mut resource) => {
try_ready!(resource.poll())
}
Second(ref mut resource) => {
try_ready!(resource.poll());
return Ok(Async::Ready(()));
}
};
*self = Second(Resource2::new(value));
Ok(Async::NotReady)
}
}
上面实现的问题是Ok(Async :: NotReady)是在将状态转换为Second后立即返回。 在这转换中,没有资源返回NotReady。 当任务本身返回时NotReady,它违反了contract ,因为任务将来不会被通知。
通常通过添加循环来解决这种情况:
use futures::{Future, Poll, Async};
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
use BadTask::*;
loop {
let value = match *self {
First(ref mut resource) => {
try_ready!(resource.poll())
}
Second(ref mut resource) => {
try_ready!(resource.poll());
return Ok(Async::Ready(()));
}
};
*self = Second(Resource2::new(value));
}
}
思考它的一种方法是任务的poll函数不能返回,直到由于其资源不能进一步取得进展而准备就绪或明确yields(见下文)。
另请注意,返回Async的函数只能从一个任务调用。 换句话说,这些函数只能从已经提交给tokio :: spawn或其他任务spawn函数调用
Yielding
有时,任务必须返回NotReady而不是在资源上被阻塞。这通常发生在运行计算很大且任务想要的时候将控制权交还 executor以允许其执行其他 future。
Yielding 是通过通知当前任务并返回“NotReady”完成:
Yield可用于分解CPU昂贵的计算:
struct Count {
remaining: usize,
}
impl Future for Count {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
while self.remaining > 0 {
self.remaining -= 1;
// Yield every 10 iterations
if self.remaining % 10 == 0 {
task::current().notify();
return Ok(Async::NotReady);
}
}
Ok(Async::Ready(()))
}
}
executor
executor员负责驱动完成许多任务。任务是产生于 executor之上, 是在executor需要调用它的poll函数的时候。 executor挂钩到任务系统以接收资源准备通知。
通过将任务系统与 executor实现分离,具体执行和调度逻辑可以留给 executor实现。tokio提供两个executor实现,每个实现具有独特的特点:current_thread和thread_pool。
当任务首次在executor之上生成时, executor用Spawn将其包装。这将任务逻辑与任务状态绑定(这主要是遗留原因所需要的)。 executor通常会将任务存储在堆,通常是将它存储在Box或Arc中。当 executor选择一个执行任务,它调用Spawn :: poll_future_notify。此函数确保将任务上下文设置为线程局部变量像task :: current能够读取它。
当调用poll_future_notify时, executor也是传递通知句柄和标识符。这些参数包含在由task :: current返回的任务句柄中,也是有关任务与executor连接的方式。
notify句柄是Notify 的实现,标识符是 executor用于查找当前任务的值。当调用Task::notify,notify函数使用提供的标识符调用notify句柄。该函数的实现负责执行调度逻辑。
实现 executor的一种策略是将每个任务存储在Box和使用链接列表来跟踪计划执行的任务。当调用Notify :: notify,然后将与之关联的任务标识符被推送到scheduled链表的末尾。当 executor运行时,它从链表的前端弹出并执行任务如上所述。
请注意,本节未介绍 executor的运行方式。细节这留给 executor实现。一个选项是 executor产生一个或多个线程并将这些线程专用于排出scheduled链表。另一个是提供一个MyExecutor :: run函数阻塞当前线程并排出scheduled链表。
资源,drivers和运行时
资源是叶子futures,即未以其他futures实施的futures。它们是使用上述任务系统的类型与 executor互动。资源类型包括TCP和UDP套接字,定时器,通道,文件句柄等.Tokio应用程序很少需要实现资源。相反,他们使用Tokio或第三方包装箱提供的资源。
通常,资源本身不能起作用而是需要drivers。例如,Tokio TCP套接字由Reactor支持。Reactor是socket资源driver。单个driver可以为大量资源实例提供动力。要使用该资源,drivers必须在某处运行这个过程。 Tokio提供网络资源的drivers(tokio-reactor),文件资源(tokio-fs)和定时器(tokio-timer)。提供解耦driver组件允许用户选择他们想要使用的组件。每个driver可以单独使用或与其他driver结合使用。
正因为如此,为了使用Tokio并成功执行任务,一个应用程序必须启动 executor和必要的drivers作为应用程序的任务依赖的资源。这需要大量的样板。为了管理样板,Tokio提供了几个运行时选项。运行时是与所有必需drivers捆绑在一起的executor,以便为Tokio的资源提供动力。不是单独管理所有各种Tokio组件,而是在一次调用中创建并启动运行时。
Tokio提供并发运行时和单线程运行时。并发运行时基于多线程、工作窃取 executor。单线程运行时执行当前线程上的所有任务和drivers。用户可以选择最适合应用的运行时。
Future
如上所述,任务是使用Future trait实现的。 这个特点不仅限于实施任务。 一个 Future是表示一个非阻塞计算的值在未来的某个时间完成。 任务是一个计算没有输出。 Tokio中的许多资源都用Future实现。 例如,超时是Future在达到截止日期后完成。
该 trait包括许多与Future值一起工作的有用的组合器。
通过对应用特定类型实现Future来构建应用或使用组合器来定义应用程序逻辑。 通常两者兼而有之策略是最成功的。
本文暂时没有评论,来添加一个吧(●'◡'●)