use super::database::{DBTransaction, Database};
use kvdb::{DBKey, DBOp};
use parity_scale_codec::{Decode, Encode};
pub use polkadot_node_primitives::{new_session_window_size, SessionWindowSize};
use polkadot_primitives::v2::{BlockNumber, Hash, SessionIndex, SessionInfo};
use std::sync::Arc;
use futures::channel::oneshot;
use polkadot_node_subsystem::{
errors::{ChainApiError, RuntimeApiError},
messages::{ChainApiMessage, RuntimeApiMessage, RuntimeApiRequest},
overseer,
};
// Window size used for every `RollingSessionWindow` constructed in this module.
const SESSION_WINDOW_SIZE: SessionWindowSize = new_session_window_size!(6);
// Log target passed to every `gum` logging call in this module.
const LOG_TARGET: &str = "parachain::rolling-session-window";
// Database key under which the encoded `StoredWindow` is read and written.
const STORED_ROLLING_SESSION_WINDOW: &[u8] = b"Rolling_session_window";
/// The reason why session information could not be obtained.
#[derive(Debug, Clone, thiserror::Error)]
pub enum SessionsUnavailableReason {
/// The response channel of a runtime API request was canceled before an answer arrived.
#[error(transparent)]
RuntimeApiUnavailable(#[from] oneshot::Canceled),
/// The runtime API answered the request with an error.
#[error(transparent)]
RuntimeApi(#[from] RuntimeApiError),
/// The chain API answered the request with an error.
#[error(transparent)]
ChainApi(#[from] ChainApiError),
/// The runtime API returned no session info for the given `SessionIndex`.
#[error("Missing session index {0:?}")]
Missing(SessionIndex),
/// The number of the last finalized block could not be fetched.
#[error("Missing last finalized block number")]
MissingLastFinalizedBlock,
/// The hash of the last finalized block (with the given number) could not be fetched.
#[error("Missing last finalized block hash")]
MissingLastFinalizedBlockHash(BlockNumber),
}
/// Details about the window that was being fetched when sessions turned out to be unavailable.
#[derive(Debug, Clone)]
pub struct SessionsUnavailableInfo {
/// The first session of the window that was being fetched.
pub window_start: SessionIndex,
/// The last session (inclusive) of the window that was being fetched.
pub window_end: SessionIndex,
/// The block hash whose state was queried for the session information.
pub block_hash: Hash,
}
/// Error returned when session information could not be obtained.
///
/// Combines the underlying reason with optional details about the affected window.
#[derive(Debug, thiserror::Error, Clone)]
#[error("Sessions unavailable: {kind:?}, info: {info:?}")]
pub struct SessionsUnavailable {
/// The underlying reason; exposed as the error source.
#[source]
kind: SessionsUnavailableReason,
/// Window details; `None` when the failure happened outside of a window fetch.
info: Option<SessionsUnavailableInfo>,
}
/// The outcome of `RollingSessionWindow::cache_session_info_for_head`.
#[derive(Debug, PartialEq, Clone)]
pub enum SessionWindowUpdate {
/// The window moved forward and new sessions were cached.
Advanced {
/// The window start before the update.
prev_window_start: SessionIndex,
/// The window end (inclusive) before the update.
prev_window_end: SessionIndex,
/// The window start after the update.
new_window_start: SessionIndex,
/// The window end (inclusive) after the update.
new_window_end: SessionIndex,
},
/// The session of the given head was not newer than the latest cached one; nothing changed.
Unchanged,
}
/// Where the rolling session window is persisted.
#[derive(Clone)]
pub struct DatabaseParams {
/// Handle to the database used for loading and saving the window.
pub db: Arc<dyn Database>,
/// The database column holding the window entry.
pub db_column: u32,
}
/// A cache of consecutive `SessionInfo`s that can be advanced as new heads are observed.
pub struct RollingSessionWindow {
/// Index of the session stored at `session_info[0]`.
earliest_session: SessionIndex,
/// Cached session infos, ordered by ascending session index starting at `earliest_session`.
session_info: Vec<SessionInfo>,
/// Number of sessions used to derive the window start from the newest session index.
window_size: SessionWindowSize,
/// Database to persist the window to; `None` disables persistence.
db_params: Option<DatabaseParams>,
}
/// The SCALE-encodable form of the window that is written to and read from the database.
#[derive(Encode, Decode, Default)]
struct StoredWindow {
/// Index of the first stored session.
earliest_session: SessionIndex,
/// Stored session infos, starting at `earliest_session`.
session_info: Vec<SessionInfo>,
}
impl RollingSessionWindow {
/// Initialize a new session info cache with the default window size.
///
/// The window start is first derived from chain state: the session of the child of
/// `block_hash` minus the window size, stretched back to the session of the child of the last
/// finalized block. If a window is stored in the database and it is not older than the
/// session of the child of the last finalized block, the stored window start and sessions
/// take precedence. Any sessions still missing up to the session of `block_hash`'s child are
/// then fetched from the runtime.
///
/// # Errors
///
/// Returns `SessionsUnavailable` if the session index, the last finalized block or any
/// required session info cannot be fetched.
pub async fn new<Sender>(
    mut sender: Sender,
    block_hash: Hash,
    db_params: DatabaseParams,
) -> Result<Self, SessionsUnavailable>
where
    Sender: overseer::SubsystemSender<RuntimeApiMessage>
        + overseer::SubsystemSender<ChainApiMessage>,
{
    // Determine the window start using chain state only.
    let session_index = get_session_index_for_child(&mut sender, block_hash).await?;
    let earliest_non_finalized_block_session =
        Self::earliest_non_finalized_block_session(&mut sender).await?;
    // Taking the minimum stretches the window so it reaches back to the finalized block.
    let on_chain_window_start = std::cmp::min(
        session_index.saturating_sub(SESSION_WINDOW_SIZE.get() - 1),
        earliest_non_finalized_block_session,
    );

    let maybe_stored_window = Self::db_load(db_params.clone());
    let (mut window_start, stored_sessions) = match maybe_stored_window {
        Some(mut stored_window) => {
            // The stored values come from disk, so avoid overflowing on unexpected data.
            let stored_window_end = stored_window
                .earliest_session
                .saturating_add(stored_window.session_info.len() as u32);
            // A stored window that ends before the finalized session is scrapped.
            if earliest_non_finalized_block_session > stored_window_end {
                stored_window.session_info.clear();
            }
            // With at least one stored session, the DB decides where the window starts.
            let window_start = if stored_window.session_info.is_empty() {
                on_chain_window_start
            } else {
                stored_window.earliest_session
            };
            (window_start, stored_window.session_info)
        },
        None => (on_chain_window_start, Vec::new()),
    };

    // Number of sessions up to `session_index` not covered by the stored sessions.
    let sessions_missing_count = session_index
        .saturating_sub(window_start)
        .saturating_add(1)
        .saturating_sub(stored_sessions.len() as u32);

    let sessions = if sessions_missing_count > 0 {
        let fetched = extend_sessions_from_chain_state(
            stored_sessions,
            &mut sender,
            block_hash,
            &mut window_start,
            session_index,
        )
        .await;
        match fetched {
            Ok(sessions) => sessions,
            Err(kind) =>
                return Err(SessionsUnavailable {
                    kind,
                    info: Some(SessionsUnavailableInfo {
                        window_start,
                        window_end: session_index,
                        block_hash,
                    }),
                }),
        }
    } else {
        // Nothing to fetch: the stored sessions already cover the window. Keep them, so that
        // `earliest_session` and `session_info` stay consistent with each other.
        stored_sessions
    };

    Ok(Self {
        earliest_session: window_start,
        session_info: sessions,
        window_size: SESSION_WINDOW_SIZE,
        db_params: Some(db_params),
    })
}
/// Load the persisted window from the database.
///
/// Returns `None` if there is no entry, if reading fails or if the entry cannot be decoded.
/// Both failure cases are logged as warnings.
fn db_load(db_params: DatabaseParams) -> Option<StoredWindow> {
    let raw = match db_params.db.get(db_params.db_column, STORED_ROLLING_SESSION_WINDOW) {
        Ok(Some(raw)) => raw,
        Ok(None) => return None,
        Err(err) => {
            // Log read failures like decode and write failures instead of dropping them.
            gum::warn!(
                target: LOG_TARGET,
                ?err,
                "Failed reading db entry; will start with onchain session infos."
            );
            return None
        },
    };
    match StoredWindow::decode(&mut &raw[..]) {
        Ok(decoded) => Some(decoded),
        Err(err) => {
            gum::warn!(
                target: LOG_TARGET,
                ?err,
                "Failed decoding db entry; will start with onchain session infos and self-heal DB entry on next update."
            );
            None
        },
    }
}
/// Persist `stored_window` under the window key; a no-op without database parameters.
///
/// Write failures are logged as warnings and otherwise ignored.
fn db_save(&mut self, stored_window: StoredWindow) {
    let params = match self.db_params.as_ref() {
        Some(params) => params,
        None => return,
    };
    let insert_window = DBOp::Insert {
        col: params.db_column,
        key: DBKey::from_slice(STORED_ROLLING_SESSION_WINDOW),
        value: stored_window.encode(),
    };
    if let Err(err) = params.db.write(DBTransaction { ops: vec![insert_window] }) {
        gum::warn!(target: LOG_TARGET, ?err, "Failed writing db entry");
    }
}
pub fn with_session_info(
earliest_session: SessionIndex,
session_info: Vec<SessionInfo>,
) -> Self {
RollingSessionWindow {
earliest_session,
session_info,
window_size: SESSION_WINDOW_SIZE,
db_params: None,
}
}
/// Look up the cached info for session `index`.
///
/// Returns `None` if `index` lies before the earliest cached session or past the cached ones.
pub fn session_info(&self, index: SessionIndex) -> Option<&SessionInfo> {
    // `checked_sub` yields `None` exactly when `index` is older than the window start.
    index
        .checked_sub(self.earliest_session)
        .and_then(|offset| self.session_info.get(offset as usize))
}
/// Returns the index of the first session in the window.
pub fn earliest_session(&self) -> SessionIndex {
self.earliest_session
}
/// Returns the index of the last cached session.
///
/// For an empty cache this equals `earliest_session`, because the offset saturates at zero.
pub fn latest_session(&self) -> SessionIndex {
    let newest_offset = (self.session_info.len() as SessionIndex).saturating_sub(1);
    self.earliest_session + newest_offset
}
/// Returns `true` if `session_index` lies between the earliest and latest session (inclusive).
pub fn contains(&self, session_index: SessionIndex) -> bool {
    (self.earliest_session()..=self.latest_session()).contains(&session_index)
}
/// Fetch the session index of the child of the last finalized block.
///
/// Asks the chain API for the last finalized block number, then for the hash of that block,
/// and finally asks the runtime API for the session index of that block's child.
///
/// # Errors
///
/// Returns `SessionsUnavailable` (without window info) if any of the three requests fails.
async fn earliest_non_finalized_block_session<Sender>(
    sender: &mut Sender,
) -> Result<u32, SessionsUnavailable>
where
    Sender: overseer::SubsystemSender<RuntimeApiMessage>
        + overseer::SubsystemSender<ChainApiMessage>,
{
    // Step 1: number of the last finalized block.
    let (number_tx, number_rx) = oneshot::channel();
    sender.send_message(ChainApiMessage::FinalizedBlockNumber(number_tx)).await;
    let last_finalized_height = match number_rx.await {
        Ok(Ok(number)) => number,
        Ok(Err(e)) =>
            return Err(SessionsUnavailable {
                kind: SessionsUnavailableReason::ChainApi(e),
                info: None,
            }),
        Err(err) => {
            gum::warn!(
                target: LOG_TARGET,
                ?err,
                "Failed fetching last finalized block number"
            );
            return Err(SessionsUnavailable {
                kind: SessionsUnavailableReason::MissingLastFinalizedBlock,
                info: None,
            })
        },
    };

    // Step 2: hash of that block.
    let (hash_tx, hash_rx) = oneshot::channel();
    sender
        .send_message(ChainApiMessage::FinalizedBlockHash(last_finalized_height, hash_tx))
        .await;
    // The binding name is kept because it doubles as the field name in the log statement below.
    let last_finalized_hash_parent = match hash_rx.await {
        Ok(Ok(Some(hash))) => hash,
        // The chain API knows no hash for this block number.
        Ok(Ok(None)) =>
            return Err(SessionsUnavailable {
                kind: SessionsUnavailableReason::MissingLastFinalizedBlockHash(
                    last_finalized_height,
                ),
                info: None,
            }),
        Ok(Err(e)) =>
            return Err(SessionsUnavailable {
                kind: SessionsUnavailableReason::ChainApi(e),
                info: None,
            }),
        Err(err) => {
            gum::warn!(target: LOG_TARGET, ?err, "Failed fetching last finalized block hash");
            return Err(SessionsUnavailable {
                kind: SessionsUnavailableReason::MissingLastFinalizedBlockHash(
                    last_finalized_height,
                ),
                info: None,
            })
        },
    };

    // Step 3: session index of the child of that block.
    match get_session_index_for_child(sender, last_finalized_hash_parent).await {
        Ok(session_index) => Ok(session_index),
        Err(err) => {
            gum::warn!(
                target: LOG_TARGET,
                ?err,
                ?last_finalized_hash_parent,
                "Failed fetching session index"
            );
            Err(err)
        },
    }
}
pub async fn cache_session_info_for_head<Sender>(
&mut self,
sender: &mut Sender,
block_hash: Hash,
) -> Result<SessionWindowUpdate, SessionsUnavailable>
where
Sender: overseer::SubsystemSender<RuntimeApiMessage>
+ overseer::SubsystemSender<ChainApiMessage>,
{
let session_index = get_session_index_for_child(sender, block_hash).await?;
let latest = self.latest_session();
if session_index <= latest {
return Ok(SessionWindowUpdate::Unchanged)
}
let earliest_non_finalized_block_session =
Self::earliest_non_finalized_block_session(sender).await?;
let old_window_start = self.earliest_session;
let old_window_end = latest;
let window_start = std::cmp::min(
session_index.saturating_sub(self.window_size.get() - 1),
earliest_non_finalized_block_session,
);
let mut window_start = std::cmp::max(window_start, self.earliest_session);
let mut sessions = self.session_info.clone();
let sessions_out_of_window = window_start.saturating_sub(old_window_start) as usize;
let sessions = if sessions_out_of_window < sessions.len() {
sessions.split_off((window_start as usize).saturating_sub(old_window_start as usize))
} else {
Vec::new()
};
match extend_sessions_from_chain_state(
sessions,
sender,
block_hash,
&mut window_start,
session_index,
)
.await
{
Err(kind) => Err(SessionsUnavailable {
kind,
info: Some(SessionsUnavailableInfo {
window_start,
window_end: session_index,
block_hash,
}),
}),
Ok(s) => {
let update = SessionWindowUpdate::Advanced {
prev_window_start: old_window_start,
prev_window_end: old_window_end,
new_window_start: window_start,
new_window_end: session_index,
};
self.session_info = s;
let new_earliest = std::cmp::max(window_start, old_window_start);
self.earliest_session = new_earliest;
self.db_save(StoredWindow {
earliest_session: self.earliest_session,
session_info: self.session_info.clone(),
});
Ok(update)
},
}
}
}
/// Ask the runtime API for the session index of the child of `block_hash`.
///
/// # Errors
///
/// Returns `SessionsUnavailable` (without window info) if the runtime API reports an error or
/// the response channel is canceled.
async fn get_session_index_for_child(
    sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
    block_hash: Hash,
) -> Result<SessionIndex, SessionsUnavailable> {
    let (response_tx, response_rx) = oneshot::channel();
    let request = RuntimeApiMessage::Request(
        block_hash,
        RuntimeApiRequest::SessionIndexForChild(response_tx),
    );
    sender.send_message(request).await;

    let kind = match response_rx.await {
        Ok(Ok(session_index)) => return Ok(session_index),
        Ok(Err(e)) => SessionsUnavailableReason::RuntimeApi(e),
        Err(e) => SessionsUnavailableReason::RuntimeApiUnavailable(e),
    };
    Err(SessionsUnavailable { kind, info: None })
}
/// Append the session infos following `stored_sessions` up to `end_inclusive`, fetched from
/// the runtime at `block_hash`.
///
/// Fetching starts at `*window_start + stored_sessions.len()`. While no session is held yet,
/// a failed or empty fetch is tolerated: it is logged and `*window_start` is moved one
/// session forward. As soon as at least one session is held, any failure aborts with an
/// error so the window never contains a gap.
///
/// # Errors
///
/// Returns the `SessionsUnavailableReason` of the first failure that is not tolerated.
async fn extend_sessions_from_chain_state(
    stored_sessions: Vec<SessionInfo>,
    sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
    block_hash: Hash,
    window_start: &mut SessionIndex,
    end_inclusive: SessionIndex,
) -> Result<Vec<SessionInfo>, SessionsUnavailableReason> {
    let mut sessions = stored_sessions;
    // Failures are only tolerated until the first session is in the window.
    let mut allow_failure = sessions.is_empty();
    let first_to_fetch = *window_start + sessions.len() as u32;

    for i in first_to_fetch..=end_inclusive {
        let (tx, rx) = oneshot::channel();
        let request =
            RuntimeApiMessage::Request(block_hash, RuntimeApiRequest::SessionInfo(i, tx));
        sender.send_message(request).await;

        match rx.await {
            Ok(Ok(Some(session_info))) => {
                allow_failure = false;
                sessions.push(session_info);
            },
            Ok(Ok(None)) => {
                if !allow_failure {
                    return Err(SessionsUnavailableReason::Missing(i))
                }
                *window_start += 1;
                gum::debug!(
                    target: LOG_TARGET,
                    session = ?i,
                    "Session info missing from runtime."
                );
            },
            Ok(Err(err)) => {
                if !allow_failure {
                    return Err(SessionsUnavailableReason::RuntimeApi(err))
                }
                *window_start += 1;
                gum::debug!(
                    target: LOG_TARGET,
                    session = ?i,
                    ?err,
                    "Error while fetching session information."
                );
            },
            Err(err) => {
                if !allow_failure {
                    return Err(SessionsUnavailableReason::RuntimeApiUnavailable(err))
                }
                *window_start += 1;
                gum::debug!(
                    target: LOG_TARGET,
                    session = ?i,
                    ?err,
                    "Channel error while fetching session information."
                );
            },
        }
    }
    Ok(sessions)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::database::kvdb_impl::DbAdapter;
use assert_matches::assert_matches;
use polkadot_node_subsystem::{
messages::{AllMessages, AvailabilityRecoveryMessage},
SubsystemContext,
};
use polkadot_node_subsystem_test_helpers::make_subsystem_context;
use polkadot_primitives::v2::Header;
use sp_core::testing::TaskExecutor;
// Column of the in-memory test database that holds the session window entry.
const SESSION_DATA_COL: u32 = 0;
// Number of columns the in-memory test database is created with.
const NUM_COLUMNS: u32 = 1;
/// Create `DatabaseParams` backed by a fresh in-memory database.
fn dummy_db_params() -> DatabaseParams {
    let adapter = DbAdapter::new(kvdb_memorydb::create(NUM_COLUMNS), &[]);
    let db: Arc<dyn Database> = Arc::new(adapter);
    DatabaseParams { db_column: SESSION_DATA_COL, db }
}
/// Build a `SessionInfo` whose numeric fields carry `index`, so that infos of different
/// sessions compare unequal.
fn dummy_session_info(index: SessionIndex) -> SessionInfo {
    SessionInfo {
        // Fields derived from the session index.
        n_cores: index as _,
        zeroth_delay_tranche_width: index as _,
        relay_vrf_modulo_samples: index as _,
        n_delay_tranches: index as _,
        no_show_slots: index as _,
        needed_approvals: index as _,
        // Fields that are the same for every session.
        dispute_period: 6,
        random_seed: [0u8; 32],
        validators: Default::default(),
        validator_groups: Default::default(),
        discovery_keys: Vec::new(),
        assignment_keys: Vec::new(),
        active_validator_indices: Vec::new(),
    }
}
fn cache_session_info_test(
expected_start_session: SessionIndex,
session: SessionIndex,
window: Option<RollingSessionWindow>,
expect_requests_from: SessionIndex,
db_params: Option<DatabaseParams>,
) -> RollingSessionWindow {
let db_params = db_params.unwrap_or(dummy_db_params());
let header = Header {
digest: Default::default(),
extrinsics_root: Default::default(),
number: 5,
state_root: Default::default(),
parent_hash: Default::default(),
};
let finalized_header = Header {
digest: Default::default(),
extrinsics_root: Default::default(),
number: 0,
state_root: Default::default(),
parent_hash: Default::default(),
};
let pool = TaskExecutor::new();
let (mut ctx, mut handle) =
make_subsystem_context::<AvailabilityRecoveryMessage, _>(pool.clone());
let hash = header.hash();
let sender = ctx.sender();
let test_fut = {
Box::pin(async move {
let window = match window {
None =>
RollingSessionWindow::new(sender.clone(), hash, db_params).await.unwrap(),
Some(mut window) => {
window.cache_session_info_for_head(sender, hash).await.unwrap();
window
},
};
assert_eq!(window.earliest_session, expected_start_session);
assert_eq!(
window.session_info,
(expected_start_session..=session).map(dummy_session_info).collect::<Vec<_>>(),
);
window
})
};
let aux_fut = Box::pin(async move {
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(s_tx),
)) => {
assert_eq!(h, hash);
let _ = s_tx.send(Ok(session));
}
);
assert_matches!(
handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
s_tx,
)) => {
let _ = s_tx.send(Ok(finalized_header.number));
}
);
assert_matches!(
handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
block_number,
s_tx,
)) => {
assert_eq!(block_number, finalized_header.number);
let _ = s_tx.send(Ok(Some(finalized_header.hash())));
}
);
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(s_tx),
)) => {
assert_eq!(h, finalized_header.hash());
let _ = s_tx.send(Ok(session));
}
);
for i in expect_requests_from..=session {
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionInfo(j, s_tx),
)) => {
assert_eq!(h, hash);
assert_eq!(i, j);
let _ = s_tx.send(Ok(Some(dummy_session_info(i))));
}
);
}
});
let (window, _) = futures::executor::block_on(futures::future::join(test_fut, aux_fut));
window
}
/// Start with an empty database, advance the window once, then re-create the window from
/// the same database.
#[test]
fn cache_session_info_start_empty_db() {
    // Window start expected when `tip` is the newest session.
    let start_for = |tip: SessionIndex| tip.saturating_sub(SESSION_WINDOW_SIZE.get() - 1);
    let db_params = dummy_db_params();

    // Nothing is stored yet, so every session of the window is requested.
    let window =
        cache_session_info_test(start_for(10), 10, None, start_for(10), Some(db_params.clone()));

    // Advancing by one session requests only the new session.
    let window = cache_session_info_test(start_for(11), 11, Some(window), 11, None);
    assert_eq!(window.session_info.len(), SESSION_WINDOW_SIZE.get() as usize);

    // A new window on the same database only has to request session 12.
    cache_session_info_test(start_for(11), 12, None, 12, Some(db_params));
}
// A new window at session 1 starts at session 0 and requests sessions 0 and 1.
#[test]
fn cache_session_info_first_early() {
cache_session_info_test(0, 1, None, 0, None);
}
/// Advancing a one-session window that starts at session 1 keeps the start at 1.
#[test]
fn cache_session_info_does_not_underflow() {
    let single_session_window = RollingSessionWindow {
        db_params: Some(dummy_db_params()),
        window_size: SESSION_WINDOW_SIZE,
        session_info: vec![dummy_session_info(1)],
        earliest_session: 1,
    };
    cache_session_info_test(1, 2, Some(single_session_window), 2, None);
}
/// `contains` is true only for the sessions actually held by the window.
#[test]
fn cache_session_window_contains() {
    // The window holds exactly one session: session 10.
    let window = RollingSessionWindow {
        earliest_session: 10,
        session_info: vec![dummy_session_info(1)],
        window_size: SESSION_WINDOW_SIZE,
        db_params: Some(dummy_db_params()),
    };
    // Positive case; without it the test would pass for a `contains` that is always false.
    assert!(window.contains(10));
    assert!(!window.contains(0));
    assert!(!window.contains(10 + SESSION_WINDOW_SIZE.get()));
    assert!(!window.contains(11));
    assert!(!window.contains(10 + SESSION_WINDOW_SIZE.get() - 1));
}
/// A new window at session 100 covers the last `SESSION_WINDOW_SIZE` sessions.
#[test]
fn cache_session_info_first_late() {
    let expected_start = (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1);
    cache_session_info_test(expected_start, 100, None, expected_start, None);
}
/// When the window jumps far past the cached sessions, the whole new window is requested.
#[test]
fn cache_session_info_jump() {
    let stale_window = RollingSessionWindow {
        earliest_session: 50,
        session_info: (50..=52).map(dummy_session_info).collect(),
        window_size: SESSION_WINDOW_SIZE,
        db_params: Some(dummy_db_params()),
    };
    let expected_start = (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1);
    cache_session_info_test(expected_start, 100, Some(stale_window), expected_start, None);
}
/// Advancing a full window by one session requests only the new session.
#[test]
fn cache_session_info_roll_full() {
    let start = 99 - (SESSION_WINDOW_SIZE.get() - 1);
    let full_window = RollingSessionWindow {
        earliest_session: start,
        session_info: (start..=99).map(dummy_session_info).collect(),
        window_size: SESSION_WINDOW_SIZE,
        db_params: Some(dummy_db_params()),
    };
    let expected_start = (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1);
    cache_session_info_test(expected_start, 100, Some(full_window), 100, None);
}
/// Advance a full window by several sessions, then re-create windows from the same database.
#[test]
fn cache_session_info_roll_many_full_db() {
    let db_params = dummy_db_params();
    let start = 97 - (SESSION_WINDOW_SIZE.get() - 1);
    let full_window = RollingSessionWindow {
        earliest_session: start,
        session_info: (start..=97).map(dummy_session_info).collect(),
        window_size: SESSION_WINDOW_SIZE,
        db_params: Some(db_params.clone()),
    };
    let start_at_100 = (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1);

    // Rolling from 97 to 100 requests sessions 98..=100 and saves the window.
    cache_session_info_test(start_at_100, 100, Some(full_window), 98, None);

    // A new window on the saved database keeps the stored start and requests only session 101.
    cache_session_info_test(start_at_100, 101, None, 101, Some(db_params.clone()));

    // The stored window is too old for session 200, so the full window is requested again.
    let window = cache_session_info_test(195, 200, None, 195, Some(db_params));
    assert_eq!(window.session_info.len(), SESSION_WINDOW_SIZE.get() as usize);
}
/// Advancing a full window by three sessions requests exactly the three new sessions.
#[test]
fn cache_session_info_roll_many_full() {
    let start = 97 - (SESSION_WINDOW_SIZE.get() - 1);
    let full_window = RollingSessionWindow {
        earliest_session: start,
        session_info: (start..=97).map(dummy_session_info).collect(),
        window_size: SESSION_WINDOW_SIZE,
        db_params: Some(dummy_db_params()),
    };
    let expected_start = (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1);
    cache_session_info_test(expected_start, 100, Some(full_window), 98, None);
}
/// Advancing a not-yet-full window starting at session 0 keeps the start at 0.
#[test]
fn cache_session_info_roll_early() {
    let start = 0;
    let partial_window = RollingSessionWindow {
        earliest_session: start,
        session_info: (0..=1).map(dummy_session_info).collect(),
        window_size: SESSION_WINDOW_SIZE,
        db_params: Some(dummy_db_params()),
    };
    cache_session_info_test(0, 2, Some(partial_window), 2, None);
}
/// Advancing a not-yet-full window by several sessions requests every session not cached yet.
#[test]
fn cache_session_info_roll_many_early() {
    let start = 0;
    let partial_window = RollingSessionWindow {
        earliest_session: start,
        session_info: (0..=1).map(dummy_session_info).collect(),
        window_size: SESSION_WINDOW_SIZE,
        db_params: Some(dummy_db_params()),
    };
    // With the window starting at 0, the number of cached sessions is the next session index.
    let first_uncached_session = partial_window.session_info.len() as u32;
    cache_session_info_test(0, 3, Some(partial_window), first_uncached_session, None);
}
#[test]
fn cache_session_fails_for_gap_in_window() {
let session: SessionIndex = 100;
let genesis_session: SessionIndex = 0;
let header = Header {
digest: Default::default(),
extrinsics_root: Default::default(),
number: 5,
state_root: Default::default(),
parent_hash: Default::default(),
};
let finalized_header = Header {
digest: Default::default(),
extrinsics_root: Default::default(),
number: 0,
state_root: Default::default(),
parent_hash: Default::default(),
};
let pool = TaskExecutor::new();
let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
let hash = header.hash();
let test_fut = {
let sender = ctx.sender().clone();
Box::pin(async move {
let res = RollingSessionWindow::new(sender, hash, dummy_db_params()).await;
assert!(res.is_err());
})
};
let aux_fut = Box::pin(async move {
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(s_tx),
)) => {
assert_eq!(h, hash);
let _ = s_tx.send(Ok(session));
}
);
assert_matches!(
handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
s_tx,
)) => {
let _ = s_tx.send(Ok(finalized_header.number));
}
);
assert_matches!(
handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
block_number,
s_tx,
)) => {
assert_eq!(block_number, finalized_header.number);
let _ = s_tx.send(Ok(Some(finalized_header.hash())));
}
);
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(s_tx),
)) => {
assert_eq!(h, finalized_header.hash());
let _ = s_tx.send(Ok(0));
}
);
for i in genesis_session..=50 {
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionInfo(j, s_tx),
)) => {
assert_eq!(h, hash);
assert_eq!(i, j);
let _ = s_tx.send(Ok(None));
}
);
}
for i in 51..=60 {
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionInfo(j, s_tx),
)) => {
assert_eq!(h, hash);
assert_eq!(i, j);
let _ = s_tx.send(Ok(Some(dummy_session_info(i))));
}
);
}
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionInfo(j, s_tx),
)) => {
assert_eq!(h, hash);
assert_eq!(61, j);
let _ = s_tx.send(Ok(None));
}
);
});
futures::executor::block_on(futures::future::join(test_fut, aux_fut));
}
// Missing sessions at the beginning of the stretched window are tolerated: the window simply
// starts at the first session that is available.
#[test]
fn any_session_stretch_with_failure_allowed_for_unfinalized_chain() {
// Session index reported for the child of the head block.
let session: SessionIndex = 100;
let genesis_session: SessionIndex = 0;
let header = Header {
digest: Default::default(),
extrinsics_root: Default::default(),
number: 5,
state_root: Default::default(),
parent_hash: Default::default(),
};
let finalized_header = Header {
digest: Default::default(),
extrinsics_root: Default::default(),
number: 0,
state_root: Default::default(),
parent_hash: Default::default(),
};
let pool = TaskExecutor::new();
let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
let hash = header.hash();
// The code under test: the window is expected to hold sessions 50..=100.
let test_fut = {
let sender = ctx.sender().clone();
Box::pin(async move {
let res = RollingSessionWindow::new(sender, hash, dummy_db_params()).await;
assert!(res.is_ok());
let rsw = res.unwrap();
assert_eq!(rsw.earliest_session, 50);
assert_eq!(rsw.session_info.len(), 51);
})
};
// The mocked overseer answering the requests in the expected order.
let aux_fut = Box::pin(async move {
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(s_tx),
)) => {
assert_eq!(h, hash);
let _ = s_tx.send(Ok(session));
}
);
assert_matches!(
handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
s_tx,
)) => {
let _ = s_tx.send(Ok(finalized_header.number));
}
);
assert_matches!(
handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
block_number,
s_tx,
)) => {
assert_eq!(block_number, finalized_header.number);
let _ = s_tx.send(Ok(Some(finalized_header.hash())));
}
);
// The child of the finalized block is in session 0, so the window stretches back to 0.
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(s_tx),
)) => {
assert_eq!(h, finalized_header.hash());
let _ = s_tx.send(Ok(0));
}
);
// Sessions below 50 are reported as missing, all later ones are available.
for i in genesis_session..=session {
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionInfo(j, s_tx),
)) => {
assert_eq!(h, hash);
assert_eq!(i, j);
let _ = s_tx.send(Ok(if i < 50 {
None
} else {
Some(dummy_session_info(i))
}));
}
);
}
});
futures::executor::block_on(futures::future::join(test_fut, aux_fut));
}
// `RollingSessionWindow::new` must fail when the newest session of the window is missing
// while all earlier ones are available.
#[test]
fn any_session_unavailable_for_caching_means_no_change() {
// Session index reported for the child of both the head and the finalized block.
let session: SessionIndex = 6;
// First session for which a `SessionInfo` request is expected.
let start_session = session.saturating_sub(SESSION_WINDOW_SIZE.get() - 1);
let header = Header {
digest: Default::default(),
extrinsics_root: Default::default(),
number: 5,
state_root: Default::default(),
parent_hash: Default::default(),
};
let finalized_header = Header {
digest: Default::default(),
extrinsics_root: Default::default(),
number: 0,
state_root: Default::default(),
parent_hash: Default::default(),
};
let pool = TaskExecutor::new();
let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
let hash = header.hash();
// The code under test: constructing the window is expected to fail.
let test_fut = {
let sender = ctx.sender().clone();
Box::pin(async move {
let res = RollingSessionWindow::new(sender, hash, dummy_db_params()).await;
assert!(res.is_err());
})
};
// The mocked overseer answering the requests in the expected order.
let aux_fut = Box::pin(async move {
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(s_tx),
)) => {
assert_eq!(h, hash);
let _ = s_tx.send(Ok(session));
}
);
assert_matches!(
handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
s_tx,
)) => {
let _ = s_tx.send(Ok(finalized_header.number));
}
);
assert_matches!(
handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
block_number,
s_tx,
)) => {
assert_eq!(block_number, finalized_header.number);
let _ = s_tx.send(Ok(Some(finalized_header.hash())));
}
);
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(s_tx),
)) => {
assert_eq!(h, finalized_header.hash());
let _ = s_tx.send(Ok(session));
}
);
// Every session is available except the newest one.
for i in start_session..=session {
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionInfo(j, s_tx),
)) => {
assert_eq!(h, hash);
assert_eq!(i, j);
let _ = s_tx.send(Ok(if i == session {
None
} else {
Some(dummy_session_info(i))
}));
}
);
}
});
futures::executor::block_on(futures::future::join(test_fut, aux_fut));
}
// A window created at session 0 requests exactly the info of session 0 and starts there.
#[test]
fn request_session_info_for_genesis() {
let session: SessionIndex = 0;
// The same header serves as head and as last finalized block.
let header = Header {
digest: Default::default(),
extrinsics_root: Default::default(),
number: 0,
state_root: Default::default(),
parent_hash: Default::default(),
};
let pool = TaskExecutor::new();
let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
let hash = header.hash();
// The code under test: the window is expected to hold only session 0.
let test_fut = {
Box::pin(async move {
let sender = ctx.sender().clone();
let window =
RollingSessionWindow::new(sender, hash, dummy_db_params()).await.unwrap();
assert_eq!(window.earliest_session, session);
assert_eq!(window.session_info, vec![dummy_session_info(session)]);
})
};
// The mocked overseer answering the requests in the expected order.
let aux_fut = Box::pin(async move {
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(s_tx),
)) => {
assert_eq!(h, hash);
let _ = s_tx.send(Ok(session));
}
);
assert_matches!(
handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
s_tx,
)) => {
let _ = s_tx.send(Ok(header.number));
}
);
assert_matches!(
handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
block_number,
s_tx,
)) => {
assert_eq!(block_number, header.number);
let _ = s_tx.send(Ok(Some(header.hash())));
}
);
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(s_tx),
)) => {
assert_eq!(h, header.hash());
let _ = s_tx.send(Ok(session));
}
);
// The single session info request, for session 0.
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionInfo(s, s_tx),
)) => {
assert_eq!(h, hash);
assert_eq!(s, session);
let _ = s_tx.send(Ok(Some(dummy_session_info(s))));
}
);
});
futures::executor::block_on(futures::future::join(test_fut, aux_fut));
}
}