Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4688,6 +4688,7 @@ dependencies = [
"crossbeam-deque",
"crossbeam-utils",
"libc",
"parking_lot",
"rand 0.9.2",
"rand_xorshift",
"scoped-tls",
Expand Down
12 changes: 12 additions & 0 deletions compiler/rustc_data_structures/src/sync/worker_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,18 @@ impl<T> WorkerLocal<T> {
pub fn into_inner(self) -> impl Iterator<Item = T> {
self.locals.into_vec().into_iter().map(|local| local.0)
}

#[inline]
pub unsafe fn as_slice_unchecked(this: &Self) -> &[CacheAligned<T>] {
&this.locals
}
}

impl<T: Sync> WorkerLocal<T> {
#[inline]
pub fn as_slice(this: &Self) -> &[CacheAligned<T>] {
&this.locals
}
}

impl<T> Deref for WorkerLocal<T> {
Expand Down
8 changes: 2 additions & 6 deletions compiler/rustc_interface/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;
use rustc_ast::{LitKind, MetaItemKind, token};
use rustc_codegen_ssa::traits::CodegenBackend;
use rustc_data_structures::fx::{FxHashMap, FxHashSet};
use rustc_data_structures::jobserver::{self, Proxy};
use rustc_data_structures::jobserver;
use rustc_errors::{DiagCtxtHandle, ErrorGuaranteed};
use rustc_lint::LintStore;
use rustc_middle::ty;
Expand Down Expand Up @@ -41,9 +41,6 @@ pub struct Compiler {

/// A reference to the current `GlobalCtxt` which we pass on to `GlobalCtxt`.
pub(crate) current_gcx: CurrentGcx,

/// A jobserver reference which we pass on to `GlobalCtxt`.
pub(crate) jobserver_proxy: Arc<Proxy>,
}

/// Converts strings provided as `--cfg [cfgspec]` into a `Cfg`.
Expand Down Expand Up @@ -412,7 +409,7 @@ pub fn run_compiler<R: Send>(config: Config, f: impl FnOnce(&Compiler) -> R + Se
config.opts.unstable_opts.threads.unwrap_or(1),
&config.extra_symbols,
SourceMapInputs { file_loader, path_mapping, hash_kind, checksum_hash_kind },
|current_gcx, jobserver_proxy| {
|current_gcx| {
// The previous `early_dcx` can't be reused here because it doesn't
// impl `Send`. Creating a new one is fine.
let early_dcx = EarlyDiagCtxt::new(config.opts.error_format);
Expand Down Expand Up @@ -484,7 +481,6 @@ pub fn run_compiler<R: Send>(config: Config, f: impl FnOnce(&Compiler) -> R + Se
codegen_backend,
override_queries: config.override_queries,
current_gcx,
jobserver_proxy,
};

// There are two paths out of `f`.
Expand Down
1 change: 0 additions & 1 deletion compiler/rustc_interface/src/passes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1001,7 +1001,6 @@ pub fn create_and_enter_global_ctxt<T, F: for<'tcx> FnOnce(TyCtxt<'tcx>) -> T>(
),
providers.hooks,
compiler.current_gcx.clone(),
Arc::clone(&compiler.jobserver_proxy),
|tcx| {
let feed = tcx.create_crate_num(stable_crate_id).unwrap();
assert_eq!(feed.key(), LOCAL_CRATE);
Expand Down
24 changes: 10 additions & 14 deletions compiler/rustc_interface/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ fn init_stack_size(early_dcx: &EarlyDiagCtxt) -> usize {
})
}

fn run_in_thread_with_globals<F: FnOnce(CurrentGcx, Arc<Proxy>) -> R + Send, R: Send>(
fn run_in_thread_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
thread_stack_size: usize,
edition: Edition,
sm_inputs: SourceMapInputs,
Expand All @@ -158,7 +158,7 @@ fn run_in_thread_with_globals<F: FnOnce(CurrentGcx, Arc<Proxy>) -> R + Send, R:
edition,
extra_symbols,
Some(sm_inputs),
|| f(CurrentGcx::new(), Proxy::new()),
|| f(CurrentGcx::new()),
)
})
.unwrap()
Expand All @@ -171,10 +171,7 @@ fn run_in_thread_with_globals<F: FnOnce(CurrentGcx, Arc<Proxy>) -> R + Send, R:
})
}

pub(crate) fn run_in_thread_pool_with_globals<
F: FnOnce(CurrentGcx, Arc<Proxy>) -> R + Send,
R: Send,
>(
pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
thread_builder_diag: &EarlyDiagCtxt,
edition: Edition,
threads: usize,
Expand All @@ -198,11 +195,11 @@ pub(crate) fn run_in_thread_pool_with_globals<
edition,
sm_inputs,
extra_symbols,
|current_gcx, jobserver_proxy| {
|current_gcx| {
// Register the thread for use with the `WorkerLocal` type.
registry.register();

f(current_gcx, jobserver_proxy)
f(current_gcx)
},
);
};
Expand All @@ -211,13 +208,12 @@ pub(crate) fn run_in_thread_pool_with_globals<
let current_gcx2 = current_gcx.clone();

let proxy = Proxy::new();

let proxy_ = Arc::clone(&proxy);
let proxy__ = Arc::clone(&proxy);

let builder = rustc_thread_pool::ThreadPoolBuilder::new()
.thread_name(|_| "rustc".to_string())
.acquire_thread_handler(move || proxy_.acquire_thread())
.release_thread_handler(move || proxy__.release_thread())
.acquire_thread_handler(move || proxy.acquire_thread())
.release_thread_handler(move || proxy_.release_thread())
.num_threads(threads)
.deadlock_handler(move || {
// On deadlock, creates a new thread and forwards information in thread
Expand Down Expand Up @@ -262,7 +258,7 @@ internal compiler error: query cycle handler thread panicked, aborting process";
)
},
);
break_query_cycle(job_map, &registry);
break_query_cycle(tcx, job_map, &registry);
})
})
});
Expand Down Expand Up @@ -293,7 +289,7 @@ internal compiler error: query cycle handler thread panicked, aborting process";
},
// Run `f` on the first thread in the thread pool.
move |pool: &rustc_thread_pool::ThreadPool| {
pool.install(|| f(current_gcx.into_inner(), proxy))
pool.install(|| f(current_gcx.into_inner()))
},
)
.unwrap_or_else(|err| {
Expand Down
74 changes: 32 additions & 42 deletions compiler/rustc_middle/src/query/job.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::fmt::Debug;
use std::hash::Hash;
use std::marker::PhantomData;
use std::mem;
use std::num::NonZero;
use std::sync::Arc;

use parking_lot::{Condvar, Mutex};
use parking_lot::Mutex;
use rustc_span::Span;

use crate::query::Cycle;
Expand Down Expand Up @@ -54,20 +56,20 @@ impl<'tcx> QueryJob<'tcx> {
#[derive(Debug)]
pub struct QueryWaiter<'tcx> {
pub parent: Option<QueryJobId>,
pub condvar: Condvar,
pub span: Span,
pub cycle: Mutex<Option<Cycle<'tcx>>>,
pub cycle: Option<Cycle<'tcx>>,
}

#[derive(Clone, Debug)]
pub struct QueryLatch<'tcx> {
/// The `Option` is `Some(..)` when the job is active, and `None` once completed.
pub waiters: Arc<Mutex<Option<Vec<Arc<QueryWaiter<'tcx>>>>>>,
/// The `waiters` is not `usize::MAX` when the job is active, and `usize::MAX` once completed.
pub waiters: Arc<Mutex<usize>>,
pub _marker: PhantomData<&'tcx ()>,
}

impl<'tcx> QueryLatch<'tcx> {
fn new() -> Self {
QueryLatch { waiters: Arc::new(Mutex::new(Some(Vec::new()))) }
QueryLatch { waiters: Arc::new(Mutex::new(0)), _marker: PhantomData }
}

/// Awaits for the query job to complete.
Expand All @@ -77,62 +79,50 @@ impl<'tcx> QueryLatch<'tcx> {
query: Option<QueryJobId>,
span: Span,
) -> Result<(), Cycle<'tcx>> {
let thread_index = rustc_thread_pool::current_thread_index().unwrap();
let mut waiters_guard = self.waiters.lock();
let Some(waiters) = &mut *waiters_guard else {
if *waiters_guard == usize::MAX {
return Ok(()); // already complete
};
debug_assert!(*waiters_guard & (1 << thread_index) == 0);

let waiter = Arc::new(QueryWaiter {
parent: query,
span,
cycle: Mutex::new(None),
condvar: Condvar::new(),
});
let waiter = QueryWaiter { parent: query, span, cycle: None };

// We push the waiter on to the `waiters` list. It can be accessed inside
// the `wait` call below, by 1) the `set` method or 2) by deadlock detection.
// Both of these will remove it from the `waiters` list before resuming
// this thread.
waiters.push(Arc::clone(&waiter));
let mut waiters_state = tcx.waiters.lock();
if mem::replace(&mut *waiters_state, Some(waiter)).is_some() {
panic!("tried to place a waiter twice for a worker thread")
}
*waiters_guard |= 1 << thread_index;
drop(waiters_state);

// Awaits the caller on this latch by blocking the current thread.
// If this detects a deadlock and the deadlock handler wants to resume this thread
// we have to be in the `wait` call. This is ensured by the deadlock handler
// getting the self.info lock.
rustc_thread_pool::mark_blocked();
tcx.jobserver_proxy.release_thread();
waiter.condvar.wait(&mut waiters_guard);
// Release the lock before we potentially block in `acquire_thread`
drop(waiters_guard);
tcx.jobserver_proxy.acquire_thread();

// FIXME: Get rid of this lock. We have ownership of the QueryWaiter
// although another thread may still have a Arc reference so we cannot
// use Arc::get_mut
let mut cycle = waiter.cycle.lock();
match cycle.take() {
None => Ok(()),
Some(cycle) => Err(cycle),
}
rustc_thread_pool::park(waiters_guard, |_| {
// Reset our QueryWaiter to None
let waiter = tcx.waiters.lock().take().unwrap();
match waiter.cycle {
None => Ok(()),
Some(cycle) => Err(cycle),
}
})
}

/// Sets the latch and resumes all waiters on it
fn set(&self) {
let mut waiters_guard = self.waiters.lock();
let waiters = waiters_guard.take().unwrap(); // mark the latch as complete
let waiters = mem::replace(&mut *waiters_guard, usize::MAX); // mark the latch as complete
debug_assert!(waiters != usize::MAX);
let registry = rustc_thread_pool::Registry::current();
for waiter in waiters {
rustc_thread_pool::mark_unblocked(&registry);
waiter.condvar.notify_one();
for waiter_thread in 0..usize::BITS - 1 {
if waiters & (1 << waiter_thread) != 0 {
rustc_thread_pool::unpark(&registry, waiter_thread as usize);
}
}
}

/// Removes a single waiter from the list of waiters.
/// This is used to break query cycles.
pub fn extract_waiter(&self, waiter: usize) -> Arc<QueryWaiter<'tcx>> {
let mut waiters_guard = self.waiters.lock();
let waiters = waiters_guard.as_mut().expect("non-empty waiters vec");
// Remove the waiter from the list of waiters
waiters.remove(waiter)
}
}
11 changes: 4 additions & 7 deletions compiler/rustc_middle/src/ty/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ use std::ops::Deref;
use std::sync::{Arc, OnceLock};
use std::{fmt, iter, mem};

use parking_lot::Mutex;
use rustc_abi::{ExternAbi, FieldIdx, Layout, LayoutData, TargetDataLayout, VariantIdx};
use rustc_ast as ast;
use rustc_data_structures::defer;
use rustc_data_structures::fx::FxHashMap;
use rustc_data_structures::intern::Interned;
use rustc_data_structures::jobserver::Proxy;
use rustc_data_structures::profiling::SelfProfilerRef;
use rustc_data_structures::sharded::{IntoPointer, ShardedHashMap};
use rustc_data_structures::stable_hasher::StableHash;
Expand Down Expand Up @@ -60,7 +60,7 @@ use crate::middle::codegen_fn_attrs::{CodegenFnAttrs, TargetFeature};
use crate::middle::resolve_bound_vars;
use crate::mir::interpret::{self, Allocation, ConstAllocation};
use crate::mir::{Body, Local, Place, PlaceElem, ProjectionKind, Promoted};
use crate::query::{IntoQueryKey, LocalCrate, Providers, QuerySystem, TyCtxtAt};
use crate::query::{IntoQueryKey, LocalCrate, Providers, QuerySystem, QueryWaiter, TyCtxtAt};
use crate::thir::Thir;
use crate::traits;
use crate::traits::solve::{ExternalConstraints, ExternalConstraintsData, PredefinedOpaques};
Expand Down Expand Up @@ -693,6 +693,7 @@ impl<'tcx> Deref for TyCtxt<'tcx> {
pub struct GlobalCtxt<'tcx> {
pub arena: &'tcx WorkerLocal<Arena<'tcx>>,
pub hir_arena: &'tcx WorkerLocal<hir::Arena<'tcx>>,
pub waiters: WorkerLocal<Mutex<Option<QueryWaiter<'tcx>>>>,

interners: CtxtInterners<'tcx>,

Expand Down Expand Up @@ -759,9 +760,6 @@ pub struct GlobalCtxt<'tcx> {
pub(crate) alloc_map: interpret::AllocMap<'tcx>,

current_gcx: CurrentGcx,

/// A jobserver reference used to release then acquire a token while waiting on a query.
pub jobserver_proxy: Arc<Proxy>,
}

impl<'tcx> GlobalCtxt<'tcx> {
Expand Down Expand Up @@ -937,7 +935,6 @@ impl<'tcx> TyCtxt<'tcx> {
query_system: QuerySystem<'tcx>,
hooks: crate::hooks::Providers,
current_gcx: CurrentGcx,
jobserver_proxy: Arc<Proxy>,
f: impl FnOnce(TyCtxt<'tcx>) -> T,
) -> T {
let data_layout = sess.target.parse_data_layout().unwrap_or_else(|err| {
Expand All @@ -954,6 +951,7 @@ impl<'tcx> TyCtxt<'tcx> {
stable_crate_id,
arena,
hir_arena,
waiters: Default::default(),
interners,
dep_graph,
hooks,
Expand All @@ -975,7 +973,6 @@ impl<'tcx> TyCtxt<'tcx> {
data_layout,
alloc_map: interpret::AllocMap::new(),
current_gcx,
jobserver_proxy,
});

// This is a separate function to work around a crash with parallel rustc (#135870)
Expand Down
Loading
Loading