1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
use std::future::Future;
use err_context::AnyError;
use log::trace;
use serde::de::DeserializeOwned;
use spirit::extension::Extensible;
use spirit::fragment::Installer;
use structopt::StructOpt;
use tokio::runtime::Handle;
use tokio::select;
use tokio::sync::oneshot::{self, Receiver, Sender};
use crate::runtime::Tokio;
pub struct RemoteDrop {
name: &'static str,
request_drop: Option<Sender<()>>,
terminated: Option<Receiver<()>>,
}
impl Drop for RemoteDrop {
fn drop(&mut self) {
trace!("Requesting remote drop on {}", self.name);
let _ = self.request_drop.take().unwrap().send(());
let mut terminated = self.terminated.take().unwrap();
if terminated.try_recv().is_err() {
let _ = Handle::current().block_on(terminated);
}
trace!("Remote drop done on {}", self.name);
}
}
struct SendOnDrop(Option<Sender<()>>);
impl Drop for SendOnDrop {
fn drop(&mut self) {
let _ = self.0.take().unwrap().send(());
}
}
#[derive(Copy, Clone, Debug, Default)]
pub struct FutureInstaller;
impl<F, O, C> Installer<F, O, C> for FutureInstaller
where
F: Future<Output = ()> + Send + 'static,
{
type UninstallHandle = RemoteDrop;
fn install(&mut self, fut: F, name: &'static str) -> RemoteDrop {
let (request_send, request_recv) = oneshot::channel();
let (confirm_send, confirm_recv) = oneshot::channel();
let cancellable_future = async move {
let _guard = SendOnDrop(Some(confirm_send));
select! {
_ = request_recv => trace!("Future {} requested to terminate", name),
_ = fut => trace!("Future {} terminated on its own", name),
};
};
trace!("Installing future {}", name);
tokio::spawn(cancellable_future);
RemoteDrop {
name,
request_drop: Some(request_send),
terminated: Some(confirm_recv),
}
}
fn init<E>(&mut self, ext: E, _name: &'static str) -> Result<E, AnyError>
where
E: Extensible<Opts = O, Config = C, Ok = E>,
E::Config: DeserializeOwned + Send + Sync + 'static,
E::Opts: StructOpt + Send + Sync + 'static,
{
ext.with_singleton(Tokio::Default)
}
}