use futures::{channel::oneshot, future::FutureExt};
use sp_core::{crypto::Pair, ecdsa, ed25519, sr25519, traits::SpawnNamed};
use std::sync::{
atomic::{AtomicBool, Ordering as AtomicOrdering},
Arc,
};
/// One sr25519 signature check queued for batch verification.
///
/// Items are accumulated by `BatchVerifier::push_sr25519` and later handed,
/// as a group, to `sr25519::verify_batch`.
#[derive(Debug, Clone)]
struct Sr25519BatchItem {
/// Signature to be checked.
signature: sr25519::Signature,
/// Public key the signature is checked against.
pub_key: sr25519::Public,
/// Message the signature is expected to cover.
message: Vec<u8>,
}
/// Collects signature checks and verifies them in the background.
///
/// ed25519 and ecdsa checks are spawned as individual tasks on the scheduler;
/// sr25519 checks are accumulated and verified in batches. The overall result
/// is obtained with `verify_and_clear`.
pub struct BatchVerifier {
/// Executor on which verification tasks (and the final join task) are spawned.
scheduler: Box<dyn SpawnNamed>,
/// sr25519 checks queued so far that have not yet been handed to a task.
sr25519_items: Vec<Sr25519BatchItem>,
/// Shared flag set to `true` by any task whose verification fails.
invalid: Arc<AtomicBool>,
/// One receiver per spawned task; each is signalled when its task finishes.
pending_tasks: Vec<oneshot::Receiver<()>>,
}
impl BatchVerifier {
pub fn new(scheduler: Box<dyn SpawnNamed>) -> Self {
BatchVerifier {
scheduler,
sr25519_items: Default::default(),
invalid: Arc::new(false.into()),
pending_tasks: vec![],
}
}
fn spawn_verification_task(
&mut self,
f: impl FnOnce() -> bool + Send + 'static,
name: &'static str,
) -> bool {
if self.invalid.load(AtomicOrdering::Relaxed) {
return false
}
let invalid_clone = self.invalid.clone();
let (sender, receiver) = oneshot::channel();
self.pending_tasks.push(receiver);
self.scheduler.spawn(
name,
None,
async move {
if !f() {
invalid_clone.store(true, AtomicOrdering::Relaxed);
}
if sender.send(()).is_err() {
log::warn!("Verification halted while result was pending");
invalid_clone.store(true, AtomicOrdering::Relaxed);
}
}
.boxed(),
);
true
}
pub fn push_ed25519(
&mut self,
signature: ed25519::Signature,
pub_key: ed25519::Public,
message: Vec<u8>,
) -> bool {
self.spawn_verification_task(
move || ed25519::Pair::verify(&signature, &message, &pub_key),
"substrate_ed25519_verify",
)
}
pub fn push_sr25519(
&mut self,
signature: sr25519::Signature,
pub_key: sr25519::Public,
message: Vec<u8>,
) -> bool {
if self.invalid.load(AtomicOrdering::Relaxed) {
return false
}
self.sr25519_items.push(Sr25519BatchItem { signature, pub_key, message });
if self.sr25519_items.len() >= 128 {
let items = std::mem::take(&mut self.sr25519_items);
self.spawn_verification_task(
move || Self::verify_sr25519_batch(items),
"substrate_sr25519_verify",
)
} else {
true
}
}
pub fn push_ecdsa(
&mut self,
signature: ecdsa::Signature,
pub_key: ecdsa::Public,
message: Vec<u8>,
) -> bool {
self.spawn_verification_task(
move || ecdsa::Pair::verify(&signature, &message, &pub_key),
"substrate_ecdsa_verify",
)
}
fn verify_sr25519_batch(items: Vec<Sr25519BatchItem>) -> bool {
let messages = items.iter().map(|item| &item.message[..]).collect();
let signatures = items.iter().map(|item| &item.signature).collect();
let pub_keys = items.iter().map(|item| &item.pub_key).collect();
sr25519::verify_batch(messages, signatures, pub_keys)
}
#[must_use]
pub fn verify_and_clear(&mut self) -> bool {
let pending = std::mem::take(&mut self.pending_tasks);
let started = std::time::Instant::now();
log::trace!(
target: "runtime",
"Batch-verification: {} pending tasks, {} sr25519 signatures",
pending.len(),
self.sr25519_items.len(),
);
if !Self::verify_sr25519_batch(std::mem::take(&mut self.sr25519_items)) {
return false
}
if pending.len() > 0 {
let (sender, receiver) = std::sync::mpsc::channel();
self.scheduler.spawn(
"substrate-batch-verify-join",
None,
async move {
futures::future::join_all(pending).await;
sender.send(()).expect(
"Channel never panics if receiver is live. \
Receiver is always live until received this data; qed. ",
);
}
.boxed(),
);
if receiver.recv().is_err() {
log::warn!(
target: "runtime",
"Haven't received async result from verification task. Returning false.",
);
return false
}
}
log::trace!(
target: "runtime",
"Finalization of batch verification took {} ms",
started.elapsed().as_millis(),
);
!self.invalid.swap(false, AtomicOrdering::Relaxed)
}
}