// Copyright 2021 Parity Technologies (UK) Ltd.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Background task logic for requesting large statements.

use std::time::Duration;

use futures::{
	channel::{mpsc, oneshot},
	SinkExt,
};

use polkadot_node_network_protocol::{
	request_response::{
		v1::{StatementFetchingRequest, StatementFetchingResponse},
		OutgoingRequest, Recipient, Requests,
	},
	PeerId, UnifiedReputationChange,
};
use polkadot_node_subsystem::{Span, Stage};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_primitives::v2::{CandidateHash, CommittedCandidateReceipt, Hash};

use crate::{metrics::Metrics, COST_WRONG_HASH, LOG_TARGET};

// How long to wait before retrying peers that have already failed us, in case fetching from
// all known peers failed and we have not yet discovered any new ones.
const RETRY_TIMEOUT: Duration = Duration::from_millis(500);

/// Messages coming from a background task.
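///
/// A sketch of the expected dialogue (the surrounding subsystem wiring is an
/// assumption, not defined in this module): the task asks the subsystem to
/// dispatch network requests via `SendRequest`, reports misbehaving peers via
/// `ReportPeer`, requests fresh peers via `GetMorePeers` once all known ones
/// have failed, and hands the fetched receipt back via `Finished`.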
pub enum RequesterMessage {
	/// Get an update of available peers to try for fetching a given statement.
	GetMorePeers {
		relay_parent: Hash,
		candidate_hash: CandidateHash,
		tx: oneshot::Sender<Vec<PeerId>>,
	},
	/// Fetching finished, ask for verification. If verification fails, the task will continue
	/// asking peers for data.
	Finished {
		/// Relay parent this candidate is in the context of.
		relay_parent: Hash,
		/// The candidate we fetched data for.
		candidate_hash: CandidateHash,
		/// Data was fetched from this peer.
		from_peer: PeerId,
		/// Response we received from above peer.
		response: CommittedCandidateReceipt,
		/// Peers which failed providing the data.
		bad_peers: Vec<PeerId>,
	},
	/// Report a peer which behaved worse than just not providing data.
	ReportPeer(PeerId, UnifiedReputationChange),
	/// Ask subsystem to send a request for us.
	SendRequest(Requests),
}

/// A fetching task, taking care of fetching large statements via request/response.
///
/// A fetch task does not know about a particular `Statement`; instead it just tries fetching a
/// `CommittedCandidateReceipt` from peers. Whether this can be used to re-assemble one or
/// many `SignedFullStatement`s needs to be verified by the caller.
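///
/// # Example
///
/// A minimal sketch of how a subsystem might spawn this task. The `spawner`,
/// the channel capacity and the surrounding setup are illustrative
/// assumptions, not this crate's actual wiring:
///
/// ```ignore
/// use futures::{channel::mpsc, FutureExt, StreamExt};
///
/// let (tx, mut rx) = mpsc::channel(8);
/// spawner.spawn(
///     "large-statement-fetch",
///     fetch(relay_parent, candidate_hash, peers, tx, metrics).boxed(),
/// );
/// // The subsystem now services the `RequesterMessage`s arriving on `rx`.
/// while let Some(msg) = rx.next().await {
///     // ... handle `msg` ...
/// }
/// ```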
pub async fn fetch(
	relay_parent: Hash,
	candidate_hash: CandidateHash,
	peers: Vec<PeerId>,
	mut sender: mpsc::Sender<RequesterMessage>,
	metrics: Metrics,
) {
	let span = Span::new(candidate_hash, "fetch-large-statement")
		.with_relay_parent(relay_parent)
		.with_stage(Stage::StatementDistribution);

	gum::debug!(
		target: LOG_TARGET,
		?candidate_hash,
		?relay_parent,
		"Fetch for large statement started",
	);

	// Peers we already tried (and failed).
	let mut tried_peers = Vec::new();
	// Peers left for trying out.
	let mut new_peers = peers;

	let req = StatementFetchingRequest { relay_parent, candidate_hash };

	// We retry endlessly (with sleep periods), and rely on the subsystem to kill us eventually.
	loop {
		let span = span.child("try-available-peers");

		while let Some(peer) = new_peers.pop() {
			let _span = span.child("try-peer").with_peer_id(&peer);

			let (outgoing, pending_response) =
				OutgoingRequest::new(Recipient::Peer(peer), req.clone());
			if let Err(err) = sender
				.feed(RequesterMessage::SendRequest(Requests::StatementFetchingV1(outgoing)))
				.await
			{
				gum::info!(
					target: LOG_TARGET,
					?err,
					"Sending request failed, node might be shutting down - exiting."
				);
				return
			}

			metrics.on_sent_request();

			match pending_response.await {
				Ok(StatementFetchingResponse::Statement(statement)) => {
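					// The response is untrusted: check that the received receipt actually
					// hashes to the candidate we requested before accepting it.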
					if statement.hash() != candidate_hash {
						metrics.on_received_response(false);
						metrics.on_unexpected_statement_large();

						if let Err(err) =
							sender.feed(RequesterMessage::ReportPeer(peer, COST_WRONG_HASH)).await
						{
							gum::warn!(
								target: LOG_TARGET,
								?err,
								"Sending reputation change failed: This should not happen."
							);
						}
						// We want to get rid of this peer:
						continue
					}

					if let Err(err) = sender
						.feed(RequesterMessage::Finished {
							relay_parent,
							candidate_hash,
							from_peer: peer,
							response: statement,
							bad_peers: tried_peers,
						})
						.await
					{
						gum::warn!(
							target: LOG_TARGET,
							?err,
							"Sending task response failed: This should not happen."
						);
					}

					metrics.on_received_response(true);

					// We are done now.
					return
				},
				Err(err) => {
					gum::debug!(
						target: LOG_TARGET,
						?err,
						"Receiving response failed with error - trying next peer."
					);

					metrics.on_received_response(false);
					metrics.on_unexpected_statement_large();
				},
			}

			tried_peers.push(peer);
		}

		new_peers = std::mem::take(&mut tried_peers);

		// All our peers failed us - try getting new ones before trying again:
		match try_get_new_peers(relay_parent, candidate_hash, &mut sender, &span).await {
			Ok(Some(mut peers)) => {
				gum::trace!(target: LOG_TARGET, ?peers, "Received new peers.");
				// New arrivals will be tried first:
				new_peers.append(&mut peers);
			},
			// No new peers, try the old ones again (if we have any):
			Ok(None) => {
				// Note: In case we don't have any more peers, we will just keep asking for new
				// peers, which is exactly what we want.
			},
			Err(()) => return,
		}
	}
}

/// Try getting new peers from subsystem.
///
/// If there are none, we will return `None` after a timeout.
async fn try_get_new_peers(
	relay_parent: Hash,
	candidate_hash: CandidateHash,
	sender: &mut mpsc::Sender<RequesterMessage>,
	span: &Span,
) -> Result<Option<Vec<PeerId>>, ()> {
	let _span = span.child("wait-for-peers");

	let (tx, rx) = oneshot::channel();

	if let Err(err) = sender
		.send(RequesterMessage::GetMorePeers { relay_parent, candidate_hash, tx })
		.await
	{
		gum::debug!(
			target: LOG_TARGET,
			?err,
			"Failed sending background task message, subsystem probably moved on."
		);
		return Err(())
	}

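	// `timeout` wraps the oneshot result in an `Option` (`None` on timeout), so
	// `transpose` yields: `Err` if the subsystem dropped the sender, `Ok(None)` if
	// `RETRY_TIMEOUT` elapsed without an answer, `Ok(Some(peers))` on success.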
	match rx.timeout(RETRY_TIMEOUT).await.transpose() {
		Err(_) => {
			gum::debug!(target: LOG_TARGET, "Failed fetching more peers.");
			Err(())
		},
		Ok(val) => Ok(val),
	}
}