Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 70 additions & 19 deletions libstdc++-v3/include/experimental/executor
Original file line number Diff line number Diff line change
Expand Up @@ -1366,13 +1366,16 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION

// construct / copy / destroy:

strand(); // TODO make state
strand() : _M_state(make_shared<_State>()) { }

explicit strand(_Executor __ex) : _M_inner_ex(__ex) { } // TODO make state
explicit strand(_Executor __ex)
: _M_state(make_shared<_State>()),
_M_inner_ex(__ex) { }

template<typename _Alloc>
strand(allocator_arg_t, const _Alloc& __a, _Executor __ex)
: _M_inner_ex(__ex) { } // TODO make state
: _M_state(make_shared<_State>()),
_M_inner_ex(__ex) { }

strand(const strand& __other) noexcept
: _M_state(__other._M_state), _M_inner_ex(__other._M_inner_ex) { }
Expand All @@ -1396,8 +1399,7 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION
static_assert(is_copy_assignable<_Executor>::value,
"inner executor type must be CopyAssignable");

// TODO lock __other
// TODO copy state
_M_state = __other._M_state;
_M_inner_ex = __other._M_inner_ex;
return *this;
}
Expand All @@ -1408,7 +1410,7 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION
static_assert(is_move_assignable<_Executor>::value,
"inner executor type must be MoveAssignable");

// TODO move state
_M_state = std::move(__other._M_state);
_M_inner_ex = std::move(__other._M_inner_ex);
return *this;
}
Expand All @@ -1420,8 +1422,7 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION
static_assert(is_convertible<_OtherExecutor, _Executor>::value,
"inner executor type must be compatible");

// TODO lock __other
// TODO copy state
_M_state = __other._M_state;
_M_inner_ex = __other._M_inner_ex;
return *this;
}
Expand All @@ -1433,17 +1434,11 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION
static_assert(is_convertible<_OtherExecutor, _Executor>::value,
"inner executor type must be compatible");

// TODO move state
_M_state = std::move(__other._M_state);
_M_inner_ex = std::move(__other._M_inner_ex);
return *this;
}

~strand()
{
// the task queue outlives this object if non-empty
// TODO create circular ref in queue?
}

// strand operations:

inner_executor_type
Expand All @@ -1452,7 +1447,7 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION

bool
running_in_this_thread() const noexcept
{ return std::this_thread::get_id() == _M_state->_M_running_on; }
{ return _M_state ? _M_state->running_in_this_thread() : false; }

execution_context&
context() const noexcept
Expand All @@ -1473,7 +1468,23 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION

template<typename _Func, typename _Alloc>
void
post(_Func&& __f, const _Alloc& __a) const; // TODO
post(_Func&& __f, const _Alloc& __a) const
{
if (!_M_state)
throw bad_executor();

bool __schedule = false;
{
unique_lock<mutex> __lock(_M_state->_M_mtx);
_M_state->_M_tasks.push(std::move(__f)); // XXX allocator not used
if (!_M_state->_M_scheduled) {
__schedule = __state._M_scheduled = true;
}
}

if (__schedule)
_M_inner_ex.post(_Runnable(_M_state, _M_inner_ex));
}

template<typename _Func, typename _Alloc>
void
Expand All @@ -1485,11 +1496,51 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION
operator==(const strand& __a, const strand& __b)
{ return __a._M_state == __b._M_state; }

// TODO add synchronised queue
struct _State
{
std::thread::id _M_running_on;
mutable mutex _M_mtx;
bool _M_scheduled;
queue<function<void()>> _M_tasks;
std::thread::id _M_running_on;

_State() : _M_scheduled(false)
{}

bool running_in_this_thread() const noexcept
{
unique_lock<mutex> __lock(_M_mtx);
return _M_scheduled && std::this_thread::get_id() == _M_running_on;
}
};

struct _Runnable
{
shared_ptr<_State> _M_state;
_Executor _M_inner_ex;

void operator()()
{
function<void()> __f;
{
unique_lock<mutex> __lock(_M_state->_M_mtx);
__f = std::move(_M_state->_M_tasks.front());
_M_state->_M_tasks.pop();
_M_state->_M_running_on = std::this_thread::get_id();
}

__f();

bool __reschedule;
{
unique_lock<mutex> __lock(__state._M_mtx);
__reschedule = __state._M_scheduled = !__state._M_tasks.empty();
}

if (__reschedule)
_M_inner_ex.post(std::move(*this));
}
};

shared_ptr<_State> _M_state;
_Executor _M_inner_ex;
};
Expand Down