Push WriteCache mutation to FDW, SPR-1095 #729
SPR-1095 WriteCache pushing to FDW
The original concept behind the WriteCache was that we could collect XID changes in memory and let the FDWs query them directly, so that the XIDs are available at the FDW prior to the on-disk commit. The advantage is that we could do more batching on the ingest node without causing undue replication lag. The first step is to add code to push the contents of the write cache to the FDW once we are ready to pass the XID to the committer. The passed extents need to be in shared memory so that they are available to all of the connections.
Pull request overview
This PR implements the ability to push WriteCache mutations to Foreign Data Wrapper (FDW) processes, enabling FDW to access uncommitted mutations from the write cache. The change introduces a mechanism to notify FDW processes about pending (not-yet-committed) transactions and their associated table IDs, allowing FDW to apply these mutations when reading tables.
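A minimal sketch of how a subscriber might consume the extended push notification described above. The `real_commit` and `table_ids` names come from the PR summary; the `xid` field, the plain struct standing in for the generated protobuf message, the `PendingTables` helper, and the rule that a real commit clears the pending entry are all assumptions made for illustration.

```cpp
#include <cstdint>
#include <functional>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for the generated XidPushResponse message.
struct XidPushResponse {
    uint64_t xid = 0;                 // assumed field name
    bool real_commit = false;         // false means the XID is still pending
    std::vector<uint64_t> table_ids;  // tables modified by this XID
};

// The callback now receives the whole response rather than individual fields.
using PushCallback = std::function<void(const XidPushResponse&)>;

// Hypothetical subscriber-side bookkeeping: which tables each pending XID touched.
struct PendingTables {
    std::unordered_map<uint64_t, std::vector<uint64_t>> by_xid;

    void on_push(const XidPushResponse& resp) {
        if (resp.real_commit) {
            // Assumption: once the XID is durably committed, readers no longer
            // need the pending entry.
            by_xid.erase(resp.xid);
        } else {
            by_xid[resp.xid] = resp.table_ids;
        }
    }
};
```

A caller would wrap the helper in the callback type, for example `PushCallback cb = [&p](const XidPushResponse& r) { p.on_push(r); };`.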
Key changes:
- Extended XID push notifications to include metadata about pending transactions (real_commit flag and table_ids)
- Added two new shared memory caches: table_ids cache for tracking which tables were modified in each transaction, and extents cache for caching extent data from WriteCache
- Modified the shared memory cache implementation to track pending XIDs separately from committed XIDs
- Updated committer to defer WriteCache evictions until they're below the vacuum cutoff XID
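The deferred-eviction change in the last bullet can be sketched roughly as follows. This is an illustration under stated assumptions, not the committer's actual code: the `DeferredEvictions` class and its method names are hypothetical, and "below the cutoff" is taken to mean strictly less than the vacuum cutoff XID.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <utility>
#include <vector>

// Hypothetical helper: remembers which XIDs still need a WriteCache eviction.
class DeferredEvictions {
public:
    // Record that `xid` touched `table_ids`; nothing is evicted yet.
    void defer(uint64_t xid, std::vector<uint64_t> table_ids) {
        _pending[xid] = std::move(table_ids);
    }

    // Return, and forget, every XID strictly below `cutoff_xid`.
    // The caller would issue the actual evictions for the returned batch.
    std::vector<uint64_t> take_below(uint64_t cutoff_xid) {
        std::vector<uint64_t> ready;
        auto end = _pending.lower_bound(cutoff_xid);  // first XID >= cutoff
        for (auto it = _pending.begin(); it != end; ++it) {
            ready.push_back(it->first);
        }
        _pending.erase(_pending.begin(), end);
        return ready;
    }

    std::size_t size() const { return _pending.size(); }

private:
    // Ordered by XID so the range below the cutoff is a prefix of the map.
    std::map<uint64_t, std::vector<uint64_t>> _pending;
};
```

Keeping the map ordered by XID makes each batch a single prefix erase, which is why an ordered container is used here instead of a hash map.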
Reviewed changes
Copilot reviewed 28 out of 28 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| system.json.settings | Added configuration for two new shared memory caches (table_ids and extents) |
| prod.system.settings.json | Added configuration for two new shared memory caches (table_ids and extents) |
| src/proto/xid_manager.proto | Extended XidPushResponse to include real_commit flag and table_ids list |
| src/xid_mgr/xid_mgr_subscriber.cc | Updated subscriber to pass entire XidPushResponse instead of individual fields |
| src/xid_mgr/xid_mgr_server.cc | Added table_ids parameter to commit functions and updated notification logic |
| src/xid_mgr/xid_mgr_service.hh | Extended notify_subscriber to accept real_commit and table_ids parameters |
| include/xid_mgr/xid_mgr_server.hh | Updated function signatures to include table_ids parameter |
| include/xid_mgr/xid_mgr_subscriber.hh | Changed PushCallback to accept full XidPushResponse |
| src/xid_mgr/test/threaded_test.cc | Updated test callback signature to match new interface |
| src/sys_tbl_mgr/shm_cache.cc | Added pending XID tracking logic and enable_xid_history flag |
| include/sys_tbl_mgr/shm_cache.hh | Added XidRecord struct, pending XID methods, and enable_xid_history parameter |
| src/sys_tbl_mgr/test/test_shm_cache.cc | Updated tests to use new constructor signature and added pending XID tests |
| src/sys_tbl_mgr/test/test_sys_tbl_mgr.cc | Updated cache constructor call to include enable_xid_history flag |
| src/write_cache/write_cache_client.cc | Added extents caching support with cache lookup before RPC |
| include/write_cache/write_cache_client.hh | Added extents cache member and use_extents_cache method |
| src/pg_fdw/pg_xid_subscriber_mgr.cc | Added initialization and management of table_ids and extents caches |
| include/pg_fdw/pg_xid_subscriber_mgr.hh | Added table_ids and extents cache members and size parameters |
| src/pg_fdw/pg_fdw_plan_state.cc | Added schema_xid to table reference |
| include/pg_fdw/pg_fdw_plan_state.hh | Added schema_xid field to TableRef struct |
| src/pg_fdw/pg_fdw_mgr.cc | Added _get_table helper to fetch tables with WriteCache mutations |
| include/pg_fdw/pg_fdw_mgr.hh | Added _get_table method and new cache members |
| src/pg_fdw/pg_ddl_mgr.cc | Updated database cleanup to delete xid history from both roots and table_ids caches |
| src/pg_fdw/test/test_xid_subscriber.cc | Updated test to include new cache size parameters |
| src/pg_log_mgr/committer.cc | Changed WriteCache eviction to be deferred and batched based on vacuum cutoff XID |
| include/pg_log_mgr/committer.hh | Added _write_cache_evictions map for tracking pending evictions |
| src/storage/vacuumer.cc | Made get_vacuum_cutoff_xid public for use by committer |
| include/storage/vacuumer.hh | Moved get_vacuum_cutoff_xid to public section |
| include/sys_tbl_mgr/merge_table.hh | Restructured class definitions and added constructors for ChangeSet and Txn |
Pull request overview
Copilot reviewed 29 out of 29 changed files in this pull request and generated 9 comments.

    std::shared_ptr<sys_tbl_mgr::ShmCache> _schema_shm_cache; ///< An IPC schema cache shared by pg_xid_subscriber_daemon
    std::shared_ptr<sys_tbl_mgr::ShmCache> _usertype_shm_cache; ///< An IPC usertype cache shared by pg_xid_subscriber_daemon

[nitpick] Naming inconsistency: _roots_cache doesn't have the _shm suffix, while _schema_shm_cache and _usertype_shm_cache do. For consistency, consider either renaming to _roots_shm_cache or removing the _shm suffix from the other two (preferably the latter, as the newer caches _table_ids_cache and _extents_cache also lack this suffix).
Suggested change:

    - std::shared_ptr<sys_tbl_mgr::ShmCache> _schema_shm_cache; ///< An IPC schema cache shared by pg_xid_subscriber_daemon
    - std::shared_ptr<sys_tbl_mgr::ShmCache> _usertype_shm_cache; ///< An IPC usertype cache shared by pg_xid_subscriber_daemon
    + std::shared_ptr<sys_tbl_mgr::ShmCache> _schema_cache; ///< An IPC schema cache shared by pg_xid_subscriber_daemon
    + std::shared_ptr<sys_tbl_mgr::ShmCache> _usertype_cache; ///< An IPC usertype cache shared by pg_xid_subscriber_daemon

Agree with this AI comment.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
craigsoules left a comment
Make sure to review the comments from sonarqube -- several good cleanup notes in there.
Otherwise mostly minor changes. Thanks!

    using entry_type = std::conditional_t<Reverse, ReverseIteratorEntry, IteratorEntry>;
    using heap_compare_type = std::conditional_t<Reverse,
                                                 ReverseHeapCompare<entry_type>,
                                                 ForwardHeapCompare<entry_type>>;
    using heap_type = std::priority_queue<entry_type, std::vector<entry_type>, heap_compare_type>;

We've been using CamelCase for type names unless they are public types necessary for iterator use (e.g., reference, value_type, etc.)
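For illustration, the aliases above might look like this after renaming. The CamelCase alias names, the wrapper struct `MergeIteratorTypes`, and the bodies of the entry and comparator types are hypothetical placeholders so the sketch compiles on its own; only the alias structure mirrors the quoted code.

```cpp
#include <queue>
#include <type_traits>
#include <vector>

// Placeholder entry types; the real ones carry iterator state.
struct IteratorEntry { int key; };
struct ReverseIteratorEntry { int key; };

// Placeholder comparators. std::priority_queue keeps the "largest" element
// under the comparator on top, so `>` yields a min-heap and `<` a max-heap.
template <typename Entry>
struct ForwardHeapCompare {
    bool operator()(const Entry& a, const Entry& b) const { return a.key > b.key; }
};
template <typename Entry>
struct ReverseHeapCompare {
    bool operator()(const Entry& a, const Entry& b) const { return a.key < b.key; }
};

template <bool Reverse>
struct MergeIteratorTypes {
    // Internal type aliases use CamelCase; STL-facing names such as
    // value_type and reference would stay in snake_case.
    using EntryType = std::conditional_t<Reverse, ReverseIteratorEntry, IteratorEntry>;
    using HeapCompareType = std::conditional_t<Reverse,
                                               ReverseHeapCompare<EntryType>,
                                               ForwardHeapCompare<EntryType>>;
    using HeapType = std::priority_queue<EntryType, std::vector<EntryType>, HeapCompareType>;
};
```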

    using heap_type = std::priority_queue<entry_type, std::vector<entry_type>, heap_compare_type>;

    std::shared_ptr<heap_type> _heap;
    const value_type* current_value_;

Use a leading `_` on member variables (e.g., `_current_value` rather than `current_value_`).

    std::shared_ptr<sys_tbl_mgr::ShmCache> _schema_shm_cache; ///< An IPC schema cache shared by pg_xid_subscriber_daemon
    std::shared_ptr<sys_tbl_mgr::ShmCache> _usertype_shm_cache; ///< An IPC usertype cache shared by pg_xid_subscriber_daemon

Agree with this AI comment.

    } else {
    LOG_ERROR("unable to open the roots cache");
    // helper to delete xid history from shm caches
    auto delete_history =[](auto cache_name, uint64_t db_id) {

I'm not a huge fan of inline function definitions like these unless they are needed for callbacks. Can we shift to a private member function?
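A rough sketch of what the requested refactor could look like, with the lambda replaced by a private member function. Everything here is a stand-in: `FakeShmCache`, `DdlMgr`, the name-keyed cache registry, and the `_delete_xid_history` signature are assumptions for illustration and do not reflect the real `ShmCache` or `pg_ddl_mgr` interfaces.

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for a shared-memory cache with per-database XID history.
class FakeShmCache {
public:
    void add(uint64_t db_id, uint64_t xid) { _history[db_id].push_back(xid); }
    void delete_xid_history(uint64_t db_id) { _history.erase(db_id); }
    bool has_history(uint64_t db_id) const { return _history.count(db_id) != 0; }

private:
    std::unordered_map<uint64_t, std::vector<uint64_t>> _history;
};

class DdlMgr {
public:
    explicit DdlMgr(std::unordered_map<std::string, FakeShmCache>* caches)
        : _caches(caches) {}

    // Database cleanup clears history from both caches via the shared helper.
    void drop_database(uint64_t db_id) {
        _delete_xid_history("roots", db_id);
        _delete_xid_history("table_ids", db_id);
    }

private:
    // Private member function in place of the inline lambda.
    // Returns false when the named cache cannot be found (the real code
    // would log an error at that point).
    bool _delete_xid_history(const std::string& cache_name, uint64_t db_id) {
        auto it = _caches->find(cache_name);
        if (it == _caches->end()) {
            return false;
        }
        it->second.delete_xid_history(db_id);
        return true;
    }

    std::unordered_map<std::string, FakeShmCache>* _caches;  // not owned
};
```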

    table->extent_schema()->row_size(),
    table->extent_schema()->field_types());

    pe.deserialize(wc_extent.data);

I think this is what we were talking about the other day -- copying the data from shared memory to process local memory? Maybe make a comment pointing out what is going on here and that it's an expensive operation we'd like to eventually remove? Conceptually we shouldn't need to copy the data out of shared memory to use it, it's just easy for now.
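One way the requested comment might read, shown on a toy type. `LocalExtent` and its `deserialize` signature are hypothetical; the real extent class and the layout of `wc_extent.data` are not shown in this thread, so the sketch only demonstrates the copy that the comment is meant to flag.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical process-local extent used only to illustrate the copy.
struct LocalExtent {
    std::vector<unsigned char> bytes;

    // NOTE: this copies the extent payload out of shared memory into
    // process-local memory. The copy is expensive and conceptually
    // unnecessary; it is done only because it is the simplest approach for
    // now. Eventually the data should be read in place from shared memory.
    void deserialize(const unsigned char* shm_data, std::size_t len) {
        bytes.assign(shm_data, shm_data + len);
    }
};
```

Because the bytes are duplicated, later changes to the shared buffer are not visible through the local copy, which is part of what an in-place design would have to address.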