Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
cmake_minimum_required(VERSION 3.13)

set(LINKS_PLATFORM_TESTS OFF CACHE BOOL "Whether to compile tests")
set(LINKS_PLATFORM_EXTRA_FLAGS "" CACHE STRING "Extra compiler flags")

project(Platform.Threading CXX)
include(${CMAKE_BINARY_DIR}/conanbuildinfo.cmake)
conan_basic_setup(TARGETS)

list(APPEND LINKS_PLATFORM_EXTRA_FLAGS ${CONAN_USER_PLATFORM.HASHING_suggested_flags})

add_library(${PROJECT_NAME}.Library INTERFACE)
target_include_directories(${PROJECT_NAME}.Library INTERFACE ${PROJECT_NAME})
target_link_libraries(${PROJECT_NAME}.Library INTERFACE CONAN_PKG::platform.collections)
target_compile_options(${PROJECT_NAME}.Library INTERFACE ${LINKS_PLATFORM_EXTRA_FLAGS})

if(${LINKS_PLATFORM_TESTS})
add_executable(${PROJECT_NAME}.Tests ${PROJECT_NAME}.Tests/AllTests.cpp)
set_target_properties(${PROJECT_NAME}.Tests PROPERTIES CXX_STANDARD 20)
target_link_libraries(${PROJECT_NAME}.Tests PRIVATE CONAN_PKG::gtest)
target_link_libraries(${PROJECT_NAME}.Tests PRIVATE ${PROJECT_NAME}.Library)
endif()
5 changes: 5 additions & 0 deletions cpp/Platform.Threading.Tests/AllTests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#include <gtest/gtest.h>
#include "Platform.Threading.h"

#include "Await.cpp"
#include "Sync.cpp"
19 changes: 19 additions & 0 deletions cpp/Platform.Threading.Tests/Await.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace Platform::Threading::Tests
{
TEST(Await, One)
{
using namespace Synchronization;
using namespace std::chrono_literals;

auto queue = std::queue<std::future<void>>();
queue.push(std::async([]() { std::this_thread::sleep_for(50ms); std::cout << "I like C++\n"; }));
queue.push(std::async([]() { std::cout << "C# it's Go\n"; }));
queue.push(std::async([]() { std::this_thread::sleep_for(100ms); std::cout << "lang\n"; }));
queue.push(std::async([]() { std::cout << "async programming is super\n"; }));

auto sync_queue = Sync(std::move(queue));
AwaitOne(sync_queue).wait();
auto queue_two = Drop(std::move(sync_queue));
std::cout << "not waited count: " << queue_two.size() << std::endl;
}
}
67 changes: 67 additions & 0 deletions cpp/Platform.Threading.Tests/Sync.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
namespace Platform::Threading::Tests
{
TEST(Synchronization, Sync_Stupid)
{
using namespace Synchronization;

auto threads_count = 1000;
auto map_operations_count = 1000;
Sync<std::map<std::string, int>> dict{};

auto work = [&]
{
for (int i = 0; i < map_operations_count; i++)
{
dict->operator[](std::to_string(i))++;
}
};

std::vector<std::thread> threads;

for (int i = 0; i < threads_count; i++)
{
threads.push_back(std::thread(work));
}

for (auto& thread : threads) {
thread.join();
}

for (auto&& [key, value] : Drop(std::move(dict))) {
ASSERT_EQ(value, threads_count);
}
}

TEST(Synchronization, Explicit)
{
using namespace Synchronization;

auto threads_count = 1000;
auto map_operations_count = 1000;

std::map<std::string, int> dict{};

auto work = [&]
{
for (int i = 0; i < map_operations_count; i++)
{
dict.operator[](std::to_string(i))++;
}
};

std::vector<std::thread> threads;

for (int i = 0; i < threads_count; i++)
{
threads.push_back(std::thread([&] { ExecuteWriteOperation(work); }));
}

for (auto& thread : threads) {
thread.join();
}

for (auto&& [key, value] : dict) {
ASSERT_EQ(value, threads_count);
}
}
}
18 changes: 0 additions & 18 deletions cpp/Platform.Threading.Tests/ThreadHelpersTests.cpp

This file was deleted.

37 changes: 26 additions & 11 deletions cpp/Platform.Threading/ConcurrentQueueExtensions.h
Original file line number Diff line number Diff line change
@@ -1,23 +1,38 @@
namespace Platform::Threading
{
class ConcurrentQueueExtensions
template<typename R>
auto AwaitAll(Synchronization::Sync<std::queue<std::future<R>>>& unsafe_queue)
{
public: static async Task AwaitAll(ConcurrentQueue<Task> queue)
auto lambda = [&unsafe_queue]
{
foreach (auto item in queue.DequeueAll())
auto locked_queue = *unsafe_queue;
auto& queue = *locked_queue;
while (!queue.empty())
{
await item.ConfigureAwait(continueOnCapturedContext: false);
auto& item = queue.front();
item.wait();
queue.pop();
}
}
};
return std::async(lambda);
}

public: static async Task AwaitOne(ConcurrentQueue<Task> queue)
template<typename R>
auto AwaitOne(Synchronization::Sync<std::queue<std::future<R>>>& unsafe_queue)
{
auto lambda = [&unsafe_queue]
{
if (queue.TryDequeue(out Task item))
auto locked_queue = *unsafe_queue;
auto& queue = *locked_queue;
if (!queue.empty())
{
await item.ConfigureAwait(continueOnCapturedContext: false);
auto& item = queue.front();
item.wait();
queue.pop();
}
}
};
return std::async(lambda);
}

public: static void EnqueueAsRunnedTask(ConcurrentQueue<Task> queue, std::function<void()> action) { queue.Enqueue(Task.Run(action)); }
};
// public: static void EnqueueAsRunnedTask(ConcurrentQueue<Task> queue, std::function<void()> action) { queue.Enqueue(Task.Run(action)); }
}
23 changes: 23 additions & 0 deletions cpp/Platform.Threading/Platform.Threading.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#ifndef PLATFORM_THREADING
#define PLATFORM_THREADING

#include <map> // TODO: in Collections

#include <Platform.Collections.h>


#include <queue>
#include <future>
#include <thread>
#include <shared_mutex>

namespace Platform::Threading::Synchronization
{
#include "Synchronization/Sync/Sync.h"
}

#include "Synchronization/ReaderWriterLockSynchronization.h"
#include "ConcurrentQueueExtensions.h"


#endif
14 changes: 0 additions & 14 deletions cpp/Platform.Threading/Synchronization/ISynchronization.h

This file was deleted.

This file was deleted.

13 changes: 0 additions & 13 deletions cpp/Platform.Threading/Synchronization/ISynchronized.h

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,59 +1,30 @@
namespace Platform::Threading::Synchronization
{
class ReaderWriterLockSynchronization : public ISynchronization
template<
typename mutex_t = std::recursive_mutex,
template<typename> typename lock_t = std::unique_lock>
void ExecuteReadOperation(auto&& action, auto&&... args)
{
private: readonly ReaderWriterLockSlim _rwLock = ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
static mutex_t mutex;

public: void ExecuteReadOperation(std::function<void()> action)
lock_t lock(mutex);
try
{
_rwLock.EnterReadLock();
try
{
action();
}
finally
{
_rwLock.ExitReadLock();
}
}
action(std::forward<decltype(args)>(args)...);
} catch(...) {}
}

public: TResult ExecuteReadOperation<TResult>(std::function<TResult()> function)
{
_rwLock.EnterReadLock();
try
{
return function();
}
finally
{
_rwLock.ExitReadLock();
}
}

public: void ExecuteWriteOperation(std::function<void()> action)
{
_rwLock.EnterWriteLock();
try
{
action();
}
finally
{
_rwLock.ExitWriteLock();
}
}
template<
typename mutex_t = std::recursive_mutex,
template<typename> typename lock_t = std::unique_lock>
void ExecuteWriteOperation(auto&& action, auto&&... args)
{
static mutex_t mutex;

public: TResult ExecuteWriteOperation<TResult>(std::function<TResult()> function)
lock_t lock(mutex);
try
{
_rwLock.EnterWriteLock();
try
{
return function();
}
finally
{
_rwLock.ExitWriteLock();
}
}
};
action(std::forward<decltype(args)>(args)...);
} catch(...) {}
}
}
Loading