1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
//! Components for the basic mechanism of HTTP/1.1 server upgrades.
//!
//! # Examples
//!
//! ```
//! # extern crate tsukuyomi;
//! # extern crate futures;
//! # extern crate http;
//! # use tsukuyomi::error::Error;
//! # use tsukuyomi::input::Input;
//! # use tsukuyomi::input::upgrade::{Upgraded, UpgradeContext};
//! # use tsukuyomi::output::{Output, ResponseBody};
//! # use futures::{future, Future};
//! # use http::{header, StatusCode, Response};
//! fn validate(input: &Input) -> Result<(), Error> {
//!     // do some stuff ...
//! #   Ok(())
//! }
//!
//! fn on_upgrade(io: Upgraded, cx: UpgradeContext)
//!     -> impl Future<Item = (), Error = ()> + Send + 'static {
//!     // ...
//! #   future::ok(())
//! }
//!
//! fn handshake(input: &mut Input) -> Result<Output, Error> {
//!     validate(input)?;
//!
//!     // Register a callback function called when upgrading
//!     // the server protocol.
//!     let _ = input.body_mut().on_upgrade(on_upgrade);
//!
//!     // Build the handshake response.
//!     // If the status code is set to `101 Switching Protocols`,
//!     // the callback registered above is called at the end of
//!     // handling the request in order to create the task that
//!     // processes the upgraded protocol.
//!     Response::builder()
//!         .status(StatusCode::SWITCHING_PROTOCOLS)
//!         .header(header::UPGRADE, "foo")
//!         .body(ResponseBody::empty())
//!         .map_err(Error::internal_server_error)
//! }
//! ```

use futures::{Future, IntoFuture};
use http::Request;
pub use hyper::upgrade::Upgraded;

use app::App;
use app::RouteId;
use input::local_map::LocalMap;

/// Contextual information handed to an `OnUpgrade` callback when the
/// connection is upgraded to another protocol.
///
/// The fields are crate-private; callers read them through the accessor
/// methods on this type.
#[derive(Debug)]
pub struct UpgradeContext {
    /// The request used during the handshake, carrying `()` as its body.
    pub(crate) request: Request<()>,
    /// Application handle through which `get` looks up stored values.
    pub(crate) app: App,
    /// The `LocalMap` used during the handshake.
    pub(crate) locals: LocalMap,
    /// Identifier passed to `App::get` when looking up stored values.
    /// NOTE(review): presumably the route matched by the handshake request -- confirm.
    pub(crate) route: RouteId,
    /// NOTE(review): not read anywhere in this module; presumably index ranges
    /// of the captured path parameters -- confirm against the router.
    pub(crate) params: Vec<(usize, usize)>,
}

impl UpgradeContext {
    /// Returns the reference to a `Request<()>` used during the handshake.
    pub fn request(&self) -> &Request<()> {
        &self.request
    }

    /// Returns the reference to a `LocalMap` used during the handshake.
    pub fn locals(&self) -> &LocalMap {
        &self.locals
    }

    /// Returns the reference to a value of `T` registered in the global storage.
    pub fn get<T>(&self) -> Option<&T>
    where
        T: Send + Sync + 'static,
    {
        self.app.get(self.route)
    }
}

/// A trait representing a callback invoked when the protocol upgrade is performed.
///
/// Implementors must be `Send + 'static`. A blanket implementation in this
/// module covers every `FnOnce(Upgraded, UpgradeContext) -> R` whose result
/// converts into a `Send + 'static` future with `Item = ()` and `Error = ()`.
pub trait OnUpgrade: Send + 'static {
    /// Consumes the callback and creates a task for processing the upgraded
    /// protocol from the specified context.
    ///
    /// * `io` - the `Upgraded` value for the connection being upgraded.
    /// * `cx` - the contextual information captured for the upgrade.
    ///
    /// Returns the boxed future that represents the task.
    fn on_upgrade(self, io: Upgraded, cx: UpgradeContext) -> Box<dyn Future<Item = (), Error = ()> + Send + 'static>;
}

// Lets plain functions and closures be used as upgrade callbacks: the
// value they return is converted into a future and boxed.
impl<F, R> OnUpgrade for F
where
    F: FnOnce(Upgraded, UpgradeContext) -> R + Send + 'static,
    R: IntoFuture<Item = (), Error = ()>,
    R::Future: Send + 'static,
{
    fn on_upgrade(self, io: Upgraded, cx: UpgradeContext) -> Box<dyn Future<Item = (), Error = ()> + Send + 'static> {
        let handler = self;
        // Run the callback first, then turn whatever it returned into a future.
        let returned = handler(io, cx);
        let task = returned.into_future();
        Box::new(task)
    }
}

/// A type-erased `OnUpgrade` callback.
///
/// The wrapped closure is stored as a boxed `FnMut`, although it is only
/// meant to run once: `OnUpgradeObj::new` keeps the one-shot callback in an
/// `Option` and moves it out on the first call, and `upgrade` consumes the
/// wrapper when it makes that call.
/// NOTE(review): presumably `FnMut` is used because a boxed `FnOnce` could not
/// be called on the toolchain this was written for -- confirm before changing.
pub(crate) struct OnUpgradeObj(
    Box<dyn FnMut(Upgraded, UpgradeContext) -> Box<dyn Future<Item = (), Error = ()> + Send> + Send + 'static>,
);

impl OnUpgradeObj {
    pub(crate) fn new<T: OnUpgrade>(on_upgrade: T) -> Self {
        let mut on_upgrade = Some(on_upgrade);
        OnUpgradeObj(Box::new(move |io, cx| {
            let on_upgrade = on_upgrade.take().unwrap();
            on_upgrade.on_upgrade(io, cx)
        }))
    }

    pub(crate) fn upgrade(
        mut self,
        io: Upgraded,
        cx: UpgradeContext,
    ) -> Box<dyn Future<Item = (), Error = ()> + Send + 'static> {
        (self.0)(io, cx)
    }
}