use std::{collections::BTreeMap, ffi::CString, fs, iter, marker::PhantomData, path::Path, ptr};
use libc::{c_char, c_int};
use crate::{
db::DBCommon, db::DBInner, ffi, ffi_util::to_cpath, write_batch::WriteBatchWithTransaction,
ColumnFamilyDescriptor, Error, OptimisticTransactionOptions, Options, ThreadMode, Transaction,
WriteOptions, DEFAULT_COLUMN_FAMILY_NAME,
};
/// A database handle type built on the optimistic-transaction RocksDB C API.
///
/// This is [`DBCommon`] specialised with [`OptimisticTransactionDBInner`].
/// When the `multi-threaded-cf` feature is disabled, the thread-mode
/// parameter `T` defaults to `crate::SingleThreaded`.
#[cfg(not(feature = "multi-threaded-cf"))]
pub type OptimisticTransactionDB<T = crate::SingleThreaded> =
DBCommon<T, OptimisticTransactionDBInner>;
/// A database handle type built on the optimistic-transaction RocksDB C API.
///
/// This is [`DBCommon`] specialised with [`OptimisticTransactionDBInner`].
/// When the `multi-threaded-cf` feature is enabled, the thread-mode
/// parameter `T` defaults to `crate::MultiThreaded`.
#[cfg(feature = "multi-threaded-cf")]
pub type OptimisticTransactionDB<T = crate::MultiThreaded> =
DBCommon<T, OptimisticTransactionDBInner>;
/// Raw native handles backing an [`OptimisticTransactionDB`].
///
/// Both pointers are released by this type's `Drop` implementation.
pub struct OptimisticTransactionDBInner {
/// Base database handle, obtained from `db` through
/// `rocksdb_optimistictransactiondb_get_base_db`; this is the pointer
/// returned by `DBInner::inner`.
base: *mut ffi::rocksdb_t,
/// Optimistic transaction database handle returned by the open call; used
/// to begin transactions and to apply write batches.
db: *mut ffi::rocksdb_optimistictransactiondb_t,
}
impl DBInner for OptimisticTransactionDBInner {
/// Returns the base `rocksdb_t` handle (the `base` field), not the
/// optimistic transaction database handle.
fn inner(&self) -> *mut ffi::rocksdb_t {
self.base
}
}
impl Drop for OptimisticTransactionDBInner {
/// Releases both native handles: the base handle first, then the
/// optimistic transaction database it was obtained from.
fn drop(&mut self) {
// SAFETY (assumed — confirm against the RocksDB C API): `base` and `db` are
// the handles stored when this value was constructed, and within this file
// they are released only here.
unsafe {
// NOTE(review): presumably this frees only the base-handle wrapper and not
// the underlying database, which is why `db` can still be closed
// afterwards — verify before reordering these two calls.
ffi::rocksdb_optimistictransactiondb_close_base_db(self.base);
ffi::rocksdb_optimistictransactiondb_close(self.db);
}
}
}
impl<T: ThreadMode> OptimisticTransactionDB<T> {
/// Opens the database at `path` using default [`Options`] with
/// `create_if_missing` switched on.
///
/// # Errors
///
/// Returns whatever error [`Self::open`] reports.
pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
    let default_opts = {
        let mut o = Options::default();
        o.create_if_missing(true);
        o
    };
    Self::open(&default_opts, path)
}
/// Opens the database at `path` with the given options and without naming
/// any column families.
///
/// # Errors
///
/// Returns whatever error [`Self::open_cf`] reports.
pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Self, Error> {
    // An empty `Option` serves as the (empty) list of column family names.
    let no_column_families: Option<&str> = None;
    Self::open_cf(opts, path, no_column_families)
}
/// Opens the database at `path` with the column families named in `cfs`.
///
/// Every listed column family is described with `Options::default()`;
/// `opts` is forwarded as the database-level options.
///
/// # Errors
///
/// Returns whatever error the underlying open routine reports.
pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
where
    P: AsRef<Path>,
    I: IntoIterator<Item = N>,
    N: AsRef<str>,
{
    // Lazily turn each name into a descriptor carrying default options.
    let descriptors = cfs.into_iter().map(|name| {
        let cf_name: &str = name.as_ref();
        ColumnFamilyDescriptor::new(cf_name, Options::default())
    });
    Self::open_cf_descriptors_internal(opts, path, descriptors)
}
/// Opens the database at `path` with the given column family descriptors.
///
/// Public entry point that forwards its arguments unchanged to
/// `open_cf_descriptors_internal`.
///
/// # Errors
///
/// Returns whatever error `open_cf_descriptors_internal` reports.
pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
where
P: AsRef<Path>,
I: IntoIterator<Item = ColumnFamilyDescriptor>,
{
Self::open_cf_descriptors_internal(opts, path, cfs)
}
fn open_cf_descriptors_internal<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
where
P: AsRef<Path>,
I: IntoIterator<Item = ColumnFamilyDescriptor>,
{
let cfs: Vec<_> = cfs.into_iter().collect();
let outlive = iter::once(opts.outlive.clone())
.chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
.collect();
let cpath = to_cpath(&path)?;
if let Err(e) = fs::create_dir_all(&path) {
return Err(Error::new(format!(
"Failed to create RocksDB directory: `{:?}`.",
e
)));
}
let db: *mut ffi::rocksdb_optimistictransactiondb_t;
let mut cf_map = BTreeMap::new();
if cfs.is_empty() {
db = Self::open_raw(opts, &cpath)?;
} else {
let mut cfs_v = cfs;
if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
cfs_v.push(ColumnFamilyDescriptor {
name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
options: Options::default(),
});
}
let c_cfs: Vec<CString> = cfs_v
.iter()
.map(|cf| CString::new(cf.name.as_bytes()).unwrap())
.collect();
let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
let cfopts: Vec<_> = cfs_v
.iter()
.map(|cf| cf.options.inner as *const _)
.collect();
db = Self::open_cf_raw(opts, &cpath, &cfs_v, &cfnames, &cfopts, &mut cfhandles)?;
for handle in &cfhandles {
if handle.is_null() {
return Err(Error::new(
"Received null column family handle from DB.".to_owned(),
));
}
}
for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
cf_map.insert(cf_desc.name.clone(), inner);
}
}
if db.is_null() {
return Err(Error::new("Could not initialize database.".to_owned()));
}
let base = unsafe { ffi::rocksdb_optimistictransactiondb_get_base_db(db) };
if base.is_null() {
unsafe {
ffi::rocksdb_optimistictransactiondb_close(db);
}
return Err(Error::new("Could not initialize database.".to_owned()));
}
let inner = OptimisticTransactionDBInner { base, db };
Ok(Self::new(
inner,
T::new_cf_map_internal(cf_map),
path.as_ref().to_path_buf(),
outlive,
))
}
/// Calls `rocksdb_optimistictransactiondb_open` for `cpath` with `opts`.
///
/// The returned pointer is passed through as-is; callers are expected to
/// check it for null.
///
/// # Errors
///
/// Returns early with an [`Error`] through `ffi_try!` when the native call
/// reports a failure.
fn open_raw(
    opts: &Options,
    cpath: &CString,
) -> Result<*mut ffi::rocksdb_optimistictransactiondb_t, Error> {
    let raw_path = cpath.as_ptr();
    // SAFETY (assumed): `opts.inner` is the options handle owned by `opts`, and
    // `raw_path` points into `cpath`, which outlives the call.
    let handle = unsafe {
        ffi_try!(ffi::rocksdb_optimistictransactiondb_open(
            opts.inner,
            raw_path
        ))
    };
    Ok(handle)
}
fn open_cf_raw(
opts: &Options,
cpath: &CString,
cfs_v: &[ColumnFamilyDescriptor],
cfnames: &[*const c_char],
cfopts: &[*const ffi::rocksdb_options_t],
cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
) -> Result<*mut ffi::rocksdb_optimistictransactiondb_t, Error> {
unsafe {
let db = ffi_try!(ffi::rocksdb_optimistictransactiondb_open_column_families(
opts.inner,
cpath.as_ptr(),
cfs_v.len() as c_int,
cfnames.as_ptr(),
cfopts.as_ptr(),
cfhandles.as_mut_ptr(),
));
Ok(db)
}
}
/// Begins a transaction using `WriteOptions::default()` and
/// `OptimisticTransactionOptions::default()`.
pub fn transaction(&self) -> Transaction<Self> {
    let write_opts = WriteOptions::default();
    let txn_opts = OptimisticTransactionOptions::default();
    self.transaction_opt(&write_opts, &txn_opts)
}
pub fn transaction_opt(
&self,
writeopts: &WriteOptions,
otxn_opts: &OptimisticTransactionOptions,
) -> Transaction<Self> {
Transaction {
inner: unsafe {
ffi::rocksdb_optimistictransaction_begin(
self.inner.db,
writeopts.inner,
otxn_opts.inner,
std::ptr::null_mut(),
)
},
_marker: PhantomData::default(),
}
}
/// Applies `batch` to the database with the given write options via
/// `rocksdb_optimistictransactiondb_write`.
///
/// The batch is taken by value and dropped when this call returns.
///
/// # Errors
///
/// Returns early with an [`Error`] through `ffi_try!` when the native call
/// reports a failure.
pub fn write_opt(
    &self,
    batch: WriteBatchWithTransaction<true>,
    writeopts: &WriteOptions,
) -> Result<(), Error> {
    let db = self.inner.db;
    unsafe {
        ffi_try!(ffi::rocksdb_optimistictransactiondb_write(
            db,
            writeopts.inner,
            batch.inner
        ));
    }
    Ok(())
}
/// Applies `batch` to the database using `WriteOptions::default()`.
///
/// # Errors
///
/// Returns whatever error `write_opt` reports.
pub fn write(&self, batch: WriteBatchWithTransaction<true>) -> Result<(), Error> {
self.write_opt(batch, &WriteOptions::default())
}
/// Applies `batch` to the database using write options on which
/// `disable_wal(true)` has been set.
///
/// # Errors
///
/// Returns whatever error [`Self::write_opt`] reports.
pub fn write_without_wal(&self, batch: WriteBatchWithTransaction<true>) -> Result<(), Error> {
    let no_wal_opts = {
        let mut o = WriteOptions::new();
        o.disable_wal(true);
        o
    };
    self.write_opt(batch, &no_wal_opts)
}
}