Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

### Added
* Implement "safe" mode to prevent writing data to wrong replicaset when vshard rebalance is in progress.
* Auto switch to safe mode when rebalance process starts.
* Manual return to fast mode.

## [1.6.1] - 19-09-25

### Added
Expand Down
2 changes: 1 addition & 1 deletion crud-scm-1.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ dependencies = {
'lua ~> 5.1',
'checks >= 3.3.0-1',
'errors >= 2.2.1-1',
'vshard >= 0.1.36-1',
'vshard >= 0.1.39-1',
}

build = {
Expand Down
17 changes: 16 additions & 1 deletion crud.lua
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ local readview = require('crud.readview')
local schema = require('crud.schema')

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guys, the commits are absolutely chaotic; this is not the way we develop open-source modules. We should rebase that branch onto master to make the commit history linear, and not just merge it. Otherwise, all commits like Minor changes according to review comments will be visible to our users. You can check out how Georgy Moiseev did it: proper commits, proper commit messages, tests in every commit (e.g. 8d7cae0).

Now, I'm forced to review more than 1k lines in one step, which is very inconvenient and increases the chance of missed bugs. And our users won't be able to check the individual commits, if they want to. Of course, it's up to you, since I'm not the maintainer of that module, but it's just not nice to develop and merge code like that.

IMHO, the features should be properly split between commits: every commit must include the associated tests, proper commit messages, and a mention of the #448 ticket. Of course, refactoring or code moving should be in separate commits. Between commits, all of the tests must pass.

Copy link
Author

@ita-sammann ita-sammann Dec 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guys, the commits are absolutely chaotic

Of course these commits will not go to master, I will re-split them before merging the PR.

Now, I'm forced to review more than 1k lines in one step

My bad. Never thought of this PR as of multiple features that can be reviewed separately.

local storage_info = require('crud.storage_info')
local storage = require('crud.storage')
local rebalance = require('crud.common.rebalance')

local crud = {}

Expand Down Expand Up @@ -158,8 +159,22 @@ crud.readview = readview.new
-- @function schema
crud.schema = schema.call

-- Public router-side rebalance namespace: re-exports the route-cache
-- management functions from crud.common.rebalance under crud.rebalance.*.
crud.rebalance = {}

-- @refer rebalance.router_cache_clear
-- @function router_cache_clear
crud.rebalance.router_cache_clear = rebalance.router_api.cache_clear

-- @refer rebalance.router_cache_length
-- @function router_cache_length
crud.rebalance.router_cache_length = rebalance.router_api.cache_length

-- @refer rebalance.router_cache_last_clear_ts
-- @function router_cache_last_clear_ts
crud.rebalance.router_cache_last_clear_ts = rebalance.router_api.cache_last_clear_ts

--- Initialize the module on a router instance.
-- Publishes the module table as the `crud` global so it can be called
-- remotely (net.box / console). The duplicated rawset line is collapsed:
-- the second call was redundant (it wrote the same value twice).
function crud.init_router()
    rawset(_G, 'crud', crud)
end

function crud.stop_router()
Expand Down
77 changes: 63 additions & 14 deletions crud/common/call.lua
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
local errors = require('errors')
local log = require('log')

local call_cache = require('crud.common.call_cache')
local dev_checks = require('crud.common.dev_checks')
local utils = require('crud.common.utils')
local sharding_utils = require('crud.common.sharding.utils')
local fiber_clock = require('fiber').clock
local fiber = require('fiber')
local const = require('crud.common.const')
local rebalance = require('crud.common.rebalance')
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')

local BaseIterator = require('crud.common.map_call_cases.base_iter')
local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor')
Expand All @@ -14,14 +17,38 @@ local CallError = errors.new_class('CallError')

local CALL_FUNC_NAME = 'call_on_storage'
local CRUD_CALL_FUNC_NAME = utils.get_storage_call(CALL_FUNC_NAME)

local CRUD_CALL_FIBER_NAME = CRUD_CALL_FUNC_NAME .. '/fast'

local call = {}

--- Execute the requested storage function under the caller's effective user.
-- "Safe" variant: the fiber is left unnamed so it is never killed on a
-- safe-mode switch.
local function call_on_storage_safe(run_as_user, func_name, ...)
    return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...)
end

--- Same call, "fast" variant: tags the serving fiber with a well-known name
-- so that in-flight fast-mode requests can be found and killed when safe
-- mode is enabled mid-request.
local function call_on_storage_fast(run_as_user, func_name, ...)
    fiber.name(CRUD_CALL_FIBER_NAME)
    return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...)
end

-- Currently active implementation; swapped by the safe-mode toggle below.
local call_on_storage_impl = rebalance.safe_mode and call_on_storage_safe or call_on_storage_fast

--- Stable entry point registered in call.storage_api.
-- BUGFIX: the exported table captures this function exactly once. The
-- original code rebound the local `call_on_storage` in the toggle callback,
-- but the already-built storage_api table kept the old function value, so
-- mode switches never reached the registered storage endpoint. Dispatching
-- through the `call_on_storage_impl` upvalue makes the switch effective.
local function call_on_storage(run_as_user, func_name, ...)
    return call_on_storage_impl(run_as_user, func_name, ...)
end

rebalance.on_safe_mode_toggle(function(is_enabled)
    if is_enabled then
        call_on_storage_impl = call_on_storage_safe

        -- Abort in-flight fast-mode requests: they may be writing to a
        -- bucket whose migration has just started.
        local fibers_killed = 0
        for fb_id, fb in pairs(fiber.info()) do
            if fb.name == CRUD_CALL_FIBER_NAME then
                fiber.kill(fb_id)
                fibers_killed = fibers_killed + 1
            end
        end
        log.debug('Killed %d fibers with fast-mode crud requests.', fibers_killed)
    else
        call_on_storage_impl = call_on_storage_fast
    end
end)

call.storage_api = {[CALL_FUNC_NAME] = call_on_storage}

function call.get_vshard_call_name(mode, prefer_replica, balance)
Expand Down Expand Up @@ -82,7 +109,10 @@ local function wrap_vshard_err(vshard_router, err, func_name, replicaset_id, buc
))
end

local function retry_call_with_master_discovery(replicaset, method, func_name, func_args, call_opts)
--- Executes a vshard call and retries once after performing recovery actions
--- like bucket cache reset, destination redirect (for single calls), or master discovery.
local function call_with_retry_and_recovery(vshard_router,
replicaset, method, func_name, func_args, call_opts, is_single_call)
local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, func_args)

-- In case cluster was just bootstrapped with auto master discovery,
Expand All @@ -93,7 +123,27 @@ local function retry_call_with_master_discovery(replicaset, method, func_name, f
return resp, err
end

if err.name == 'MISSING_MASTER' and replicaset.locate_master ~= nil then
-- This is a partial copy of error handling from vshard.router.router_call_impl()
-- It is much simpler mostly because bucket_set() can't be accessed from outside vshard.
if err.class_name == bucket_ref_unref.BucketRefError.name then
if is_single_call and #err.bucket_ref_errs == 1 then
local single_err = err.bucket_ref_errs[1]
local destination = single_err.vshard_err.destination
if destination and vshard_router.replicasets[destination] then
replicaset = vshard_router.replicasets[destination]
end
end

for _, bucket_ref_err in pairs(err.bucket_ref_errs) do
local bucket_id = bucket_ref_err.bucket_id
local vshard_err = bucket_ref_err.vshard_err
if vshard_err.name == 'WRONG_BUCKET' or
vshard_err.name == 'BUCKET_IS_LOCKED' or
vshard_err.name == 'TRANSFER_IS_IN_PROGRESS' then
vshard_router:_bucket_reset(bucket_id)
end
end
elseif err.name == 'MISSING_MASTER' and replicaset.locate_master ~= nil then
replicaset:locate_master()
end

Expand Down Expand Up @@ -147,8 +197,8 @@ function call.map(vshard_router, func_name, func_args, opts)
while iter:has_next() do
local args, replicaset, replicaset_id = iter:get()

local future, err = retry_call_with_master_discovery(replicaset, vshard_call_name,
func_name, args, call_opts)
local future, err = call_with_retry_and_recovery(vshard_router, replicaset, vshard_call_name,
func_name, args, call_opts, false)

if err ~= nil then
local result_info = {
Expand All @@ -170,9 +220,9 @@ function call.map(vshard_router, func_name, func_args, opts)
futures_by_replicasets[replicaset_id] = future
end

local deadline = fiber_clock() + timeout
local deadline = fiber.clock() + timeout
for replicaset_id, future in pairs(futures_by_replicasets) do
local wait_timeout = deadline - fiber_clock()
local wait_timeout = deadline - fiber.clock()
if wait_timeout < 0 then
wait_timeout = 0
end
Expand Down Expand Up @@ -221,9 +271,8 @@ function call.single(vshard_router, bucket_id, func_name, func_args, opts)
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
local request_timeout = opts.mode == 'read' and opts.request_timeout or nil

local res, err = retry_call_with_master_discovery(replicaset, vshard_call_name,
func_name, func_args, {timeout = timeout,
request_timeout = request_timeout})
local res, err = call_with_retry_and_recovery(vshard_router, replicaset, vshard_call_name,
func_name, func_args, {timeout = timeout, request_timeout = request_timeout}, true)
if err ~= nil then
return nil, wrap_vshard_err(vshard_router, err, func_name, nil, bucket_id)
end
Expand All @@ -248,8 +297,8 @@ function call.any(vshard_router, func_name, func_args, opts)
end
local replicaset_id, replicaset = next(replicasets)

local res, err = retry_call_with_master_discovery(replicaset, 'call',
func_name, func_args, {timeout = timeout})
local res, err = call_with_retry_and_recovery(vshard_router, replicaset, 'call',
func_name, func_args, {timeout = timeout}, false)
if err ~= nil then
return nil, wrap_vshard_err(vshard_router, err, func_name, replicaset_id)
end
Expand Down
197 changes: 197 additions & 0 deletions crud/common/rebalance.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
-- crud.common.rebalance: per-instance "safe mode" state for vshard
-- rebalancing. Safe mode is enabled automatically when a bucket migration
-- starts (see the _bucket trigger below) and can be toggled manually.
local fiber = require('fiber')
local log = require('log')
local trigger = require('internal.trigger')
local vshard_consts = require('vshard.consts')
local utils = require('crud.common.utils')
-- metrics is an optional dependency: pcall keeps this module loadable
-- when the metrics rock is not installed.
local has_metrics_module, metrics = pcall(require, 'metrics')

-- Name of the local (non-replicated) space holding the safe-mode flag.
local SAFE_MODE_SPACE = '_crud_rebalance_safe_mode_local'

local rebalance = {
    -- In-memory mirror of the committed safe-mode flag for this instance.
    safe_mode = false,
    -- Trigger is run with one argument: true if safe mode is enabled and false if disabled.
    on_safe_mode_toggle = trigger.new('_crud.safe_mode_toggle'),
    _router_cache_last_clear_ts = 0 -- On module load we don't know when (and if) route cache was cleared.
}

--- on_replace trigger for the vshard _bucket space.
-- Detects the start of a bucket migration and requests safe mode by writing
-- the flag tuple into the local space; the actual mode switch happens in
-- that space's on_commit trigger (see create_trigger).
local function safe_mode_bucket_trigger(_, new, space, op)
    if space ~= '_bucket' then
        return
    end
    -- We are interested only in two operations that indicate the beginning of bucket migration:
    -- * We are receiving a bucket (new bucket with status RECEIVING)
    -- * We are sending a bucket to another node (existing bucket status changes to SENDING)
    if (op == 'INSERT' and new.status == vshard_consts.BUCKET.RECEIVING) or
        (op == 'REPLACE' and new.status == vshard_consts.BUCKET.SENDING) then
        -- Only write on an actual false -> true transition to avoid firing
        -- the on_commit machinery for no-op replaces.
        local stored_safe_mode = box.space[SAFE_MODE_SPACE]:get{ 'status' }
        if not stored_safe_mode or not stored_safe_mode.value then
            box.space[SAFE_MODE_SPACE]:replace{ 'status', true }
        end
    end
end

--- Flip the module into safe mode.
-- Called from the on_commit trigger of the safe-mode space once the flag
-- tuple has been committed as true (also directly from rebalance.init()).
local function _safe_mode_enable()
    -- The trigger is needed to detect the beginning of rebalance process to enable safe mode.
    -- If safe mode is enabled we don't need the trigger anymore.
    for _, trig in pairs(box.space._bucket:on_replace()) do
        if trig == safe_mode_bucket_trigger then
            box.space._bucket:on_replace(nil, trig)
        end
    end
    rebalance.safe_mode = true

    -- This function is running inside on_commit trigger, need pcall to protect from errors in external code.
    pcall(rebalance.on_safe_mode_toggle.run, rebalance.on_safe_mode_toggle, true)

    log.info('Rebalance safe mode enabled')
end

--- Flip the module back into fast mode.
-- Counterpart of _safe_mode_enable(); runs from the same on_commit trigger.
local function _safe_mode_disable()
    -- Leaving safe mode: re-arm the _bucket watcher so that the next
    -- migration start switches us back into safe mode automatically.
    box.space._bucket:on_replace(safe_mode_bucket_trigger)
    rebalance.safe_mode = false

    -- Executed inside an on_commit trigger: pcall shields the commit from
    -- errors raised by externally registered toggle handlers.
    local toggle = rebalance.on_safe_mode_toggle
    pcall(toggle.run, toggle, false)

    log.info('Rebalance safe mode disabled')
end

--- Create the local safe-mode space with its primary index and seed the
-- 'status' tuple (initially false = fast mode).
local function create_space()
    local safe_mode_space = box.schema.space.create(SAFE_MODE_SPACE, {
        engine = 'memtx',
        format = {
            { name = 'key', type = 'string' },
            { name = 'value', type = 'any' },
        },
        -- is_local: the flag is per-instance and must not be replicated.
        is_local = true,
        if_not_exists = true,
    })
    safe_mode_space:create_index('primary', { parts = { 'key' }, if_not_exists = true })

    -- Seed the flag only when absent. A plain insert() would raise a
    -- duplicate-key error if the space (and tuple) already existed, which
    -- would contradict the if_not_exists semantics used above.
    if safe_mode_space:get{ 'status' } == nil then
        safe_mode_space:insert{ 'status', false }
    end
end

--- Install the on_replace/on_commit trigger pair on the safe-mode space.
-- The mode switch is performed only after the flag change is committed, so
-- rolled-back transactions never toggle the mode.
local function create_trigger()
    box.space[SAFE_MODE_SPACE]:on_replace(function()
        box.on_commit(function(rows_iter)
            local safe_space_id = box.space[SAFE_MODE_SPACE].id
            -- There may be multiple operations on safe mode status tuple in one transaction.
            -- We will take only the last action.
            -- 0 = do nothing, 1 = enable safe mode, -1 = disable safe mode
            local safe_mode_action = 0
            for _, old, new, sp in rows_iter() do
                -- Skip rows of other spaces and tuple deletions (new == nil).
                -- BUGFIX: the original indexed `new` unconditionally (crash
                -- on DELETE) and evaluated `old.value` when `old` could be
                -- nil (crash on inserting a falsy value).
                if sp == safe_space_id and new ~= nil then
                    assert((old == nil or old.key == 'status') and new.key == 'status')

                    if (old == nil or not old.value) and new.value then
                        safe_mode_action = 1
                    elseif old ~= nil and old.value and not new.value then
                        safe_mode_action = -1
                    end
                end
            end

            if safe_mode_action == 1 then
                _safe_mode_enable()
            elseif safe_mode_action == -1 then
                _safe_mode_disable()
            end
        end)
    end)
end

--- Initialize the safe-mode machinery on a storage instance.
-- On a writable instance the space is created if missing; on a read-only
-- replica we wait for the space definition to arrive via replication.
-- NOTE(review): assumes init() is called once per process — repeated calls
-- would stack duplicate on_replace closures; confirm against callers.
function rebalance.init()
    local stored_safe_mode
    if not box.info.ro then
        if box.space[SAFE_MODE_SPACE] == nil then
            create_space()
        end
        -- BUGFIX: on_replace triggers are not persisted, so the trigger must
        -- be (re)installed on every init. The original installed it only
        -- when the space was just created, leaving the toggle mechanism dead
        -- after an instance restart.
        create_trigger()
        stored_safe_mode = box.space[SAFE_MODE_SPACE]:get{ 'status' }
    else
        -- Replica: DDL of the local space replicates, its data does not.
        while box.space[SAFE_MODE_SPACE] == nil or box.space[SAFE_MODE_SPACE].index[0] == nil do
            fiber.sleep(0.05)
        end
        create_trigger()
        stored_safe_mode = box.space[SAFE_MODE_SPACE]:get{ 'status' }
    end

    -- Restore the persisted mode (absent tuple means fast mode).
    if stored_safe_mode and stored_safe_mode.value then
        _safe_mode_enable()
    else
        _safe_mode_disable()
    end
end

--- Report whether this storage instance is currently in safe mode.
-- @treturn boolean
function rebalance.safe_mode_status()
    local status = rebalance.safe_mode
    return status
end

--- Manually switch this instance into safe mode.
-- Writing the flag fires the space's on_replace/on_commit trigger chain,
-- which performs the actual switch after commit.
function rebalance.safe_mode_enable()
    box.space[SAFE_MODE_SPACE]:replace{ 'status', true }
end

--- Manually switch this instance back into fast mode.
function rebalance.safe_mode_disable()
    box.space[SAFE_MODE_SPACE]:replace{ 'status', false }
end

--- Rebalance storage API
-- Functions exposed on storage instances under these fully-qualified names.
rebalance.storage_api = {
    rebalance_safe_mode_status = rebalance.safe_mode_status,
    rebalance_safe_mode_enable = rebalance.safe_mode_enable,
    rebalance_safe_mode_disable = rebalance.safe_mode_disable,
}

--- Rebalance router API
-- Populated with the cache_* functions below; re-exported as crud.rebalance.*.
rebalance.router_api = {}

--- Clear the vshard router's bucket route cache.
-- Records the clear timestamp only after the clear succeeds, so a raised
-- error does not advance the "last cleared" marker (the original stamped
-- the time before calling into vshard).
function rebalance.router_api.cache_clear()
    local router = utils.get_vshard_router_instance()
    if router == nil then
        log.warn("Router is not initialized yet")
        return
    end
    -- Helper preserves whatever (possibly multiple) values
    -- _route_map_clear() returns while stamping the timestamp after it.
    local function record_and_return(...)
        rebalance._router_cache_last_clear_ts = fiber.time()
        return ...
    end
    return record_and_return(router:_route_map_clear())
end

--- Number of bucket routes currently known to the vshard router.
-- Returns nothing (and logs a warning) when the router is not initialized.
function rebalance.router_api.cache_length()
    local vshard_router = utils.get_vshard_router_instance()
    if vshard_router ~= nil then
        return vshard_router.known_bucket_count
    end
    log.warn("Router is not initialized yet")
end

--- Timestamp (fiber.time) of the most recent route-cache clear.
-- 0 means the cache has not been cleared since this module was loaded.
function rebalance.router_api.cache_last_clear_ts()
    local ts = rebalance._router_cache_last_clear_ts
    return ts
end

--- Rebalance related metrics (registered only when the metrics rock is available).
if has_metrics_module then
    local safe_mode_enabled_gauge = metrics.gauge(
        'tnt_crud_storage_safe_mode_enabled',
        "is safe mode enabled on this storage instance"
    )
    local router_cache_length_gauge = metrics.gauge(
        'tnt_crud_router_cache_length',
        "number of bucket routes in vshard router cache"
    )
    local router_cache_last_clear_ts_gauge = metrics.gauge(
        'tnt_crud_router_cache_last_clear_ts',
        "when vshard router cache was cleared last time"
    )

    metrics.register_callback(function()
        safe_mode_enabled_gauge:set(rebalance.safe_mode_status() and 1 or 0)

        -- BUGFIX: cache_length() returns nil when the vshard router is not
        -- initialized on this instance; gauge:set(nil) would raise and break
        -- the whole metrics collection cycle.
        local cache_length = rebalance.router_api.cache_length()
        if cache_length ~= nil then
            router_cache_length_gauge:set(cache_length)
        end
        router_cache_last_clear_ts_gauge:set(rebalance.router_api.cache_last_clear_ts())
    end)
end

return rebalance
Loading
Loading