Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
682c05d
[SPR-842]: Initial - Reorganize log reader and copy thread flow
rsdcbabu Jun 24, 2025
7c14a50
[SPR-842]: Make resync aware of table drop
rsdcbabu Jun 25, 2025
2c92e97
[SPR-842]: Track only tables in the resync map
rsdcbabu Jun 25, 2025
28c4ef6
[SPR-842]: Handle more copy table entries post copy-start
rsdcbabu Jun 25, 2025
d15a3e0
[SPR-842]: Handle resync map with xids
rsdcbabu Jun 26, 2025
3685ded
[SPR-842]: Run nightly once
rsdcbabu Jun 26, 2025
c69a37d
[SPR-842]: Use flag skip_setting_inflight to skip inflight setting
rsdcbabu Jun 26, 2025
caf6337
[SPR-842]: Dont wake logreader until resync queue is empty
rsdcbabu Jun 26, 2025
c38f0e8
[SPR-842]: Update reference cache
rsdcbabu Jun 26, 2025
80684b3
[SPR-842]: Handle a case when request couldnt be popped from redis qu…
rsdcbabu Jun 27, 2025
bd9c277
Merge remote-tracking branch 'origin/main' into SPR-842-logger-deadlo…
rsdcbabu Aug 11, 2025
c6fbe2d
[SPR-842]: Call _get_copy_table_ids instead of redis fetch to avoid d…
rsdcbabu Aug 11, 2025
eddb0e5
[SPR-842]: Fix comments
rsdcbabu Aug 11, 2025
ca97cd7
[SPR-842]: Add method comments
rsdcbabu Aug 11, 2025
218dbc0
Merge remote-tracking branch 'origin/main' into SPR-842-logger-deadlo…
rsdcbabu Aug 12, 2025
904d7e7
[SPR-842]: Use timeout only for the initial fetch and then use try_po…
rsdcbabu Aug 12, 2025
8cb2665
[SPR-842]: Wait for resync to begin
rsdcbabu Aug 12, 2025
c3da553
Set redis fdw state to initialize for tests prior to DDL mgr start, s…
garthgoodson Aug 12, 2025
ea04086
Merge remote-tracking branch 'origin/main' into SPR-842-logger-deadlo…
rsdcbabu Aug 13, 2025
68f2de5
Merge remote-tracking branch 'origin/SPR-941-fdw-set-running' into SP…
rsdcbabu Aug 13, 2025
2b6644e
[SPR-842]: Run large_data once, but the others multiple times
rsdcbabu Aug 13, 2025
322b126
Move disconnect of fdw connections prior to fdw shutdown/restart.
garthgoodson Aug 13, 2025
456a0f3
Merge remote-tracking branch 'origin/test-recovery-fdw-disconnect' in…
rsdcbabu Aug 14, 2025
cd0a894
[SPR-842]: Comment proxy tests
rsdcbabu Aug 14, 2025
2c3d4ee
[SPR-842]: Use variable for nightly iteration count
rsdcbabu Aug 14, 2025
34f3932
Merge remote-tracking branch 'origin/main' into SPR-842-logger-deadlo…
rsdcbabu Aug 14, 2025
64028f8
[SPR-842]: Add alias for types for readability
rsdcbabu Aug 19, 2025
e264ec3
[SPR-842]: Skip setting inflight automatically based on picked_resync
rsdcbabu Aug 19, 2025
a65f35f
[SPR-842]: Code cleanup
rsdcbabu Aug 19, 2025
2fdd8ca
[SPR-842]: Make map aliases
rsdcbabu Aug 19, 2025
f550347
[SPR-842]: Fix alias
rsdcbabu Aug 19, 2025
031bd55
Merge remote-tracking branch 'origin/main' into SPR-842-logger-deadlo…
rsdcbabu Aug 19, 2025
44ef8dd
[SPR-842]: Remove nightly changes
rsdcbabu Aug 20, 2025
0073478
[SPR-842]: Notify as many as times as resync requests were received f…
rsdcbabu Aug 20, 2025
2ae5243
Revert "[SPR-842]: Notify as many as times as resync requests were re…
rsdcbabu Aug 20, 2025
fa4bdc4
Merge branch 'main' into SPR-842-logger-deadlock-2
garthgoodson Aug 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion include/pg_log_mgr/pg_log_mgr.hh
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,16 @@ namespace springtail::pg_log_mgr {
void _copy_thread();

/** Process copy table results; insert into redis */
void _process_copy_results(const std::vector<PgCopyResultPtr> &res);
bool _process_copy_results(const std::vector<PgCopyResultPtr> &res);

/**
* @brief Pick copy table request from the redis queue
*
* @param timeout timeout in seconds (0 = use try_pop)
*
* @return pair<table_id, optional<XidLsn of the copy table>>
*/
std::pair<uint32_t, std::optional<XidLsn>> _get_copy_table_ids(uint32_t timeout=0);

/** Redis cache callback for watching database state change */
RedisCache::RedisChangeWatcherPtr _cache_watcher_db_states;
Expand Down
20 changes: 20 additions & 0 deletions include/pg_log_mgr/sync_tracker.hh
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,15 @@ namespace springtail::pg_log_mgr {
*/
SkipDetails should_skip(uint64_t db_id, uint64_t table_id, uint32_t pg_xid) const;

/**
* @brief Record PG_XID at which table is picked for resync
*
* @param db_id Database ID
* @param table_id Table ID
* @param xid PG XID/LSN which will be marked
*/
void pick_table_for_sync(uint64_t db_id, uint64_t table_id, const XidLsn &xid);

protected:
SyncTracker() : Singleton<SyncTracker>(ServiceId::SyncTrackerId) {}
virtual ~SyncTracker() override = default;
Expand Down Expand Up @@ -269,6 +278,17 @@ namespace springtail::pg_log_mgr {
/** Used to track all of the table syncs operating at a given snapshot XID. */
DbMap<PgXidMap<std::shared_ptr<XidRecord>>> _sync_map;

/** db-> table indicating that a resync was issued but it hasn't been picked up by the copy
thread yet. */
using TableSyncXidsMap = std::map<uint64_t, std::set<XidLsn>>;
using DbTableSyncXidsMap = std::map<uint64_t, TableSyncXidsMap>;
DbTableSyncXidsMap _resync_map;

/** db -> table -> xid indicating the table @ XID is selected for resync, yet to in-flight*/
using TableSyncPickedXidMap = std::map<uint64_t, XidLsn>;
using DbTableSyncPickedXidMap = std::map<uint64_t, TableSyncPickedXidMap>;
DbTableSyncPickedXidMap _resync_picked_map;

/** Entry is added here when a copy for the table is in-flight but hasn't completed. */
DbMap<TableMap<std::shared_ptr<Inflight>>> _inflight_map;

Expand Down
10 changes: 10 additions & 0 deletions include/pg_repl/pg_copy_table.hh
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,16 @@ namespace springtail
*/
void _reset_schema();

/**
* @brief Check if table is dropped in the primary
*
* @param schema_oid Schema OID in which table is present
* @param table_oid Table OID
*
* @return bool indicating if table is dropped or not
*/
bool _is_table_dropped(uint64_t schema_oid, uint64_t table_oid);

/**
* @brief Extract schema from table and store in internal _schema object
* @details Uses atttypid from pg_attribute table for identifier of the type.
Expand Down
11 changes: 11 additions & 0 deletions include/redis/redis_containers.hh
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,17 @@ namespace springtail {
return nullptr;
}

/**
* @brief Peek an item from the queue, doesnt move/remove it
*/
std::shared_ptr<T> peek() const {
auto res = _redis->lindex(_key, -1);
if (res) {
return std::make_shared<T>(std::move(*res));
}
return nullptr;
}

/**
* @brief Try to pop an item from queue (list).
*
Expand Down
100 changes: 72 additions & 28 deletions src/pg_log_mgr/pg_log_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -304,20 +304,20 @@ namespace springtail::pg_log_mgr {
while (!_shutdown) {
std::set<uint32_t> table_ids;

// block on redis table sync queue w/timeout for shutdown
auto request = _redis_sync_queue.pop(REDIS_WORKER_ID, constant::COORDINATOR_KEEP_ALIVE_TIMEOUT);
if (request == nullptr) {
continue; // timeout, check for shutdown
// _get_copy_table_ids block on redis table sync queue w/timeout for shutdown
auto [next_table_id, next_xid_lsn] = _get_copy_table_ids(constant::COORDINATOR_KEEP_ALIVE_TIMEOUT);
if (next_table_id == -1) {
continue; // check for shutdown
}

do {
// populate the tables to copy; check for more work
LOG_DEBUG(LOG_PG_LOG_MGR, "Table sync queue: {}@{}:{}", request->table_id(),
request->xid().xid, request->xid().lsn);
table_ids.insert(request->table_id());
table_ids.insert(next_table_id);

// Mark table's XID to be processed for resync
SyncTracker::get_instance()->pick_table_for_sync(_db_id, next_table_id, next_xid_lsn.value());

request = _redis_sync_queue.try_pop(REDIS_WORKER_ID);
} while (request != nullptr);
std::tie(next_table_id, next_xid_lsn) = _get_copy_table_ids();
} while (next_table_id != -1);

CHECK(!table_ids.empty());

Expand All @@ -337,6 +337,24 @@ namespace springtail::pg_log_mgr {
}
}

std::pair<uint32_t, std::optional<XidLsn>>
PgLogMgr::_get_copy_table_ids(uint32_t timeout) {
uint32_t table_id = -1;
std::shared_ptr<TableSyncRequest> request;
if (timeout > 0) {
request = _redis_sync_queue.pop(REDIS_WORKER_ID, timeout);
} else {
request = _redis_sync_queue.try_pop(REDIS_WORKER_ID);
}
if (request != nullptr) {
// populate the tables to copy; check for more work
LOG_DEBUG(LOG_PG_LOG_MGR, "Table sync queue: {}@{}:{}", request->table_id(),
request->xid().xid, request->xid().lsn);
return std::make_pair(request->table_id(), request->xid());
}
return std::make_pair(table_id, std::nullopt);
}

void
PgLogMgr::_do_table_copies(std::optional<std::set<uint32_t>> table_ids)
{
Expand All @@ -348,6 +366,9 @@ namespace springtail::pg_log_mgr {
// notify xact handler to rollover log
_notify_xact_start_sync();

// ensure the pipeline was stalled before we complete
_internal_state.wait_for_state(STATE_SYNCING);

// copy tables
LOG_DEBUG(LOG_PG_LOG_MGR, "Copying tables for db {}; state=synchronizing", _db_id);
std::vector<PgCopyResultPtr> res;
Expand All @@ -361,19 +382,37 @@ namespace springtail::pg_log_mgr {
res = PgCopyTable::copy_db(_db_id, xid);
}

// ensure the pipeline was stalled before we complete
_internal_state.wait_for_state(STATE_SYNCING);

LOG_DEBUG(LOG_PG_LOG_MGR, "Table copy done; res size={}", res.size());
if (res.size() > 0) {
// process copy results
_process_copy_results(res);
} else {
// no tables copied
LOG_DEBUG(LOG_PG_LOG_MGR, "No tables copied; setting state=running");
// set to running this unblocks the xact handler
_internal_state.set(STATE_RUNNING);
Properties::set_db_state(_db_id, redis::db_state_change::REDIS_STATE_RUNNING);

bool copy_task_pending = true;
while(copy_task_pending) {
if (res.size() > 0) {
// process copy results
copy_task_pending = _process_copy_results(res);
if (copy_task_pending) {
auto [next_table_id, next_xid_lsn] = _get_copy_table_ids();
if (next_table_id != -1) {
xid = _pg_log_reader->get_next_xid();
SyncTracker::get_instance()->pick_table_for_sync(_db_id, next_table_id, next_xid_lsn.value());
LOG_DEBUG(LOG_PG_LOG_MGR, "Copying more tables; target xid={}", xid);
res = PgCopyTable::copy_tables(_db_id, xid, std::set<uint32_t>{next_table_id});
} else {
// Not able to fetch next table, so exit copy
LOG_DEBUG(LOG_PG_LOG_MGR, "Couldn't fetch more tables; setting state=running");
// set to running this unblocks the xact handler
_internal_state.set(STATE_RUNNING);
Properties::set_db_state(_db_id, redis::db_state_change::REDIS_STATE_RUNNING);
copy_task_pending = false;
Comment thread
rsdcbabu marked this conversation as resolved.
}
}
} else {
// no tables copied
LOG_DEBUG(LOG_PG_LOG_MGR, "No more tables copied; setting state=running");
// set to running this unblocks the xact handler
_internal_state.set(STATE_RUNNING);
Properties::set_db_state(_db_id, redis::db_state_change::REDIS_STATE_RUNNING);
copy_task_pending = false;
}
}
}

Expand All @@ -387,7 +426,7 @@ namespace springtail::pg_log_mgr {
}


void
bool
PgLogMgr::_process_copy_results(const std::vector<PgCopyResultPtr> &res)
{
assert(_internal_state.is(STATE_SYNCING));
Expand All @@ -407,11 +446,16 @@ namespace springtail::pg_log_mgr {
SyncTracker::get_instance()->add_sync(std::get<pg_log_mgr::PgXactMsg::TableSyncMsg>(redis_xact.msg));
}

// process stalled messages; set state to replaying
_internal_state.set(STATE_REPLAYING);
_internal_state.wait_for_state(STATE_RUNNING);

LOG_DEBUG(LOG_PG_LOG_MGR, "Table copy done; state=replaying");
if (_redis_sync_queue.peek() == nullptr) {
// process stalled messages; set state to replaying
_internal_state.set(STATE_REPLAYING);
_internal_state.wait_for_state(STATE_RUNNING);
LOG_DEBUG(LOG_PG_LOG_MGR, "Table copy done; state=replaying");
return false;
} else {
LOG_DEBUG(LOG_PG_LOG_MGR, "Table copy done; More to process, dont wake up logreader");
return true;
}
}

bool
Expand Down
70 changes: 42 additions & 28 deletions src/pg_log_mgr/pg_log_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1189,7 +1189,6 @@ namespace springtail::pg_log_mgr {
for (auto &entry : swap->table_info()) {
// update the existence cache for the referenced tables
_exists_cache->insert(db_id, entry->table_id, true);

auto copy_info = entry->info;
if (copy_info == nullptr) {
// During resync if the table is found to be invalid as part of the copy flow, the table
Expand All @@ -1198,36 +1197,51 @@ namespace springtail::pg_log_mgr {
LOG_DEBUG(LOG_PG_LOG_MGR, "Copy info not present for table {}", entry->table_id);
continue;
}
LOG_DEBUG(LOG_PG_LOG_MGR, "table_id {}", entry->table_id);

// perform the table swap
auto *namespace_req = copy_info->mutable_namespace_req();
namespace_req->set_xid(xid);
namespace_req->set_lsn(constant::RESYNC_NAMESPACE_LSN);

auto *create = copy_info->mutable_table_req();
create->set_xid(xid);
create->set_lsn(constant::RESYNC_CREATE_LSN);

auto *indexes = copy_info->mutable_index_reqs();
std::vector<proto::IndexRequest> indexes_vec;
for (auto &index : *indexes) {
index.set_xid(xid);
index.set_lsn(constant::RESYNC_CREATE_LSN);
indexes_vec.push_back(index);
}

auto *roots = copy_info->mutable_roots_req();
roots->set_xid(xid);
if (copy_info->is_table_dropped()) {
// Table is dropped when the sync was in queue/in-progress

auto table_req = copy_info->table_req();
auto table_info = table_req.table();
PgMsgDropTable drop_msg;
drop_msg.oid = table_info.id();
drop_msg.namespace_name = table_info.namespace_name();
drop_msg.table = table_info.name();
std::string &&ddl_stmt = client->drop_table(_db_id, XidLsn{xid}, std::move(drop_msg));
ddls.emplace_back(ddl_stmt);
} else {

LOG_DEBUG(LOG_PG_LOG_MGR, "table_id {}", entry->table_id);

// perform the table swap
auto *namespace_req = copy_info->mutable_namespace_req();
namespace_req->set_xid(xid);
namespace_req->set_lsn(constant::RESYNC_NAMESPACE_LSN);

// note: this will also invalidate the table's client cache entry
auto ddl_str = client->swap_sync_table(*namespace_req, *create, indexes_vec, *roots);
auto *create = copy_info->mutable_table_req();
create->set_xid(xid);
create->set_lsn(constant::RESYNC_CREATE_LSN);

// store the ddl mutations for the FDWs
auto ddl = nlohmann::json::parse(ddl_str);
assert(ddl.is_array());
ddls.insert(ddls.end(), ddl.begin(), ddl.end());
table_ids.emplace_back(static_cast<uint64_t>(entry->table_id));
auto *indexes = copy_info->mutable_index_reqs();
std::vector<proto::IndexRequest> indexes_vec;
for (auto &index : *indexes) {
index.set_xid(xid);
index.set_lsn(constant::RESYNC_CREATE_LSN);
indexes_vec.push_back(index);
}

auto *roots = copy_info->mutable_roots_req();
roots->set_xid(xid);

// note: this will also invalidate the table's client cache entry
auto ddl_str = client->swap_sync_table(*namespace_req, *create, indexes_vec, *roots);

// store the ddl mutations for the FDWs
auto ddl = nlohmann::json::parse(ddl_str);
assert(ddl.is_array());
ddls.insert(ddls.end(), ddl.begin(), ddl.end());
table_ids.emplace_back(static_cast<uint64_t>(entry->table_id));
}
}
LOG_INFO("Swapped synced tables: {}@{}", db_id, xid);

Expand Down
Loading
Loading