scuffle_bootstrap/
global.rs

1//! Global state for the application.
2//!
3//! # [`Global`] vs. [`GlobalWithoutConfig`]
4//!
5//! [`Global`] has a set of functions that are called at different stages of the
6//! application lifecycle. To use [`Global`], your application is expected to
7//! have a config type implementing [`ConfigParser`]. If your application does
8//! not have a config, consider using the [`GlobalWithoutConfig`] trait which is
9//! a simplified version of [`Global`].
10
11use std::sync::Arc;
12
13use crate::config::{ConfigParser, EmptyConfig};
14
15fn default_runtime_builder() -> tokio::runtime::Builder {
16    let worker_threads = std::env::var("TOKIO_WORKER_THREADS")
17        .unwrap_or_default()
18        .parse::<usize>()
19        .ok()
20        .or_else(|| std::thread::available_parallelism().ok().map(|p| p.get()));
21
22    let mut builder = if let Some(1) = worker_threads {
23        tokio::runtime::Builder::new_current_thread()
24    } else {
25        tokio::runtime::Builder::new_multi_thread()
26    };
27
28    if let Some(worker_threads) = worker_threads {
29        builder.worker_threads(worker_threads);
30    }
31
32    if let Ok(max_blocking_threads) = std::env::var("TOKIO_MAX_BLOCKING_THREADS")
33        .unwrap_or_default()
34        .parse::<usize>()
35    {
36        builder.max_blocking_threads(max_blocking_threads);
37    }
38
39    if !std::env::var("TOKIO_DISABLE_TIME")
40        .unwrap_or_default()
41        .parse::<bool>()
42        .ok()
43        .unwrap_or(false)
44    {
45        builder.enable_time();
46    }
47
48    if !std::env::var("TOKIO_DISABLE_IO")
49        .unwrap_or_default()
50        .parse::<bool>()
51        .ok()
52        .unwrap_or(false)
53    {
54        builder.enable_io();
55    }
56
57    if let Ok(thread_stack_size) = std::env::var("TOKIO_THREAD_STACK_SIZE").unwrap_or_default().parse::<usize>() {
58        builder.thread_stack_size(thread_stack_size);
59    }
60
61    if let Ok(global_queue_interval) = std::env::var("TOKIO_GLOBAL_QUEUE_INTERVAL")
62        .unwrap_or_default()
63        .parse::<u32>()
64    {
65        builder.global_queue_interval(global_queue_interval);
66    }
67
68    if let Ok(event_interval) = std::env::var("TOKIO_EVENT_INTERVAL").unwrap_or_default().parse::<u32>() {
69        builder.event_interval(event_interval);
70    }
71
72    if let Ok(max_io_events_per_tick) = std::env::var("TOKIO_MAX_IO_EVENTS_PER_TICK")
73        .unwrap_or_default()
74        .parse::<usize>()
75    {
76        builder.max_io_events_per_tick(max_io_events_per_tick);
77    }
78
79    builder
80}
81
82/// This trait is implemented for the global type of your application.
83/// It is intended to be used to store any global state of your application.
84/// When using the [`main!`](crate::main) macro, one instance of this type will
85/// be made available to all services.
86///
87/// Using this trait requires a config type implementing [`ConfigParser`].
88/// If your application does not have a config, consider using the
89/// [`GlobalWithoutConfig`] trait.
90///
91/// # See Also
92///
93/// - [`GlobalWithoutConfig`]
94/// - [`Service`](crate::Service)
95/// - [`main`](crate::main)
96pub trait Global: Send + Sync + 'static {
97    type Config: ConfigParser + Send + 'static;
98
99    /// Pre-initialization.
100    ///
101    /// Called before initializing the tokio runtime and loading the config.
102    /// Returning an error from this function will cause the process to
103    /// immediately exit without calling [`on_exit`](Global::on_exit) first.
104    #[inline(always)]
105    fn pre_init() -> anyhow::Result<()> {
106        Ok(())
107    }
108
109    /// Builds the tokio runtime for the process.
110    ///
111    /// If not overridden, a default runtime builder is used to build the
112    /// runtime. It uses the following environment variables:
113    /// - `TOKIO_WORKER_THREADS`: Number of worker threads to use. If 1, a
114    ///   current thread runtime is used.
115    ///
116    ///   See [`tokio::runtime::Builder::worker_threads`] for details.
117    /// - `TOKIO_MAX_BLOCKING_THREADS`: Maximum number of blocking threads.
118    ///
119    ///   See [`tokio::runtime::Builder::max_blocking_threads`] for details.
120    /// - `TOKIO_DISABLE_TIME`: If `true` disables time.
121    ///
122    ///   See [`tokio::runtime::Builder::enable_time`] for details.
123    /// - `TOKIO_DISABLE_IO`: If `true` disables IO.
124    ///
125    ///   See [`tokio::runtime::Builder::enable_io`] for details.
126    /// - `TOKIO_THREAD_STACK_SIZE`: Thread stack size.
127    ///
128    ///   See [`tokio::runtime::Builder::thread_stack_size`] for details.
129    /// - `TOKIO_GLOBAL_QUEUE_INTERVAL`: Global queue interval.
130    ///
131    ///   See [`tokio::runtime::Builder::global_queue_interval`] for details.
132    /// - `TOKIO_EVENT_INTERVAL`: Event interval.
133    ///
134    ///   See [`tokio::runtime::Builder::event_interval`] for details.
135    /// - `TOKIO_MAX_IO_EVENTS_PER_TICK`: Maximum IO events per tick.
136    ///
137    ///   See [`tokio::runtime::Builder::max_io_events_per_tick`] for details.
138    #[inline(always)]
139    fn tokio_runtime() -> tokio::runtime::Runtime {
140        default_runtime_builder().build().expect("runtime build")
141    }
142
143    /// Initialize the global.
144    ///
145    /// Called to initialize the global.
146    /// Returning an error from this function will cause the process to
147    /// immediately exit without calling [`on_exit`](Global::on_exit) first.
148    fn init(config: Self::Config) -> impl std::future::Future<Output = anyhow::Result<Arc<Self>>> + Send;
149
150    /// Called right before all services start.
151    ///
152    /// Returning an error from this function will prevent any service from
153    /// starting and [`on_exit`](Global::on_exit) will be called with the result
154    /// of this function.
155    #[inline(always)]
156    fn on_services_start(self: &Arc<Self>) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
157        std::future::ready(Ok(()))
158    }
159
160    /// Called after a service exits.
161    ///
162    /// `name` is the name of the service that exited and `result` is the result
163    /// the service exited with. Returning an error from this function will
164    /// stop all currently running services and [`on_exit`](Global::on_exit)
165    /// will be called with the result of this function.
166    #[inline(always)]
167    fn on_service_exit(
168        self: &Arc<Self>,
169        name: &'static str,
170        result: anyhow::Result<()>,
171    ) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
172        let _ = name;
173        std::future::ready(result)
174    }
175
176    /// Called after the shutdown is complete, right before exiting the
177    /// process.
178    ///
179    /// `result` is [`Err`](anyhow::Result) when the process exits due to an
180    /// error in one of the services or handler functions, otherwise `Ok(())`.
181    #[inline(always)]
182    fn on_exit(
183        self: &Arc<Self>,
184        result: anyhow::Result<()>,
185    ) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
186        std::future::ready(result)
187    }
188}
189
190/// Simplified version of [`Global`].
191///
192/// Implementing this trait will automatically implement [`Global`] for your
193/// type. This trait is intended to be used when you don't need a config for
194/// your global.
195///
196/// Refer to [`Global`] for details.
197pub trait GlobalWithoutConfig: Send + Sync + 'static {
198    /// Builds the tokio runtime for the process.
199    ///
200    /// If not overridden, a default runtime builder is used to build the
201    /// runtime. It uses the following environment variables:
202    /// - `TOKIO_WORKER_THREADS`: Number of worker threads to use. If 1, a
203    ///   current thread runtime is used.
204    ///
205    ///   See [`tokio::runtime::Builder::worker_threads`] for details.
206    /// - `TOKIO_MAX_BLOCKING_THREADS`: Maximum number of blocking threads.
207    ///
208    ///   See [`tokio::runtime::Builder::max_blocking_threads`] for details.
209    /// - `TOKIO_DISABLE_TIME`: If `true` disables time.
210    ///
211    ///   See [`tokio::runtime::Builder::enable_time`] for details.
212    /// - `TOKIO_DISABLE_IO`: If `true` disables IO.
213    ///
214    ///   See [`tokio::runtime::Builder::enable_io`] for details.
215    /// - `TOKIO_THREAD_STACK_SIZE`: Thread stack size.
216    ///
217    ///   See [`tokio::runtime::Builder::thread_stack_size`] for details.
218    /// - `TOKIO_GLOBAL_QUEUE_INTERVAL`: Global queue interval.
219    ///
220    ///   See [`tokio::runtime::Builder::global_queue_interval`] for details.
221    /// - `TOKIO_EVENT_INTERVAL`: Event interval.
222    ///
223    ///   See [`tokio::runtime::Builder::event_interval`] for details.
224    /// - `TOKIO_MAX_IO_EVENTS_PER_TICK`: Maximum IO events per tick.
225    ///
226    ///   See [`tokio::runtime::Builder::max_io_events_per_tick`] for details.
227    #[inline(always)]
228    fn tokio_runtime() -> tokio::runtime::Runtime {
229        default_runtime_builder().build().expect("runtime build")
230    }
231
232    /// Initialize the global.
233    ///
234    /// Called to initialize the global.
235    /// Returning an error from this function will cause the process to
236    /// immediately exit without calling [`on_exit`](Global::on_exit) first.
237    fn init() -> impl std::future::Future<Output = anyhow::Result<Arc<Self>>> + Send;
238
239    /// Called right before all services start.
240    ///
241    /// Returning an error from this function will prevent any service from
242    /// starting and [`on_exit`](Global::on_exit) will be called with the result
243    /// of this function.
244    #[inline(always)]
245    fn on_services_start(self: &Arc<Self>) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
246        std::future::ready(Ok(()))
247    }
248
249    /// Called after a service exits.
250    ///
251    /// `name` is the name of the service that exited and `result` is the result
252    /// the service exited with. Returning an error from this function will
253    /// stop all currently running services and [`on_exit`](Global::on_exit)
254    /// will be called with the result of this function.
255    #[inline(always)]
256    fn on_service_exit(
257        self: &Arc<Self>,
258        name: &'static str,
259        result: anyhow::Result<()>,
260    ) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
261        let _ = name;
262        std::future::ready(result)
263    }
264
265    /// Called after the shutdown is complete, right before exiting the
266    /// process.
267    ///
268    /// `result` is [`Err`](anyhow::Result) when the process exits due to an
269    /// error in one of the services or handler functions, otherwise `Ok(())`.
270    #[inline(always)]
271    fn on_exit(
272        self: &Arc<Self>,
273        result: anyhow::Result<()>,
274    ) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
275        std::future::ready(result)
276    }
277}
278
279impl<T: GlobalWithoutConfig> Global for T {
280    type Config = EmptyConfig;
281
282    #[inline(always)]
283    fn tokio_runtime() -> tokio::runtime::Runtime {
284        <T as GlobalWithoutConfig>::tokio_runtime()
285    }
286
287    #[inline(always)]
288    fn init(_: Self::Config) -> impl std::future::Future<Output = anyhow::Result<Arc<Self>>> + Send {
289        <T as GlobalWithoutConfig>::init()
290    }
291
292    #[inline(always)]
293    fn on_services_start(self: &Arc<Self>) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
294        <T as GlobalWithoutConfig>::on_services_start(self)
295    }
296
297    #[inline(always)]
298    fn on_service_exit(
299        self: &Arc<Self>,
300        name: &'static str,
301        result: anyhow::Result<()>,
302    ) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
303        <T as GlobalWithoutConfig>::on_service_exit(self, name, result)
304    }
305
306    #[inline(always)]
307    fn on_exit(
308        self: &Arc<Self>,
309        result: anyhow::Result<()>,
310    ) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
311        <T as GlobalWithoutConfig>::on_exit(self, result)
312    }
313}
314
#[cfg_attr(all(test, coverage_nightly), coverage(off))]
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::thread;

    use super::{Global, GlobalWithoutConfig};
    use crate::EmptyConfig;

    /// Minimal [`Global`] implementation that relies on every default method.
    struct TestGlobal;

    impl Global for TestGlobal {
        type Config = ();

        async fn init(_config: Self::Config) -> anyhow::Result<std::sync::Arc<Self>> {
            Ok(Arc::new(Self))
        }
    }

    /// Exercises the default method implementations of [`Global`].
    #[tokio::test]
    async fn default_global() {
        // The default runtime is built on a separate thread
        // (NOTE(review): presumably because a tokio runtime must not be
        // dropped from within an async context - confirm).
        // The handle is joined so the call is guaranteed to have run before
        // the test finishes and a panic while building the runtime fails the
        // test instead of being lost with a detached thread.
        thread::spawn(|| {
            // To get the coverage
            TestGlobal::tokio_runtime();
        })
        .join()
        .expect("building the default runtime panicked");

        assert!(matches!(TestGlobal::pre_init(), Ok(())));
        let global = TestGlobal::init(()).await.unwrap();
        assert!(matches!(global.on_services_start().await, Ok(())));

        // The default `on_exit` passes the result through unchanged.
        assert!(matches!(global.on_exit(Ok(())).await, Ok(())));
        assert!(global.on_exit(Err(anyhow::anyhow!("error"))).await.is_err());

        // The default `on_service_exit` passes the result through unchanged.
        assert!(matches!(global.on_service_exit("test", Ok(())).await, Ok(())));
        assert!(global.on_service_exit("test", Err(anyhow::anyhow!("error"))).await.is_err());
    }

    /// Minimal [`GlobalWithoutConfig`] implementation that relies on every
    /// default method.
    struct TestGlobalWithoutConfig;

    impl GlobalWithoutConfig for TestGlobalWithoutConfig {
        async fn init() -> anyhow::Result<std::sync::Arc<Self>> {
            Ok(Arc::new(Self))
        }
    }

    /// Exercises the blanket [`Global`] implementation for
    /// [`GlobalWithoutConfig`] types.
    #[tokio::test]
    async fn default_global_no_config() {
        // Joined for the same reason as in `default_global`: the call must
        // actually run and a panic must fail the test.
        thread::spawn(|| {
            // To get the coverage
            <TestGlobalWithoutConfig as Global>::tokio_runtime();
        })
        .join()
        .expect("building the default runtime panicked");

        assert!(matches!(TestGlobalWithoutConfig::pre_init(), Ok(())));
        <TestGlobalWithoutConfig as Global>::init(EmptyConfig).await.unwrap();
        let global = <TestGlobalWithoutConfig as GlobalWithoutConfig>::init().await.unwrap();
        // Both traits declare these functions, so they are called through
        // `Global` explicitly.
        assert!(matches!(Global::on_services_start(&global).await, Ok(())));

        assert!(matches!(Global::on_exit(&global, Ok(())).await, Ok(())));
        assert!(Global::on_exit(&global, Err(anyhow::anyhow!("error"))).await.is_err());

        assert!(matches!(Global::on_service_exit(&global, "test", Ok(())).await, Ok(())));
        assert!(Global::on_service_exit(&global, "test", Err(anyhow::anyhow!("error")))
            .await
            .is_err());
    }
}