cleanup memory ordering

This commit is contained in:
Pascal Kuthe 2023-08-06 03:44:19 +02:00
parent d500fb90dd
commit ccacf9798c
No known key found for this signature in database
GPG Key ID: D715E8655AE166A6
2 changed files with 18 additions and 15 deletions

View File

@ -24,7 +24,7 @@
use std::alloc::Layout; use std::alloc::Layout;
use std::cell::UnsafeCell; use std::cell::UnsafeCell;
use std::mem::MaybeUninit; use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU32, AtomicU64, Ordering}; use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU64, Ordering};
use std::{ptr, slice}; use std::{ptr, slice};
use crate::{Item, Utf32String}; use crate::{Item, Utf32String};
@ -41,8 +41,6 @@ pub(crate) struct Vec<T> {
inflight: AtomicU64, inflight: AtomicU64,
/// buckets of length 32, 64 .. 2^31 /// buckets of length 32, 64 .. 2^31
buckets: [Bucket<T>; BUCKETS as usize], buckets: [Bucket<T>; BUCKETS as usize],
/// the number of initialized elements in this vector
count: AtomicU32,
/// the number of matcher columns in this vector, its absolutely critical that /// the number of matcher columns in this vector, its absolutely critical that
/// this remains constant and after initilaziaton (safety invariant) since /// this remains constant and after initilaziaton (safety invariant) since
/// it is used to calculate the Entry layout /// it is used to calculate the Entry layout
@ -69,7 +67,6 @@ impl<T> Vec<T> {
Vec { Vec {
buckets: buckets.map(Bucket::new), buckets: buckets.map(Bucket::new),
inflight: AtomicU64::new(0), inflight: AtomicU64::new(0),
count: AtomicU32::new(0),
columns, columns,
} }
} }
@ -78,8 +75,11 @@ impl<T> Vec<T> {
} }
/// Returns the number of elements in the vector. /// Returns the number of elements in the vector.
#[inline]
pub fn count(&self) -> u32 { pub fn count(&self) -> u32 {
self.count.load(Ordering::Acquire) self.inflight
.load(Ordering::Acquire)
.min(MAX_ENTRIES as u64) as u32
} }
// Returns a reference to the element at the given index. // Returns a reference to the element at the given index.
@ -87,19 +87,23 @@ impl<T> Vec<T> {
// # Safety // # Safety
// //
// Entry at `index` must be initialized. // Entry at `index` must be initialized.
#[inline]
pub unsafe fn get_unchecked(&self, index: u32) -> Item<'_, T> { pub unsafe fn get_unchecked(&self, index: u32) -> Item<'_, T> {
let location = Location::of(index); let location = Location::of(index);
// safety: caller guarantees the entry is initialized
unsafe { unsafe {
let entries = self let entries = self
.buckets .buckets
.get_unchecked(location.bucket as usize) .get_unchecked(location.bucket as usize)
.entries .entries
.load(Ordering::Acquire); .load(Ordering::Relaxed);
debug_assert!(!entries.is_null()); debug_assert!(!entries.is_null());
let entry = Bucket::<T>::get(entries, location.entry, self.columns); let entry = Bucket::<T>::get(entries, location.entry, self.columns);
debug_assert!((*entry).active.load(Ordering::Acquire)); // this looks odd but is necessary to ensure cross
// thread synchronizaton (essentailly acting as a memory barrier)
// since the caller must only gurantee that he has observed active on any thread
// but the current thread might still have an old value cached (altough unlikely)
let _ = (*entry).active.load(Ordering::Acquire);
Entry::read(entry, self.columns) Entry::read(entry, self.columns)
} }
} }
@ -114,7 +118,7 @@ impl<T> Vec<T> {
.buckets .buckets
.get_unchecked(location.bucket as usize) .get_unchecked(location.bucket as usize)
.entries .entries
.load(Ordering::Acquire); .load(Ordering::Relaxed);
// bucket is uninitialized // bucket is uninitialized
if entries.is_null() { if entries.is_null() {
@ -175,8 +179,6 @@ impl<T> Vec<T> {
(*entry).active.store(true, Ordering::Release); (*entry).active.store(true, Ordering::Release);
} }
// increase the true count
self.count.fetch_add(1, Ordering::Release);
index index
} }
@ -275,7 +277,7 @@ impl<'v, T> Iterator for Iter<'v, T> {
return None; return None;
} }
debug_assert!(self.idx < self.end, "huh {} {}", self.idx, self.end); debug_assert!(self.idx < self.end, "huh {} {}", self.idx, self.end);
debug_assert!(self.end as u64 <= self.vec.inflight.load(Ordering::Acquire)); debug_assert!(self.end as u64 <= self.vec.inflight.load(Ordering::Relaxed));
loop { loop {
let entries = unsafe { let entries = unsafe {
@ -283,7 +285,7 @@ impl<'v, T> Iterator for Iter<'v, T> {
.buckets .buckets
.get_unchecked(self.location.bucket as usize) .get_unchecked(self.location.bucket as usize)
.entries .entries
.load(Ordering::Acquire) .load(Ordering::Relaxed)
}; };
debug_assert!(self.location.bucket < BUCKETS); debug_assert!(self.location.bucket < BUCKETS);

View File

@ -161,7 +161,7 @@ impl<T: Sync + Send + 'static> Worker<T> {
// there are usually only very few in flight items (one for each writer) // there are usually only very few in flight items (one for each writer)
self.remove_in_flight_matches(); self.remove_in_flight_matches();
self.process_new_items_trivial(); self.process_new_items_trivial();
if self.should_notify.load(atomic::Ordering::Acquire) { if self.should_notify.load(atomic::Ordering::Relaxed) {
(self.notify)(); (self.notify)();
} }
return; return;
@ -184,6 +184,7 @@ impl<T: Sync + Send + 'static> Worker<T> {
.take_any_while(|_| !self.canceled.load(atomic::Ordering::Relaxed)) .take_any_while(|_| !self.canceled.load(atomic::Ordering::Relaxed))
.for_each(|match_| { .for_each(|match_| {
if match_.idx == u32::MAX { if match_.idx == u32::MAX {
debug_assert_eq!(match_.score, 0);
unmatched.fetch_add(1, atomic::Ordering::Relaxed); unmatched.fetch_add(1, atomic::Ordering::Relaxed);
return; return;
} }
@ -240,7 +241,7 @@ impl<T: Sync + Send + 'static> Worker<T> {
} else { } else {
self.matches self.matches
.truncate(self.matches.len() - take(unmatched.get_mut()) as usize); .truncate(self.matches.len() - take(unmatched.get_mut()) as usize);
if self.should_notify.load(atomic::Ordering::Acquire) { if self.should_notify.load(atomic::Ordering::Relaxed) {
(self.notify)(); (self.notify)();
} }
} }