From 6cc59600e256c9bba567b0fc94b02013563de398 Mon Sep 17 00:00:00 2001
From: Pascal Kuthe
Date: Wed, 2 Aug 2023 18:45:55 +0200
Subject: [PATCH] switch to lock-free item list

---
 src/boxcar.rs  | 595 +++++++++++++++++++++++++++++++++++++++++++++++++
 src/items.rs   | 139 ------------
 src/lib.rs     | 221 ++++++++++--------
 src/pattern.rs |   4 +-
 src/worker.rs  | 194 ++++++++++------
 5 files changed, 860 insertions(+), 293 deletions(-)
 create mode 100644 src/boxcar.rs

diff --git a/src/boxcar.rs b/src/boxcar.rs
new file mode 100644
index 0000000..95e210f
--- /dev/null
+++ b/src/boxcar.rs
@@ -0,0 +1,595 @@
+//! Adapted from the `boxcar` crate at https://github.com/ibraheemdev/boxcar/blob/master/src/raw.rs
+//! under the MIT license:
+//!
+//! Copyright (c) 2022 Ibraheem Ahmed
+//!
+//! Permission is hereby granted, free of charge, to any person obtaining a copy
+//! of this software and associated documentation files (the "Software"), to deal
+//! in the Software without restriction, including without limitation the rights
+//! to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+//! copies of the Software, and to permit persons to whom the Software is
+//! furnished to do so, subject to the following conditions:
+//!
+//! The above copyright notice and this permission notice shall be included in all
+//! copies or substantial portions of the Software.
+//!
+//! THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+//! IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+//! FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+//! AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+//! LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+//! OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+//! SOFTWARE.
+
+use std::alloc::Layout;
+use std::cell::UnsafeCell;
+use std::mem::MaybeUninit;
+use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU32, AtomicU64, Ordering};
+use std::{ptr, slice};
+
+use crate::{Item, Utf32String};
+
+const BUCKETS: u32 = u32::BITS - SKIP_BUCKET;
+const MAX_ENTRIES: u32 = u32::MAX - SKIP;
+
+/// A lock-free, append-only vector.
+pub(crate) struct Vec<T> {
+    /// a counter used to retrieve a unique index to push to.
+    ///
+    /// this value may be more than the true length as it will
+    /// be incremented before values are actually stored.
+    inflight: AtomicU64,
+    /// buckets of length 32, 64 .. 2^31
+    buckets: [Bucket<T>; BUCKETS as usize],
+    /// the number of initialized elements in this vector
+    count: AtomicU32,
+    /// the number of matcher columns in this vector; it is absolutely critical
+    /// that this remains constant after initialization (safety invariant) since
+    /// it is used to calculate the `Entry` layout
+    columns: u32,
+}
+
+impl<T> Vec<T> {
+    /// Constructs a new, empty `Vec` with the specified capacity and matcher columns.
+    pub fn with_capacity(capacity: u32, columns: u32) -> Vec<T> {
+        assert_ne!(columns, 0, "there must be at least one matcher column");
+        let init = match capacity {
+            0 => 0,
+            // initialize enough buckets for `capacity` elements
+            n => Location::of(n).bucket,
+        };
+
+        let mut buckets = [ptr::null_mut(); BUCKETS as usize];
+
+        for (i, bucket) in buckets[..=init as usize].iter_mut().enumerate() {
+            let len = Location::bucket_len(i as u32);
+            *bucket = unsafe { Bucket::alloc(len, columns) };
+        }
+
+        Vec {
+            buckets: buckets.map(Bucket::new),
+            inflight: AtomicU64::new(0),
+            count: AtomicU32::new(0),
+            columns,
+        }
+    }
+    pub fn columns(&self) -> u32 {
+        self.columns
+    }
+
+    /// Returns the number of elements in the vector.
+    pub fn count(&self) -> u32 {
+        self.count.load(Ordering::Acquire)
+    }
+
+    // Returns a reference to the element at the given index.
+    //
+    // # Safety
+    //
+    // Entry at `index` must be initialized.
+    pub unsafe fn get_unchecked(&self, index: u32) -> Item<'_, T> {
+        let location = Location::of(index);
+
+        // safety: caller guarantees the entry is initialized
+        unsafe {
+            let entries = self
+                .buckets
+                .get_unchecked(location.bucket as usize)
+                .entries
+                .load(Ordering::Acquire);
+            debug_assert!(!entries.is_null());
+            let entry = Bucket::<T>::get(entries, location.entry, self.columns);
+            debug_assert!((*entry).active.load(Ordering::Acquire));
+            Entry::read(entry, self.columns)
+        }
+    }
+
+    /// Returns a reference to the element at the given index.
+    pub fn get(&self, index: u32) -> Option<Item<'_, T>> {
+        let location = Location::of(index);
+
+        unsafe {
+            // safety: `location.bucket` is always in bounds
+            let entries = self
+                .buckets
+                .get_unchecked(location.bucket as usize)
+                .entries
+                .load(Ordering::Acquire);
+
+            // bucket is uninitialized
+            if entries.is_null() {
+                return None;
+            }
+
+            // safety: `location.entry` is always in bounds for its bucket
+            let entry = Bucket::<T>::get(entries, location.entry, self.columns);
+
+            // safety: the entry is active
+            (*entry)
+                .active
+                .load(Ordering::Acquire)
+                .then(|| Entry::read(entry, self.columns))
+        }
+    }
+
+    /// Appends an element to the back of the vector.
+    pub fn push(&self, value: T, fill_columns: impl FnOnce(&mut [Utf32String])) -> u32 {
+        let index = self.inflight.fetch_add(1, Ordering::Release);
+        // the inflight counter is a `u64` to catch overflows of the vector's capacity
+        let index: u32 = index.try_into().expect("overflowed maximum capacity");
+        let location = Location::of(index);
+
+        // eagerly allocate the next bucket if we are close to the end of this one
+        if index == (location.bucket_len - (location.bucket_len >> 3)) {
+            if let Some(next_bucket) = self.buckets.get(location.bucket as usize + 1) {
+                Vec::get_or_alloc(next_bucket, location.bucket_len << 1, self.columns);
+            }
+        }
+
+        // safety: `location.bucket` is always in bounds
+        let bucket = unsafe { self.buckets.get_unchecked(location.bucket as usize) };
+        let mut entries = bucket.entries.load(Ordering::Acquire);
+
+        // the bucket has not been allocated yet
+        if entries.is_null() {
+            entries = Vec::get_or_alloc(bucket, location.bucket_len, self.columns);
+        }
+
+        unsafe {
+            // safety: `location.entry` is always in bounds for its bucket
+            let entry = Bucket::get(entries, location.entry, self.columns);
+
+            // safety: we have unique access to this entry.
+            //
+            // 1. it is impossible for another thread to attempt a `push`
+            // to this location as we retrieved it from `inflight.fetch_add`
+            //
+            // 2.
any thread trying to `get` this entry will see `active == false`, + // and will not try to access it + (*entry).slot.get().write(MaybeUninit::new(value)); + for col in Entry::matcher_cols_raw(entry, self.columns) { + col.get().write(MaybeUninit::new(Utf32String::default())) + } + fill_columns(Entry::matcher_cols_mut(entry, self.columns)); + // let other threads know that this entry is active + (*entry).active.store(true, Ordering::Release); + } + + // increase the true count + self.count.fetch_add(1, Ordering::Release); + index + } + + /// race to initialize a bucket + fn get_or_alloc(bucket: &Bucket, len: u32, cols: u32) -> *mut Entry { + let entries = unsafe { Bucket::alloc(len, cols) }; + match bucket.entries.compare_exchange( + ptr::null_mut(), + entries, + Ordering::Release, + Ordering::Acquire, + ) { + Ok(_) => entries, + Err(found) => unsafe { + Bucket::dealloc(entries, len, cols); + found + }, + } + } + + /// Returns an iterator over the vector starting at `start` + /// the iterator is deterministically sized and will not grow + /// as more elements are pushed + pub unsafe fn snapshot(&self, start: u32) -> Iter<'_, T> { + let end = self + .inflight + .load(Ordering::Acquire) + .min(MAX_ENTRIES as u64) as u32; + assert!(start <= end, "index {start} is out of bounds!"); + Iter { + location: Location::of(start), + vec: self, + idx: start, + end, + } + } + + /// Returns an iterator over the vector starting at `start` + /// the iterator is deterministically sized and will not grow + /// as more elements are pushed + pub unsafe fn par_snapshot(&self, start: u32) -> ParIter<'_, T> { + let end = self + .inflight + .load(Ordering::Acquire) + .min(MAX_ENTRIES as u64) as u32; + assert!(start <= end, "index {start} is out of bounds!"); + + ParIter { + start, + end, + vec: self, + } + } +} + +impl Drop for Vec { + fn drop(&mut self) { + for (i, bucket) in self.buckets.iter_mut().enumerate() { + let entries = *bucket.entries.get_mut(); + + if entries.is_null() { + break; + } + + let len = Location::bucket_len(i as u32); + // safety: in drop + unsafe { Bucket::dealloc(entries, len, self.columns) } + } + } +} +type SnapshotItem<'v, T> = (u32, Option>); + +pub struct Iter<'v, T> { + location: Location, + idx: u32, + end: u32, + vec: &'v Vec, +} +impl Iter<'_, T> { + pub fn end(&self) -> u32 { + self.end + } +} + +impl<'v, T> Iterator for Iter<'v, T> { + type Item = SnapshotItem<'v, T>; + fn size_hint(&self) -> (usize, Option) { + ( + (self.end - self.idx) as usize, + Some((self.end - self.idx) as usize), + ) + } + + fn next(&mut self) -> Option> { + if self.end == self.idx { + return None; + } + debug_assert!(self.idx < self.end, "huh {} {}", self.idx, self.end); + debug_assert!(self.end as u64 <= self.vec.inflight.load(Ordering::Acquire)); + + loop { + let entries = unsafe { + self.vec + .buckets + .get_unchecked(self.location.bucket as usize) + .entries + .load(Ordering::Acquire) + }; + debug_assert!(self.location.bucket < BUCKETS); + + if self.location.entry < self.location.bucket_len { + if entries.is_null() { + // we still want to yield these + let index = self.idx; + self.location.entry += 1; + self.idx += 1; + return Some((index, None)); + } + // safety: bounds and null checked above + let entry = unsafe { Bucket::get(entries, self.location.entry, self.vec.columns) }; + let index = self.idx; + self.location.entry += 1; + self.idx += 1; + + let entry = unsafe { + (*entry) + .active + .load(Ordering::Acquire) + .then(|| Entry::read(entry, self.vec.columns)) + }; + return Some((index, entry)); 
+ } + + self.location.entry = 0; + self.location.bucket += 1; + + if self.location.bucket < BUCKETS { + self.location.bucket_len = Location::bucket_len(self.location.bucket); + } + } + } +} +impl ExactSizeIterator for Iter<'_, T> {} +impl DoubleEndedIterator for Iter<'_, T> { + fn next_back(&mut self) -> Option { + unimplemented!() + } +} + +pub struct ParIter<'v, T> { + end: u32, + start: u32, + vec: &'v Vec, +} +impl<'v, T> ParIter<'v, T> { + pub fn end(&self) -> u32 { + self.end + } +} + +impl<'v, T: Send + Sync> rayon::iter::ParallelIterator for ParIter<'v, T> { + type Item = SnapshotItem<'v, T>; + + fn drive_unindexed(self, consumer: C) -> C::Result + where + C: rayon::iter::plumbing::UnindexedConsumer, + { + rayon::iter::plumbing::bridge(self, consumer) + } + + fn opt_len(&self) -> Option { + Some((self.end - self.start) as usize) + } +} + +impl rayon::iter::IndexedParallelIterator for ParIter<'_, T> { + fn len(&self) -> usize { + (self.end - self.start) as usize + } + + fn drive>(self, consumer: C) -> C::Result { + rayon::iter::plumbing::bridge(self, consumer) + } + + fn with_producer(self, callback: CB) -> CB::Output + where + CB: rayon::iter::plumbing::ProducerCallback, + { + callback.callback(ParIterProducer { + start: self.start, + end: self.end, + vec: self.vec, + }) + } +} + +struct ParIterProducer<'v, T: Send> { + start: u32, + end: u32, + vec: &'v Vec, +} + +impl<'v, T: 'v + Send + Sync> rayon::iter::plumbing::Producer for ParIterProducer<'v, T> { + type Item = SnapshotItem<'v, T>; + type IntoIter = Iter<'v, T>; + + fn into_iter(self) -> Self::IntoIter { + debug_assert!(self.start <= self.end); + Iter { + location: Location::of(self.start), + idx: self.start, + end: self.end, + vec: self.vec, + } + } + + fn split_at(self, index: usize) -> (Self, Self) { + assert!(index <= (self.end - self.start) as usize); + let index = index as u32; + ( + ParIterProducer { + start: self.start, + end: self.start + index, + vec: self.vec, + }, + ParIterProducer { + start: self.start + index, + end: self.end, + vec: self.vec, + }, + ) + } +} + +struct Bucket { + entries: AtomicPtr>, +} + +impl Bucket { + fn layout(len: u32, layout: Layout) -> Layout { + Layout::from_size_align(layout.size() * len as usize, layout.align()) + .expect("exceeded maximum allocation size") + } + + unsafe fn alloc(len: u32, cols: u32) -> *mut Entry { + let layout = Entry::::layout(cols); + let arr_layout = Self::layout(len, layout); + let entries = std::alloc::alloc(arr_layout); + if entries.is_null() { + std::alloc::handle_alloc_error(arr_layout) + } + + for i in 0..len { + let active = entries.add(i as usize * layout.size()) as *mut AtomicBool; + active.write(AtomicBool::new(false)) + } + entries as *mut Entry + } + + unsafe fn dealloc(entries: *mut Entry, len: u32, cols: u32) { + let layout = Entry::::layout(cols); + let arr_layout = Self::layout(len, layout); + for i in 0..len { + let entry = Bucket::get(entries, i, cols); + if *(*entry).active.get_mut() { + ptr::drop_in_place((*(*entry).slot.get()).as_mut_ptr()); + for matcher_col in Entry::matcher_cols_raw(entry, cols) { + ptr::drop_in_place((*matcher_col.get()).as_mut_ptr()); + } + } + } + std::alloc::dealloc(entries as *mut u8, arr_layout) + } + + unsafe fn get(entries: *mut Entry, idx: u32, cols: u32) -> *mut Entry { + let layout = Entry::::layout(cols); + let ptr = entries as *mut u8; + ptr.add(layout.size() * idx as usize) as *mut Entry + } + + fn new(entries: *mut Entry) -> Bucket { + Bucket { + entries: AtomicPtr::new(entries), + } + } +} + +#[repr(C)] 
+struct Entry<T> {
+    active: AtomicBool,
+    slot: UnsafeCell<MaybeUninit<T>>,
+    tail: [UnsafeCell<MaybeUninit<Utf32String>>; 0],
+}
+
+impl<T> Entry<T> {
+    fn layout(cols: u32) -> Layout {
+        let head = Layout::new::<Entry<T>>();
+        let tail = Layout::array::<Utf32String>(cols as usize).expect("invalid memory layout");
+        head.extend(tail)
+            .expect("invalid memory layout")
+            .0
+            .pad_to_align()
+    }
+
+    unsafe fn matcher_cols_raw<'a>(
+        ptr: *mut Entry<T>,
+        cols: u32,
+    ) -> &'a [UnsafeCell<MaybeUninit<Utf32String>>] {
+        // this whole thing looks weird. The reason we do this is that
+        // we must make sure the pointer retains its provenance which may (or may not?)
+        // be lost if we used tail.as_ptr()
+        let tail = std::ptr::addr_of!((*ptr).tail) as *const u8;
+        let offset = tail.offset_from(ptr as *mut u8) as usize;
+        let ptr = (ptr as *mut u8).add(offset) as *mut _;
+        slice::from_raw_parts(ptr, cols as usize)
+    }
+
+    unsafe fn matcher_cols_mut<'a>(ptr: *mut Entry<T>, cols: u32) -> &'a mut [Utf32String] {
+        // this whole thing looks weird. The reason we do this is that
+        // we must make sure the pointer retains its provenance which may (or may not?)
+        // be lost if we used tail.as_ptr()
+        let tail = std::ptr::addr_of!((*ptr).tail) as *const u8;
+        let offset = tail.offset_from(ptr as *mut u8) as usize;
+        let ptr = (ptr as *mut u8).add(offset) as *mut _;
+        slice::from_raw_parts_mut(ptr, cols as usize)
+    }
+    // # Safety
+    //
+    // Value must be initialized.
+    unsafe fn read<'a>(ptr: *mut Entry<T>, cols: u32) -> Item<'a, T> {
+        // this whole thing looks weird. The reason we do this is that
+        // we must make sure the pointer retains its provenance which may (or may not?)
+        // be lost if we used tail.as_ptr()
+        let data = (*(*ptr).slot.get()).assume_init_ref();
+        let tail = std::ptr::addr_of!((*ptr).tail) as *const u8;
+        let offset = tail.offset_from(ptr as *mut u8) as usize;
+        let ptr = (ptr as *mut u8).add(offset) as *mut _;
+        let matcher_columns = slice::from_raw_parts(ptr, cols as usize);
+        Item {
+            data,
+            matcher_columns,
+        }
+    }
+}
+
+#[derive(Debug)]
+struct Location {
+    // the index of the bucket
+    bucket: u32,
+    // the length of `bucket`
+    bucket_len: u32,
+    // the index of the entry in `bucket`
+    entry: u32,
+}
+
+// skip the shorter buckets to avoid unnecessary allocations.
+// this also reduces the maximum capacity of a vector.
+const SKIP: u32 = 32; +const SKIP_BUCKET: u32 = (u32::BITS - SKIP.leading_zeros()) - 1; + +impl Location { + fn of(index: u32) -> Location { + let skipped = index.checked_add(SKIP).expect("exceeded maximum length"); + let bucket = u32::BITS - skipped.leading_zeros(); + let bucket = bucket - (SKIP_BUCKET + 1); + let bucket_len = Location::bucket_len(bucket); + let entry = skipped ^ bucket_len; + + Location { + bucket, + bucket_len, + entry, + } + } + + fn bucket_len(bucket: u32) -> u32 { + 1 << (bucket + SKIP_BUCKET) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn location() { + assert_eq!(Location::bucket_len(0), 32); + for i in 0..32 { + let loc = Location::of(i); + assert_eq!(loc.bucket_len, 32); + assert_eq!(loc.bucket, 0); + assert_eq!(loc.entry, i); + } + + assert_eq!(Location::bucket_len(1), 64); + for i in 33..96 { + let loc = Location::of(i); + assert_eq!(loc.bucket_len, 64); + assert_eq!(loc.bucket, 1); + assert_eq!(loc.entry, i - 32); + } + + assert_eq!(Location::bucket_len(2), 128); + for i in 96..224 { + let loc = Location::of(i); + assert_eq!(loc.bucket_len, 128); + assert_eq!(loc.bucket, 2); + assert_eq!(loc.entry, i - 96); + } + + let max = Location::of(MAX_ENTRIES); + assert_eq!(max.bucket, BUCKETS - 1); + assert_eq!(max.bucket_len, 1 << 31); + assert_eq!(max.entry, (1 << 31) - 1); + } +} diff --git a/src/items.rs b/src/items.rs index 2f50289..8b13789 100644 --- a/src/items.rs +++ b/src/items.rs @@ -1,140 +1 @@ -use std::mem::swap; -use std::ptr::NonNull; -use crate::Utf32String; - -pub(crate) struct ItemCache { - live: Vec, - evicted: Vec, -} -impl ItemCache { - pub(crate) fn new() -> Self { - Self { - live: Vec::with_capacity(1024), - evicted: Vec::new(), - } - } - - pub(crate) fn clear(&mut self) { - if self.evicted.is_empty() { - self.evicted.reserve(1024); - swap(&mut self.evicted, &mut self.live) - } else { - self.evicted.append(&mut self.live) - } - } - - pub(crate) fn cleared(&self) -> bool { - !self.evicted.is_empty() - } - - pub(crate) fn push(&mut self, item: Box<[Utf32String]>) { - self.live.push(Item { - cols: Box::leak(item).into(), - }) - } - - pub(crate) fn get(&mut self) -> &mut [Item] { - &mut self.live - } -} - -#[derive(PartialEq, Eq, Clone)] -pub struct Item { - // TODO: small vec optimization?? 
- cols: NonNull<[Utf32String]>, -} - -impl std::fmt::Debug for Item { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ItemText") - .field("cols", &self.cols()) - .finish() - } -} - -unsafe impl Send for Item {} -unsafe impl Sync for Item {} - -impl Item { - pub fn cols(&self) -> &[Utf32String] { - // safety: cols is basically a box and treated the same as a box, - // however there can be other references so using a box (unique ptr) - // would be an alias violation - unsafe { self.cols.as_ref() } - } -} -impl Drop for Item { - fn drop(&mut self) { - // safety: cols is basically a box and treated the same as a box, - // however there can be other references (that won't be accessed - // anymore at this point) so using a box (unique ptr) would be an alias - // violation - unsafe { drop(Box::from_raw(self.cols.as_ptr())) } - } -} - -#[derive(Debug, Clone, Copy)] -pub(crate) struct ItemSnapshot { - cols: NonNull<[Utf32String]>, - pub(crate) len: u32, -} - -unsafe impl Send for ItemSnapshot {} -unsafe impl Sync for ItemSnapshot {} - -#[derive(Debug, Clone)] -pub(crate) struct ItemsSnapshot { - items: Vec, -} - -impl ItemsSnapshot { - pub(crate) fn new(items: &ItemCache) -> Self { - Self { - items: items - .live - .iter() - .map(|item| ItemSnapshot { - cols: item.cols, - len: item.cols().iter().map(|s| s.len() as u32).sum(), - }) - .collect(), - } - } - - pub(crate) fn outdated(&self, items: &ItemCache) -> bool { - items.live.len() != self.items.len() - } - - pub(crate) fn len(&self) -> usize { - self.items.len() - } - - pub(crate) fn update(&mut self, items: &ItemCache) -> bool { - let cleared = !items.evicted.is_empty(); - // drop in another thread to ensure we don't wait for a long drop here - if cleared { - self.items.clear(); - }; - let start = self.items.len(); - self.items - .extend(items.live[start..].iter().map(|item| ItemSnapshot { - cols: item.cols, - len: item.cols().iter().map(|s| s.len() as u32).sum(), - })); - cleared - } - - pub(crate) unsafe fn get(&self) -> &[ItemSnapshot] { - &self.items - } -} - -impl ItemSnapshot { - pub(crate) fn cols(&self) -> &[Utf32String] { - // safety: we only hand out ItemSnapshot ranges - // if the caller asserted via the unsafe ItemsSnapshot::get - // function that the pointers are valid - unsafe { self.cols.as_ref() } - } -} diff --git a/src/lib.rs b/src/lib.rs index 50b2569..866a036 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,24 +1,71 @@ use std::cmp::Reverse; -use std::ops::Deref; -use std::sync::atomic::{self, AtomicBool}; +use std::sync::atomic::{self, AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use crate::items::{Item, ItemCache}; -use crate::worker::Worker; -use parking_lot::lock_api::ArcMutexGuard; +use parking_lot::Mutex; use rayon::ThreadPool; pub use crate::pattern::{CaseMatching, MultiPattern, Pattern, PatternKind}; pub use crate::utf32_string::Utf32String; +use crate::worker::Woker; +pub use nucleo_matcher::{chars, Matcher, MatcherConfig, Utf32Str}; -mod items; +mod boxcar; mod pattern; mod utf32_string; mod worker; -pub use nucleo_matcher::{chars, Matcher, MatcherConfig, Utf32Str}; -use parking_lot::{Mutex, MutexGuard, RawMutex}; +pub struct Item<'a, T> { + pub data: &'a T, + pub matcher_columns: &'a [Utf32String], +} + +pub struct Injector { + items: Arc>, + notify: Arc<(dyn Fn() + Sync + Send)>, +} + +impl Clone for Injector { + fn clone(&self) -> Self { + Injector { + items: self.items.clone(), + notify: self.notify.clone(), + } + } +} + +impl Injector { + /// 
Appends an element to the back of the vector. + pub fn push(&self, value: T, fill_columns: impl FnOnce(&mut [Utf32String])) -> u32 { + let idx = self.items.push(value, fill_columns); + (self.notify)(); + idx + } + + /// Returns the total number of items in the current + /// queue + pub fn injected_items(&self) -> u32 { + self.items.count() + } + + /// Returns a reference to the item at the given index. + /// + /// # Safety + /// + /// Item at `index` must be initialized. That means you must have observed + /// `push` returning this value or `get` retunring `Some` for this value. + /// Just because a later index is initialized doesn't mean that this index + /// is initialized + pub unsafe fn get_unchecked(&self, index: u32) -> Item<'_, T> { + self.items.get_unchecked(index) + } + + /// Returns a reference to the element at the given index. + pub fn get(&self, index: u32) -> Option> { + self.items.get(index) + } +} #[derive(PartialEq, Eq, Debug, Clone, Copy)] pub struct Match { @@ -32,139 +79,134 @@ pub struct Status { pub running: bool, } -#[derive(Clone)] -pub struct Items { - cache: Arc>, - items: Arc>>, - notify: Arc<(dyn Fn() + Sync + Send)>, -} - -impl Items { - pub fn clear(&mut self) { - self.items.lock().clear(); - self.cache.lock().clear(); - } - - pub fn append(&mut self, items: impl Iterator)>) { - let mut cache = self.cache.lock(); - let mut items_ = self.items.lock(); - items_.extend(items.map(|(item, text)| { - cache.push(text); - item - })); - // notify that a new tick will be necessary - (self.notify)(); - } - - pub fn get(&self) -> impl Deref + '_ { - MutexGuard::map(self.items.lock(), |items| items.as_mut_slice()) - } - - pub fn get_matcher_items(&self) -> impl Deref + '_ { - MutexGuard::map(self.cache.lock(), |items| items.get()) - } -} - -pub struct Nucleo { +pub struct Nucleo { // the way the API is build we totally don't actually need these to be Arcs // but this lets us avoid some unsafe - worker: Arc>, canceled: Arc, + should_notify: Arc, + worker: Arc>>, pool: ThreadPool, - pub items: Items, + cleared: bool, + item_count: u32, pub matches: Vec, pub pattern: MultiPattern, - should_notify: Arc, + pub notify: Arc<(dyn Fn() + Sync + Send)>, + items: Arc>, } -impl Nucleo { +impl Nucleo { pub fn new( config: MatcherConfig, notify: Arc<(dyn Fn() + Sync + Send)>, num_threads: Option, case_matching: CaseMatching, - cols: usize, - items: impl Iterator)>, + columns: u32, ) -> Self { - let mut cache = ItemCache::new(); - let items: Vec<_> = items - .map(|(item, text)| { - cache.push(text); - item - }) - .collect(); - let matches: Vec<_> = (0..items.len()) - .map(|i| Match { - score: 0, - idx: i as u32, - }) - .collect(); - let (pool, worker) = - Worker::new(notify.clone(), num_threads, config, matches.clone(), &cache); + let (pool, worker) = Woker::new(num_threads, config, notify.clone(), columns); Self { canceled: worker.canceled.clone(), should_notify: worker.should_notify.clone(), - items: Items { - cache: Arc::new(Mutex::new(cache)), - items: Arc::new(Mutex::new(items)), - notify, - }, + items: worker.items.clone(), pool, - matches, - pattern: MultiPattern::new(&config, case_matching, cols), + matches: Vec::with_capacity(2 * 1024), + pattern: MultiPattern::new(&config, case_matching, columns as usize), worker: Arc::new(Mutex::new(worker)), + cleared: false, + item_count: 0, + notify, } } + /// Returns the total number of items + pub fn item_count(&self) -> u32 { + self.item_count + } + + pub fn injector(&self) -> Injector { + Injector { + items: self.items.clone(), + 
notify: self.notify.clone(), + } + } + + /// Returns a reference to the item at the given index. + /// + /// # Safety + /// + /// Item at `index` must be initialized. That means you must have observed + /// `push` returning this value or `get` retunring `Some` for this value. + /// Just because a later index is initialized doesn't mean that this index + /// is initialized + pub unsafe fn get_unchecked(&self, index: u32) -> Item<'_, T> { + self.items.get_unchecked(index) + } + + /// Returns a reference to the element at the given index. + pub fn get(&self, index: u32) -> Option> { + self.items.get(index) + } + + /// Clears all items + pub fn clear(&mut self) { + self.canceled.store(true, Ordering::Relaxed); + self.items = Arc::new(boxcar::Vec::with_capacity(1024, self.items.columns())); + self.cleared = true + } + pub fn update_config(&mut self, config: MatcherConfig) { self.worker.lock().update_config(config) } + pub fn push(&self, value: T, fill_columns: impl FnOnce(&mut [Utf32String])) -> u32 { + let idx = self.items.push(value, fill_columns); + (self.notify)(); + idx + } + pub fn tick(&mut self, timeout: u64) -> Status { self.should_notify.store(false, atomic::Ordering::Relaxed); let status = self.pattern.status(); - let items = self.items.cache.lock_arc(); - let canceled = status != pattern::Status::Unchanged || items.cleared(); - let res = self.tick_inner(timeout, canceled, items, status); + let canceled = status != pattern::Status::Unchanged || self.cleared; + let res = self.tick_inner(timeout, canceled, status); + self.cleared = false; if !canceled { - self.should_notify.store(true, atomic::Ordering::Relaxed); return res; } - let items = self.items.cache.lock_arc(); - let res = self.tick_inner(timeout, false, items, pattern::Status::Unchanged); - self.should_notify.store(true, atomic::Ordering::Relaxed); - res + self.tick_inner(timeout, false, pattern::Status::Unchanged) } - fn tick_inner( - &mut self, - timeout: u64, - canceled: bool, - items: ArcMutexGuard, - status: pattern::Status, - ) -> Status { + fn tick_inner(&mut self, timeout: u64, canceled: bool, status: pattern::Status) -> Status { let mut inner = if canceled { self.pattern.reset_status(); self.canceled.store(true, atomic::Ordering::Relaxed); self.worker.lock_arc() } else { let Some(worker) = self.worker.try_lock_arc_for(Duration::from_millis(timeout)) else { + self.should_notify.store(true, Ordering::Release); return Status{ changed: false, running: true }; }; worker }; let changed = inner.running; + + let running = canceled || self.items.count() > inner.item_count(); if inner.running { inner.running = false; - self.matches.clone_from(&inner.matches); + if !inner.was_canceled { + self.item_count = inner.item_count(); + self.matches.clone_from(&inner.matches); + } } - - let running = canceled || inner.items.outdated(&items); if running { inner.pattern.clone_from(&self.pattern); self.canceled.store(false, atomic::Ordering::Relaxed); - self.pool.spawn(move || unsafe { inner.run(items, status) }) + if !canceled { + self.should_notify.store(true, atomic::Ordering::Release); + } + let cleared = self.cleared; + self.pool + .spawn(move || unsafe { inner.run(status, cleared) }) } Status { changed, running } } @@ -181,6 +223,7 @@ impl Drop for Nucleo { } } } + /// convenicne function to easily fuzzy match /// on a (relatively small list of inputs). 
This is not recommended for building a full tui /// application that can match large numbers of matches as all matching is done on the current diff --git a/src/pattern.rs b/src/pattern.rs index ab2cf17..feb35ab 100644 --- a/src/pattern.rs +++ b/src/pattern.rs @@ -174,10 +174,10 @@ impl MultiPattern { pub fn new( matcher_config: &MatcherConfig, case_matching: CaseMatching, - cols: usize, + columns: usize, ) -> MultiPattern { MultiPattern { - cols: vec![Pattern::new(matcher_config, case_matching); cols], + cols: vec![Pattern::new(matcher_config, case_matching); columns], } } diff --git a/src/worker.rs b/src/worker.rs index d7babdc..99082ae 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -3,13 +3,11 @@ use std::sync::atomic::{self, AtomicBool}; use std::sync::Arc; use nucleo_matcher::MatcherConfig; -use parking_lot::lock_api::ArcMutexGuard; -use parking_lot::RawMutex; +use parking_lot::Mutex; use rayon::{prelude::*, ThreadPool}; -use crate::items::{ItemCache, ItemsSnapshot}; use crate::pattern::{self, MultiPattern}; -use crate::Match; +use crate::{boxcar, Match}; struct Matchers(Box<[UnsafeCell]>); @@ -24,18 +22,24 @@ impl Matchers { unsafe impl Sync for Matchers {} unsafe impl Send for Matchers {} -pub(crate) struct Worker { - notify: Arc<(dyn Fn() + Sync + Send)>, +pub(crate) struct Woker { pub(crate) running: bool, - pub(crate) items: ItemsSnapshot, matchers: Matchers, pub(crate) matches: Vec, pub(crate) pattern: MultiPattern, pub(crate) canceled: Arc, pub(crate) should_notify: Arc, + pub(crate) was_canceled: bool, + pub(crate) last_snapshot: u32, + notify: Arc<(dyn Fn() + Sync + Send)>, + pub(crate) items: Arc>, + in_flight: Vec, } -impl Worker { +impl Woker { + pub(crate) fn item_count(&self) -> u32 { + self.last_snapshot - self.in_flight.len() as u32 + } pub(crate) fn update_config(&mut self, config: MatcherConfig) { for matcher in self.matchers.0.iter_mut() { matcher.get_mut().config = config; @@ -43,12 +47,11 @@ impl Worker { } pub(crate) fn new( - notify: Arc<(dyn Fn() + Sync + Send)>, worker_threads: Option, config: MatcherConfig, - matches: Vec, - items: &ItemCache, - ) -> (ThreadPool, Worker) { + notify: Arc<(dyn Fn() + Sync + Send)>, + cols: u32, + ) -> (ThreadPool, Self) { let worker_threads = worker_threads .unwrap_or_else(|| std::thread::available_parallelism().map_or(4, |it| it.get())); let pool = rayon::ThreadPoolBuilder::new() @@ -59,96 +62,161 @@ impl Worker { let matchers = (0..worker_threads) .map(|_| UnsafeCell::new(nucleo_matcher::Matcher::new(config))) .collect(); - let worker = Worker { - notify, + let worker = Woker { running: false, - items: ItemsSnapshot::new(items), matchers: Matchers(matchers), - matches, + last_snapshot: 0, + matches: Vec::new(), // just a placeholder pattern: MultiPattern::new(&config, crate::CaseMatching::Ignore, 0), canceled: Arc::new(AtomicBool::new(false)), should_notify: Arc::new(AtomicBool::new(false)), + was_canceled: false, + notify, + items: Arc::new(boxcar::Vec::with_capacity(2 * 1024, cols)), + in_flight: Vec::with_capacity(64), }; (pool, worker) } - pub(crate) unsafe fn run( - &mut self, - items_lock: ArcMutexGuard, - query_status: pattern::Status, - ) { - self.running = true; - let mut last_scored_item = self.items.len(); - let cleared = self.items.update(&items_lock); - drop(items_lock); - - // TODO: be smarter around reusing past results for rescoring - if cleared || query_status == pattern::Status::Rescore { - self.matches.clear(); - last_scored_item = 0; - } + unsafe fn process_new_items(&mut self) { let matchers = 
&self.matchers; let pattern = &self.pattern; - let items = unsafe { self.items.get() }; + self.matches.reserve(self.in_flight.len()); + self.in_flight.retain(|&idx| { + let Some(item) = self.items.get(idx) else { + return true; + }; + let Some(score) = pattern.score(item.matcher_columns, matchers.get()) else { + return false; + }; + self.matches.push(Match { score, idx }); + false + }); + let new_snapshot = self.items.par_snapshot(self.last_snapshot); + if new_snapshot.end() != self.last_snapshot { + let end = new_snapshot.end(); + let in_flight = Mutex::new(&mut self.in_flight); + let items = new_snapshot.filter_map(|(idx, item)| { + let Some(item) = item else { + in_flight.lock().push(idx); + return None; + }; + let score = if self.canceled.load(atomic::Ordering::Relaxed) { + 0 + } else { + pattern.score(item.matcher_columns, matchers.get())? + }; + Some(Match { score, idx }) + }); + self.matches.par_extend(items); + self.last_snapshot = end; + } + } - if self.pattern.cols.iter().all(|pat| pat.is_empty()) { + fn remove_in_flight_matches(&mut self) { + let mut off = 0; + self.in_flight.retain(|&i| { + let is_in_flight = self.items.get(i).is_none(); + if is_in_flight { + self.matches.remove((i - off) as usize); + off += 1; + } + is_in_flight + }); + } + + unsafe fn process_new_items_trivial(&mut self) { + let new_snapshot = self.items.snapshot(self.last_snapshot); + if new_snapshot.end() != self.last_snapshot { + let end = new_snapshot.end(); + let items = new_snapshot.filter_map(|(idx, item)| { + if item.is_none() { + self.in_flight.push(idx); + return None; + }; + Some(Match { score: 0, idx }) + }); + self.matches.extend(items); + self.last_snapshot = end; + } + } + + pub(crate) unsafe fn run(&mut self, pattern_status: pattern::Status, cleared: bool) { + self.running = true; + self.was_canceled = false; + + if cleared { + self.last_snapshot = 0; + } + + // TODO: be smarter around reusing past results for rescoring + let empty_pattern = self.pattern.cols.iter().all(|pat| pat.is_empty()); + if empty_pattern { self.matches.clear(); - self.matches.extend((0..items.len()).map(|i| Match { - score: 0, - idx: i as u32, - })); - if self.should_notify.load(atomic::Ordering::Relaxed) { + self.matches + .extend((0..self.last_snapshot).map(|idx| Match { score: 0, idx })); + // there are usually only very few in flight items (one for each writer) + self.remove_in_flight_matches(); + self.process_new_items_trivial(); + if self.should_notify.load(atomic::Ordering::Acquire) { (self.notify)(); } return; } - if query_status != pattern::Status::Unchanged && !self.matches.is_empty() { + + self.process_new_items(); + if pattern_status == pattern::Status::Rescore { + self.matches.clear(); + self.matches + .extend((0..self.last_snapshot).map(|idx| Match { score: 0, idx })); + self.remove_in_flight_matches(); + } + + let matchers = &self.matchers; + let pattern = &self.pattern; + if pattern_status != pattern::Status::Unchanged && !self.matches.is_empty() { self.matches .par_iter_mut() .take_any_while(|_| !self.canceled.load(atomic::Ordering::Relaxed)) .for_each(|match_| { - let item = &items[match_.idx as usize]; + // safety: in-flight items are never added to the matches + let item = self.items.get_unchecked(match_.idx); match_.score = pattern - .score(item.cols(), unsafe { matchers.get() }) + .score(item.matcher_columns, matchers.get()) .unwrap_or(u32::MAX); }); // TODO: do this in parallel? 
             self.matches.retain(|m| m.score != u32::MAX);
+        }
-        if last_scored_item != self.items.len() {
-            let items = items[last_scored_item..]
-                .par_iter()
-                .enumerate()
-                .filter_map(|(i, item)| {
-                    let score = if self.canceled.load(atomic::Ordering::Relaxed) {
-                        u32::MAX - 1
-                    } else {
-                        pattern.score(item.cols(), unsafe { matchers.get() })?
-                    };
-                    Some(Match {
-                        score,
-                        idx: i as u32,
-                    })
-                });
-            self.matches.par_extend(items);
-        }
-        if !self.canceled.load(atomic::Ordering::Relaxed) {
+        if self.canceled.load(atomic::Ordering::Relaxed) {
+            self.was_canceled = true;
+        } else {
             // TODO: cancel sort in progress?
             self.matches.par_sort_unstable_by(|match1, match2| {
                 match2.score.cmp(&match1.score).then_with(|| {
                     // the tie breaker is comparatively rarely needed so we keep it
                     // in a branch especially because we need to access the items
                     // array here which involves some pointer chasing
-                    let item1 = &items[match1.idx as usize];
-                    let item2 = &items[match2.idx as usize];
-                    (item1.len, match1.idx).cmp(&(item2.len, match2.idx))
+                    let item1 = self.items.get_unchecked(match1.idx);
+                    let item2 = self.items.get_unchecked(match2.idx);
+                    let len1: u32 = item1
+                        .matcher_columns
+                        .iter()
+                        .map(|haystack| haystack.len() as u32)
+                        .sum();
+                    let len2 = item2
+                        .matcher_columns
+                        .iter()
+                        .map(|haystack| haystack.len() as u32)
+                        .sum();
+                    (len1, match1.idx).cmp(&(len2, match2.idx))
                })
            });
        }
-        if self.should_notify.load(atomic::Ordering::Relaxed) {
+        if self.should_notify.load(atomic::Ordering::Acquire) {
            (self.notify)();
        }
    }
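
Note on the index math in src/boxcar.rs: `Location::of` maps a flat `u32` index onto exponentially growing buckets (32, 64, 128, ..., 2^31) by adding `SKIP = 32` and looking at the position of the highest set bit, so pushed entries are never moved and no bucket is ever reallocated. A small standalone sketch of that mapping (illustration only, not part of the diff; it just mirrors the constants and math above):

    const SKIP: u32 = 32;
    const SKIP_BUCKET: u32 = (u32::BITS - SKIP.leading_zeros()) - 1; // == 5
    const BUCKETS: u32 = u32::BITS - SKIP_BUCKET; // == 27

    // returns (bucket index, entry index within that bucket) for a flat index
    fn location(index: u32) -> (u32, u32) {
        // shift the index so that bucket 0 covers indices 0..32
        let skipped = index.checked_add(SKIP).expect("exceeded maximum length");
        // the bucket is picked by the position of the highest set bit ...
        let bucket = (u32::BITS - skipped.leading_zeros()) - (SKIP_BUCKET + 1);
        let bucket_len = 1u32 << (bucket + SKIP_BUCKET);
        // ... and clearing that bit gives the offset inside the bucket
        let entry = skipped ^ bucket_len;
        (bucket, entry)
    }

    fn main() {
        assert_eq!(location(0), (0, 0)); // first bucket holds 32 entries
        assert_eq!(location(31), (0, 31));
        assert_eq!(location(32), (1, 0)); // second bucket holds 64 entries
        assert_eq!(location(96), (2, 0)); // third bucket holds 128 entries
        println!("{BUCKETS} buckets cover the full u32 index space");
    }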
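
Note on `Entry::layout`: each entry is a fixed header plus a run-time number of `Utf32String` matcher columns stored in a single allocation; the header layout is extended with an array layout, and the columns live behind the zero-sized `tail` marker. A minimal sketch of the same `Layout::extend` pattern with placeholder types (illustration only, not part of the diff):

    use std::alloc::Layout;

    // a stand-in for `Entry<T>`: fixed header fields plus a zero-sized tail marker
    #[repr(C)]
    struct Header {
        active: bool,
        value: u64,
        tail: [u32; 0],
    }

    // combined layout of the header and `cols` trailing u32 columns, plus the
    // byte offset at which the columns start inside that allocation
    fn layout_with_columns(cols: usize) -> (Layout, usize) {
        let head = Layout::new::<Header>();
        let tail = Layout::array::<u32>(cols).expect("invalid memory layout");
        let (layout, offset) = head.extend(tail).expect("invalid memory layout");
        (layout.pad_to_align(), offset)
    }

    fn main() {
        let (layout, offset) = layout_with_columns(3);
        println!("total size = {}, columns start at byte {}", layout.size(), offset);
        assert!(layout.size() >= offset + 3 * std::mem::size_of::<u32>());
    }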
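
Note on the intended usage of the new API: producers clone an `Injector` and `push` items from any thread without taking a lock, while a single consumer drives matching with `tick` and reads the sorted `matches`. The sketch below is illustrative only and not part of the diff; the notify closure, the item type, `MatcherConfig::DEFAULT`, and the `Utf32String: From<&str>` conversion inside `fill_columns` are assumptions about the surrounding crate rather than something this patch adds:

    use std::sync::Arc;

    use nucleo::{CaseMatching, Nucleo};

    fn main() {
        // the notify callback is invoked whenever new results may be available;
        // a real application would wake its event loop here
        let notify: Arc<dyn Fn() + Sync + Send> = Arc::new(|| {});
        let mut nucleo: Nucleo<String> = Nucleo::new(
            nucleo::MatcherConfig::DEFAULT, // assumed default config from nucleo_matcher
            notify,
            None,                 // worker threads: use the default
            CaseMatching::Ignore,
            1,                    // a single matcher column
        );

        // producers only need an `Injector`; it is cheap to clone and lock-free
        let injector = nucleo.injector();
        std::thread::spawn(move || {
            for entry in ["src/lib.rs", "src/worker.rs", "src/boxcar.rs"] {
                injector.push(entry.to_string(), |columns| {
                    // fill the matcher columns for this item
                    // (assumes `Utf32String: From<&str>`)
                    columns[0] = entry.into();
                });
            }
        });

        // the consumer periodically drives matching and reads the sorted matches
        for _ in 0..100 {
            let status = nucleo.tick(10); // wait at most 10ms for the worker lock
            if status.changed {
                for m in nucleo.matches.iter().take(5) {
                    // indices stored in `matches` always refer to initialized items
                    let item = unsafe { nucleo.get_unchecked(m.idx) };
                    println!("{} (score {})", item.data, m.score);
                }
            }
            if !status.running {
                break;
            }
            std::thread::sleep(std::time::Duration::from_millis(10));
        }
    }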