1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272
// This file is part of Substrate.
// Copyright (C) 2021-2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Provides a generic wrapper around shared data. See [`SharedData`] for more information.
use parking_lot::{Condvar, MappedMutexGuard, Mutex, MutexGuard};
use std::sync::Arc;
/// Created by [`SharedDataLocked::release_mutex`].
///
/// As long as the object isn't dropped, the shared data is locked. It is advised to drop this
/// object when the shared data doesn't need to be locked anymore. To get access to the shared data
/// [`Self::upgrade`] is provided.
#[must_use = "Shared data will be unlocked on drop!"]
pub struct SharedDataLockedUpgradable<T> {
shared_data: SharedData<T>,
}
impl<T> SharedDataLockedUpgradable<T> {
/// Upgrade to a *real* mutex guard that will give access to the inner data.
///
/// Every call to this function will reaquire the mutex again.
pub fn upgrade(&mut self) -> MappedMutexGuard<T> {
MutexGuard::map(self.shared_data.inner.lock(), |i| &mut i.shared_data)
}
}
impl<T> Drop for SharedDataLockedUpgradable<T> {
fn drop(&mut self) {
let mut inner = self.shared_data.inner.lock();
// It should not be locked anymore
inner.locked = false;
// Notify all waiting threads.
self.shared_data.cond_var.notify_all();
}
}
/// Created by [`SharedData::shared_data_locked`].
///
/// As long as this object isn't dropped, the shared data is held in a mutex guard and the shared
/// data is tagged as locked. Access to the shared data is provided through
/// [`Deref`](std::ops::Deref) and [`DerefMut`](std::ops::DerefMut). The trick is to use
/// [`Self::release_mutex`] to release the mutex, but still keep the shared data locked. This means
/// every other thread trying to access the shared data in this time will need to wait until this
/// lock is freed.
///
/// If this object is dropped without calling [`Self::release_mutex`], the lock will be dropped
/// immediately.
#[must_use = "Shared data will be unlocked on drop!"]
pub struct SharedDataLocked<'a, T> {
/// The current active mutex guard holding the inner data.
inner: MutexGuard<'a, SharedDataInner<T>>,
/// The [`SharedData`] instance that created this instance.
///
/// This instance is only taken on drop or when calling [`Self::release_mutex`].
shared_data: Option<SharedData<T>>,
}
impl<'a, T> SharedDataLocked<'a, T> {
/// Release the mutex, but keep the shared data locked.
pub fn release_mutex(mut self) -> SharedDataLockedUpgradable<T> {
SharedDataLockedUpgradable {
shared_data: self.shared_data.take().expect("`shared_data` is only taken on drop; qed"),
}
}
}
impl<'a, T> Drop for SharedDataLocked<'a, T> {
fn drop(&mut self) {
if let Some(shared_data) = self.shared_data.take() {
// If the `shared_data` is still set, it means [`Self::release_mutex`] wasn't
// called and the lock should be released.
self.inner.locked = false;
// Notify all waiting threads about the released lock.
shared_data.cond_var.notify_all();
}
}
}
impl<'a, T> std::ops::Deref for SharedDataLocked<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.inner.shared_data
}
}
impl<'a, T> std::ops::DerefMut for SharedDataLocked<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner.shared_data
}
}
/// Holds the shared data and if the shared data is currently locked.
///
/// For more information see [`SharedData`].
struct SharedDataInner<T> {
/// The actual shared data that is protected here against concurrent access.
shared_data: T,
/// Is `shared_data` currently locked and can not be accessed?
locked: bool,
}
/// Some shared data that provides support for locking this shared data for some time.
///
/// When working with consensus engines there is often data that needs to be shared between multiple
/// parts of the system, like block production and block import. This struct provides an abstraction
/// for this shared data in a generic way.
///
/// The pain point when sharing this data is often the usage of mutex guards in an async context as
/// this doesn't work for most of them as these guards don't implement `Send`. This abstraction
/// provides a way to lock the shared data, while not having the mutex locked. So, the data stays
/// locked and we are still able to hold this lock over an `await` call.
///
/// # Example
///
/// ```
/// # use sc_consensus::shared_data::SharedData;
///
/// let shared_data = SharedData::new(String::from("hello world"));
///
/// let lock = shared_data.shared_data_locked();
///
/// let shared_data2 = shared_data.clone();
/// let join_handle1 = std::thread::spawn(move || {
/// // This will need to wait for the outer lock to be released before it can access the data.
/// shared_data2.shared_data().push_str("1");
/// });
///
/// assert_eq!(*lock, "hello world");
///
/// // Let us release the mutex, but we still keep it locked.
/// // Now we could call `await` for example.
/// let mut lock = lock.release_mutex();
///
/// let shared_data2 = shared_data.clone();
/// let join_handle2 = std::thread::spawn(move || {
/// shared_data2.shared_data().push_str("2");
/// });
///
/// // We still have the lock and can upgrade it to access the data.
/// assert_eq!(*lock.upgrade(), "hello world");
/// lock.upgrade().push_str("3");
///
/// drop(lock);
/// join_handle1.join().unwrap();
/// join_handle2.join().unwrap();
///
/// let data = shared_data.shared_data();
/// // As we don't know the order of the threads, we need to check for both combinations
/// assert!(*data == "hello world321" || *data == "hello world312");
/// ```
///
/// # Deadlock
///
/// Be aware that this data structure doesn't give you any guarantees that you can not create a
/// deadlock. If you use [`release_mutex`](SharedDataLocked::release_mutex) followed by a call
/// to [`shared_data`](Self::shared_data) in the same thread will make your program dead lock.
/// The same applies when you are using a single threaded executor.
pub struct SharedData<T> {
inner: Arc<Mutex<SharedDataInner<T>>>,
cond_var: Arc<Condvar>,
}
impl<T> Clone for SharedData<T> {
fn clone(&self) -> Self {
Self { inner: self.inner.clone(), cond_var: self.cond_var.clone() }
}
}
impl<T> SharedData<T> {
/// Create a new instance of [`SharedData`] to share the given `shared_data`.
pub fn new(shared_data: T) -> Self {
Self {
inner: Arc::new(Mutex::new(SharedDataInner { shared_data, locked: false })),
cond_var: Default::default(),
}
}
/// Acquire access to the shared data.
///
/// This will give mutable access to the shared data. After the returned mutex guard is dropped,
/// the shared data is accessible by other threads. So, this function should be used when
/// reading/writing of the shared data in a local context is required.
///
/// When requiring to lock shared data for some longer time, even with temporarily releasing the
/// lock, [`Self::shared_data_locked`] should be used.
pub fn shared_data(&self) -> MappedMutexGuard<T> {
let mut guard = self.inner.lock();
while guard.locked {
self.cond_var.wait(&mut guard);
}
debug_assert!(!guard.locked);
MutexGuard::map(guard, |i| &mut i.shared_data)
}
/// Acquire access to the shared data and lock it.
///
/// This will give mutable access to the shared data. The returned [`SharedDataLocked`]
/// provides the function [`SharedDataLocked::release_mutex`] to release the mutex, but
/// keeping the data locked. This is useful in async contexts for example where the data needs
/// to be locked, but a mutex guard can not be held.
///
/// For an example see [`SharedData`].
pub fn shared_data_locked(&self) -> SharedDataLocked<T> {
let mut guard = self.inner.lock();
while guard.locked {
self.cond_var.wait(&mut guard);
}
debug_assert!(!guard.locked);
guard.locked = true;
SharedDataLocked { inner: guard, shared_data: Some(self.clone()) }
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn shared_data_locking_works() {
const THREADS: u32 = 100;
let shared_data = SharedData::new(0u32);
let lock = shared_data.shared_data_locked();
for i in 0..THREADS {
let data = shared_data.clone();
std::thread::spawn(move || {
if i % 2 == 1 {
*data.shared_data() += 1;
} else {
let mut lock = data.shared_data_locked().release_mutex();
// Give the other threads some time to wake up
std::thread::sleep(std::time::Duration::from_millis(10));
*lock.upgrade() += 1;
}
});
}
let lock = lock.release_mutex();
std::thread::sleep(std::time::Duration::from_millis(100));
drop(lock);
while *shared_data.shared_data() < THREADS {
std::thread::sleep(std::time::Duration::from_millis(100));
}
}
}