mod iter;
mod stats;
use std::{
cmp,
collections::HashMap,
error, io,
path::{Path, PathBuf},
};
use rocksdb::{
BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, Options, ReadOptions, WriteBatch, WriteOptions, DB,
};
use kvdb::{DBKeyValue, DBOp, DBTransaction, DBValue, KeyValueDB};
#[cfg(target_os = "linux")]
use regex::Regex;
#[cfg(target_os = "linux")]
use std::fs::File;
#[cfg(target_os = "linux")]
use std::process::Command;
/// Wrap any boxable error into an `io::Error` of kind `Other`.
fn other_io_err<E: Into<Box<dyn error::Error + Send + Sync>>>(e: E) -> io::Error {
    io::Error::new(io::ErrorKind::Other, e)
}
/// Error returned when a column index has no backing column family.
fn invalid_column(col: u32) -> io::Error {
    // Same as `other_io_err(format!(...))`, spelled out directly.
    io::Error::new(io::ErrorKind::Other, format!("No such column family: {:?}", col))
}
/// Memory-budget unit: mebibytes.
type MiB = usize;
const KB: usize = 1_024;
const MB: usize = 1_024 * KB;
/// Default memory budget per column, in MiB, used for columns with no
/// explicit entry in `DatabaseConfig::memory_budget`.
pub const DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB: MiB = 128;
/// Default total memory budget, in MiB (not referenced within this file).
pub const DB_DEFAULT_MEMORY_BUDGET_MB: MiB = 512;
/// Compaction tuning parameters, chosen per storage medium (see `ssd`/`hdd`).
#[derive(Clone, Copy, PartialEq, Debug)]
pub struct CompactionProfile {
    /// Initial SST file size, in bytes; fed to rocksdb's `target_file_size_base`.
    pub initial_file_size: u64,
    /// Block size, in bytes; fed to the block-based table options.
    pub block_size: usize,
}
impl Default for CompactionProfile {
fn default() -> CompactionProfile {
CompactionProfile::ssd()
}
}
/// Given the raw output of `df <path>`, extract the backing `/dev/sdXX`
/// device name and map it to that device's sysfs rotational-flag file,
/// e.g. `/sys/block/sda/queue/rotational`.
///
/// Returns `None` when the output is not valid UTF-8 or no `sd*` device
/// appears in it (NVMe, device-mapper, network mounts, ...).
#[cfg(target_os = "linux")]
pub fn rotational_from_df_output(df_out: Vec<u8>) -> Option<PathBuf> {
    use std::str;
    let df_str = str::from_utf8(df_out.as_slice()).ok()?;
    // `[[:alpha:]]` is the POSIX alpha class. The previous pattern used
    // `[:alpha:]`, which inside a bracket expression only matches the literal
    // characters `:`, `a`, `l`, `p`, `h` — so devices like `/dev/sdb` were
    // never recognised.
    let re = Regex::new(r"/dev/(sd[[:alpha:]]{1,2})").ok()?;
    let drive = re.captures(df_str)?.get(1)?;
    let mut p = PathBuf::from("/sys/block");
    p.push(drive.as_str());
    p.push("queue/rotational");
    Some(p)
}
impl CompactionProfile {
    /// Try to detect whether `db_path` lives on rotational storage and pick
    /// the matching profile; any failure along the way falls back to the
    /// default (SSD) profile.
    #[cfg(target_os = "linux")]
    pub fn auto<P: AsRef<Path>>(db_path: P) -> CompactionProfile {
        use std::io::Read;
        // Ask `df` which device backs the path, then map that device to its
        // sysfs `rotational` flag file.
        let rotational_file = db_path
            .as_ref()
            .to_str()
            .and_then(|path_str| Command::new("df").arg(path_str).output().ok())
            .and_then(|df_res| if df_res.status.success() { Some(df_res.stdout) } else { None })
            .and_then(rotational_from_df_output);
        if let Some(flag_path) = rotational_file {
            if let Ok(mut flag_file) = File::open(flag_path.as_path()) {
                let mut flag = [0u8; 1];
                if flag_file.read_exact(&mut flag).is_ok() {
                    // sysfs reports ASCII '0' (48) for non-rotational media
                    // and '1' (49) for spinning disks.
                    match flag[0] {
                        b'0' => return Self::ssd(),
                        b'1' => return Self::hdd(),
                        _ => {},
                    }
                }
            }
        }
        Self::default()
    }

    /// Rotational-media detection is only implemented for Linux; everywhere
    /// else the default profile is used.
    #[cfg(not(target_os = "linux"))]
    pub fn auto<P: AsRef<Path>>(_db_path: P) -> CompactionProfile {
        Self::default()
    }

    /// Profile tuned for solid-state storage.
    pub fn ssd() -> CompactionProfile {
        CompactionProfile { initial_file_size: 64 * MB as u64, block_size: 16 * KB }
    }

    /// Profile tuned for spinning disks.
    pub fn hdd() -> CompactionProfile {
        CompactionProfile { initial_file_size: 256 * MB as u64, block_size: 64 * KB }
    }
}
/// Database configuration.
#[derive(Clone)]
#[non_exhaustive]
pub struct DatabaseConfig {
    /// Maximum number of open files passed to rocksdb; overridden to -1
    /// (unlimited) when opening a secondary instance.
    pub max_open_files: i32,
    /// Per-column memory budget in MiB; columns absent from the map fall
    /// back to `DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB`.
    pub memory_budget: HashMap<u32, MiB>,
    /// Compaction tuning (file size / block size).
    pub compaction: CompactionProfile,
    /// Number of columns to open; must be non-zero.
    pub columns: u32,
    /// Requested number of rocksdb info log files to retain.
    /// NOTE(review): this field is not read anywhere in this file — confirm
    /// it is wired into the rocksdb options as intended.
    pub keep_log_file_num: i32,
    /// When true, rocksdb statistics collection is enabled (see
    /// `Database::get_statistics`).
    pub enable_statistics: bool,
    /// When `Some`, the database is opened as a read-only secondary instance
    /// and this path holds the secondary's own files.
    pub secondary: Option<PathBuf>,
    /// Optional cap on the total write-ahead-log size, in bytes.
    pub max_total_wal_size: Option<u64>,
    /// Whether to create the database if it does not already exist.
    pub create_if_missing: bool,
}
impl DatabaseConfig {
    /// Create a config with `columns` columns and defaults for everything else.
    ///
    /// # Panics
    /// Panics if `columns` is zero.
    pub fn with_columns(columns: u32) -> Self {
        assert!(columns > 0, "the number of columns must not be zero");
        Self { columns, ..Default::default() }
    }

    /// Total memory budget in bytes, summed over all configured columns.
    pub fn memory_budget(&self) -> MiB {
        // Reuse the per-column accessor so the default-budget fallback lives
        // in exactly one place.
        (0..self.columns).map(|i| self.memory_budget_for_col(i)).sum()
    }

    /// Memory budget in bytes for column `col`, falling back to the default
    /// per-column budget when none is configured.
    fn memory_budget_for_col(&self, col: u32) -> MiB {
        self.memory_budget.get(&col).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB
    }

    /// Build the rocksdb `Options` for one column family.
    fn column_config(&self, block_opts: &BlockBasedOptions, col: u32) -> Options {
        let column_mem_budget = self.memory_budget_for_col(col);
        let mut opts = Options::default();
        opts.set_level_compaction_dynamic_level_bytes(true);
        opts.set_block_based_table_factory(block_opts);
        opts.optimize_level_style_compaction(column_mem_budget);
        opts.set_target_file_size_base(self.compaction.initial_file_size);
        // Empty slice: let rocksdb use its default compression on every level.
        opts.set_compression_per_level(&[]);
        opts
    }
}
impl Default for DatabaseConfig {
    fn default() -> DatabaseConfig {
        DatabaseConfig {
            max_open_files: 512,
            // Empty map: every column gets the default per-column budget.
            memory_budget: HashMap::new(),
            compaction: CompactionProfile::default(),
            // A single column, "col0".
            columns: 1,
            keep_log_file_num: 1,
            enable_statistics: false,
            // Primary (read-write) mode by default.
            secondary: None,
            // No cap on the write-ahead log.
            max_total_wal_size: None,
            create_if_missing: true,
        }
    }
}
/// The rocksdb handle together with its column-family names; index `i` in
/// `column_names` corresponds to column `i` ("col0", "col1", ...).
struct DBAndColumns {
    db: DB,
    column_names: Vec<String>,
}
impl DBAndColumns {
    /// Resolve column index `i` to its rocksdb column-family handle.
    ///
    /// # Errors
    /// Returns an error when `i` is out of range or rocksdb no longer knows
    /// the column-family name.
    fn cf(&self, i: usize) -> io::Result<&ColumnFamily> {
        match self.column_names.get(i) {
            None => Err(invalid_column(i as u32)),
            Some(name) => self
                .db
                .cf_handle(name)
                .ok_or_else(|| other_io_err(format!("invalid column name: {name}"))),
        }
    }
}
/// Key-value database backed by rocksdb.
pub struct Database {
    /// The rocksdb handle plus the ordered column-family names.
    inner: DBAndColumns,
    /// The config used to open this database; reused when adding columns.
    config: DatabaseConfig,
    /// Database-wide options; also the handle used to fetch statistics.
    opts: Options,
    write_opts: WriteOptions,
    read_opts: ReadOptions,
    /// Block-based table options, reused for newly added columns.
    block_opts: BlockBasedOptions,
    /// In-process read/write counters surfaced through `io_stats`.
    stats: stats::RunningDbStats,
}
/// Build the database-wide rocksdb `Options` from `config`.
fn generate_options(config: &DatabaseConfig) -> Options {
    let mut opts = Options::default();
    opts.set_report_bg_io_stats(true);
    if config.enable_statistics {
        opts.enable_statistics();
    }
    opts.set_use_fsync(false);
    opts.create_if_missing(config.create_if_missing);
    if config.secondary.is_some() {
        // A secondary instance must be able to keep every primary file open,
        // so the open-file count is left unlimited.
        opts.set_max_open_files(-1)
    } else {
        opts.set_max_open_files(config.max_open_files);
    }
    opts.set_bytes_per_sync(MB as u64);
    // Honour the configured log retention instead of hard-coding `1`;
    // non-positive values are clamped to keeping a single file.
    opts.set_keep_log_file_num(cmp::max(1, config.keep_log_file_num) as usize);
    opts.increase_parallelism(cmp::max(1, num_cpus::get() as i32 / 2));
    if let Some(m) = config.max_total_wal_size {
        opts.set_max_total_wal_size(m);
    }
    opts
}
/// Read options shared by point lookups and iterators: checksum
/// verification on reads is disabled for speed.
fn generate_read_options() -> ReadOptions {
    let mut opts = ReadOptions::default();
    opts.set_verify_checksums(false);
    opts
}
/// Build the block-based table options shared by all column families.
///
/// One third of the total memory budget is dedicated to a shared LRU block
/// cache; a zero budget disables caching entirely.
fn generate_block_based_options(config: &DatabaseConfig) -> io::Result<BlockBasedOptions> {
    let mut opts = BlockBasedOptions::default();
    opts.set_block_size(config.compaction.block_size);
    opts.set_format_version(5);
    opts.set_block_restart_interval(16);
    match config.memory_budget() / 3 {
        0 => opts.disable_cache(),
        cache_size => {
            let cache = rocksdb::Cache::new_lru_cache(cache_size).map_err(other_io_err)?;
            opts.set_block_cache(&cache);
            // Keep index/filter blocks inside the cache so they count against
            // the budget, and pin the L0 ones so they are never evicted.
            opts.set_cache_index_and_filter_blocks(true);
            opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
        },
    }
    opts.set_bloom_filter(10.0, true);
    Ok(opts)
}
impl Database {
    /// Open the database at `path` using `config`.
    ///
    /// When `config.secondary` is set the database is opened as a read-only
    /// secondary instance; otherwise a primary instance is opened (and
    /// created if missing, per `config.create_if_missing`).
    ///
    /// # Panics
    /// Panics if `config.columns` is zero.
    pub fn open<P: AsRef<Path>>(config: &DatabaseConfig, path: P) -> io::Result<Database> {
        assert!(config.columns > 0, "the number of columns must not be zero");
        let opts = generate_options(config);
        let block_opts = generate_block_based_options(config)?;
        // Column families are named "col0", "col1", ... "colN-1".
        let column_names: Vec<_> = (0..config.columns).map(|c| format!("col{}", c)).collect();
        let write_opts = WriteOptions::default();
        let read_opts = generate_read_options();
        let db = if let Some(secondary_path) = &config.secondary {
            Self::open_secondary(&opts, path.as_ref(), secondary_path.as_ref(), column_names.as_slice())?
        } else {
            let column_names: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect();
            Self::open_primary(&opts, path.as_ref(), config, column_names.as_slice(), &block_opts)?
        };
        Ok(Database {
            inner: DBAndColumns { db, column_names },
            config: config.clone(),
            opts,
            read_opts,
            write_opts,
            block_opts,
            stats: stats::RunningDbStats::new(),
        })
    }
    /// Open a primary (read-write) instance, creating missing column families.
    fn open_primary<P: AsRef<Path>>(
        opts: &Options,
        path: P,
        config: &DatabaseConfig,
        column_names: &[&str],
        block_opts: &BlockBasedOptions,
    ) -> io::Result<rocksdb::DB> {
        let cf_descriptors: Vec<_> = (0..config.columns)
            .map(|i| ColumnFamilyDescriptor::new(column_names[i as usize], config.column_config(&block_opts, i)))
            .collect();
        // First try opening with every descriptor. If that fails (presumably
        // because the on-disk DB does not yet have all the column families),
        // fall back to opening with none and creating each one explicitly.
        let db = match DB::open_cf_descriptors(&opts, path.as_ref(), cf_descriptors) {
            Err(_) => {
                match DB::open_cf(&opts, path.as_ref(), &[] as &[&str]) {
                    Ok(mut db) => {
                        for (i, name) in column_names.iter().enumerate() {
                            let _ = db
                                .create_cf(name, &config.column_config(&block_opts, i as u32))
                                .map_err(other_io_err)?;
                        }
                        Ok(db)
                    },
                    err => err,
                }
            },
            ok => ok,
        };
        Ok(match db {
            Ok(db) => db,
            Err(s) => return Err(other_io_err(s)),
        })
    }
    /// Open a read-only secondary instance that replicates the primary at
    /// `path`, keeping the secondary's own files under `secondary_path`.
    fn open_secondary<P: AsRef<Path>>(
        opts: &Options,
        path: P,
        secondary_path: P,
        column_names: &[String],
    ) -> io::Result<rocksdb::DB> {
        let db = DB::open_cf_as_secondary(&opts, path.as_ref(), secondary_path.as_ref(), column_names);
        Ok(match db {
            Ok(db) => db,
            Err(s) => return Err(other_io_err(s)),
        })
    }
    /// Create a fresh, empty transaction to be committed with `write`.
    pub fn transaction(&self) -> DBTransaction {
        DBTransaction::new()
    }
    /// Commit a transaction atomically through a single rocksdb `WriteBatch`,
    /// tallying write statistics along the way.
    pub fn write(&self, tr: DBTransaction) -> io::Result<()> {
        let cfs = &self.inner;
        let mut batch = WriteBatch::default();
        let ops = tr.ops;
        self.stats.tally_writes(ops.len() as u64);
        self.stats.tally_transactions(1);
        let mut stats_total_bytes = 0;
        for op in ops {
            let col = op.col();
            let cf = cfs.cf(col as usize)?;
            match op {
                DBOp::Insert { col: _, key, value } => {
                    stats_total_bytes += key.len() + value.len();
                    batch.put_cf(cf, &key, &value);
                },
                DBOp::Delete { col: _, key } => {
                    stats_total_bytes += key.len();
                    batch.delete_cf(cf, &key);
                },
                DBOp::DeletePrefix { col, prefix } => {
                    // `end_prefix` is presumably the smallest key strictly
                    // greater than every key starting with `prefix`; `None`
                    // means no such bound exists — TODO confirm against the
                    // `kvdb` crate.
                    let end_prefix = kvdb::end_prefix(&prefix[..]);
                    let no_end = end_prefix.is_none();
                    let end_range = end_prefix.unwrap_or_else(|| vec![u8::max_value(); 16]);
                    batch.delete_range_cf(cf, &prefix[..], &end_range[..]);
                    if no_end {
                        // delete_range needs an upper bound, so keys at or
                        // beyond the synthetic 0xff bound are deleted one by
                        // one via iteration.
                        let prefix = if prefix.len() > end_range.len() { &prefix[..] } else { &end_range[..] };
                        for result in self.iter_with_prefix(col, prefix) {
                            let (key, _) = result?;
                            batch.delete_cf(cf, &key[..]);
                        }
                    }
                },
            };
        }
        self.stats.tally_bytes_written(stats_total_bytes as u64);
        cfs.db.write_opt(batch, &self.write_opts).map_err(other_io_err)
    }
    /// Fetch the value for `key` in column `col`, tallying read statistics
    /// (key bytes always counted; value bytes counted on a hit).
    pub fn get(&self, col: u32, key: &[u8]) -> io::Result<Option<DBValue>> {
        let cfs = &self.inner;
        let cf = cfs.cf(col as usize)?;
        self.stats.tally_reads(1);
        let value = cfs
            .db
            .get_pinned_cf_opt(cf, key, &self.read_opts)
            .map(|r| r.map(|v| v.to_vec()))
            .map_err(other_io_err);
        match value {
            Ok(Some(ref v)) => self.stats.tally_bytes_read((key.len() + v.len()) as u64),
            Ok(None) => self.stats.tally_bytes_read(key.len() as u64),
            _ => {},
        };
        value
    }
    /// Value of the first key/value pair whose key starts with `prefix`.
    pub fn get_by_prefix(&self, col: u32, prefix: &[u8]) -> io::Result<Option<DBValue>> {
        self.iter_with_prefix(col, prefix)
            .next()
            .transpose()
            .map(|m| m.map(|(_k, v)| v))
    }
    /// Iterate over all key/value pairs in column `col`.
    pub fn iter<'a>(&'a self, col: u32) -> impl Iterator<Item = io::Result<DBKeyValue>> + 'a {
        let read_opts = generate_read_options();
        iter::IterationHandler::iter(&self.inner, col, read_opts)
    }
    /// Iterate over key/value pairs whose key starts with `prefix`.
    fn iter_with_prefix<'a>(&'a self, col: u32, prefix: &'a [u8]) -> impl Iterator<Item = io::Result<DBKeyValue>> + 'a {
        let mut read_opts = generate_read_options();
        // Bound the iterator to the prefix range when an upper bound exists.
        if let Some(end_prefix) = kvdb::end_prefix(prefix) {
            read_opts.set_iterate_upper_bound(end_prefix);
        }
        iter::IterationHandler::iter_with_prefix(&self.inner, col, prefix, read_opts)
    }
    /// Number of columns currently open.
    pub fn num_columns(&self) -> u32 {
        self.inner.column_names.len() as u32
    }
    /// rocksdb's *estimated* (not exact) number of keys in column `col`.
    pub fn num_keys(&self, col: u32) -> io::Result<u64> {
        const ESTIMATE_NUM_KEYS: &str = "rocksdb.estimate-num-keys";
        let cfs = &self.inner;
        let cf = cfs.cf(col as usize)?;
        match cfs.db.property_int_value_cf(cf, ESTIMATE_NUM_KEYS) {
            Ok(estimate) => Ok(estimate.unwrap_or_default()),
            Err(err_string) => Err(other_io_err(err_string)),
        }
    }
    /// Drop the highest-numbered column family; no-op when none remain.
    pub fn remove_last_column(&mut self) -> io::Result<()> {
        let DBAndColumns { ref mut db, ref mut column_names } = self.inner;
        if let Some(name) = column_names.pop() {
            db.drop_cf(&name).map_err(other_io_err)?;
        }
        Ok(())
    }
    /// Create a new column family named after the next free index.
    pub fn add_column(&mut self) -> io::Result<()> {
        let DBAndColumns { ref mut db, ref mut column_names } = self.inner;
        let col = column_names.len() as u32;
        let name = format!("col{}", col);
        let col_config = self.config.column_config(&self.block_opts, col as u32);
        let _ = db.create_cf(&name, &col_config).map_err(other_io_err)?;
        column_names.push(name);
        Ok(())
    }
    /// Parsed rocksdb statistics; empty unless the database was opened with
    /// `enable_statistics` set.
    pub fn get_statistics(&self) -> HashMap<String, stats::RocksDbStatsValue> {
        if let Some(stats) = self.opts.get_statistics() {
            stats::parse_rocksdb_stats(&stats)
        } else {
            HashMap::new()
        }
    }
    /// For secondary instances: catch up with the primary's recent writes.
    pub fn try_catch_up_with_primary(&self) -> io::Result<()> {
        self.inner.db.try_catch_up_with_primary().map_err(other_io_err)
    }
}
impl KeyValueDB for Database {
    // The trait methods below delegate to the inherent implementations.
    fn get(&self, col: u32, key: &[u8]) -> io::Result<Option<DBValue>> {
        Database::get(self, col, key)
    }
    fn get_by_prefix(&self, col: u32, prefix: &[u8]) -> io::Result<Option<DBValue>> {
        Database::get_by_prefix(self, col, prefix)
    }
    fn write(&self, transaction: DBTransaction) -> io::Result<()> {
        Database::write(self, transaction)
    }
    fn iter<'a>(&'a self, col: u32) -> Box<dyn Iterator<Item = io::Result<DBKeyValue>> + 'a> {
        let unboxed = Database::iter(self, col);
        Box::new(unboxed.into_iter())
    }
    fn iter_with_prefix<'a>(
        &'a self,
        col: u32,
        prefix: &'a [u8],
    ) -> Box<dyn Iterator<Item = io::Result<DBKeyValue>> + 'a> {
        let unboxed = Database::iter_with_prefix(self, col, prefix);
        Box::new(unboxed.into_iter())
    }
    /// Snapshot the I/O counters, folding rocksdb's own block-cache hit
    /// counter into the running stats before taking the snapshot.
    fn io_stats(&self, kind: kvdb::IoStatsKind) -> kvdb::IoStats {
        let rocksdb_stats = self.get_statistics();
        let cache_hit_count = rocksdb_stats.get("block.cache.hit").map(|s| s.count).unwrap_or(0u64);
        let overall_stats = self.stats.overall();
        let old_cache_hit_count = overall_stats.raw.cache_hit_count;
        // Only the delta since the last call is tallied.
        // NOTE(review): assumes rocksdb's counter is monotonically
        // non-decreasing; a smaller value would underflow here — confirm.
        self.stats.tally_cache_hit_count(cache_hit_count - old_cache_hit_count);
        let taken_stats = match kind {
            kvdb::IoStatsKind::Overall => self.stats.overall(),
            kvdb::IoStatsKind::SincePrevious => self.stats.since_previous(),
        };
        let mut stats = kvdb::IoStats::empty();
        stats.reads = taken_stats.raw.reads;
        stats.writes = taken_stats.raw.writes;
        stats.transactions = taken_stats.raw.transactions;
        stats.bytes_written = taken_stats.raw.bytes_written;
        stats.bytes_read = taken_stats.raw.bytes_read;
        stats.cache_reads = taken_stats.raw.cache_hit_count;
        stats.started = taken_stats.started;
        stats.span = taken_stats.started.elapsed();
        stats
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use kvdb_shared_tests as st;
    use std::io::{self, Read};
    use tempfile::Builder as TempfileBuilder;
    /// Open a fresh temp-dir-backed database with `columns` columns.
    fn create(columns: u32) -> io::Result<Database> {
        let tempdir = TempfileBuilder::new().prefix("").tempdir()?;
        let config = DatabaseConfig::with_columns(columns);
        Database::open(&config, tempdir.path().to_str().expect("tempdir path is valid unicode"))
    }
    #[test]
    fn get_fails_with_non_existing_column() -> io::Result<()> {
        let db = create(1)?;
        st::test_get_fails_with_non_existing_column(&db)
    }
    #[test]
    fn put_and_get() -> io::Result<()> {
        let db = create(1)?;
        st::test_put_and_get(&db)
    }
    #[test]
    fn delete_and_get() -> io::Result<()> {
        let db = create(1)?;
        st::test_delete_and_get(&db)
    }
    #[test]
    fn delete_prefix() -> io::Result<()> {
        let db = create(st::DELETE_PREFIX_NUM_COLUMNS)?;
        st::test_delete_prefix(&db)
    }
    #[test]
    fn iter() -> io::Result<()> {
        let db = create(1)?;
        st::test_iter(&db)
    }
    #[test]
    fn iter_with_prefix() -> io::Result<()> {
        let db = create(1)?;
        st::test_iter_with_prefix(&db)
    }
    #[test]
    fn complex() -> io::Result<()> {
        let db = create(1)?;
        st::test_complex(&db)
    }
    #[test]
    fn stats() -> io::Result<()> {
        let db = create(st::IO_STATS_NUM_COLUMNS)?;
        st::test_io_stats(&db)
    }
    /// A secondary instance opened after a write sees the primary's data.
    #[test]
    fn secondary_db_get() -> io::Result<()> {
        let primary = TempfileBuilder::new().prefix("").tempdir()?;
        let secondary = TempfileBuilder::new().prefix("").tempdir()?;
        let config = DatabaseConfig::with_columns(1);
        let db = Database::open(&config, primary.path()).unwrap();
        let key1 = b"key1";
        let mut transaction = db.transaction();
        transaction.put(0, key1, b"horse");
        db.write(transaction)?;
        let config = DatabaseConfig { secondary: Some(secondary.path().to_owned()), ..DatabaseConfig::with_columns(1) };
        let second_db = Database::open(&config, primary.path()).unwrap();
        assert_eq!(&*second_db.get(0, key1)?.unwrap(), b"horse");
        Ok(())
    }
    /// Writes made after the secondary opened become visible once it
    /// catches up with the primary.
    #[test]
    fn secondary_db_catch_up() -> io::Result<()> {
        let primary = TempfileBuilder::new().prefix("").tempdir()?;
        let secondary = TempfileBuilder::new().prefix("").tempdir()?;
        let config = DatabaseConfig::with_columns(1);
        let db = Database::open(&config, primary.path()).unwrap();
        let config = DatabaseConfig { secondary: Some(secondary.path().to_owned()), ..DatabaseConfig::with_columns(1) };
        let second_db = Database::open(&config, primary.path()).unwrap();
        let mut transaction = db.transaction();
        transaction.put(0, b"key1", b"mule");
        transaction.put(0, b"key2", b"cat");
        db.write(transaction)?;
        second_db.try_catch_up_with_primary()?;
        assert_eq!(&*second_db.get(0, b"key2")?.unwrap(), b"cat");
        Ok(())
    }
    /// The byte vector below is a captured `df` output mentioning /dev/sda1.
    #[test]
    #[cfg(target_os = "linux")]
    fn df_to_rotational() {
        use std::path::PathBuf;
        let example_df = vec![
            70, 105, 108, 101, 115, 121, 115, 116, 101, 109, 32, 32, 32, 32, 32, 49, 75, 45, 98, 108, 111, 99, 107,
            115, 32, 32, 32, 32, 32, 85, 115, 101, 100, 32, 65, 118, 97, 105, 108, 97, 98, 108, 101, 32, 85, 115, 101,
            37, 32, 77, 111, 117, 110, 116, 101, 100, 32, 111, 110, 10, 47, 100, 101, 118, 47, 115, 100, 97, 49, 32,
            32, 32, 32, 32, 32, 32, 54, 49, 52, 48, 57, 51, 48, 48, 32, 51, 56, 56, 50, 50, 50, 51, 54, 32, 32, 49, 57,
            52, 52, 52, 54, 49, 54, 32, 32, 54, 55, 37, 32, 47, 10,
        ];
        let expected_output = Some(PathBuf::from("/sys/block/sda/queue/rotational"));
        assert_eq!(rotational_from_df_output(example_df), expected_output);
    }
    #[test]
    #[should_panic]
    fn db_config_with_zero_columns() {
        let _cfg = DatabaseConfig::with_columns(0);
    }
    #[test]
    #[should_panic]
    fn open_db_with_zero_columns() {
        let cfg = DatabaseConfig { columns: 0, ..Default::default() };
        let _db = Database::open(&cfg, "");
    }
    /// Columns added at runtime survive a reopen with a matching config.
    #[test]
    fn add_columns() {
        let config_1 = DatabaseConfig::default();
        let config_5 = DatabaseConfig::with_columns(5);
        let tempdir = TempfileBuilder::new().prefix("").tempdir().unwrap();
        {
            let mut db = Database::open(&config_1, tempdir.path().to_str().unwrap()).unwrap();
            assert_eq!(db.num_columns(), 1);
            for i in 2..=5 {
                db.add_column().unwrap();
                assert_eq!(db.num_columns(), i);
            }
        }
        {
            let db = Database::open(&config_5, tempdir.path().to_str().unwrap()).unwrap();
            assert_eq!(db.num_columns(), 5);
        }
    }
    /// Columns dropped at runtime stay dropped after a reopen.
    #[test]
    fn remove_columns() {
        let config_1 = DatabaseConfig::default();
        let config_5 = DatabaseConfig::with_columns(5);
        let tempdir = TempfileBuilder::new().prefix("drop_columns").tempdir().unwrap();
        {
            let mut db = Database::open(&config_5, tempdir.path()).expect("open with 5 columns");
            assert_eq!(db.num_columns(), 5);
            for i in (1..5).rev() {
                db.remove_last_column().unwrap();
                assert_eq!(db.num_columns(), i);
            }
        }
        {
            let db = Database::open(&config_1, tempdir.path().to_str().unwrap()).unwrap();
            assert_eq!(db.num_columns(), 1);
        }
    }
    #[test]
    fn test_num_keys() {
        let tempdir = TempfileBuilder::new().prefix("").tempdir().unwrap();
        let config = DatabaseConfig::with_columns(1);
        let db = Database::open(&config, tempdir.path()).unwrap();
        assert_eq!(db.num_keys(0).unwrap(), 0, "database is empty after creation");
        let key1 = b"beef";
        let mut batch = db.transaction();
        batch.put(0, key1, key1);
        db.write(batch).unwrap();
        assert_eq!(db.num_keys(0).unwrap(), 1, "adding a key increases the count");
    }
    #[test]
    fn default_memory_budget() {
        let c = DatabaseConfig::default();
        assert_eq!(c.columns, 1);
        assert_eq!(c.memory_budget(), DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB, "total memory budget is default");
        assert_eq!(
            c.memory_budget_for_col(0),
            DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB,
            "total memory budget for column 0 is the default"
        );
        assert_eq!(
            c.memory_budget_for_col(999),
            DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB,
            "total memory budget for any column is the default"
        );
    }
    #[test]
    fn memory_budget() {
        let mut c = DatabaseConfig::with_columns(3);
        c.memory_budget = [(0, 10), (1, 15), (2, 20)].iter().cloned().collect();
        assert_eq!(c.memory_budget(), 45 * MB, "total budget is the sum of the column budget");
    }
    #[test]
    fn test_stats_parser() {
        let raw = r#"rocksdb.row.cache.hit COUNT : 1
rocksdb.db.get.micros P50 : 2.000000 P95 : 3.000000 P99 : 4.000000 P100 : 5.000000 COUNT : 0 SUM : 15
"#;
        let stats = stats::parse_rocksdb_stats(raw);
        assert_eq!(stats["row.cache.hit"].count, 1);
        assert!(stats["row.cache.hit"].times.is_none());
        assert_eq!(stats["db.get.micros"].count, 0);
        let get_times = stats["db.get.micros"].times.unwrap();
        assert_eq!(get_times.sum, 15);
        assert_eq!(get_times.p50, 2.0);
        assert_eq!(get_times.p95, 3.0);
        assert_eq!(get_times.p99, 4.0);
        assert_eq!(get_times.p100, 5.0);
    }
    /// Smoke-test that the configured knobs actually reach rocksdb by
    /// inspecting the LOG file it writes on startup.
    #[test]
    fn rocksdb_settings() {
        const NUM_COLS: usize = 2;
        let mut cfg = DatabaseConfig { enable_statistics: true, ..DatabaseConfig::with_columns(NUM_COLS as u32) };
        cfg.max_open_files = 123; cfg.compaction.block_size = 323232;
        cfg.compaction.initial_file_size = 102030;
        cfg.memory_budget = [(0, 30), (1, 300)].iter().cloned().collect();
        let db_path = TempfileBuilder::new()
            .prefix("config_test")
            .tempdir()
            .expect("the OS can create tmp dirs");
        let db = Database::open(&cfg, db_path.path()).expect("can open a db");
        let mut rocksdb_log = std::fs::File::open(format!("{}/LOG", db_path.path().to_str().unwrap()))
            .expect("rocksdb creates a LOG file");
        let mut settings = String::new();
        let statistics = db.get_statistics();
        assert!(statistics.contains_key("block.cache.hit"));
        rocksdb_log.read_to_string(&mut settings).unwrap();
        assert!(settings.contains("Options for column family [default]"), "no default col");
        assert!(settings.contains("Options for column family [col0]"), "no col0");
        assert!(settings.contains("Options for column family [col1]"), "no col1");
        assert!(settings.contains("max_open_files: 123"));
        assert!(settings.contains(" block_size: 323232"));
        assert!(settings.contains("block_cache_options:\n capacity : 8388608"));
        // The shared LRU cache is a third of the total budget (330 MiB here).
        let lru_size = (330 * MB) / 3;
        let needle = format!("block_cache_options:\n capacity : {}", lru_size);
        let lru = settings.match_indices(&needle).collect::<Vec<_>>().len();
        assert_eq!(lru, NUM_COLS);
        let include_indexes = settings.matches("cache_index_and_filter_blocks: 1").collect::<Vec<_>>().len();
        assert_eq!(include_indexes, NUM_COLS);
        let pins = settings
            .matches("pin_l0_filter_and_index_blocks_in_cache: 1")
            .collect::<Vec<_>>()
            .len();
        assert_eq!(pins, NUM_COLS);
        let l0_sizes = settings.matches("target_file_size_base: 102030").collect::<Vec<_>>().len();
        assert_eq!(l0_sizes, NUM_COLS);
        // The [default] CF keeps rocksdb's stock 64 MiB target file size.
        assert!(settings.contains("target_file_size_base: 67108864"));
        let snappy_compression = settings.matches("Options.compression: Snappy").collect::<Vec<_>>().len();
        assert_eq!(snappy_compression, NUM_COLS + 1);
        let snappy_bottommost = settings
            .matches("Options.bottommost_compression: Disabled")
            .collect::<Vec<_>>()
            .len();
        assert_eq!(snappy_bottommost, NUM_COLS + 1);
        let levels = settings.matches("Options.num_levels: 7").collect::<Vec<_>>().len();
        assert_eq!(levels, NUM_COLS + 1);
        assert!(settings.contains("Options.use_fsync: 0"));
        assert!(settings.contains("format_version: 5"));
    }
}