// Copyright 2021 Parity Technologies (UK) Ltd.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! Large statement requesting background task logic.

use std::time::Duration;

use futures::{
	channel::{mpsc, oneshot},
	SinkExt,
};

use polkadot_node_network_protocol::{
	request_response::{
		v1::{StatementFetchingRequest, StatementFetchingResponse},
		OutgoingRequest, Recipient, Requests,
	},
	PeerId, UnifiedReputationChange,
};
use polkadot_node_subsystem::{Span, Stage};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_primitives::v2::{CandidateHash, CommittedCandidateReceipt, Hash};

use crate::{metrics::Metrics, COST_WRONG_HASH, LOG_TARGET};

// In case we failed fetching from our known peers, how long we should wait before attempting a
// retry, even though we have not yet discovered any new peers - in other words, how long to
// wait before retrying peers that already failed.
const RETRY_TIMEOUT: Duration = Duration::from_millis(500);

/// Messages coming from a background task.
pub enum RequesterMessage {
	/// Get an update of available peers to try for fetching a given statement.
	GetMorePeers {
		relay_parent: Hash,
		candidate_hash: CandidateHash,
		tx: oneshot::Sender<Vec<PeerId>>,
	},
	/// Fetching finished, ask for verification. If verification fails, the task will continue
	/// asking peers for data.
	Finished {
		/// Relay parent this candidate is in the context of.
		relay_parent: Hash,
		/// The candidate we fetched data for.
		candidate_hash: CandidateHash,
		/// Data was fetched from this peer.
		from_peer: PeerId,
		/// Response we received from above peer.
		response: CommittedCandidateReceipt,
		/// Peers which failed providing the data.
		bad_peers: Vec<PeerId>,
	},
	/// Report a peer which behaved worse than just not providing data:
	ReportPeer(PeerId, UnifiedReputationChange),
	/// Ask subsystem to send a request for us.
	SendRequest(Requests),
}

/// A fetching task, taking care of fetching large statements via request/response.
///
/// A fetch task does not know about a particular `Statement`; instead it just tries fetching a
/// `CommittedCandidateReceipt` from peers. Whether this can be used to re-assemble one or
/// many `SignedFullStatement`s needs to be verified by the caller.
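///
/// # Example (sketch)
///
/// The statement-distribution subsystem is expected to spawn this future and drive the receiving
/// end of the `sender` channel. Roughly (the spawner and channel names here are illustrative,
/// not the subsystem's actual code):
///
/// ```ignore
/// let (tx, rx) = mpsc::channel(1);
/// ctx.spawn(
///     "large-statement-fetcher",
///     fetch(relay_parent, candidate_hash, peers, tx, metrics).boxed(),
/// )?;
/// // ... then handle `RequesterMessage`s arriving on `rx` ...
/// ```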
pub async fn fetch(
	relay_parent: Hash,
	candidate_hash: CandidateHash,
	peers: Vec<PeerId>,
	mut sender: mpsc::Sender<RequesterMessage>,
	metrics: Metrics,
) {
	let span = Span::new(candidate_hash, "fetch-large-statement")
		.with_relay_parent(relay_parent)
		.with_stage(Stage::StatementDistribution);

	gum::debug!(
		target: LOG_TARGET,
		?candidate_hash,
		?relay_parent,
		"Fetch for large statement started",
	);

	// Peers we already tried (and failed).
	let mut tried_peers = Vec::new();
	// Peers left for trying out.
	let mut new_peers = peers;

	let req = StatementFetchingRequest { relay_parent, candidate_hash };

	// We retry endlessly (with sleep periods), and rely on the subsystem to kill us eventually.
	loop {
		let span = span.child("try-available-peers");

		while let Some(peer) = new_peers.pop() {
			let _span = span.child("try-peer").with_peer_id(&peer);

			let (outgoing, pending_response) =
				OutgoingRequest::new(Recipient::Peer(peer), req.clone());
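			// Hand the request over to the subsystem for dispatch; this task keeps
			// `pending_response` and awaits the answer below.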
			if let Err(err) = sender
				.feed(RequesterMessage::SendRequest(Requests::StatementFetchingV1(outgoing)))
				.await
			{
				gum::info!(
					target: LOG_TARGET,
					?err,
					"Sending request failed, node might be shutting down - exiting."
				);
				return
			}
			metrics.on_sent_request();

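			// Wait for the peer's answer. Three things can happen: we get the candidate we
			// asked for, we get a candidate with the wrong hash (misbehaviour), or the
			// request fails outright.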
			match pending_response.await {
				Ok(StatementFetchingResponse::Statement(statement)) => {
					if statement.hash() != candidate_hash {
						metrics.on_received_response(false);
						metrics.on_unexpected_statement_large();

						if let Err(err) =
							sender.feed(RequesterMessage::ReportPeer(peer, COST_WRONG_HASH)).await
						{
							gum::warn!(
								target: LOG_TARGET,
								?err,
								"Sending reputation change failed: This should not happen."
							);
						}
						// We want to get rid of this peer:
						continue
					}

					if let Err(err) = sender
						.feed(RequesterMessage::Finished {
							relay_parent,
							candidate_hash,
							from_peer: peer,
							response: statement,
							bad_peers: tried_peers,
						})
						.await
					{
						gum::warn!(
							target: LOG_TARGET,
							?err,
							"Sending task response failed: This should not happen."
						);
					}

					metrics.on_received_response(true);

					// We are done now.
					return
				},
				Err(err) => {
					gum::debug!(
						target: LOG_TARGET,
						?err,
						"Receiving response failed with error - trying next peer."
					);
					metrics.on_received_response(false);
					metrics.on_unexpected_statement_large();
				},
			}

			tried_peers.push(peer);
		}
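
		// Everything we tried failed this round. Recycle those peers so they can be retried
		// after any newly discovered peers (new arrivals are appended and popped first).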
		new_peers = std::mem::take(&mut tried_peers);

		// All our peers failed us - try getting new ones before trying again:
		match try_get_new_peers(relay_parent, candidate_hash, &mut sender, &span).await {
			Ok(Some(mut peers)) => {
				gum::trace!(target: LOG_TARGET, ?peers, "Received new peers.");
				// New arrivals will be tried first:
				new_peers.append(&mut peers);
			},
			// No new peers, try the old ones again (if we have any):
			Ok(None) => {
				// Note: In case we don't have any more peers, we will just keep asking for new
				// peers, which is exactly what we want.
			},
			Err(()) => return,
		}
	}
}

/// Try getting new peers from subsystem.
///
/// If there are none, we will return after a timeout with `None`.
async fn try_get_new_peers(
	relay_parent: Hash,
	candidate_hash: CandidateHash,
	sender: &mut mpsc::Sender<RequesterMessage>,
	span: &Span,
) -> Result<Option<Vec<PeerId>>, ()> {
	let _span = span.child("wait-for-peers");

	let (tx, rx) = oneshot::channel();

	if let Err(err) = sender
		.send(RequesterMessage::GetMorePeers { relay_parent, candidate_hash, tx })
		.await
	{
		gum::debug!(
			target: LOG_TARGET,
			?err,
			"Failed sending background task message, subsystem probably moved on."
		);
		return Err(())
	}

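	// `timeout` yields `None` once `RETRY_TIMEOUT` has elapsed without an answer; `transpose`
	// maps that to `Ok(None)` (the caller retries its old peers), while a dropped sender
	// (subsystem gone) surfaces as `Err`.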
	match rx.timeout(RETRY_TIMEOUT).await.transpose() {
		Err(_) => {
			gum::debug!(target: LOG_TARGET, "Failed fetching more peers.");
			Err(())
		},
		Ok(val) => Ok(val),
	}
}