//! Adapted from the `boxcar` crate at //! under MIT licenes: //! //! Copyright (c) 2022 Ibraheem Ahmed //! //! Permission is hereby granted, free of charge, to any person obtaining a copy //! of this software and associated documentation files (the "Software"), to deal //! in the Software without restriction, including without limitation the rights //! to use, copy, modify, merge, publish, distribute, sublicense, and/or sell //! copies of the Software, and to permit persons to whom the Software is //! furnished to do so, subject to the following conditions: //! //! The above copyright notice and this permission notice shall be included in all //! copies or substantial portions of the Software. //! //! THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR //! IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, //! FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE //! AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER //! LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, //! OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE //! SOFTWARE. use std::alloc::Layout; use std::cell::UnsafeCell; use std::mem::MaybeUninit; use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU32, AtomicU64, Ordering}; use std::{ptr, slice}; use crate::{Item, Utf32String}; const BUCKETS: u32 = u32::BITS - SKIP_BUCKET; const MAX_ENTRIES: u32 = u32::MAX - SKIP; /// A lock-free, append-only vector. pub(crate) struct Vec { /// a counter used to retrieve a unique index to push to. /// /// this value may be more than the true length as it will /// be incremented before values are actually stored. inflight: AtomicU64, /// buckets of length 32, 64 .. 2^31 buckets: [Bucket; BUCKETS as usize], /// the number of initialized elements in this vector count: AtomicU32, /// the number of matcher columns in this vector, its absoletly critical that /// this remains constant and after initilaziaton (safety invariant) since /// it is used to calculate the Entry layou columns: u32, } impl Vec { /// Constructs a new, empty `Vec` with the specified capacity and matcher columns. pub fn with_capacity(capacity: u32, columns: u32) -> Vec { assert_ne!(columns, 0, "there must be atleast one matcher column"); let init = match capacity { 0 => 0, // initialize enough buckets for `capacity` elements n => Location::of(n).bucket, }; let mut buckets = [ptr::null_mut(); BUCKETS as usize]; for (i, bucket) in buckets[..=init as usize].iter_mut().enumerate() { let len = Location::bucket_len(i as u32); *bucket = unsafe { Bucket::alloc(len, columns) }; } Vec { buckets: buckets.map(Bucket::new), inflight: AtomicU64::new(0), count: AtomicU32::new(0), columns, } } pub fn columns(&self) -> u32 { self.columns } /// Returns the number of elements in the vector. pub fn count(&self) -> u32 { self.count.load(Ordering::Acquire) } // Returns a reference to the element at the given index. // // # Safety // // Entry at `index` must be initialized. pub unsafe fn get_unchecked(&self, index: u32) -> Item<'_, T> { let location = Location::of(index); // safety: caller guarantees the entry is initialized unsafe { let entries = self .buckets .get_unchecked(location.bucket as usize) .entries .load(Ordering::Acquire); debug_assert!(!entries.is_null()); let entry = Bucket::::get(entries, location.entry, self.columns); debug_assert!((*entry).active.load(Ordering::Acquire)); Entry::read(entry, self.columns) } } /// Returns a reference to the element at the given index. pub fn get(&self, index: u32) -> Option> { let location = Location::of(index); unsafe { // safety: `location.bucket` is always in bounds let entries = self .buckets .get_unchecked(location.bucket as usize) .entries .load(Ordering::Acquire); // bucket is uninitialized if entries.is_null() { return None; } // safety: `location.entry` is always in bounds for it's bucket let entry = Bucket::::get(entries, location.entry, self.columns); // safety: the entry is active (*entry) .active .load(Ordering::Acquire) .then(|| Entry::read(entry, self.columns)) } } /// Appends an element to the back of the vector. pub fn push(&self, value: T, fill_columns: impl FnOnce(&mut [Utf32String])) -> u32 { let index = self.inflight.fetch_add(1, Ordering::Release); // the inflight counter is a `u64` to catch overflows of the vector'scapacity let index: u32 = index.try_into().expect("overflowed maximum capacity"); let location = Location::of(index); // eagerly allocate the next bucket if we are close to the end of this one if index == (location.bucket_len - (location.bucket_len >> 3)) { if let Some(next_bucket) = self.buckets.get(location.bucket as usize + 1) { Vec::get_or_alloc(next_bucket, location.bucket_len << 1, self.columns); } } // safety: `location.bucket` is always in bounds let bucket = unsafe { self.buckets.get_unchecked(location.bucket as usize) }; let mut entries = bucket.entries.load(Ordering::Acquire); // the bucket has not been allocated yet if entries.is_null() { entries = Vec::get_or_alloc(bucket, location.bucket_len, self.columns); } unsafe { // safety: `location.entry` is always in bounds for it's bucket let entry = Bucket::get(entries, location.entry, self.columns); // safety: we have unique access to this entry. // // 1. it is impossible for another thread to attempt a `push` // to this location as we retrieved it from `inflight.fetch_add` // // 2. any thread trying to `get` this entry will see `active == false`, // and will not try to access it (*entry).slot.get().write(MaybeUninit::new(value)); for col in Entry::matcher_cols_raw(entry, self.columns) { col.get().write(MaybeUninit::new(Utf32String::default())) } fill_columns(Entry::matcher_cols_mut(entry, self.columns)); // let other threads know that this entry is active (*entry).active.store(true, Ordering::Release); } // increase the true count self.count.fetch_add(1, Ordering::Release); index } /// race to initialize a bucket fn get_or_alloc(bucket: &Bucket, len: u32, cols: u32) -> *mut Entry { let entries = unsafe { Bucket::alloc(len, cols) }; match bucket.entries.compare_exchange( ptr::null_mut(), entries, Ordering::Release, Ordering::Acquire, ) { Ok(_) => entries, Err(found) => unsafe { Bucket::dealloc(entries, len, cols); found }, } } /// Returns an iterator over the vector starting at `start` /// the iterator is deterministically sized and will not grow /// as more elements are pushed pub unsafe fn snapshot(&self, start: u32) -> Iter<'_, T> { let end = self .inflight .load(Ordering::Acquire) .min(MAX_ENTRIES as u64) as u32; assert!(start <= end, "index {start} is out of bounds!"); Iter { location: Location::of(start), vec: self, idx: start, end, } } /// Returns an iterator over the vector starting at `start` /// the iterator is deterministically sized and will not grow /// as more elements are pushed pub unsafe fn par_snapshot(&self, start: u32) -> ParIter<'_, T> { let end = self .inflight .load(Ordering::Acquire) .min(MAX_ENTRIES as u64) as u32; assert!(start <= end, "index {start} is out of bounds!"); ParIter { start, end, vec: self, } } } impl Drop for Vec { fn drop(&mut self) { for (i, bucket) in self.buckets.iter_mut().enumerate() { let entries = *bucket.entries.get_mut(); if entries.is_null() { break; } let len = Location::bucket_len(i as u32); // safety: in drop unsafe { Bucket::dealloc(entries, len, self.columns) } } } } type SnapshotItem<'v, T> = (u32, Option>); pub struct Iter<'v, T> { location: Location, idx: u32, end: u32, vec: &'v Vec, } impl Iter<'_, T> { pub fn end(&self) -> u32 { self.end } } impl<'v, T> Iterator for Iter<'v, T> { type Item = SnapshotItem<'v, T>; fn size_hint(&self) -> (usize, Option) { ( (self.end - self.idx) as usize, Some((self.end - self.idx) as usize), ) } fn next(&mut self) -> Option> { if self.end == self.idx { return None; } debug_assert!(self.idx < self.end, "huh {} {}", self.idx, self.end); debug_assert!(self.end as u64 <= self.vec.inflight.load(Ordering::Acquire)); loop { let entries = unsafe { self.vec .buckets .get_unchecked(self.location.bucket as usize) .entries .load(Ordering::Acquire) }; debug_assert!(self.location.bucket < BUCKETS); if self.location.entry < self.location.bucket_len { if entries.is_null() { // we still want to yield these let index = self.idx; self.location.entry += 1; self.idx += 1; return Some((index, None)); } // safety: bounds and null checked above let entry = unsafe { Bucket::get(entries, self.location.entry, self.vec.columns) }; let index = self.idx; self.location.entry += 1; self.idx += 1; let entry = unsafe { (*entry) .active .load(Ordering::Acquire) .then(|| Entry::read(entry, self.vec.columns)) }; return Some((index, entry)); } self.location.entry = 0; self.location.bucket += 1; if self.location.bucket < BUCKETS { self.location.bucket_len = Location::bucket_len(self.location.bucket); } } } } impl ExactSizeIterator for Iter<'_, T> {} impl DoubleEndedIterator for Iter<'_, T> { fn next_back(&mut self) -> Option { unimplemented!() } } pub struct ParIter<'v, T> { end: u32, start: u32, vec: &'v Vec, } impl<'v, T> ParIter<'v, T> { pub fn end(&self) -> u32 { self.end } } impl<'v, T: Send + Sync> rayon::iter::ParallelIterator for ParIter<'v, T> { type Item = SnapshotItem<'v, T>; fn drive_unindexed(self, consumer: C) -> C::Result where C: rayon::iter::plumbing::UnindexedConsumer, { rayon::iter::plumbing::bridge(self, consumer) } fn opt_len(&self) -> Option { Some((self.end - self.start) as usize) } } impl rayon::iter::IndexedParallelIterator for ParIter<'_, T> { fn len(&self) -> usize { (self.end - self.start) as usize } fn drive>(self, consumer: C) -> C::Result { rayon::iter::plumbing::bridge(self, consumer) } fn with_producer(self, callback: CB) -> CB::Output where CB: rayon::iter::plumbing::ProducerCallback, { callback.callback(ParIterProducer { start: self.start, end: self.end, vec: self.vec, }) } } struct ParIterProducer<'v, T: Send> { start: u32, end: u32, vec: &'v Vec, } impl<'v, T: 'v + Send + Sync> rayon::iter::plumbing::Producer for ParIterProducer<'v, T> { type Item = SnapshotItem<'v, T>; type IntoIter = Iter<'v, T>; fn into_iter(self) -> Self::IntoIter { debug_assert!(self.start <= self.end); Iter { location: Location::of(self.start), idx: self.start, end: self.end, vec: self.vec, } } fn split_at(self, index: usize) -> (Self, Self) { assert!(index <= (self.end - self.start) as usize); let index = index as u32; ( ParIterProducer { start: self.start, end: self.start + index, vec: self.vec, }, ParIterProducer { start: self.start + index, end: self.end, vec: self.vec, }, ) } } struct Bucket { entries: AtomicPtr>, } impl Bucket { fn layout(len: u32, layout: Layout) -> Layout { Layout::from_size_align(layout.size() * len as usize, layout.align()) .expect("exceeded maximum allocation size") } unsafe fn alloc(len: u32, cols: u32) -> *mut Entry { let layout = Entry::::layout(cols); let arr_layout = Self::layout(len, layout); let entries = std::alloc::alloc(arr_layout); if entries.is_null() { std::alloc::handle_alloc_error(arr_layout) } for i in 0..len { let active = entries.add(i as usize * layout.size()) as *mut AtomicBool; active.write(AtomicBool::new(false)) } entries as *mut Entry } unsafe fn dealloc(entries: *mut Entry, len: u32, cols: u32) { let layout = Entry::::layout(cols); let arr_layout = Self::layout(len, layout); for i in 0..len { let entry = Bucket::get(entries, i, cols); if *(*entry).active.get_mut() { ptr::drop_in_place((*(*entry).slot.get()).as_mut_ptr()); for matcher_col in Entry::matcher_cols_raw(entry, cols) { ptr::drop_in_place((*matcher_col.get()).as_mut_ptr()); } } } std::alloc::dealloc(entries as *mut u8, arr_layout) } unsafe fn get(entries: *mut Entry, idx: u32, cols: u32) -> *mut Entry { let layout = Entry::::layout(cols); let ptr = entries as *mut u8; ptr.add(layout.size() * idx as usize) as *mut Entry } fn new(entries: *mut Entry) -> Bucket { Bucket { entries: AtomicPtr::new(entries), } } } #[repr(C)] struct Entry { active: AtomicBool, slot: UnsafeCell>, tail: [UnsafeCell>; 0], } impl Entry { fn layout(cols: u32) -> Layout { let head = Layout::new::(); let tail = Layout::array::(cols as usize).expect("invalid memory layout"); head.extend(tail) .expect("invalid memory layout") .0 .pad_to_align() } unsafe fn matcher_cols_raw<'a>( ptr: *mut Entry, cols: u32, ) -> &'a [UnsafeCell>] { // this whole thing looks weird. The reason we do this is that // we must make sure the pointer retains its provenance which may (or may not?) // be lost if we used tail.as_ptr() let tail = std::ptr::addr_of!((*ptr).tail) as *const u8; let offset = tail.offset_from(ptr as *mut u8) as usize; let ptr = (ptr as *mut u8).add(offset) as *mut _; slice::from_raw_parts(ptr, cols as usize) } unsafe fn matcher_cols_mut<'a>(ptr: *mut Entry, cols: u32) -> &'a mut [Utf32String] { // this whole thing looks weird. The reason we do this is that // we must make sure the pointer retains its provenance which may (or may not?) // be lost if we used tail.as_ptr() let tail = std::ptr::addr_of!((*ptr).tail) as *const u8; let offset = tail.offset_from(ptr as *mut u8) as usize; let ptr = (ptr as *mut u8).add(offset) as *mut _; slice::from_raw_parts_mut(ptr, cols as usize) } // # Safety // // Value must be initialized. unsafe fn read<'a>(ptr: *mut Entry, cols: u32) -> Item<'a, T> { // this whole thing looks weird. The reason we do this is that // we must make sure the pointer retains its provenance which may (or may not?) // be lost if we used tail.as_ptr() let data = (*(*ptr).slot.get()).assume_init_ref(); let tail = std::ptr::addr_of!((*ptr).tail) as *const u8; let offset = tail.offset_from(ptr as *mut u8) as usize; let ptr = (ptr as *mut u8).add(offset) as *mut _; let matcher_columns = slice::from_raw_parts(ptr, cols as usize); Item { data, matcher_columns, } } } #[derive(Debug)] struct Location { // the index of the bucket bucket: u32, // the length of `bucket` bucket_len: u32, // the index of the entry in `bucket` entry: u32, } // skip the shorter buckets to avoid unnecessary allocations. // this also reduces the maximum capacity of a vector. const SKIP: u32 = 32; const SKIP_BUCKET: u32 = (u32::BITS - SKIP.leading_zeros()) - 1; impl Location { fn of(index: u32) -> Location { let skipped = index.checked_add(SKIP).expect("exceeded maximum length"); let bucket = u32::BITS - skipped.leading_zeros(); let bucket = bucket - (SKIP_BUCKET + 1); let bucket_len = Location::bucket_len(bucket); let entry = skipped ^ bucket_len; Location { bucket, bucket_len, entry, } } fn bucket_len(bucket: u32) -> u32 { 1 << (bucket + SKIP_BUCKET) } } #[cfg(test)] mod tests { use super::*; #[test] fn location() { assert_eq!(Location::bucket_len(0), 32); for i in 0..32 { let loc = Location::of(i); assert_eq!(loc.bucket_len, 32); assert_eq!(loc.bucket, 0); assert_eq!(loc.entry, i); } assert_eq!(Location::bucket_len(1), 64); for i in 33..96 { let loc = Location::of(i); assert_eq!(loc.bucket_len, 64); assert_eq!(loc.bucket, 1); assert_eq!(loc.entry, i - 32); } assert_eq!(Location::bucket_len(2), 128); for i in 96..224 { let loc = Location::of(i); assert_eq!(loc.bucket_len, 128); assert_eq!(loc.bucket, 2); assert_eq!(loc.entry, i - 96); } let max = Location::of(MAX_ENTRIES); assert_eq!(max.bucket, BUCKETS - 1); assert_eq!(max.bucket_len, 1 << 31); assert_eq!(max.entry, (1 << 31) - 1); } }