#![cfg_attr(not(feature = "std"), no_std)]
pub mod migration;
#[cfg(test)]
mod mock;
#[cfg(test)]
mod tests;
#[cfg(feature = "runtime-benchmarks")]
mod benchmarking;
pub mod weights;
pub use weights::WeightInfo;
use codec::{Decode, DecodeLimit, Encode};
use cumulus_primitives_core::{
relay_chain::BlockNumber as RelayBlockNumber, ChannelStatus, GetChannelInfo, MessageSendError,
ParaId, XcmpMessageFormat, XcmpMessageHandler, XcmpMessageSource,
};
use frame_support::{
traits::EnsureOrigin,
weights::{constants::WEIGHT_REF_TIME_PER_MILLIS, Weight},
};
use rand_chacha::{
rand_core::{RngCore, SeedableRng},
ChaChaRng,
};
use scale_info::TypeInfo;
use sp_runtime::{traits::Hash, RuntimeDebug};
use sp_std::{convert::TryFrom, prelude::*};
use xcm::{
latest::{prelude::*, Weight as XcmWeight},
VersionedXcm, WrapVersion, MAX_XCM_DECODE_DEPTH,
};
use xcm_executor::traits::ConvertOrigin;
pub use pallet::*;
/// Index under which an overweight message is stashed in the `Overweight` storage map.
pub type OverweightIndex = u64;
/// Log target string for this pallet. Not referenced in the visible part of this file.
const LOG_TARGET: &str = "xcmp_queue";
/// Second component (proof size) of the default `xcmp_max_individual_weight`: 64 KiB.
const DEFAULT_POV_SIZE: u64 = 64 * 1024; #[frame_support::pallet]
pub mod pallet {
use super::*;
use frame_support::pallet_prelude::*;
use frame_system::pallet_prelude::*;
/// The pallet struct. Its storage version is taken from `migration::STORAGE_VERSION`, and the
/// `Store` trait generated here is what `stash_overweight` uses to reach storage items.
#[pallet::pallet]
#[pallet::generate_store(pub(super) trait Store)]
#[pallet::storage_version(migration::STORAGE_VERSION)]
#[pallet::without_storage_info]
pub struct Pallet<T>(_);
/// Configuration of the XCMP queue pallet.
#[pallet::config]
pub trait Config: frame_system::Config {
/// The runtime's overarching event type; this pallet's `Event` must convert into it.
type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
/// Executor that `handle_xcm_message` hands decoded inbound XCMs to.
type XcmExecutor: ExecuteXcm<Self::RuntimeCall>;
/// Source of channel information; queried for the per-channel maximum message size and the
/// current channel status.
type ChannelInfo: GetChannelInfo;
/// Wraps an outgoing `Xcm` into a `VersionedXcm` for the given destination (see `send_xcm`).
type VersionWrapper: WrapVersion;
/// Origin allowed to call `service_overweight`.
type ExecuteOverweightOrigin: EnsureOrigin<Self::RuntimeOrigin>;
/// Origin allowed to suspend/resume XCM execution. Senders that convert into this origin are
/// still serviced while the queue is suspended.
type ControllerOrigin: EnsureOrigin<Self::RuntimeOrigin>;
/// Converts a sending parachain's location into a runtime origin so it can be checked
/// against `ControllerOrigin`.
type ControllerOriginConverter: ConvertOrigin<Self::RuntimeOrigin>;
/// Weight functions for the configuration-update extrinsics.
type WeightInfo: WeightInfo;
}
#[pallet::hooks]
impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
// Runs the storage migrations defined in the `migration` module and returns their weight.
fn on_runtime_upgrade() -> Weight {
migration::migrate_to_latest::<T>()
}
// Spends the idle weight offered to this hook on servicing queued inbound XCMP messages;
// the block number is ignored.
fn on_idle(_now: T::BlockNumber, max_weight: Weight) -> Weight {
Self::service_xcmp_queue(max_weight)
}
}
#[pallet::call]
impl<T: Config> Pallet<T> {
/// Executes a message that was previously stashed as overweight.
///
/// - `origin`: must satisfy `T::ExecuteOverweightOrigin`.
/// - `index`: key of the stashed message in the `Overweight` map.
/// - `weight_limit`: ref-time budget passed to the XCM executor.
///
/// Errors:
/// - `BadOverweightIndex`: nothing is stored under `index`.
/// - `BadXcm`: the stored bytes do not decode as a versioned XCM within the depth limit.
/// - `WeightOverLimit`: `handle_xcm_message` returned an error (any error is mapped to this).
///
/// On success the entry is removed, `OverweightServiced` is emitted, and the post-dispatch
/// weight is the weight used plus a constant `1_000_000` ref-time.
#[pallet::call_index(0)]
#[pallet::weight((Weight::from_ref_time(weight_limit.saturating_add(1_000_000)), DispatchClass::Operational,))]
pub fn service_overweight(
origin: OriginFor<T>,
index: OverweightIndex,
weight_limit: XcmWeight,
) -> DispatchResultWithPostInfo {
T::ExecuteOverweightOrigin::ensure_origin(origin)?;
let (sender, sent_at, data) =
Overweight::<T>::get(index).ok_or(Error::<T>::BadOverweightIndex)?;
let xcm = VersionedXcm::<T::RuntimeCall>::decode_all_with_depth_limit(
MAX_XCM_DECODE_DEPTH,
&mut data.as_slice(),
)
.map_err(|_| Error::<T>::BadXcm)?;
let used =
Self::handle_xcm_message(sender, sent_at, xcm, Weight::from_ref_time(weight_limit))
.map_err(|_| Error::<T>::WeightOverLimit)?;
// The stashed message is only removed once it has been handled without error.
Overweight::<T>::remove(index);
Self::deposit_event(Event::OverweightServiced { index, used });
Ok(Some(used.saturating_add(Weight::from_ref_time(1_000_000))).into())
}
/// Sets the `QueueSuspended` flag. While it is set, `service_xcmp_queue` skips every sender
/// that does not convert into `T::ControllerOrigin`.
///
/// - `origin`: must satisfy `T::ControllerOrigin`.
#[pallet::call_index(1)]
#[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
pub fn suspend_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
T::ControllerOrigin::ensure_origin(origin)?;
QueueSuspended::<T>::put(true);
Ok(())
}
/// Clears the `QueueSuspended` flag so that all senders are serviced again.
///
/// - `origin`: must satisfy `T::ControllerOrigin`.
#[pallet::call_index(2)]
#[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
pub fn resume_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
T::ControllerOrigin::ensure_origin(origin)?;
QueueSuspended::<T>::put(false);
Ok(())
}
/// Overwrites `QueueConfigData::suspend_threshold` with `new`.
///
/// - `origin`: must be root.
/// - `new`: number of queued pages at which a sender is sent a `Suspend` signal.
#[pallet::call_index(3)]
#[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
pub fn update_suspend_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
ensure_root(origin)?;
QueueConfig::<T>::mutate(|data| data.suspend_threshold = new);
Ok(())
}
/// Overwrites `QueueConfigData::drop_threshold` with `new`.
///
/// - `origin`: must be root.
/// - `new`: number of queued pages at which further pages from a sender are no longer
///   recorded in the channel metadata.
#[pallet::call_index(4)]
#[pallet::weight((T::WeightInfo::set_config_with_u32(),DispatchClass::Operational,))]
pub fn update_drop_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
ensure_root(origin)?;
QueueConfig::<T>::mutate(|data| data.drop_threshold = new);
Ok(())
}
/// Overwrites `QueueConfigData::resume_threshold` with `new`.
///
/// - `origin`: must be root.
/// - `new`: number of queued pages at or below which a suspended sender is sent a `Resume`
///   signal.
#[pallet::call_index(5)]
#[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
pub fn update_resume_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
ensure_root(origin)?;
QueueConfig::<T>::mutate(|data| data.resume_threshold = new);
Ok(())
}
/// Overwrites `QueueConfigData::threshold_weight` with `new` interpreted as ref-time.
///
/// - `origin`: must be root.
/// - `new`: minimum remaining weight required for the service loop to keep going.
#[pallet::call_index(6)]
#[pallet::weight((T::WeightInfo::set_config_with_weight(), DispatchClass::Operational,))]
pub fn update_threshold_weight(origin: OriginFor<T>, new: XcmWeight) -> DispatchResult {
ensure_root(origin)?;
QueueConfig::<T>::mutate(|data| data.threshold_weight = Weight::from_ref_time(new));
Ok(())
}
/// Overwrites `QueueConfigData::weight_restrict_decay` with `new` interpreted as ref-time.
///
/// - `origin`: must be root.
/// - `new`: decay value; `service_xcmp_queue` divides by `new + 1` when growing the weight
///   it makes available per channel.
// NOTE(review): no upper bound is enforced here, and the consumer computes `ref_time() + 1`
// without saturation — confirm `u64::MAX` can never be submitted.
#[pallet::call_index(7)]
#[pallet::weight((T::WeightInfo::set_config_with_weight(), DispatchClass::Operational,))]
pub fn update_weight_restrict_decay(
origin: OriginFor<T>,
new: XcmWeight,
) -> DispatchResult {
ensure_root(origin)?;
QueueConfig::<T>::mutate(|data| {
data.weight_restrict_decay = Weight::from_ref_time(new)
});
Ok(())
}
/// Overwrites `QueueConfigData::xcmp_max_individual_weight` with `new` interpreted as
/// ref-time.
///
/// - `origin`: must be root.
/// - `new`: ref-time above which a message that hits the weight limit is stashed as
///   overweight instead of being retried.
#[pallet::call_index(8)]
#[pallet::weight((T::WeightInfo::set_config_with_weight(), DispatchClass::Operational,))]
pub fn update_xcmp_max_individual_weight(
origin: OriginFor<T>,
new: XcmWeight,
) -> DispatchResult {
ensure_root(origin)?;
QueueConfig::<T>::mutate(|data| {
data.xcmp_max_individual_weight = Weight::from_ref_time(new)
});
Ok(())
}
}
#[pallet::event]
#[pallet::generate_deposit(pub(super) fn deposit_event)]
pub enum Event<T: Config> {
/// An inbound XCM was executed to completion using `weight`.
Success { message_hash: Option<T::Hash>, weight: Weight },
/// An inbound XCM failed with `error`. `weight` is what was consumed: zero when the
/// executor reported an outright error, the partial weight when it reported an incomplete
/// execution.
Fail { message_hash: Option<T::Hash>, error: XcmError, weight: Weight },
/// An inbound versioned XCM could not be converted to the executor's XCM version.
BadVersion { message_hash: Option<T::Hash> },
/// Bad XCM format. Not emitted anywhere in the visible part of this file.
BadFormat { message_hash: Option<T::Hash> },
/// An upward message was sent. Not emitted anywhere in the visible part of this file.
UpwardMessageSent { message_hash: Option<T::Hash> },
/// An XCM was queued for sending to a sibling parachain via `send_xcm`.
XcmpMessageSent { message_hash: Option<T::Hash> },
/// An inbound XCM needed more weight than the per-message maximum and was stashed under
/// `index`; `required` is the weight the executor asked for.
OverweightEnqueued {
sender: ParaId,
sent_at: RelayBlockNumber,
index: OverweightIndex,
required: Weight,
},
/// The overweight message stashed under `index` was executed, consuming `used`.
OverweightServiced { index: OverweightIndex, used: Weight },
}
#[pallet::error]
pub enum Error<T> {
/// Failed to send a message. Not returned anywhere in the visible part of this file.
FailedToSend,
/// Bad XCM origin. Not returned anywhere in the visible part of this file.
BadXcmOrigin,
/// The stashed bytes of an overweight message could not be decoded as a versioned XCM.
BadXcm,
/// No overweight message is stored under the given index.
BadOverweightIndex,
/// Handling the overweight message failed. `service_overweight` maps every error returned
/// by the message handler to this variant.
WeightOverLimit,
}
/// Details of every inbound channel that currently has queued pages.
#[pallet::storage]
pub(super) type InboundXcmpStatus<T: Config> =
StorageValue<_, Vec<InboundChannelDetails>, ValueQuery>;
/// Inbound page payloads keyed by sender and the relay block number they were sent at. The
/// leading format prefix has already been stripped when a page is stored.
#[pallet::storage]
pub(super) type InboundXcmpMessages<T: Config> = StorageDoubleMap<
_,
Blake2_128Concat,
ParaId,
Twox64Concat,
RelayBlockNumber,
Vec<u8>,
ValueQuery,
>;
/// Details of every outbound channel that has pending pages or signals, or is suspended.
#[pallet::storage]
pub(super) type OutboundXcmpStatus<T: Config> =
StorageValue<_, Vec<OutboundChannelDetails>, ValueQuery>;
/// Outbound pages keyed by recipient and page index. Each page begins with its encoded
/// `XcmpMessageFormat` followed by the concatenated fragments.
#[pallet::storage]
pub(super) type OutboundXcmpMessages<T: Config> =
StorageDoubleMap<_, Blake2_128Concat, ParaId, Twox64Concat, u16, Vec<u8>, ValueQuery>;
/// Pending signal page per recipient; empty when there is none.
#[pallet::storage]
pub(super) type SignalMessages<T: Config> =
StorageMap<_, Blake2_128Concat, ParaId, Vec<u8>, ValueQuery>;
/// Tunable queue parameters; see `QueueConfigData`.
#[pallet::storage]
pub(super) type QueueConfig<T: Config> = StorageValue<_, QueueConfigData, ValueQuery>;
/// Stashed overweight messages: `(sender, sent_at, encoded XCM)` by index.
#[pallet::storage]
pub(super) type Overweight<T: Config> =
StorageMap<_, Twox64Concat, OverweightIndex, (ParaId, RelayBlockNumber, Vec<u8>)>;
/// The index that the next stashed overweight message will receive.
#[pallet::storage]
pub(super) type OverweightCount<T: Config> = StorageValue<_, OverweightIndex, ValueQuery>;
/// Whether servicing of the inbound queue is suspended for non-controller senders.
#[pallet::storage]
pub(super) type QueueSuspended<T: Config> = StorageValue<_, bool, ValueQuery>;
}
/// State of an inbound channel as tracked by this pallet.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Encode, Decode, RuntimeDebug, TypeInfo)]
pub enum InboundState {
/// No `Suspend` signal is outstanding towards the sender.
Ok,
/// A `Suspend` signal was sent to the sender; cleared again when `Resume` is sent.
Suspended,
}
/// State of an outbound channel as tracked by this pallet.
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo)]
pub enum OutboundState {
/// Pages for this recipient may be handed out by `take_outbound_messages`.
Ok,
/// The recipient sent a `Suspend` signal; `take_outbound_messages` skips the channel.
Suspended,
}
/// Bookkeeping for one inbound channel. The derived ordering compares `sender` first, which
/// is what keeps a sorted `InboundXcmpStatus` searchable by sender.
#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Encode, Decode, TypeInfo)]
pub struct InboundChannelDetails {
/// The parachain the pages come from.
sender: ParaId,
/// Whether a `Suspend` signal is currently outstanding towards `sender`.
state: InboundState,
/// One `(sent_at, format)` entry per queued page, oldest first; the payload itself lives
/// in `InboundXcmpMessages` under `(sender, sent_at)`.
message_metadata: Vec<(RelayBlockNumber, XcmpMessageFormat)>,
}
/// Bookkeeping for one outbound channel.
#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo)]
pub struct OutboundChannelDetails {
/// The parachain the pages are destined for.
recipient: ParaId,
/// Whether the recipient has asked for sending to be suspended.
state: OutboundState,
/// Whether a signal page for this recipient is waiting in `SignalMessages`.
signals_exist: bool,
/// Index of the oldest page still stored in `OutboundXcmpMessages`.
first_index: u16,
/// One past the index of the newest stored page; equal to `first_index` when no pages
/// are queued.
last_index: u16,
}
impl OutboundChannelDetails {
    /// Builds the details of a fresh channel to `recipient`: state `Ok`, no pending signals
    /// and an empty page range.
    pub fn new(recipient: ParaId) -> OutboundChannelDetails {
        Self {
            recipient,
            state: OutboundState::Ok,
            signals_exist: false,
            first_index: 0,
            last_index: 0,
        }
    }

    /// Returns the same details with `signals_exist` set.
    pub fn with_signals(self) -> OutboundChannelDetails {
        Self { signals_exist: true, ..self }
    }

    /// Returns the same details with the state switched to `Suspended`.
    pub fn with_suspended_state(self) -> OutboundChannelDetails {
        Self { state: OutboundState::Suspended, ..self }
    }
}
/// Tunable parameters of the queue, stored in `QueueConfig`.
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo)]
pub struct QueueConfigData {
/// Once a sender already has this many pages queued, it is sent a `Suspend` signal.
suspend_threshold: u32,
/// Once a sender already has this many pages queued, further pages are not recorded.
drop_threshold: u32,
/// A suspended sender is sent `Resume` once its queued page count is at or below this.
resume_threshold: u32,
/// The service loop only continues while at least this much weight remains.
threshold_weight: Weight,
/// Controls how quickly the weight made available per channel approaches the maximum:
/// each step adds the remaining gap divided by `ref_time() + 1`.
weight_restrict_decay: Weight,
/// A message whose required ref-time exceeds this is stashed as overweight rather than
/// retried later.
xcmp_max_individual_weight: Weight,
}
impl Default for QueueConfigData {
fn default() -> Self {
Self {
suspend_threshold: 2,
drop_threshold: 5,
resume_threshold: 1,
threshold_weight: Weight::from_ref_time(100_000),
weight_restrict_decay: Weight::from_ref_time(2),
xcmp_max_individual_weight: Weight::from_parts(
20u64 * WEIGHT_REF_TIME_PER_MILLIS,
DEFAULT_POV_SIZE,
),
}
}
}
/// A signal exchanged between the two ends of a channel, carried in a page whose format is
/// `XcmpMessageFormat::Signals`.
#[derive(PartialEq, Eq, Copy, Clone, Encode, Decode, TypeInfo)]
pub enum ChannelSignal {
/// Asks the other side to stop sending; on receipt the outbound channel is suspended.
Suspend,
/// Tells the other side it may send again; on receipt the outbound channel is resumed.
Resume,
}
impl<T: Config> Pallet<T> {
/// Queues `fragment` for `recipient`, appending it to the newest outbound page when that page
/// has the same `format` and enough room, and opening a new page otherwise.
///
/// Returns the number of pages queued ahead of the one the fragment landed in.
///
/// Errors:
/// - `MessageSendError::NoChannel`: no maximum message size is known for `recipient`.
/// - `MessageSendError::TooBig`: the encoded fragment alone exceeds that maximum.
fn send_fragment<Fragment: Encode>(
    recipient: ParaId,
    format: XcmpMessageFormat,
    fragment: Fragment,
) -> Result<u32, MessageSendError> {
    let encoded = fragment.encode();
    let size_limit =
        T::ChannelInfo::get_channel_max(recipient).ok_or(MessageSendError::NoChannel)?;
    if encoded.len() > size_limit {
        return Err(MessageSendError::TooBig)
    }

    // Locate the channel's bookkeeping entry, creating it when this is the first message.
    let mut statuses = <OutboundXcmpStatus<T>>::get();
    let slot = match statuses.iter().position(|item| item.recipient == recipient) {
        Some(slot) => slot,
        None => {
            statuses.push(OutboundChannelDetails::new(recipient));
            statuses.len() - 1
        },
    };
    let details = &mut statuses[slot];

    // Try to squeeze the fragment into the most recent page, if there is one.
    let mut appended = false;
    if details.last_index > details.first_index {
        appended =
            <OutboundXcmpMessages<T>>::mutate(recipient, details.last_index - 1, |page| {
                let same_format = XcmpMessageFormat::decode_with_depth_limit(
                    MAX_XCM_DECODE_DEPTH,
                    &mut &page[..],
                ) == Ok(format);
                if same_format && page.len() + encoded.len() <= size_limit {
                    page.extend_from_slice(&encoded[..]);
                    true
                } else {
                    false
                }
            });
    }

    if appended {
        // The channel details are untouched in this case, so nothing is written back.
        return Ok((details.last_index - details.first_index - 1) as u32)
    }

    // Start a fresh page: format prefix first, then the fragment.
    let page_index = details.last_index;
    details.last_index += 1;
    let mut new_page = format.encode();
    new_page.extend_from_slice(&encoded[..]);
    <OutboundXcmpMessages<T>>::insert(recipient, page_index, new_page);
    let pages_ahead = (details.last_index - details.first_index - 1) as u32;
    <OutboundXcmpStatus<T>>::put(statuses);
    Ok(pages_ahead)
}
/// Queues `signal` for `dest` and marks the outbound channel as having pending signals,
/// creating the channel entry if none exists. Always returns `Ok(())`.
fn send_signal(dest: ParaId, signal: ChannelSignal) -> Result<(), ()> {
    let mut statuses = <OutboundXcmpStatus<T>>::get();
    match statuses.iter_mut().find(|item| item.recipient == dest) {
        Some(existing) => existing.signals_exist = true,
        None => statuses.push(OutboundChannelDetails::new(dest).with_signals()),
    }

    <SignalMessages<T>>::mutate(dest, |page| {
        if page.is_empty() {
            // A new signal page starts with the `Signals` format marker.
            *page = (XcmpMessageFormat::Signals, signal).encode();
            return
        }
        page.extend_from_slice(&signal.encode());
    });

    <OutboundXcmpStatus<T>>::put(statuses);
    Ok(())
}
/// Queues an opaque blob for `recipient` as a `ConcatenatedEncodedBlob` fragment.
///
/// Returns whatever `send_fragment` returns: on success, the number of pages queued ahead of
/// the one the blob was placed in.
pub fn send_blob_message(recipient: ParaId, blob: Vec<u8>) -> Result<u32, MessageSendError> {
Self::send_fragment(recipient, XcmpMessageFormat::ConcatenatedEncodedBlob, blob)
}
/// Queues a versioned XCM for `recipient` as a `ConcatenatedVersionedXcm` fragment.
///
/// Returns whatever `send_fragment` returns: on success, the number of pages queued ahead of
/// the one the message was placed in.
pub fn send_xcm_message(
recipient: ParaId,
xcm: VersionedXcm<()>,
) -> Result<u32, MessageSendError> {
Self::send_fragment(recipient, XcmpMessageFormat::ConcatenatedVersionedXcm, xcm)
}
/// Produces a pseudo-random ordering of the indices `0..len`.
///
/// The generator is seeded from the parent block hash, so the ordering is the same for every
/// node executing the block. Each position in turn is swapped with a position drawn from the
/// whole range.
fn create_shuffle(len: usize) -> Vec<usize> {
    let parent_hash = frame_system::Pallet::<T>::parent_hash();
    let seed = <[u8; 32]>::decode(&mut sp_runtime::traits::TrailingZeroInput::new(
        parent_hash.as_ref(),
    ))
    .expect("input is padded with zeroes; qed");
    let mut rng = ChaChaRng::from_seed(seed);

    let mut order: Vec<usize> = (0..len).collect();
    for position in 0..len {
        let other = (rng.next_u32() as usize) % len;
        order.swap(position, other);
    }
    order
}
/// Placeholder for blob handling: blobs are not supported. Trips a debug assertion and
/// returns `Err(false)`, which the caller in `process_xcmp_message` treats as "skip this
/// blob and continue" (as opposed to `Err(true)`, which makes it stop and retry later).
fn handle_blob_message(
_sender: ParaId,
_sent_at: RelayBlockNumber,
_blob: Vec<u8>,
_weight_limit: Weight,
) -> Result<Weight, bool> {
debug_assert!(false, "Blob messages not handled.");
Err(false)
}
fn handle_xcm_message(
sender: ParaId,
_sent_at: RelayBlockNumber,
xcm: VersionedXcm<T::RuntimeCall>,
max_weight: Weight,
) -> Result<Weight, XcmError> {
let hash = Encode::using_encoded(&xcm, T::Hashing::hash);
log::debug!("Processing XCMP-XCM: {:?}", &hash);
let (result, event) = match Xcm::<T::RuntimeCall>::try_from(xcm) {
Ok(xcm) => {
let location = (1, Parachain(sender.into()));
match T::XcmExecutor::execute_xcm(location, xcm, max_weight.ref_time()) {
Outcome::Error(e) => (
Err(e),
Event::Fail { message_hash: Some(hash), error: e, weight: Weight::zero() },
),
Outcome::Complete(w) => (
Ok(Weight::from_ref_time(w)),
Event::Success {
message_hash: Some(hash),
weight: Weight::from_ref_time(w),
},
),
Outcome::Incomplete(w, e) => (
Ok(Weight::from_ref_time(w)),
Event::Fail {
message_hash: Some(hash),
error: e,
weight: Weight::from_ref_time(w),
},
),
}
},
Err(()) =>
(Err(XcmError::UnhandledXcmVersion), Event::BadVersion { message_hash: Some(hash) }),
};
Self::deposit_event(event);
result
}
fn process_xcmp_message(
sender: ParaId,
(sent_at, format): (RelayBlockNumber, XcmpMessageFormat),
max_weight: Weight,
max_individual_weight: Weight,
) -> (Weight, bool) {
let data = <InboundXcmpMessages<T>>::get(sender, sent_at);
let mut last_remaining_fragments;
let mut remaining_fragments = &data[..];
let mut weight_used = Weight::zero();
match format {
XcmpMessageFormat::ConcatenatedVersionedXcm => {
while !remaining_fragments.is_empty() {
last_remaining_fragments = remaining_fragments;
if let Ok(xcm) = VersionedXcm::<T::RuntimeCall>::decode_with_depth_limit(
MAX_XCM_DECODE_DEPTH,
&mut remaining_fragments,
) {
let weight = max_weight - weight_used;
match Self::handle_xcm_message(sender, sent_at, xcm, weight) {
Ok(used) => weight_used = weight_used.saturating_add(used),
Err(XcmError::WeightLimitReached(required))
if required > max_individual_weight.ref_time() =>
{
let msg_len = last_remaining_fragments
.len()
.saturating_sub(remaining_fragments.len());
let overweight_xcm = last_remaining_fragments[..msg_len].to_vec();
let index = Self::stash_overweight(sender, sent_at, overweight_xcm);
let e = Event::OverweightEnqueued {
sender,
sent_at,
index,
required: Weight::from_ref_time(required),
};
Self::deposit_event(e);
},
Err(XcmError::WeightLimitReached(required))
if required <= max_weight.ref_time() =>
{
remaining_fragments = last_remaining_fragments;
break
},
Err(error) => {
log::error!(
"Failed to process XCMP-XCM message, caused by {:?}",
error
);
},
}
} else {
debug_assert!(false, "Invalid incoming XCMP message data");
remaining_fragments = &b""[..];
}
}
},
XcmpMessageFormat::ConcatenatedEncodedBlob => {
while !remaining_fragments.is_empty() {
last_remaining_fragments = remaining_fragments;
if let Ok(blob) = <Vec<u8>>::decode(&mut remaining_fragments) {
let weight = max_weight - weight_used;
match Self::handle_blob_message(sender, sent_at, blob, weight) {
Ok(used) => weight_used = weight_used.saturating_add(used),
Err(true) => {
remaining_fragments = last_remaining_fragments;
break
},
Err(false) => {
},
}
} else {
debug_assert!(false, "Invalid incoming blob message data");
remaining_fragments = &b""[..];
}
}
},
XcmpMessageFormat::Signals => {
debug_assert!(false, "All signals are handled immediately; qed");
remaining_fragments = &b""[..];
},
}
let is_empty = remaining_fragments.is_empty();
if is_empty {
<InboundXcmpMessages<T>>::remove(sender, sent_at);
} else {
<InboundXcmpMessages<T>>::insert(sender, sent_at, remaining_fragments);
}
(weight_used, is_empty)
}
fn stash_overweight(
sender: ParaId,
sent_at: RelayBlockNumber,
xcm: Vec<u8>,
) -> OverweightIndex {
let index = <Self as Store>::OverweightCount::mutate(|count| {
let index = *count;
*count += 1;
index
});
<Self as Store>::Overweight::insert(index, (sender, sent_at, xcm));
index
}
// Services queued inbound pages across all channels, spending at most `max_weight`, and
// returns the weight consumed.
//
// Channels are visited in a pseudo-random order (see `create_shuffle`). On the first pass
// each successive channel is offered a growing share of `max_weight`; channels that still
// have pages afterwards are appended to the visiting order again and then get the full
// budget. While the queue is suspended, only senders that convert into
// `T::ControllerOrigin` are serviced.
fn service_xcmp_queue(max_weight: Weight) -> Weight {
let suspended = QueueSuspended::<T>::get();
let mut status = <InboundXcmpStatus<T>>::get(); if status.is_empty() {
return Weight::zero()
}
let QueueConfigData {
resume_threshold,
threshold_weight,
weight_restrict_decay,
xcmp_max_individual_weight,
..
} = <QueueConfig<T>>::get();
// Visiting order over `status`; indices may be appended again further down.
let mut shuffled = Self::create_shuffle(status.len());
let mut weight_used = Weight::zero();
// Running cap on total weight usable so far; grows towards `max_weight`.
let mut weight_available = Weight::zero();
let mut shuffle_index = 0;
// Stop once every entry was visited or less than `threshold_weight` remains.
while shuffle_index < shuffled.len() &&
max_weight.saturating_sub(weight_used).all_gte(threshold_weight)
{
let index = shuffled[shuffle_index];
let sender = status[index].sender;
// Check whether this sender, as a superuser origin, passes `ControllerOrigin`.
let sender_origin = T::ControllerOriginConverter::convert_origin(
(1, Parachain(sender.into())),
OriginKind::Superuser,
);
let is_controller = sender_origin
.map_or(false, |origin| T::ControllerOrigin::try_origin(origin).is_ok());
if suspended && !is_controller {
shuffle_index += 1;
continue
}
if weight_available != max_weight {
if shuffle_index < status.len() {
// First pass: release a fraction of the still-withheld weight, and jump straight
// to the maximum once within `threshold_weight` of it.
// NOTE(review): `ref_time() + 1` is unchecked; confirm the configured decay can
// never be `u64::MAX`.
weight_available +=
(max_weight - weight_available) / (weight_restrict_decay.ref_time() + 1);
if (weight_available + threshold_weight).any_gt(max_weight) {
weight_available = max_weight;
}
} else {
// Revisited entries (appended below) get the full budget.
weight_available = max_weight;
}
}
let weight_processed = if status[index].message_metadata.is_empty() {
debug_assert!(false, "channel exists in status; there must be messages; qed");
Weight::zero()
} else {
let weight_remaining = weight_available.saturating_sub(weight_used);
// Only the oldest page of the channel is processed per visit.
let (weight_processed, is_empty) = Self::process_xcmp_message(
sender,
status[index].message_metadata[0],
weight_remaining,
xcmp_max_individual_weight,
);
if is_empty {
status[index].message_metadata.remove(0);
}
weight_processed
};
weight_used += weight_processed;
// Tell a suspended sender it may resume once its backlog is small enough.
if status[index].message_metadata.len() as u32 <= resume_threshold &&
status[index].state == InboundState::Suspended
{
let r = Self::send_signal(sender, ChannelSignal::Resume);
debug_assert!(r.is_ok(), "WARNING: Failed sending resume into suspended channel");
status[index].state = InboundState::Ok;
}
// The channel still has pages and either made progress or was not yet offered the full
// budget: schedule another visit.
if !status[index].message_metadata.is_empty() &&
(weight_processed.any_gt(Weight::zero()) || weight_available != max_weight)
{
if shuffle_index + 1 == shuffled.len() {
// Already the last entry: retry it right away instead of appending a duplicate
// (`shuffle_index` is deliberately not advanced).
continue
}
shuffled.push(index);
}
shuffle_index += 1;
}
// Drop channels whose queues were drained before writing the status back.
status.retain(|item| !item.message_metadata.is_empty());
<InboundXcmpStatus<T>>::put(status);
weight_used
}
/// Marks the outbound channel to `target` as suspended, creating a suspended entry when the
/// channel is not tracked yet. Suspending a channel that is not `Ok` trips a debug assertion.
fn suspend_channel(target: ParaId) {
    <OutboundXcmpStatus<T>>::mutate(|statuses| {
        match statuses.iter_mut().find(|item| item.recipient == target) {
            Some(details) => {
                debug_assert!(
                    details.state == OutboundState::Ok,
                    "WARNING: Attempt to suspend channel that was not Ok."
                );
                details.state = OutboundState::Suspended;
            },
            None => {
                statuses.push(OutboundChannelDetails::new(target).with_suspended_state());
            },
        }
    });
}
/// Lifts the suspension of the outbound channel to `target`. An entry with no queued pages is
/// removed altogether; otherwise its state is set back to `Ok`. Resuming a channel that is
/// unknown or not suspended trips a debug assertion.
fn resume_channel(target: ParaId) {
    <OutboundXcmpStatus<T>>::mutate(|statuses| {
        match statuses.iter().position(|item| item.recipient == target) {
            Some(index) => {
                debug_assert!(
                    statuses[index].state == OutboundState::Suspended,
                    "WARNING: Attempt to resume channel that was not suspended."
                );
                let no_pages_queued =
                    statuses[index].first_index == statuses[index].last_index;
                if no_pages_queued {
                    statuses.remove(index);
                } else {
                    statuses[index].state = OutboundState::Ok;
                }
            },
            None => {
                debug_assert!(false, "WARNING: Attempt to resume channel that was not suspended.");
            },
        }
    });
}
}
impl<T: Config> XcmpMessageHandler for Pallet<T> {
    /// Accepts incoming XCMP pages and then services the queue with `max_weight`.
    ///
    /// Signal pages are applied immediately to the matching outbound channel. Every other
    /// page is recorded in the sender's inbound channel metadata and its payload (without the
    /// format prefix) is stored. A sender whose backlog has reached the suspend threshold is
    /// sent a `Suspend` signal; once the drop threshold is reached, further pages are not
    /// recorded.
    ///
    /// Returns the weight consumed by servicing the queue.
    fn handle_xcmp_messages<'a, I: Iterator<Item = (ParaId, RelayBlockNumber, &'a [u8])>>(
        iter: I,
        max_weight: Weight,
    ) -> Weight {
        let mut status = <InboundXcmpStatus<T>>::get();
        let QueueConfigData { suspend_threshold, drop_threshold, .. } = <QueueConfig<T>>::get();
        for (sender, sent_at, data) in iter {
            // Decoding advances `data_ref` past the format prefix.
            let mut data_ref = data;
            let format = match XcmpMessageFormat::decode_with_depth_limit(
                MAX_XCM_DECODE_DEPTH,
                &mut data_ref,
            ) {
                Ok(f) => f,
                Err(_) => {
                    debug_assert!(false, "Unknown XCMP message format. Silently dropping message");
                    continue
                },
            };
            if format == XcmpMessageFormat::Signals {
                while !data_ref.is_empty() {
                    use ChannelSignal::*;
                    match ChannelSignal::decode(&mut data_ref) {
                        Ok(Suspend) => Self::suspend_channel(sender),
                        Ok(Resume) => Self::resume_channel(sender),
                        Err(_) => break,
                    }
                }
            } else {
                match status.binary_search_by_key(&sender, |item| item.sender) {
                    Ok(i) => {
                        let count = status[i].message_metadata.len();
                        if count as u32 >= suspend_threshold && status[i].state == InboundState::Ok
                        {
                            status[i].state = InboundState::Suspended;
                            let r = Self::send_signal(sender, ChannelSignal::Suspend);
                            if r.is_err() {
                                log::warn!(
                                    "Attempt to suspend channel failed. Messages may be dropped."
                                );
                            }
                        }
                        if (count as u32) < drop_threshold {
                            status[i].message_metadata.push((sent_at, format));
                        } else {
                            debug_assert!(
                                false,
                                "XCMP channel queue full. Silently dropping message"
                            );
                        }
                    },
                    // Insert at the position reported by the search so that `status` stays
                    // ordered by sender. Appending instead would break the ordering the
                    // binary search relies on, and a later page from the same new sender in
                    // this call could then create a duplicate channel entry.
                    Err(pos) => status.insert(
                        pos,
                        InboundChannelDetails {
                            sender,
                            state: InboundState::Ok,
                            message_metadata: vec![(sent_at, format)],
                        },
                    ),
                }
                // NOTE(review): the payload is stored even when the page was not recorded
                // above because the drop threshold was reached; confirm this is intended.
                <InboundXcmpMessages<T>>::insert(sender, sent_at, data_ref);
            }
        }
        status.sort();
        <InboundXcmpStatus<T>>::put(status);
        Self::service_xcmp_queue(max_weight)
    }
}
impl<T: Config> XcmpMessageSource for Pallet<T> {
    /// Takes at most one outbound page per channel, for at most `maximum_channels` channels,
    /// and returns them as `(recipient, page)` pairs sorted by recipient.
    ///
    /// A pending signal page takes precedence over message pages. Suspended and full channels
    /// are skipped; the queued data of closed channels is discarded. Channels left with
    /// nothing to send are pruned from the status list, and the list is rotated so that a
    /// later call starts with different channels.
    fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec<u8>)> {
        let mut statuses = <OutboundXcmpStatus<T>>::get();
        let old_statuses_len = statuses.len();
        let max_message_count = statuses.len().min(maximum_channels);
        let mut result = Vec::with_capacity(max_message_count);
        for status in statuses.iter_mut() {
            // Work on copies of the fields; they are written back at the end of the iteration.
            let OutboundChannelDetails {
                recipient: para_id,
                state: outbound_state,
                mut signals_exist,
                mut first_index,
                mut last_index,
            } = *status;
            if result.len() == max_message_count {
                break
            }
            if outbound_state == OutboundState::Suspended {
                continue
            }
            let (max_size_now, max_size_ever) = match T::ChannelInfo::get_channel_status(para_id) {
                ChannelStatus::Closed => {
                    // Nothing can be delivered any more: drop all queued data and reset the
                    // entry so it is pruned below.
                    for i in first_index..last_index {
                        <OutboundXcmpMessages<T>>::remove(para_id, i);
                    }
                    if signals_exist {
                        <SignalMessages<T>>::remove(para_id);
                    }
                    *status = OutboundChannelDetails::new(para_id);
                    continue
                },
                ChannelStatus::Full => continue,
                ChannelStatus::Ready(n, e) => (n, e),
            };
            let page = if signals_exist {
                let page = <SignalMessages<T>>::get(para_id);
                if page.len() < max_size_now {
                    <SignalMessages<T>>::remove(para_id);
                    signals_exist = false;
                    page
                } else {
                    continue
                }
            } else if last_index > first_index {
                let page = <OutboundXcmpMessages<T>>::get(para_id, first_index);
                if page.len() < max_size_now {
                    <OutboundXcmpMessages<T>>::remove(para_id, first_index);
                    first_index += 1;
                    page
                } else {
                    continue
                }
            } else {
                continue
            };
            // Restart page numbering once the channel has been drained.
            if first_index == last_index {
                first_index = 0;
                last_index = 0;
            }
            if page.len() > max_size_ever {
                log::warn!("WARNING: oversize message in queue. silently dropping.");
            } else {
                result.push((para_id, page));
            }
            *status = OutboundChannelDetails {
                recipient: para_id,
                state: outbound_state,
                signals_exist,
                first_index,
                last_index,
            };
        }
        result.sort_by_key(|m| m.0);
        statuses.retain(|x| {
            x.state == OutboundState::Suspended || x.signals_exist || x.first_index < x.last_index
        });
        // `statuses` only ever shrinks here, so this subtraction cannot underflow.
        let pruned = old_statuses_len - statuses.len();
        // A channel can be pruned without contributing a message (closed channel, or an
        // oversize page that was dropped), so `pruned` may exceed `result.len()`. Saturate
        // rather than underflow, which would make `rotate_left` panic.
        statuses.rotate_left(result.len().saturating_sub(pruned));
        <OutboundXcmpStatus<T>>::put(statuses);
        result
    }
}
impl<T: Config> SendXcm for Pallet<T> {
fn send_xcm(dest: impl Into<MultiLocation>, msg: Xcm<()>) -> Result<(), SendError> {
let dest = dest.into();
match &dest {
MultiLocation { parents: 1, interior: X1(Parachain(id)) } => {
let versioned_xcm = T::VersionWrapper::wrap_version(&dest, msg)
.map_err(|()| SendError::DestinationUnsupported)?;
let hash = T::Hashing::hash_of(&versioned_xcm);
Self::send_fragment(
(*id).into(),
XcmpMessageFormat::ConcatenatedVersionedXcm,
versioned_xcm,
)
.map_err(|e| SendError::Transport(<&'static str>::from(e)))?;
Self::deposit_event(Event::XcmpMessageSent { message_hash: Some(hash) });
Ok(())
},
_ => Err(SendError::CannotReachDestination(dest, msg)),
}
}
}