use crate::{
error::{try_io, Result},
parking_lot::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard},
table::TableId,
};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
#[cfg(target_os = "linux")]
fn disable_read_ahead(file: &std::fs::File) -> std::io::Result<()> {
use std::os::unix::io::AsRawFd;
let err = unsafe { libc::posix_fadvise(file.as_raw_fd(), 0, 0, libc::POSIX_FADV_RANDOM) };
if err != 0 {
Err(std::io::Error::from_raw_os_error(err))
} else {
Ok(())
}
}
#[cfg(target_os = "macos")]
fn disable_read_ahead(file: &std::fs::File) -> std::io::Result<()> {
use std::os::unix::io::AsRawFd;
if unsafe { libc::fcntl(file.as_raw_fd(), libc::F_RDAHEAD, 0) } != 0 {
Err(std::io::Error::last_os_error())
} else {
Ok(())
}
}
#[cfg(not(any(target_os = "macos", target_os = "linux")))]
fn disable_read_ahead(_file: &std::fs::File) -> std::io::Result<()> {
Ok(())
}
#[cfg(target_os = "macos")]
fn fsync(file: &std::fs::File) -> std::io::Result<()> {
use std::os::unix::io::AsRawFd;
if unsafe { libc::fsync(file.as_raw_fd()) } != 0 {
Err(std::io::Error::last_os_error())
} else {
Ok(())
}
}
#[cfg(not(target_os = "macos"))]
fn fsync(file: &std::fs::File) -> std::io::Result<()> {
file.sync_data()
}
const GROW_SIZE_BYTES: u64 = 256 * 1024;
#[derive(Debug)]
pub struct TableFile {
pub file: RwLock<Option<std::fs::File>>,
pub path: std::path::PathBuf,
pub capacity: AtomicU64,
pub dirty: AtomicBool,
pub id: TableId,
}
impl TableFile {
pub fn open(filepath: std::path::PathBuf, entry_size: u16, id: TableId) -> Result<Self> {
let mut capacity = 0u64;
let file = if std::fs::metadata(&filepath).is_ok() {
let file = try_io!(std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(filepath.as_path()));
try_io!(disable_read_ahead(&file));
let len = try_io!(file.metadata()).len();
if len == 0 {
capacity += GROW_SIZE_BYTES / entry_size as u64;
try_io!(file.set_len(capacity * entry_size as u64));
} else {
capacity = len / entry_size as u64;
}
Some(file)
} else {
None
};
Ok(TableFile {
path: filepath,
file: RwLock::new(file),
capacity: AtomicU64::new(capacity),
dirty: AtomicBool::new(false),
id,
})
}
fn create_file(&self) -> Result<std::fs::File> {
log::debug!(target: "parity-db", "Created value table {}", self.id);
let file = try_io!(std::fs::OpenOptions::new()
.create(true)
.read(true)
.write(true)
.open(self.path.as_path()));
try_io!(disable_read_ahead(&file));
Ok(file)
}
#[cfg(unix)]
pub fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<()> {
use std::os::unix::fs::FileExt;
try_io!(self.file.read().as_ref().unwrap().read_exact_at(buf, offset));
Ok(())
}
#[cfg(unix)]
pub fn write_at(&self, buf: &[u8], offset: u64) -> Result<()> {
use std::os::unix::fs::FileExt;
self.dirty.store(true, Ordering::Relaxed);
try_io!(self.file.read().as_ref().unwrap().write_all_at(buf, offset));
Ok(())
}
#[cfg(windows)]
pub fn read_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<()> {
use crate::error::Error;
use std::{io, os::windows::fs::FileExt};
let file = self.file.read();
let file = file.as_ref().unwrap();
while !buf.is_empty() {
match file.seek_read(buf, offset) {
Ok(0) => break,
Ok(n) => {
buf = &mut buf[n..];
offset += n as u64;
},
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
},
Err(e) => return Err(Error::Io(e)),
}
}
if !buf.is_empty() {
Err(Error::Io(io::Error::new(
io::ErrorKind::UnexpectedEof,
"failed to fill whole buffer",
)))
} else {
Ok(())
}
}
#[cfg(windows)]
pub fn write_at(&self, mut buf: &[u8], mut offset: u64) -> Result<()> {
use crate::error::Error;
use std::{io, os::windows::fs::FileExt};
self.dirty.store(true, Ordering::Relaxed);
let file = self.file.read();
let file = file.as_ref().unwrap();
while !buf.is_empty() {
match file.seek_write(buf, offset) {
Ok(0) =>
return Err(Error::Io(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write whole buffer",
))),
Ok(n) => {
buf = &buf[n..];
offset += n as u64;
},
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
},
Err(e) => return Err(Error::Io(e)),
}
}
Ok(())
}
pub fn grow(&self, entry_size: u16) -> Result<()> {
let mut capacity = self.capacity.load(Ordering::Relaxed);
capacity += GROW_SIZE_BYTES / entry_size as u64;
self.capacity.store(capacity, Ordering::Relaxed);
let mut file = self.file.upgradable_read();
if file.is_none() {
let mut wfile = RwLockUpgradableReadGuard::upgrade(file);
*wfile = Some(self.create_file()?);
file = RwLockWriteGuard::downgrade_to_upgradable(wfile);
}
try_io!(file.as_ref().unwrap().set_len(capacity * entry_size as u64));
Ok(())
}
pub fn flush(&self) -> Result<()> {
if let Ok(true) =
self.dirty.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
{
if let Some(file) = self.file.read().as_ref() {
try_io!(fsync(file));
}
}
Ok(())
}
pub fn remove(&self) -> Result<()> {
let mut file = self.file.write();
if let Some(file) = file.take() {
drop(file);
try_io!(std::fs::remove_file(&self.path));
}
Ok(())
}
}