diff --git a/include/pg_log_mgr/pg_log_mgr.hh b/include/pg_log_mgr/pg_log_mgr.hh index 9227caffa..4b07ec654 100644 --- a/include/pg_log_mgr/pg_log_mgr.hh +++ b/include/pg_log_mgr/pg_log_mgr.hh @@ -236,7 +236,16 @@ namespace springtail::pg_log_mgr { void _copy_thread(); /** Process copy table results; insert into redis */ - void _process_copy_results(const std::vector &res); + bool _process_copy_results(const std::vector &res); + + /** + * @brief Pop the next copy-table request from the redis queue + * + * @param timeout timeout in seconds (0 = use try_pop) + * + * @return pair of (table id, optional XID/LSN of the request); when no request is available the table id is the -1 sentinel and the optional is empty + */ + std::pair> _get_copy_table_ids(uint32_t timeout=0); /** Redis cache callback for watching database state change */ RedisCache::RedisChangeWatcherPtr _cache_watcher_db_states; diff --git a/include/pg_log_mgr/sync_tracker.hh b/include/pg_log_mgr/sync_tracker.hh index b3c904c7c..69b0d64b9 100644 --- a/include/pg_log_mgr/sync_tracker.hh +++ b/include/pg_log_mgr/sync_tracker.hh @@ -142,6 +142,15 @@ namespace springtail::pg_log_mgr { */ SkipDetails should_skip(uint64_t db_id, uint64_t table_id, uint32_t pg_xid) const; + /** + * @brief Record PG_XID at which table is picked for resync + * + * @param db_id Database ID + * @param table_id Table ID + * @param xid PG XID/LSN which will be marked + */ + void pick_table_for_sync(uint64_t db_id, uint64_t table_id, const XidLsn &xid); + protected: SyncTracker() : Singleton(ServiceId::SyncTrackerId) {} virtual ~SyncTracker() override = default; @@ -269,6 +278,17 @@ namespace springtail::pg_log_mgr { /** Used to track all of the table syncs operating at a given snapshot XID. */ DbMap>> _sync_map; + /** db-> table indicating that a resync was issued but it hasn't been picked up by the copy thread yet.
*/ + using TableSyncXidsMap = std::map>; + using DbTableSyncXidsMap = std::map; + DbTableSyncXidsMap _resync_map; + + /** db -> table -> xid indicating the table @ XID has been picked for resync but is not yet in-flight */ + using TableSyncPickedXidMap = std::map; + using DbTableSyncPickedXidMap = std::map; + DbTableSyncPickedXidMap _resync_picked_map; + /** Entry is added here when a copy for the table is in-flight but hasn't completed. */ DbMap>> _inflight_map; diff --git a/include/pg_repl/pg_copy_table.hh b/include/pg_repl/pg_copy_table.hh index 2698517d7..7d2cd94eb 100644 --- a/include/pg_repl/pg_copy_table.hh +++ b/include/pg_repl/pg_copy_table.hh @@ -177,6 +177,16 @@ namespace springtail */ void _reset_schema(); + /** + * @brief Check if table is dropped in the primary + * + * @param schema_oid Schema OID in which table is present + * @param table_oid Table OID + * + * @return bool indicating if table is dropped or not + */ + bool _is_table_dropped(uint64_t schema_oid, uint64_t table_oid); + /** * @brief Extract schema from table and store in internal _schema object * @details Uses atttypid from pg_attribute table for identifier of the type. diff --git a/include/redis/redis_containers.hh b/include/redis/redis_containers.hh index 58c1aa689..db6616dea 100644 --- a/include/redis/redis_containers.hh +++ b/include/redis/redis_containers.hh @@ -89,6 +89,17 @@ namespace springtail { return nullptr; } + /** + * @brief Peek at an item in the queue; doesn't move or remove it + */ + std::shared_ptr peek() const { + auto res = _redis->lindex(_key, -1); + if (res) { + return std::make_shared(std::move(*res)); + } + return nullptr; + } + /** * @brief Try to pop an item from queue (list).
* diff --git a/src/pg_log_mgr/pg_log_mgr.cc b/src/pg_log_mgr/pg_log_mgr.cc index fda07abc0..13a40a044 100644 --- a/src/pg_log_mgr/pg_log_mgr.cc +++ b/src/pg_log_mgr/pg_log_mgr.cc @@ -304,20 +304,20 @@ namespace springtail::pg_log_mgr { while (!_shutdown) { std::set table_ids; - // block on redis table sync queue w/timeout for shutdown - auto request = _redis_sync_queue.pop(REDIS_WORKER_ID, constant::COORDINATOR_KEEP_ALIVE_TIMEOUT); - if (request == nullptr) { - continue; // timeout, check for shutdown + // _get_copy_table_ids block on redis table sync queue w/timeout for shutdown + auto [next_table_id, next_xid_lsn] = _get_copy_table_ids(constant::COORDINATOR_KEEP_ALIVE_TIMEOUT); + if (next_table_id == -1) { + continue; // check for shutdown } - do { // populate the tables to copy; check for more work - LOG_DEBUG(LOG_PG_LOG_MGR, "Table sync queue: {}@{}:{}", request->table_id(), - request->xid().xid, request->xid().lsn); - table_ids.insert(request->table_id()); + table_ids.insert(next_table_id); + + // Mark table's XID to be processed for resync + SyncTracker::get_instance()->pick_table_for_sync(_db_id, next_table_id, next_xid_lsn.value()); - request = _redis_sync_queue.try_pop(REDIS_WORKER_ID); - } while (request != nullptr); + std::tie(next_table_id, next_xid_lsn) = _get_copy_table_ids(); + } while (next_table_id != -1); CHECK(!table_ids.empty()); @@ -337,6 +337,24 @@ namespace springtail::pg_log_mgr { } } + std::pair> + PgLogMgr::_get_copy_table_ids(uint32_t timeout) { + uint32_t table_id = -1; + std::shared_ptr request; + if (timeout > 0) { + request = _redis_sync_queue.pop(REDIS_WORKER_ID, timeout); + } else { + request = _redis_sync_queue.try_pop(REDIS_WORKER_ID); + } + if (request != nullptr) { + // populate the tables to copy; check for more work + LOG_DEBUG(LOG_PG_LOG_MGR, "Table sync queue: {}@{}:{}", request->table_id(), + request->xid().xid, request->xid().lsn); + return std::make_pair(request->table_id(), request->xid()); + } + return 
std::make_pair(table_id, std::nullopt); + } + void PgLogMgr::_do_table_copies(std::optional> table_ids) { @@ -348,6 +366,9 @@ namespace springtail::pg_log_mgr { // notify xact handler to rollover log _notify_xact_start_sync(); + // ensure the pipeline was stalled before we complete + _internal_state.wait_for_state(STATE_SYNCING); + // copy tables LOG_DEBUG(LOG_PG_LOG_MGR, "Copying tables for db {}; state=synchronizing", _db_id); std::vector res; @@ -361,19 +382,37 @@ namespace springtail::pg_log_mgr { res = PgCopyTable::copy_db(_db_id, xid); } - // ensure the pipeline was stalled before we complete - _internal_state.wait_for_state(STATE_SYNCING); - LOG_DEBUG(LOG_PG_LOG_MGR, "Table copy done; res size={}", res.size()); - if (res.size() > 0) { - // process copy results - _process_copy_results(res); - } else { - // no tables copied - LOG_DEBUG(LOG_PG_LOG_MGR, "No tables copied; setting state=running"); - // set to running this unblocks the xact handler - _internal_state.set(STATE_RUNNING); - Properties::set_db_state(_db_id, redis::db_state_change::REDIS_STATE_RUNNING); + + bool copy_task_pending = true; + while(copy_task_pending) { + if (res.size() > 0) { + // process copy results + copy_task_pending = _process_copy_results(res); + if (copy_task_pending) { + auto [next_table_id, next_xid_lsn] = _get_copy_table_ids(); + if (next_table_id != -1) { + xid = _pg_log_reader->get_next_xid(); + SyncTracker::get_instance()->pick_table_for_sync(_db_id, next_table_id, next_xid_lsn.value()); + LOG_DEBUG(LOG_PG_LOG_MGR, "Copying more tables; target xid={}", xid); + res = PgCopyTable::copy_tables(_db_id, xid, std::set{next_table_id}); + } else { + // Not able to fetch next table, so exit copy + LOG_DEBUG(LOG_PG_LOG_MGR, "Couldn't fetch more tables; setting state=running"); + // set to running this unblocks the xact handler + _internal_state.set(STATE_RUNNING); + Properties::set_db_state(_db_id, redis::db_state_change::REDIS_STATE_RUNNING); + copy_task_pending = false; + } + } + } 
else { + // no tables copied + LOG_DEBUG(LOG_PG_LOG_MGR, "No more tables copied; setting state=running"); + // set to running this unblocks the xact handler + _internal_state.set(STATE_RUNNING); + Properties::set_db_state(_db_id, redis::db_state_change::REDIS_STATE_RUNNING); + copy_task_pending = false; + } } } @@ -387,7 +426,7 @@ namespace springtail::pg_log_mgr { } - void + bool PgLogMgr::_process_copy_results(const std::vector &res) { assert(_internal_state.is(STATE_SYNCING)); @@ -407,11 +446,16 @@ namespace springtail::pg_log_mgr { SyncTracker::get_instance()->add_sync(std::get(redis_xact.msg)); } - // process stalled messages; set state to replaying - _internal_state.set(STATE_REPLAYING); - _internal_state.wait_for_state(STATE_RUNNING); - - LOG_DEBUG(LOG_PG_LOG_MGR, "Table copy done; state=replaying"); + if (_redis_sync_queue.peek() == nullptr) { + // process stalled messages; set state to replaying + _internal_state.set(STATE_REPLAYING); + _internal_state.wait_for_state(STATE_RUNNING); + LOG_DEBUG(LOG_PG_LOG_MGR, "Table copy done; state=replaying"); + return false; + } else { + LOG_DEBUG(LOG_PG_LOG_MGR, "Table copy done; More to process, dont wake up logreader"); + return true; + } } bool diff --git a/src/pg_log_mgr/pg_log_reader.cc b/src/pg_log_mgr/pg_log_reader.cc index 0e4263360..fe341a98f 100644 --- a/src/pg_log_mgr/pg_log_reader.cc +++ b/src/pg_log_mgr/pg_log_reader.cc @@ -1189,7 +1189,6 @@ namespace springtail::pg_log_mgr { for (auto &entry : swap->table_info()) { // update the existence cache for the referenced tables _exists_cache->insert(db_id, entry->table_id, true); - auto copy_info = entry->info; if (copy_info == nullptr) { // During resync if the table is found to be invalid as part of the copy flow, the table @@ -1198,36 +1197,51 @@ namespace springtail::pg_log_mgr { LOG_DEBUG(LOG_PG_LOG_MGR, "Copy info not present for table {}", entry->table_id); continue; } - LOG_DEBUG(LOG_PG_LOG_MGR, "table_id {}", entry->table_id); - - // perform the table 
swap - auto *namespace_req = copy_info->mutable_namespace_req(); - namespace_req->set_xid(xid); - namespace_req->set_lsn(constant::RESYNC_NAMESPACE_LSN); - - auto *create = copy_info->mutable_table_req(); - create->set_xid(xid); - create->set_lsn(constant::RESYNC_CREATE_LSN); - - auto *indexes = copy_info->mutable_index_reqs(); - std::vector indexes_vec; - for (auto &index : *indexes) { - index.set_xid(xid); - index.set_lsn(constant::RESYNC_CREATE_LSN); - indexes_vec.push_back(index); - } - auto *roots = copy_info->mutable_roots_req(); - roots->set_xid(xid); + if (copy_info->is_table_dropped()) { + // Table is dropped when the sync was in queue/in-progress + + auto table_req = copy_info->table_req(); + auto table_info = table_req.table(); + PgMsgDropTable drop_msg; + drop_msg.oid = table_info.id(); + drop_msg.namespace_name = table_info.namespace_name(); + drop_msg.table = table_info.name(); + std::string &&ddl_stmt = client->drop_table(_db_id, XidLsn{xid}, std::move(drop_msg)); + ddls.emplace_back(ddl_stmt); + } else { + + LOG_DEBUG(LOG_PG_LOG_MGR, "table_id {}", entry->table_id); + + // perform the table swap + auto *namespace_req = copy_info->mutable_namespace_req(); + namespace_req->set_xid(xid); + namespace_req->set_lsn(constant::RESYNC_NAMESPACE_LSN); - // note: this will also invalidate the table's client cache entry - auto ddl_str = client->swap_sync_table(*namespace_req, *create, indexes_vec, *roots); + auto *create = copy_info->mutable_table_req(); + create->set_xid(xid); + create->set_lsn(constant::RESYNC_CREATE_LSN); - // store the ddl mutations for the FDWs - auto ddl = nlohmann::json::parse(ddl_str); - assert(ddl.is_array()); - ddls.insert(ddls.end(), ddl.begin(), ddl.end()); - table_ids.emplace_back(static_cast(entry->table_id)); + auto *indexes = copy_info->mutable_index_reqs(); + std::vector indexes_vec; + for (auto &index : *indexes) { + index.set_xid(xid); + index.set_lsn(constant::RESYNC_CREATE_LSN); + indexes_vec.push_back(index); + } + + auto 
*roots = copy_info->mutable_roots_req(); + roots->set_xid(xid); + + // note: this will also invalidate the table's client cache entry + auto ddl_str = client->swap_sync_table(*namespace_req, *create, indexes_vec, *roots); + + // store the ddl mutations for the FDWs + auto ddl = nlohmann::json::parse(ddl_str); + assert(ddl.is_array()); + ddls.insert(ddls.end(), ddl.begin(), ddl.end()); + table_ids.emplace_back(static_cast(entry->table_id)); + } } LOG_INFO("Swapped synced tables: {}@{}", db_id, xid); diff --git a/src/pg_log_mgr/sync_tracker.cc b/src/pg_log_mgr/sync_tracker.cc index 4daa3863b..b159a87ae 100644 --- a/src/pg_log_mgr/sync_tracker.cc +++ b/src/pg_log_mgr/sync_tracker.cc @@ -32,17 +32,32 @@ SyncTracker::issue_resync_and_wait(uint64_t db_id, TableSyncRequest request(table_id, xid); table_sync_queue.push(request); + // add the table to the resync map; will get removed when mark_inflight() is called + _resync_map[db_id][table_id].insert(xid); + // wait for the resync to begin auto wait = std::make_shared(); auto wait_i = _wait_map.emplace(db_id, wait).first; wait->condition.wait(lock, [&wait]() { - return wait->notified; - }); + return wait->notified; + }); // clear the entry _wait_map.erase(wait_i); } +void +SyncTracker::pick_table_for_sync(uint64_t db_id, + uint64_t table_id, + const XidLsn &xid) +{ + LOG_DEBUG(LOG_PG_LOG_MGR, "Pick for Sync: db {} table {} xid {}:{}", + db_id, table_id, xid.xid, xid.lsn); + std::unique_lock lock(_mutex); + + _resync_picked_map[db_id][table_id] = xid; +} + void SyncTracker::mark_inflight(uint64_t db_id, uint64_t table_id, @@ -54,6 +69,42 @@ SyncTracker::mark_inflight(uint64_t db_id, db_id, table_id, xid.xid, xid.lsn); std::unique_lock lock(_mutex); + // Find the XID at which this table was picked for resync + auto picked_db_i = _resync_picked_map.find(db_id); + + // If nothing was picked for resync in this database, return + // this is usually the case during initial resyncs; NOTE(review): this return also skips the inflight-map insert further down - confirm that is intended + if (picked_db_i == _resync_picked_map.end()) { + return; + } + + auto
picked_table_i = picked_db_i->second.find(table_id); + CHECK(picked_table_i != picked_db_i->second.end()); + auto picked_table_xid = picked_table_i->second; + picked_db_i->second.erase(table_id); + if (picked_db_i->second.empty()) { + _resync_picked_map.erase(picked_db_i); + } + + // Erase the entries from the resync_map + auto db_i = _resync_map.find(db_id); + CHECK(db_i != _resync_map.end()); + if (db_i != _resync_map.end()) { + auto table_i = db_i->second.find(table_id); + CHECK(table_i != db_i->second.end()); + + // clear from the resync map + // Erase all elements less than or equal to picked_table_xid + table_i->second.erase(table_i->second.begin(), table_i->second.upper_bound(picked_table_xid)); + + if (table_i->second.empty()) { + db_i->second.erase(table_i); + if (db_i->second.empty()) { + _resync_map.erase(db_i); + } + } + } + // add the entry to the inflight map auto entry = std::make_shared(copy->pg_xid, copy->xmax, copy->xips, schema); _inflight_map[db_id].emplace(table_id, entry); @@ -198,6 +249,14 @@ SyncTracker::add_sync(const pg_log_mgr::PgXactMsg::TableSyncMsg &sync_msg) LOG_DEBUG(LOG_PG_LOG_MGR, "db {} table_id {} pg_xid {}", db_id, table_id, pg_xid); + // first check the resync map + auto resync_i = _resync_map.find(db_id); + if (resync_i != _resync_map.end()) { + if (resync_i->second.contains(table_id)) { + return { true, true }; // if the table is present, skip + } + } + // check the inflight map auto inflight_i = _inflight_map.find(db_id); if (inflight_i != _inflight_map.end()) { diff --git a/src/pg_repl/pg_copy_table.cc b/src/pg_repl/pg_copy_table.cc index 2cb81d6fa..bd95838c3 100644 --- a/src/pg_repl/pg_copy_table.cc +++ b/src/pg_repl/pg_copy_table.cc @@ -225,6 +225,12 @@ namespace springtail return {pg_xid8, _schema.xids}; } + bool PgCopyTable::_is_table_dropped(uint64_t schema_oid, uint64_t table_oid) { + _connection.exec( + fmt::format("SELECT 1 FROM pg_class WHERE oid = {} AND relnamespace = {}", table_oid, schema_oid)); + return 
(_connection.ntuples() == 0); + } + void PgCopyTable::_get_secondary_indexes() { _connection.exec(fmt::format(SECONDARY_INDEX_QUERY, _schema.table_oid)); @@ -680,6 +686,7 @@ namespace springtail stats->set_end_offset(metadata.stats.end_offset); roots_req->set_snapshot_xid(metadata.snapshot_xid); + copy_info->set_is_table_dropped(_is_table_dropped(schema_oid, table_oid)); LOG_INFO("Copied table {}.{} with oid {} and schema oid {}", schema_name, table_name, table_oid, schema_oid); diff --git a/src/proto/pg_copy_table.proto b/src/proto/pg_copy_table.proto index 10c4cd89a..f644597c5 100644 --- a/src/proto/pg_copy_table.proto +++ b/src/proto/pg_copy_table.proto @@ -17,4 +17,7 @@ message CopyTableInfo { // Root pointers and stats update UpdateRootsRequest roots_req = 4; + + // Indicate if the table is dropped + bool is_table_dropped = 5; }