Encode matcher state as snapshots

The matcher updates its state on every ticker, however thanks to timouts
in the tick function the input and the output state may missmatch. To
make this clearer (and fix some related bugs around restarting/clearing
the items) the current state (like items) is only exposed as a snapshot
now.
This commit is contained in:
Pascal Kuthe 2023-08-05 18:59:06 +02:00
parent 093ecafb01
commit 6127ab86a3
No known key found for this signature in database
GPG Key ID: D715E8655AE166A6

View File

@ -1,4 +1,5 @@
use std::cmp::Reverse;
use std::ops::{Bound, RangeBounds};
use std::sync::atomic::{self, AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
@ -80,6 +81,106 @@ pub struct Status {
pub running: bool,
}
pub struct Snapshot<T: Sync + Send + 'static> {
item_count: u32,
matches: Vec<Match>,
pattern: MultiPattern,
items: Arc<boxcar::Vec<T>>,
}
impl<T: Sync + Send + 'static> Snapshot<T> {
fn clear(&mut self, new_items: Arc<boxcar::Vec<T>>) {
self.item_count = 0;
self.matches.clear();
self.items = new_items
}
fn update(&mut self, worker: &Worker<T>) {
self.item_count = worker.item_count();
self.pattern.clone_from(&worker.pattern);
self.matches.clone_from(&worker.matches);
if !Arc::ptr_eq(&worker.items, &self.items) {
self.items = worker.items.clone()
}
}
/// Returns that total number of items
pub fn item_count(&self) -> u32 {
self.item_count
}
/// Returns the pattern which items were matched against
pub fn pattern(&self) -> &MultiPattern {
&self.pattern
}
/// Returns that number of items that matched the pattern
pub fn matched_item_count(&self) -> u32 {
self.matches.len() as u32
}
/// Returns an iteror over the items that correspond to a subrange of
/// all the matches in this snapshot.
///
/// # Panics
/// Panics if `range` has a range bound that is larger than
/// the matched item count
pub fn matched_items(
&self,
range: impl RangeBounds<u32>,
) -> impl Iterator<Item = Item<'_, T>> + ExactSizeIterator + DoubleEndedIterator + '_ {
// TODO: use TAIT
let start = match range.start_bound() {
Bound::Included(&start) => start as usize,
Bound::Excluded(&start) => start as usize + 1,
Bound::Unbounded => 0,
};
let end = match range.end_bound() {
Bound::Included(&end) => end as usize + 1,
Bound::Excluded(&end) => end as usize,
Bound::Unbounded => self.matches.len(),
};
self.matches[start..end]
.iter()
.map(|&m| unsafe { self.items.get_unchecked(m.idx) })
}
/// Returns a reference to the item at the given index.
///
/// # Safety
///
/// Item at `index` must be initialized. That means you must have observed
/// match with the corresponding index in this exact snapshot. Observing
/// a higher index is not enough as item indices can be non-contigously
/// initialized
#[inline]
pub unsafe fn get_item_unchecked(&self, index: u32) -> Item<'_, T> {
self.items.get_unchecked(index)
}
/// Returns a reference to the item at the given index.
///
/// Returns `None` if the given `index` is not initialized. This function
/// is only guarteed to return `Some` for item indices that can be found in
/// the `matches` of this struct. Both small and larger indices may returns
/// `None`
#[inline]
pub fn get_item(&self, index: u32) -> Option<Item<'_, T>> {
self.items.get(index)
}
/// Returns a reference to the nth match.
///
/// Returns `None` if the given `index` is not initialized. This function
/// is only guarteed to return `Some` for item indices that can be found in
/// the `matches` of this struct. Both small and larger indices may returns
/// `None`
#[inline]
pub fn get_matched_item(&self, n: u32) -> Option<Item<'_, T>> {
self.get_item(self.matches.get(n as usize)?.idx)
}
}
pub struct Nucleo<T: Sync + Send + 'static> {
// the way the API is build we totally don't actually need these to be Arcs
// but this lets us avoid some unsafe
@ -88,12 +189,10 @@ pub struct Nucleo<T: Sync + Send + 'static> {
worker: Arc<Mutex<Worker<T>>>,
pool: ThreadPool,
cleared: bool,
item_count: u32,
pub matches: Vec<Match>,
pub pattern: MultiPattern,
pub last_matched_pattern: MultiPattern,
pub notify: Arc<(dyn Fn() + Sync + Send)>,
items: Arc<boxcar::Vec<T>>,
notify: Arc<(dyn Fn() + Sync + Send)>,
snapshot: Snapshot<T>,
pub pattern: MultiPattern,
}
impl<T: Sync + Send + 'static> Nucleo<T> {
@ -110,19 +209,22 @@ impl<T: Sync + Send + 'static> Nucleo<T> {
should_notify: worker.should_notify.clone(),
items: worker.items.clone(),
pool,
pattern: MultiPattern::new(&config, case_matching, columns as usize),
snapshot: Snapshot {
matches: Vec::with_capacity(2 * 1024),
pattern: MultiPattern::new(&config, case_matching, columns as usize),
last_matched_pattern: MultiPattern::new(&config, case_matching, columns as usize),
item_count: 0,
items: worker.items.clone(),
},
worker: Arc::new(Mutex::new(worker)),
cleared: false,
item_count: 0,
notify,
}
}
/// Returns the total number of items
pub fn item_count(&self) -> u32 {
self.item_count
/// Returns a snapshot of all items
pub fn snapshot(&self) -> &Snapshot<T> {
&self.snapshot
}
pub fn injector(&self) -> Injector<T> {
@ -132,40 +234,30 @@ impl<T: Sync + Send + 'static> Nucleo<T> {
}
}
/// Returns a reference to the item at the given index.
/// Restart the the item stream. Removes all items disconnects all
/// previously created injectors from this instance. If `clear_snapshot` is
/// `true` then all items and matched are removed from the
/// [`Snapshot`](crate::Snapshot) immediately. Otherwise the snapshot will
/// keep the current matches until the matcher has run again.
///
/// # Safety
/// # Note
///
/// Item at `index` must be initialized. That means you must have observed
/// `push` returning this value or `get` returning `Some` for this value.
/// Just because a later index is initialized doesn't mean that this index
/// is initialized
pub unsafe fn get_unchecked(&self, index: u32) -> Item<'_, T> {
self.items.get_unchecked(index)
}
/// Returns a reference to the element at the given index.
pub fn get(&self, index: u32) -> Option<Item<'_, T>> {
self.items.get(index)
}
/// Clears all items
pub fn clear(&mut self) {
/// The injectors will continue to function but they will not affect this
/// instance anymore. The old items will only be dropped when all injectors
/// were dropped.
pub fn restart(&mut self, clear_snapshot: bool) {
self.canceled.store(true, Ordering::Relaxed);
self.items = Arc::new(boxcar::Vec::with_capacity(1024, self.items.columns()));
self.cleared = true
self.cleared = true;
if clear_snapshot {
self.snapshot.clear(self.items.clone());
}
}
pub fn update_config(&mut self, config: MatcherConfig) {
self.worker.lock().update_config(config)
}
pub fn push(&self, value: T, fill_columns: impl FnOnce(&mut [Utf32String])) -> u32 {
let idx = self.items.push(value, fill_columns);
(self.notify)();
idx
}
pub fn tick(&mut self, timeout: u64) -> Status {
self.should_notify.store(false, atomic::Ordering::Relaxed);
let status = self.pattern.status();
@ -196,10 +288,8 @@ impl<T: Sync + Send + 'static> Nucleo<T> {
let running = canceled || self.items.count() > inner.item_count();
if inner.running {
inner.running = false;
if !inner.was_canceled {
self.item_count = inner.item_count();
self.last_matched_pattern.clone_from(&inner.pattern);
self.matches.clone_from(&inner.matches);
if !inner.was_canceled && !self.cleared {
self.snapshot.update(&inner)
}
}
if running {
@ -209,6 +299,9 @@ impl<T: Sync + Send + 'static> Nucleo<T> {
self.should_notify.store(true, atomic::Ordering::Release);
}
let cleared = self.cleared;
if cleared {
inner.items = self.items.clone();
}
self.pool
.spawn(move || unsafe { inner.run(status, cleared) })
}