处理ClickHouse中的更新和删除操作

kyaa111 6月前 ⋅ 261 阅读

1.png

作为全球最快的实时分析数据库,许多ClickHouse工作负载涉及大量数据,这些数据仅写入一次,不经常修改(例如,由IOT设备生成的遥测事件或由电子商务网站生成的客户点击)。尽管这些数据通常是不可变的,但在分析期间提供上下文的关键数据集(例如,基于设备或客户ID的查找表)可能需要进行修改。

在ClickHouse中,有多种历史方法可用于更新和删除数据,具体取决于您的目标和性能要求。本文的其余部分将描述每种方法及其权衡,以及最近针对轻量级删除的一些发展,以解决一些常见挑战。我们建议最佳实践,并在考虑方法时强调一些重要的考虑因素。

在继续之前,请确定更新是否是解决您的问题的最佳方式。例如,对于不经常更改的数据,对数据进行版本控制可能是更好的选择。ClickHouse是存储效率和查询性能方面的顶尖分析数据库,因此在许多情况下,仅保存数据的多个版本而不是更新数据可能效果更好。

总结

在分析环境中更新和删除数据可能具有挑战性,并可能严重影响数据库性能。为了解决这个问题,ClickHouse为不同情况提供了多种高效的更新和删除数据的强大方法:

  • 通过DELETE FROM语法进行轻量级删除是从ClickHouse中删除数据的最高效方式,前提是不需要立即节省磁盘空间,而且用户可以容忍删除的数据“存在”在磁盘上。
  • 在需要立即节省磁盘空间的情况下,可以使用ALTER…DELETE进行基于突变的删除。例如,合规性要求需要保证数据从磁盘上删除。
  • 在出现不规则和不频繁更改的情况下,可以使用ALTER…UPDATE进行基于突变的更新。
  • 使用TTL(Time To Live)根据日期/时间定期删除(过时的)数据。
  • 使用CollapsingMergeTree来频繁更新或删除单独的行。
  • 使用ReplacingMergeTree来实现基于版本控制的upsert(插入/更新)。
  • 在定期删除大块数据时,通过删除分区来实现。
  • 创建新列(并删除旧列)可能也是更新整个表格的更高效方式。

轻量级删除

轻量级删除是从ClickHouse中删除数据的首选和最有效方式。通过DELETE FROM table语法公开,用户可以指定条件来删除特定行,如下所示:

DELETE FROM table WHERE col1 = 'Hi' AND col2 = 2

除非设置mutations_sync设置为1(请参见下文),否则此操作默认是异步的。执行删除操作时,ClickHouse为每行保存一个掩码,指示它是否已在_row_exists列中删除。随后的查询将排除这些已删除的行,如下所示

2.png

在内部,ClickHouse将数据排序为各个部分,每个部分包含列数据文件和索引。定期的合并周期负责合并和重写这些部分,以确保文件数量不会随着插入更多数据而继续增加,从而保持查询速度。这些合并过程考虑了轻量级删除,将新形成的部分中标记为删除的行排除在外。

3.png

轻量级删除功能在版本22.8中首次发布,截至撰写本文时仍处于实验阶段,但计划在接下来的某个版本中正式投入生产。在此之前,使用轻量级删除功能需要设置allow_experimental_lightweight_delete=true

用户应该明白,依赖正常的后台合并周期时,行将仅在某个时刻从磁盘中最终删除。尽管在搜索结果中被排除,但这些行将一直存在于磁盘上,直到它们所在的部分被合并。这一过程所需的时间是不确定的。这会产生一些影响:

  • 空间节省不会像通过突变操作发出删除指令那样立即发生 - 请参见下文。如果空间节省至关重要,例如,磁盘空间不足,请考虑使用突变操作。
  • 对于有合规要求的用户来说,由于无法保证删除,可能希望使用突变操作以确保数据被删除。

轻量级删除操作的成本取决于WHERE子句中匹配行的数量以及当前数据部分的数量。此操作在匹配少量行时效率最高。用户还应该知道,轻量级删除在Wide部分上性能最佳,其中列数据文件分别存储,而在Compact部分上性能较差,其中一个文件用于存储所有列数据。前者允许将掩码_row_exists存储为单独的文件,从而允许独立写入,而不受其他列的影响。一般情况下,紧凑部分会在插入后形成。一旦部分超过一定大小(例如,由于合并),将使用宽格式。对于大多数工作负载,这不应该成为问题。

最后,请注意,轻量级删除使用与下文描述的相同的变异队列和后台线程。我们建议查阅此处的文档,以获取有关内部实现的更多详细信息。

Mutations / 突变

使用突变操作更新数据

在ClickHouse表中更新数据的最简单方法是使用ALTER...UPDATE语句。

ALTER TABLE table
    UPDATE col1 = 'Hi' WHERE col2 = 2

此查询将使用给定的过滤条件在表table上更新col1。

与某些数据库不同,ClickHouse的ALTER UPDATE语句默认是异步的。这意味着更新在后台进行,您不会立即看到表格的效果。这个更新表的过程称为突变操作。

4.png

这里需要注意的一点是,更新数据是一个繁重的查询,因为ClickHouse必须做大量的工作来优化存储和处理。突变操作会强制重新写入包含这些行的所有数据部分,新部分形成时排除目标行。这可能会引起相当大的I/O和集群开销,因此请谨慎使用,或考虑下面讨论的替代方案。

使用突变操作删除数据

与更新操作一样,使用突变操作也可以进行删除操作,提供了对轻量级删除的替代方案。在大多数情况下,轻量级删除更适合数据删除,因为重新写入所有列的突变成本较高。具体来说,不同于轻量级删除,突变操作会重新写入所有列,而不仅仅是_row_exists掩码列。

然而,考虑到轻量级删除具有“最终从磁盘中删除数据”的属性,用户可能更喜欢这种基于突变的方法来实现有保障的磁盘空间节省。此外,当用户需要确保数据从磁盘上删除,例如出于合规原因时,这种方法也是合适的。

ALTER TABLE table
    DELETE WHERE col2 = 3

在这个查询中,所有col2值为3的行都被删除。与其他突变操作类似,删除操作默认是异步的。可以使用上面描述的mutations_sync设置来使其同步进行。

检查突变操作进度

由于突变操作是异步运行的,可以通过system.mutations表来进行监视。这允许用户检查特定表上突变操作的进度。

SELECT
    command,
    is_done
FROM system.mutations
WHERE table = 'tablename'

┌─command───────────────────────────────┬─is_done─┐
│ UPDATE col1 = 'Hi' WHERE col2 = 2     │       1 │
│ UPDATE col1 = 'All hi' WHERE col2 > 0 │       0 │
└───────────────────────────────────────┴─────────┘

如果特定突变操作的is_done值为0,那么它仍在执行中。突变操作会针对每个表部分执行,其中已变异的部分会立即变得可用:

5.png

同步更新

对于需要同步更新的用户,可以将mutations_sync参数设置为1(或2,如果还需要等待所有副本也完成更新):

SET mutations_sync = 1

现在我们的更新查询将等待突变操作完成:

ALTER TABLE table
    UPDATE col1 = 'bye' WHERE col2 > 0

0 rows in set. Elapsed: 1.182 sec. 

请注意,这个查询完成需要1秒,因为ClickHouse等待后台突变操作完成。请注意,此参数也适用于轻量级删除。

更新整个表格

在某些情况下,用户可能需要更新整个列的值。最初,用户可能会尝试使用不带WHERE子句的ALTER TABLE查询来实现这一目标,然而这会失败,如下所示:

ALTER TABLE table UPDATE col1 = 'bye';

Syntax error: failed at position 38 (end of query):
ALTER TABLE table UPDATE col1 = 'bye';

ClickHouse不允许您更新整个表,因为更新操作开销较大。强制ClickHouse接受此操作的一种方法是使用一个始终为真的过滤条件:

ALTER TABLE table
    UPDATE col1 = 'bye' WHERE true

然而,一个更优化的方法是创建一个新列,将新值设置为默认值,然后切换旧列和新列。例如:

ALTER TABLE table ADD COLUMN col1_new String DEFAULT 'global hi';

ALTER TABLE table
    RENAME COLUMN col1 TO col1_old,
    RENAME COLUMN col1_new TO col1,
    DROP COLUMN col1_old;

我们使用col1_new列的默认值来指定我们要用作更新值的内容。这是安全的,而且效率更高,因为我们避免了繁重的突变操作。

使用JOIN进行更新和删除

有时,我们需要根据关联关系来删除或更新行,因此我们必须进行表格连接。在ClickHouse中,最好使用Join表格引擎和joinGet函数来实现这一点。假设我们有两个表格 - 一个包含所有页面浏览记录,另一个包含所有登录记录:

CREATE TABLE pageviews
(
    `user_id` UInt64,
    `time` DateTime,
    `session_id` UInt64
)
ENGINE = MergeTree
ORDER BY time;

CREATE TABLE logins
(
    `user_id` UInt64,
    `time` DateTime
)
ENGINE = MergeTree
ORDER BY time;

这两个表格的区别在于logins表格仅存储每个会话的单个事件。假设在某个时间点,我们决定向logins表格添加session_id列:

ALTER TABLE logins
    ADD COLUMN `session_id` UInt64

现在,我们需要使用user_id和time在pageviews表格上执行JOIN,将对应的值更新到logins.session_id列中:

SELECT *
FROM logins AS l
JOIN pageviews AS p ON (p.user_id = l.user_id) AND (p.time = l.time)

┌─user_id─┬────────────────time─┬─p.user_id─┬──────────────p.time─┬─session_id─┐
│       2 │ 2023-01-09 12:23:16 │         2 │ 2023-01-09 12:23:16 │ 2752888102 │
│       1 │ 2023-01-09 13:23:16 │         1 │ 2023-01-09 13:23:16 │ 4135462640 │
└─────────┴─────────────────────┴───────────┴─────────────────────┴────────────┘

首先,我们需要创建并填充一个特殊的Join表格:

CREATE TABLE pageviews_join
ENGINE = Join(ANY, LEFT, user_id, time) AS
SELECT *
FROM pageviews

这个表格将允许我们在执行更新查询时使用joinGet函数来根据JOIN获取值:

ALTER TABLE logins
    UPDATE session_id = joinGet('pageviews_join', 'session_id', user_id, time) WHERE session_id = 0

我们可以看到,logins表格已根据JOIN进行了相应的更新:

SELECT * FROM logins

┌─user_id─┬────────────────time─┬─session_id─┐
│       2 │ 2023-01-09 12:23:16 │ 2752888102 │
│       1 │ 2023-01-09 13:23:16 │ 4135462640 │
└─────────┴─────────────────────┴────────────┘

因为我们通过添加session_id列修改了logins表格,所以在更改完成后,我们可以删除pageviews_join表格(在删除之前,请检查system.mutations表格以确保):

DROP TABLE pageviews_join

相同的方法可以用于使用轻量级删除或基于突变的删除数据。

高效删除大块数据

如果我们需要删除大块数据,用户可以对表进行分区,以便根据需要删除分区。这是一项轻量级操作。假设我们有以下表格:

CREATE TABLE hits
(
    `project` String,
    `url` String,
    `time` DateTime,
    `hits` UInt32
)
ENGINE = MergeTree
PARTITION BY project
ORDER BY (project, path, time)

通过按project列对表格进行分区,可以通过删除整个分区来删除具有特定project值的行。让我们删除所有project = c的数据:

ALTER TABLE hits
    DROP PARTITION 'c'

这里,c是我们要删除的project列的值:

6.png

可用分区的列表可以在system.parts表中找到:

SELECT partition
FROM system.parts
WHERE table = 'hits'

┌─partition─┐
│ c         │
│ a         │
│ b         │
└───────────┘

我们还可以使用DETACH和ATTACH语句在表格之间移动分区(例如,如果我们希望将数据移动到垃圾表格而不是删除它)。在设置DDL中的分区时,要注意分区的常见陷阱,即按具有高基数的列或表达式进行分区。这可能会导致创建许多部分,从而导致性能问题。

定期删除旧数据

在时间序列数据的情况下,我们可能需要定期删除过时的数据。ClickHouse为这个特定用例提供了TTL功能。这需要配置一个表格,并指定我们希望何时以及删除哪些数据。假设我们想从我们的hits表中删除一个月前的数据:

ALTER TABLE hits
    MODIFY TTL time + INTERVAL 1 MONTH

在这里,我们要求ClickHouse删除所有时间列值比一个月前的当前时间更早的行。TTL也可以设置在列上,以在一段时间后将它们的值重置为默认值。通过按日期进行分区,舍入到适当的时间单位(例如,天),可以使此操作更加高效。当执行TTL规则时,ClickHouse将以最高效的方式自动删除数据。再次强调,不应该通过高基数的时间列(例如,毫秒级粒度)对表格进行分区,以避免高分区数量。通常,按天或月进行分区对于大多数TTL操作已经足够。

使用CollapsingMergeTree进行删除和更新操作

如果我们经常需要频繁更新单独的行,我们可以使用CollapsingMergeTree引擎来高效管理数据更新。

假设我们有一张表格用于跟踪每篇文章的阅读深度的文章统计数据。我们希望有一行数据,显示每个用户对每篇文章的阅读深度。这里的挑战是,我们必须在用户阅读文章时更新实际的阅读进度。让我们为我们的数据创建一个表格:

CREATE TABLE article_reads
(
    `user_id` UInt32,
    `article_id` UInt32,
    `read_to` UInt8,
    `read_start` DateTime,
    `read_end` DateTime,
    `sign` Int8
)
ENGINE = CollapsingMergeTree(sign)
ORDER BY (read_start, article_id, user_id)

对于CollapsingMergeTree,sign列被用作一种方法来告诉ClickHouse我们要更新特定的行。如果我们插入-1到sign列,整行数据将被删除。如果我们插入一行数据,其中sign = 1,ClickHouse将保留这一行。要更新的行根据在创建表时使用的ORDER BY () DDL语句中的排序键来标识:

7.png

为满足排序键上的去重条件,我们必须在read_start、article_id和user_id列中插入相同的值以更新一行。例如,当用户开始阅读一篇文章时,我们插入以下行:

INSERT INTO article_reads
            VALUES(1, 12, 0, now(), now(), 1);

现在我们在表格中有一行数据:

SELECT *
FROM article_reads

┌─user_id─┬─article_id─┬─read_to─┬──────────read_start─┬────────────read_end─┬─sign─┐
│       1 │         12 │       0 │ 2023-01-06 15:20:32 │ 2023-01-06 15:20:32 │    1 │
└─────────┴────────────┴─────────┴─────────────────────┴─────────────────────┴──────┘

一分钟后,当用户阅读文章达到70%时,我们插入以下两行:

INSERT INTO article_reads
            VALUES(1, 12, 0, '2023-01-06 15:20:32', now(), -1),
                  (1, 12, 70, '2023-01-06 15:20:32', now(), 1);

第一行的sign=-1表示要告诉ClickHouse删除现有行(根据ORDER BY元组中的值 - read_start、article_id和user_id列的值)。第二行插入的行(sign=1)是新的行,其中read_to列设置为新值70。

由于数据更新是在后台进行的,结果最终一致性,我们应该在sign列上进行筛选以获取正确的结果:

SELECT
    article_id,
    user_id,
    max(read_end),
    max(read_to)
FROM article_reads
WHERE sign = 1
GROUP BY
    user_id,
    article_id

┌─article_id─┬─user_id─┬───────max(read_end)─┬─max(read_to)─┐
│         12 │       1 │ 2023-01-06 15:21:59 │           70 │
└────────────┴─────────┴─────────────────────┴──────────────┘

1 row in set. Elapsed: 0.004 sec.

CollapsingMergreTree引擎现在会在后台高效地处理删除被取消的行,因此我们不需要手动删除它们。您可以在此处找到使用CollapsingMergeTree引擎的更多示例。

使用版本控制和ReplacingMergeTree进行更新操作

对于更复杂的情况,我们可能希望使用基于ReplacingMergeTree引擎的版本控制。该引擎通过使用特殊的版本列来实现其他DBMS中所称的UPSERT的高效方法,以跟踪应该删除哪些行。如果存在具有相同排序键的多行,存储中仅保留具有最大版本的行,而其他行将被删除:

8.png

对于我们之前的文章阅读示例,我们可以使用以下结构:

CREATE TABLE article_reads
(
    `user_id` UInt32,
    `article_id` UInt32,
    `read_to` UInt8,
    `read_time` DateTime,
    `version` Int32
)
ENGINE = ReplacingMergeTree(version)
ORDER BY (article_id, user_id)

请注意特殊的版本数字列,这将被ReplacingMergeTree引擎用来标记需要删除的行。让我们模拟用户随着时间从0阅读文章到80%:

INSERT INTO article_reads
           VALUES(1, 12, 0, '2023-01-06 15:20:32', 1),
                 (1, 12, 30, '2023-01-06 15:21:42', 2),
                 (1, 12, 45, '2023-01-06 15:22:13', 3),
                 (1, 12, 80, '2023-01-06 15:23:10', 4);

在这里,随着阅读进度的跟踪,我们逐渐增加版本列的值。行删除的过程也是通过正常的合并周期在后台执行的,因此我们需要在查询时基于最新的版本进行筛选:

SELECT *
FROM article_reads
WHERE (user_id = 1) AND (article_id = 12)
ORDER BY version DESC
LIMIT 1

┌─user_id─┬─article_id─┬─read_to─┬───────────read_time─┬─version─┐
│       1 │         12 │      80 │ 2023-01-06 15:23:10 │       5 │
└─────────┴────────────┴─────────┴─────────────────────┴─────────┘

或者我们可以使用LIMIT 1 BY来获取具有最新版本的行的列表:

SELECT
    user_id,
    article_id,
    read_to
FROM article_reads
ORDER BY version DESC
LIMIT 1 BY
    user_id,
    article_id

┌─user_id─┬─article_id─┬─read_to─┐
│       1 │         12 │      80 │
└─────────┴────────────┴─────────┘

而且,我们不需要担心旧版本的删除 - 这由ClickHouse在后台自动完成。

ref:

https://clickhouse.com/blog/handling-updates-and-deletes-in-clickhouse