Improve cancel speed for large matches

Pascal Kuthe 2023-08-03 02:29:13 +02:00
parent 4f59b0fb91
commit 7a432aa051

@@ -1,5 +1,6 @@
 use std::cell::UnsafeCell;
-use std::sync::atomic::{self, AtomicBool};
+use std::mem::take;
+use std::sync::atomic::{self, AtomicBool, AtomicU32};
 use std::sync::Arc;
 
 use nucleo_matcher::MatcherConfig;
@@ -34,6 +35,7 @@ pub(crate) struct Woker<T: Sync + Send + 'static> {
     notify: Arc<(dyn Fn() + Sync + Send)>,
     pub(crate) items: Arc<boxcar::Vec<T>>,
     in_flight: Vec<u32>,
+    unmatched: AtomicU32,
 }
 
 impl<T: Sync + Send + 'static> Woker<T> {
@@ -75,6 +77,7 @@ impl<T: Sync + Send + 'static> Woker<T> {
             notify,
             items: Arc::new(boxcar::Vec::with_capacity(2 * 1024, cols)),
             in_flight: Vec::with_capacity(64),
+            unmatched: AtomicU32::new(0),
         };
         (pool, worker)
     }
@@ -148,11 +151,13 @@ impl<T: Sync + Send + 'static> Woker<T> {
 
         if cleared {
             self.last_snapshot = 0;
+            *self.unmatched.get_mut() = 0;
+            self.matches.clear();
         }
 
         // TODO: be smarter around reusing past results for rescoring
-        let empty_pattern = self.pattern.cols.iter().all(|pat| pat.is_empty());
-        if empty_pattern {
+        if self.pattern.cols.iter().all(|pat| pat.is_empty()) {
+            *self.unmatched.get_mut() = 0;
             self.matches.clear();
             self.matches
                 .extend((0..self.last_snapshot).map(|idx| Match { score: 0, idx }));
@@ -180,14 +185,19 @@ impl<T: Sync + Send + 'static> Woker<T> {
                 .par_iter_mut()
                 .take_any_while(|_| !self.canceled.load(atomic::Ordering::Relaxed))
                 .for_each(|match_| {
+                    if match_.idx == u32::MAX {
+                        return;
+                    }
                     // safety: in-flight items are never added to the matches
                     let item = self.items.get_unchecked(match_.idx);
-                    match_.score = pattern
-                        .score(item.matcher_columns, matchers.get())
-                        .unwrap_or(u32::MAX);
+                    if let Some(score) = pattern.score(item.matcher_columns, matchers.get()) {
+                        match_.score = score;
+                    } else {
+                        self.unmatched.fetch_add(1, atomic::Ordering::Release);
+                        match_.score = 0;
+                        match_.idx = u32::MAX;
+                    }
                 });
-            // TODO: do this in parallel?
-            self.matches.retain(|m| m.score != u32::MAX);
         }
 
         if self.canceled.load(atomic::Ordering::Relaxed) {
@@ -196,6 +206,9 @@ impl<T: Sync + Send + 'static> Woker<T> {
             // TODO: cancel sort in progress?
            self.matches.par_sort_unstable_by(|match1, match2| {
                match2.score.cmp(&match1.score).then_with(|| {
+                    if match1.idx == u32::MAX || match2.idx == u32::MAX {
+                        return match1.idx.cmp(&match2.idx);
+                    }
                    // the tie breaker is comparitevly rarely needed so we keep it
                    // in a branch especially because we need to access the items
                    // array here which involves some pointer chasing
@@ -214,6 +227,9 @@ impl<T: Sync + Send + 'static> Woker<T> {
                    (len1, match1.idx).cmp(&(len2, match2.idx))
                })
            });
+            // let old = self.matches.clone();
+            self.matches
+                .truncate(self.matches.len() - take(self.unmatched.get_mut()) as usize);
        }
 
        if self.should_notify.load(atomic::Ordering::Acquire) {
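
In short, the old flow scored non-matching entries as u32::MAX and then ran self.matches.retain(|m| m.score != u32::MAX), an O(n) pass over every match that was paid even when the run had just been canceled. The new flow marks such entries with the sentinel idx == u32::MAX, counts them in the unmatched atomic, returns immediately on cancelation, and otherwise lets the sort push the sentinels to the tail so a single truncate removes them. Below is a minimal, self-contained sketch of that pattern; the names (rescore, Match, the score closure) are placeholders rather than the crate's API, the worker's fields are passed in as parameters, and the tie-breaker is simplified to index order only (the real comparator also tie-breaks equal scores by item length before falling back to the index).

use std::sync::atomic::{self, AtomicBool, AtomicU32};

use rayon::prelude::*;

#[derive(Clone, Copy, Debug)]
struct Match {
    score: u32,
    idx: u32,
}

/// Re-scores `matches` in parallel. Entries that no longer match are marked
/// with the sentinel `idx == u32::MAX` and counted in `unmatched` instead of
/// being removed on the spot.
fn rescore(
    matches: &mut Vec<Match>,
    unmatched: &AtomicU32,
    canceled: &AtomicBool,
    score: impl Fn(u32) -> Option<u32> + Sync,
) {
    matches
        .par_iter_mut()
        // Stop handing out work as soon as cancelation is requested.
        .take_any_while(|_| !canceled.load(atomic::Ordering::Relaxed))
        .for_each(|match_| {
            // Entries already marked (and counted) by an earlier run are skipped.
            if match_.idx == u32::MAX {
                return;
            }
            if let Some(new_score) = score(match_.idx) {
                match_.score = new_score;
            } else {
                // Count the entry instead of removing it right away.
                unmatched.fetch_add(1, atomic::Ordering::Release);
                match_.score = 0;
                match_.idx = u32::MAX;
            }
        });

    if canceled.load(atomic::Ordering::Relaxed) {
        // Bail out immediately; the sentinel entries stay in place and are
        // dealt with by the next run that finishes.
        return;
    }

    // Sort by descending score. Among equal scores the sentinel index
    // (u32::MAX) is the largest, so all marked entries collect at the tail.
    matches.par_sort_unstable_by(|m1, m2| {
        m2.score.cmp(&m1.score).then_with(|| m1.idx.cmp(&m2.idx))
    });

    // Drop every counted sentinel with a single truncate and reset the counter.
    let dropped = unmatched.swap(0, atomic::Ordering::Acquire) as usize;
    matches.truncate(matches.len() - dropped);
}

Because unmatched lives on the worker in the real code, entries marked during a canceled pass stay counted and are simply skipped the next time around, so a canceled pass on a large match set no longer pays for any cleanup of the match list.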