diff --git a/src/lib.rs b/src/lib.rs index eaab53c..f1e9304 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ use std::cmp::Reverse; +use std::ops::{Bound, RangeBounds}; use std::sync::atomic::{self, AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -80,6 +81,106 @@ pub struct Status { pub running: bool, } +pub struct Snapshot { + item_count: u32, + matches: Vec, + pattern: MultiPattern, + items: Arc>, +} + +impl Snapshot { + fn clear(&mut self, new_items: Arc>) { + self.item_count = 0; + self.matches.clear(); + self.items = new_items + } + + fn update(&mut self, worker: &Worker) { + self.item_count = worker.item_count(); + self.pattern.clone_from(&worker.pattern); + self.matches.clone_from(&worker.matches); + if !Arc::ptr_eq(&worker.items, &self.items) { + self.items = worker.items.clone() + } + } + + /// Returns that total number of items + pub fn item_count(&self) -> u32 { + self.item_count + } + + /// Returns the pattern which items were matched against + pub fn pattern(&self) -> &MultiPattern { + &self.pattern + } + + /// Returns that number of items that matched the pattern + pub fn matched_item_count(&self) -> u32 { + self.matches.len() as u32 + } + + /// Returns an iteror over the items that correspond to a subrange of + /// all the matches in this snapshot. + /// + /// # Panics + /// Panics if `range` has a range bound that is larger than + /// the matched item count + pub fn matched_items( + &self, + range: impl RangeBounds, + ) -> impl Iterator> + ExactSizeIterator + DoubleEndedIterator + '_ { + // TODO: use TAIT + let start = match range.start_bound() { + Bound::Included(&start) => start as usize, + Bound::Excluded(&start) => start as usize + 1, + Bound::Unbounded => 0, + }; + let end = match range.end_bound() { + Bound::Included(&end) => end as usize + 1, + Bound::Excluded(&end) => end as usize, + Bound::Unbounded => self.matches.len(), + }; + self.matches[start..end] + .iter() + .map(|&m| unsafe { self.items.get_unchecked(m.idx) }) + } + + /// Returns a reference to the item at the given index. + /// + /// # Safety + /// + /// Item at `index` must be initialized. That means you must have observed + /// match with the corresponding index in this exact snapshot. Observing + /// a higher index is not enough as item indices can be non-contigously + /// initialized + #[inline] + pub unsafe fn get_item_unchecked(&self, index: u32) -> Item<'_, T> { + self.items.get_unchecked(index) + } + + /// Returns a reference to the item at the given index. + /// + /// Returns `None` if the given `index` is not initialized. This function + /// is only guarteed to return `Some` for item indices that can be found in + /// the `matches` of this struct. Both small and larger indices may returns + /// `None` + #[inline] + pub fn get_item(&self, index: u32) -> Option> { + self.items.get(index) + } + + /// Returns a reference to the nth match. + /// + /// Returns `None` if the given `index` is not initialized. This function + /// is only guarteed to return `Some` for item indices that can be found in + /// the `matches` of this struct. Both small and larger indices may returns + /// `None` + #[inline] + pub fn get_matched_item(&self, n: u32) -> Option> { + self.get_item(self.matches.get(n as usize)?.idx) + } +} + pub struct Nucleo { // the way the API is build we totally don't actually need these to be Arcs // but this lets us avoid some unsafe @@ -88,12 +189,10 @@ pub struct Nucleo { worker: Arc>>, pool: ThreadPool, cleared: bool, - item_count: u32, - pub matches: Vec, - pub pattern: MultiPattern, - pub last_matched_pattern: MultiPattern, - pub notify: Arc<(dyn Fn() + Sync + Send)>, items: Arc>, + notify: Arc<(dyn Fn() + Sync + Send)>, + snapshot: Snapshot, + pub pattern: MultiPattern, } impl Nucleo { @@ -110,19 +209,22 @@ impl Nucleo { should_notify: worker.should_notify.clone(), items: worker.items.clone(), pool, - matches: Vec::with_capacity(2 * 1024), pattern: MultiPattern::new(&config, case_matching, columns as usize), - last_matched_pattern: MultiPattern::new(&config, case_matching, columns as usize), + snapshot: Snapshot { + matches: Vec::with_capacity(2 * 1024), + pattern: MultiPattern::new(&config, case_matching, columns as usize), + item_count: 0, + items: worker.items.clone(), + }, worker: Arc::new(Mutex::new(worker)), cleared: false, - item_count: 0, notify, } } - /// Returns the total number of items - pub fn item_count(&self) -> u32 { - self.item_count + /// Returns a snapshot of all items + pub fn snapshot(&self) -> &Snapshot { + &self.snapshot } pub fn injector(&self) -> Injector { @@ -132,40 +234,30 @@ impl Nucleo { } } - /// Returns a reference to the item at the given index. + /// Restart the the item stream. Removes all items disconnects all + /// previously created injectors from this instance. If `clear_snapshot` is + /// `true` then all items and matched are removed from the + /// [`Snapshot`](crate::Snapshot) immediately. Otherwise the snapshot will + /// keep the current matches until the matcher has run again. /// - /// # Safety + /// # Note /// - /// Item at `index` must be initialized. That means you must have observed - /// `push` returning this value or `get` returning `Some` for this value. - /// Just because a later index is initialized doesn't mean that this index - /// is initialized - pub unsafe fn get_unchecked(&self, index: u32) -> Item<'_, T> { - self.items.get_unchecked(index) - } - - /// Returns a reference to the element at the given index. - pub fn get(&self, index: u32) -> Option> { - self.items.get(index) - } - - /// Clears all items - pub fn clear(&mut self) { + /// The injectors will continue to function but they will not affect this + /// instance anymore. The old items will only be dropped when all injectors + /// were dropped. + pub fn restart(&mut self, clear_snapshot: bool) { self.canceled.store(true, Ordering::Relaxed); self.items = Arc::new(boxcar::Vec::with_capacity(1024, self.items.columns())); - self.cleared = true + self.cleared = true; + if clear_snapshot { + self.snapshot.clear(self.items.clone()); + } } pub fn update_config(&mut self, config: MatcherConfig) { self.worker.lock().update_config(config) } - pub fn push(&self, value: T, fill_columns: impl FnOnce(&mut [Utf32String])) -> u32 { - let idx = self.items.push(value, fill_columns); - (self.notify)(); - idx - } - pub fn tick(&mut self, timeout: u64) -> Status { self.should_notify.store(false, atomic::Ordering::Relaxed); let status = self.pattern.status(); @@ -196,10 +288,8 @@ impl Nucleo { let running = canceled || self.items.count() > inner.item_count(); if inner.running { inner.running = false; - if !inner.was_canceled { - self.item_count = inner.item_count(); - self.last_matched_pattern.clone_from(&inner.pattern); - self.matches.clone_from(&inner.matches); + if !inner.was_canceled && !self.cleared { + self.snapshot.update(&inner) } } if running { @@ -209,6 +299,9 @@ impl Nucleo { self.should_notify.store(true, atomic::Ordering::Release); } let cleared = self.cleared; + if cleared { + inner.items = self.items.clone(); + } self.pool .spawn(move || unsafe { inner.run(status, cleared) }) }