use crate::{column::ColId, table::SIZE_TIERS};
use std::sync::atomic::{AtomicI64, AtomicU32, AtomicU64, Ordering};
use std::{
io::{Cursor, Read, Write},
iter,
};
/// Number of buckets in the value-size histogram and in the per-bucket compression-delta table.
const HISTOGRAM_BUCKETS: usize = 1024;
/// A value's size is shifted right by this many bits to select its bucket, so every bucket
/// covers 2^5 = 32 consecutive sizes. Sizes of 1024 * 32 = 32768 and above have no bucket and
/// are counted as oversized instead.
const HISTOGRAM_BUCKET_BITS: u8 = 5;
/// Number of bytes occupied by one serialized `ColumnStats`, as written by
/// `ColumnStats::to_slice` and read back by `ColumnStats::from_slice`:
/// 4 bytes per value-histogram bucket, 8 bytes per compression-delta bucket,
/// 8 bytes per query size tier, and 8 bytes for each of the 13 scalar counters.
pub const TOTAL_SIZE: usize =
4 * HISTOGRAM_BUCKETS + 8 * HISTOGRAM_BUCKETS + 8 * SIZE_TIERS + 8 * 13;
/// Statistics for a single column.
///
/// Every field is an atomic (or a vector of atomics), which lets all update methods take
/// `&self`. All accesses in this file use `Ordering::Relaxed`.
#[derive(Debug)]
pub struct ColumnStats {
    /// Net number of stored values per size bucket (`HISTOGRAM_BUCKETS` entries).
    value_histogram: Vec<AtomicU32>,
    /// Number of successful queries per size tier (`SIZE_TIERS` entries).
    query_histogram: Vec<AtomicU64>,
    /// Net number of values whose size is beyond the last histogram bucket.
    oversized: AtomicU64,
    /// Net compressed bytes held by oversized values.
    oversized_bytes: AtomicU64,
    /// Net number of values (insertions minus removals).
    total_values: AtomicU64,
    /// Net compressed bytes of all values.
    total_bytes: AtomicU64,
    /// Number of recorded commits.
    commits: AtomicU64,
    /// Number of insertions recorded through `insert_val`.
    inserted_new: AtomicU64,
    /// Number of overwrites recorded through `replace_val`.
    inserted_overwrite: AtomicU64,
    /// Number of recorded reference increases.
    reference_increase_hit: AtomicU64,
    /// Number of recorded missed reference increases.
    reference_increase_miss: AtomicU64,
    /// Number of removals recorded through `remove_val`.
    removed_hit: AtomicU64,
    /// Number of recorded missed removals.
    removed_miss: AtomicU64,
    /// Number of recorded missed queries.
    queries_miss: AtomicU64,
    /// Net uncompressed bytes of all values.
    uncompressed_bytes: AtomicU64,
    /// Per size bucket, the accumulated `size - compressed` difference
    /// (`HISTOGRAM_BUCKETS` entries).
    compression_delta: Vec<AtomicI64>,
}
/// Collection of per-column statistics summaries.
pub struct StatSummary {
/// One optional summary per entry; `None` means no summary is present for that entry.
// NOTE(review): presumably indexed by column id — confirm against the code that fills this in.
pub columns: Vec<Option<ColumnStatSummary>>,
}
/// Snapshot of the headline counters of one column, as returned by `ColumnStats::summary`.
pub struct ColumnStatSummary {
/// Net number of values (recorded insertions minus recorded removals).
pub total_values: u64,
/// Net compressed size of the values, in bytes.
pub total_bytes: u64,
/// Net size of the values before compression, in bytes.
pub uncompressed_bytes: u64,
}
/// Reads the next four bytes of `cursor` as a little-endian `u32` and wraps it in an atomic.
///
/// # Panics
///
/// Panics with "Incorrect stats buffer" if fewer than four bytes remain.
fn read_u32(cursor: &mut Cursor<&[u8]>) -> AtomicU32 {
    let mut le_bytes = [0u8; std::mem::size_of::<u32>()];
    cursor.read_exact(&mut le_bytes).expect("Incorrect stats buffer");
    let value = u32::from_le_bytes(le_bytes);
    AtomicU32::new(value)
}
/// Reads the next eight bytes of `cursor` as a little-endian `u64` and wraps it in an atomic.
///
/// # Panics
///
/// Panics with "Incorrect stats buffer" if fewer than eight bytes remain.
fn read_u64(cursor: &mut Cursor<&[u8]>) -> AtomicU64 {
    let mut le_bytes = [0u8; std::mem::size_of::<u64>()];
    cursor.read_exact(&mut le_bytes).expect("Incorrect stats buffer");
    let value = u64::from_le_bytes(le_bytes);
    AtomicU64::new(value)
}
/// Reads the next eight bytes of `cursor` as a little-endian `i64` and wraps it in an atomic.
///
/// # Panics
///
/// Panics with "Incorrect stats buffer" if fewer than eight bytes remain.
fn read_i64(cursor: &mut Cursor<&[u8]>) -> AtomicI64 {
    let mut le_bytes = [0u8; std::mem::size_of::<i64>()];
    cursor.read_exact(&mut le_bytes).expect("Incorrect stats buffer");
    let value = i64::from_le_bytes(le_bytes);
    AtomicI64::new(value)
}
/// Appends the current value of `val` to `cursor` as four little-endian bytes.
///
/// # Panics
///
/// Panics with "Incorrect stats buffer" if the remaining buffer cannot hold four bytes.
fn write_u32(cursor: &mut Cursor<&mut [u8]>, val: &AtomicU32) {
    let le_bytes = val.load(Ordering::Relaxed).to_le_bytes();
    cursor.write_all(&le_bytes).expect("Incorrect stats buffer");
}
/// Appends the current value of `val` to `cursor` as eight little-endian bytes.
///
/// # Panics
///
/// Panics with "Incorrect stats buffer" if the remaining buffer cannot hold eight bytes.
fn write_u64(cursor: &mut Cursor<&mut [u8]>, val: &AtomicU64) {
    let le_bytes = val.load(Ordering::Relaxed).to_le_bytes();
    cursor.write_all(&le_bytes).expect("Incorrect stats buffer");
}
/// Appends the current value of `val` to `cursor` as eight little-endian bytes.
///
/// # Panics
///
/// Panics with "Incorrect stats buffer" if the remaining buffer cannot hold eight bytes.
fn write_i64(cursor: &mut Cursor<&mut [u8]>, val: &AtomicI64) {
    let le_bytes = val.load(Ordering::Relaxed).to_le_bytes();
    cursor.write_all(&le_bytes).expect("Incorrect stats buffer");
}
/// Maps a value size to its histogram bucket.
///
/// The bucket is `size` shifted right by `HISTOGRAM_BUCKET_BITS`. Returns `None` when that
/// bucket would lie at or beyond `HISTOGRAM_BUCKETS`, i.e. the value is too large for the
/// histogram.
fn value_histogram_index(size: u32) -> Option<usize> {
    match size as usize >> HISTOGRAM_BUCKET_BITS {
        bucket if bucket < HISTOGRAM_BUCKETS => Some(bucket),
        _ => None,
    }
}
impl ColumnStats {
pub fn from_slice(data: &[u8]) -> ColumnStats {
let mut cursor = Cursor::new(data);
let cursor = &mut cursor;
let value_histogram =
iter::repeat_with(|| read_u32(cursor)).take(HISTOGRAM_BUCKETS).collect();
let query_histogram = iter::repeat_with(|| read_u64(cursor)).take(SIZE_TIERS).collect();
let oversized = read_u64(cursor);
let oversized_bytes = read_u64(cursor);
let total_values = read_u64(cursor);
let total_bytes = read_u64(cursor);
let commits = read_u64(cursor);
let inserted_new = read_u64(cursor);
let inserted_overwrite = read_u64(cursor);
let removed_hit = read_u64(cursor);
let removed_miss = read_u64(cursor);
let queries_miss = read_u64(cursor);
let uncompressed_bytes = read_u64(cursor);
let compression_delta =
iter::repeat_with(|| read_i64(cursor)).take(HISTOGRAM_BUCKETS).collect();
let reference_increase_hit = read_u64(cursor);
let reference_increase_miss = read_u64(cursor);
ColumnStats {
value_histogram,
query_histogram,
oversized,
oversized_bytes,
total_values,
total_bytes,
commits,
inserted_new,
inserted_overwrite,
reference_increase_hit,
reference_increase_miss,
removed_hit,
removed_miss,
queries_miss,
uncompressed_bytes,
compression_delta,
}
}
pub fn empty() -> ColumnStats {
ColumnStats {
value_histogram: iter::repeat_with(Default::default).take(HISTOGRAM_BUCKETS).collect(),
query_histogram: iter::repeat_with(Default::default).take(SIZE_TIERS).collect(),
oversized: Default::default(),
oversized_bytes: Default::default(),
total_values: Default::default(),
total_bytes: Default::default(),
commits: Default::default(),
inserted_new: Default::default(),
inserted_overwrite: Default::default(),
reference_increase_hit: Default::default(),
reference_increase_miss: Default::default(),
removed_hit: Default::default(),
removed_miss: Default::default(),
queries_miss: Default::default(),
uncompressed_bytes: Default::default(),
compression_delta: iter::repeat_with(Default::default)
.take(HISTOGRAM_BUCKETS)
.collect(),
}
}
pub fn clear(&self) {
for v in &self.value_histogram {
v.store(0, Ordering::Relaxed)
}
for v in &self.query_histogram {
v.store(0, Ordering::Relaxed)
}
self.oversized.store(0, Ordering::Relaxed);
self.oversized_bytes.store(0, Ordering::Relaxed);
self.total_values.store(0, Ordering::Relaxed);
self.total_bytes.store(0, Ordering::Relaxed);
self.commits.store(0, Ordering::Relaxed);
self.inserted_new.store(0, Ordering::Relaxed);
self.inserted_overwrite.store(0, Ordering::Relaxed);
self.reference_increase_hit.store(0, Ordering::Relaxed);
self.reference_increase_miss.store(0, Ordering::Relaxed);
self.removed_hit.store(0, Ordering::Relaxed);
self.removed_miss.store(0, Ordering::Relaxed);
self.queries_miss.store(0, Ordering::Relaxed);
self.uncompressed_bytes.store(0, Ordering::Relaxed);
for v in &self.compression_delta {
v.store(0, Ordering::Relaxed)
}
}
pub fn summary(&self) -> ColumnStatSummary {
ColumnStatSummary {
total_values: self.total_values.load(Ordering::Relaxed),
total_bytes: self.total_bytes.load(Ordering::Relaxed),
uncompressed_bytes: self.uncompressed_bytes.load(Ordering::Relaxed),
}
}
pub fn to_slice(&self, data: &mut [u8]) {
let mut cursor = Cursor::new(data);
for item in &self.value_histogram {
write_u32(&mut cursor, item);
}
for item in &self.query_histogram {
write_u64(&mut cursor, item);
}
write_u64(&mut cursor, &self.oversized);
write_u64(&mut cursor, &self.oversized_bytes);
write_u64(&mut cursor, &self.total_values);
write_u64(&mut cursor, &self.total_bytes);
write_u64(&mut cursor, &self.commits);
write_u64(&mut cursor, &self.inserted_new);
write_u64(&mut cursor, &self.inserted_overwrite);
write_u64(&mut cursor, &self.removed_hit);
write_u64(&mut cursor, &self.removed_miss);
write_u64(&mut cursor, &self.queries_miss);
write_u64(&mut cursor, &self.uncompressed_bytes);
for item in &self.compression_delta {
write_i64(&mut cursor, item);
}
write_u64(&mut cursor, &self.reference_increase_hit);
write_u64(&mut cursor, &self.reference_increase_miss);
}
pub fn write_stats_text(&self, writer: &mut impl Write, col: ColId) -> std::io::Result<()> {
writeln!(writer, "Column {}", col)?;
writeln!(writer, "Total values: {}", self.total_values.load(Ordering::Relaxed))?;
writeln!(writer, "Total bytes: {}", self.total_bytes.load(Ordering::Relaxed))?;
writeln!(writer, "Total oversized values: {}", self.oversized.load(Ordering::Relaxed))?;
writeln!(
writer,
"Total oversized bytes: {}",
self.oversized_bytes.load(Ordering::Relaxed)
)?;
writeln!(writer, "Total commits: {}", self.commits.load(Ordering::Relaxed))?;
writeln!(writer, "New value insertions: {}", self.inserted_new.load(Ordering::Relaxed))?;
writeln!(
writer,
"Existing value insertions: {}",
self.inserted_overwrite.load(Ordering::Relaxed)
)?;
writeln!(
writer,
"Reference increases: {}",
self.reference_increase_hit.load(Ordering::Relaxed)
)?;
writeln!(
writer,
"Missed reference increases: {}",
self.reference_increase_miss.load(Ordering::Relaxed)
)?;
writeln!(writer, "Removals: {}", self.removed_hit.load(Ordering::Relaxed))?;
writeln!(writer, "Missed removals: {}", self.removed_miss.load(Ordering::Relaxed))?;
writeln!(
writer,
"Uncompressed bytes: {}",
self.uncompressed_bytes.load(Ordering::Relaxed)
)?;
writeln!(writer, "Compression deltas:")?;
for i in 0..HISTOGRAM_BUCKETS {
let count = self.value_histogram[i].load(Ordering::Relaxed);
let delta = self.compression_delta[i].load(Ordering::Relaxed);
if count != 0 && delta != 0 {
writeln!(
writer,
" {}-{}: {}",
i << HISTOGRAM_BUCKET_BITS,
(((i + 1) << HISTOGRAM_BUCKET_BITS) - 1),
delta
)?;
}
}
write!(writer, "Queries per size tier: [")?;
for i in 0..SIZE_TIERS {
if i == SIZE_TIERS - 1 {
writeln!(writer, "{}]", self.query_histogram[i].load(Ordering::Relaxed))?;
} else {
write!(writer, "{}, ", self.query_histogram[i].load(Ordering::Relaxed))?;
}
}
writeln!(writer, "Missed queries: {}", self.queries_miss.load(Ordering::Relaxed))?;
writeln!(writer, "Value histogram:")?;
for i in 0..HISTOGRAM_BUCKETS {
let count = self.value_histogram[i].load(Ordering::Relaxed);
if count != 0 {
writeln!(
writer,
" {}-{}: {}",
i << HISTOGRAM_BUCKET_BITS,
(((i + 1) << HISTOGRAM_BUCKET_BITS) - 1),
count
)?;
}
}
writeln!(writer)?;
Ok(())
}
pub fn query_hit(&self, size_tier: u8) {
self.query_histogram[size_tier as usize].fetch_add(1, Ordering::Relaxed);
}
pub fn query_miss(&self) {
self.queries_miss.fetch_add(1, Ordering::Relaxed);
}
pub fn insert(&self, size: u32, compressed: u32) {
if let Some(index) = value_histogram_index(size) {
self.value_histogram[index].fetch_add(1, Ordering::Relaxed);
self.compression_delta[index]
.fetch_add(size as i64 - compressed as i64, Ordering::Relaxed);
} else {
self.oversized.fetch_add(1, Ordering::Relaxed);
self.oversized_bytes.fetch_add(compressed as u64, Ordering::Relaxed);
}
self.total_values.fetch_add(1, Ordering::Relaxed);
self.total_bytes.fetch_add(compressed as u64, Ordering::Relaxed);
self.uncompressed_bytes.fetch_add(size as u64, Ordering::Relaxed);
}
pub fn remove(&self, size: u32, compressed: u32) {
if let Some(index) = value_histogram_index(size) {
self.value_histogram[index].fetch_sub(1, Ordering::Relaxed);
self.compression_delta[index]
.fetch_sub(size as i64 - compressed as i64, Ordering::Relaxed);
} else {
self.oversized.fetch_sub(1, Ordering::Relaxed);
self.oversized_bytes.fetch_sub(compressed as u64, Ordering::Relaxed);
}
self.total_values.fetch_sub(1, Ordering::Relaxed);
self.total_bytes.fetch_sub(compressed as u64, Ordering::Relaxed);
self.uncompressed_bytes.fetch_sub(size as u64, Ordering::Relaxed);
}
pub fn insert_val(&self, size: u32, compressed: u32) {
self.inserted_new.fetch_add(1, Ordering::Relaxed);
self.insert(size, compressed);
}
pub fn remove_val(&self, size: u32, compressed: u32) {
self.removed_hit.fetch_add(1, Ordering::Relaxed);
self.remove(size, compressed);
}
pub fn reference_increase(&self) {
self.reference_increase_hit.fetch_add(1, Ordering::Relaxed);
}
pub fn reference_increase_miss(&self) {
self.reference_increase_miss.fetch_add(1, Ordering::Relaxed);
}
pub fn remove_miss(&self) {
self.removed_miss.fetch_add(1, Ordering::Relaxed);
}
pub fn replace_val(&self, old: u32, old_compressed: u32, new: u32, new_compressed: u32) {
self.inserted_overwrite.fetch_add(1, Ordering::Relaxed);
self.remove(old, old_compressed);
self.insert(new, new_compressed);
}
pub fn commit(&self) {
self.commits.fetch_add(1, Ordering::Relaxed);
}
}