/// An entry in a linked list. /// /// An Entry is accessed from multiple threads, so it would be beneficial to put it in a different /// cache-line than thread-local data in terms of performance. #[derive(Debug)] pub(crate) structEntry { /// The next entry in the linked list. /// If the tag is 1, this entry is marked as deleted. next: Atomic<Entry>, }
/// A lock-free, intrusive linked list of type `T`. #[derive(Debug)] pub(crate) structList<T, C: IsElement<T> = T> { /// The head of the linked list. head: Atomic<Entry>,
/// The phantom data for using `T` and `C`. _marker: PhantomData<(T, C)>, }
/// Implementing this trait asserts that the type `T` can be used as an element in the intrusive /// linked list defined in this module. `T` has to contain (or otherwise be linked to) an instance /// of `Entry`. pub(crate) traitIsElement<T> { /// Returns a reference to this element's `Entry`. fnentry_of(_: &T) -> &Entry;
/// Given a reference to an element's entry, returns that element. /// /// ```ignore /// let elem = ListElement::new(); /// assert_eq!(elem.entry_of(), /// unsafe { ListElement::element_of(elem.entry_of()) } ); /// ``` /// /// # Safety /// /// The caller has to guarantee that the `Entry` is called with was retrieved from an instance /// of the element type (`T`). unsafefnelement_of(_: &Entry) -> &T;
/// The function that is called when an entry is unlinked from list. /// /// # Safety /// /// The caller has to guarantee that the `Entry` is called with was retrieved from an instance /// of the element type (`T`). unsafefnfinalize(_: &Entry, _: &Guard); }
/// Inserts `entry` into the head of the list. /// /// # Safety /// /// You should guarantee that: /// /// - `container` is not null /// - `container` is immovable, e.g. inside an `Owned` /// - the same `Entry` is not inserted more than once /// - the inserted object will be removed before the list is dropped pub(crate) unsafefninsert<'g>(&'gself, container: Shared<'g, T>, guard: &'g Guard) { // Insert right after head, i.e. at the beginning of the list. letto = &self.head; // Get the intrusively stored Entry of the new element to insert. letentry: &Entry = C::entry_of(container.deref()); // Make a Shared ptr to that Entry. letentry_ptr = Shared::from(entry as *const _); // Read the current successor of where we want to insert. letmut next = to.load(Relaxed, guard);
loop { // Set the Entry of the to-be-inserted element to point to the previous successor of // `to`. entry.next.store(next, Relaxed); match to.compare_exchange_weak(next, entry_ptr, Release, Relaxed, guard) { Ok(_) => break, // We lost the race or weak CAS failed spuriously. Update the successor and try // again. Err(err) => next = err.current, } } }
插入逻辑如下图所示:
delete
删除节点:
1 2 3 4 5 6 7 8 9 10 11 12
implEntry { /// Marks this entry as deleted, deferring the actual deallocation to a later iteration. /// /// # Safety /// /// The entry should be a member of a linked list, and it should not have been deleted. /// It should be safe to call `C::finalize` on the entry after the `guard` is dropped, where `C` /// is the associated helper for the linked list. pub(crate) unsafefndelete(&self, guard: &Guard) { self.next.fetch_or(1, Release, guard); } }
删除节点仅仅是把 entry 的 next 原子指针的 tag 设置为1,标记这个节点已经在逻辑上被删除了。之所以不立即把这个节点从链表中删除,是为了避免由于并发的插入/删除节点导致插入成功的节点被丢失的问题。
/// Returns an iterator over all objects. /// /// # Caveat /// /// Every object that is inserted at the moment this function is called and persists at least /// until the end of iteration will be returned. Since this iterator traverses a lock-free /// linked list that may be concurrently modified, some additional caveats apply: /// /// 1. If a new object is inserted during iteration, it may or may not be returned. /// 2. If an object is deleted during iteration, it may or may not be returned. /// 3. The iteration may be aborted when it lost in a race condition. In this case, the winning /// thread will continue to iterate over the same list. pub(crate) fniter<'g>(&'gself, guard: &'g Guard) -> Iter<'g, T, C> { Iter { guard, pred: &self.head, curr: self.head.load(Acquire, guard), head: &self.head, _marker: PhantomData, } }
if succ.tag() == 1 { // This entry was removed. Try unlinking it from the list. letsucc = succ.with_tag(0);
// The tag should always be zero, because removing a node after a logically deleted // node leaves the list in an invalid state. debug_assert!(self.curr.tag() == 0);
// Try to unlink `curr` from the list, and get the new value of `self.pred`. letsucc = matchself .pred .compare_exchange(self.curr, succ, Acquire, Acquire, self.guard) { Ok(_) => { // We succeeded in unlinking `curr`, so we have to schedule // deallocation. Deferred drop is okay, because `list.delete()` can only be // called if `T: 'static`. unsafe { C::finalize(self.curr.deref(), self.guard); }
// `succ` is the new value of `self.pred`. succ } Err(e) => { // `e.current` is the current value of `self.pred`. e.current } };
// If the predecessor node is already marked as deleted, we need to restart from // `head`. if succ.tag() != 0 { self.pred = self.head; self.curr = self.head.load(Acquire, self.guard);
returnSome(Err(IterError::Stalled)); }
// Move over the removed by only advancing `curr`, not `pred`. self.curr = succ; continue; }