Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 46 additions & 61 deletions libkineto/src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
* LICENSE file in the root directory of this source tree.
*/

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>

// TODO(T90238193)
// @lint-ignore-every CLANGTIDY facebook-hte-RelativeInclude
Expand All @@ -16,10 +20,8 @@
#include "ConfigLoader.h"
#include "DaemonConfigLoader.h"
#include "DeviceUtil.h"
#include "ThreadUtil.h"
#ifdef HAS_CUPTI
#include "CuptiActivityApi.h"
#include "CuptiCallbackApi.h"
#include "CuptiRangeProfiler.h"
#include "EventProfilerController.h"
#endif
Expand All @@ -38,6 +40,39 @@

namespace KINETO_NAMESPACE {

// Runs libkineto::api().initProfilerIfRegistered() on a background thread once
// `delay` has elapsed, unless this object is destroyed first.
//
// Uses condition_variable instead of std::async to allow immediate cancellation
// on shutdown. std::future destructor blocks for the full sleep duration.
class DelayedInitializer {
 public:
  // Starts the worker thread. The thread is launched from the constructor
  // body, i.e. after stop_, mutex_ and cv_ have been constructed.
  explicit DelayedInitializer(std::chrono::seconds delay) {
    thread_ = std::thread([this, delay]() {
      {
        std::unique_lock<std::mutex> lock(mutex_);
        // wait_for() returns the predicate's value: true means the
        // destructor requested cancellation before the delay expired.
        if (cv_.wait_for(lock, delay, [this] { return stop_.load(); })) {
          return;
        }
      }
      // The lock is released before running the initializer so the
      // destructor never contends on mutex_ while initialization runs.
      libkineto::api().initProfilerIfRegistered();
    });
  }

  // Cancels a pending initialization (if the delay has not yet elapsed) and
  // joins the worker thread.
  ~DelayedInitializer() {
    {
      // The flag must be changed while holding the mutex, even though it is
      // atomic. Otherwise the worker could evaluate the predicate (false),
      // then miss the notification before it starts blocking, and sleep for
      // the whole delay.
      std::lock_guard<std::mutex> lock(mutex_);
      stop_ = true;
    }
    cv_.notify_one();
    if (thread_.joinable()) {
      thread_.join();
    }
  }

  DelayedInitializer(const DelayedInitializer&) = delete;
  DelayedInitializer& operator=(const DelayedInitializer&) = delete;

 private:
  std::thread thread_;
  std::atomic<bool> stop_{false}; // set by the destructor to cancel the wait
  std::mutex mutex_; // guards the wait on cv_ and the update of stop_
  std::condition_variable cv_;
};

// Holds the DelayedInitializer that libkineto_init() creates when cpuOnly is
// false. Resetting or destroying it runs ~DelayedInitializer().
static std::unique_ptr<DelayedInitializer> delayedInit_;

#if __linux__ || defined(HAS_CUPTI)
static bool initialized = false;

Expand All @@ -57,26 +92,6 @@ static void initProfilers() {
#endif // __linux__ || defined(HAS_CUPTI)

#ifdef HAS_CUPTI
// Reports whether the KINETO_ENABLE_EVENT_PROFILER environment variable is
// present. Only its presence is checked; the value (even empty) is ignored.
bool enableEventProfiler() {
  const char* const envValue = getenv("KINETO_ENABLE_EVENT_PROFILER");
  return envValue != nullptr;
}

// CUPTI callback registered by setupCuptiInitCallback() for the
// RESOURCE_CONTEXT_CREATED event. None of the callback arguments are used.
// Logs the event, calls initProfilers(), and warns if the (unsupported)
// event profiler was requested through the environment.
static void initProfilersCallback(
    CUpti_CallbackDomain /*domain*/,
    CUpti_CallbackId /*cbid*/,
    const CUpti_CallbackData* /*cbInfo*/) {
  VLOG(0) << "CUDA Context created";
  initProfilers();

  const bool eventProfilerRequested = enableEventProfiler();
  if (!eventProfilerRequested) {
    return;
  }
  LOG(WARNING) << "Event Profiler is no longer supported in kineto";
}

// Some models suffer from excessive instrumentation code gen
// on dynamic attach, which can hang for more than 5 seconds.
// If the workload was meant to be traced, preload the CUPTI
Expand All @@ -90,41 +105,6 @@ static bool shouldPreloadCuptiInstrumentation() {
#endif
}

bool setupCuptiInitCallback(bool logOnError) {
// libcupti will be lazily loaded on this call.
// If it is not available (e.g. CUDA is not installed),
// then this call will return an error and we just abort init.
auto cbapi = CuptiCallbackApi::singleton();
cbapi->initCallbackApi();

bool status = false;

if (cbapi->initSuccess()) {
const CUpti_CallbackDomain domain = CUPTI_CB_DOMAIN_RESOURCE;
status = cbapi->registerCallback(
domain,
CuptiCallbackApi::RESOURCE_CONTEXT_CREATED,
initProfilersCallback);
if (status) {
status = cbapi->enableCallback(
domain, CuptiCallbackApi::RESOURCE_CONTEXT_CREATED);
}
}

if (!cbapi->initSuccess() || !status) {
if (logOnError) {
CUPTI_CALL(cbapi->getCuptiStatus());
LOG(WARNING) << "CUPTI initialization failed - "
<< "CUDA profiler activities will be missing";
LOG(INFO)
<< "If you see CUPTI_ERROR_INSUFFICIENT_PRIVILEGES, refer to "
<< "https://developer.nvidia.com/nvidia-development-tools-solutions-err-nvgpuctrperm-cupti";
}
}

return status;
}

static std::unique_ptr<CuptiRangeProfilerInit> rangeProfilerInit;
#endif // HAS_CUPTI

Expand Down Expand Up @@ -154,10 +134,9 @@ void libkineto_init(bool cpuOnly, bool logOnError) {
#ifdef HAS_CUPTI
bool initRangeProfiler = true;

if (!cpuOnly && !libkineto::isDaemonEnvVarSet()) {
bool success = setupCuptiInitCallback(logOnError);
cpuOnly = !success;
initRangeProfiler = success;
if (!cpuOnly) {
cpuOnly = !isCUDAGpuAvailable();
initRangeProfiler = !cpuOnly;
}

// Initialize CUPTI Range Profiler API
Expand Down Expand Up @@ -215,6 +194,12 @@ void libkineto_init(bool cpuOnly, bool logOnError) {
initProfilers();
}
#endif

// Start delayed profiler initialization for GPU profiling
if (!cpuOnly) {
delayedInit_ =
std::make_unique<DelayedInitializer>(std::chrono::seconds(1));
}
}

// The cuda driver calls this function if the CUDA_INJECTION64_PATH environment
Expand Down
85 changes: 85 additions & 0 deletions libkineto/test/DelayedInitializerTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

#include <gtest/gtest.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>

namespace {

// Test double that mirrors DelayedInitializer from init.cpp, but invokes an
// injected callback instead of the real profiler initialization, and accepts
// a millisecond-granularity delay.
class TestDelayedInitializer {
 public:
  // Starts a worker thread that invokes `callback` once `delay` has elapsed,
  // unless this object is destroyed first. The thread is launched from the
  // constructor body, i.e. after every member has been constructed.
  explicit TestDelayedInitializer(
      std::chrono::milliseconds delay,
      std::function<void()> callback)
      : callback_(std::move(callback)) {
    thread_ = std::thread([this, delay]() {
      {
        std::unique_lock<std::mutex> lock(mutex_);
        // wait_for() returns the predicate's value: true means the
        // destructor requested cancellation before the delay expired.
        if (cv_.wait_for(lock, delay, [this] { return stop_.load(); })) {
          return;
        }
      }
      // Run the callback without holding the mutex.
      callback_();
      called_ = true;
    });
  }

  // Cancels a pending callback (if the delay has not yet elapsed) and joins
  // the worker thread.
  ~TestDelayedInitializer() {
    {
      // The flag must be changed while holding the mutex, even though it is
      // atomic. Otherwise the worker could evaluate the predicate (false),
      // then miss the notification before it starts blocking, and sleep for
      // the whole delay.
      std::lock_guard<std::mutex> lock(mutex_);
      stop_ = true;
    }
    cv_.notify_one();
    if (thread_.joinable()) {
      thread_.join();
    }
  }

  TestDelayedInitializer(const TestDelayedInitializer&) = delete;
  TestDelayedInitializer& operator=(const TestDelayedInitializer&) = delete;

  // Returns true once the callback has run to completion.
  bool wasCalled() const {
    return called_.load();
  }

 private:
  std::function<void()> callback_;
  std::thread thread_;
  std::atomic<bool> stop_{false}; // set by the destructor to cancel the wait
  std::atomic<bool> called_{false}; // set by the worker after callback_()
  std::mutex mutex_; // guards the wait on cv_ and the update of stop_
  std::condition_variable cv_;
};

} // namespace

// Verifies that the callback fires on its own once the delay has elapsed
// while the initializer is still alive.
TEST(DelayedInitializerTest, CallsCallbackAfterDelay) {
  std::atomic<bool> initialized{false};
  {
    TestDelayedInitializer init(
        std::chrono::milliseconds(50), [&]() { initialized = true; });
    EXPECT_FALSE(initialized.load());

    // Poll against a generous deadline instead of sleeping for a fixed
    // 100ms: on a loaded machine the worker thread may be scheduled late,
    // which made the fixed sleep timing-sensitive.
    const auto deadline =
        std::chrono::steady_clock::now() + std::chrono::seconds(5);
    while (!initialized.load() &&
           std::chrono::steady_clock::now() < deadline) {
      std::this_thread::sleep_for(std::chrono::milliseconds(5));
    }
    EXPECT_TRUE(initialized.load());
  }
}

// Verifies that destroying the initializer before the delay elapses prevents
// the callback from ever running.
TEST(DelayedInitializerTest, CancelsOnEarlyDestruction) {
  std::atomic<bool> callbackRan{false};
  const auto markRan = [&callbackRan]() { callbackRan = true; };
  {
    // Destroyed after ~50ms, well before the 500ms delay expires.
    TestDelayedInitializer pending(std::chrono::milliseconds(500), markRan);
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
  }
  EXPECT_FALSE(callbackRan.load());
}

// Verifies that the destructor cancels the pending wait promptly rather than
// blocking for the configured 10 second delay.
TEST(DelayedInitializerTest, DestructorDoesNotBlock) {
  using Clock = std::chrono::steady_clock;
  const auto begin = Clock::now();
  {
    // Constructed and destroyed immediately.
    TestDelayedInitializer pending(std::chrono::seconds(10), []() {});
  }
  const auto took = Clock::now() - begin;
  EXPECT_LT(took, std::chrono::milliseconds(100));
}