use crate::{error::Error as CliError, Result, SubstrateCli};
use chrono::prelude::*;
use futures::{future, future::FutureExt, pin_mut, select, Future};
use log::info;
use sc_service::{Configuration, Error as ServiceError, TaskManager};
use sc_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL};
use std::{marker::PhantomData, time::Duration};
#[cfg(target_family = "unix")]
async fn main<F, E>(func: F) -> std::result::Result<(), E>
where
F: Future<Output = std::result::Result<(), E>> + future::FusedFuture,
E: std::error::Error + Send + Sync + 'static + From<ServiceError>,
{
use tokio::signal::unix::{signal, SignalKind};
let mut stream_int = signal(SignalKind::interrupt()).map_err(ServiceError::Io)?;
let mut stream_term = signal(SignalKind::terminate()).map_err(ServiceError::Io)?;
let t1 = stream_int.recv().fuse();
let t2 = stream_term.recv().fuse();
let t3 = func;
pin_mut!(t1, t2, t3);
select! {
_ = t1 => {},
_ = t2 => {},
res = t3 => res?,
}
Ok(())
}
#[cfg(not(unix))]
async fn main<F, E>(func: F) -> std::result::Result<(), E>
where
F: Future<Output = std::result::Result<(), E>> + future::FusedFuture,
E: std::error::Error + Send + Sync + 'static + From<ServiceError>,
{
use tokio::signal::ctrl_c;
let t1 = ctrl_c().fuse();
let t2 = func;
pin_mut!(t1, t2);
select! {
_ = t1 => {},
res = t2 => res?,
}
Ok(())
}
pub fn build_runtime() -> std::result::Result<tokio::runtime::Runtime, std::io::Error> {
tokio::runtime::Builder::new_multi_thread()
.on_thread_start(|| {
TOKIO_THREADS_ALIVE.inc();
TOKIO_THREADS_TOTAL.inc();
})
.on_thread_stop(|| {
TOKIO_THREADS_ALIVE.dec();
})
.enable_all()
.build()
}
fn run_until_exit<F, E>(
tokio_runtime: tokio::runtime::Runtime,
future: F,
task_manager: TaskManager,
) -> std::result::Result<(), E>
where
F: Future<Output = std::result::Result<(), E>> + future::Future,
E: std::error::Error + Send + Sync + 'static + From<ServiceError>,
{
let f = future.fuse();
pin_mut!(f);
tokio_runtime.block_on(main(f))?;
drop(task_manager);
Ok(())
}
pub struct Runner<C: SubstrateCli> {
config: Configuration,
tokio_runtime: tokio::runtime::Runtime,
phantom: PhantomData<C>,
}
impl<C: SubstrateCli> Runner<C> {
pub fn new(config: Configuration, tokio_runtime: tokio::runtime::Runtime) -> Result<Runner<C>> {
Ok(Runner { config, tokio_runtime, phantom: PhantomData })
}
fn print_node_infos(&self) {
print_node_infos::<C>(self.config())
}
pub fn run_node_until_exit<F, E>(
self,
initialize: impl FnOnce(Configuration) -> F,
) -> std::result::Result<(), E>
where
F: Future<Output = std::result::Result<TaskManager, E>>,
E: std::error::Error + Send + Sync + 'static + From<ServiceError>,
{
self.print_node_infos();
let mut task_manager = self.tokio_runtime.block_on(initialize(self.config))?;
let res = self.tokio_runtime.block_on(main(task_manager.future().fuse()));
let task_registry = task_manager.into_task_registry();
let shutdown_timeout = Duration::from_secs(60);
self.tokio_runtime.shutdown_timeout(shutdown_timeout);
let running_tasks = task_registry.running_tasks();
if !running_tasks.is_empty() {
log::error!("Detected running(potentially stalled) tasks on shutdown:");
running_tasks.iter().for_each(|(task, count)| {
let instances_desc =
if *count > 1 { format!("with {} instances ", count) } else { "".to_string() };
if task.is_default_group() {
log::error!(
"Task \"{}\" was still running {}after waiting {} seconds to finish.",
task.name,
instances_desc,
shutdown_timeout.as_secs(),
);
} else {
log::error!(
"Task \"{}\" (Group: {}) was still running {}after waiting {} seconds to finish.",
task.name,
task.group,
instances_desc,
shutdown_timeout.as_secs(),
);
}
});
}
res.map_err(Into::into)
}
pub fn sync_run<E>(
self,
runner: impl FnOnce(Configuration) -> std::result::Result<(), E>,
) -> std::result::Result<(), E>
where
E: std::error::Error + Send + Sync + 'static + From<ServiceError>,
{
runner(self.config)
}
pub fn async_run<F, E>(
self,
runner: impl FnOnce(Configuration) -> std::result::Result<(F, TaskManager), E>,
) -> std::result::Result<(), E>
where
F: Future<Output = std::result::Result<(), E>>,
E: std::error::Error + Send + Sync + 'static + From<ServiceError> + From<CliError>,
{
let (future, task_manager) = runner(self.config)?;
run_until_exit::<_, E>(self.tokio_runtime, future, task_manager)
}
pub fn config(&self) -> &Configuration {
&self.config
}
pub fn config_mut(&mut self) -> &mut Configuration {
&mut self.config
}
}
pub fn print_node_infos<C: SubstrateCli>(config: &Configuration) {
info!("{}", C::impl_name());
info!("✌️ version {}", C::impl_version());
info!("❤️ by {}, {}-{}", C::author(), C::copyright_start_year(), Local::now().year());
info!("📋 Chain specification: {}", config.chain_spec.name());
info!("🏷 Node name: {}", config.network.node_name);
info!("👤 Role: {}", config.display_role());
info!(
"💾 Database: {} at {}",
config.database,
config
.database
.path()
.map_or_else(|| "<unknown>".to_owned(), |p| p.display().to_string())
);
info!("⛓ Native runtime: {}", C::native_runtime_version(&config.chain_spec));
}
#[cfg(test)]
mod tests {
use std::{
path::PathBuf,
sync::atomic::{AtomicU64, Ordering},
};
use sc_network::config::NetworkConfiguration;
use sc_service::{Arc, ChainType, GenericChainSpec, NoExtension};
use sp_runtime::create_runtime_str;
use sp_version::create_apis_vec;
use super::*;
struct Cli;
impl SubstrateCli for Cli {
fn author() -> String {
"test".into()
}
fn impl_name() -> String {
"yep".into()
}
fn impl_version() -> String {
"version".into()
}
fn description() -> String {
"desc".into()
}
fn support_url() -> String {
"no.pe".into()
}
fn copyright_start_year() -> i32 {
2042
}
fn load_spec(
&self,
_: &str,
) -> std::result::Result<Box<dyn sc_service::ChainSpec>, String> {
Err("nope".into())
}
fn native_runtime_version(
_: &Box<dyn sc_service::ChainSpec>,
) -> &'static sp_version::RuntimeVersion {
const VERSION: sp_version::RuntimeVersion = sp_version::RuntimeVersion {
spec_name: create_runtime_str!("spec"),
impl_name: create_runtime_str!("name"),
authoring_version: 0,
spec_version: 0,
impl_version: 0,
apis: create_apis_vec!([]),
transaction_version: 2,
state_version: 0,
};
&VERSION
}
}
fn create_runner() -> Runner<Cli> {
let runtime = build_runtime().unwrap();
let runner = Runner::new(
Configuration {
impl_name: "spec".into(),
impl_version: "3".into(),
role: sc_service::Role::Authority,
tokio_handle: runtime.handle().clone(),
transaction_pool: Default::default(),
network: NetworkConfiguration::new_memory(),
keystore: sc_service::config::KeystoreConfig::InMemory,
keystore_remote: None,
database: sc_client_db::DatabaseSource::ParityDb { path: PathBuf::from("db") },
trie_cache_maximum_size: None,
state_pruning: None,
blocks_pruning: sc_client_db::BlocksPruning::KeepAll,
chain_spec: Box::new(GenericChainSpec::from_genesis(
"test",
"test_id",
ChainType::Development,
|| unimplemented!("Not required in tests"),
Vec::new(),
None,
None,
None,
None,
NoExtension::None,
)),
wasm_method: Default::default(),
wasm_runtime_overrides: None,
execution_strategies: Default::default(),
rpc_http: None,
rpc_ws: None,
rpc_ipc: None,
rpc_ws_max_connections: None,
rpc_cors: None,
rpc_methods: Default::default(),
rpc_max_payload: None,
rpc_max_request_size: None,
rpc_max_response_size: None,
rpc_id_provider: None,
rpc_max_subs_per_conn: None,
ws_max_out_buffer_capacity: None,
prometheus_config: None,
telemetry_endpoints: None,
default_heap_pages: None,
offchain_worker: Default::default(),
force_authoring: false,
disable_grandpa: false,
dev_key_seed: None,
tracing_targets: None,
tracing_receiver: Default::default(),
max_runtime_instances: 8,
announce_block: true,
base_path: None,
informant_output_format: Default::default(),
runtime_cache_size: 2,
},
runtime,
)
.unwrap();
runner
}
#[test]
fn ensure_run_until_exit_informs_tasks_to_end() {
let runner = create_runner();
let counter = Arc::new(AtomicU64::new(0));
let counter2 = counter.clone();
runner
.run_node_until_exit(move |cfg| async move {
let task_manager = TaskManager::new(cfg.tokio_handle.clone(), None).unwrap();
let (sender, receiver) = futures::channel::oneshot::channel();
task_manager.spawn_handle().spawn_blocking("test", None, async move {
let _ = sender.send(());
loop {
counter2.fetch_add(1, Ordering::Relaxed);
futures_timer::Delay::new(Duration::from_millis(50)).await;
}
});
task_manager.spawn_essential_handle().spawn_blocking("test2", None, async {
let _ = receiver.await;
});
Ok::<_, sc_service::Error>(task_manager)
})
.unwrap_err();
let count = counter.load(Ordering::Relaxed);
assert!((count as u128) < (Duration::from_secs(30).as_millis() / 50));
}
fn run_test_in_another_process(
test_name: &str,
test_body: impl FnOnce(),
) -> Option<std::process::Output> {
if std::env::var("RUN_FORKED_TEST").is_ok() {
test_body();
None
} else {
let output = std::process::Command::new(std::env::current_exe().unwrap())
.arg(test_name)
.env("RUN_FORKED_TEST", "1")
.output()
.unwrap();
assert!(output.status.success());
Some(output)
}
}
#[test]
fn ensure_run_until_exit_is_not_blocking_indefinitely() {
let output = run_test_in_another_process(
"ensure_run_until_exit_is_not_blocking_indefinitely",
|| {
sp_tracing::try_init_simple();
let runner = create_runner();
runner
.run_node_until_exit(move |cfg| async move {
let task_manager =
TaskManager::new(cfg.tokio_handle.clone(), None).unwrap();
let (sender, receiver) = futures::channel::oneshot::channel();
task_manager.spawn_handle().spawn_blocking("test", None, async move {
let _ = sender.send(());
loop {
std::thread::sleep(Duration::from_secs(30));
}
});
task_manager.spawn_essential_handle().spawn_blocking(
"test2",
None,
async {
let _ = receiver.await;
},
);
Ok::<_, sc_service::Error>(task_manager)
})
.unwrap_err();
},
);
let Some(output) = output else { return } ;
let stderr = dbg!(String::from_utf8(output.stderr).unwrap());
assert!(
stderr.contains("Task \"test\" was still running after waiting 60 seconds to finish.")
);
assert!(!stderr
.contains("Task \"test2\" was still running after waiting 60 seconds to finish."));
}
}