Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/Storages/StorageDistributed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1690,9 +1690,11 @@ Cluster::Addresses StorageDistributed::parseAddresses(const std::string & name)
continue;
}

if (address.replica_index > replicas)
if (address.replica_index == 0 || address.replica_index > replicas)
{
LOG_ERROR(log, "No shard with replica_index={} ({})", address.replica_index, name);
LOG_ERROR(log, "Invalid replica_index={} for directory '{}' (cluster has {} replicas for shard {}). "
"Expected directory format: 'shardN_replicaM' or 'shardN_all_replicas'",
address.replica_index, dirname, replicas, address.shard_index);
continue;
}

Expand Down
69 changes: 69 additions & 0 deletions tests/integration/test_distributed_format/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,72 @@ def test_remove_replica(started_cluster):
"/etc/clickhouse-server/config.d/another_remote_servers.xml",
]
)

def test_invalid_shard_directory_format(started_cluster):
    """
    Regression test: the server must not crash when it encounters malformed
    async-insert directory names such as 'shard1_all_replicas_bkp' while
    initializing a Distributed table (valid formats are 'shardN_replicaM'
    and 'shardN_all_replicas').
    """
    node.query("drop table if exists test.dist_invalid sync")
    node.query("drop table if exists test.local_invalid sync")
    node.query(
        "create table test.local_invalid (x UInt64, s String) engine = MergeTree order by x"
    )
    node.query(
        "create table test.dist_invalid (x UInt64, s String) "
        "engine = Distributed('test_cluster_internal_replication', test, local_invalid)"
    )

    # Use the compact directory-name format so the shardN_replicaM parsing
    # code path is exercised on (re)initialization.
    node.query(
        "insert into test.dist_invalid values (1, 'a'), (2, 'bb')",
        settings={"use_compact_format_in_distributed_parts_names": "1"},
    )

    data_path = node.query(
        "SELECT arrayElement(data_paths, 1) FROM system.tables "
        "WHERE database='test' AND name='dist_invalid'"
    ).strip()

    # Create malformed directories that previously triggered the bug; each
    # gets a dummy file so the directory isn't considered empty and skipped.
    invalid_formats = [
        "shard1_all_replicas_bkp",
        "shard1_all_replicas_backup",
        "shard1_all_replicas_old",
        "shard2_all_replicas_tmp",
    ]
    for invalid_dir in invalid_formats:
        invalid_path = f"{data_path}/{invalid_dir}"
        node.exec_in_container(["mkdir", "-p", invalid_path])
        node.exec_in_container(["touch", f"{invalid_path}/dummy.txt"])

    # Detach/attach re-runs Distributed table initialization, which parses
    # the directory names — same code path as a server restart.
    node.query("detach table test.dist_invalid")
    node.query("attach table test.dist_invalid")

    # The essential check: the server survived the attach and is responsive.
    assert node.query("select 1").strip() == "1"

    node.query("SYSTEM FLUSH LOGS system.text_log")

    error_logs = node.query(
        """
        SELECT count()
        FROM system.text_log
        WHERE level = 'Error'
        AND message LIKE '%Invalid replica_index%'
        AND message LIKE '%shard1_all_replicas%'
        """
    ).strip()

    # Informational only: logging may be disabled in some configurations,
    # so no specific count is required — the liveness assertion above is
    # the real test.
    print(f"Found {error_logs} error log entries for invalid directories")

    # Clean up
    node.query("drop table test.dist_invalid sync")
    node.query("drop table test.local_invalid sync")
Loading