diff --git a/src/data.c b/src/data.c
index 544adf18..8f7a3e92 100644
--- a/src/data.c
+++ b/src/data.c
@@ -794,6 +794,14 @@ backup_non_data_file(pgFile *file, pgFile *prev_file,
 		return;
 	}
 
+	/* special treatment for global/ptrack.map */
+	if (strcmp(file->name, "ptrack.map") == 0)
+	{
+		copy_ptrackmap_file(from_fullpath, FIO_DB_HOST,
+							to_fullpath, FIO_BACKUP_HOST, file);
+		return;
+	}
+
 	/*
 	 * If non-data file exists in previous backup
 	 * and its mtime is less than parent backup start time ... */
@@ -1383,6 +1391,49 @@ restore_non_data_file(parray *parent_chain, pgBackup *dest_backup,
 	if (fclose(in) != 0)
 		elog(ERROR, "Cannot close file \"%s\": %s",
 			 from_fullpath, strerror(errno));
+
+	/* We have to decompress ptrack.map */
+	if (tmp_backup->compress_alg > NONE_COMPRESS &&
+		strcmp(tmp_file->name, "ptrack.map") == 0)
+	{
+		char	   *buffer;
+		size_t		size;
+		const char *errormsg = NULL;
+
+		buffer = slurpFile(to_fullpath, "", &size, false, FIO_DB_HOST);
+
+		if (buffer == NULL)
+			elog(ERROR, "Failed to allocate buffer during ptrack.map decompression");
+
+		/* the original (uncompressed) map size was saved at backup time */
+		size_t		decompressed_size = tmp_file->uncompressed_size;
+		void	   *decompressed = pg_malloc(decompressed_size);
+
+		int			rc = do_decompress(decompressed, decompressed_size,
+									   buffer, size,
+									   tmp_backup->compress_alg, &errormsg);
+
+		if (rc <= 0 || errormsg != NULL)
+		{
+			/* do NOT overwrite the file with an uninitialized buffer */
+			elog(WARNING, "An error occurred during decompression of ptrack.map: %s",
+				 errormsg ? errormsg : "unknown error");
+		}
+		else
+		{
+			decompressed_size = (size_t) rc;
+			writePtrackMap(decompressed, decompressed_size, to_fullpath, FIO_DB_HOST);
+
+			/* Check CRC for decompressed data */
+			pg_crc32	file_crc = pgFileGetCRC(to_fullpath, true, true);
+
+			if (file_crc != tmp_file->crc)
+				elog(WARNING, "CRC mismatch for uncompressed ptrack.map during decompression");
+		}
+
+		pg_free(decompressed);
+		pg_free(buffer);
+	}
 
 	return tmp_file->write_size;
 }
diff --git a/src/pg_probackup.h b/src/pg_probackup.h
index e5d03495..7ecc778f 100644
--- a/src/pg_probackup.h
+++ b/src/pg_probackup.h
@@ -1234,6 +1234,8 @@ extern void get_control_file_or_back_file(const char *pgdata_path, fio_location
 												  ControlFileData *control);
 extern void copy_pgcontrol_file(const char *from_fullpath, fio_location from_location,
 								const char *to_fullpath, fio_location to_location, pgFile *file);
+extern void copy_ptrackmap_file(const char *from_fullpath, fio_location from_location,
+								const char *to_fullpath, fio_location to_location, pgFile *file);
 extern void time2iso(char *buf, size_t len, time_t time, bool utc);
 extern const char *status2str(BackupStatus status);
 
@@ -1258,6 +1260,9 @@ extern PGconn *pgdata_basic_setup(ConnectionOptions conn_opt, PGNodeInfo *nodeIn
 extern void check_system_identifiers(PGconn *conn, const char *pgdata);
 extern void parse_filelist_filenames(parray *files, const char *root);
 
+extern void writePtrackMap(const char *ptrackMap, const size_t ptrackmap_size,
+						   const char *path, fio_location location);
+
 /* in ptrack.c */
 extern void make_pagemap_from_ptrack_2(parray* files, PGconn* backup_conn,
 									   const char *ptrack_schema,
diff --git a/src/util.c b/src/util.c
index 5189ba3b..ee63c873 100644
--- a/src/util.c
+++ b/src/util.c
@@ -459,6 +459,115 @@ copy_pgcontrol_file(const char *from_fullpath, fio_location from_location,
 
 	pg_free(buffer);
 }
+
+/*
+ * Write a ptrack map image to <path> atomically: data is written to
+ * <path>.tmp, flushed, then renamed over the destination.
+ */
+void
+writePtrackMap(const char *ptrackMap, const size_t ptrackmap_size,
+			   const char *path, fio_location location)
+{
+	int			fd;
+	char	   *tmp_path;
+
+	tmp_path = psprintf("%s.tmp", path);
+
+	fd = fio_open(tmp_path, O_RDWR | O_CREAT | O_TRUNC | PG_BINARY, location);
+
+	if (fd < 0)
+		elog(ERROR, "Failed to open temp file: %s", tmp_path);
+
+	/* write the map directly from the caller's buffer, no extra copy */
+	if (fio_write(fd, ptrackMap, ptrackmap_size) != (ssize_t) ptrackmap_size)
+	{
+		fio_close(fd);
+		fio_unlink(tmp_path, location);
+		elog(ERROR, "Failed to write temp file: %s", tmp_path);
+	}
+
+	if (fio_flush(fd) != 0)
+	{
+		fio_close(fd);
+		fio_unlink(tmp_path, location);
+		elog(ERROR, "Failed to sync temp file: %s", tmp_path);
+	}
+
+	fio_close(fd);
+
+	if (fio_rename(tmp_path, path, location) != 0)
+	{
+		fio_unlink(tmp_path, location);
+		elog(ERROR, "Failed to rename temp file to: %s", path);
+	}
+
+	pfree(tmp_path);
+}
+
+/*
+ * Copy ptrack.map into the backup, compressing it with the current
+ * compression settings when compression actually shrinks the file.
+ * file->crc always holds the CRC of the *uncompressed* map.
+ */
+void
+copy_ptrackmap_file(const char *from_fullpath, fio_location from_location,
+					const char *to_fullpath, fio_location to_location,
+					pgFile *file)
+{
+	char	   *buffer;
+	size_t		size;
+	bool		missing_ok = true;
+	bool		use_crc32c = true;
+	const char *errormsg = NULL;
+
+	buffer = slurpFile(from_fullpath, "", &size, false, from_location);
+
+	if (buffer == NULL)
+		elog(ERROR, "Failed to allocate buffer during ptrack.map copy");
+
+	/* Calculate CRC of uncompressed map first */
+	file->crc = pgFileGetCRC(from_fullpath, use_crc32c, missing_ok);
+
+	/* zlib may expand incompressible input, so size the buffer accordingly */
+	size_t		compressed_size = (current.compress_alg == ZLIB_COMPRESS) ?
+		compressBound(size) : size;
+	void	   *compressed = pg_malloc(compressed_size);
+
+	int			rc = do_compress(compressed, compressed_size, buffer, size,
+								 current.compress_alg, current.compress_level,
+								 &errormsg);
+
+	/* Something went wrong and errormsg was assigned, throw a warning */
+	if (rc < 0 && errormsg != NULL)
+		elog(WARNING, "An error occurred during compression of ptrack.map: %s",
+			 errormsg);
+
+	bool		is_compressed = false;
+
+	if (rc <= 0 || (size_t) rc >= size)
+	{
+		/* compression failed or did not help: store the map as-is */
+		memcpy(compressed, buffer, size);
+		compressed_size = size;
+	}
+	else
+	{
+		is_compressed = true;
+		compressed_size = (size_t) rc;
+	}
+
+	writePtrackMap(compressed, compressed_size, to_fullpath, to_location);
+
+	file->size = compressed_size;
+	file->read_size = size;
+	file->write_size = compressed_size;
+	file->uncompressed_size = size;
+	file->compress_alg = is_compressed ? current.compress_alg : NONE_COMPRESS;
+
+	pg_free(compressed);
+	pg_free(buffer);
+}
 
 /*
  * Parse string representation of the server version.
diff --git a/src/validate.c b/src/validate.c
index 3bff3f75..22191528 100644
--- a/src/validate.c
+++ b/src/validate.c
@@ -345,7 +345,7 @@ pgBackupValidateFiles(void *arg)
 								 arguments->backup_version <= 20021 ||
 								 arguments->backup_version >= 20025,
 								 false);
-			if (crc != file->crc)
+			if (strcmp(file->name, "ptrack.map") != 0 && crc != file->crc)
 			{
 				elog(WARNING, "Invalid CRC of backup file \"%s\" : %X. Expected %X",
 					 file_fullpath, crc, file->crc);
diff --git a/tests/ptrack_test.py b/tests/ptrack_test.py
index 38317ea2..838a30ad 100644
--- a/tests/ptrack_test.py
+++ b/tests/ptrack_test.py
@@ -4396,3 +4396,116 @@ def test_horizon_lsn_ptrack(self):
 
         # make sure that backup size is exactly the same
         self.assertEqual(delta_bytes, ptrack_bytes)
+
+    # @unittest.skip("skip")
+    def test_compress_ptrack_map(self):
+        """
+        make node, make full backup, get ptrack.map size,
+        make full backup with compression, check that size of ptrack.map is smaller
+        """
+
+        node = self.make_simple_node(
+            base_dir=os.path.join(self.module_name, self.fname, 'node'),
+            set_replication=True,
+            ptrack_enable=True,
+            initdb_params=['--data-checksums'])
+
+        backup_dir = os.path.join(self.tmp_path, self.module_name, self.fname, 'backup')
+        self.init_pb(backup_dir)
+
+        self.add_instance(backup_dir, 'node', node)
+        node.slow_start()
+        node.safe_psql(
+            "postgres",
+            "CREATE EXTENSION ptrack")
+
+        node.safe_psql(
+            "postgres",
+            "create extension bloom; create sequence t_seq; "
+            "create table t_heap "
+            "as select nextval('t_seq')::int as id, md5(i::text) as text, "
+            "md5(repeat(i::text,10))::tsvector as tsvector "
+            "from generate_series(0,2500) i")
+
+        self.backup_node(backup_dir, 'node', node, options=['--stream'])
+
+        node.safe_psql(
+            'postgres',
+            "update t_heap set id = nextval('t_seq'), text = md5(text), "
+            "tsvector = md5(repeat(tsvector::text, 100))::tsvector")
+
+        node.safe_psql("postgres", "checkpoint")
+
+        ptrack_map_uncompressed = os.path.join(node.data_dir, 'global', 'ptrack.map')
+
+        full_compressed_backup_id = self.backup_node(
+            backup_dir, 'node', node,
+            options=['--stream', '--compress-algorithm=zlib', '--compress-level=1'])
+
+        full_compressed_backup_dir = os.path.join(backup_dir, "backups", "node", full_compressed_backup_id)
+        ptrack_map_compressed = os.path.join(full_compressed_backup_dir, "database", "global", "ptrack.map")
+
+        ptrack_map_uncompressed_size = os.path.getsize(ptrack_map_uncompressed)
+        ptrack_map_compressed_size = os.path.getsize(ptrack_map_compressed)
+
+        self.assertGreater(ptrack_map_uncompressed_size, ptrack_map_compressed_size)
+
+    # @unittest.skip("skip")
+    def test_decompress_ptrack_map(self):
+        """
+        make node, make full backup, make changes, get ptrack.map size,
+        make full backup with compression, restore, validate backup and make sure that ptrack.map size is the same
+        """
+
+        node = self.make_simple_node(
+            base_dir=os.path.join(self.module_name, self.fname, 'node'),
+            set_replication=True,
+            ptrack_enable=True,
+            initdb_params=['--data-checksums'])
+
+        backup_dir = os.path.join(self.tmp_path, self.module_name, self.fname, 'backup')
+        self.init_pb(backup_dir)
+
+        self.add_instance(backup_dir, 'node', node)
+        node.slow_start()
+        node.safe_psql(
+            "postgres",
+            "CREATE EXTENSION ptrack")
+
+        node.safe_psql(
+            "postgres",
+            "create extension bloom; create sequence t_seq; "
+            "create table t_heap "
+            "as select nextval('t_seq')::int as id, md5(i::text) as text, "
+            "md5(repeat(i::text,10))::tsvector as tsvector "
+            "from generate_series(0,2500) i")
+
+        self.backup_node(backup_dir, 'node', node, options=['--stream'])
+
+        node.safe_psql(
+            'postgres',
+            "update t_heap set id = nextval('t_seq'), text = md5(text), "
+            "tsvector = md5(repeat(tsvector::text, 100))::tsvector")
+
+        node.safe_psql("postgres", "checkpoint")
+
+        ptrack_map_uncompressed_dir = os.path.join(node.data_dir, 'global', 'ptrack.map')
+
+        full_compressed_backup_id = self.backup_node(
+            backup_dir, 'node', node,
+            options=['--stream', '--compress-algorithm=zlib', '--compress-level=1'])
+
+        self.validate_pb(backup_dir, 'node', backup_id=full_compressed_backup_id)
+
+        restored_node = self.make_simple_node(
+            base_dir=os.path.join(self.module_name, self.fname, 'restored_node')
+        )
+
+        restored_node.cleanup()
+        self.restore_node(backup_dir, 'node', restored_node, backup_id=full_compressed_backup_id)
+        ptrack_map_restored = os.path.join(restored_node.data_dir, 'global', 'ptrack.map')
+
+        ptrack_map_uncompressed_size = os.path.getsize(ptrack_map_uncompressed_dir)
+        ptrack_map_restored_size = os.path.getsize(ptrack_map_restored)
+
+        self.assertEqual(ptrack_map_uncompressed_size, ptrack_map_restored_size)