diff --git a/CHANGELOG.md b/CHANGELOG.md index 18e7157a..6ae2e6b6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +### Added +* Implement "safe" mode to prevent writing data to wrong replicaset when vshard rebalance is in progress. +* Auto switch to safe mode when rebalance process starts. +* Manual return to fast mode. + ## [1.6.1] - 19-09-25 ### Added diff --git a/crud-scm-1.rockspec b/crud-scm-1.rockspec index 30d0c4b0..e86a6374 100644 --- a/crud-scm-1.rockspec +++ b/crud-scm-1.rockspec @@ -13,7 +13,7 @@ dependencies = { 'lua ~> 5.1', 'checks >= 3.3.0-1', 'errors >= 2.2.1-1', - 'vshard >= 0.1.36-1', + 'vshard >= 0.1.39-1', } build = { diff --git a/crud.lua b/crud.lua index 60301163..c62e44ee 100644 --- a/crud.lua +++ b/crud.lua @@ -23,6 +23,7 @@ local readview = require('crud.readview') local schema = require('crud.schema') local storage_info = require('crud.storage_info') local storage = require('crud.storage') +local rebalance = require('crud.common.rebalance') local crud = {} @@ -158,8 +159,22 @@ crud.readview = readview.new -- @function schema crud.schema = schema.call +crud.rebalance = {} + +-- @refer rebalance.router_cache_clear +-- @function router_cache_clear +crud.rebalance.router_cache_clear = rebalance.router_api.cache_clear + +-- @refer rebalance.router_cache_length +-- @function router_cache_length +crud.rebalance.router_cache_length = rebalance.router_api.cache_length + +-- @refer rebalance.router_cache_last_clear_ts +-- @function router_cache_last_clear_ts +crud.rebalance.router_cache_last_clear_ts = rebalance.router_api.cache_last_clear_ts + function crud.init_router() - rawset(_G, 'crud', crud) + rawset(_G, 'crud', crud) end function crud.stop_router() diff --git a/crud/common/call.lua b/crud/common/call.lua index 5887923f..13679bc0 100644 --- a/crud/common/call.lua +++ b/crud/common/call.lua @@ -1,11 +1,14 @@ local errors = require('errors') +local log = require('log') local call_cache = require('crud.common.call_cache') local dev_checks = require('crud.common.dev_checks') local utils = require('crud.common.utils') local sharding_utils = require('crud.common.sharding.utils') -local fiber_clock = require('fiber').clock +local fiber = require('fiber') local const = require('crud.common.const') +local rebalance = require('crud.common.rebalance') +local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref') local BaseIterator = require('crud.common.map_call_cases.base_iter') local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor') @@ -14,14 +17,38 @@ local CallError = errors.new_class('CallError') local CALL_FUNC_NAME = 'call_on_storage' local CRUD_CALL_FUNC_NAME = utils.get_storage_call(CALL_FUNC_NAME) - +local CRUD_CALL_FIBER_NAME = CRUD_CALL_FUNC_NAME .. '/fast' local call = {} -local function call_on_storage(run_as_user, func_name, ...) +local function call_on_storage_safe(run_as_user, func_name, ...) + return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...) +end + +local function call_on_storage_fast(run_as_user, func_name, ...) + fiber.name(CRUD_CALL_FIBER_NAME) return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...) end +local call_on_storage = rebalance.safe_mode and call_on_storage_safe or call_on_storage_fast + +rebalance.on_safe_mode_toggle(function(is_enabled) + if is_enabled then + call_on_storage = call_on_storage_safe + + local fibers_killed = 0 + for fb_id, fb in pairs(fiber.info()) do + if fb.name == CRUD_CALL_FIBER_NAME then + fiber.kill(fb_id) + fibers_killed = fibers_killed + 1 + end + end + log.debug('Killed %d fibers with fast-mode crud requests.', fibers_killed) + else + call_on_storage = call_on_storage_fast + end +end) + call.storage_api = {[CALL_FUNC_NAME] = call_on_storage} function call.get_vshard_call_name(mode, prefer_replica, balance) @@ -82,7 +109,10 @@ local function wrap_vshard_err(vshard_router, err, func_name, replicaset_id, buc )) end -local function retry_call_with_master_discovery(replicaset, method, func_name, func_args, call_opts) +--- Executes a vshard call and retries once after performing recovery actions +--- like bucket cache reset, destination redirect (for single calls), or master discovery. +local function call_with_retry_and_recovery(vshard_router, + replicaset, method, func_name, func_args, call_opts, is_single_call) local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, func_args) -- In case cluster was just bootstrapped with auto master discovery, @@ -93,7 +123,27 @@ local function retry_call_with_master_discovery(replicaset, method, func_name, f return resp, err end - if err.name == 'MISSING_MASTER' and replicaset.locate_master ~= nil then + -- This is a partial copy of error handling from vshard.router.router_call_impl() + -- It is much simpler mostly because bucket_set() can't be accessed from outside vshard. + if err.class_name == bucket_ref_unref.BucketRefError.name then + if is_single_call and #err.bucket_ref_errs == 1 then + local single_err = err.bucket_ref_errs[1] + local destination = single_err.vshard_err.destination + if destination and vshard_router.replicasets[destination] then + replicaset = vshard_router.replicasets[destination] + end + end + + for _, bucket_ref_err in pairs(err.bucket_ref_errs) do + local bucket_id = bucket_ref_err.bucket_id + local vshard_err = bucket_ref_err.vshard_err + if vshard_err.name == 'WRONG_BUCKET' or + vshard_err.name == 'BUCKET_IS_LOCKED' or + vshard_err.name == 'TRANSFER_IS_IN_PROGRESS' then + vshard_router:_bucket_reset(bucket_id) + end + end + elseif err.name == 'MISSING_MASTER' and replicaset.locate_master ~= nil then replicaset:locate_master() end @@ -147,8 +197,8 @@ function call.map(vshard_router, func_name, func_args, opts) while iter:has_next() do local args, replicaset, replicaset_id = iter:get() - local future, err = retry_call_with_master_discovery(replicaset, vshard_call_name, - func_name, args, call_opts) + local future, err = call_with_retry_and_recovery(vshard_router, replicaset, vshard_call_name, + func_name, args, call_opts, false) if err ~= nil then local result_info = { @@ -170,9 +220,9 @@ function call.map(vshard_router, func_name, func_args, opts) futures_by_replicasets[replicaset_id] = future end - local deadline = fiber_clock() + timeout + local deadline = fiber.clock() + timeout for replicaset_id, future in pairs(futures_by_replicasets) do - local wait_timeout = deadline - fiber_clock() + local wait_timeout = deadline - fiber.clock() if wait_timeout < 0 then wait_timeout = 0 end @@ -221,9 +271,8 @@ function call.single(vshard_router, bucket_id, func_name, func_args, opts) local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT local request_timeout = opts.mode == 'read' and opts.request_timeout or nil - local res, err = retry_call_with_master_discovery(replicaset, vshard_call_name, - func_name, func_args, {timeout = timeout, - request_timeout = request_timeout}) + local res, err = call_with_retry_and_recovery(vshard_router, replicaset, vshard_call_name, + func_name, func_args, {timeout = timeout, request_timeout = request_timeout}, true) if err ~= nil then return nil, wrap_vshard_err(vshard_router, err, func_name, nil, bucket_id) end @@ -248,8 +297,8 @@ function call.any(vshard_router, func_name, func_args, opts) end local replicaset_id, replicaset = next(replicasets) - local res, err = retry_call_with_master_discovery(replicaset, 'call', - func_name, func_args, {timeout = timeout}) + local res, err = call_with_retry_and_recovery(vshard_router, replicaset, 'call', + func_name, func_args, {timeout = timeout}, false) if err ~= nil then return nil, wrap_vshard_err(vshard_router, err, func_name, replicaset_id) end diff --git a/crud/common/rebalance.lua b/crud/common/rebalance.lua new file mode 100644 index 00000000..dde7cc40 --- /dev/null +++ b/crud/common/rebalance.lua @@ -0,0 +1,197 @@ +local fiber = require('fiber') +local log = require('log') +local trigger = require('internal.trigger') +local vshard_consts = require('vshard.consts') +local utils = require('crud.common.utils') +local has_metrics_module, metrics = pcall(require, 'metrics') + +local SAFE_MODE_SPACE = '_crud_rebalance_safe_mode_local' + +local rebalance = { + safe_mode = false, + -- Trigger is run with one argument: true if safe mode is enabled and false if disabled. + on_safe_mode_toggle = trigger.new('_crud.safe_mode_toggle'), + _router_cache_last_clear_ts = 0 -- On module load we don't know when (and if) route cache was cleared. +} + +local function safe_mode_bucket_trigger(_, new, space, op) + if space ~= '_bucket' then + return + end + -- We are interested only in two operations that indicate the beginning of bucket migration: + -- * We are receiving a bucket (new bucket with status RECEIVING) + -- * We are sending a bucket to another node (existing bucket status changes to SENDING) + if (op == 'INSERT' and new.status == vshard_consts.BUCKET.RECEIVING) or + (op == 'REPLACE' and new.status == vshard_consts.BUCKET.SENDING) then + local stored_safe_mode = box.space[SAFE_MODE_SPACE]:get{ 'status' } + if not stored_safe_mode or not stored_safe_mode.value then + box.space[SAFE_MODE_SPACE]:replace{ 'status', true } + end + end +end + +local function _safe_mode_enable() + -- The trigger is needed to detect the beginning of rebalance process to enable safe mode. + -- If safe mode is enabled we don't need the trigger anymore. + for _, trig in pairs(box.space._bucket:on_replace()) do + if trig == safe_mode_bucket_trigger then + box.space._bucket:on_replace(nil, trig) + end + end + rebalance.safe_mode = true + + -- This function is running inside on_commit trigger, need pcall to protect from errors in external code. + pcall(rebalance.on_safe_mode_toggle.run, rebalance.on_safe_mode_toggle, true) + + log.info('Rebalance safe mode enabled') +end + +local function _safe_mode_disable() + -- We have disabled safe mode so we need to add the trigger to detect the beginning + -- of rebalance process to enable safe mode again. + box.space._bucket:on_replace(safe_mode_bucket_trigger) + rebalance.safe_mode = false + + -- This function is running inside on_commit trigger, need pcall to protect from errors in external code. + pcall(rebalance.on_safe_mode_toggle.run, rebalance.on_safe_mode_toggle, false) + + log.info('Rebalance safe mode disabled') +end + +local function create_space() + local safe_mode_space = box.schema.space.create(SAFE_MODE_SPACE, { + engine = 'memtx', + format = { + { name = 'key', type = 'string' }, + { name = 'value', type = 'any' }, + }, + is_local = true, + if_not_exists = true, + }) + safe_mode_space:create_index('primary', { parts = { 'key' }, if_not_exists = true }) + safe_mode_space:insert{ 'status', false } +end + +local function create_trigger() + box.space[SAFE_MODE_SPACE]:on_replace(function() + box.on_commit(function(rows_iter) + local safe_space_id = box.space[SAFE_MODE_SPACE].id + -- There may be multiple operations on safe mode status tuple in one transaction. + -- We will take only the last action. + -- 0 = do nothing, 1 = enable safe mode, -1 = disable safe mode + local safe_mode_action = 0 + for _, old, new, sp in rows_iter() do + if sp ~= safe_space_id then + goto continue + end + assert((old == nil or old.key == 'status') and (new.key == 'status')) + + if (not old or not old.value) and new.value then + safe_mode_action = 1 + elseif old.value and not new.value then + safe_mode_action = -1 + end + + ::continue:: + end + + if safe_mode_action == 1 then + _safe_mode_enable() + elseif safe_mode_action == -1 then + _safe_mode_disable() + end + end) + end) +end + +function rebalance.init() + local stored_safe_mode + if not box.info.ro then + if box.space[SAFE_MODE_SPACE] == nil then + create_space() + create_trigger() + end + stored_safe_mode = box.space[SAFE_MODE_SPACE]:get{ 'status' } + else + while box.space[SAFE_MODE_SPACE] == nil or box.space[SAFE_MODE_SPACE].index[0] == nil do + fiber.sleep(0.05) + end + create_trigger() + stored_safe_mode = box.space[SAFE_MODE_SPACE]:get{ 'status' } + end + + if stored_safe_mode and stored_safe_mode.value then + _safe_mode_enable() + else + _safe_mode_disable() + end +end + +function rebalance.safe_mode_status() + return rebalance.safe_mode +end + +function rebalance.safe_mode_enable() + box.space[SAFE_MODE_SPACE]:replace{ 'status', true } +end + +function rebalance.safe_mode_disable() + box.space[SAFE_MODE_SPACE]:replace{ 'status', false } +end + +--- Rebalance storage API +rebalance.storage_api = { + rebalance_safe_mode_status = rebalance.safe_mode_status, + rebalance_safe_mode_enable = rebalance.safe_mode_enable, + rebalance_safe_mode_disable = rebalance.safe_mode_disable, +} + +--- Rebalance router API +rebalance.router_api = {} + +function rebalance.router_api.cache_clear() + local router = utils.get_vshard_router_instance() + if router == nil then + log.warn("Router is not initialized yet") + return + end + rebalance._router_cache_last_clear_ts = fiber.time() + return router:_route_map_clear() +end + +function rebalance.router_api.cache_length() + local router = utils.get_vshard_router_instance() + if router == nil then + log.warn("Router is not initialized yet") + return + end + return router.known_bucket_count +end + +function rebalance.router_api.cache_last_clear_ts() + return rebalance._router_cache_last_clear_ts +end + +--- Rebalance related metrics +if has_metrics_module then + local safe_mode_enabled_gauge = metrics.gauge( + 'tnt_crud_storage_safe_mode_enabled', + "is safe mode enabled on this storage instance" + ) + local router_cache_length_gauge = metrics.gauge( + 'tnt_crud_router_cache_length', + "number of bucket routes in vshard router cache" + ) + local router_cache_last_clear_ts_gauge = metrics.gauge( + 'tnt_crud_router_cache_last_clear_ts', + "when vshard router cache was cleared last time" + ) + + metrics.register_callback(function() + safe_mode_enabled_gauge:set(rebalance.safe_mode_status() and 1 or 0) + router_cache_length_gauge:set(rebalance.router_api.cache_length()) + router_cache_last_clear_ts_gauge:set(rebalance.router_api.cache_last_clear_ts()) + end) +end + +return rebalance diff --git a/crud/common/sharding/bucket_ref_unref.lua b/crud/common/sharding/bucket_ref_unref.lua new file mode 100644 index 00000000..dd07074f --- /dev/null +++ b/crud/common/sharding/bucket_ref_unref.lua @@ -0,0 +1,165 @@ +--- Module to call vshard.storage.bucket_ref / vshard.storage.bucket_unref +--- on write requests +--- there are two modes: safe and fast. on safe mode module +--- calls vshard.storage.bucket_ref / vshard.storage.bucket_unref +--- on fast mode it does nothing. +--- Default is fast mode. + +--- bucket_ref/bucket_unref must be called in one transaction in order to prevent +--- safe_mode change during execution. + +local vshard = require('vshard') +local errors = require('errors') +local rebalance = require('crud.common.rebalance') + +local safe_methods +local fast_methods + +local bucket_ref_unref = { + BucketRefError = errors.new_class('bucket_ref_error', {capture_stack = false}) +} + +local function make_bucket_ref_err(bucket_id, vshard_ref_err) + local err = bucket_ref_unref.BucketRefError:new( + "failed bucket_ref: %s, bucket_id: %s", + vshard_ref_err.name, + bucket_id + ) + err.bucket_ref_errs = { + { + bucket_id = bucket_id, + vshard_err = vshard_ref_err, + } + } + return err +end + +--- Safe bucket_refrw implementation that calls vshard.storage.bucket_refrw. +--- must be called with bucket_unrefrw in transaction +function bucket_ref_unref._bucket_refrw(bucket_id) + local ref_ok, vshard_ref_err = vshard.storage.bucket_refrw(bucket_id) + if not ref_ok then + return nil, make_bucket_ref_err(bucket_id, vshard_ref_err) + end + + return true, nil, bucket_ref_unref._bucket_unrefrw +end + +--- Safe bucket_unrefrw implementation that calls vshard.storage.bucket_unrefrw. +--- must be called with bucket_refrw in transaction +function bucket_ref_unref._bucket_unrefrw(bucket_id) + return vshard.storage.bucket_unrefrw(bucket_id) +end + +--- Safe bucket_refro implementation that calls vshard.storage.bucket_refro. +function bucket_ref_unref._bucket_refro(bucket_id) + local ref_ok, vshard_ref_err = vshard.storage.bucket_refro(bucket_id) + if not ref_ok then + return nil, make_bucket_ref_err(bucket_id, vshard_ref_err) + end + + return true, nil, bucket_ref_unref._bucket_unrefro +end + +--- Safe bucket_unrefro implementation that calls vshard.storage.bucket_unrefro. +--- must be called in one transaction with bucket_refrw_many +function bucket_ref_unref._bucket_unrefro(bucket_id) + return vshard.storage.bucket_unrefro(bucket_id) +end + +--- Safe bucket_refrw_many that calls bucket_refrw for every bucket and aggregates errors +--- @param bucket_ids table +function bucket_ref_unref._bucket_refrw_many(bucket_ids) + local bucket_ref_errs = {} + local reffed_bucket_ids = {} + for bucket_id in pairs(bucket_ids) do + local ref_ok, bucket_refrw_err = safe_methods.bucket_refrw(bucket_id) + if not ref_ok then + table.insert(bucket_ref_errs, bucket_refrw_err.bucket_ref_errs[1]) + break + end + reffed_bucket_ids[bucket_id] = true + end + + if next(bucket_ref_errs) ~= nil then + local err = bucket_ref_unref.BucketRefError:new(bucket_ref_unref.BucketRefError:new("failed bucket_ref")) + err.bucket_ref_errs = bucket_ref_errs + bucket_ref_unref._bucket_unrefrw_many(reffed_bucket_ids) + return nil, err + end + + return true, nil, bucket_ref_unref._bucket_unrefrw_many +end + +--- Safe bucket_unrefrw_many that calls vshard.storage.bucket_unrefrw for every bucket. +--- must be called in one transaction with bucket_refrw_many +--- @param bucket_ids table +function bucket_ref_unref._bucket_unrefrw_many(bucket_ids) + local unref_all_ok = true + local unref_last_err + for reffed_bucket_id in pairs(bucket_ids) do + local unref_ok, unref_err = safe_methods.bucket_unrefrw(reffed_bucket_id) + if not unref_ok then + unref_all_ok = false + unref_last_err = unref_err + end + end + + if not unref_all_ok then + return nil, unref_last_err + end + return true +end + +--- _fast implements module logic for fast mode +function bucket_ref_unref._fast() + return true, nil, bucket_ref_unref._fast +end + +safe_methods = { + bucket_refrw = bucket_ref_unref._bucket_refrw, + bucket_unrefrw = bucket_ref_unref._bucket_unrefrw, + bucket_refro = bucket_ref_unref._bucket_refro, + bucket_unrefro = bucket_ref_unref._bucket_unrefro, + bucket_refrw_many = bucket_ref_unref._bucket_refrw_many, + bucket_unrefrw_many = bucket_ref_unref._bucket_unrefrw_many, +} + +fast_methods = { + bucket_refrw = bucket_ref_unref._fast, + bucket_unrefrw = bucket_ref_unref._fast, + bucket_refro = bucket_ref_unref._fast, + bucket_unrefro = bucket_ref_unref._fast, + bucket_refrw_many = bucket_ref_unref._fast, + bucket_unrefrw_many = bucket_ref_unref._fast, +} + +local function set_methods(methods) + for method_name, func in pairs(methods) do + bucket_ref_unref[method_name] = func + end +end + +local function set_safe_mode() + set_methods(safe_methods) +end + +local function set_fast_mode() + set_methods(fast_methods) +end + +if rebalance.safe_mode then + set_safe_mode() +else + set_fast_mode() +end + +rebalance.on_safe_mode_toggle(function(is_enabled) + if is_enabled then + set_safe_mode() + else + set_fast_mode() + end +end) + +return bucket_ref_unref diff --git a/crud/delete.lua b/crud/delete.lua index 6f15b9af..4c12e420 100644 --- a/crud/delete.lua +++ b/crud/delete.lua @@ -9,6 +9,7 @@ local sharding_key_module = require('crud.common.sharding.sharding_key') local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') +local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref') local DeleteError = errors.new_class('DeleteError', {capture_stack = false}) @@ -19,6 +20,7 @@ local CRUD_DELETE_FUNC_NAME = utils.get_storage_call(DELETE_FUNC_NAME) local function delete_on_storage(space_name, key, field_names, opts) dev_checks('string', '?', '?table', { + bucket_id = 'number|cdata', sharding_key_hash = '?number', sharding_func_hash = '?number', skip_sharding_hash_check = '?boolean', @@ -42,14 +44,25 @@ local function delete_on_storage(space_name, key, field_names, opts) return nil, err end + local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw(opts.bucket_id) + if not ref_ok then + return nil, bucket_ref_err + end + -- add_space_schema_hash is false because -- reloading space format on router can't avoid delete error on storage - return schema.wrap_box_space_func_result(space, 'delete', {key}, { + local result = schema.wrap_box_space_func_result(space, 'delete', {key}, { add_space_schema_hash = false, field_names = field_names, noreturn = opts.noreturn, fetch_latest_metadata = opts.fetch_latest_metadata, }) + local unref_ok, err_unref = unref(opts.bucket_id) + if not unref_ok then + return nil, err_unref + end + + return result end delete.storage_api = {[DELETE_FUNC_NAME] = delete_on_storage} @@ -116,6 +129,7 @@ local function call_delete_on_router(vshard_router, space_name, key, opts) sharding.fill_bucket_id_pk(space, key, bucket_id_data.bucket_id) local delete_on_storage_opts = { + bucket_id = bucket_id_data.bucket_id, sharding_func_hash = bucket_id_data.sharding_func_hash, sharding_key_hash = sharding_key_hash, skip_sharding_hash_check = skip_sharding_hash_check, diff --git a/crud/get.lua b/crud/get.lua index feca57c5..837ec9a6 100644 --- a/crud/get.lua +++ b/crud/get.lua @@ -9,6 +9,7 @@ local sharding_key_module = require('crud.common.sharding.sharding_key') local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') +local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref') local GetError = errors.new_class('GetError', {capture_stack = false}) @@ -19,6 +20,7 @@ local CRUD_GET_FUNC_NAME = utils.get_storage_call(GET_FUNC_NAME) local function get_on_storage(space_name, key, field_names, opts) dev_checks('string', '?', '?table', { + bucket_id = 'number|cdata', sharding_key_hash = '?number', sharding_func_hash = '?number', skip_sharding_hash_check = '?boolean', @@ -41,13 +43,23 @@ local function get_on_storage(space_name, key, field_names, opts) return nil, err end + local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refro(opts.bucket_id) + if not ref_ok then + return nil, bucket_ref_err + end -- add_space_schema_hash is false because -- reloading space format on router can't avoid get error on storage - return schema.wrap_box_space_func_result(space, 'get', {key}, { + local result = schema.wrap_box_space_func_result(space, 'get', {key}, { add_space_schema_hash = false, field_names = field_names, fetch_latest_metadata = opts.fetch_latest_metadata, }) + local unref_ok, err_unref = unref(opts.bucket_id) + if not unref_ok then + return nil, err_unref + end + + return result end get.storage_api = {[GET_FUNC_NAME] = get_on_storage} @@ -114,6 +126,7 @@ local function call_get_on_router(vshard_router, space_name, key, opts) sharding.fill_bucket_id_pk(space, key, bucket_id_data.bucket_id) local get_on_storage_opts = { + bucket_id = bucket_id_data.bucket_id, sharding_func_hash = bucket_id_data.sharding_func_hash, sharding_key_hash = sharding_key_hash, skip_sharding_hash_check = skip_sharding_hash_check, diff --git a/crud/insert.lua b/crud/insert.lua index ebb3f865..e49a7682 100644 --- a/crud/insert.lua +++ b/crud/insert.lua @@ -7,6 +7,7 @@ local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') +local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref') local InsertError = errors.new_class('InsertError', {capture_stack = false}) @@ -42,15 +43,29 @@ local function insert_on_storage(space_name, tuple, opts) return nil, err end + local bucket_id = tuple[utils.get_bucket_id_fieldno(space)] + local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw(bucket_id) + + if not ref_ok then + return nil, bucket_ref_err + end + -- add_space_schema_hash is true only in case of insert_object -- the only one case when reloading schema can avoid insert error -- is flattening object on router - return schema.wrap_box_space_func_result(space, 'insert', {tuple}, { + local result = schema.wrap_box_space_func_result(space, 'insert', {tuple}, { add_space_schema_hash = opts.add_space_schema_hash, field_names = opts.fields, noreturn = opts.noreturn, fetch_latest_metadata = opts.fetch_latest_metadata, }) + + local unref_ok, err_unref = unref(bucket_id) + if not unref_ok then + return nil, err_unref + end + + return result end insert.storage_api = {[INSERT_FUNC_NAME] = insert_on_storage} diff --git a/crud/insert_many.lua b/crud/insert_many.lua index f4299fd6..4eb3a44a 100644 --- a/crud/insert_many.lua +++ b/crud/insert_many.lua @@ -8,6 +8,7 @@ local batching_utils = require('crud.common.batching_utils') local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') +local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref') local BatchInsertIterator = require('crud.common.map_call_cases.batch_insert_iter') local BatchPostprocessor = require('crud.common.map_call_cases.batch_postprocessor') @@ -48,6 +49,16 @@ local function insert_many_on_storage(space_name, tuples, opts) return nil, batching_utils.construct_sharding_hash_mismatch_errors(err.err, tuples) end + local bucket_ids = {} + for _, tuple in ipairs(tuples) do + bucket_ids[tuple[utils.get_bucket_id_fieldno(space)]] = true + end + + local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw_many(bucket_ids) + if not ref_ok then + return nil, bucket_ref_err + end + local inserted_tuples = {} local errs = {} local replica_schema_version = nil @@ -84,7 +95,11 @@ local function insert_many_on_storage(space_name, tuples, opts) end if opts.rollback_on_error == true then + local unref_ok, bucket_unref_err = unref(bucket_ids) box.rollback() + if not unref_ok then + return nil, bucket_unref_err + end if next(inserted_tuples) then errs = batching_utils.complement_batching_errors(errs, batching_utils.rollback_on_error_msg, inserted_tuples) @@ -93,7 +108,11 @@ local function insert_many_on_storage(space_name, tuples, opts) return nil, errs, replica_schema_version end + local unref_ok, bucket_unref_err = unref(bucket_ids) box.commit() + if not unref_ok then + return nil, bucket_unref_err + end return inserted_tuples, errs, replica_schema_version end @@ -104,7 +123,11 @@ local function insert_many_on_storage(space_name, tuples, opts) if next(errs) ~= nil then if opts.rollback_on_error == true then + local unref_ok, bucket_unref_err = unref(bucket_ids) box.rollback() + if not unref_ok then + return nil, bucket_unref_err + end if next(inserted_tuples) then errs = batching_utils.complement_batching_errors(errs, batching_utils.rollback_on_error_msg, inserted_tuples) @@ -113,12 +136,20 @@ local function insert_many_on_storage(space_name, tuples, opts) return nil, errs, replica_schema_version end + local unref_ok, bucket_unref_err = unref(bucket_ids) box.commit() + if not unref_ok then + return nil, bucket_unref_err + end return inserted_tuples, errs, replica_schema_version end + local unref_ok, bucket_unref_err = unref(bucket_ids) box.commit() + if not unref_ok then + return nil, bucket_unref_err + end return inserted_tuples, nil, replica_schema_version end diff --git a/crud/replace.lua b/crud/replace.lua index 5e36906f..73019a25 100644 --- a/crud/replace.lua +++ b/crud/replace.lua @@ -7,6 +7,7 @@ local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') +local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref') local ReplaceError = errors.new_class('ReplaceError', { capture_stack = false }) @@ -42,15 +43,25 @@ local function replace_on_storage(space_name, tuple, opts) return nil, err end + local bucket_id = tuple[utils.get_bucket_id_fieldno(space)] + local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw(bucket_id) + if not ref_ok then + return nil, bucket_ref_err + end -- add_space_schema_hash is true only in case of replace_object -- the only one case when reloading schema can avoid insert error -- is flattening object on router - return schema.wrap_box_space_func_result(space, 'replace', {tuple}, { + local result = schema.wrap_box_space_func_result(space, 'replace', {tuple}, { add_space_schema_hash = opts.add_space_schema_hash, field_names = opts.fields, noreturn = opts.noreturn, fetch_latest_metadata = opts.fetch_latest_metadata, }) + local unref_ok, err_unref = unref(bucket_id) + if not unref_ok then + return nil, err_unref + end + return result end replace.storage_api = {[REPLACE_FUNC_NAME] = replace_on_storage} diff --git a/crud/replace_many.lua b/crud/replace_many.lua index d047a3b0..4d5b5775 100644 --- a/crud/replace_many.lua +++ b/crud/replace_many.lua @@ -8,6 +8,7 @@ local batching_utils = require('crud.common.batching_utils') local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') +local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref') local BatchInsertIterator = require('crud.common.map_call_cases.batch_insert_iter') local BatchPostprocessor = require('crud.common.map_call_cases.batch_postprocessor') @@ -52,6 +53,16 @@ local function replace_many_on_storage(space_name, tuples, opts) local errs = {} local replica_schema_version = nil + local bucket_ids = {} + for _, tuple in ipairs(tuples) do + bucket_ids[tuple[utils.get_bucket_id_fieldno(space)]] = true + end + + local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw_many(bucket_ids) + if not ref_ok then + return nil, bucket_ref_err + end + box.begin() for i, tuple in ipairs(tuples) do -- add_space_schema_hash is true only in case of replace_object_many @@ -87,17 +98,23 @@ local function replace_many_on_storage(space_name, tuples, opts) end if opts.rollback_on_error == true then + local unref_ok, bucket_unref_err = unref(bucket_ids) box.rollback() + if not unref_ok then + return nil, bucket_unref_err + end if next(inserted_tuples) then errs = batching_utils.complement_batching_errors(errs, batching_utils.rollback_on_error_msg, inserted_tuples) end - return nil, errs, replica_schema_version end + local unref_ok, bucket_unref_err = unref(bucket_ids) box.commit() - + if not unref_ok then + return nil, bucket_unref_err + end return inserted_tuples, errs, replica_schema_version end end @@ -107,7 +124,11 @@ local function replace_many_on_storage(space_name, tuples, opts) if next(errs) ~= nil then if opts.rollback_on_error == true then + local unref_ok, bucket_unref_err = unref(bucket_ids) box.rollback() + if not unref_ok then + return nil, bucket_unref_err + end if next(inserted_tuples) then errs = batching_utils.complement_batching_errors(errs, batching_utils.rollback_on_error_msg, inserted_tuples) @@ -116,13 +137,20 @@ local function replace_many_on_storage(space_name, tuples, opts) return nil, errs, replica_schema_version end + local unref_ok, bucket_unref_err = unref(bucket_ids) box.commit() + if not unref_ok then + return nil, bucket_unref_err + end return inserted_tuples, errs, replica_schema_version end + local unref_ok, bucket_unref_err = unref(bucket_ids) box.commit() - + if not unref_ok then + return nil, bucket_unref_err + end return inserted_tuples, nil, replica_schema_version end diff --git a/crud/schema.lua b/crud/schema.lua index 57743ba9..61bb7373 100644 --- a/crud/schema.lua +++ b/crud/schema.lua @@ -46,6 +46,8 @@ schema.system_spaces = { ['_tt_migrations'] = true, -- https://github.com/tarantool/cluster-federation/blob/01738cafa0dc7a3138e64f93c4e84cb323653257/src/internal/utils/utils.go#L17 ['_cdc_state'] = true, + -- crud/common/rebalance.lua + ['_crud_rebalance_safe_mode_local'] = true, } local function get_crud_schema(space) diff --git a/crud/storage.lua b/crud/storage.lua index 0b8ef770..f7cf6cbc 100644 --- a/crud/storage.lua +++ b/crud/storage.lua @@ -4,6 +4,7 @@ local dev_checks = require('crud.common.dev_checks') local stash = require('crud.common.stash') local utils = require('crud.common.utils') +local rebalance = require('crud.common.rebalance') local call = require('crud.common.call') local sharding_metadata = require('crud.common.sharding.sharding_metadata') local insert = require('crud.insert') @@ -62,6 +63,7 @@ local function init_storage_call(user, storage_api) end local modules_with_storage_api = { + rebalance, call, sharding_metadata, insert, @@ -103,6 +105,8 @@ local function init_impl() user = utils.get_this_replica_user() or 'guest' end + rebalance.init() + for _, module in ipairs(modules_with_storage_api) do init_storage_call(user, module.storage_api) end diff --git a/crud/update.lua b/crud/update.lua index 36ce242d..95827e9c 100644 --- a/crud/update.lua +++ b/crud/update.lua @@ -9,6 +9,7 @@ local sharding_key_module = require('crud.common.sharding.sharding_key') local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') +local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref') local UpdateError = errors.new_class('UpdateError', {capture_stack = false}) @@ -19,6 +20,7 @@ local CRUD_UPDATE_FUNC_NAME = utils.get_storage_call(UPDATE_FUNC_NAME) local function update_on_storage(space_name, key, operations, field_names, opts) dev_checks('string', '?', 'table', '?table', { + bucket_id = 'number|cdata', sharding_key_hash = '?number', sharding_func_hash = '?number', skip_sharding_hash_check = '?boolean', @@ -42,6 +44,11 @@ local function update_on_storage(space_name, key, operations, field_names, opts) return nil, err end + local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw(opts.bucket_id) + if not ref_ok then + return nil, bucket_ref_err + end + -- add_space_schema_hash is false because -- reloading space format on router can't avoid update error on storage local res, err = schema.wrap_box_space_func_result(space, 'update', {key, operations}, { @@ -51,20 +58,12 @@ local function update_on_storage(space_name, key, operations, field_names, opts) fetch_latest_metadata = opts.fetch_latest_metadata, }) - if err ~= nil then - return nil, err - end - - if res.err == nil then - return res, nil - end - - -- Relevant for Tarantool older than 2.8.1. - -- We can only add fields to end of the tuple. - -- If schema is updated and nullable fields are added, then we will get error. - -- Therefore, we need to add filling of intermediate nullable fields. - -- More details: https://github.com/tarantool/tarantool/issues/3378 - if utils.is_field_not_found(res.err.code) then + if err == nil and res.err ~= nil and utils.is_field_not_found(res.err.code) then + -- Relevant for Tarantool older than 2.8.1. + -- We can only add fields to end of the tuple. + -- If schema is updated and nullable fields are added, then we will get error. + -- Therefore, we need to add filling of intermediate nullable fields. + -- More details: https://github.com/tarantool/tarantool/issues/3378 operations = utils.add_intermediate_nullable_fields(operations, space:format(), space:get(key)) res, err = schema.wrap_box_space_func_result(space, 'update', {key, operations}, { add_space_schema_hash = false, @@ -74,6 +73,11 @@ local function update_on_storage(space_name, key, operations, field_names, opts) }) end + local unref_ok, err_unref = unref(opts.bucket_id) + if not unref_ok then + return nil, err_unref + end + return res, err end @@ -148,6 +152,7 @@ local function call_update_on_router(vshard_router, space_name, key, user_operat sharding.fill_bucket_id_pk(space, key, bucket_id_data.bucket_id) local update_on_storage_opts = { + bucket_id = bucket_id_data.bucket_id, sharding_func_hash = bucket_id_data.sharding_func_hash, sharding_key_hash = sharding_key_hash, skip_sharding_hash_check = skip_sharding_hash_check, diff --git a/crud/upsert.lua b/crud/upsert.lua index 5be7bc4a..ad4cb40c 100644 --- a/crud/upsert.lua +++ b/crud/upsert.lua @@ -7,6 +7,7 @@ local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') +local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref') local UpsertError = errors.new_class('UpsertError', { capture_stack = false}) @@ -41,13 +42,26 @@ local function upsert_on_storage(space_name, tuple, operations, opts) return nil, err end + local bucket_id = tuple[utils.get_bucket_id_fieldno(space)] + local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw(bucket_id) + if not ref_ok then + return nil, bucket_ref_err + end + -- add_space_schema_hash is true only in case of upsert_object -- the only one case when reloading schema can avoid insert error -- is flattening object on router - return schema.wrap_box_space_func_result(space, 'upsert', {tuple, operations}, { + local result = schema.wrap_box_space_func_result(space, 'upsert', {tuple, operations}, { add_space_schema_hash = opts.add_space_schema_hash, fetch_latest_metadata = opts.fetch_latest_metadata, }) + + local unref_ok, err_unref = unref(bucket_id) + if not unref_ok then + return nil, err_unref + end + + return result end upsert.storage_api = {[UPSERT_FUNC_NAME] = upsert_on_storage} diff --git a/crud/upsert_many.lua b/crud/upsert_many.lua index f030e778..cb6d7807 100644 --- a/crud/upsert_many.lua +++ b/crud/upsert_many.lua @@ -8,6 +8,7 @@ local batching_utils = require('crud.common.batching_utils') local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') +local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref') local BatchUpsertIterator = require('crud.common.map_call_cases.batch_upsert_iter') local BatchPostprocessor = require('crud.common.map_call_cases.batch_postprocessor') @@ -47,6 +48,16 @@ local function upsert_many_on_storage(space_name, tuples, operations, opts) return nil, batching_utils.construct_sharding_hash_mismatch_errors(err.err, tuples) end + local bucket_ids = {} + for _, tuple in ipairs(tuples) do + bucket_ids[tuple[utils.get_bucket_id_fieldno(space)]] = true + end + + local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw_many(bucket_ids) + if not ref_ok then + return nil, bucket_ref_err + end + local processed_tuples = {} local errs = {} local replica_schema_version = nil @@ -81,7 +92,11 @@ local function upsert_many_on_storage(space_name, tuples, operations, opts) end if opts.rollback_on_error == true then + local unref_ok, bucket_unref_err = unref(bucket_ids) box.rollback() + if not unref_ok then + return nil, bucket_unref_err + end if next(processed_tuples) then errs = batching_utils.complement_batching_errors(errs, batching_utils.rollback_on_error_msg, processed_tuples) @@ -90,7 +105,11 @@ local function upsert_many_on_storage(space_name, tuples, operations, opts) return nil, errs, replica_schema_version end + local unref_ok, bucket_unref_err = unref(bucket_ids) box.commit() + if not unref_ok then + return nil, bucket_unref_err + end return nil, errs, replica_schema_version end @@ -101,7 +120,11 @@ local function upsert_many_on_storage(space_name, tuples, operations, opts) if next(errs) ~= nil then if opts.rollback_on_error == true then + local unref_ok, bucket_unref_err = unref(bucket_ids) box.rollback() + if not unref_ok then + return nil, bucket_unref_err + end if next(processed_tuples) then errs = batching_utils.complement_batching_errors(errs, batching_utils.rollback_on_error_msg, processed_tuples) @@ -110,12 +133,20 @@ local function upsert_many_on_storage(space_name, tuples, operations, opts) return nil, errs, replica_schema_version end + local unref_ok, bucket_unref_err = unref(bucket_ids) box.commit() + if not unref_ok then + return nil, bucket_unref_err + end return nil, errs, replica_schema_version end + local unref_ok, bucket_unref_err = unref(bucket_ids) box.commit() + if not unref_ok then + return nil, bucket_unref_err + end return nil, nil, replica_schema_version end diff --git a/test/helper.lua b/test/helper.lua index 9aa127d1..eae17e8c 100644 --- a/test/helper.lua +++ b/test/helper.lua @@ -981,6 +981,16 @@ function helpers.start_cluster(g, cartridge_cfg, vshard_cfg, tarantool3_cluster_ error(err) end end + + if g.params and g.params.safe_mode ~= nil then + local safe_mod_func = '_crud.rebalance_safe_mode_disable' + if g.params.safe_mode then + safe_mod_func = '_crud.rebalance_safe_mode_enable' + end + helpers.call_on_storages(g.cluster, function(server) + server:call(safe_mod_func) + end) + end end local function count_storages_in_topology(g, backend, vshard_group, storage_roles) @@ -1178,8 +1188,34 @@ function helpers.is_cartridge_suite_supported() return is_module_provided and is_tarantool_supports end -function helpers.backend_matrix(base_matrix) +function helpers.safe_mode_matrix(base_matrix) + base_matrix = base_matrix or {{}} + + local safe_mode_params = { + { safe_mode = true }, + { safe_mode = false }, + } + + local matrix = {} + for _, params in ipairs(safe_mode_params) do + for _, base in ipairs(base_matrix) do + base = table.deepcopy(base) + base.safe_mode = params.safe_mode + table.insert(matrix, base) + end + end + + return matrix +end + +function helpers.backend_matrix(base_matrix, opts) base_matrix = base_matrix or {{}} + opts = opts or {} + + if not opts.skip_safe_mode then + base_matrix = helpers.safe_mode_matrix(base_matrix) + end + local backend_params = { { backend = helpers.backend.VSHARD, diff --git a/test/integration/cartridge_reload_test.lua b/test/integration/cartridge_reload_test.lua index 5ddeeacf..130dffcf 100644 --- a/test/integration/cartridge_reload_test.lua +++ b/test/integration/cartridge_reload_test.lua @@ -113,6 +113,11 @@ function g.test_storage() t.skip_if(not helpers.is_cartridge_hotreload_supported(), "Cartridge roles reload is not supported") helpers.skip_old_tarantool_cartridge_hotreload() + helpers.call_on_storages(g.cluster, function(server) + server.net_box:eval([[ + require('crud.common.rebalance').safe_mode_disable() + ]]) + end) g.highload_fiber = fiber.new(highload_loop, 'B') diff --git a/test/integration/double_buckets_test.lua b/test/integration/double_buckets_test.lua new file mode 100644 index 00000000..326de2ca --- /dev/null +++ b/test/integration/double_buckets_test.lua @@ -0,0 +1,315 @@ +local t = require('luatest') +local json = require('json') +local fiber = require('fiber') + +local utils = require('crud.common.utils') + +local helpers = require('test.helper') + +local function wait_balance(g) + t.helpers.retrying({timeout=30}, function() + local buckets_count_s1 = g.cluster:server('s1-master').net_box:eval("return box.space._bucket:len()") + local buckets_count_s2 = g.cluster:server('s2-master').net_box:eval("return box.space._bucket:len()") + t.assert_equals(buckets_count_s1, 1500) + t.assert_equals(buckets_count_s2, 1500) + end) +end + +local function balance_cluster(g) + if g.params.backend == "config" then + local cfg = g.cluster:cfg() + cfg.groups.storages.replicasets["s-1"].sharding = { + weight = 1, + } + cfg.groups.storages.replicasets["s-2"].sharding = { + weight = 1, + } + g.cluster:cfg(cfg) + wait_balance(g) + end +end + +local pgroup_duplicates = t.group('double_buckets_duplicates', helpers.backend_matrix({ + {engine = 'memtx', operation = 'replace'}, + {engine = 'memtx', operation = 'insert'}, + {engine = 'memtx', operation = 'upsert'}, + {engine = 'memtx', operation = 'insert_many'}, + {engine = 'memtx', operation = 'replace_many'}, + {engine = 'memtx', operation = 'upsert_many'}, +})) + +pgroup_duplicates.before_all(function(g) + helpers.start_default_cluster(g, 'srv_simple_operations') +end) + +pgroup_duplicates.after_all(function(g) + helpers.stop_cluster(g.cluster, g.params.backend) +end) + +pgroup_duplicates.before_each(function(g) + helpers.truncate_space_on_cluster(g.cluster, 'customers') +end) + +pgroup_duplicates.after_each(function(g) + balance_cluster(g) +end) + +--- Rebalance stalls if we move all buckets at once; use a small subset. +local test_tuples = { + {22, box.NULL, 'Alex', 34}, + {92, box.NULL, 'Artur', 29}, + {3, box.NULL, 'Anastasia', 22}, + {5, box.NULL, 'Sergey', 25}, + {9, box.NULL, 'Anna', 30}, + {71, box.NULL, 'Oksana', 29}, +} + +local last_call = fiber.time() +local duplicate_operations = { + insert = function(g) + return g.router:call('crud.insert', {'customers', {45, box.NULL, 'John Fedor', 42}}) + end, + replace = function(g) + return g.router:call('crud.replace', {'customers', {45, box.NULL, 'John Fedor', 42}}) + end, + upsert = function (g) + return g.router:call('crud.upsert', {'customers', {45, box.NULL, 'John Fedor', 42}, {{'+', 'age', 1}}}) + end, + insert_many = function(g) + if fiber.time() - last_call < 1 then + return + end + last_call = fiber.time() + return g.router:call('crud.insert_many', {'customers', test_tuples}) + end, + replace_many = function(g) + if fiber.time() - last_call < 1 then + return + end + last_call = fiber.time() + return g.router:call('crud.replace_many', {'customers', test_tuples}) + end, + upsert_many = function(g) + if fiber.time() - last_call < 1 then + return + end + last_call = fiber.time() + local tuples = {} + for i = 1, 2 do + tuples[i] = {{i, box.NULL, 'John Fedor', 42}, {{'+', 'age', 1}}} + end + return g.router:call('crud.upsert_many', {'customers', tuples}) + end +} + +local function check_duplicates(tuples) + local ids = {} + for _, tuple in pairs(tuples) do + t.assert_equals(ids[tuple[1]], nil, ('duplicate to tuple: %s'):format(json.encode(tuple))) + ids[tuple[1]] = true + end +end + + +--- write requests cause duplicates by primary key in cluster +pgroup_duplicates.test_duplicates = function(g) + t.skip_if( + not ( + utils.tarantool_version_at_least(3, 1) and (g.params.backend == "config") + ), + 'test implemented only for 3.1 and greater' + ) + if g.params.backend == "config" then + duplicate_operations[g.params.operation](g) + + local cfg = g.cluster:cfg() + cfg.groups.storages.replicasets["s-1"].sharding = { + weight = 0, + } + g.cluster:cfg(cfg) + t.helpers.retrying({timeout=30}, function() + local buckets_count = g.cluster:server('s1-master').net_box:eval("return box.space._bucket:len()") + duplicate_operations[g.params.operation](g) + t.assert_equals(buckets_count, 0) + end) + + cfg.groups.storages.replicasets["s-2"].sharding = { + weight = 0, + } + cfg.groups.storages.replicasets["s-1"].sharding = { + weight = 1, + } + g.cluster:cfg(cfg) + t.helpers.retrying({timeout=30}, function() + local buckets_count = g.cluster:server('s2-master').net_box:eval("return box.space._bucket:len()") + duplicate_operations[g.params.operation](g) + t.assert_equals(buckets_count, 0) + end) + + local res = g.router:call('crud.select', {'customers'}) + check_duplicates(res.rows) + end +end + +local pgroup_not_applied = t.group('double_buckets_not_applied', helpers.backend_matrix({ + {engine = 'memtx', operation = 'delete'}, + {engine = 'memtx', operation = 'update'}, + {engine = 'memtx', operation = 'get'}, +})) + +pgroup_not_applied.before_all(function(g) + helpers.start_default_cluster(g, 'srv_simple_operations') +end) + +pgroup_not_applied.after_all(function(g) + helpers.stop_cluster(g.cluster, g.params.backend) +end) + +pgroup_not_applied.before_each(function(g) + helpers.truncate_space_on_cluster(g.cluster, 'customers') +end) + +pgroup_not_applied.after_each(function(g) + balance_cluster(g) +end) + +local not_applied_operations = { + delete = { + call = function(g, key) + last_call = fiber.time() + return g.router:call('crud.delete', { 'customers', {key} }) + end, + check_applied = function(rows, applied_ids) + for _, tuple in pairs(rows) do + t.assert_equals( + applied_ids[tuple[1]], + nil, + ('tuples %s was marked as deleted, but exists'):format(json.encode(tuple)) + ) + end + end, + check_not_applied = function(not_applied_ids) + t.assert_equals( + next(not_applied_ids), + nil, + 'tuples were inserted, but crud.delete returned 0 rows, as if there were no such tuples' + ) + end + }, + update = { + call = function(g, key) + return g.router:call('crud.update', { 'customers', key, {{'=', 'name', 'applied'}} }) + end, + check_applied = function(rows, applied_ids) + for _, tuple in pairs(rows) do + if applied_ids[tuple[1]] then + t.assert_equals( + tuple[3], + 'applied', + ('tuples %s was marked as updated, but was not updated'):format(json.encode(tuple)) + ) + end + end + end, + check_not_applied = function(not_applied_ids) + t.assert_equals( + next(not_applied_ids), + nil, + 'tuples were created, but crud.update returned 0 rows, as if there were no such tuples' + ) + end + }, + get = { + call = function (g, key) + return g.router:call('crud.get', { 'customers', key, {mode = 'write'} }) + end, + check_applied = function() end, + check_not_applied = function(not_applied_ids) + t.assert_equals( + next(not_applied_ids), + nil, + 'tuples were created, but crud.get returned 0 rows, as if there were no such tuples' + ) + end + } +} + +--- Some requests do not create duplicates but return 0 rows as if there is no tuple +--- with this key. The tuple can still exist in cluster but be unavailable during +--- rebalance. CRUD should return an error in this case, not 0 rows as if there were +--- no tuples. +pgroup_not_applied.test_not_applied = function(g) + t.skip_if( + not ( + utils.tarantool_version_at_least(3, 1) and (g.params.backend == "config") + ), + 'test implemented only for 3.1 and greater' + ) + + if g.params.backend == "config" then + local tuples, tuples_count = {}, 1000 + for i = 1, tuples_count do + tuples[i] = {i, box.NULL, 'John Fedor', 42} + end + + local _, err = g.router:call('crud.replace_many', {'customers', tuples}) + t.assert_equals(err, nil) + local cfg = g.cluster:cfg() + cfg.groups.storages.replicasets["s-1"].sharding = { + weight = 0, + } + g.cluster:cfg(cfg) + local tuple_id = 1 + local not_applied_ids = {} + local applied_ids = {} + t.helpers.retrying({timeout=30}, function() + if tuple_id > tuples_count then + return + end + + local buckets_count = g.cluster:server('s1-master').net_box:eval("return box.space._bucket:len()") + local res, err = not_applied_operations[g.params.operation].call(g, tuple_id) + if err == nil then + if #res.rows == 0 then + not_applied_ids[tuple_id] = true + else + applied_ids[tuple_id] = true + end + tuple_id = tuple_id + 1 + end + + t.assert_equals(buckets_count, 0) + end) + + cfg.groups.storages.replicasets["s-2"].sharding = { + weight = 0, + } + cfg.groups.storages.replicasets["s-1"].sharding = { + weight = 1, + } + g.cluster:cfg(cfg) + t.helpers.retrying({timeout=30}, function() + if tuple_id > tuples_count then + return + end + + local buckets_count = g.cluster:server('s2-master').net_box:eval("return box.space._bucket:len()") + local res, err = not_applied_operations[g.params.operation].call(g, tuple_id) + + if err == nil then + if #res.rows == 0 then + not_applied_ids[tuple_id] = true + else + applied_ids[tuple_id] = true + end + tuple_id = tuple_id + 1 + end + + t.assert_equals(buckets_count, 0) + end) + + local res = g.router:call('crud.select', {'customers'}) + not_applied_operations[g.params.operation].check_applied(res.rows, applied_ids) + not_applied_operations[g.params.operation].check_not_applied(not_applied_ids) + end +end diff --git a/test/integration/metrics_test.lua b/test/integration/metrics_test.lua new file mode 100644 index 00000000..58727a9d --- /dev/null +++ b/test/integration/metrics_test.lua @@ -0,0 +1,109 @@ +local helpers = require('test.helper') +local t = require('luatest') + +local pgroup = t.group('metrics_integration', helpers.backend_matrix({ + {engine = 'memtx'}, +})) + +local function before_all(g) + helpers.start_default_cluster(g, 'srv_stats') +end + +local function after_all(g) + helpers.stop_cluster(g.cluster, g.params.backend) +end + +local function before_each(g) + g.router:eval("crud = require('crud')") + helpers.call_on_storages(g.cluster, function(server) + server:call('_crud.rebalance_safe_mode_disable') + end) +end + +pgroup.before_all(before_all) + +pgroup.after_all(after_all) + +pgroup.before_each(before_each) + +pgroup.test_safe_mode_storage_metrics = function(g) + local has_metrics_module = require('metrics') + t.skip_if(not has_metrics_module, 'No metrics module in current version') + + -- Check safe mode metric on storage + helpers.call_on_storages(g.cluster, function(server) + local observed = server:eval("return require('metrics').collect({ invoke_callbacks = true })") + local has_metric = false + for _, m in pairs(observed) do + if m.metric_name == 'tnt_crud_storage_safe_mode_enabled' then + t.assert_equals(m.value, 0, 'Metric must show safe mode disabled') + has_metric = true + break + end + end + if not has_metric then + t.fail('No tnt_crud_storage_safe_mode_enabled metric found') + end + end) + + -- Enable safe mode + helpers.call_on_storages(g.cluster, function(server) + server:call('_crud.rebalance_safe_mode_enable') + end) + + -- Check that metric value has changed + helpers.call_on_storages(g.cluster, function(server) + local observed = server:eval("return require('metrics').collect({ invoke_callbacks = true })") + local has_metric = false + for _, m in pairs(observed) do + if m.metric_name == 'tnt_crud_storage_safe_mode_enabled' then + t.assert_equals(m.value, 1, 'Metric must show safe mode enabled') + has_metric = true + break + end + end + if not has_metric then + t.fail('No tnt_crud_storage_safe_mode_enabled metric found') + end + end) +end + +pgroup.test_safe_mode_router_metrics = function(g) + local has_metrics_module = require('metrics') + t.skip_if(not has_metrics_module, 'No metrics module in current version') + + -- Check initial router cache metric + local observed = g.router:eval("return require('metrics').collect({ invoke_callbacks = true })") + for _, m in pairs(observed) do + if m.metric_name == 'tnt_crud_router_cache_last_clear_ts' then + t.assert_equals(m.value, 0, 'Last cache clear TS must be zero on start') + break + end + end + + g.router:eval("crud.rebalance.router_cache_clear()") + + -- Check router cache metric after cache clear + observed = g.router:eval("return require('metrics').collect({ invoke_callbacks = true })") + local first_ts = 0 + for _, m in pairs(observed) do + if m.metric_name == 'tnt_crud_router_cache_last_clear_ts' then + first_ts = m.value + break + end + end + t.assert_gt(first_ts, 0, 'Last cache clear TS must be greater than zero') + + g.router:eval("crud.rebalance.router_cache_clear()") + + -- Check that last_clear_ts has changed + observed = g.router:eval("return require('metrics').collect({ invoke_callbacks = true })") + local new_ts = 0 + for _, m in pairs(observed) do + if m.metric_name == 'tnt_crud_router_cache_last_clear_ts' then + new_ts = m.value + break + end + end + t.assert_gt(new_ts, first_ts, 'Last cache clear TS is greater than the first one') +end diff --git a/test/integration/privileges_test.lua b/test/integration/privileges_test.lua index 74816417..ec8f8da5 100644 --- a/test/integration/privileges_test.lua +++ b/test/integration/privileges_test.lua @@ -123,6 +123,9 @@ local function privilegies_test_base_init(g, access_operation_type) if access_operation_type and box.space.customers then box.schema.user.grant('testuser1', access_operation_type, 'space', 'customers') end + if box.space._bucket then + box.schema.user.grant('testuser1', 'read', 'space', '_bucket') + end box.session.su(user) end diff --git a/test/tarantool3_helpers/cluster.lua b/test/tarantool3_helpers/cluster.lua index 21574d2b..df8caf81 100644 --- a/test/tarantool3_helpers/cluster.lua +++ b/test/tarantool3_helpers/cluster.lua @@ -297,34 +297,7 @@ function Cluster:cfg(new_config) return table.deepcopy(self.config) end -local function strip_all_entries(t, name) - if type(t) ~= 'table' then - return t - end - - t[name] = nil - - for k, v in pairs(t) do - t[k] = strip_all_entries(v, name) - end - - return t -end - -local function check_only_roles_cfg_changed_in_groups(old_groups, new_groups) - old_groups = table.deepcopy(old_groups) - new_groups = table.deepcopy(new_groups) - - local old_groups_no_roles_cfg = strip_all_entries(old_groups, 'roles_cfg') - local new_groups_no_roles_cfg = strip_all_entries(new_groups, 'roles_cfg') - - t.assert_equals(new_groups_no_roles_cfg, old_groups_no_roles_cfg, - 'groups reload supports only roles_cfg reload') -end - function Cluster:reload_config(new_config) - check_only_roles_cfg_changed_in_groups(self.config.groups, new_config.groups) - for _, server in ipairs(self.servers) do write_config(self.dirs[server], new_config) end diff --git a/test/unit/call_test.lua b/test/unit/call_test.lua index f4e21926..912fc692 100644 --- a/test/unit/call_test.lua +++ b/test/unit/call_test.lua @@ -242,6 +242,11 @@ pgroup.test_any_vshard_call = function(g) end pgroup.test_any_vshard_call_timeout = function(g) + helpers.call_on_storages(g.cluster, function(server) + server.net_box:eval([[ + require('crud.common.rebalance').safe_mode_disable() + ]]) + end) local timeout = 0.2 local results, err = g.router:eval([[ diff --git a/test/unit/not_initialized_test.lua b/test/unit/not_initialized_test.lua index 82f9b4ee..e2f6793a 100644 --- a/test/unit/not_initialized_test.lua +++ b/test/unit/not_initialized_test.lua @@ -5,7 +5,7 @@ local server = require('luatest.server') local pgroup = t.group('not-initialized', helpers.backend_matrix({ {}, -})) +}, { skip_safe_mode = true })) local vshard_cfg_template = { sharding = { diff --git a/test/unit/stats_test.lua b/test/unit/stats_test.lua index fdaf8c01..9ae6847d 100644 --- a/test/unit/stats_test.lua +++ b/test/unit/stats_test.lua @@ -45,6 +45,7 @@ local function enable_stats(g, params) params = table.deepcopy(params) params.backend = nil params.backend_cfg = nil + params.safe_mode = nil end g.router:eval("stats_module.enable(...)", { params }) end