Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/async/mutex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ namespace detail {

// ------------------------------------------------------------------------------

// Returns true if try_lock() would have succeeded. This is useful for TTAS.
bool test() {
return st_.load(std::memory_order_relaxed) == state::none;
}

bool try_lock() {
auto st = state::none;
return st_.compare_exchange_strong(
Expand Down
82 changes: 61 additions & 21 deletions include/async/queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,28 @@ namespace async {
template<typename T, typename Allocator>
struct queue {
queue(Allocator allocator = {})
: buffer_{allocator} {}
: allocator_{std::move(allocator)} {}

~queue() {
assert(sinks_.empty());
while(!buffer_.empty())
frg::destruct(allocator_, buffer_.pop_front());
}

queue(const queue &) = delete;
queue &operator=(const queue &) = delete;

private:
// Note: element is heap allocated. It must be allocated *outside* the mutex.
struct element {
template<typename... Ts>
element(Ts&&... args)
: item(std::forward<Ts>(args)...) {}

T item;
frg::default_list_hook<element> hook;
};

struct sink {
friend struct queue;

Expand All @@ -24,7 +43,7 @@ struct queue {
virtual void complete() = 0;

protected:
frg::optional<T> value;
element *el{nullptr};

private:
frg::default_list_hook<sink> hook_;
Expand All @@ -33,7 +52,7 @@ struct queue {
bool try_cancel(sink *sp) {
frg::unique_lock lock{mutex_};

if(!sp->value) {
if(!sp->el) {
auto it = sinks_.iterator_to(sp);
sinks_.erase(it);
return true;
Expand All @@ -48,17 +67,19 @@ struct queue {

template<typename... Ts>
void emplace(Ts&&... arg) {
auto *el = frg::construct<element>(allocator_, std::forward<Ts>(arg)...);

sink *complete_sp = nullptr;
{
frg::unique_lock lock{mutex_};

if(!sinks_.empty()) {
assert(buffer_.empty());
auto sp = sinks_.pop_front();
sp->value.emplace(std::forward<Ts>(arg)...);
sp->el = el;
complete_sp = sp;
}else{
buffer_.emplace_back(std::forward<Ts>(arg)...);
buffer_.push_back(el);
}
}

Expand All @@ -76,28 +97,27 @@ struct queue {
: q_{q}, ct_{std::move(ct)}, r_{std::move(r)} { }

void start() {
bool fast_path = false;
element *el = nullptr;
{
frg::unique_lock lock{q_->mutex_};

if(!q_->buffer_.empty()) {
assert(q_->sinks_.empty());
value = std::move(q_->buffer_.front());
q_->buffer_.pop_front();
fast_path = true;
el = q_->buffer_.pop_front();
}else{
q_->sinks_.push_back(this);
}
}

if(fast_path)
return execution::set_value(r_, std::move(value));
if(el) {
frg::optional<T> result{std::move(el->item)};
frg::destruct(q_->allocator_, el);
return execution::set_value(r_, std::move(result));
}
cr_.listen(ct_);
}

private:
using sink::value;

struct try_cancel_fn {
bool operator()(auto *cr) {
auto self = frg::container_of(cr, &get_operation::cr_);
Expand All @@ -107,7 +127,12 @@ struct queue {
struct resume_fn {
void operator()(auto *cr) {
auto self = frg::container_of(cr, &get_operation::cr_);
execution::set_value(self->r_, std::move(self->value));
frg::optional<T> result;
if (self->el) {
result = std::move(self->el->item);
frg::destruct(self->q_->allocator_, self->el);
}
execution::set_value(self->r_, std::move(result));
}
};

Expand Down Expand Up @@ -142,23 +167,38 @@ struct queue {
}

bool empty() {
frg::unique_lock lock{mutex_};

return buffer_.empty();
}

frg::optional<T> maybe_get() {
frg::unique_lock lock{mutex_};
element *el;
{
frg::unique_lock lock{mutex_};

if(buffer_.empty())
return {};
auto object = std::move(buffer_.front());
buffer_.pop_front();
return object;
if(buffer_.empty())
return {};
el = buffer_.pop_front();
}
auto value = std::move(el->item);
frg::destruct(allocator_, el);
return value;
}

private:
platform::mutex mutex_;

frg::list<T, Allocator> buffer_;
[[no_unique_address]] Allocator allocator_;

frg::intrusive_list<
element,
frg::locate_member<
element,
frg::default_list_hook<element>,
&element::hook
>
> buffer_;

frg::intrusive_list<
sink,
Expand Down
Loading