1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278
//! Various [`Transformation`]s for working with resources in async contexts. //! //! Oftentimes, one needs to transform some resource into a future and spawn it into the runtime. //! While possible to do manually, the types here might be a bit more comfortable. use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use err_context::AnyError; use log::{error, trace}; use spirit::fragment::Transformation; #[cfg(feature = "net")] use super::net::Accept; use super::FutureInstaller; /// A [`Transformation`] to take a resource, turn it into a future and install it. pub struct ToFuture<F>(pub F); impl<F, Fut, R, II, SF> Transformation<R, II, SF> for ToFuture<F> where F: FnMut(R, &SF) -> Result<Fut, AnyError>, Fut: Future<Output = ()> + 'static, { type OutputResource = Fut; type OutputInstaller = FutureInstaller; fn installer(&mut self, _: II, _: &str) -> FutureInstaller { FutureInstaller::default() } fn transform(&mut self, r: R, cfg: &SF, name: &str) -> Result<Fut, AnyError> { trace!("Wrapping {} into a future", name); (self.0)(r, cfg) } } /// A [`Transformation`] to take a resource, turn it into a future and install it. /// /// Unlike [`ToFuture`], this one doesn't pass the configuration to the closure. /// /// # Examples /// /// This is mostly the same example as the one at the crate root, but done slightly differently. /// The future is created later on, during the transformation phase. While a little bit more /// verbose here, this comes with two advantages: /// /// * Works with already provided fragments, like the network primitives in [`net`][crate::net]. In /// that case you might want to prefer the [`ToFuture`], as it also gives access to the original /// configuration fragment, including any extra configuration for the future. /// * The future can be an arbitrary anonymous/unnameable type (eg. `impl Future` or an `async` /// function), there's no need for boxing. This might have slight positive effect on performance. /// /// ```rust /// use std::time::Duration; /// /// use err_context::AnyError; /// use serde::{Deserialize, Serialize}; /// use spirit::{Empty, Pipeline, Spirit}; /// use spirit::prelude::*; /// use spirit::fragment::driver::CacheEq; /// use spirit_tokio::handlers::ToFutureUnconfigured; /// use structdoc::StructDoc; /// /// #[derive(Clone, Debug, Deserialize, PartialEq, Serialize, StructDoc)] /// #[serde(default)] /// struct MsgCfg { /// /// A message to print now and then. /// msg: String, /// /// Time between printing the message. /// interval: Duration, /// } /// /// impl MsgCfg { /// async fn run(self) { /// loop { /// println!("{}", self.msg); /// tokio::time::delay_for(self.interval).await; /// } /// } /// } /// /// impl Default for MsgCfg { /// fn default() -> Self { /// MsgCfg { /// msg: "Hello".to_owned(), /// interval: Duration::from_secs(1), /// } /// } /// } /// /// spirit::simple_fragment! { /// impl Fragment for MsgCfg { /// type Driver = CacheEq<MsgCfg>; /// // We simply send the configuration forward /// type Resource = Self; /// type Installer = (); /// fn create(&self, _: &'static str) -> Result<Self::Resource, AnyError> { /// Ok(self.clone()) /// } /// } /// } /// /// /// An application. /// #[derive(Default, Deserialize, Serialize, StructDoc)] /// struct AppConfig { /// #[serde(flatten)] /// msg: MsgCfg, /// } /// /// impl AppConfig { /// fn msg(&self) -> &MsgCfg { /// &self.msg /// } /// } /// /// fn main() { /// Spirit::<Empty, AppConfig>::new() /// // Will install and possibly cancel and replace the future if the config changes. /// .with( /// Pipeline::new("Msg") /// .extract_cfg(AppConfig::msg) /// // This thing turns it into the future and sets how to install it. /// .transform(ToFutureUnconfigured(MsgCfg::run)) /// ) /// // Just an empty body here. /// .run(|spirit| { /// // Usually, one would terminate by CTRL+C, but we terminate from here to make sure /// // the example finishes. /// spirit.terminate(); /// Ok(()) /// }) /// } /// ``` pub struct ToFutureUnconfigured<F>(pub F); impl<F, Fut, R, II, SF> Transformation<R, II, SF> for ToFutureUnconfigured<F> where F: FnMut(R) -> Fut, Fut: Future<Output = ()> + 'static, { type OutputResource = Fut; type OutputInstaller = FutureInstaller; fn installer(&mut self, _: II, _: &str) -> FutureInstaller { FutureInstaller::default() } fn transform(&mut self, r: R, _: &SF, name: &str) -> Result<Fut, AnyError> { trace!("Wrapping {} into a future", name); Ok((self.0)(r)) } } /// A plumbing type for [`PerConnection`]. #[cfg(feature = "net")] pub struct Acceptor<A, F, C> { accept: A, f: F, cfg: C, name: &'static str, } #[cfg(feature = "net")] impl<A, F, C, Fut> Future for Acceptor<A, F, C> where A: Accept, F: FnMut(A::Connection, &C) -> Fut, Fut: Future<Output = ()> + Send + 'static, Self: Unpin, { type Output = (); fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<()> { loop { match self.accept.poll_accept(ctx) { Poll::Ready(Err(e)) => { error!("Giving up acceptor {}: {}", self.name, e); return Poll::Ready(()); } Poll::Ready(Ok(conn)) => { trace!("Got a new connection on {}", self.name); // Poking the borrow checker around the un-pinning, otherwise it is unhappy let me: &mut Self = &mut self; let fut = (me.f)(conn, &me.cfg); tokio::spawn(fut); } Poll::Pending => return Poll::Pending, } } } } /// A [`Transformation`] that creates a new future from each accepted connection. /// /// For each connection yielded from the acceptor (passed in as the input resource), the function /// is used to create a new future. That new future is spawned into [`tokio`] runtime. Note that /// even when this resource gets uninstalled, the spawned futures from the connections are left /// running. /// /// In case the acceptor yields an error, it is dropped and not used any more. Note that some /// primitives, like [`TcpListenWithLimits`][crate::net::TcpListenWithLimits] handles errors /// internally and never returns any, therefore it might be a good candidate for long-running /// servers. /// /// # Examples #[cfg(feature = "net")] pub struct PerConnection<F>(pub F); #[cfg(feature = "net")] impl<F, Fut, A, II, SF> Transformation<A, II, SF> for PerConnection<F> where A: Accept, F: Clone + FnMut(A::Connection, &SF) -> Fut + 'static, Fut: Future<Output = ()> + 'static, SF: Clone + 'static, { type OutputResource = Acceptor<A, F, SF>; type OutputInstaller = FutureInstaller; fn installer(&mut self, _: II, _: &str) -> FutureInstaller { FutureInstaller::default() } fn transform( &mut self, accept: A, cfg: &SF, name: &'static str, ) -> Result<Acceptor<A, F, SF>, AnyError> { trace!("Creating new acceptor for {}", name); let f = self.0.clone(); let cfg = cfg.clone(); Ok(Acceptor { accept, f, cfg, name, }) } } /// A more flexible (and mind-bending) version of [`PerConnection`]. /// /// The [`PerConnection`] applies a closure to each accepted connection. To share the closure /// between all listeners, the closure is cloned. /// /// This version's closure is higher-level closure. It is called once for each listener to produce /// a per-listener closure to handle its connections. In effect, it allows for custom „cloning“ of /// the closure. #[cfg(feature = "net")] pub struct PerConnectionInit<F>(pub F); #[cfg(feature = "net")] impl<FA, FC, Fut, A, II, SF> Transformation<A, II, SF> for PerConnectionInit<FA> where A: Accept, FA: FnMut(&A, &SF) -> FC + 'static, FC: FnMut(A::Connection, &SF) -> Fut + 'static, Fut: Future<Output = ()> + 'static, SF: Clone + 'static, { type OutputResource = Acceptor<A, FC, SF>; type OutputInstaller = FutureInstaller; fn installer(&mut self, _: II, _: &str) -> FutureInstaller { FutureInstaller::default() } fn transform( &mut self, accept: A, cfg: &SF, name: &'static str, ) -> Result<Acceptor<A, FC, SF>, AnyError> { trace!("Creating new acceptor for {}", name); let f = (self.0)(&accept, cfg); let cfg = cfg.clone(); Ok(Acceptor { accept, f, cfg, name, }) } }