Chris M. Thomasson
2024-12-01 22:12:42 UTC
not use any CAS. Also, it's interesting to test against other asymmetric
proxy algorithms under heavy load. Can you get it to compile and run on
your end? Thanks.
https://pastebin.com/raw/CYZ78gVj
(raw text link, no ads... :^)
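For reference, something like this should build and run it with GCC or Clang
(assuming the listing is saved as ct_proxy.cpp; adjust the file name and
flags for your setup, and MSVC should be fine too with /std:c++17):

    g++ -std=c++17 -O2 -pthread ct_proxy.cpp -o ct_proxy
    ./ct_proxy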
____________________________________
// Chris M. Thomasson's Poor Man's RCU... Example 456...
#include <iostream>
#include <atomic>
#include <thread>
#include <cstdlib>
#include <cstdint>
#include <climits>
#include <functional>
// Masks
static constexpr std::uint32_t ct_ref_mask = 0xFFFFFFF0U;
static constexpr std::uint32_t ct_ref_complete = 0x30U;
static constexpr std::uint32_t ct_ref_inc = 0x20U;
static constexpr std::uint32_t ct_proxy_mask = 0xFU;
static constexpr std::uint32_t ct_proxy_quiescent = 0x10U;
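// How the packed words use these masks (a sketch of my reading of the code):
//   m_current : bits [0..3] (ct_proxy_mask) hold the current collector index;
//               bits [4..31] (ct_ref_mask) hold the outstanding reference
//               count, bumped in units of ct_ref_inc (0x20) by acquire().
//   m_count   : per-collector count; quiescence transfers the count swapped
//               out of m_current into here along with ct_proxy_quiescent
//               (0x10), which marks the collector as draining.
//   ct_ref_complete (0x30) is the value release() sees just before the last
//   reference drops: one outstanding reference plus the quiescent flag.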
// Iteration settings
static constexpr unsigned long ct_reader_iters_n = 2000000;
static constexpr unsigned long ct_writer_iters_n = 200000;
// Thread counts
static constexpr unsigned long ct_reader_threads_n = 53;
static constexpr unsigned long ct_writer_threads_n = 11;
// Some debug/sanity check things...
// To do: make these conditionally compiled via some macros...
static std::atomic<std::uint32_t> g_debug_node_allocations(0);
static std::atomic<std::uint32_t> g_debug_node_deallocations(0);
static std::atomic<std::uint32_t> g_debug_dtor_collect(0);
static std::atomic<std::uint32_t> g_debug_release_collect(0);
static std::atomic<std::uint32_t> g_debug_quiesce_begin(0);
static std::atomic<std::uint32_t> g_debug_quiesce_complete(0);
static std::atomic<std::uint32_t> g_debug_quiesce_complete_nodes(0);
// To do: align and pad these data structures (to avoid false sharing)...
struct ct_node
{
std::atomic<ct_node*> m_next;
ct_node* m_defer_next;
ct_node() : m_next(nullptr), m_defer_next(nullptr)
{
g_debug_node_allocations.fetch_add(1, std::memory_order_relaxed);
}
~ct_node()
{
g_debug_node_deallocations.fetch_add(1, std::memory_order_relaxed);
}
};
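// Note on ct_node: m_next is atomic because readers chase it while writers
// push and pop concurrently; m_defer_next can stay a plain pointer since (as
// I read it) it is only written by the thread retiring the node and only read
// back once the owning collector has quiesced.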
// The proxy collector itself... :^)
template<std::size_t T_defer_limit>
class ct_proxy
{
static std::uint32_t prv_destroy(ct_node* n)
{
std::uint32_t count = 0;
while (n)
{
ct_node* next = n->m_defer_next;
delete n;
count++;
n = next;
}
return count;
}
public:
class collector
{
friend class ct_proxy;
private:
std::atomic<ct_node*> m_defer;
std::atomic<std::uint32_t> m_defer_count;
std::atomic<std::uint32_t> m_count;
public:
collector()
: m_defer(nullptr),
m_defer_count(0),
m_count(0)
{
}
~collector()
{
prv_destroy(m_defer.load(std::memory_order_relaxed));
}
};
private:
std::atomic<std::uint32_t> m_current;
std::atomic<bool> m_quiesce;
ct_node* m_defer;
collector m_collectors[2];
public:
ct_proxy()
: m_current(0),
m_quiesce(false),
m_defer(nullptr)
{
}
~ct_proxy()
{
prv_destroy(m_defer);
}
private:
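// prv_quiesce_begin: flip m_current over to the other collector, then move
// the reference count that had accumulated in m_current into the old
// collector's own m_count, together with the quiescent flag. From that point
// every pending release() subtracts ct_ref_inc from m_count, so the old
// collector completes exactly when its last straggling reference is dropped,
// or immediately below if the add already cancels out to the flag alone.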
void prv_quiesce_begin()
{
// Try to begin the quiescence process.
if (! m_quiesce.exchange(true, std::memory_order_acquire))
{
g_debug_quiesce_begin.fetch_add(1, std::memory_order_relaxed);
// advance the current collector and grab the old one.
std::uint32_t old =
m_current.load(std::memory_order_relaxed) & ct_proxy_mask;
old = m_current.exchange((old + 1) & 1,
std::memory_order_acq_rel);
collector& c = m_collectors[old & ct_proxy_mask];
// decode reference count.
std::uint32_t refs = old & ct_ref_mask;
// increment and generate an odd reference count.
std::uint32_t old_refs = c.m_count.fetch_add(refs +
ct_proxy_quiescent, std::memory_order_release);
if (old_refs == 0 - refs)
{
g_debug_dtor_collect.fetch_add(1,
std::memory_order_relaxed);
// odd reference count and drop-to-zero condition detected!
prv_quiesce_complete(c);
}
}
}
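// prv_quiesce_complete: runs once the old collector's count has dropped to
// zero with the quiescent flag set. Note the deferral is two-stage: the nodes
// destroyed here are the ones parked in the proxy-level m_defer from the
// previous cycle, while the collector's freshly flushed defer list is parked
// in m_defer to wait for the next cycle to complete.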
void prv_quiesce_complete(collector& c)
{
g_debug_quiesce_complete.fetch_add(1, std::memory_order_relaxed);
// the collector `c' is now in a quiescent state! :^)
std::atomic_thread_fence(std::memory_order_acquire);
// maintain the back link and obtain "fresh" objects from
// this collection.
ct_node* n = m_defer;
m_defer = c.m_defer.load(std::memory_order_relaxed);
c.m_defer.store(nullptr, std::memory_order_relaxed);
// reset the reference count.
c.m_count.store(0, std::memory_order_relaxed);
c.m_defer_count.store(0, std::memory_order_relaxed);
// release the quiesce lock.
m_quiesce.store(false, std::memory_order_release);
// destroy nodes.
std::uint32_t count = prv_destroy(n);
g_debug_quiesce_complete_nodes.fetch_add(count,
std::memory_order_relaxed);
}
public:
collector& acquire()
{
// increment the master count _and_ obtain current collector.
std::uint32_t current =
m_current.fetch_add(ct_ref_inc, std::memory_order_acquire);
// decode the collector index.
return m_collectors[current & ct_proxy_mask];
}
void release(collector& c)
{
// decrement the collector.
std::uint32_t count =
c.m_count.fetch_sub(ct_ref_inc, std::memory_order_release);
// check for the completion of the quiescence process.
if ((count & ct_ref_mask) == ct_ref_complete)
{
// odd reference count and drop-to-zero condition detected!
g_debug_release_collect.fetch_add(1,
std::memory_order_relaxed);
prv_quiesce_complete(c);
}
}
collector& sync(collector& c)
{
// check if the `c' is in the middle of a quiescence process.
if (c.m_count.load(std::memory_order_relaxed) & ct_proxy_quiescent)
{
// drop `c' and get the next collector.
release(c);
return acquire();
}
return c;
}
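// Note: sync() is not exercised by the test below. The intent is that a
// long-running reader can periodically pass its collector through sync(); if
// a quiescence period has started against that collector, the reader drops it
// and picks up the fresh one instead of pinning the old one indefinitely.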
void collect()
{
prv_quiesce_begin();
}
void collect(collector& c, ct_node* n)
{
if (! n) return;
// link node into the defer list.
ct_node* prev = c.m_defer.exchange(n, std::memory_order_relaxed);
n->m_defer_next = prev;
// bump the defer count and begin quiescence process if over
// the limit.
std::uint32_t count =
c.m_defer_count.fetch_add(1, std::memory_order_relaxed) + 1;
if (count >= (T_defer_limit / 2))
{
prv_quiesce_begin();
}
}
};
typedef ct_proxy<10> ct_proxy_collector;
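// Basic usage pattern, as exercised by the reader/writer threads below
// (proxy being a ct_proxy_collector instance):
//
//     ct_proxy_collector::collector& c = proxy.acquire();
//     // ... traverse the shared structure ...
//     proxy.release(c);
//
// Writers retire nodes with proxy.collect(c, node) instead of deleting them
// directly, and kick off quiescence periods with proxy.collect().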
// your basic lock-free stack...
// well, minus ABA counter and DWCAS of course! ;^)
class ct_stack
{
std::atomic<ct_node*> m_head;
public:
ct_stack() : m_head(nullptr)
{
}
public:
void push(ct_node* n)
{
ct_node* head = m_head.load(std::memory_order_relaxed);
do
{
n->m_next.store(head, std::memory_order_relaxed);
}
while (! m_head.compare_exchange_weak(
head,
n,
std::memory_order_release));
}
ct_node* flush()
{
return m_head.exchange(nullptr, std::memory_order_acquire);
}
ct_node* get_head()
{
return m_head.load(std::memory_order_acquire);
}
ct_node* pop()
{
ct_node* head = m_head.load(std::memory_order_acquire);
ct_node* xchg;
do
{
if (! head) return nullptr;
xchg = head->m_next.load(std::memory_order_relaxed);
}
while (!m_head.compare_exchange_weak(
head,
xchg,
std::memory_order_acquire));
return head;
}
};
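// Note: in this test pop() is only called by writer threads while they hold a
// proxy reference, and popped nodes are routed through collect() rather than
// deleted directly, so (as I understand the scheme) a node cannot be freed
// and its address recycled out from under a concurrent get_head()/pop()
// traversal.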
// The shared state
struct ct_shared
{
ct_proxy_collector m_proxy_gc;
ct_stack m_stack;
};
// Reader threads
// Iterates through the lock free stack
void ct_thread_reader(ct_shared& shared)
{
// iterate the lockfree stack
for (unsigned long i = 0; i < ct_reader_iters_n; ++i)
{
ct_proxy_collector::collector& c = shared.m_proxy_gc.acquire();
ct_node* n = shared.m_stack.get_head();
while (n)
{
// need to add in some processing...
// std::this_thread::yield();
n = n->m_next.load(std::memory_order_relaxed);
}
shared.m_proxy_gc.release(c);
}
}
// Writer threads
// Mutates the lock free stack
void ct_thread_writer(ct_shared& shared)
{
for (unsigned long wloop = 0; wloop < 42; ++wloop)
{
shared.m_proxy_gc.collect();
for (unsigned long i = 0; i < ct_writer_iters_n; ++i)
{
shared.m_stack.push(new ct_node());
}
//std::this_thread::yield();
ct_proxy_collector::collector& c = shared.m_proxy_gc.acquire();
for (unsigned long i = 0; i < ct_writer_iters_n; ++i)
{
shared.m_proxy_gc.collect(c, shared.m_stack.pop());
}
shared.m_proxy_gc.release(c);
for (unsigned long i = 0; i < ct_writer_iters_n / 2; ++i)
{
shared.m_proxy_gc.collect();
}
{
ct_proxy_collector::collector& c = shared.m_proxy_gc.acquire();
for (unsigned long i = 0; i < ct_writer_iters_n; ++i)
{
ct_node* n = shared.m_stack.pop();
if (! n) break;
shared.m_proxy_gc.collect(c, n);
}
shared.m_proxy_gc.release(c);
}
if ((wloop % 3) == 0)
{
shared.m_proxy_gc.collect();
}
}
}
int main()
{
std::cout << "Chris M. Thomassons Proxy Collector Port ver
.0.0.2...\n";
std::cout << "_______________________________________\n\n";
{
ct_shared shared;
std::thread readers[ct_reader_threads_n];
std::thread writers[ct_writer_threads_n];
std::cout << "Booting threads...\n";
for (unsigned long i = 0; i < ct_writer_threads_n; ++i)
{
writers[i] = std::thread(ct_thread_writer, std::ref(shared));
}
for (unsigned long i = 0; i < ct_reader_threads_n; ++i)
{
readers[i] = std::thread(ct_thread_reader, std::ref(shared));
}
std::cout << "Threads running...\n";
for (unsigned long i = 0; i < ct_reader_threads_n; ++i)
{
readers[i].join();
}
for (unsigned long i = 0; i < ct_writer_threads_n; ++i)
{
writers[i].join();
}
}
std::cout << "Threads completed!\n\n";
// Sanity check!
{
std::uint32_t node_allocations =
g_debug_node_allocations.load(std::memory_order_relaxed);
std::uint32_t node_deallocations =
g_debug_node_deallocations.load(std::memory_order_relaxed);
std::uint32_t dtor_collect =
g_debug_dtor_collect.load(std::memory_order_relaxed);
std::uint32_t release_collect =
g_debug_release_collect.load(std::memory_order_relaxed);
std::uint32_t quiesce_complete =
g_debug_quiesce_complete.load(std::memory_order_relaxed);
std::uint32_t quiesce_begin =
g_debug_quiesce_begin.load(std::memory_order_relaxed);
std::uint32_t quiesce_complete_nodes =
g_debug_quiesce_complete_nodes.load(std::memory_order_relaxed);
std::cout << "node_allocations = " << node_allocations << "\n";
std::cout << "node_deallocations = " << node_deallocations <<
"\n\n";
std::cout << "dtor_collect = " << dtor_collect << "\n";
std::cout << "release_collect = " << release_collect << "\n";
std::cout << "quiesce_complete = " << quiesce_complete << "\n";
std::cout << "quiesce_begin = " << quiesce_begin << "\n";
std::cout << "quiesce_complete_nodes = " <<
quiesce_complete_nodes << "\n";
if (node_allocations != node_deallocations)
{
std::cout << "OH SHIT! NODE LEAK!!! SHIT! = " <<
node_allocations - node_deallocations << "\n\n";
}
}
std::cout << "\n\nTest Completed!\n\n";
return 0;
}
____________________________________