Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ class DistributedMergePredicate : public IMergePredicate
if (left.info.isPatch() != right.info.isPatch())
return std::unexpected(PreformattedMessage::create("One of parts ({}, {}) is patch part and another is regular part", left.name, right.name));

if (left.is_in_volume_where_merges_avoid || right.is_in_volume_where_merges_avoid)
return std::unexpected(PreformattedMessage::create("One of parts ({}, {}) lies on volume where merges should be avoided", left.name, right.name));

int64_t left_max_block = left.info.max_block;
int64_t right_min_block = right.info.min_block;
chassert(left_max_block < right_min_block);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ std::expected<void, PreformattedMessage> MergeTreeMergePredicate::canMergeParts(
if (left.info.isPatch() != right.info.isPatch())
return std::unexpected(PreformattedMessage::create("One of parts ({}, {}) is patch part and another is regular part", left.name, right.name));

if (left.is_in_volume_where_merges_avoid || right.is_in_volume_where_merges_avoid)
return std::unexpected(PreformattedMessage::create("One of parts ({}, {}) lies on volume where merges should be avoided", left.name, right.name));

if (left.projection_names != right.projection_names)
{
return std::unexpected(PreformattedMessage::create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ time_t TTLRowDeleteMergeSelector::getTTLForPart(const PartProperties & part) con

bool TTLRowDeleteMergeSelector::canConsiderPart(const PartProperties & part) const
{
if (part.is_in_volume_where_merges_avoid)
return false;

if (!part.general_ttl_info.has_value())
return false;

Expand All @@ -231,6 +234,9 @@ time_t TTLRecompressMergeSelector::getTTLForPart(const PartProperties & part) co

bool TTLRecompressMergeSelector::canConsiderPart(const PartProperties & part) const
{
if (part.is_in_volume_where_merges_avoid)
return false;

if (!part.recompression_ttl_info.has_value())
return false;

Expand Down
2 changes: 2 additions & 0 deletions src/Storages/MergeTree/Compaction/PartProperties.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,15 @@ std::set<std::string> getCalculatedProjectionNames(const MergeTreeDataPartPtr &
PartProperties buildPartProperties(
const MergeTreeDataPartPtr & part,
const StorageMetadataPtr & metadata_snapshot,
const StoragePolicyPtr & storage_policy,
time_t current_time)
{
return PartProperties{
.name = part->name,
.info = part->info,
.projection_names = getCalculatedProjectionNames(part),
.all_ttl_calculated_if_any = part->checkAllTTLCalculated(metadata_snapshot),
.is_in_volume_where_merges_avoid = !part->shallParticipateInMerges(storage_policy),
.size = part->getExistingBytesOnDisk(),
.age = current_time - part->modification_time,
.general_ttl_info = buildGeneralTTLInfo(metadata_snapshot, part),
Expand Down
5 changes: 5 additions & 0 deletions src/Storages/MergeTree/Compaction/PartProperties.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#pragma once

#include <Storages/MergeTree/MergeTreePartInfo.h>

#include <Disks/IStoragePolicy.h>

#include <Core/UUID.h>

#include <optional>
Expand All @@ -24,6 +27,7 @@ struct PartProperties
const std::set<std::string> projection_names = {};

const bool all_ttl_calculated_if_any = false;
const bool is_in_volume_where_merges_avoid = false;

/// Size of data part in bytes.
const size_t size = 0;
Expand Down Expand Up @@ -56,6 +60,7 @@ using PartsRangeView = std::span<const PartProperties>;
/// Builds the PartProperties description of `part`.
/// `storage_policy` is used to fill `is_in_volume_where_merges_avoid`: the flag is set when
/// `part->shallParticipateInMerges(storage_policy)` returns false.
/// `current_time` is used to compute the part's age relative to its modification time.
PartProperties buildPartProperties(
const MergeTreeDataPartPtr & part,
const StorageMetadataPtr & metadata_snapshot,
const StoragePolicyPtr & storage_policy,
time_t current_time);

}
7 changes: 5 additions & 2 deletions src/Storages/MergeTree/Compaction/PartsCollectors/Common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ namespace DB
{

PartsRanges constructPartsRanges(
std::vector<MergeTreeDataPartsVector> && ranges, const StorageMetadataPtr & metadata_snapshot, const time_t & current_time)
std::vector<MergeTreeDataPartsVector> && ranges,
const StorageMetadataPtr & metadata_snapshot,
const StoragePolicyPtr & storage_policy,
const time_t & current_time)
{
PartsRanges properties_ranges;
properties_ranges.reserve(ranges.size());
Expand All @@ -17,7 +20,7 @@ PartsRanges constructPartsRanges(
properties_ranges.reserve(range.size());

for (const auto & part : range)
properties_range.push_back(buildPartProperties(part, metadata_snapshot, current_time));
properties_range.push_back(buildPartProperties(part, metadata_snapshot, storage_policy, current_time));

properties_ranges.push_back(std::move(properties_range));
}
Expand Down
5 changes: 4 additions & 1 deletion src/Storages/MergeTree/Compaction/PartsCollectors/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ std::expected<void, PreformattedMessage> checkAllPartsSatisfyPredicate(const std
}

PartsRanges constructPartsRanges(
std::vector<MergeTreeDataPartsVector> && ranges, const StorageMetadataPtr & metadata_snapshot, const time_t & current_time);
std::vector<MergeTreeDataPartsVector> && ranges,
const StorageMetadataPtr & metadata_snapshot,
const StoragePolicyPtr & storage_policy,
const time_t & current_time);

MergeTreeDataPartsVector filterByPartitions(
MergeTreeDataPartsVector && parts, const std::optional<PartitionIdsHint> & partitions_to_keep);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ MergeTreeDataPartsVector collectInitial(const MergeTreeData & data, const MergeT

auto constructPreconditionsPredicate(const StoragePolicyPtr & storage_policy, const MergeTreeTransactionPtr & tx, const MergeTreeMergePredicatePtr & merge_pred)
{
bool has_volumes_with_disabled_merges = storage_policy->hasAnyVolumeWithDisabledMerges();

auto predicate = [storage_policy, tx, merge_pred, has_volumes_with_disabled_merges](const MergeTreeDataPartPtr & part) -> std::expected<void, PreformattedMessage>
auto predicate = [storage_policy, tx, merge_pred](const MergeTreeDataPartPtr & part) -> std::expected<void, PreformattedMessage>
{
if (tx)
{
Expand All @@ -93,9 +91,6 @@ auto constructPreconditionsPredicate(const StoragePolicyPtr & storage_policy, co
return std::unexpected(PreformattedMessage::create("Part {} is locked for removal", part->name));
}

if (has_volumes_with_disabled_merges && !part->shallParticipateInMerges(storage_policy))
return std::unexpected(PreformattedMessage::create("Merges for part's {} volume are disabled", part->name));

chassert(merge_pred);
return merge_pred->canUsePartInMerges(part);
};
Expand Down Expand Up @@ -135,7 +130,7 @@ PartsRanges MergeTreePartsCollector::grabAllPossibleRanges(
{
auto parts = filterByPartitions(collectInitial(storage, tx), partitions_hint);
auto ranges = splitPartsByPreconditions(std::move(parts), storage_policy, tx, merge_pred, series_log);
return constructPartsRanges(std::move(ranges), metadata_snapshot, current_time);
return constructPartsRanges(std::move(ranges), metadata_snapshot, storage_policy, current_time);
}

std::expected<PartsRange, PreformattedMessage> MergeTreePartsCollector::grabAllPartsInsidePartition(
Expand All @@ -148,7 +143,7 @@ std::expected<PartsRange, PreformattedMessage> MergeTreePartsCollector::grabAllP
if (auto result = checkAllParts(parts, storage_policy, tx, merge_pred); !result)
return std::unexpected(std::move(result.error()));

auto ranges = constructPartsRanges({std::move(parts)}, metadata_snapshot, current_time);
auto ranges = constructPartsRanges({std::move(parts)}, metadata_snapshot, storage_policy, current_time);
chassert(ranges.size() == 1);

return std::move(ranges.front());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,8 @@ MergeTreeDataPartsVector collectInitial(const MergeTreeData & data)

auto constructPreconditionsPredicate(const StoragePolicyPtr & storage_policy, const ReplicatedMergeTreeMergePredicatePtr & merge_pred)
{
bool has_volumes_with_disabled_merges = storage_policy->hasAnyVolumeWithDisabledMerges();

auto predicate = [storage_policy, merge_pred, has_volumes_with_disabled_merges](const MergeTreeDataPartPtr & part) -> std::expected<void, PreformattedMessage>
auto predicate = [storage_policy, merge_pred](const MergeTreeDataPartPtr & part) -> std::expected<void, PreformattedMessage>
{
if (has_volumes_with_disabled_merges && !part->shallParticipateInMerges(storage_policy))
return std::unexpected(PreformattedMessage::create("Merges for part's {} volume are disabled", part->name));

chassert(merge_pred);
return merge_pred->canUsePartInMerges(part);
};

Expand Down Expand Up @@ -62,7 +56,7 @@ PartsRanges ReplicatedMergeTreePartsCollector::grabAllPossibleRanges(
{
auto parts = filterByPartitions(collectInitial(storage), partitions_hint);
auto ranges = splitPartsByPreconditions(std::move(parts), storage_policy, merge_pred, series_log);
return constructPartsRanges(std::move(ranges), metadata_snapshot, current_time);
return constructPartsRanges(std::move(ranges), metadata_snapshot, storage_policy, current_time);
}

std::expected<PartsRange, PreformattedMessage> ReplicatedMergeTreePartsCollector::grabAllPartsInsidePartition(
Expand All @@ -75,7 +69,7 @@ std::expected<PartsRange, PreformattedMessage> ReplicatedMergeTreePartsCollector
if (auto result = checkAllParts(parts, storage_policy, merge_pred); !result)
return std::unexpected(std::move(result.error()));

auto ranges = constructPartsRanges({std::move(parts)}, metadata_snapshot, current_time);
auto ranges = constructPartsRanges({std::move(parts)}, metadata_snapshot, storage_policy, current_time);
chassert(ranges.size() == 1);

return std::move(ranges.front());
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,11 @@ MergeSelectorChoices chooseMergesFrom(

for (size_t i = 0; i < choices.size(); ++i)
{
const auto & merge_type = choices[i].merge_type;
const auto & range = choices[i].range;
const auto & range_patches = choices[i].range_patches;
ProfileEvents::increment(ProfileEvents::MergerMutatorSelectRangePartsCount, range.size());
LOG_TRACE(log, "Merge #{} with {} parts from {} to {} with {} patches", i, range.size(), range.front().name, range.back().name, range_patches.size());
LOG_TRACE(log, "Merge #{} type {} with {} parts from {} to {} with {} patches", i, merge_type, range.size(), range.front().name, range.back().name, range_patches.size());
}
}

Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<clickhouse>
<!-- Storage layout for the test: two disks and one policy with two volumes. -->
<storage_configuration>
<disks>
<!-- Disk used by the volume of the same name in the policy below. -->
<with_merges>
<path>/with_merges/</path>
</with_merges>
<!-- Disk used by the volume of the same name in the policy below. -->
<no_merges>
<path>/no_merges/</path>
</no_merges>
</disks>
<policies>
<hot_cold_separation_policy>
<volumes>
<!-- Listed first; the test expects freshly inserted parts to appear under /with_merges/. -->
<with_merges>
<disk>with_merges</disk>
</with_merges>
<!-- Volume flagged with prefer_not_to_merge; the test expects OPTIMIZE to leave parts moved here unmerged. -->
<!-- NOTE(review): exact semantics of prefer_not_to_merge are defined by the server, confirm against the storage policy docs. -->
<no_merges>
<disk>no_merges</disk>
<prefer_not_to_merge>true</prefer_not_to_merge>
</no_merges>
</volumes>
</hot_cold_separation_policy>
</policies>
</storage_configuration>
</clickhouse>
80 changes: 80 additions & 0 deletions tests/integration/test_no_merges_volume_ttl/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import pytest
import time

from helpers.cluster import ClickHouseCluster

# Single-node cluster shared by every test in this module.
cluster = ClickHouseCluster(__file__)

# The tmpfs mounts back the two disk paths declared in configs/storage_config.xml.
node = cluster.add_instance(
    "node",
    tmpfs=["/with_merges:size=200M", "/no_merges:size=200M"],
    main_configs=["configs/storage_config.xml"],
)

@pytest.fixture(scope="module")
def start_cluster():
    """Module-scoped fixture: start the cluster, yield it to the tests, then shut it down."""
    try:
        cluster.start()
        yield cluster
    finally:
        # Shut down even when startup or a test raised.
        cluster.shutdown()


def wait_parts_count(table, expected_number_of_parts):
    """Wait until `table` has exactly `expected_number_of_parts` active parts.

    Polls system.parts up to 100 times, sleeping one second after each
    mismatch, then asserts on a fresh count so a timeout fails the test.
    """
    count_query = f"select count() from system.parts where table = '{table}' and active"

    for attempt in range(100):
        print(f"Waiting {expected_number_of_parts} for table {table}. Iteration: {attempt}")

        if int(node.query(count_query)) == expected_number_of_parts:
            break

        time.sleep(1)

    # Final check: fails here if the loop ran out of attempts.
    assert int(node.query(count_query)) == expected_number_of_parts


def test_no_merges_volume_ttl_merge(start_cluster):
    """A part holding an already-expired row must still disappear after it is
    moved to the 'no_merges' volume and merges are started again."""

    def active_part_path():
        return node.query("select path from system.parts where table = 't' and active").strip()

    def rows_in_table():
        return int(node.query("select count() from t").strip())

    node.query("create table t (time DateTime) engine = MergeTree order by tuple() ttl time settings storage_policy='hot_cold_separation_policy', merge_with_ttl_timeout=0")
    table_uuid = node.query("select uuid from system.tables where table = 't'").strip()

    # Keep merges stopped so the expired row stays in place while the part is moved.
    node.query("system stop merges t")
    node.query("insert into t values (now() - interval 1 day)")
    assert active_part_path() == f"/with_merges/store/{table_uuid[:3]}/{table_uuid}/all_1_1_0/"
    assert rows_in_table() == 1

    # Relocate the only part to the volume flagged with prefer_not_to_merge.
    node.query("alter table t move partition () to volume 'no_merges'")
    assert active_part_path() == f"/no_merges/store/{table_uuid[:3]}/{table_uuid}/all_1_1_0/"
    assert rows_in_table() == 1

    # Once merges are allowed again, the part is expected to go away.
    node.query("system start merges t")
    wait_parts_count("t", 0)

    assert rows_in_table() == 0
    node.query("drop table t sync")


def test_no_merges_volume_no_regular_merges(start_cluster):
    """Parts moved to the 'no_merges' volume must not be merged, not even by
    explicit OPTIMIZE or OPTIMIZE FINAL."""

    def rows_in_table():
        return int(node.query("select count() from t").strip())

    def active_parts():
        return int(node.query("select count() from system.parts where table = 't' and active").strip())

    node.query("create table t (a UInt64) engine = MergeTree order by tuple() settings storage_policy='hot_cold_separation_policy'")

    # Insert with merges stopped; the test expects 50 separate parts from this insert.
    node.query("system stop merges t")
    node.query("insert into t select number from numbers(50) settings max_block_size=1, min_insert_block_size_bytes=1")
    assert rows_in_table() == 50
    assert active_parts() == 50

    node.query("alter table t move partition () to volume 'no_merges'")
    assert rows_in_table() == 50
    assert active_parts() == 50

    node.query("system start merges t")
    # Ask for merges repeatedly, first plain and then FINAL.
    for _ in range(4):
        node.query("optimize table t")
    for _ in range(4):
        node.query("optimize table t final")

    # Nothing may have been merged: same rows, same number of parts.
    assert rows_in_table() == 50
    assert active_parts() == 50
    node.query("drop table t sync")
Loading