use crate::{
column::ColId,
display::hex,
error::{try_io, Error, Result},
log::{LogQuery, LogReader, LogWriter},
parking_lot::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard},
stats::{self, ColumnStats},
table::{key::TableKey, SIZE_TIERS_BITS},
Key,
};
use std::convert::TryInto;
const CHUNK_LEN: usize = CHUNK_ENTRIES * ENTRY_BYTES; const CHUNK_ENTRIES: usize = 1 << CHUNK_ENTRIES_BITS;
const CHUNK_ENTRIES_BITS: u8 = 6;
const HEADER_SIZE: usize = 512;
const META_SIZE: usize = 16 * 1024; const ENTRY_LEN: u8 = 64;
pub const ENTRY_BYTES: usize = ENTRY_LEN as usize / 8;
const EMPTY_CHUNK: Chunk = [0u8; CHUNK_LEN];
pub type Chunk = [u8; CHUNK_LEN];
#[allow(clippy::assertions_on_constants)]
const _: () = assert!(META_SIZE >= HEADER_SIZE + stats::TOTAL_SIZE);
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct Entry(u64);
impl Entry {
#[inline]
fn new(address: Address, partial_key: u64, index_bits: u8) -> Entry {
Entry((partial_key << Self::address_bits(index_bits)) | address.as_u64())
}
#[inline]
pub fn address_bits(index_bits: u8) -> u8 {
index_bits + CHUNK_ENTRIES_BITS + SIZE_TIERS_BITS
}
#[inline]
pub fn last_address(index_bits: u8) -> u64 {
(1u64 << Self::address_bits(index_bits)) - 1
}
#[inline]
pub fn address(&self, index_bits: u8) -> Address {
Address::from_u64(self.0 & Self::last_address(index_bits))
}
#[inline]
pub fn partial_key(&self, index_bits: u8) -> u64 {
self.0 >> Self::address_bits(index_bits)
}
#[inline]
fn extract_key(key_prefix: u64, index_bits: u8) -> u64 {
(key_prefix << index_bits) >> Self::address_bits(index_bits)
}
#[inline]
pub fn is_empty(&self) -> bool {
self.0 == 0
}
pub fn as_u64(&self) -> u64 {
self.0
}
fn empty() -> Self {
Entry(0)
}
fn from_u64(e: u64) -> Self {
Entry(e)
}
}
#[derive(Clone, Copy, Eq, PartialEq, Hash, Debug)]
pub struct Address(u64);
impl Address {
pub const fn new(offset: u64, size_tier: u8) -> Address {
Address((offset << SIZE_TIERS_BITS) | size_tier as u64)
}
pub const fn from_u64(a: u64) -> Address {
Address(a)
}
pub fn offset(&self) -> u64 {
self.0 >> SIZE_TIERS_BITS
}
pub fn size_tier(&self) -> u8 {
(self.0 & ((1 << SIZE_TIERS_BITS) as u64 - 1)) as u8
}
pub fn as_u64(&self) -> u64 {
self.0
}
}
impl std::fmt::Display for Address {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "addr {:02}:{}", hex(&[self.size_tier()]), self.offset())
}
}
pub enum PlanOutcome {
Written,
NeedReindex,
Skipped,
}
#[derive(Debug)]
pub struct IndexTable {
pub id: TableId,
map: RwLock<Option<memmap2::MmapMut>>,
path: std::path::PathBuf,
}
fn total_entries(index_bits: u8) -> u64 {
total_chunks(index_bits) * CHUNK_ENTRIES as u64
}
fn total_chunks(index_bits: u8) -> u64 {
1u64 << index_bits
}
fn file_size(index_bits: u8) -> u64 {
total_entries(index_bits) * 8 + META_SIZE as u64
}
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct TableId(u16);
impl TableId {
pub fn new(col: ColId, index_bits: u8) -> TableId {
TableId(((col as u16) << 8) | (index_bits as u16))
}
pub fn from_u16(id: u16) -> TableId {
TableId(id)
}
pub fn col(&self) -> ColId {
(self.0 >> 8) as ColId
}
pub fn index_bits(&self) -> u8 {
(self.0 & 0xff) as u8
}
pub fn file_name(&self) -> String {
format!("index_{:02}_{}", self.col(), self.index_bits())
}
pub fn is_file_name(col: ColId, name: &str) -> bool {
name.starts_with(&format!("index_{:02}_", col))
}
pub fn as_u16(&self) -> u16 {
self.0
}
pub fn total_chunks(&self) -> u64 {
total_chunks(self.index_bits())
}
pub fn total_entries(&self) -> u64 {
total_entries(self.index_bits())
}
}
impl std::fmt::Display for TableId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "i{:02}-{:02}", self.col(), self.index_bits())
}
}
impl IndexTable {
pub fn open_existing(path: &std::path::Path, id: TableId) -> Result<Option<IndexTable>> {
let mut path: std::path::PathBuf = path.into();
path.push(id.file_name());
let file = match std::fs::OpenOptions::new().read(true).write(true).open(path.as_path()) {
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(e) => return Err(Error::Io(e)),
Ok(file) => file,
};
try_io!(file.set_len(file_size(id.index_bits())));
let map = try_io!(unsafe { memmap2::MmapMut::map_mut(&file) });
log::debug!(target: "parity-db", "Opened existing index {}", id);
Ok(Some(IndexTable { id, path, map: RwLock::new(Some(map)) }))
}
pub fn create_new(path: &std::path::Path, id: TableId) -> IndexTable {
let mut path: std::path::PathBuf = path.into();
path.push(id.file_name());
IndexTable { id, path, map: RwLock::new(None) }
}
pub fn load_stats(&self) -> Result<ColumnStats> {
if let Some(map) = &*self.map.read() {
Ok(ColumnStats::from_slice(try_io!(Ok(
&map[HEADER_SIZE..HEADER_SIZE + stats::TOTAL_SIZE]
))))
} else {
Ok(ColumnStats::empty())
}
}
pub fn write_stats(&self, stats: &ColumnStats) -> Result<()> {
if let Some(map) = &mut *self.map.write() {
let slice = try_io!(Ok(&mut map[HEADER_SIZE..HEADER_SIZE + stats::TOTAL_SIZE]));
stats.to_slice(slice);
}
Ok(())
}
fn chunk_at(index: u64, map: &memmap2::MmapMut) -> Result<&[u8]> {
let offset = META_SIZE + index as usize * CHUNK_LEN;
Ok(try_io!(Ok(&map[offset..offset + CHUNK_LEN])))
}
#[inline(never)]
fn find_entry(&self, key_prefix: u64, sub_index: usize, chunk: &[u8]) -> (Entry, usize) {
let partial_key = Entry::extract_key(key_prefix, self.id.index_bits());
for i in sub_index..CHUNK_ENTRIES {
let entry = Self::read_entry(chunk, i);
if entry.partial_key(self.id.index_bits()) == partial_key {
return (entry, i)
}
}
(Entry::empty(), 0)
}
pub fn recover_key_prefix(&self, chunk: u64, entry: Entry) -> Key {
let partial_key = entry.partial_key(self.id.index_bits());
let k = 64 - Entry::address_bits(self.id.index_bits());
let index_key = (chunk << (64 - self.id.index_bits())) |
(partial_key << (64 - k - self.id.index_bits()));
let mut key = Key::default();
key[0..8].copy_from_slice(&index_key.to_be_bytes());
key
}
pub fn get(&self, key: &Key, sub_index: usize, log: &impl LogQuery) -> Result<(Entry, usize)> {
log::trace!(target: "parity-db", "{}: Querying {}", self.id, hex(key));
let key = TableKey::index_from_partial(key);
let chunk_index = self.chunk_index(key);
if let Some(entry) = log.with_index(self.id, chunk_index, |chunk| {
log::trace!(target: "parity-db", "{}: Querying overlay at {}", self.id, chunk_index);
self.find_entry(key, sub_index, chunk)
}) {
return Ok(entry)
}
if let Some(map) = &*self.map.read() {
log::trace!(target: "parity-db", "{}: Querying chunk at {}", self.id, chunk_index);
let chunk = Self::chunk_at(chunk_index, map)?;
return Ok(self.find_entry(key, sub_index, chunk))
}
Ok((Entry::empty(), 0))
}
pub fn entries(&self, chunk_index: u64, log: &impl LogQuery) -> Result<[Entry; CHUNK_ENTRIES]> {
let mut chunk = [0; CHUNK_LEN];
if let Some(entry) =
log.with_index(self.id, chunk_index, |chunk| Self::transmute_chunk(*chunk))
{
return Ok(entry)
}
if let Some(map) = &*self.map.read() {
let source = Self::chunk_at(chunk_index, map)?;
chunk.copy_from_slice(source);
return Ok(Self::transmute_chunk(chunk))
}
Ok(Self::transmute_chunk(EMPTY_CHUNK))
}
#[inline(always)]
fn transmute_chunk(chunk: [u8; CHUNK_LEN]) -> [Entry; CHUNK_ENTRIES] {
let mut result: [Entry; CHUNK_ENTRIES] = unsafe { std::mem::transmute(chunk) };
if !cfg!(target_endian = "little") {
for item in result.iter_mut() {
*item = Entry::from_u64(u64::from_le(item.0));
}
}
result
}
#[inline(always)]
fn write_entry(entry: &Entry, at: usize, chunk: &mut [u8; CHUNK_LEN]) {
chunk[at * 8..at * 8 + 8].copy_from_slice(&entry.as_u64().to_le_bytes());
}
#[inline(always)]
fn read_entry(chunk: &[u8], at: usize) -> Entry {
Entry::from_u64(u64::from_le_bytes(chunk[at * 8..at * 8 + 8].try_into().unwrap()))
}
#[inline(always)]
fn chunk_index(&self, key_prefix: u64) -> u64 {
key_prefix >> (ENTRY_LEN - self.id.index_bits())
}
fn plan_insert_chunk(
&self,
key_prefix: u64,
address: Address,
source: &[u8],
sub_index: Option<usize>,
log: &mut LogWriter,
) -> Result<PlanOutcome> {
let chunk_index = self.chunk_index(key_prefix);
if address.as_u64() > Entry::last_address(self.id.index_bits()) {
log::warn!(target: "parity-db", "{}: Address space overflow at {}: {}", self.id, chunk_index, address);
return Ok(PlanOutcome::NeedReindex)
}
let mut chunk = [0; CHUNK_LEN];
chunk.copy_from_slice(source);
let partial_key = Entry::extract_key(key_prefix, self.id.index_bits());
let new_entry = Entry::new(address, partial_key, self.id.index_bits());
if let Some(i) = sub_index {
let entry = Self::read_entry(&chunk, i);
assert_eq!(
entry.partial_key(self.id.index_bits()),
new_entry.partial_key(self.id.index_bits())
);
Self::write_entry(&new_entry, i, &mut chunk);
log::trace!(target: "parity-db", "{}: Replaced at {}.{}: {}", self.id, chunk_index, i, new_entry.address(self.id.index_bits()));
log.insert_index(self.id, chunk_index, i as u8, &chunk);
return Ok(PlanOutcome::Written)
}
for i in 0..CHUNK_ENTRIES {
let entry = Self::read_entry(&chunk, i);
if entry.is_empty() {
Self::write_entry(&new_entry, i, &mut chunk);
log::trace!(target: "parity-db", "{}: Inserted at {}.{}: {}", self.id, chunk_index, i, new_entry.address(self.id.index_bits()));
log.insert_index(self.id, chunk_index, i as u8, &chunk);
return Ok(PlanOutcome::Written)
}
}
log::trace!(target: "parity-db", "{}: Index chunk full at {}", self.id, chunk_index);
Ok(PlanOutcome::NeedReindex)
}
pub fn write_insert_plan(
&self,
key: &Key,
address: Address,
sub_index: Option<usize>,
log: &mut LogWriter,
) -> Result<PlanOutcome> {
log::trace!(target: "parity-db", "{}: Inserting {} -> {}", self.id, hex(key), address);
let key_prefix = TableKey::index_from_partial(key);
let chunk_index = self.chunk_index(key_prefix);
if let Some(chunk) = log.with_index(self.id, chunk_index, |chunk| *chunk) {
return self.plan_insert_chunk(key_prefix, address, &chunk, sub_index, log)
}
if let Some(map) = &*self.map.read() {
let chunk = Self::chunk_at(chunk_index, map)?;
return self.plan_insert_chunk(key_prefix, address, chunk, sub_index, log)
}
let chunk = &EMPTY_CHUNK;
self.plan_insert_chunk(key_prefix, address, chunk, sub_index, log)
}
fn plan_remove_chunk(
&self,
key_prefix: u64,
source: &[u8],
sub_index: usize,
log: &mut LogWriter,
) -> Result<PlanOutcome> {
let mut chunk = [0; CHUNK_LEN];
chunk.copy_from_slice(source);
let chunk_index = self.chunk_index(key_prefix);
let partial_key = Entry::extract_key(key_prefix, self.id.index_bits());
let i = sub_index;
let entry = Self::read_entry(&chunk, i);
if !entry.is_empty() && entry.partial_key(self.id.index_bits()) == partial_key {
let new_entry = Entry::empty();
Self::write_entry(&new_entry, i, &mut chunk);
log.insert_index(self.id, chunk_index, i as u8, &chunk);
log::trace!(target: "parity-db", "{}: Removed at {}.{}", self.id, chunk_index, i);
return Ok(PlanOutcome::Written)
}
Ok(PlanOutcome::Skipped)
}
pub fn write_remove_plan(
&self,
key: &Key,
sub_index: usize,
log: &mut LogWriter,
) -> Result<PlanOutcome> {
log::trace!(target: "parity-db", "{}: Removing {}", self.id, hex(key));
let key_prefix = TableKey::index_from_partial(key);
let chunk_index = self.chunk_index(key_prefix);
if let Some(chunk) = log.with_index(self.id, chunk_index, |chunk| *chunk) {
return self.plan_remove_chunk(key_prefix, &chunk, sub_index, log)
}
if let Some(map) = &*self.map.read() {
let chunk = Self::chunk_at(chunk_index, map)?;
return self.plan_remove_chunk(key_prefix, chunk, sub_index, log)
}
Ok(PlanOutcome::Skipped)
}
pub fn enact_plan(&self, index: u64, log: &mut LogReader) -> Result<()> {
let mut map = self.map.upgradable_read();
if map.is_none() {
let mut wmap = RwLockUpgradableReadGuard::upgrade(map);
let file = try_io!(std::fs::OpenOptions::new()
.write(true)
.read(true)
.create_new(true)
.open(self.path.as_path()));
log::debug!(target: "parity-db", "Created new index {}", self.id);
try_io!(file.set_len(file_size(self.id.index_bits())));
let mut mmap = try_io!(unsafe { memmap2::MmapMut::map_mut(&file) });
self.madvise_random(&mut mmap);
*wmap = Some(mmap);
map = RwLockWriteGuard::downgrade_to_upgradable(wmap);
}
let map = map.as_ref().unwrap();
let offset = META_SIZE + index as usize * CHUNK_LEN;
let ptr: *mut u8 = map.as_ptr() as *mut u8;
let chunk: &mut [u8] = unsafe {
let ptr = ptr.add(offset);
std::slice::from_raw_parts_mut(ptr, CHUNK_LEN)
};
let mut mask_buf = [0u8; 8];
log.read(&mut mask_buf)?;
let mut mask = u64::from_le_bytes(mask_buf);
while mask != 0 {
let i = mask.trailing_zeros();
mask &= !(1 << i);
log.read(try_io!(Ok(
&mut chunk[i as usize * ENTRY_BYTES..(i as usize + 1) * ENTRY_BYTES]
)))?;
}
log::trace!(target: "parity-db", "{}: Enacted chunk {}", self.id, index);
Ok(())
}
pub fn validate_plan(&self, index: u64, log: &mut LogReader) -> Result<()> {
if index >= self.id.total_entries() {
return Err(Error::Corruption("Bad index".into()))
}
let mut buf = [0u8; 8];
log.read(&mut buf)?;
let mut mask = u64::from_le_bytes(buf);
while mask != 0 {
let i = mask.trailing_zeros();
mask &= !(1 << i);
log.read(&mut buf[..])?;
}
log::trace!(target: "parity-db", "{}: Validated chunk {}", self.id, index);
Ok(())
}
pub fn skip_plan(log: &mut LogReader) -> Result<()> {
let mut buf = [0u8; 8];
log.read(&mut buf)?;
let mut mask = u64::from_le_bytes(buf);
while mask != 0 {
let i = mask.trailing_zeros();
mask &= !(1 << i);
log.read(&mut buf[..])?;
}
Ok(())
}
pub fn drop_file(self) -> Result<()> {
drop(self.map);
try_io!(std::fs::remove_file(self.path.as_path()));
log::debug!(target: "parity-db", "{}: Dropped table", self.id);
Ok(())
}
pub fn flush(&self) -> Result<()> {
if let Some(map) = &*self.map.read() {
try_io!(map.flush_range(META_SIZE, map.len() - META_SIZE));
}
Ok(())
}
#[cfg(unix)]
fn madvise_random(&self, map: &mut memmap2::MmapMut) {
unsafe {
libc::madvise(
map.as_mut_ptr() as _,
file_size(self.id.index_bits()) as usize,
libc::MADV_RANDOM,
);
}
}
#[cfg(not(unix))]
fn madvise_random(&self, _map: &mut memmap2::MmapMut) {}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_entries() {
let mut chunk = IndexTable::transmute_chunk(EMPTY_CHUNK);
let mut chunk2 = EMPTY_CHUNK;
for (i, chunk) in chunk.iter_mut().enumerate().take(CHUNK_ENTRIES) {
use std::{
collections::hash_map::DefaultHasher,
hash::{Hash, Hasher},
};
let mut hasher = DefaultHasher::new();
i.hash(&mut hasher);
let hash = hasher.finish();
let entry = Entry::from_u64(hash);
IndexTable::write_entry(&entry, i, &mut chunk2);
*chunk = entry;
}
assert!(IndexTable::transmute_chunk(chunk2) == chunk);
}
}