Iceberg

为什么需要Iceberg/为什么需要Lakehouse

数据湖的最主要目的是为了解决Hive速度慢的问题, 利用表格式和索引实现细粒度的数据过滤. 数据湖可以将整个链路变为分钟级, 从离线链路转化为近实时链路并且提升查询速度. 当然, 数据湖还解决了一些其他问题, 如ACID, Schema Evolution, Partition Evolution, Time Travel等.

文件布局

具体查询流程就是从Catalog -> Table Metadata File -> Snapshot -> Manifest List -> Manifest File -> Data File. 如图所示

image.png

Catalog

Catalog 本质就是一个维护表元数据文件的目录(个人理解), 有多种实现方式:

HiveCatalog Hive Metastore 表属性中key值为metadata_location的键值对
HadoopCatalog 文件系统 version-hint.text
JDBC Catalog 关系型数据库 专门的表: 例如jdbc_catalog
REST Catalog 独立的 Web 服务 API端点返回的json

Table Metadata File (vN.metadata.json)

表的元数据文件,记录了表的完整定义和历史快照。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
{
"format-version": 1,
"table-uuid": "f7m1a7b4-c111-407a-a6e1-433a233a1e12",
"location": "s3://my-bucket/warehouse/db/events", //根路径
"last-updated-ms": 1672531200000,
"last-column-id": 4,
"schemas": [
{
"type": "struct",
"schema-id": 0,
"fields": [
{
"id": 1,
"name": "event_ts",
"required": true,
"type": "timestamptz",
"doc": "Event timestamp with timezone"
},
{
"id": 2,
"name": "level",
"required": true,
"type": "string"
},
{
"id": 3,
"name": "message",
"required": false,
"type": "string"
},
{
"id": 4,
"name": "extra_info",
"required": false,
"type": {
"type": "map",
"key-id": 5,
"value-id": 6,
"value-required": false
}
}
]
}
],
"current-schema-id": 0,
"partition-specs": [
{
"spec-id": 0,
"fields": [
{
"name": "day",
"transform": "day",
"source-id": 1,
"field-id": 1000
}
]
}
],
"default-spec-id": 0,
"properties": {
"write.format.default": "parquet",
"commit.retry.num-retries": "2"
},
"current-snapshot-id": 3051729675574597004,
"snapshots": [
{
"snapshot-id": 3051729675574597004,
"parent-snapshot-id": null,
"timestamp-ms": 1672531200000,
"summary": {
"operation": "append",
"spark.app.id": "local-1599119293123",
"added-data-files": "5",
"added-records": "10550",
"added-files-size": "34201",
"changed-partition-count": "1",
"total-records": "10550",
"total-files-size": "34201",
"total-data-files": "5",
"total-delete-files": "0",
"total-position-deletes": "0",
"total-equality-deletes": "0"
},
"manifest-list": "s3://my-bucket/warehouse/db/events/metadata/snap-3051729675574597004-1-2945e143-5152-4091-8869-711e86098059.avro",
"schema-id": 0
}
],
"snapshot-log": [
{
"timestamp-ms": 1672531200000,
"snapshot-id": 3051729675574597004
}
]
}

Manifest List (snap---.avro)

列出了所有的Manifest File及相关信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
[
{
"manifest_path": "s3://warehouse/db/table/metadata/e852441a-5c3c-42a2-b2d9-e31a19a9d701-m0.avro",
"manifest_length": 6224,
"partition_spec_id": 0,
"content": 0,
"sequence_number": 2,
"min_sequence_number": 2,
"added_snapshot_id": 874657636349826781,
"added_files_count": 4,
"existing_files_count": 0,
"deleted_files_count": 0,
"added_rows_count": 4000,
"existing_rows_count": 0,
"deleted_rows_count": 0,
"partitions": [
{
"contains_null": false,
"contains_nan": false,
"lower_bound": "2025-07-31T10:00:00.000Z",
"upper_bound": "2025-07-31T12:00:00.000Z"
}
],
"key_metadata": null
},
{
"manifest_path": "s3://warehouse/db/table/metadata/f334a123-cee3-4733-ac49-bd24a5a176d1-m1.avro",
"manifest_length": 1056,
"partition_spec_id": 0,
"content": 1,
"sequence_number": 2,
"min_sequence_number": 2,
"added_snapshot_id": 874657636349826781,
"added_files_count": 1,
"existing_files_count": 0,
"deleted_files_count": 0,
"added_rows_count": 50,
"existing_rows_count": 0,
"deleted_rows_count": 0,
"partitions": [
{
"contains_null": false,
"contains_nan": false,
"lower_bound": "2025-07-31T11:00:00.000Z",
"upper_bound": "2025-07-31T11:00:00.000Z"
}
],
"key_metadata": null
}
]

Manifest File (.avro)

包含Data File及相关信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
[
{
"status": 1,
"snapshot_id": 4876662349891823142,
"sequence_number": 1,
"file_path": "s3://my-bucket/db/logs/data/event_date=2025-07-30/00000-1-....parquet",
"file_format": "PARQUET",
"partition": {
"event_date": "2025-07-30"
},
"record_count": 4800,
"file_size_in_bytes": 5242880,
"column_sizes": { "1": 76800, "2": 24000, "3": 4234880 },
"value_counts": { "1": 4800, "2": 4800, "3": 4800 },
"null_value_counts": { "1": 0, "2": 0, "3": 0 },
"nan_value_counts": {},
"lower_bounds": {
"1": "2025-07-30T00:00:00.000Z",
"2": "INFO"
},
"upper_bounds": {
"1": "2025-07-30T11:59:59.999Z",
"2": "WARN"
}
},
{
"status": 1,
"snapshot_id": 4876662349891823142,
"sequence_number": 1,
"file_path": "s3://my-bucket/db/logs/data/event_date=2025-07-30/00001-2-....parquet",
"file_format": "PARQUET",
"partition": {
"event_date": "2025-07-30"
},
"record_count": 5200,
"file_size_in_bytes": 5872025,
"column_sizes": { "1": 83200, "2": 26000, "3": 4962825 },
"value_counts": { "1": 5200, "2": 5200, "3": 5200 },
"null_value_counts": { "1": 0, "2": 0, "3": 0 },
"nan_value_counts": {},
"lower_bounds": {
"1": "2025-07-30T12:00:00.000Z",
"2": "INFO"
},
"upper_bounds": {
"1": "2025-07-30T23:59:59.999Z",
"2": "INFO"
}
}
]

一个例子

  1. 建表
1
2
3
4
5
6
7
8
9
10
CREATE TABLE ice_spark.hdfs.t_user1 (
id int,
name string,
ts string
) USING iceberg;

t_user1/
├── metadata
│ └── v1.metadata.json
└── version-hint.text

version-hint.text存的就是当前matadata file的版本信息, 让系统知道是vN.metadata.json

  1. 插入一条数据
1
2
3
4
5
6
7
8
9
10
11
INSERT INTO ice_spark.hdfs.ns.t_user1 VALUES(1, 'hlink', 20250709);

t_user1/
├── data
│ └── 00000-5-ba90a31f-d65c-4e40-9321-3faa8c184547-0-00001.parquet
├── metadata
│ ├── 44a2bbda-e252-4d77-b958-b3cf35708b14-m0.avro
│ ├── snap-8289612053519823397-1-44a2bbda-e252-4d77-b958-b3cf35708b14.avro
│ ├── v1.metadata.json
│ └── v2.metadata.json
└── version-hint.text

多了数据文件00000-5-ba90a31f-d65c-4e40-9321-3faa8c184547-0-00001.parquet, version-hint.text更新到版本2, 新增v2.metadata.json, 新增manifest list文件snap-8289612053519823397-1-44a2bbda-e252-4d77-b958-b3cf35708b14.avro, 新增了manifeat file文件44a2bbda-e252-4d77-b958-b3cf35708b14-m0.avro.

  1. 再插入一条数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
INSERT INTO ice_spark.hdfs.ns.t_user1 VALUES(2, 'iceberg', 20250709);

t_user1/
├── data
│ ├── 00000-5-ba90a31f-d65c-4e40-9321-3faa8c184547-0-00001.parquet
│ └── 00000-7-906189d5-3aba-4a96-9f8b-008d5af2f94d-0-00001.parquet
├── metadata
│ ├── 28173fec-5f71-444c-9abd-543468e78636-m0.avro
│ ├── 44a2bbda-e252-4d77-b958-b3cf35708b14-m0.avro
│ ├── snap-4152532609887149918-1-28173fec-5f71-444c-9abd-543468e78636.avro
│ ├── snap-8289612053519823397-1-44a2bbda-e252-4d77-b958-b3cf35708b14.avro
│ ├── v1.metadata.json
│ ├── v2.metadata.json
│ └── v3.metadata.json
└── version-hint.text

与之前的插入操作是同样的道理

Schema Evolution

Schema Evolution简单来说就是对表格式的变更, Schema Evolution只改变元数据, 不改变底层的数据文件, 支持的变更有:

  1. 添加字段 (Add):可以向 schema 中增加新的字段。新增的字段会被分配一个新的、唯一的字段ID。
  2. 删除字段 (Delete):可以从当前 schema 中移除一个字段。
  3. 重命名字段 (Rename):可以更改一个已存在字段的名称,但其字段ID不会改变。
  4. 重排字段顺序 (Reorder):可以调整已存在字段在 schema 中的顺序。
  5. 类型提升 (Type Promotion):可以将原生类型(primitive type)提升为另一个兼容的类型。

使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
-- 添加column
ALTER TABLE ice_spark_hdfs.t_user ADD COLUMN age int;
--指定列后面更加column
ALTER TABLE ice_spark_hdfs.t_user ADD COLUMN sex int AFTER id;
--修改列名
ALTER TABLE ice_spark_hdfs.t_user RENAME COLUMN sex TO sex1;
--修改列类型
ALTER TABLE ice_spark_hdfs.t_user ALTER COLUMN sex1 TYPE bigint;
--添加注释
ALTER TABLE ice_spark_hdfs.t_user ALTER COLUMN sex1 COMMENT 'table sex';
--删除列
ALTER TABLE ice_spark_hdfs.t_user DROP COLUMN sex1;

如果新增一个列, 那原来的数据没有这个列, 查询怎么办? 首先Iceberg会意识到原来的Schema没有这个字段, 对于这个没有的字段查询是不会落实的. 这里就要提到**“Column Projection” (列投影)这个机制.** 当查询数据时, Iceberg 使用的是表的**当前 schema(包含了新增字段), 但数据文件本身是用旧的 schema(**不包含新增字段)写入的. 当 Iceberg 在读取旧数据文件前, 在查询schema时发现查询需要的一个 field-id 在文件中并不存在, 它会遵循一套明确的规则来解析这个值:

  1. 从分区数据中获取:如果该字段存在一个 identity(恒等)分区转换,并且分区值存在于清单文件(manifest)的 data_file 对象的 partition 结构中,那么就直接使用这个分区值。这主要用于从 Hive 表迁移等场景。
  2. 使用名称映射 (Name Mapping):如果表配置了 schema.name-mapping.default 属性,Iceberg 会尝试使用这个映射从没有 field-id 的旧文件中按名称找到对应的列。
  3. 使用 initial-default (初始默认值):如果该字段的 schema 定义中包含一个 initial-default 值,那么 Iceberg 就会返回这个预设的默认值。
  4. 返回 null:如果以上所有规则都不适用,那么 Iceberg 将为这个字段返回 null

Partitioning(分区)

核心是将数据按照特定规则(如相同日期、类别的记录)物理存储在一起(文件夹 dt=20250213),减少查询时扫描的数据量,提升性能。是通过数据存储优化来提升查询性能的一种技术.

一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CREATE TABLE ice_spark.hdfs.ns.t_user1 (
id INT,
name STRING,
dt DATE
)
USING iceberg
PARTITIONED BY (days(dt)); -- 按天对 dt 列进行分区

INSERT INTO ice_spark.hdfs.ns.t_user1 VALUES (1, 'hlink', 20250709);

t_user1/
├── data
└── dt=20250709
│ └── 00000-5-ba90a31f-d65c-4e40-9321-3faa8c184547-0-00001.parquet
├── metadata
│ ├── 44a2bbda-e252-4d77-b958-b3cf35708b14-m0.avro
│ ├── snap-8289612053519823397-1-44a2bbda-e252-4d77-b958-b3cf35708b14.avro
│ ├── v1.metadata.json
│ └── v2.metadata.json
└── version-hint.text

但不一定是这种布局, 这种的类似, 因为Iceberg的分区不一定要放在一个统一的目录下, Iceberg有隐藏分区设计, 通过 逻辑与物理分离 解决了传统分区(如 Hive 分区)的痛点,分区列维护成本高、分区策略无法灵活变更等问题。几个好处:

  1. 分区转换函数帮助自动进行分区
  2. 查询时不需指明分区
  3. 分区策略动态演化

Partition Evolution

修改分区, 和Schema Evolution类似, 只改变元数据**, 不改变底层的数据文件,** 不会修改旧的分区规范,而是会创建一个全新的分区规范 (Partition Spec)

新建一个包含多个分区的表:

1
2
3
4
5
6
7
CREATE TABLE ice_spark_hdfs.t_user_part_hidden (
id bigint,
name String,
ts timestamp,
dt string
) USING iceberg
PARTITIONED BY (days(ts), bucket(3, id));

删除一个分区, 修改一个分区:

1
2
3
ALTER TABLE ice_spark_hdfs.t_user_part_hidden DROP PARTITION FIELD bucket(3, id);
ALTER TABLE ice_spark_hdfs.t_user_part_hidden
REPLACE PARTITION FIELD days(ts) WITH months(ts) AS month;

查询时旧数据按旧分区查, 新数据按新分区查:

  1. 遍历当前快照中的每一个清单文件(查Manifest List)
  2. 对于每一个清单文件,它会读取该文件元数据中记录的 partition-spec-id
  3. 从表元数据的 partition-specs 列表中找到与该 ID 对应的那个历史版本的分区规范
  4. 使用这个与数据匹配的历史分区规范,来转换用户查询中的过滤条件,从而对该清单文件下的数据文件进行有效的分区裁剪

Time Travel

Iceberg 的 时间旅行 功能允许用户查询表在 特定时间点特定版本 的历史数据快照,无需手动备份或迁移数据。

时间旅行的核心能力:

  • 按时间戳查询:指定具体时间(如 '2023-10-01 10:00:00'),查询该时刻的数据状态
  • 按快照ID查询:通过唯一快照 ID(如 10963874102873)定位数据版本。

使用示例:

1
2
SELECT * FROM ice_spark_hdfs.t_user_part_hidden VERSION AS OF 807038129550906544;
SELECT * FROM ice_spark_hdfs.t_user_part_hidden TIMESTAMP AS OF '2025-02-15 00:39:01.33';

如何实现:

遍历snapshot-log, 查询在时间戳正好在指定的时间点之前或之时的最后一个快照ID或者指定的快照ID, 然后就是正常的查询流程

具体参数的设计可以看下面的文档:

spec

声明: 本文给出的例子不全是真实数据, 只是为了方便理解🥰