Using Impala with Iceberg Tables
Impala now supports Apache Iceberg which is an open table format for huge analytic datasets. With this functionality, you can access any existing Iceberg tables using SQL and perform analytics over them. Using Impala you can create and write Iceberg tables in different Iceberg Catalogs (e.g. HiveCatalog, HadoopCatalog). It also supports location-based tables (HadoopTables).
For more information on Iceberg, see the Apache Iceberg site.
Overview of Iceberg features
- ACID compliance: DML operations are atomic, queries always read a consistent snapshot.
- Hidden partitioning: Iceberg produces partition values by taking a column value and optionally transforming it. Partition information is stored in the Iceberg metadata files. Iceberg is able to TRUNCATE column values or calculate a hash of them and use it for partitioning. Readers don't need to be aware of the partitioning of the table.
- Partition layout evolution: When the data volume or the query patterns change you can update the layout of a table. Since hidden partitioning is used, you don't need to rewrite the data files during partition layout evolution.
- Schema evolution: supports add, drop, update, or rename schema elements, and has no side-effects.
- Time travel: enables reproducible queries that use exactly the same table snapshot, or lets users easily examine changes.
- Cloning Iceberg tables: create an empty Iceberg table based on the definition of another Iceberg table.
Creating Iceberg tables with Impala
When you have an existing Iceberg table that is not yet present in the Hive Metastore,
you can use the CREATE EXTERNAL TABLE
command in Impala to add the table to the Hive
Metastore and make Impala able to interact with this table. Currently Impala supports
HadoopTables, HadoopCatalog, and HiveCatalog. If you have an existing table in HiveCatalog,
and you are using the same Hive Metastore, you need no further actions.
-
HadoopTables. When the table already exists in a HadoopTable it means there is
a location on the file system that contains your table. Use the following command
to add this table to Impala's catalog:
CREATE EXTERNAL TABLE ice_hadoop_tbl STORED AS ICEBERG LOCATION '/path/to/table' TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
-
HadoopCatalog. A table in HadoopCatalog means that there is a catalog location
in the file system under which Iceberg tables are stored. Use the following command
to add a table in a HadoopCatalog to Impala:
CREATE EXTERNAL TABLE ice_hadoop_cat STORED AS ICEBERG TBLPROPERTIES('iceberg.catalog'='hadoop.catalog', 'iceberg.catalog_location'='/path/to/catalog', 'iceberg.table_identifier'='namespace.table');
-
Alternatively, you can also use custom catalogs to use existing tables. It means you need to define
your catalog in hive-site.xml.
The advantage of this method is that other engines are more likely to be able to interact with this table.
Please note that the automatic metadata update will not work for these tables, you will have to manually
call REFRESH on the table when it changes outside Impala.
To globally register different catalogs, set the following Hadoop configurations:
Config Key Description iceberg.catalog.<catalog_name>.type type of catalog: hive, hadoop, or left unset if using a custom catalog iceberg.catalog.<catalog_name>.catalog-impl catalog implementation, must not be null if type is empty iceberg.catalog.<catalog_name>.<key> any config key and value pairs for the catalog For example, to register a HadoopCatalog called 'hadoop', set the following properties in hive-site.xml:iceberg.catalog.hadoop.type=hadoop; iceberg.catalog.hadoop.warehouse=hdfs://example.com:8020/warehouse;
Then in the CREATE TABLE statement you can just refer to the catalog name:CREATE EXTERNAL TABLE ice_catalogs STORED AS ICEBERG TBLPROPERTIES('iceberg.catalog'='<CATALOG-NAME>');
- If the table already exists in HiveCatalog then Impala should be able to see it without any additional commands.
EXTERNAL
keyword. To create an Iceberg table in HiveCatalog the following
CREATE TABLE statement can be used:
CREATE TABLE ice_t (i INT) STORED AS ICEBERG;
By default Impala assumes that the Iceberg table uses Parquet data files. ORC and AVRO are also supported, but we need to tell Impala via setting the table property 'write.format.default' to e.g. 'ORC'.
CREATE TABLE AS SELECT
to create new Iceberg tables, e.g.:
CREATE TABLE ice_ctas STORED AS ICEBERG AS SELECT i, b FROM value_tbl;
CREATE TABLE ice_ctas_part PARTITIONED BY(d) STORED AS ICEBERG AS SELECT s, ts, d FROM value_tbl;
CREATE TABLE ice_ctas_part_spec PARTITIONED BY SPEC (truncate(3, s)) STORED AS ICEBERG AS SELECT cast(t as INT), s, d FROM value_tbl;
Iceberg V2 tables
- position deletes
- equality deletes
CREATE TABLE
statement, they just need to specify
the 'format-version' table property:
CREATE TABLE ice_v2 (i int) STORED BY ICEBERG TBLPROPERTIES('format-version'='2');
ALTER TABLE
statement to do so:
ALTER TABLE ice_v1_to_v2 SET TBLPROPERTIES('format-version'='2');
Dropping Iceberg tables
DROP TABLE
statement to remove an Iceberg table:
DROP TABLE ice_t;
When external.table.purge
table property is set to true, then the
DROP TABLE
statement will also delete the data files. This property
is set to true when Impala creates the Iceberg table via CREATE TABLE
.
When CREATE EXTERNAL TABLE
is used (the table already exists in some
catalog) then this external.table.purge
is set to false, i.e.
DROP TABLE
doesn't remove any files, only the table definition
in HMS.
Supported Data Types for Iceberg Columns
You can get information about the supported Iceberg data types in the Iceberg spec.
Iceberg type | SQL type in Impala |
---|---|
boolean | BOOLEAN |
int | INTEGER |
long | BIGINT |
float | FLOAT |
double | DOUBLE |
decimal(P, S) | DECIMAL(P, S) |
date | DATE |
time | Not supported |
timestamp | TIMESTAMP |
timestamptz | Only read support via TIMESTAMP |
string | STRING |
uuid | Not supported |
fixed(L) | Not supported |
binary | Not supported |
struct | STRUCT (read only) |
list | ARRAY (read only) |
map | MAP (read only) |
Schema evolution of Iceberg tables
ALTER TABLE ... RENAME TO ...
(renames the table if the Iceberg catalog supports it)ALTER TABLE ... CHANGE COLUMN ...
(change name and type of a column iff the new type is compatible with the old type)ALTER TABLE ... ADD COLUMNS ...
(adds columns to the end of the table)ALTER TABLE ... DROP COLUMN ...
- int to long
- float to double
- decimal(P, S) to decimal(P', S) if P' > P – widen the precision of decimal types.
Impala currently does not support schema evolution for tables with AVRO file format.
See schema evolution for more details.
Partitioning Iceberg tables
The Iceberg spec has information about partitioning Iceberg tables. With Iceberg, we are not limited to value-based partitioning, we can also partition our tables via several partition transforms.
PARTITIONED BY SPEC
clause to the CREATE TABLE statement, e.g.:
CREATE TABLE ice_p (i INT, d DATE, s STRING, t TIMESTAMP)
PARTITIONED BY SPEC (BUCKET(5, i), MONTH(d), TRUNCATE(3, s), HOUR(t))
STORED AS ICEBERG;
ALTER TABLE SET PARTITION SPEC
statement, e.g.:
ALTER TABLE ice_p SET PARTITION SPEC (VOID(i), VOID(d), TRUNCATE(3, s), HOUR(t), i);
- Do not reorder partition fields
- Do not drop partition fields; instead replace the field’s transform with the void transform
- Only add partition fields at the end of the previous partition spec
CREATE TABLE ice_p (i INT, b INT) PARTITIONED BY (p1 INT, p2 STRING) STORED AS ICEBERG;
One can inspect a table's partition spec by the SHOW PARTITIONS
or
SHOW CREATE TABLE
commands.
Inserting data into Iceberg tables
Impala is also able to insert new data to Iceberg tables. Currently the INSERT INTO
and INSERT OVERWRITE
DML statements are supported. One can also remove the
contents of an Iceberg table via the TRUNCATE
command.
CREATE TABLE ice_p (i INT, b INT) PARTITIONED BY SPEC (bucket(17, i)) STORED AS ICEBERG;
INSERT INTO ice_p VALUES (1, 2);
INSERT OVERWRITE
statements can replace data in the table with the result of a query.
For partitioned tables Impala does a dynamic overwrite, which means partitions that have rows produced
by the SELECT query will be replaced. And partitions that have no rows produced by the SELECT query
remain untouched. INSERT OVERWRITE is not allowed for tables that use the BUCKET partition transform
because dynamic overwrite behavior would be too random in this case. If one needs to replace all
contents of a table, they can still use TRUNCATE
and INSERT INTO
.
Impala can only write Iceberg tables with Parquet data files.
Delete data from Iceberg tables
DELETE
statements against
Iceberg V2 tables. E.g.:
DELETE FROM ice_t where i = 3;
More information about the DELETE
statement can be found at DELETE Statement (Impala 2.8 or higher only).
Updating data int Iceberg tables
UPDATE
statements against
Iceberg V2 tables. E.g.:
UPDATE ice_t SET val = val + 1;
UPDATE ice_t SET k = 4 WHERE i = 5;
UPDATE ice_t SET ice_t.k = o.k, ice_t.j = o.j, FROM ice_t, other_table o where ice_t.id = o.id;
The UPDATE FROM statement can be used to update a target Iceberg table based on a source table (or view) that doesn't need to be an Iceberg table. If there are multiple matches on the JOIN condition, Impala will raise an error.
- Only the merge-on-read update mode is supported.
- Only writes position delete files, i.e. no support for writing equality deletes.
- Cannot update tables with complex types.
- Can only write data and delete files in Parquet format. This means if table properties 'write.format.default' and 'write.delete.format.default' are set, their values must be PARQUET.
- Updating partitioning column with non-constant expression via the UPDATE FROM statement is not allowed. The upcoming MERGE statement will not have this limitation.
More information about the UPDATE
statement can be found at UPDATE Statement (Impala 2.8 or higher only).
Loading data into Iceberg tables
LOAD DATA
statement can be used to load a single file or directory into
an existing Iceberg table. This operation is executed differently compared to HMS tables, the
data is being inserted into the table via sequentially executed statements, which has
some limitations:
- Only Parquet or ORC files can be loaded.
PARTITION
clause is not supported, but the partition transformations are respected.- The loaded files will be re-written as Parquet files.
Optimizing (Compacting) Iceberg tables
OPTIMIZE TABLE [db_name.]table_name;
OPTIMIZE TABLE
statement rewrites
the entire table, executing the following tasks:
- compact small files
- merge delete and update deltas
- rewrite all files, converting them to the latest table schema
- rewrite all partitions according to the latest partition spec
- The user needs ALL privileges on the table.
- The table can conatin any file formats that Impala can read, but
write.format.default
has to beparquet
. - The table cannot contain complex types.
When a table is optimized, a new snapshot is created. The old table state is still accessible by time travel to previous snapshots, because the rewritten data and delete files are not removed physically.
Note that the current implementation of OPTIMIZE TABLE
rewrites
the entire table, therefore this operation can take a long time to complete
depending on the size of the table.
Time travel for Iceberg tables
Iceberg stores the table states in a chain of snapshots. By default, Impala uses the current snapshot of the table. But for Iceberg tables, it is also possible to query an earlier state of the table.
FOR SYSTEM_TIME AS OF
and FOR SYSTEM_VERSION AS OF
clauses in SELECT
queries, e.g.:
SELECT * FROM ice_t FOR SYSTEM_TIME AS OF '2022-01-04 10:00:00';
SELECT * FROM ice_t FOR SYSTEM_TIME AS OF now() - interval 5 days;
SELECT * FROM ice_t FOR SYSTEM_VERSION AS OF 123456;
DESCRIBE HISTORY
statement with the following syntax:
DESCRIBE HISTORY [db_name.]table_name
[FROM timestamp];
DESCRIBE HISTORY [db_name.]table_name
[BETWEEN timestamp AND timestamp]
For example:
DESCRIBE HISTORY ice_t FROM '2022-01-04 10:00:00';
DESCRIBE HISTORY ice_t FROM now() - interval 5 days;
DESCRIBE HISTORY ice_t BETWEEN '2022-01-04 10:00:00' AND '2022-01-05 10:00:00';
DESCRIBE HISTORY
statement is formed
of the following columns:
creation_time
: the snapshot's creation timestamp.snapshot_id
: the snapshot's ID or null.parent_id
: the snapshot's parent ID or null.is_current_ancestor
: TRUE if the snapshot is a current ancestor of the table.
Please note that time travel queries are executed using the old schema of the table from the point specified by the time travel parameters. Prior to Impala 4.3.0 the current table schema is used to query an older snapshot of the table, which might have had a different schema in the past.
Rolling Iceberg tables back to a previous state
Iceberg table modifications cause new table snapshots to be created;
these snapshots represent an earlier version of the table.
The ALTER TABLE [db_name.]table_name EXECUTE ROLLBACK
statement can be used to roll back the table to a previous snapshot.
123456
use:
ALTER TABLE ice_tbl EXECUTE ROLLBACK(123456);
To roll the table back to the most recent (newest) snapshot
that has a creation timestamp that is older than the timestamp '2022-01-04 10:00:00' use:
ALTER TABLE ice_tbl EXECUTE ROLLBACK('2022-01-04 10:00:00');
The timestamp is evaluated using the Timezone for the current session.
It is only possible to roll back to a snapshot that is a current ancestor of the table.
When a table is rolled back to a snapshot, a new snapshot is created with the same snapshot id, but with a new creation timestamp.
Expiring snapshots
ALTER TABLE ... EXECUTE expire_snapshots(...)
statement, which will expire snapshots that are older than the specified
timestamp. For example:
ALTER TABLE ice_tbl EXECUTE expire_snapshots('2022-01-04 10:00:00');
ALTER TABLE ice_tbl EXECUTE expire_snapshots(now() - interval 5 days);
- does not remove old metadata files by default.
- does not remove orphaned data files.
- respects the minimum number of snapshots to keep:
history.expire.min-snapshots-to-keep
table property.
Old metadata file clean up can be configured with
write.metadata.delete-after-commit.enabled=true
and
write.metadata.previous-versions-max
table properties. This
allows automatic metadata file removal after operations that modify metadata
such as expiring snapshots or inserting data.
Iceberg metadata tables
Iceberg stores extensive metadata for each table (e.g. snapshots, manifests, data and delete files etc.), which is accessible in Impala in the form of virtual tables called metadata tables.
Metadata tables can be queried just like regular tables, including filtering, aggregation and joining with other metadata and regular tables. On the other hand, they are read-only, so it is not possible to change, add or remove records from them, they cannot be dropped and new metadata tables cannot be created. Metadata changes made in other ways (not through metadata tables) are reflected in the tables.
SHOW
METADATA TABLES
command:
SHOW METADATA TABLES IN [db.]tbl [[LIKE] “pattern”]
It is possible to filter the result using pattern
. All Iceberg
tables have the same metadata tables, so this command is mostly for convenience.
Using SHOW METADATA TABLES
on a non-Iceberg table results in an
error.
Just like regular tables, metadata tables have schemas that can be queried with
the DESCRIBE
command. Note, however, that DESCRIBE
FORMATTED|EXTENDED
are not available for metadata tables.
DESCRIBE functional_parquet.iceberg_alltypes_part.history;
To retrieve information from metadata tables, use the usual
SELECT
statement. You can select any subset of the columns or all
of them using ‘*’. Note that in contrast to regular tables, SELECT
*
on metadata tables always includes complex-typed columns in the result.
Therefore, the query option EXPAND_COMPLEX_TYPES
only applies to
regular tables. This holds also in queries that mix metadata tables and regular
tables: for SELECT *
expressions from metadata tables, complex
types will always be included, and for SELECT *
expressions from
regular tables, complex types will be included if and only if
EXPAND_COMPLEX_TYPES
is true.
Note that unnesting collections from metadata tables is not supported.
SELECT
s.operation,
h.is_current_ancestor,
s.summary
FROM functional_parquet.iceberg_alltypes_part.history h
JOIN functional_parquet.iceberg_alltypes_part.snapshots s
ON h.snapshot_id = s.snapshot_id
WHERE s.operation = 'append'
ORDER BY made_current_at;
Cloning Iceberg tables (LIKE clause)
CREATE TABLE ... LIKE ...
to create an empty Iceberg table
based on the definition of another Iceberg table, including any column attributes in
the original table:
CREATE TABLE new_ice_tbl LIKE orig_ice_tbl;
Because of the Data Types of Iceberg and Impala do not correspond one by one, Impala can only clone between Iceberg tables.
Iceberg table properties
-
iceberg.catalog
: controls which catalog is used for this Iceberg table. It can be 'hive.catalog' (default), 'hadoop.catalog', 'hadoop.tables', or a name that identifies a catalog defined in the Hadoop configurations, e.g. hive-site.xml iceberg.catalog_location
: Iceberg table catalog location wheniceberg.catalog
is'hadoop.catalog'
iceberg.table_identifier
: Iceberg table identifier. We use <database>.<table> instead if this property is not setwrite.format.default
: data file format of the table. Impala can read AVRO, ORC and PARQUET data files in Iceberg tables, and can write PARQUET data files only.write.parquet.compression-codec
: Parquet compression codec. Supported values are: NONE, GZIP, SNAPPY (default value), LZ4, ZSTD. The table property will be ignored ifCOMPRESSION_CODEC
query option is set.write.parquet.compression-level
: Parquet compression level. Used with ZSTD compression only. Supported range is [1, 22]. Default value is 3. The table property will be ignored ifCOMPRESSION_CODEC
query option is set.write.parquet.row-group-size-bytes
: Parquet row group size in bytes. Supported range is [8388608, 2146435072] (8MB - 2047MB). The table property will be ignored ifPARQUET_FILE_SIZE
query option is set. If neither the table property nor thePARQUET_FILE_SIZE
query option is set, the way Impala calculates row group size will remain unchanged.write.parquet.page-size-bytes
: Parquet page size in bytes. Used for PLAIN encoding. Supported range is [65536, 1073741824] (64KB - 1GB). If the table property is unset, the way Impala calculates page size will remain unchanged.write.parquet.dict-size-bytes
: Parquet dictionary page size in bytes. Used for dictionary encoding. Supported range is [65536, 1073741824] (64KB - 1GB). If the table property is unset, the way Impala calculates dictionary page size will remain unchanged.
Iceberg manifest caching
iceberg.io-impl=org.apache.iceberg.hadoop.HadoopFileIO;
iceberg.io.manifest.cache-enabled=true;
iceberg.io.manifest.cache.max-total-bytes=104857600;
iceberg.io.manifest.cache.expiration-interval-ms=3600000;
iceberg.io.manifest.cache.max-content-length=8388608;
-
iceberg.io-impl
: custom FileIO implementation to use in a catalog. Must be set to enable manifest caching. Impala defaults to HadoopFileIO. It is recommended to not change this to other than HadoopFileIO. -
iceberg.io.manifest.cache-enabled
: enable/disable the manifest caching feature. -
iceberg.io.manifest.cache.max-total-bytes
: maximum total amount of bytes to cache in the manifest cache. Must be a positive value. -
iceberg.io.manifest.cache.expiration-interval-ms
: maximum duration for which an entry stays in the manifest cache. Must be a non-negative value. Setting zero means cache entries expire only if it gets evicted due to memory pressure fromiceberg.io.manifest.cache.max-total-bytes
. -
iceberg.io.manifest.cache.max-content-length
: maximum length of a manifest file to be considered for caching in bytes. Manifest files with a length exceeding this property value will not be cached. Must be set with a positive value and lower thaniceberg.io.manifest.cache.max-total-bytes
.
Manifest caching only works for tables that are loaded with either of
HadoopCatalogs or HiveCatalogs. Individual HadoopCatalog and HiveCatalog will have
separate manifest caches with the same configuration. By default, only 8 catalogs
can have their manifest cache active in memory. This number can be raised by
setting a higher value in the java system property
iceberg.io.manifest.cache.fileio-max
.