1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#![allow(missing_docs)]
use futures::{Async, Poll};
use std::cell::Cell;
use std::{error, fmt};
use tokio_threadpool;
#[derive(Debug, Copy, Clone)]
pub enum RuntimeMode {
ThreadPool,
CurrentThread,
}
thread_local!(static MODE: Cell<Option<RuntimeMode>> = Cell::new(None));
struct ResetOnDrop(Option<RuntimeMode>);
impl Drop for ResetOnDrop {
fn drop(&mut self) {
MODE.with(|mode| mode.set(self.0));
}
}
pub(crate) fn with_set_mode<R>(mode: RuntimeMode, f: impl FnOnce() -> R) -> R {
let prev = MODE.with(|m| m.replace(Some(mode)));
let _reset = ResetOnDrop(prev);
if prev.is_some() {
panic!("The runtime mode has already set.");
}
f()
}
pub fn current_mode() -> Option<RuntimeMode> {
MODE.with(|mode| mode.get())
}
#[derive(Debug)]
pub struct BlockingError(tokio_threadpool::BlockingError);
impl BlockingError {
pub fn into_inner(self) -> tokio_threadpool::BlockingError {
self.0
}
}
#[cfg_attr(tarpaulin, skip)]
impl fmt::Display for BlockingError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Display::fmt(&self.0, f)
}
}
#[cfg_attr(tarpaulin, skip)]
impl error::Error for BlockingError {
fn description(&self) -> &str {
self.0.description()
}
fn cause(&self) -> Option<&dyn error::Error> {
self.0.cause()
}
}
pub fn blocking<R>(f: impl FnOnce() -> R) -> Poll<R, BlockingError> {
match current_mode() {
Some(RuntimeMode::ThreadPool) | None => tokio_threadpool::blocking(f).map_err(BlockingError),
Some(RuntimeMode::CurrentThread) => Ok(Async::Ready(f())),
}
}