Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions rust_snuba/src/runtime_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,24 @@ pub fn get_load_balancing_config(storage_name: &str) -> LoadBalancingConfig {
}
}

/// ClickHouse's compiled-in default for `max_insert_block_size`. We refuse to
/// apply any override below this to avoid silently shrinking blocks below what
/// the server would already produce on its own.
pub const CLICKHOUSE_DEFAULT_MAX_INSERT_BLOCK_SIZE: u64 = 1_048_449;
Comment thread
onewland marked this conversation as resolved.

/// Returns Some(n) if `clickhouse_max_insert_block_size:<storage_name>` is set
/// to an integer >= ClickHouse's default (1_048_449); otherwise None. Values
/// below the default are rejected, since they wouldn't increase the block size
/// past what ClickHouse already does by default. Callers should append
/// `&max_insert_block_size=<n>` to the INSERT URL when Some.
pub fn get_max_insert_block_size(storage_name: &str) -> Option<u64> {
get_str_config(&format!("clickhouse_max_insert_block_size:{storage_name}"))
.ok()
.flatten()
.and_then(|s| s.parse::<u64>().ok())
.filter(|&n| n >= CLICKHOUSE_DEFAULT_MAX_INSERT_BLOCK_SIZE)
Comment thread
onewland marked this conversation as resolved.
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
50 changes: 49 additions & 1 deletion rust_snuba/src/strategies/clickhouse/writer_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use sentry_arroyo::types::Message;
use sentry_arroyo::{counter, timer};

use crate::config::ClickhouseConfig;
use crate::runtime_config::get_load_balancing_config;
use crate::runtime_config::{get_load_balancing_config, get_max_insert_block_size};
use crate::types::{BytesInsertBatch, RowData};

fn clickhouse_task_runner(
Expand Down Expand Up @@ -196,6 +196,9 @@ impl ClickhouseClient {
if let Some(offset) = lb_config.first_offset {
url.push_str(&format!("&load_balancing_first_offset={offset}"));
}
if let Some(block_size) = get_max_insert_block_size(&self.storage_name) {
url.push_str(&format!("&max_insert_block_size={block_size}"));
}
url
}

Expand Down Expand Up @@ -356,6 +359,51 @@ mod tests {
assert!(url.contains("load_balancing_first_offset=1"));
}

#[test]
fn test_url_with_max_insert_block_size() {
crate::testutils::initialize_python();
let config = make_test_config();
let client = ClickhouseClient::new(
&config,
"test_table",
"writer_v2_block_size_test".to_string(),
);

// Default (key absent): no suffix.
let url = client.build_url();
assert!(!url.contains("max_insert_block_size"));

// Per-storage override at or above the ClickHouse default sets the suffix.
crate::runtime_config::patch_str_config_for_test(
"clickhouse_max_insert_block_size:writer_v2_block_size_test",
Some("2000000"),
);
let url = client.build_url();
assert!(url.contains("&max_insert_block_size=2000000"));

// A different storage isn't affected.
let other_client =
ClickhouseClient::new(&config, "test_table", "writer_v2_other_storage".to_string());
let url = other_client.build_url();
assert!(!url.contains("max_insert_block_size"));

// Values below the ClickHouse default (1_048_449) are rejected.
crate::runtime_config::patch_str_config_for_test(
"clickhouse_max_insert_block_size:writer_v2_block_size_test",
Some("1000000"),
);
let url = client.build_url();
assert!(!url.contains("max_insert_block_size"));

// Exactly the default is accepted.
crate::runtime_config::patch_str_config_for_test(
"clickhouse_max_insert_block_size:writer_v2_block_size_test",
Some("1048449"),
);
let url = client.build_url();
assert!(url.contains("&max_insert_block_size=1048449"));
}

#[tokio::test]
async fn test_retry_with_exponential_backoff() {
crate::testutils::initialize_python();
Expand Down
Loading