//! A queue that manages requests for executing compiled artifacts: jobs are dispatched to a pool
//! of execute workers and the validation results are reported back to the requesters.

use super::worker::Outcome;
use crate::{
artifacts::{ArtifactId, ArtifactPathId},
host::ResultSender,
metrics::Metrics,
worker_common::{IdleWorker, WorkerHandle},
InvalidCandidate, ValidationError, LOG_TARGET,
};
use futures::{
channel::mpsc,
future::BoxFuture,
stream::{FuturesUnordered, StreamExt as _},
Future, FutureExt,
};
use slotmap::HopSlotMap;
use std::{collections::VecDeque, fmt, path::PathBuf, time::Duration};
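
// A key identifying a single spawned worker in `Workers::running`.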
slotmap::new_key_type! { struct Worker; }
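
/// A request sent into the execution queue. `Enqueue` submits an artifact for execution with the
/// given parameters and timeout; the outcome is reported back over `result_tx`.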
#[derive(Debug)]
pub enum ToQueue {
Enqueue {
artifact: ArtifactPathId,
execution_timeout: Duration,
params: Vec<u8>,
result_tx: ResultSender,
},
}
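
/// A job waiting for, or currently in, execution.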
struct ExecuteJob {
artifact: ArtifactPathId,
execution_timeout: Duration,
params: Vec<u8>,
result_tx: ResultSender,
}
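
/// Bookkeeping for one spawned worker. `idle` is `Some` whenever the worker is free to take a job.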
struct WorkerData {
idle: Option<IdleWorker>,
handle: WorkerHandle,
}
impl fmt::Debug for WorkerData {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "WorkerData(pid={})", self.handle.id())
}
}
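
/// The set of workers: those currently running, the number of spawns still in flight, and the
/// maximum number of workers allowed at any one time.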
struct Workers {
running: HopSlotMap<Worker, WorkerData>,
spawn_inflight: usize,
capacity: usize,
}
impl Workers {
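/// Returns `true` if spawning one more worker keeps the total (running plus in-flight spawns)
/// within `capacity`.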
fn can_afford_one_more(&self) -> bool {
self.spawn_inflight + self.running.len() < self.capacity
}
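
/// Returns the key of some worker that is currently idle, if any.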
fn find_available(&self) -> Option<Worker> {
self.running
.iter()
.find_map(|d| if d.1.idle.is_some() { Some(d.0) } else { None })
}
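
/// Takes the idle token out of the given worker's entry, marking it as busy. Returns `None` if
/// the worker is not running or not idle.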
fn claim_idle(&mut self, worker: Worker) -> Option<IdleWorker> {
self.running.get_mut(worker)?.idle.take()
}
}
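
/// An event yielded by the `Mux`: either a newly spawned worker or the outcome of a job that was
/// started on a worker.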
enum QueueEvent {
Spawn(IdleWorker, WorkerHandle),
StartWork(Worker, Outcome, ArtifactId, ResultSender),
}
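
/// The multiplexer of in-flight spawn and execution futures, each resolving to a `QueueEvent`.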
type Mux = FuturesUnordered<BoxFuture<'static, QueueEvent>>;
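
/// The state of the execution queue: the incoming request channel, the pending job queue, the
/// worker set and the mux of in-flight futures.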
struct Queue {
metrics: Metrics,
to_queue_rx: mpsc::Receiver<ToQueue>,
program_path: PathBuf,
spawn_timeout: Duration,
queue: VecDeque<ExecuteJob>,
workers: Workers,
mux: Mux,
}
impl Queue {
fn new(
metrics: Metrics,
program_path: PathBuf,
worker_capacity: usize,
spawn_timeout: Duration,
to_queue_rx: mpsc::Receiver<ToQueue>,
) -> Self {
Self {
metrics,
program_path,
spawn_timeout,
to_queue_rx,
queue: VecDeque::new(),
mux: Mux::new(),
workers: Workers {
running: HopSlotMap::with_capacity_and_key(10),
spawn_inflight: 0,
capacity: worker_capacity,
},
}
}
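
/// The queue's main loop: handles incoming requests and mux events until the request channel is
/// closed, sweeping out dead workers after each iteration.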
async fn run(mut self) {
loop {
futures::select! {
to_queue = self.to_queue_rx.next() => {
if let Some(to_queue) = to_queue {
handle_to_queue(&mut self, to_queue);
} else {
break;
}
}
ev = self.mux.select_next_some() => handle_mux(&mut self, ev).await,
}
purge_dead(&self.metrics, &mut self.workers).await;
}
}
}
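
/// Polls each running worker's handle and removes those that have concluded (i.e. whose process
/// has died), bumping the retirement metric for each one removed.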
async fn purge_dead(metrics: &Metrics, workers: &mut Workers) {
let mut to_remove = vec![];
for (worker, data) in workers.running.iter_mut() {
if futures::poll!(&mut data.handle).is_ready() {
to_remove.push(worker);
}
}
for w in to_remove {
if workers.running.remove(w).is_some() {
metrics.execute_worker().on_retired();
}
}
}
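
/// Handles an incoming `Enqueue` request: assigns the job to an idle worker if one is available;
/// otherwise queues it and, if capacity allows, starts spawning an extra worker.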
fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) {
let ToQueue::Enqueue { artifact, execution_timeout, params, result_tx } = to_queue;
gum::debug!(
target: LOG_TARGET,
validation_code_hash = ?artifact.id.code_hash,
"enqueueing an artifact for execution",
);
queue.metrics.execute_enqueued();
let job = ExecuteJob { artifact, execution_timeout, params, result_tx };
if let Some(available) = queue.workers.find_available() {
assign(queue, available, job);
} else {
if queue.workers.can_afford_one_more() {
spawn_extra_worker(queue);
}
queue.queue.push_back(job);
}
}
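
/// Dispatches a resolved mux event to the appropriate handler.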
async fn handle_mux(queue: &mut Queue, event: QueueEvent) {
match event {
QueueEvent::Spawn(idle, handle) => {
handle_worker_spawned(queue, idle, handle);
},
QueueEvent::StartWork(worker, outcome, artifact_id, result_tx) => {
handle_job_finish(queue, worker, outcome, artifact_id, result_tx);
},
}
}
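
/// Registers a freshly spawned worker as idle and immediately assigns it the next queued job,
/// if any.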
fn handle_worker_spawned(queue: &mut Queue, idle: IdleWorker, handle: WorkerHandle) {
queue.metrics.execute_worker().on_spawned();
queue.workers.spawn_inflight -= 1;
let worker = queue.workers.running.insert(WorkerData { idle: Some(idle), handle });
gum::debug!(target: LOG_TARGET, ?worker, "execute worker spawned");
if let Some(job) = queue.queue.pop_front() {
assign(queue, worker, job);
}
}
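
/// Processes the outcome of a finished job. The result (or error) is sent to the requester; if the
/// worker survived, it is marked idle again and handed the next queued job, otherwise it is
/// removed and, if work is still pending, a replacement worker is spawned.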
fn handle_job_finish(
queue: &mut Queue,
worker: Worker,
outcome: Outcome,
artifact_id: ArtifactId,
result_tx: ResultSender,
) {
let (idle_worker, result) = match outcome {
Outcome::Ok { result_descriptor, duration: _, idle_worker } => {
(Some(idle_worker), Ok(result_descriptor))
},
Outcome::InvalidCandidate { err, idle_worker } => (
Some(idle_worker),
Err(ValidationError::InvalidCandidate(InvalidCandidate::WorkerReportedError(err))),
),
Outcome::InternalError { err, idle_worker } =>
(Some(idle_worker), Err(ValidationError::InternalError(err))),
Outcome::HardTimeout =>
(None, Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout))),
Outcome::IoErr =>
(None, Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath))),
};
queue.metrics.execute_finished();
gum::debug!(
target: LOG_TARGET,
validation_code_hash = ?artifact_id.code_hash,
?worker,
worker_rip = idle_worker.is_none(),
"execute worker concluded",
);
let _ = result_tx.send(result);
if let Some(idle_worker) = idle_worker {
if let Some(data) = queue.workers.running.get_mut(worker) {
data.idle = Some(idle_worker);
if let Some(job) = queue.queue.pop_front() {
assign(queue, worker, job);
}
}
} else {
if queue.workers.running.remove(worker).is_some() {
queue.metrics.execute_worker().on_retired();
}
if !queue.queue.is_empty() {
spawn_extra_worker(queue);
}
}
}
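
/// Kicks off the spawning of a new worker by pushing a spawn future onto the mux.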
fn spawn_extra_worker(queue: &mut Queue) {
queue.metrics.execute_worker().on_begin_spawn();
gum::debug!(target: LOG_TARGET, "spawning an extra worker");
queue
.mux
.push(spawn_worker_task(queue.program_path.clone(), queue.spawn_timeout).boxed());
queue.workers.spawn_inflight += 1;
}
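
/// Keeps trying to spawn a worker, waiting 3 seconds between attempts, until it succeeds.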
async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> QueueEvent {
use futures_timer::Delay;
loop {
match super::worker::spawn(&program_path, spawn_timeout).await {
Ok((idle, handle)) => break QueueEvent::Spawn(idle, handle),
Err(err) => {
gum::warn!(target: LOG_TARGET, "failed to spawn an execute worker: {:?}", err);
Delay::new(Duration::from_secs(3)).await;
},
}
}
}
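
/// Asks the given worker to perform `job`. The worker must be running and idle, otherwise this
/// panics.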
fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
gum::debug!(
target: LOG_TARGET,
validation_code_hash = ?job.artifact.id.code_hash,
?worker,
"assigning the execute worker",
);
let idle = queue.workers.claim_idle(worker).expect(
"this caller must supply a worker which is idle and running;
thus claim_idle cannot return None;
qed.",
);
let execution_timer = queue.metrics.time_execution();
queue.mux.push(
async move {
let _timer = execution_timer;
let outcome = super::worker::start_work(
idle,
job.artifact.clone(),
job.execution_timeout,
job.params,
)
.await;
QueueEvent::StartWork(worker, outcome, job.artifact.id, job.result_tx)
}
.boxed(),
);
}
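
/// Creates an execution queue, returning the sender used to submit `ToQueue` requests together
/// with the queue's `run` future, which the caller must drive for the queue to make progress.
///
/// A minimal wiring sketch (marked `ignore`, not compiled): it assumes `Metrics` provides a
/// `Default` impl and that the caller has an async executor to poll the returned future; the
/// worker binary path and the numbers are placeholders.
///
/// ```ignore
/// let (to_queue_tx, run) = start(
///     Metrics::default(),
///     PathBuf::from("/path/to/execute-worker"), // placeholder path to the worker binary
///     2,                                        // worker_capacity
///     Duration::from_secs(3),                   // spawn_timeout
/// );
/// // `run` must be polled (e.g. spawned onto the host's executor); jobs are then submitted by
/// // sending `ToQueue::Enqueue { .. }` messages on `to_queue_tx`.
/// ```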
pub fn start(
metrics: Metrics,
program_path: PathBuf,
worker_capacity: usize,
spawn_timeout: Duration,
) -> (mpsc::Sender<ToQueue>, impl Future<Output = ()>) {
let (to_queue_tx, to_queue_rx) = mpsc::channel(20);
let run = Queue::new(metrics, program_path, worker_capacity, spawn_timeout, to_queue_rx).run();
(to_queue_tx, run)
}