Chris M. Thomasson
2023-11-27 06:14:52 UTC
Fwiw, here is a quick and dirty mock up of my atomic exchange based lifo
work queue thing. Not sure how to classify it quite yet. I am going to
use it as a work "queue" for parts of my CPU-based rendering engine,
no need for it wrt my shader work. However, I needed to do it. It's
passing Relacy tests. That's a good thing. Now I need to add in a slow
path so it can wait in the kernel during periods of no work. I think an
eventcount should work for it. Or perhaps I might integrate some state
in the pointer values for futexes, or whatever. I have not made up my
mind yet.
I will port my Relacy test units over to standard C++. It's not all that
hard. In fact, it's rather straightforward.
Speaking of pointer values, take note of CT_PENDING.
The fun part about this scheme is that it only uses atomic exchange for
its logic. It also has wait-free loopless push and flush:
// Push node w as the new head of the LIFO. There is no loop and no
// compare-and-swap: the shared head is touched by one atomic exchange.
void push(work* w)
{
// this completes the pending logic...
// Step 1: mark w's link as pending before w becomes reachable, so anyone
// who obtains w before step 3 sees CT_PENDING rather than an old link.
w->m_next.store(CT_PENDING, rl::mo_relaxed, $);
// Step 2: swap w in as the head; the exchange returns the previous head.
work* head = m_head.exchange(w, rl::mo_release, $);
// Step 3: publish the previous head as w's successor, replacing CT_PENDING.
w->m_next.store(head, rl::mo_release, $);
}
// Detach everything pushed so far: swap nullptr into the head and return
// the previous head (nullptr when the container was empty).
work* flush()
{
return m_head.exchange(nullptr, rl::mo_acquire, $);
}
Here is my current Relacy test unit:
https://github.com/dvyukov/relacy
_______________________________
#include <iostream>
#include <relacy/relacy.hpp>
#define CT_PRODUCERS_N 2
#define CT_CONSUMERS_N 3
#define CT_THREAD_N (CT_PRODUCERS_N + CT_CONSUMERS_N)
// Notes:
// Using spin backoff for waits as of now
// Need to slow-path it with kernel waits
// Need to refine the per-node backoff
// humm...
#define CT_PENDING ((ct_xchg_lifo_ver_001::work*)(0xDEADBEEF))
struct ct_xchg_lifo_ver_001
{
// simulate some work...
struct work
{
rl::atomic<work*> m_next;
VAR_T(unsigned long) m_work;
work()
: m_next(CT_PENDING),
m_work(0)
{
}
~work()
{
RL_ASSERT(VAR(m_work) == 1);
}
// no data races on m_work!
void execute_work()
{
VAR(m_work) += 1;
}
// this is the pending node logic...
work* get_next()
{
work* next = m_next.load(rl::mo_consume, $);
while (next == CT_PENDING)
{
rl::backoff();
next = m_next.load(rl::mo_consume, $);
}
return next;
}
};
rl::atomic<work*> m_head;
ct_xchg_lifo_ver_001()
: m_head(nullptr)
{
}
~ct_xchg_lifo_ver_001()
{
RL_ASSERT(! m_head.load(rl::mo_relaxed, $));
}
void push(work* w)
{
// this completes the pending logic...
w->m_next.store(CT_PENDING, rl::mo_relaxed, $);
work* head = m_head.exchange(w, rl::mo_release, $);
w->m_next.store(head, rl::mo_release, $);
}
work* flush()
{
return m_head.exchange(nullptr, rl::mo_acquire, $);
}
void process(work* cur)
{
// process work nodes...
while (cur)
{
// do real work _before_ the pending logic
// this is highly beneficial.
cur->execute_work();
// get the next work node.
cur = cur->get_next();
}
}
// dump worked on nodes...
void destroy(work* cur)
{
while (cur)
{
work* next = cur->m_next.load(rl::mo_relaxed, $);
// no pending work shall be destroyed!
RL_ASSERT(next != CT_PENDING);
delete cur;
cur = next;
}
}
};
// Relacy test suite with CT_THREAD_N threads: the first CT_PRODUCERS_N
// thread indices each push one freshly allocated work node, all remaining
// indices each flush the container once and then process and destroy
// whatever they received.
struct ct_relacy_test_fun
:   rl::test_suite<ct_relacy_test_fun, CT_THREAD_N>
{
    // The container under test, shared by all threads.
    ct_xchg_lifo_ver_001 m_work_lifo;

    // Nothing to set up.
    void before()
    {
    }

    // Final sweep: flush, execute and free whatever is still queued, so
    // that the container's destructor finds an empty head.
    // NOTE(review): presumably Relacy calls this once all threads have
    // finished - confirm against the Relacy documentation.
    void after()
    {
        auto* const leftovers = m_work_lifo.flush();

        m_work_lifo.process(leftovers);
        m_work_lifo.destroy(leftovers);
    }

    // Producer body: allocate a single node and push it.
    void producer(unsigned int tidx)
    {
        m_work_lifo.push(new ct_xchg_lifo_ver_001::work());
    }

    // Consumer body: grab everything currently queued, execute it, then
    // free it.
    void consumer(unsigned int tidx)
    {
        auto* const batch = m_work_lifo.flush();

        m_work_lifo.process(batch);
        m_work_lifo.destroy(batch);
    }

    // Per-thread entry point: pick the role from the thread index.
    void thread(unsigned int tidx)
    {
        const bool is_producer = (tidx < CT_PRODUCERS_N);

        if (! is_producer)
        {
            // consumer threads
            consumer(tidx);
            return;
        }

        // producer threads
        producer(tidx);
    }
};
// Program entry point: prints a banner, runs the Relacy simulation of
// ct_relacy_test_fun for 400000 iterations and reports the outcome through
// the process exit status.
//
// Returns 0 when rl::simulate() reports success and 1 otherwise, so that
// scripts and CI can detect a failing run.
int
main()
{
    std::cout << "Exchange LIFO Container Experiment ver:0.0.1\n";
    std::cout << "Relacy Unit Test ver:0.0.1\n";
    std::cout << "by: Chris M. Thomasson\n";
    std::cout << "__________________________________\n" << std::endl;

    bool passed = false;

    {
        rl::test_params p;

        p.iteration_count = 400000;

        // Alternative settings kept for experimentation; with all of them
        // commented out the scheduler type stays at the library default.
        //p.execution_depth_limit = 33333;
        //p.search_type = rl::sched_bound;
        //p.search_type = rl::fair_full_search_scheduler_type;
        //p.search_type = rl::fair_context_bound_scheduler_type;

        std::cout << "Executing Relacy Unit Test...\n";
        std::cout << "__________________________________" << std::endl;

        // Keep the verdict instead of discarding it.
        passed = rl::simulate<ct_relacy_test_fun>(p);
    }

    return passed ? 0 : 1;
}
_______________________________
Fwiw, here is a quick and dirty mock up of my atomic exchange based lifo
work queue thing. Not sure how to classify it quite yet. I am going to
use it as a work "queue" for parts of my CPU-based rendering engine,
no need for it wrt my shader work. However, I needed to do it. It's
passing Relacy tests. That's a good thing. Now I need to add in a slow
path so it can wait in the kernel during periods of no work. I think an
eventcount should work for it. Or perhaps I might integrate some state
in the pointer values for futexes, or whatever. I have not made up my
mind yet.
I will port my Relacy test units over to standard C++. It's not all that
hard. In fact, it's rather straightforward.
Speaking of pointer values, take note of CT_PENDING.
The fun part about this scheme is that it only uses atomic exchange for
its logic. It also has wait-free loopless push and flush:
// Push node w as the new head of the LIFO. There is no loop and no
// compare-and-swap: the shared head is touched by one atomic exchange.
void push(work* w)
{
// this completes the pending logic...
// Step 1: mark w's link as pending before w becomes reachable, so anyone
// who obtains w before step 3 sees CT_PENDING rather than an old link.
w->m_next.store(CT_PENDING, rl::mo_relaxed, $);
// Step 2: swap w in as the head; the exchange returns the previous head.
work* head = m_head.exchange(w, rl::mo_release, $);
// Step 3: publish the previous head as w's successor, replacing CT_PENDING.
w->m_next.store(head, rl::mo_release, $);
}
// Detach everything pushed so far: swap nullptr into the head and return
// the previous head (nullptr when the container was empty).
work* flush()
{
return m_head.exchange(nullptr, rl::mo_acquire, $);
}
Here is my current Relacy test unit:
https://github.com/dvyukov/relacy
_______________________________
#include <iostream>
#include <relacy/relacy.hpp>
#define CT_PRODUCERS_N 2
#define CT_CONSUMERS_N 3
#define CT_THREAD_N (CT_PRODUCERS_N + CT_CONSUMERS_N)
// Notes:
// Using spin backoff for waits as of now
// Need to slow-path it with kernel waits
// Need to refine the per-node backoff
// humm...
#define CT_PENDING ((ct_xchg_lifo_ver_001::work*)(0xDEADBEEF))
struct ct_xchg_lifo_ver_001
{
// simulate some work...
struct work
{
rl::atomic<work*> m_next;
VAR_T(unsigned long) m_work;
work()
: m_next(CT_PENDING),
m_work(0)
{
}
~work()
{
RL_ASSERT(VAR(m_work) == 1);
}
// no data races on m_work!
void execute_work()
{
VAR(m_work) += 1;
}
// this is the pending node logic...
work* get_next()
{
work* next = m_next.load(rl::mo_consume, $);
while (next == CT_PENDING)
{
rl::backoff();
next = m_next.load(rl::mo_consume, $);
}
return next;
}
};
rl::atomic<work*> m_head;
ct_xchg_lifo_ver_001()
: m_head(nullptr)
{
}
~ct_xchg_lifo_ver_001()
{
RL_ASSERT(! m_head.load(rl::mo_relaxed, $));
}
void push(work* w)
{
// this completes the pending logic...
w->m_next.store(CT_PENDING, rl::mo_relaxed, $);
work* head = m_head.exchange(w, rl::mo_release, $);
w->m_next.store(head, rl::mo_release, $);
}
work* flush()
{
return m_head.exchange(nullptr, rl::mo_acquire, $);
}
void process(work* cur)
{
// process work nodes...
while (cur)
{
// do real work _before_ the pending logic
// this is highly beneficial.
cur->execute_work();
// get the next work node.
cur = cur->get_next();
}
}
// dump worked on nodes...
void destroy(work* cur)
{
while (cur)
{
work* next = cur->m_next.load(rl::mo_relaxed, $);
// no pending work shall be destroyed!
RL_ASSERT(next != CT_PENDING);
delete cur;
cur = next;
}
}
};
// Relacy test suite with CT_THREAD_N threads: the first CT_PRODUCERS_N
// thread indices each push one freshly allocated work node, all remaining
// indices each flush the container once and then process and destroy
// whatever they received.
struct ct_relacy_test_fun
:   rl::test_suite<ct_relacy_test_fun, CT_THREAD_N>
{
    // The container under test, shared by all threads.
    ct_xchg_lifo_ver_001 m_work_lifo;

    // Nothing to set up.
    void before()
    {
    }

    // Final sweep: flush, execute and free whatever is still queued, so
    // that the container's destructor finds an empty head.
    // NOTE(review): presumably Relacy calls this once all threads have
    // finished - confirm against the Relacy documentation.
    void after()
    {
        auto* const leftovers = m_work_lifo.flush();

        m_work_lifo.process(leftovers);
        m_work_lifo.destroy(leftovers);
    }

    // Producer body: allocate a single node and push it.
    void producer(unsigned int tidx)
    {
        m_work_lifo.push(new ct_xchg_lifo_ver_001::work());
    }

    // Consumer body: grab everything currently queued, execute it, then
    // free it.
    void consumer(unsigned int tidx)
    {
        auto* const batch = m_work_lifo.flush();

        m_work_lifo.process(batch);
        m_work_lifo.destroy(batch);
    }

    // Per-thread entry point: pick the role from the thread index.
    void thread(unsigned int tidx)
    {
        const bool is_producer = (tidx < CT_PRODUCERS_N);

        if (! is_producer)
        {
            // consumer threads
            consumer(tidx);
            return;
        }

        // producer threads
        producer(tidx);
    }
};
// Program entry point: prints a banner, runs the Relacy simulation of
// ct_relacy_test_fun for 400000 iterations and reports the outcome through
// the process exit status.
//
// Returns 0 when rl::simulate() reports success and 1 otherwise, so that
// scripts and CI can detect a failing run.
int
main()
{
    std::cout << "Exchange LIFO Container Experiment ver:0.0.1\n";
    std::cout << "Relacy Unit Test ver:0.0.1\n";
    std::cout << "by: Chris M. Thomasson\n";
    std::cout << "__________________________________\n" << std::endl;

    bool passed = false;

    {
        rl::test_params p;

        p.iteration_count = 400000;

        // Alternative settings kept for experimentation; with all of them
        // commented out the scheduler type stays at the library default.
        //p.execution_depth_limit = 33333;
        //p.search_type = rl::sched_bound;
        //p.search_type = rl::fair_full_search_scheduler_type;
        //p.search_type = rl::fair_context_bound_scheduler_type;

        std::cout << "Executing Relacy Unit Test...\n";
        std::cout << "__________________________________" << std::endl;

        // Keep the verdict instead of discarding it.
        passed = rl::simulate<ct_relacy_test_fun>(p);
    }

    return passed ? 0 : 1;
}
_______________________________