mirror of https://github.com/solaeus/nucleo.git
synced 2024-12-22 01:47:49 +00:00

switch to lock-free item list

This commit is contained in:
parent 2cbb46e738
commit 6cc59600e2

src/boxcar.rs (new file, 595 lines)
@@ -0,0 +1,595 @@
//! Adapted from the `boxcar` crate at https://github.com/ibraheemdev/boxcar/blob/master/src/raw.rs
//! under MIT license:
//!
//! Copyright (c) 2022 Ibraheem Ahmed
//!
//! Permission is hereby granted, free of charge, to any person obtaining a copy
//! of this software and associated documentation files (the "Software"), to deal
//! in the Software without restriction, including without limitation the rights
//! to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
//! copies of the Software, and to permit persons to whom the Software is
//! furnished to do so, subject to the following conditions:
//!
//! The above copyright notice and this permission notice shall be included in all
//! copies or substantial portions of the Software.
//!
//! THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
//! IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
//! FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
//! AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
//! LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
//! OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
//! SOFTWARE.

use std::alloc::Layout;
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU32, AtomicU64, Ordering};
use std::{ptr, slice};

use crate::{Item, Utf32String};

const BUCKETS: u32 = u32::BITS - SKIP_BUCKET;
const MAX_ENTRIES: u32 = u32::MAX - SKIP;

/// A lock-free, append-only vector.
pub(crate) struct Vec<T> {
    /// a counter used to retrieve a unique index to push to.
    ///
    /// this value may be more than the true length as it will
    /// be incremented before values are actually stored.
    inflight: AtomicU64,
    /// buckets of length 32, 64 .. 2^31
    buckets: [Bucket<T>; BUCKETS as usize],
    /// the number of initialized elements in this vector
    count: AtomicU32,
    /// the number of matcher columns in this vector; it is absolutely critical
    /// that this remains constant after initialization (safety invariant) since
    /// it is used to calculate the Entry layout
    columns: u32,
}

impl<T> Vec<T> {
    /// Constructs a new, empty `Vec<T>` with the specified capacity and matcher columns.
    pub fn with_capacity(capacity: u32, columns: u32) -> Vec<T> {
        assert_ne!(columns, 0, "there must be at least one matcher column");
        let init = match capacity {
            0 => 0,
            // initialize enough buckets for `capacity` elements
            n => Location::of(n).bucket,
        };

        let mut buckets = [ptr::null_mut(); BUCKETS as usize];

        for (i, bucket) in buckets[..=init as usize].iter_mut().enumerate() {
            let len = Location::bucket_len(i as u32);
            *bucket = unsafe { Bucket::alloc(len, columns) };
        }

        Vec {
            buckets: buckets.map(Bucket::new),
            inflight: AtomicU64::new(0),
            count: AtomicU32::new(0),
            columns,
        }
    }
    pub fn columns(&self) -> u32 {
        self.columns
    }

    /// Returns the number of elements in the vector.
    pub fn count(&self) -> u32 {
        self.count.load(Ordering::Acquire)
    }

    // Returns a reference to the element at the given index.
    //
    // # Safety
    //
    // Entry at `index` must be initialized.
    pub unsafe fn get_unchecked(&self, index: u32) -> Item<'_, T> {
        let location = Location::of(index);

        // safety: caller guarantees the entry is initialized
        unsafe {
            let entries = self
                .buckets
                .get_unchecked(location.bucket as usize)
                .entries
                .load(Ordering::Acquire);
            debug_assert!(!entries.is_null());
            let entry = Bucket::<T>::get(entries, location.entry, self.columns);
            debug_assert!((*entry).active.load(Ordering::Acquire));
            Entry::read(entry, self.columns)
        }
    }

    /// Returns a reference to the element at the given index.
    pub fn get(&self, index: u32) -> Option<Item<'_, T>> {
        let location = Location::of(index);

        unsafe {
            // safety: `location.bucket` is always in bounds
            let entries = self
                .buckets
                .get_unchecked(location.bucket as usize)
                .entries
                .load(Ordering::Acquire);

            // bucket is uninitialized
            if entries.is_null() {
                return None;
            }

            // safety: `location.entry` is always in bounds for its bucket
            let entry = Bucket::<T>::get(entries, location.entry, self.columns);

            // safety: the entry is active
            (*entry)
                .active
                .load(Ordering::Acquire)
                .then(|| Entry::read(entry, self.columns))
        }
    }

    /// Appends an element to the back of the vector.
    pub fn push(&self, value: T, fill_columns: impl FnOnce(&mut [Utf32String])) -> u32 {
        let index = self.inflight.fetch_add(1, Ordering::Release);
        // the inflight counter is a `u64` to catch overflows of the vector's capacity
        let index: u32 = index.try_into().expect("overflowed maximum capacity");
        let location = Location::of(index);

        // eagerly allocate the next bucket if we are close to the end of this one
        if index == (location.bucket_len - (location.bucket_len >> 3)) {
            if let Some(next_bucket) = self.buckets.get(location.bucket as usize + 1) {
                Vec::get_or_alloc(next_bucket, location.bucket_len << 1, self.columns);
            }
        }

        // safety: `location.bucket` is always in bounds
        let bucket = unsafe { self.buckets.get_unchecked(location.bucket as usize) };
        let mut entries = bucket.entries.load(Ordering::Acquire);

        // the bucket has not been allocated yet
        if entries.is_null() {
            entries = Vec::get_or_alloc(bucket, location.bucket_len, self.columns);
        }

        unsafe {
            // safety: `location.entry` is always in bounds for its bucket
            let entry = Bucket::get(entries, location.entry, self.columns);

            // safety: we have unique access to this entry.
            //
            // 1. it is impossible for another thread to attempt a `push`
            // to this location as we retrieved it from `inflight.fetch_add`
            //
            // 2. any thread trying to `get` this entry will see `active == false`,
            // and will not try to access it
            (*entry).slot.get().write(MaybeUninit::new(value));
            for col in Entry::matcher_cols_raw(entry, self.columns) {
                col.get().write(MaybeUninit::new(Utf32String::default()))
            }
            fill_columns(Entry::matcher_cols_mut(entry, self.columns));
            // let other threads know that this entry is active
            (*entry).active.store(true, Ordering::Release);
        }

        // increase the true count
        self.count.fetch_add(1, Ordering::Release);
        index
    }
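
    // Editor's note, not part of the diff: the push path never blocks. The index is
    // claimed with a single `fetch_add` and the entry is published by flipping its
    // `active` flag with Release ordering. A minimal, hypothetical test sketch of that
    // contract (the no-op `fill_columns` closure keeps the default-initialized columns):
    //
    // #[cfg(test)]
    // #[test]
    // fn push_publishes_entry() {
    //     let vec: Vec<&str> = Vec::with_capacity(0, 1);
    //     let idx = vec.push("foo", |_cols| {});
    //     assert_eq!(idx, 0);
    //     assert_eq!(vec.count(), 1);
    //     assert!(vec.get(idx).is_some());
    //     // index 1 was never claimed, so its `active` flag is still false
    //     assert!(vec.get(idx + 1).is_none());
    // }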

    /// race to initialize a bucket
    fn get_or_alloc(bucket: &Bucket<T>, len: u32, cols: u32) -> *mut Entry<T> {
        let entries = unsafe { Bucket::alloc(len, cols) };
        match bucket.entries.compare_exchange(
            ptr::null_mut(),
            entries,
            Ordering::Release,
            Ordering::Acquire,
        ) {
            Ok(_) => entries,
            Err(found) => unsafe {
                Bucket::dealloc(entries, len, cols);
                found
            },
        }
    }
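
    // Editor's note, not part of the diff: `get_or_alloc` is the usual race-to-initialize
    // idiom. Every contender allocates, exactly one `compare_exchange` wins, and the losers
    // free their allocation and adopt the winner's. A simplified, hypothetical sketch of the
    // same pattern with a plain `Box` instead of a hand-laid-out bucket:
    //
    // use std::sync::atomic::{AtomicPtr, Ordering};
    //
    // fn race_to_init(slot: &AtomicPtr<u64>, value: u64) -> *mut u64 {
    //     // every caller allocates optimistically
    //     let fresh = Box::into_raw(Box::new(value));
    //     match slot.compare_exchange(std::ptr::null_mut(), fresh, Ordering::Release, Ordering::Acquire) {
    //         // we won the race: our allocation is now the shared one
    //         Ok(_) => fresh,
    //         // we lost: free our allocation and use the winner's
    //         Err(found) => unsafe {
    //             drop(Box::from_raw(fresh));
    //             found
    //         },
    //     }
    // }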

    /// Returns an iterator over the vector starting at `start`.
    /// The iterator is deterministically sized and will not grow
    /// as more elements are pushed.
    pub unsafe fn snapshot(&self, start: u32) -> Iter<'_, T> {
        let end = self
            .inflight
            .load(Ordering::Acquire)
            .min(MAX_ENTRIES as u64) as u32;
        assert!(start <= end, "index {start} is out of bounds!");
        Iter {
            location: Location::of(start),
            vec: self,
            idx: start,
            end,
        }
    }

    /// Returns a parallel iterator over the vector starting at `start`.
    /// The iterator is deterministically sized and will not grow
    /// as more elements are pushed.
    pub unsafe fn par_snapshot(&self, start: u32) -> ParIter<'_, T> {
        let end = self
            .inflight
            .load(Ordering::Acquire)
            .min(MAX_ENTRIES as u64) as u32;
        assert!(start <= end, "index {start} is out of bounds!");

        ParIter {
            start,
            end,
            vec: self,
        }
    }
}
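
// Editor's note, not part of the diff: a snapshot is pinned to the `inflight` value at
// creation time, so concurrent pushes never change its length; slots that were claimed but
// not yet published simply come back as `None`. A hypothetical consumer sketch (the function
// name and parameter are illustrative only):
//
// fn drain_snapshot(vec: &Vec<String>) {
//     // per `snapshot`'s contract, 0 is always <= the current inflight count
//     let iter = unsafe { vec.snapshot(0) };
//     for (idx, item) in iter {
//         match item {
//             Some(item) => println!("{idx}: {} matcher columns", item.matcher_columns.len()),
//             // claimed by a concurrent `push` but not yet published; retry later
//             None => println!("{idx}: still in flight"),
//         }
//     }
// }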

impl<T> Drop for Vec<T> {
    fn drop(&mut self) {
        for (i, bucket) in self.buckets.iter_mut().enumerate() {
            let entries = *bucket.entries.get_mut();

            if entries.is_null() {
                break;
            }

            let len = Location::bucket_len(i as u32);
            // safety: in drop
            unsafe { Bucket::dealloc(entries, len, self.columns) }
        }
    }
}
type SnapshotItem<'v, T> = (u32, Option<Item<'v, T>>);

pub struct Iter<'v, T> {
    location: Location,
    idx: u32,
    end: u32,
    vec: &'v Vec<T>,
}
impl<T> Iter<'_, T> {
    pub fn end(&self) -> u32 {
        self.end
    }
}

impl<'v, T> Iterator for Iter<'v, T> {
    type Item = SnapshotItem<'v, T>;
    fn size_hint(&self) -> (usize, Option<usize>) {
        (
            (self.end - self.idx) as usize,
            Some((self.end - self.idx) as usize),
        )
    }

    fn next(&mut self) -> Option<SnapshotItem<'v, T>> {
        if self.end == self.idx {
            return None;
        }
        debug_assert!(self.idx < self.end, "huh {} {}", self.idx, self.end);
        debug_assert!(self.end as u64 <= self.vec.inflight.load(Ordering::Acquire));

        loop {
            let entries = unsafe {
                self.vec
                    .buckets
                    .get_unchecked(self.location.bucket as usize)
                    .entries
                    .load(Ordering::Acquire)
            };
            debug_assert!(self.location.bucket < BUCKETS);

            if self.location.entry < self.location.bucket_len {
                if entries.is_null() {
                    // we still want to yield these
                    let index = self.idx;
                    self.location.entry += 1;
                    self.idx += 1;
                    return Some((index, None));
                }
                // safety: bounds and null checked above
                let entry = unsafe { Bucket::get(entries, self.location.entry, self.vec.columns) };
                let index = self.idx;
                self.location.entry += 1;
                self.idx += 1;

                let entry = unsafe {
                    (*entry)
                        .active
                        .load(Ordering::Acquire)
                        .then(|| Entry::read(entry, self.vec.columns))
                };
                return Some((index, entry));
            }

            self.location.entry = 0;
            self.location.bucket += 1;

            if self.location.bucket < BUCKETS {
                self.location.bucket_len = Location::bucket_len(self.location.bucket);
            }
        }
    }
}
impl<T> ExactSizeIterator for Iter<'_, T> {}
impl<T> DoubleEndedIterator for Iter<'_, T> {
    fn next_back(&mut self) -> Option<Self::Item> {
        unimplemented!()
    }
}

pub struct ParIter<'v, T> {
    end: u32,
    start: u32,
    vec: &'v Vec<T>,
}
impl<'v, T> ParIter<'v, T> {
    pub fn end(&self) -> u32 {
        self.end
    }
}

impl<'v, T: Send + Sync> rayon::iter::ParallelIterator for ParIter<'v, T> {
    type Item = SnapshotItem<'v, T>;

    fn drive_unindexed<C>(self, consumer: C) -> C::Result
    where
        C: rayon::iter::plumbing::UnindexedConsumer<Self::Item>,
    {
        rayon::iter::plumbing::bridge(self, consumer)
    }

    fn opt_len(&self) -> Option<usize> {
        Some((self.end - self.start) as usize)
    }
}

impl<T: Send + Sync> rayon::iter::IndexedParallelIterator for ParIter<'_, T> {
    fn len(&self) -> usize {
        (self.end - self.start) as usize
    }

    fn drive<C: rayon::iter::plumbing::Consumer<Self::Item>>(self, consumer: C) -> C::Result {
        rayon::iter::plumbing::bridge(self, consumer)
    }

    fn with_producer<CB>(self, callback: CB) -> CB::Output
    where
        CB: rayon::iter::plumbing::ProducerCallback<Self::Item>,
    {
        callback.callback(ParIterProducer {
            start: self.start,
            end: self.end,
            vec: self.vec,
        })
    }
}

struct ParIterProducer<'v, T: Send> {
    start: u32,
    end: u32,
    vec: &'v Vec<T>,
}

impl<'v, T: 'v + Send + Sync> rayon::iter::plumbing::Producer for ParIterProducer<'v, T> {
    type Item = SnapshotItem<'v, T>;
    type IntoIter = Iter<'v, T>;

    fn into_iter(self) -> Self::IntoIter {
        debug_assert!(self.start <= self.end);
        Iter {
            location: Location::of(self.start),
            idx: self.start,
            end: self.end,
            vec: self.vec,
        }
    }

    fn split_at(self, index: usize) -> (Self, Self) {
        assert!(index <= (self.end - self.start) as usize);
        let index = index as u32;
        (
            ParIterProducer {
                start: self.start,
                end: self.start + index,
                vec: self.vec,
            },
            ParIterProducer {
                start: self.start + index,
                end: self.end,
                vec: self.vec,
            },
        )
    }
}

struct Bucket<T> {
    entries: AtomicPtr<Entry<T>>,
}

impl<T> Bucket<T> {
    fn layout(len: u32, layout: Layout) -> Layout {
        Layout::from_size_align(layout.size() * len as usize, layout.align())
            .expect("exceeded maximum allocation size")
    }

    unsafe fn alloc(len: u32, cols: u32) -> *mut Entry<T> {
        let layout = Entry::<T>::layout(cols);
        let arr_layout = Self::layout(len, layout);
        let entries = std::alloc::alloc(arr_layout);
        if entries.is_null() {
            std::alloc::handle_alloc_error(arr_layout)
        }

        for i in 0..len {
            let active = entries.add(i as usize * layout.size()) as *mut AtomicBool;
            active.write(AtomicBool::new(false))
        }
        entries as *mut Entry<T>
    }

    unsafe fn dealloc(entries: *mut Entry<T>, len: u32, cols: u32) {
        let layout = Entry::<T>::layout(cols);
        let arr_layout = Self::layout(len, layout);
        for i in 0..len {
            let entry = Bucket::get(entries, i, cols);
            if *(*entry).active.get_mut() {
                ptr::drop_in_place((*(*entry).slot.get()).as_mut_ptr());
                for matcher_col in Entry::matcher_cols_raw(entry, cols) {
                    ptr::drop_in_place((*matcher_col.get()).as_mut_ptr());
                }
            }
        }
        std::alloc::dealloc(entries as *mut u8, arr_layout)
    }

    unsafe fn get(entries: *mut Entry<T>, idx: u32, cols: u32) -> *mut Entry<T> {
        let layout = Entry::<T>::layout(cols);
        let ptr = entries as *mut u8;
        ptr.add(layout.size() * idx as usize) as *mut Entry<T>
    }

    fn new(entries: *mut Entry<T>) -> Bucket<T> {
        Bucket {
            entries: AtomicPtr::new(entries),
        }
    }
}

#[repr(C)]
struct Entry<T> {
    active: AtomicBool,
    slot: UnsafeCell<MaybeUninit<T>>,
    tail: [UnsafeCell<MaybeUninit<Utf32String>>; 0],
}

impl<T> Entry<T> {
    fn layout(cols: u32) -> Layout {
        let head = Layout::new::<Self>();
        let tail = Layout::array::<Utf32String>(cols as usize).expect("invalid memory layout");
        head.extend(tail)
            .expect("invalid memory layout")
            .0
            .pad_to_align()
    }

    unsafe fn matcher_cols_raw<'a>(
        ptr: *mut Entry<T>,
        cols: u32,
    ) -> &'a [UnsafeCell<MaybeUninit<Utf32String>>] {
        // this whole thing looks weird. The reason we do this is that
        // we must make sure the pointer retains its provenance which may (or may not?)
        // be lost if we used tail.as_ptr()
        let tail = std::ptr::addr_of!((*ptr).tail) as *const u8;
        let offset = tail.offset_from(ptr as *mut u8) as usize;
        let ptr = (ptr as *mut u8).add(offset) as *mut _;
        slice::from_raw_parts(ptr, cols as usize)
    }

    unsafe fn matcher_cols_mut<'a>(ptr: *mut Entry<T>, cols: u32) -> &'a mut [Utf32String] {
        // this whole thing looks weird. The reason we do this is that
        // we must make sure the pointer retains its provenance which may (or may not?)
        // be lost if we used tail.as_ptr()
        let tail = std::ptr::addr_of!((*ptr).tail) as *const u8;
        let offset = tail.offset_from(ptr as *mut u8) as usize;
        let ptr = (ptr as *mut u8).add(offset) as *mut _;
        slice::from_raw_parts_mut(ptr, cols as usize)
    }
    // # Safety
    //
    // Value must be initialized.
    unsafe fn read<'a>(ptr: *mut Entry<T>, cols: u32) -> Item<'a, T> {
        // this whole thing looks weird. The reason we do this is that
        // we must make sure the pointer retains its provenance which may (or may not?)
        // be lost if we used tail.as_ptr()
        let data = (*(*ptr).slot.get()).assume_init_ref();
        let tail = std::ptr::addr_of!((*ptr).tail) as *const u8;
        let offset = tail.offset_from(ptr as *mut u8) as usize;
        let ptr = (ptr as *mut u8).add(offset) as *mut _;
        let matcher_columns = slice::from_raw_parts(ptr, cols as usize);
        Item {
            data,
            matcher_columns,
        }
    }
}
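
// Editor's note, not part of the diff: `Entry` is a C-layout header followed by a
// dynamically sized tail of `cols` matcher columns, which is why the layout has to be
// computed at runtime with `Layout::extend`. A standalone, hypothetical illustration of
// that computation with stand-in field types (real sizes depend on `T` and `Utf32String`):
//
// use std::alloc::Layout;
//
// fn entry_layout_sketch(cols: usize) -> Layout {
//     #[repr(C)]
//     struct Header {
//         active: bool, // stands in for `AtomicBool`
//         slot: u64,    // stands in for `UnsafeCell<MaybeUninit<T>>`
//     }
//     let head = Layout::new::<Header>();
//     // the flexible tail: one slot per matcher column
//     let tail = Layout::array::<[usize; 2]>(cols).expect("invalid memory layout");
//     head.extend(tail).expect("invalid memory layout").0.pad_to_align()
// }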

#[derive(Debug)]
struct Location {
    // the index of the bucket
    bucket: u32,
    // the length of `bucket`
    bucket_len: u32,
    // the index of the entry in `bucket`
    entry: u32,
}

// skip the shorter buckets to avoid unnecessary allocations.
// this also reduces the maximum capacity of a vector.
const SKIP: u32 = 32;
const SKIP_BUCKET: u32 = (u32::BITS - SKIP.leading_zeros()) - 1;

impl Location {
    fn of(index: u32) -> Location {
        let skipped = index.checked_add(SKIP).expect("exceeded maximum length");
        let bucket = u32::BITS - skipped.leading_zeros();
        let bucket = bucket - (SKIP_BUCKET + 1);
        let bucket_len = Location::bucket_len(bucket);
        let entry = skipped ^ bucket_len;

        Location {
            bucket,
            bucket_len,
            entry,
        }
    }

    fn bucket_len(bucket: u32) -> u32 {
        1 << (bucket + SKIP_BUCKET)
    }
}
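
// Editor's note, not part of the diff: since the first SKIP = 32 indices are folded into
// bucket 0, SKIP_BUCKET is 5 and bucket `i` holds `1 << (i + 5)` entries. A worked example
// in the style of the tests below (hypothetical extra test): index 40 gives
// skipped = 40 + 32 = 72, bucket = (32 - 72.leading_zeros()) - 6 = 1, bucket_len = 64,
// and entry = 72 ^ 64 = 8.
//
// #[cfg(test)]
// #[test]
// fn location_of_worked_example() {
//     let loc = Location::of(40);
//     assert_eq!((loc.bucket, loc.bucket_len, loc.entry), (1, 64, 8));
// }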

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn location() {
        assert_eq!(Location::bucket_len(0), 32);
        for i in 0..32 {
            let loc = Location::of(i);
            assert_eq!(loc.bucket_len, 32);
            assert_eq!(loc.bucket, 0);
            assert_eq!(loc.entry, i);
        }

        assert_eq!(Location::bucket_len(1), 64);
        for i in 33..96 {
            let loc = Location::of(i);
            assert_eq!(loc.bucket_len, 64);
            assert_eq!(loc.bucket, 1);
            assert_eq!(loc.entry, i - 32);
        }

        assert_eq!(Location::bucket_len(2), 128);
        for i in 96..224 {
            let loc = Location::of(i);
            assert_eq!(loc.bucket_len, 128);
            assert_eq!(loc.bucket, 2);
            assert_eq!(loc.entry, i - 96);
        }

        let max = Location::of(MAX_ENTRIES);
        assert_eq!(max.bucket, BUCKETS - 1);
        assert_eq!(max.bucket_len, 1 << 31);
        assert_eq!(max.entry, (1 << 31) - 1);
    }
}
src/items.rs (139 lines changed)
@@ -1,140 +1 @@
use std::mem::swap;
use std::ptr::NonNull;

use crate::Utf32String;

pub(crate) struct ItemCache {
    live: Vec<Item>,
    evicted: Vec<Item>,
}
impl ItemCache {
    pub(crate) fn new() -> Self {
        Self {
            live: Vec::with_capacity(1024),
            evicted: Vec::new(),
        }
    }

    pub(crate) fn clear(&mut self) {
        if self.evicted.is_empty() {
            self.evicted.reserve(1024);
            swap(&mut self.evicted, &mut self.live)
        } else {
            self.evicted.append(&mut self.live)
        }
    }

    pub(crate) fn cleared(&self) -> bool {
        !self.evicted.is_empty()
    }

    pub(crate) fn push(&mut self, item: Box<[Utf32String]>) {
        self.live.push(Item {
            cols: Box::leak(item).into(),
        })
    }

    pub(crate) fn get(&mut self) -> &mut [Item] {
        &mut self.live
    }
}

#[derive(PartialEq, Eq, Clone)]
pub struct Item {
    // TODO: small vec optimization??
    cols: NonNull<[Utf32String]>,
}

impl std::fmt::Debug for Item {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ItemText")
            .field("cols", &self.cols())
            .finish()
    }
}

unsafe impl Send for Item {}
unsafe impl Sync for Item {}

impl Item {
    pub fn cols(&self) -> &[Utf32String] {
        // safety: cols is basically a box and treated the same as a box,
        // however there can be other references so using a box (unique ptr)
        // would be an alias violation
        unsafe { self.cols.as_ref() }
    }
}
impl Drop for Item {
    fn drop(&mut self) {
        // safety: cols is basically a box and treated the same as a box,
        // however there can be other references (that won't be accessed
        // anymore at this point) so using a box (unique ptr) would be an alias
        // violation
        unsafe { drop(Box::from_raw(self.cols.as_ptr())) }
    }
}

#[derive(Debug, Clone, Copy)]
pub(crate) struct ItemSnapshot {
    cols: NonNull<[Utf32String]>,
    pub(crate) len: u32,
}

unsafe impl Send for ItemSnapshot {}
unsafe impl Sync for ItemSnapshot {}

#[derive(Debug, Clone)]
pub(crate) struct ItemsSnapshot {
    items: Vec<ItemSnapshot>,
}

impl ItemsSnapshot {
    pub(crate) fn new(items: &ItemCache) -> Self {
        Self {
            items: items
                .live
                .iter()
                .map(|item| ItemSnapshot {
                    cols: item.cols,
                    len: item.cols().iter().map(|s| s.len() as u32).sum(),
                })
                .collect(),
        }
    }

    pub(crate) fn outdated(&self, items: &ItemCache) -> bool {
        items.live.len() != self.items.len()
    }

    pub(crate) fn len(&self) -> usize {
        self.items.len()
    }

    pub(crate) fn update(&mut self, items: &ItemCache) -> bool {
        let cleared = !items.evicted.is_empty();
        // drop in another thread to ensure we don't wait for a long drop here
        if cleared {
            self.items.clear();
        };
        let start = self.items.len();
        self.items
            .extend(items.live[start..].iter().map(|item| ItemSnapshot {
                cols: item.cols,
                len: item.cols().iter().map(|s| s.len() as u32).sum(),
            }));
        cleared
    }

    pub(crate) unsafe fn get(&self) -> &[ItemSnapshot] {
        &self.items
    }
}

impl ItemSnapshot {
    pub(crate) fn cols(&self) -> &[Utf32String] {
        // safety: we only hand out ItemSnapshot ranges
        // if the caller asserted via the unsafe ItemsSnapshot::get
        // function that the pointers are valid
        unsafe { self.cols.as_ref() }
    }
}
src/lib.rs (221 lines changed)
@@ -1,24 +1,71 @@
use std::cmp::Reverse;
use std::ops::Deref;
use std::sync::atomic::{self, AtomicBool};
use std::sync::atomic::{self, AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use crate::items::{Item, ItemCache};
use crate::worker::Worker;
use parking_lot::lock_api::ArcMutexGuard;
use parking_lot::Mutex;
use rayon::ThreadPool;

pub use crate::pattern::{CaseMatching, MultiPattern, Pattern, PatternKind};
pub use crate::utf32_string::Utf32String;
use crate::worker::Woker;
pub use nucleo_matcher::{chars, Matcher, MatcherConfig, Utf32Str};

mod items;
mod boxcar;
mod pattern;
mod utf32_string;
mod worker;
pub use nucleo_matcher::{chars, Matcher, MatcherConfig, Utf32Str};

use parking_lot::{Mutex, MutexGuard, RawMutex};
pub struct Item<'a, T> {
    pub data: &'a T,
    pub matcher_columns: &'a [Utf32String],
}

pub struct Injector<T> {
    items: Arc<boxcar::Vec<T>>,
    notify: Arc<(dyn Fn() + Sync + Send)>,
}

impl<T> Clone for Injector<T> {
    fn clone(&self) -> Self {
        Injector {
            items: self.items.clone(),
            notify: self.notify.clone(),
        }
    }
}

impl<T> Injector<T> {
    /// Appends an element to the back of the vector.
    pub fn push(&self, value: T, fill_columns: impl FnOnce(&mut [Utf32String])) -> u32 {
        let idx = self.items.push(value, fill_columns);
        (self.notify)();
        idx
    }

    /// Returns the total number of items in the current
    /// queue
    pub fn injected_items(&self) -> u32 {
        self.items.count()
    }

    /// Returns a reference to the item at the given index.
    ///
    /// # Safety
    ///
    /// Item at `index` must be initialized. That means you must have observed
    /// `push` returning this value or `get` returning `Some` for this value.
    /// Just because a later index is initialized doesn't mean that this index
    /// is initialized
    pub unsafe fn get_unchecked(&self, index: u32) -> Item<'_, T> {
        self.items.get_unchecked(index)
    }

    /// Returns a reference to the element at the given index.
    pub fn get(&self, index: u32) -> Option<Item<'_, T>> {
        self.items.get(index)
    }
}
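
// Editor's note, not part of the diff: `Injector` is the cloneable producer handle over the
// lock-free vector, so any thread can push while the worker keeps matching. A hypothetical
// usage sketch (assumes a single matcher column and that `Utf32String` implements
// `From<String>`, which may not hold at this point in the crate's history):
//
// fn spawn_producer(injector: Injector<String>) {
//     std::thread::spawn(move || {
//         for line in ["foo", "bar", "baz"] {
//             injector.push(line.to_string(), |cols| {
//                 // fill the single matcher column from the item itself
//                 cols[0] = Utf32String::from(line.to_string());
//             });
//         }
//     });
// }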

#[derive(PartialEq, Eq, Debug, Clone, Copy)]
pub struct Match {
@@ -32,139 +79,134 @@ pub struct Status {
    pub running: bool,
}

#[derive(Clone)]
pub struct Items<T> {
    cache: Arc<Mutex<ItemCache>>,
    items: Arc<Mutex<Vec<T>>>,
    notify: Arc<(dyn Fn() + Sync + Send)>,
}

impl<T: Sync + Send> Items<T> {
    pub fn clear(&mut self) {
        self.items.lock().clear();
        self.cache.lock().clear();
    }

    pub fn append(&mut self, items: impl Iterator<Item = (T, Box<[Utf32String]>)>) {
        let mut cache = self.cache.lock();
        let mut items_ = self.items.lock();
        items_.extend(items.map(|(item, text)| {
            cache.push(text);
            item
        }));
        // notify that a new tick will be necessary
        (self.notify)();
    }

    pub fn get(&self) -> impl Deref<Target = [T]> + '_ {
        MutexGuard::map(self.items.lock(), |items| items.as_mut_slice())
    }

    pub fn get_matcher_items(&self) -> impl Deref<Target = [Item]> + '_ {
        MutexGuard::map(self.cache.lock(), |items| items.get())
    }
}

pub struct Nucleo<T: Sync + Send> {
pub struct Nucleo<T: Sync + Send + 'static> {
    // the way the API is built we don't actually need these to be Arcs
    // but this lets us avoid some unsafe
    worker: Arc<Mutex<Worker>>,
    canceled: Arc<AtomicBool>,
    should_notify: Arc<AtomicBool>,
    worker: Arc<Mutex<Woker<T>>>,
    pool: ThreadPool,
    pub items: Items<T>,
    cleared: bool,
    item_count: u32,
    pub matches: Vec<Match>,
    pub pattern: MultiPattern,
    should_notify: Arc<AtomicBool>,
    pub notify: Arc<(dyn Fn() + Sync + Send)>,
    items: Arc<boxcar::Vec<T>>,
}

impl<T: Sync + Send> Nucleo<T> {
impl<T: Sync + Send + 'static> Nucleo<T> {
    pub fn new(
        config: MatcherConfig,
        notify: Arc<(dyn Fn() + Sync + Send)>,
        num_threads: Option<usize>,
        case_matching: CaseMatching,
        cols: usize,
        items: impl Iterator<Item = (T, Box<[Utf32String]>)>,
        columns: u32,
    ) -> Self {
        let mut cache = ItemCache::new();
        let items: Vec<_> = items
            .map(|(item, text)| {
                cache.push(text);
                item
            })
            .collect();
        let matches: Vec<_> = (0..items.len())
            .map(|i| Match {
                score: 0,
                idx: i as u32,
            })
            .collect();
        let (pool, worker) =
            Worker::new(notify.clone(), num_threads, config, matches.clone(), &cache);
        let (pool, worker) = Woker::new(num_threads, config, notify.clone(), columns);
        Self {
            canceled: worker.canceled.clone(),
            should_notify: worker.should_notify.clone(),
            items: Items {
                cache: Arc::new(Mutex::new(cache)),
                items: Arc::new(Mutex::new(items)),
                notify,
            },
            items: worker.items.clone(),
            pool,
            matches,
            pattern: MultiPattern::new(&config, case_matching, cols),
            matches: Vec::with_capacity(2 * 1024),
            pattern: MultiPattern::new(&config, case_matching, columns as usize),
            worker: Arc::new(Mutex::new(worker)),
            cleared: false,
            item_count: 0,
            notify,
        }
    }

    /// Returns the total number of items
    pub fn item_count(&self) -> u32 {
        self.item_count
    }

    pub fn injector(&self) -> Injector<T> {
        Injector {
            items: self.items.clone(),
            notify: self.notify.clone(),
        }
    }

    /// Returns a reference to the item at the given index.
    ///
    /// # Safety
    ///
    /// Item at `index` must be initialized. That means you must have observed
    /// `push` returning this value or `get` returning `Some` for this value.
    /// Just because a later index is initialized doesn't mean that this index
    /// is initialized
    pub unsafe fn get_unchecked(&self, index: u32) -> Item<'_, T> {
        self.items.get_unchecked(index)
    }

    /// Returns a reference to the element at the given index.
    pub fn get(&self, index: u32) -> Option<Item<'_, T>> {
        self.items.get(index)
    }

    /// Clears all items
    pub fn clear(&mut self) {
        self.canceled.store(true, Ordering::Relaxed);
        self.items = Arc::new(boxcar::Vec::with_capacity(1024, self.items.columns()));
        self.cleared = true
    }

    pub fn update_config(&mut self, config: MatcherConfig) {
        self.worker.lock().update_config(config)
    }

    pub fn push(&self, value: T, fill_columns: impl FnOnce(&mut [Utf32String])) -> u32 {
        let idx = self.items.push(value, fill_columns);
        (self.notify)();
        idx
    }

    pub fn tick(&mut self, timeout: u64) -> Status {
        self.should_notify.store(false, atomic::Ordering::Relaxed);
        let status = self.pattern.status();
        let items = self.items.cache.lock_arc();
        let canceled = status != pattern::Status::Unchanged || items.cleared();
        let res = self.tick_inner(timeout, canceled, items, status);
        let canceled = status != pattern::Status::Unchanged || self.cleared;
        let res = self.tick_inner(timeout, canceled, status);
        self.cleared = false;
        if !canceled {
            self.should_notify.store(true, atomic::Ordering::Relaxed);
            return res;
        }
        let items = self.items.cache.lock_arc();
        let res = self.tick_inner(timeout, false, items, pattern::Status::Unchanged);
        self.should_notify.store(true, atomic::Ordering::Relaxed);
        res
        self.tick_inner(timeout, false, pattern::Status::Unchanged)
    }

    fn tick_inner(
        &mut self,
        timeout: u64,
        canceled: bool,
        items: ArcMutexGuard<RawMutex, ItemCache>,
        status: pattern::Status,
    ) -> Status {
    fn tick_inner(&mut self, timeout: u64, canceled: bool, status: pattern::Status) -> Status {
        let mut inner = if canceled {
            self.pattern.reset_status();
            self.canceled.store(true, atomic::Ordering::Relaxed);
            self.worker.lock_arc()
        } else {
            let Some(worker) = self.worker.try_lock_arc_for(Duration::from_millis(timeout)) else {
                self.should_notify.store(true, Ordering::Release);
                return Status { changed: false, running: true };
            };
            worker
        };

        let changed = inner.running;

        let running = canceled || self.items.count() > inner.item_count();
        if inner.running {
            inner.running = false;
            self.matches.clone_from(&inner.matches);
            if !inner.was_canceled {
                self.item_count = inner.item_count();
                self.matches.clone_from(&inner.matches);
            }
        }

        let running = canceled || inner.items.outdated(&items);
        if running {
            inner.pattern.clone_from(&self.pattern);
            self.canceled.store(false, atomic::Ordering::Relaxed);
            self.pool.spawn(move || unsafe { inner.run(items, status) })
            if !canceled {
                self.should_notify.store(true, atomic::Ordering::Release);
            }
            let cleared = self.cleared;
            self.pool
                .spawn(move || unsafe { inner.run(status, cleared) })
        }
        Status { changed, running }
    }
@@ -181,6 +223,7 @@ impl<T: Sync + Send> Drop for Nucleo<T> {
        }
    }
}

/// convenience function to easily fuzzy match
/// on a (relatively small) list of inputs. This is not recommended for building a full tui
/// application that can match large numbers of matches as all matching is done on the current

@@ -174,10 +174,10 @@ impl MultiPattern {
    pub fn new(
        matcher_config: &MatcherConfig,
        case_matching: CaseMatching,
        cols: usize,
        columns: usize,
    ) -> MultiPattern {
        MultiPattern {
            cols: vec![Pattern::new(matcher_config, case_matching); cols],
            cols: vec![Pattern::new(matcher_config, case_matching); columns],
        }
    }
src/worker.rs (194 lines changed)
@@ -3,13 +3,11 @@ use std::sync::atomic::{self, AtomicBool};
use std::sync::Arc;

use nucleo_matcher::MatcherConfig;
use parking_lot::lock_api::ArcMutexGuard;
use parking_lot::RawMutex;
use parking_lot::Mutex;
use rayon::{prelude::*, ThreadPool};

use crate::items::{ItemCache, ItemsSnapshot};
use crate::pattern::{self, MultiPattern};
use crate::Match;
use crate::{boxcar, Match};

struct Matchers(Box<[UnsafeCell<nucleo_matcher::Matcher>]>);

@@ -24,18 +22,24 @@ impl Matchers {
unsafe impl Sync for Matchers {}
unsafe impl Send for Matchers {}

pub(crate) struct Worker {
    notify: Arc<(dyn Fn() + Sync + Send)>,
pub(crate) struct Woker<T: Sync + Send + 'static> {
    pub(crate) running: bool,
    pub(crate) items: ItemsSnapshot,
    matchers: Matchers,
    pub(crate) matches: Vec<Match>,
    pub(crate) pattern: MultiPattern,
    pub(crate) canceled: Arc<AtomicBool>,
    pub(crate) should_notify: Arc<AtomicBool>,
    pub(crate) was_canceled: bool,
    pub(crate) last_snapshot: u32,
    notify: Arc<(dyn Fn() + Sync + Send)>,
    pub(crate) items: Arc<boxcar::Vec<T>>,
    in_flight: Vec<u32>,
}

impl Worker {
impl<T: Sync + Send + 'static> Woker<T> {
    pub(crate) fn item_count(&self) -> u32 {
        self.last_snapshot - self.in_flight.len() as u32
    }
    pub(crate) fn update_config(&mut self, config: MatcherConfig) {
        for matcher in self.matchers.0.iter_mut() {
            matcher.get_mut().config = config;
@@ -43,12 +47,11 @@ impl Worker {
    }

    pub(crate) fn new(
        notify: Arc<(dyn Fn() + Sync + Send)>,
        worker_threads: Option<usize>,
        config: MatcherConfig,
        matches: Vec<Match>,
        items: &ItemCache,
    ) -> (ThreadPool, Worker) {
        notify: Arc<(dyn Fn() + Sync + Send)>,
        cols: u32,
    ) -> (ThreadPool, Self) {
        let worker_threads = worker_threads
            .unwrap_or_else(|| std::thread::available_parallelism().map_or(4, |it| it.get()));
        let pool = rayon::ThreadPoolBuilder::new()
@@ -59,96 +62,161 @@ impl Worker {
        let matchers = (0..worker_threads)
            .map(|_| UnsafeCell::new(nucleo_matcher::Matcher::new(config)))
            .collect();
        let worker = Worker {
            notify,
        let worker = Woker {
            running: false,
            items: ItemsSnapshot::new(items),
            matchers: Matchers(matchers),
            matches,
            last_snapshot: 0,
            matches: Vec::new(),
            // just a placeholder
            pattern: MultiPattern::new(&config, crate::CaseMatching::Ignore, 0),
            canceled: Arc::new(AtomicBool::new(false)),
            should_notify: Arc::new(AtomicBool::new(false)),
            was_canceled: false,
            notify,
            items: Arc::new(boxcar::Vec::with_capacity(2 * 1024, cols)),
            in_flight: Vec::with_capacity(64),
        };
        (pool, worker)
    }

    pub(crate) unsafe fn run(
        &mut self,
        items_lock: ArcMutexGuard<RawMutex, ItemCache>,
        query_status: pattern::Status,
    ) {
        self.running = true;
        let mut last_scored_item = self.items.len();
        let cleared = self.items.update(&items_lock);
        drop(items_lock);

        // TODO: be smarter around reusing past results for rescoring
        if cleared || query_status == pattern::Status::Rescore {
            self.matches.clear();
            last_scored_item = 0;
        }
    unsafe fn process_new_items(&mut self) {
        let matchers = &self.matchers;
        let pattern = &self.pattern;
        let items = unsafe { self.items.get() };
        self.matches.reserve(self.in_flight.len());
        self.in_flight.retain(|&idx| {
            let Some(item) = self.items.get(idx) else {
                return true;
            };
            let Some(score) = pattern.score(item.matcher_columns, matchers.get()) else {
                return false;
            };
            self.matches.push(Match { score, idx });
            false
        });
        let new_snapshot = self.items.par_snapshot(self.last_snapshot);
        if new_snapshot.end() != self.last_snapshot {
            let end = new_snapshot.end();
            let in_flight = Mutex::new(&mut self.in_flight);
            let items = new_snapshot.filter_map(|(idx, item)| {
                let Some(item) = item else {
                    in_flight.lock().push(idx);
                    return None;
                };
                let score = if self.canceled.load(atomic::Ordering::Relaxed) {
                    0
                } else {
                    pattern.score(item.matcher_columns, matchers.get())?
                };
                Some(Match { score, idx })
            });
            self.matches.par_extend(items);
            self.last_snapshot = end;
        }
    }
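
    // Editor's note, not part of the diff: `process_new_items` has to cope with slots that
    // another thread has claimed but not yet published. Their indices land in `in_flight`
    // and are retried at the start of the next run via `retain`. A reduced, hypothetical
    // sketch of that bookkeeping, with plain `Option`s standing in for boxcar entries:
    //
    // fn retry_in_flight(in_flight: &mut Vec<u32>, get: impl Fn(u32) -> Option<u32>, matches: &mut Vec<(u32, u32)>) {
    //     in_flight.retain(|&idx| match get(idx) {
    //         None => true, // still unpublished: keep it for the next run
    //         Some(score) => {
    //             matches.push((idx, score));
    //             false // now scored: drop it from the retry list
    //         }
    //     });
    // }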

    if self.pattern.cols.iter().all(|pat| pat.is_empty()) {
    fn remove_in_flight_matches(&mut self) {
        let mut off = 0;
        self.in_flight.retain(|&i| {
            let is_in_flight = self.items.get(i).is_none();
            if is_in_flight {
                self.matches.remove((i - off) as usize);
                off += 1;
            }
            is_in_flight
        });
    }

    unsafe fn process_new_items_trivial(&mut self) {
        let new_snapshot = self.items.snapshot(self.last_snapshot);
        if new_snapshot.end() != self.last_snapshot {
            let end = new_snapshot.end();
            let items = new_snapshot.filter_map(|(idx, item)| {
                if item.is_none() {
                    self.in_flight.push(idx);
                    return None;
                };
                Some(Match { score: 0, idx })
            });
            self.matches.extend(items);
            self.last_snapshot = end;
        }
    }

    pub(crate) unsafe fn run(&mut self, pattern_status: pattern::Status, cleared: bool) {
        self.running = true;
        self.was_canceled = false;

        if cleared {
            self.last_snapshot = 0;
        }

        // TODO: be smarter around reusing past results for rescoring
        let empty_pattern = self.pattern.cols.iter().all(|pat| pat.is_empty());
        if empty_pattern {
            self.matches.clear();
            self.matches.extend((0..items.len()).map(|i| Match {
                score: 0,
                idx: i as u32,
            }));
            if self.should_notify.load(atomic::Ordering::Relaxed) {
            self.matches
                .extend((0..self.last_snapshot).map(|idx| Match { score: 0, idx }));
            // there are usually only very few in flight items (one for each writer)
            self.remove_in_flight_matches();
            self.process_new_items_trivial();
            if self.should_notify.load(atomic::Ordering::Acquire) {
                (self.notify)();
            }
            return;
        }
        if query_status != pattern::Status::Unchanged && !self.matches.is_empty() {

        self.process_new_items();
        if pattern_status == pattern::Status::Rescore {
            self.matches.clear();
            self.matches
                .extend((0..self.last_snapshot).map(|idx| Match { score: 0, idx }));
            self.remove_in_flight_matches();
        }

        let matchers = &self.matchers;
        let pattern = &self.pattern;
        if pattern_status != pattern::Status::Unchanged && !self.matches.is_empty() {
            self.matches
                .par_iter_mut()
                .take_any_while(|_| !self.canceled.load(atomic::Ordering::Relaxed))
                .for_each(|match_| {
                    let item = &items[match_.idx as usize];
                    // safety: in-flight items are never added to the matches
                    let item = self.items.get_unchecked(match_.idx);
                    match_.score = pattern
                        .score(item.cols(), unsafe { matchers.get() })
                        .score(item.matcher_columns, matchers.get())
                        .unwrap_or(u32::MAX);
                });
            // TODO: do this in parallel?
            self.matches.retain(|m| m.score != u32::MAX);
        }
        if last_scored_item != self.items.len() {
            let items = items[last_scored_item..]
                .par_iter()
                .enumerate()
                .filter_map(|(i, item)| {
                    let score = if self.canceled.load(atomic::Ordering::Relaxed) {
                        u32::MAX - 1
                    } else {
                        pattern.score(item.cols(), unsafe { matchers.get() })?
                    };
                    Some(Match {
                        score,
                        idx: i as u32,
                    })
                });
            self.matches.par_extend(items);
        }

        if !self.canceled.load(atomic::Ordering::Relaxed) {
        if self.canceled.load(atomic::Ordering::Relaxed) {
            self.was_canceled = true;
        } else {
            // TODO: cancel sort in progress?
            self.matches.par_sort_unstable_by(|match1, match2| {
                match2.score.cmp(&match1.score).then_with(|| {
                    // the tie breaker is comparatively rarely needed so we keep it
                    // in a branch, especially because we need to access the items
                    // array here which involves some pointer chasing
                    let item1 = &items[match1.idx as usize];
                    let item2 = &items[match2.idx as usize];
                    (item1.len, match1.idx).cmp(&(item2.len, match2.idx))
                    let item1 = self.items.get_unchecked(match1.idx);
                    let item2 = &self.items.get_unchecked(match2.idx);
                    let len1: u32 = item1
                        .matcher_columns
                        .iter()
                        .map(|haystack| haystack.len() as u32)
                        .sum();
                    let len2 = item2
                        .matcher_columns
                        .iter()
                        .map(|haystack| haystack.len() as u32)
                        .sum();
                    (len1, match1.idx).cmp(&(len2, match2.idx))
                })
            });
        }

        if self.should_notify.load(atomic::Ordering::Relaxed) {
        if self.should_notify.load(atomic::Ordering::Acquire) {
            (self.notify)();
        }
    }