Using Impala to Query Kudu Tables
You can use Impala to query tables stored by Apache Kudu. This capability allows convenient access to a storage system that is tuned for different kinds of workloads than the default with Impala.
By default, Impala tables are stored on HDFS using data files with various file formats. HDFS files are ideal for bulk loads (append operations) and queries using full-table scans, but do not support in-place updates or deletes. Kudu is an alternative storage engine used by Impala which can do both in-place updates (for mixed read/write workloads) and fast scans (for data-warehouse/analytic operations). Using Kudu tables with Impala can simplify the ETL pipeline by avoiding extra steps to segregate and reorganize newly arrived data.
Certain Impala SQL statements and clauses, such as DELETE, UPDATE, UPSERT, and PRIMARY KEY work only with Kudu tables. Other statements and clauses, such as LOAD DATA, TRUNCATE TABLE, and INSERT OVERWRITE, are not applicable to Kudu tables.
Benefits of Using Kudu Tables with Impala
The combination of Kudu and Impala works best for tables where scan performance is important, but data arrives continuously, in small batches, or needs to be updated without being completely replaced. HDFS-backed tables can require substantial overhead to replace or reorganize data files as new data arrives. Impala can perform efficient lookups and scans within Kudu tables, and Impala can also perform update or delete operations efficiently. You can also use the Kudu Java, C++, and Python APIs to do ingestion or transformation operations outside of Impala, and Impala can query the current data at any time.
Configuring Impala for Use with Kudu
The -kudu_master_hosts configuration property must be set correctly for the impalad daemon, for CREATE TABLE ... STORED AS KUDU statements to connect to the appropriate Kudu server. Typically, the required value for this setting is kudu_host:7051.
In a high-availability Kudu deployment, specify the names of multiple Kudu hosts separated by commas.
If the -kudu_master_hosts configuration property is not set, you can still associate the appropriate value for each table by specifying a TBLPROPERTIES('kudu.master_addresses') clause in the CREATE TABLE statement or changing the TBLPROPERTIES('kudu.master_addresses') value with an ALTER TABLE statement.
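As a minimal sketch of the per-table approach, the following statements set and later change the master addresses for one table. The table name and the host names are hypothetical placeholders; substitute the masters of your own Kudu cluster.

```sql
-- Hypothetical table and host names, for illustration only.
-- Multiple masters (high-availability deployment) are separated by commas.
CREATE TABLE kudu_with_masters (id BIGINT PRIMARY KEY, s STRING)
  PARTITION BY HASH (id) PARTITIONS 2
  STORED AS KUDU
  TBLPROPERTIES ('kudu.master_addresses' =
    'kudu-master-1.example.com:7051,kudu-master-2.example.com:7051,kudu-master-3.example.com:7051');

-- Point the same table at a different set of masters later.
ALTER TABLE kudu_with_masters
  SET TBLPROPERTIES ('kudu.master_addresses' = 'kudu-master-4.example.com:7051');
```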
Cluster Topology for Kudu Tables
With HDFS-backed tables, you are typically concerned with the number of DataNodes in the cluster, how many and how large HDFS data files are read during a query, and therefore the amount of work performed by each DataNode and the network communication to combine intermediate results and produce the final result set.
With Kudu tables, the topology considerations are different, because:
- The underlying storage is managed and organized by Kudu, not represented as HDFS data files.
- Kudu handles some of the underlying mechanics of partitioning the data. You can specify the partitioning scheme with combinations of hash and range partitioning, so that you can decide how much effort to expend to manage the partitions as new data arrives. For example, you can construct partitions that apply to date ranges rather than a separate partition for each day or each hour.
- Data is physically divided based on units of storage called tablets. Tablets are stored by tablet servers. Each tablet server can store multiple tablets, and each tablet is replicated across multiple tablet servers, managed automatically by Kudu. Where practical, co-locate the tablet servers on the same hosts as the Impala daemons, although that is not required.
Kudu Replication Factor
By default, Kudu tables created through Impala use a tablet replication factor of 3. To change the replication factor for a Kudu table, specify it using TBLPROPERTIES ('kudu.num_tablet_replicas' = 'n') in the CREATE TABLE statement.
The number of replicas for a Kudu table must be odd.
Altering the kudu.num_tablet_replicas property after table creation currently has no effect.
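For illustration, a sketch of a table created with five replicas per tablet. The table name is hypothetical, and the statement assumes the cluster has at least as many tablet servers as the requested number of replicas.

```sql
-- Hypothetical table; the replica count must be odd.
CREATE TABLE five_replicas (id BIGINT PRIMARY KEY, s STRING)
  PARTITION BY HASH (id) PARTITIONS 2
  STORED AS KUDU
  TBLPROPERTIES ('kudu.num_tablet_replicas' = '5');
```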
Impala DDL Enhancements for Kudu Tables (CREATE TABLE and ALTER TABLE)
You can use the Impala CREATE TABLE and ALTER TABLE statements to create and fine-tune the characteristics of Kudu tables. Because Kudu tables have features and properties that do not apply to other kinds of Impala tables, familiarize yourself with Kudu-related concepts and syntax first.
For the general syntax of the CREATE TABLE statement for Kudu tables, see CREATE TABLE Statement.
Non-unique Primary Keys for Kudu Tables
Kudu allows you to declare a non-unique primary key when you create a table. The data engine handles this by appending a system-generated auto-incrementing column to the non-unique primary key columns, which guarantees the uniqueness of the effective primary key. This column is named auto_incrementing_id and has the BIGINT type; it is always system generated and cannot be created explicitly by the user. The auto_incrementing_id values are unique within a partition (tablet); that is, in every tablet the column starts from one and increases monotonically. Values are assigned to this column automatically during insertion.
Create a Kudu Table with a non-unique PRIMARY KEY
The following example shows creating a table with a non-unique PRIMARY KEY.
CREATE TABLE kudu_tbl1
(
id INT NON UNIQUE PRIMARY KEY,
name STRING
)
PARTITION BY HASH (id) PARTITIONS 3 STORED as KUDU;
The effective PRIMARY KEY in the above case will be {id, auto_incrementing_id}.
Verify the PRIMARY KEY is non-unique
You can now check the PRIMARY KEY created is non-unique by running the following DESCRIBE command. A new property "key_unique" shows if the primary key is unique. System generated column "auto_incrementing_id" is shown in the output for the table as a non-unique primary key.
describe kudu_tbl1;
+----------------------+--------+---------+-------------+------------+----------+---------------+---------------+---------------------+------------+
| name | type | comment | primary_key | key_unique | nullable | default_value | encoding | compression | block_size |
+----------------------+--------+---------+-------------+------------+----------+---------------+---------------+---------------------+------------+
| id | int | | true | false | false | | AUTO_ENCODING | DEFAULT_COMPRESSION | 0 |
| auto_incrementing_id | bigint | | true | false | false | | AUTO_ENCODING | DEFAULT_COMPRESSION | 0 |
| name | string | | false | | true | | AUTO_ENCODING | DEFAULT_COMPRESSION | 0 |
+----------------------+--------+---------+-------------+------------+----------+---------------+---------------+---------------------+------------+
Fetched 3 row(s) in 4.72s
Query Auto Incrementing Column
When you query a table using the SELECT statement, it will not display the system generated auto incrementing column unless the column is explicitly specified in the select list.
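The following sketch, using the kudu_tbl1 table defined earlier, illustrates the difference. The inserted values are made up for the example, and no output is shown because the generated values depend on the tablet each row lands in.

```sql
-- Two rows with the same id are both accepted, because the primary key is non-unique.
INSERT INTO kudu_tbl1 VALUES (1, 'first'), (1, 'second');

-- SELECT * returns only the user-defined columns (id, name).
SELECT * FROM kudu_tbl1;

-- Name the system-generated column explicitly to see its values.
SELECT id, name, auto_incrementing_id FROM kudu_tbl1;
```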
Create a Kudu table without a PRIMARY KEY attribute
The PRIMARY KEY and the partition key are each optional when you create a Kudu table, but you cannot omit both. If you do not specify the primary key attribute, the partition key columns can be promoted to a non-unique primary key. This is possible only if those columns are the leading columns of the table.
In the following example, 'a' and 'b' will be promoted as a non-unique primary key, 'auto_incrementing_id' column will be added by Kudu engine. 'a', 'b' and 'auto_incrementing_id' form the effective unique composite primary key.
CREATE TABLE auto_table
(
a BIGINT,
b STRING
)
PARTITION BY HASH(a, b) PARTITIONS 2 STORED AS KUDU;
The effective primary key in this case would be {a, b, auto_incrementing_id}
Limitations
- UPSERT operation is not supported for Kudu tables with non-unique primary key. If you run an UPSERT statement for a Kudu table with a non-unique primary key it will fail with an error.
- Since the auto generated key for each row will be assigned after the row’s data is generated and after the row lands in the tablet, you cannot use this column in the partition key.
Primary Key Columns for Kudu Tables
Kudu tables introduce the notion of primary keys to Impala for the first time. The primary key is made up of one or more columns, whose values are combined and used as a lookup key during queries. The primary key can be non-unique. The uniqueness of the primary key is guaranteed by appending a system-generated auto-incrementing column to the non-unique primary key columns. The tuple represented by these columns cannot contain any NULL values, and can never be updated once inserted. For a Kudu table, all the partition key columns must come from the set of primary key columns.
The primary key has both physical and logical aspects:
- On the physical side, it is used to map the data values to particular tablets for fast retrieval. Because the tuples formed by the primary key values are unique, the primary key columns are typically highly selective.
- On the logical side, when the primary key is non-unique you can insert rows with duplicate key values using an INSERT statement. Kudu makes each stored row unique by means of the system-generated auto-incrementing column, so duplicate key values do not cause the insertion to fail. However, if an INSERT operation on a table with a non-unique primary key fails part way through, all rows except those with write errors are added to the table. Duplicated rows are added with different values in the auto-incrementing column.
Impala only allows PRIMARY KEY clauses and NOT NULL constraints on columns for Kudu tables. These constraints are enforced on the Kudu side.
Kudu-Specific Column Attributes for CREATE TABLE
For the general syntax of the CREATE TABLE statement for Kudu tables, see CREATE TABLE Statement. The following sections provide more detail for some of the Kudu-specific keywords you can use in column definitions.
The column list in a CREATE TABLE statement can include the following attributes, which only apply to Kudu tables:
[NON UNIQUE] PRIMARY KEY
| [NOT] NULL
| ENCODING codec
| COMPRESSION algorithm
| DEFAULT constant_expression
| BLOCK_SIZE number
See the following sections for details about each column attribute.
PRIMARY KEY Attribute
The primary key for a Kudu table is a column, or set of columns, that uniquely identifies every row. The primary key value also is used as the natural sort order for the values from the table. The primary key value for each row is based on the combination of values for the columns.
Because all of the primary key columns must have non-null values, specifying a column in the PRIMARY KEY or NON UNIQUE PRIMARY KEY clause implicitly adds the NOT NULL attribute to that column.
The primary key columns must be the first ones specified in the CREATE TABLE statement. For a single-column primary key, you can include a PRIMARY KEY attribute inline with the column definition. For a multi-column primary key, you include a PRIMARY KEY (c1, c2, ...) clause as a separate entry at the end of the column list.
You can specify the PRIMARY KEY attribute either inline in a single column definition, or as a separate clause at the end of the column list:
CREATE TABLE pk_inline
(
col1 BIGINT PRIMARY KEY,
col2 STRING,
col3 BOOLEAN
) PARTITION BY HASH(col1) PARTITIONS 2 STORED AS KUDU;
CREATE TABLE pk_at_end
(
col1 BIGINT,
col2 STRING,
col3 BOOLEAN,
PRIMARY KEY (col1)
) PARTITION BY HASH(col1) PARTITIONS 2 STORED AS KUDU;
-- Syntax templates: the square brackets mark NON UNIQUE as optional and are not typed literally.
CREATE TABLE pk_inline
(
col1 BIGINT [NON UNIQUE] PRIMARY KEY,
col2 STRING,
col3 BOOLEAN
) PARTITION BY HASH(col1) PARTITIONS 2 STORED AS KUDU;
CREATE TABLE pk_at_end
(
col1 BIGINT,
col2 STRING,
col3 BOOLEAN,
[NON UNIQUE] PRIMARY KEY (col1)
) PARTITION BY HASH(col1) PARTITIONS 2 STORED AS KUDU;
When the primary key is a single column, these two forms are equivalent. If the primary key consists of more than one column, you must specify the primary key using a separate entry in the column list:
CREATE TABLE pk_multiple_columns
(
col1 BIGINT,
col2 STRING,
col3 BOOLEAN,
PRIMARY KEY (col1, col2)
) PARTITION BY HASH(col2) PARTITIONS 2 STORED AS KUDU;
The SHOW CREATE TABLE statement always represents the PRIMARY KEY specification as a separate item in the column list:
CREATE TABLE inline_pk_rewritten (id BIGINT PRIMARY KEY, s STRING)
PARTITION BY HASH(id) PARTITIONS 2 STORED AS KUDU;
SHOW CREATE TABLE inline_pk_rewritten;
+------------------------------------------------------------------------------+
| result |
+------------------------------------------------------------------------------+
| CREATE TABLE user.inline_pk_rewritten ( |
| id BIGINT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION, |
| s STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION, |
| PRIMARY KEY (id) |
| ) |
| PARTITION BY HASH (id) PARTITIONS 2 |
| STORED AS KUDU |
| TBLPROPERTIES ('kudu.master_addresses'='host.example.com') |
+------------------------------------------------------------------------------+
The notion of primary key only applies to Kudu tables. Every Kudu table requires a primary key. The primary key consists of one or more columns. You must either specify the primary key columns first in the column list, or define the partition key on the leading columns of the table so that those columns can be promoted to a non-unique primary key.
The contents of the primary key columns cannot be changed by an UPDATE or UPSERT statement. Including too many columns in the primary key (more than 5 or 6) can also reduce the performance of write operations. Therefore, pick the most selective and most frequently tested non-null columns for the primary key specification. If a column must always have a value, but that value might change later, leave it out of the primary key and use a NOT NULL clause for that column instead. If an existing row has an incorrect or outdated key column value, delete the old row and insert an entirely new row with the correct primary key.
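As a sketch of replacing a row whose key value is wrong, the statements below use the pk_inline table from the earlier example. The key values 1001 and 1002 are hypothetical. The new row is written first so that the non-key column values can be copied from the old row; the two statements are separate operations, so a failure between them leaves both rows in the table.

```sql
-- Copy the non-key columns of the old row into a new row with the corrected key.
INSERT INTO pk_inline
  SELECT 1002, col2, col3 FROM pk_inline WHERE col1 = 1001;

-- Remove the row that carried the incorrect key.
DELETE FROM pk_inline WHERE col1 = 1001;
```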
NULL | NOT NULL Attribute
For Kudu tables, you can specify which columns can contain nulls or not. This constraint offers an extra level of consistency enforcement for Kudu tables. If an application requires a field to always be specified, include a NOT NULL clause in the corresponding column definition, and Kudu prevents rows from being inserted with a NULL in that column.
For example, a table containing geographic information might require the latitude and longitude coordinates to always be specified. Other attributes might be allowed to be NULL. For example, a location might not have a designated place name, its altitude might be unimportant, and its population might be initially unknown, to be filled in later.
Because all of the primary key columns must have non-null values, specifying a column in the PRIMARY KEY clause implicitly adds the NOT NULL attribute to that column.
For non-Kudu tables, Impala allows any column to contain NULL values, because it is not practical to enforce a "not null" constraint on HDFS data files that could be prepared using external tools and ETL processes.
CREATE TABLE required_columns
(
id BIGINT PRIMARY KEY,
latitude DOUBLE NOT NULL,
longitude DOUBLE NOT NULL,
place_name STRING,
altitude DOUBLE,
population BIGINT
) PARTITION BY HASH(id) PARTITIONS 2 STORED AS KUDU;
During performance optimization, Kudu can use the knowledge that nulls are not allowed to skip certain checks on each input row, speeding up queries and join operations. Therefore, specify NOT NULL constraints when appropriate.
The NULL clause is the default condition for all columns that are not part of the primary key. You can omit it, or specify it to clarify that you have made a conscious design decision to allow nulls in a column.
Because primary key columns cannot contain any NULL values, the NOT NULL clause is not required for the primary key columns, but you might still specify it to make your code self-describing.
DEFAULT Attribute
You can specify a default value for columns in Kudu tables. The default value can be any constant expression, for example, a combination of literal values, arithmetic and string operations. It cannot contain references to columns or non-deterministic function calls.
The following example shows different kinds of expressions for the DEFAULT clause. The requirement to use a constant value means that you can fill in a placeholder value such as NULL, empty string, 0, -1, 'N/A' and so on, but you cannot reference column names or non-deterministic functions. Therefore, you cannot use DEFAULT to do things such as automatically making an uppercase copy of a string value, storing Boolean values based on tests of other columns, or adding or subtracting one from another column representing a sequence number.
CREATE TABLE default_vals
(
id BIGINT PRIMARY KEY,
name STRING NOT NULL DEFAULT 'unknown',
address STRING DEFAULT upper('no fixed address'),
age INT DEFAULT -1,
earthling BOOLEAN DEFAULT TRUE,
planet_of_origin STRING DEFAULT 'Earth',
optional_col STRING DEFAULT NULL
) PARTITION BY HASH(id) PARTITIONS 2 STORED AS KUDU;
When designing an entirely new schema, prefer to use NULL as the placeholder for any unknown or missing values, because that is the universal convention among database systems. Null values can be stored efficiently, and easily checked with the IS NULL or IS NOT NULL operators. The DEFAULT attribute is appropriate when ingesting data that already has an established convention for representing unknown or missing values, or where the vast majority of rows have some common non-null value.
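To see the defaults take effect, a sketch along the following lines can be run against the default_vals table defined above. The inserted values are made up; any column left out of the INSERT column list is expected to receive its DEFAULT value.

```sql
-- Only the primary key is supplied; every other column falls back to its default.
INSERT INTO default_vals (id) VALUES (1);

-- Supply some columns explicitly and let the rest use their defaults.
INSERT INTO default_vals (id, name, age) VALUES (2, 'Ford', 42);

SELECT id, name, address, age, earthling, planet_of_origin, optional_col
  FROM default_vals
  ORDER BY id;
```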
ENCODING Attribute
Each column in a Kudu table can optionally use an encoding, a low-overhead form of compression that reduces the size on disk but requires additional CPU cycles to reconstruct the original values during queries. Typically, highly compressible data benefits from the reduced I/O to read the data back from disk.
- AUTO_ENCODING: use the default encoding based on the column type, which is bitshuffle for numeric columns and dictionary for string columns.
- PLAIN_ENCODING: leave the value in its original binary format.
- RLE: compress repeated values (when sorted in primary key order) by including a count.
- DICT_ENCODING: when the number of different string values is low, replace the original string with a numeric ID.
- BIT_SHUFFLE: rearrange the bits of the values to efficiently compress sequences of values that are identical or vary only slightly based on primary key order. The resulting encoded data is also compressed with LZ4.
- PREFIX_ENCODING: compress common prefixes in string values; mainly for use internally within Kudu.
The following example shows the Impala keywords representing the encoding types. (The Impala keywords match the symbolic names used within Kudu.) For usage guidelines on the different kinds of encoding, see the Kudu documentation. The DESCRIBE output shows how the encoding is reported after the table is created, and that omitting the encoding (in this case, for the id column) is the same as specifying AUTO_ENCODING.
CREATE TABLE various_encodings
(
id BIGINT PRIMARY KEY,
c1 BIGINT ENCODING PLAIN_ENCODING,
c2 BIGINT ENCODING AUTO_ENCODING,
c3 TINYINT ENCODING BIT_SHUFFLE,
c4 DOUBLE ENCODING BIT_SHUFFLE,
c5 BOOLEAN ENCODING RLE,
c6 STRING ENCODING DICT_ENCODING,
c7 STRING ENCODING PREFIX_ENCODING
) PARTITION BY HASH(id) PARTITIONS 2 STORED AS KUDU;
-- Some columns are omitted from the output for readability.
describe various_encodings;
+------+---------+-------------+----------+-----------------+
| name | type | primary_key | nullable | encoding |
+------+---------+-------------+----------+-----------------+
| id | bigint | true | false | AUTO_ENCODING |
| c1 | bigint | false | true | PLAIN_ENCODING |
| c2 | bigint | false | true | AUTO_ENCODING |
| c3 | tinyint | false | true | BIT_SHUFFLE |
| c4 | double | false | true | BIT_SHUFFLE |
| c5 | boolean | false | true | RLE |
| c6 | string | false | true | DICT_ENCODING |
| c7 | string | false | true | PREFIX_ENCODING |
+------+---------+-------------+----------+-----------------+
COMPRESSION Attribute
You can specify a compression algorithm to use for each column in a Kudu table. This attribute imposes more CPU overhead when retrieving the values than the ENCODING attribute does. Therefore, use it primarily for columns with long strings that do not benefit much from the less-expensive ENCODING attribute.
The choices for COMPRESSION are LZ4, SNAPPY, and ZLIB.
Columns that use the BIT_SHUFFLE encoding are already compressed using LZ4, and so typically do not need any additional COMPRESSION attribute.
The following example shows design considerations for several STRING columns with different distribution characteristics, leading to choices for both the ENCODING and COMPRESSION attributes. The user_id values come from a specific set of strings, therefore this column is a good candidate for dictionary encoding. The post_id column contains an ascending sequence of integers, where several leading bits are likely to be all zeroes, therefore this column is a good candidate for bitshuffle encoding. The body column and the corresponding columns for translated versions tend to be long unique strings that are not practical to use with any of the encoding schemes, therefore they employ the COMPRESSION attribute instead. The ideal compression codec in each case would require some experimentation to determine how much space savings it provided and how much CPU overhead it added, based on real-world data.
CREATE TABLE blog_posts
(
user_id STRING ENCODING DICT_ENCODING,
post_id BIGINT ENCODING BIT_SHUFFLE,
subject STRING ENCODING PLAIN_ENCODING,
body STRING COMPRESSION LZ4,
spanish_translation STRING COMPRESSION SNAPPY,
esperanto_translation STRING COMPRESSION ZLIB,
PRIMARY KEY (user_id, post_id)
) PARTITION BY HASH(user_id, post_id) PARTITIONS 2 STORED AS KUDU;
BLOCK_SIZE Attribute
Although Kudu does not use HDFS files internally, and thus is not affected by the HDFS block size, it does have an underlying unit of I/O called the block size. The BLOCK_SIZE attribute lets you set the block size for any column.
The block size attribute is a relatively advanced feature. Refer to the Kudu documentation for usage details.
Partitioning for Kudu Tables
Kudu tables use special mechanisms to distribute data among the underlying tablet servers. Although we refer to such tables as partitioned tables, they are distinguished from traditional Impala partitioned tables by use of different clauses on the CREATE TABLE statement. Kudu tables use PARTITION BY, HASH, RANGE, and range specification clauses rather than the PARTITIONED BY clause for HDFS-backed tables, which specifies only a column name and creates a new partition for each different value.
For background information and architectural details about the Kudu partitioning mechanism, see the Kudu white paper, section 3.2.
The Impala DDL syntax for Kudu tables is different than in early Kudu versions, which used an experimental fork of the Impala code. For example, the DISTRIBUTE BY clause is now PARTITION BY, the INTO n BUCKETS clause is now PARTITIONS n, and the range partitioning syntax is reworked to replace the SPLIT ROWS clause with more expressive syntax involving comparison operators.
Hash Partitioning
Hash partitioning is the simplest type of partitioning for Kudu tables.
For hash-partitioned Kudu tables, inserted rows are divided up between a fixed number of "buckets" by applying a hash function to the values of the columns specified in the HASH clause.
Hashing ensures that rows with similar values are evenly distributed, instead of
clumping together all in the same bucket. Spreading new rows across the buckets this
way lets insertion operations work in parallel across multiple tablet servers.
Separating the hashed values can impose additional overhead on queries, where
queries with range-based predicates might have to read multiple tablets to retrieve
all the relevant values.
-- 1M rows with 50 hash partitions = approximately 20,000 rows per partition.
-- The values in each partition are not sequential, but rather based on a hash function.
-- Rows 1, 99999, and 123456 might be in the same partition.
CREATE TABLE million_rows (id string primary key, s string)
PARTITION BY HASH(id) PARTITIONS 50
STORED AS KUDU;
-- Because the ID values are unique, we expect the rows to be roughly
-- evenly distributed between the buckets in the destination table.
INSERT INTO million_rows SELECT * FROM billion_rows ORDER BY id LIMIT 1000000;
The largest number of buckets that you can create with a PARTITIONS clause varies depending on the number of tablet servers in the cluster, while the smallest is 2. For simplicity, some of the simple CREATE TABLE statements throughout this section use PARTITIONS 2 to illustrate the minimum requirements for a Kudu table. For large tables, prefer to use roughly 10 partitions per server in the cluster.
Range Partitioning
Range partitioning lets you specify partitioning precisely, based on single values or ranges of values within one or more columns. You add one or more RANGE clauses to the CREATE TABLE statement, following the PARTITION BY clause.
Range-partitioned Kudu tables use one or more range clauses, which include a combination of constant expressions, VALUE or VALUES keywords, and comparison operators. (This syntax replaces the SPLIT ROWS clause used with early Kudu versions.) For the full syntax, see CREATE TABLE Statement.
-- 50 buckets, all for IDs beginning with a lowercase letter.
-- Having only a single range enforces the allowed range of values
-- but does not add any extra parallelism.
create table million_rows_one_range (id string primary key, s string)
partition by hash(id) partitions 50,
range (partition 'a' <= values < '{')
stored as kudu;
-- 50 buckets for IDs beginning with a lowercase letter
-- plus 50 buckets for IDs beginning with an uppercase letter.
-- Total number of buckets = number in the PARTITIONS clause x number of ranges.
-- We are still enforcing constraints on the primary key values
-- allowed in the table, and the 2 ranges provide better parallelism
-- as rows are inserted or the table is scanned.
create table million_rows_two_ranges (id string primary key, s string)
partition by hash(id) partitions 50,
range (partition 'a' <= values < '{', partition 'A' <= values < '[')
stored as kudu;
-- Same as previous table, with an extra range covering the single key value '00000'.
create table million_rows_three_ranges (id string primary key, s string)
partition by hash(id) partitions 50,
range (partition 'a' <= values < '{', partition 'A' <= values < '[', partition value = '00000')
stored as kudu;
-- The range partitioning can be displayed with a SHOW command in impala-shell.
show range partitions million_rows_three_ranges;
+---------------------+
| RANGE (id) |
+---------------------+
| VALUE = "00000" |
| "A" <= VALUES < "[" |
| "a" <= VALUES < "{" |
+---------------------+
When defining ranges, be careful to avoid "fencepost errors" where values at the extreme ends might be included or omitted by accident. For example, in the tables defined in the preceding code listings, the range "a" <= VALUES < "{" ensures that any values starting with z, such as za or zzz or zzz-ZZZ, are all included, by using a less-than operator for the smallest value after all the values starting with z.
For range-partitioned Kudu tables, an appropriate range must exist before a data value can be created in the table. Any INSERT, UPDATE, or UPSERT statements fail if they try to create column values that fall outside the specified ranges. The error checking for ranges is performed on the Kudu side; Impala passes the specified range information to Kudu, and passes back any error or warning if the ranges are not valid. (A nonsensical range specification causes an error for a DDL statement, but only a warning for a DML statement.)
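As an illustration, the following sketch uses the million_rows_three_ranges table defined earlier. The key value '12345' is a made-up example that matches none of the three ranges, so the first statement is not expected to store a row; the exact error or warning text depends on the Impala and Kudu versions and is not shown here.

```sql
-- '12345' is not a lowercase string, not an uppercase string, and not '00000',
-- so no existing range can hold this row.
INSERT INTO million_rows_three_ranges VALUES ('12345', 'no matching range');

-- Add a single-value range that does not overlap the existing ranges, then retry.
ALTER TABLE million_rows_three_ranges ADD RANGE PARTITION VALUE = '12345';
INSERT INTO million_rows_three_ranges VALUES ('12345', 'range now exists');
```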
Ranges can be non-contiguous:
partition by range (year) (partition 1885 <= values <= 1889, partition 1893 <= values <= 1897)
partition by range (letter_grade) (partition value = 'A', partition value = 'B',
partition value = 'C', partition value = 'D', partition value = 'F')
The ALTER TABLE statement with the ADD RANGE PARTITION or DROP RANGE PARTITION clauses can be used to add or remove ranges from an existing Kudu table.
ALTER TABLE foo ADD RANGE PARTITION 30 <= VALUES < 50;
ALTER TABLE foo DROP RANGE PARTITION 1 <= VALUES < 5;
When a range is added, the new range must not overlap with any of the previous ranges; that is, it can only fill in gaps within the previous ranges.
alter table test_scores add range partition value = 'E';
alter table year_ranges add range partition 1890 <= values < 1893;
When a range is removed, all the associated rows in the table are deleted. (This is true whether the table is internal or external.)
alter table test_scores drop range partition value = 'E';
alter table year_ranges drop range partition 1890 <= values < 1893;
Kudu tables can also use a combination of hash and range partitioning.
partition by hash (school) partitions 10,
range (letter_grade) (partition value = 'A', partition value = 'B',
partition value = 'C', partition value = 'D', partition value = 'F')
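The clause above is only a fragment. A complete statement built around it might look like the following sketch, in which the table name, the extra columns, and the choice of primary key are assumptions made for illustration. With 10 hash buckets and 5 ranges, the table is divided into 10 x 5 = 50 tablets.

```sql
-- Hypothetical table combining hash and range partitioning.
-- Both partitioning columns must be part of the primary key.
CREATE TABLE scores_by_school
(
  school STRING,
  letter_grade STRING,
  student_id BIGINT,
  score INT,
  PRIMARY KEY (school, letter_grade, student_id)
)
PARTITION BY HASH (school) PARTITIONS 10,
  RANGE (letter_grade)
  (
    PARTITION VALUE = 'A',
    PARTITION VALUE = 'B',
    PARTITION VALUE = 'C',
    PARTITION VALUE = 'D',
    PARTITION VALUE = 'F'
  )
STORED AS KUDU;
```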
Working with Partitioning in Kudu Tables
To see the current partitioning scheme for a Kudu table, you can use the SHOW CREATE TABLE statement or the SHOW PARTITIONS statement. The CREATE TABLE syntax displayed by SHOW CREATE TABLE includes all the hash, range, or both clauses that reflect the original table structure plus any subsequent ALTER TABLE statements that changed the table structure.
To see the underlying buckets and partitions for a Kudu table, use the SHOW TABLE STATS or SHOW PARTITIONS statement.
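For example, the statements mentioned in this section could be run against the million_rows_three_ranges table from the range partitioning examples. Output is omitted here because it depends on the cluster.

```sql
-- Reconstructed DDL, including hash and range clauses and later ALTER TABLE changes.
SHOW CREATE TABLE million_rows_three_ranges;

-- Underlying partitions (tablets) of the table.
SHOW PARTITIONS million_rows_three_ranges;

-- Only the range partition definitions.
SHOW RANGE PARTITIONS million_rows_three_ranges;

-- Table-level statistics.
SHOW TABLE STATS million_rows_three_ranges;
```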
Handling Date, Time, or Timestamp Data with Kudu
You can include TIMESTAMP columns in Kudu tables, instead of representing the date and time as a BIGINT value. The behavior of TIMESTAMP for Kudu tables has some special considerations:
- Any nanoseconds in the original 96-bit value produced by Impala are not stored, because Kudu represents date/time columns using 64-bit values. The nanosecond portion of the value is rounded, not truncated. Therefore, a TIMESTAMP value that you store in a Kudu table might not be bit-for-bit identical to the value returned by a query.
- The conversion between the Impala 96-bit representation and the Kudu 64-bit representation introduces some performance overhead when reading or writing TIMESTAMP columns. You can minimize the overhead during writes by performing inserts through the Kudu API. Because the overhead during reads applies to each query, you might continue to use a BIGINT column to represent date/time values in performance-critical applications.
- The Impala TIMESTAMP type has a narrower range for years than the underlying Kudu data type. Impala can represent years 1400-9999. If year values outside this range are written to a Kudu table by a non-Impala client, Impala returns NULL by default when reading those TIMESTAMP values during a query. Or, if the ABORT_ON_ERROR query option is enabled, the query fails when it encounters a value with an out-of-range year.
-- Make a table representing a date/time value as TIMESTAMP.
-- The strings representing the partition bounds are automatically
-- cast to TIMESTAMP values.
create table native_timestamp(id bigint, when_exactly timestamp, event string, primary key (id, when_exactly))
partition by hash (id) partitions 20,
range (when_exactly)
(
partition '2015-01-01' <= values < '2016-01-01',
partition '2016-01-01' <= values < '2017-01-01',
partition '2017-01-01' <= values < '2018-01-01'
)
stored as kudu;
insert into native_timestamp values (12345, now(), 'Working on doc examples');
select * from native_timestamp;
+-------+-------------------------------+-------------------------+
| id | when_exactly | event |
+-------+-------------------------------+-------------------------+
| 12345 | 2017-05-31 16:27:42.667542000 | Working on doc examples |
+-------+-------------------------------+-------------------------+
Because Kudu tables have some performance overhead to convert TIMESTAMP columns to the Impala 96-bit internal representation, for performance-critical applications you might store date/time information as the number of seconds, milliseconds, or microseconds since the Unix epoch date of January 1, 1970. Specify the column as BIGINT in the Impala CREATE TABLE statement, corresponding to an 8-byte integer (an int64) in the underlying Kudu table. Then use Impala date/time conversion functions as necessary to produce a numeric, TIMESTAMP, or STRING value depending on the context.
For example, the unix_timestamp() function returns an integer result representing the number of seconds past the epoch. The now() function produces a TIMESTAMP representing the current date and time, which can be passed as an argument to unix_timestamp(). And string literals representing dates and date/times can be cast to TIMESTAMP, and from there converted to numeric values. The following examples show how you might store a date/time column as BIGINT in a Kudu table, but still use string literals and TIMESTAMP values for convenience.
-- now() returns a TIMESTAMP and shows the format for string literals you can cast to TIMESTAMP.
select now();
+-------------------------------+
| now() |
+-------------------------------+
| 2017-01-25 23:50:10.132385000 |
+-------------------------------+
-- unix_timestamp() accepts either a TIMESTAMP or an equivalent string literal.
select unix_timestamp(now());
+------------------+
| unix_timestamp() |
+------------------+
| 1485386670 |
+------------------+
select unix_timestamp('2017-01-01');
+------------------------------+
| unix_timestamp('2017-01-01') |
+------------------------------+
| 1483228800 |
+------------------------------+
-- Make a table representing a date/time value as BIGINT.
-- Construct 1 range partition and 20 associated hash partitions for each year.
-- Use date/time conversion functions to express the ranges as human-readable dates.
create table time_series(id bigint, when_exactly bigint, event string, primary key (id, when_exactly))
partition by hash (id) partitions 20,
range (when_exactly)
(
partition unix_timestamp('2015-01-01') <= values < unix_timestamp('2016-01-01'),
partition unix_timestamp('2016-01-01') <= values < unix_timestamp('2017-01-01'),
partition unix_timestamp('2017-01-01') <= values < unix_timestamp('2018-01-01')
)
stored as kudu;
-- On insert, we can transform a human-readable date/time into a numeric value.
insert into time_series values (12345, unix_timestamp('2017-01-25 23:24:56'), 'Working on doc examples');
-- On retrieval, we can examine the numeric date/time value or turn it back into a string for readability.
select id, when_exactly, from_unixtime(when_exactly) as 'human-readable date/time', event
from time_series order by when_exactly limit 100;
+-------+--------------+--------------------------+-------------------------+
| id | when_exactly | human-readable date/time | event |
+-------+--------------+--------------------------+-------------------------+
| 12345 | 1485386696 | 2017-01-25 23:24:56 | Working on doc examples |
+-------+--------------+--------------------------+-------------------------+
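The numeric values in the examples above can be cross-checked outside of Impala. The following Python sketch uses hypothetical stand-ins named after the Impala functions. It assumes that the string literals are interpreted as UTC, which matches the values shown above but is an assumption about the session configuration.

```python
from datetime import datetime, timezone

def unix_timestamp(text: str) -> int:
    """Seconds since the epoch for 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS' (UTC assumed)."""
    fmt = "%Y-%m-%d %H:%M:%S" if " " in text else "%Y-%m-%d"
    parsed = datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
    return int(parsed.timestamp())

def from_unixtime(seconds: int) -> str:
    """Format seconds since the epoch as a 'YYYY-MM-DD HH:MM:SS' string (UTC assumed)."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

# The lower bound of the 2017 range partition and the inserted row.
lower_bound_2017 = unix_timestamp("2017-01-01")        # 1483228800
inserted = unix_timestamp("2017-01-25 23:24:56")       # 1485386696
readable = from_unixtime(inserted)                     # '2017-01-25 23:24:56'
```

A client that writes BIGINT date/time values through the Kudu API could use the same arithmetic to produce values that land in the intended range partition.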
If you do high-precision arithmetic involving numeric date/time values, such as dividing millisecond values by 1000 or microsecond values by 1 million, always cast the integer numerator to a DECIMAL with sufficient precision and scale to avoid any rounding or loss of precision.
-- 1 million and 1 microseconds = 1.000001 seconds.
select microseconds,
cast (microseconds as decimal(20,7)) / 1e6 as fractional_seconds
from table_with_microsecond_column;
+--------------+----------------------+
| microseconds | fractional_seconds |
+--------------+----------------------+
| 1000001 | 1.000001000000000000 |
+--------------+----------------------+
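The same concern applies in client code that post-processes BIGINT date/time values. As a minimal sketch in Python, where the helper name is hypothetical, exact decimal arithmetic keeps the fractional part that integer division would drop:

```python
from decimal import Decimal

def micros_to_seconds(microseconds: int) -> Decimal:
    """Convert integer microseconds to fractional seconds without losing precision."""
    return Decimal(microseconds) / Decimal(1_000_000)

exact = micros_to_seconds(1_000_001)      # Decimal('1.000001')
truncated = 1_000_001 // 1_000_000        # 1: integer division drops the fraction
```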
How Impala Handles Kudu Metadata
By default, much of the metadata for Kudu tables is handled by the underlying storage layer. Kudu tables have less reliance on the Metastore database, and require less metadata caching on the Impala side. For example, information about partitions in Kudu tables is managed by Kudu, and Impala does not cache any block locality metadata for Kudu tables. If the Kudu service is not integrated with the Hive Metastore, Impala will manage Kudu table metadata in the Hive Metastore.
The REFRESH and INVALIDATE METADATA statements are needed less frequently for Kudu tables than for HDFS-backed tables. Neither statement is needed when data is added to, removed from, or updated in a Kudu table, even if the changes are made directly to Kudu through a client program using the Kudu API. Run REFRESH table_name or INVALIDATE METADATA table_name for a Kudu table only after making a change to the Kudu table schema, such as adding or dropping a column.
Because Kudu manages the metadata for its own tables separately from the metastore database, there is a table name stored in the metastore database for Impala to use, and a table name on the Kudu side, and these names can be modified independently through ALTER TABLE statements.
To avoid potential name conflicts, the prefix impala:: and the Impala database name are encoded into the underlying Kudu table name:
create database some_database;
use some_database;
create table table_name_demo (x int primary key, y int)
partition by hash (x) partitions 2 stored as kudu;
describe formatted table_name_demo;
...
kudu.table_name | impala::some_database.table_name_demo
See Overview of Impala Tables for examples of how to change the name of the Impala table in the metastore database, the name of the underlying Kudu table, or both.
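A client that accesses such a table through the Kudu API needs the Kudu-side name rather than the Impala name. The naming pattern shown in the DESCRIBE FORMATTED output above can be sketched in Python as follows. The helper is hypothetical and covers only the default name that Impala assigns, not names changed later through ALTER TABLE.

```python
def default_kudu_table_name(database: str, table: str) -> str:
    """Build the default Kudu-side name for a Kudu table created through Impala."""
    return f"impala::{database}.{table}"

name = default_kudu_table_name("some_database", "table_name_demo")
# name == 'impala::some_database.table_name_demo'
```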
Working with Kudu Integrated with Hive Metastore
Starting from Kudu 1.10 and Impala 3.3, Impala supports Kudu services integrated with the Hive Metastore (HMS). See the HMS integration documentation for more details on Kudu’s Hive Metastore integration.
- When Kudu is integrated with the Hive Metastore, Impala must be configured to use the same HMS as Kudu.
- Since there may be no one-to-one mapping between Kudu tables and external tables, only internal tables are automatically synchronized.
- When you create a table in Kudu, Kudu will create an HMS entry for that table with the internal table type.
- When the Kudu service is integrated with the HMS, internal table entries are created automatically in the HMS when tables are created in Kudu without Impala. To access these tables through Impala, run the INVALIDATE METADATA statement so that Impala picks up the latest metadata.
Loading Data into Kudu Tables
Kudu tables are well-suited to use cases where data arrives continuously, in small or moderate volumes. To bring data into Kudu tables, use the Impala INSERT and UPSERT statements. The LOAD DATA statement does not apply to Kudu tables.
Because Kudu manages its own storage layer that is optimized for smaller block sizes than HDFS, and performs its own housekeeping to keep data evenly distributed, it is not subject to the "many small files" issue and does not need explicit reorganization and compaction as the data grows over time. The partitions within a Kudu table can be specified to cover a variety of possible data distributions, instead of hardcoding a new partition for each new day, hour, and so on, which can lead to inefficient, hard-to-scale, and hard-to-manage partition schemes with HDFS tables.
Your strategy for performing ETL or bulk updates on Kudu tables should take into account the limitations on consistency for DML operations.
Make INSERT, UPDATE, and UPSERT operations idempotent: that is, able to be applied multiple times and still produce an identical result.
If a bulk operation is in danger of exceeding capacity limits due to timeouts or high memory usage, split it into a series of smaller operations.
Avoid running concurrent ETL operations where the end results depend on precise ordering. In particular, do not rely on an INSERT ... SELECT statement that selects from the same table into which it is inserting, unless you include extra conditions in the WHERE clause to avoid reading the newly inserted rows within the same statement.
Because relationships between tables cannot be enforced by Impala and Kudu, and cannot be committed or rolled back together, do not expect transactional semantics for multi-table operations.
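Two of the recommendations above, idempotent writes and splitting a bulk operation into smaller ones, can be illustrated with a toy model in Python. The dictionary stands in for a Kudu table keyed by primary key; the helper names are hypothetical and nothing here calls Impala or Kudu.

```python
from itertools import islice

def batches(rows, size):
    """Yield successive lists of at most `size` rows from any iterable."""
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch

def upsert(table: dict, batch) -> None:
    """Insert or overwrite each (primary_key, value) pair.

    Setting a value by key is idempotent: replaying the batch changes nothing.
    An update such as `value = value + 1` would not be.
    """
    for key, value in batch:
        table[key] = value

rows = [(i, f"event-{i}") for i in range(10)]
table = {}
for batch in batches(rows, size=4):
    upsert(table, batch)

# Replaying a batch, for example after a timeout, leaves the table unchanged.
snapshot = dict(table)
upsert(table, rows[:4])
unchanged = table == snapshot  # True
```

Because each batch can be retried safely, a failure partway through a large load only requires replaying the batches that did not complete.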
Impala DML Support for Kudu Tables (INSERT, UPDATE, DELETE, UPSERT)
Impala supports certain DML statements for Kudu tables only. The UPDATE and DELETE statements let you modify data within Kudu tables without rewriting substantial amounts of table data. The UPSERT statement acts as a combination of INSERT and UPDATE, inserting rows where the primary key does not already exist, and updating the non-primary key columns where the primary key does already exist in the table.
The INSERT statement for Kudu tables honors the unique and NOT NULL requirements for the primary key columns.
Because Impala and Kudu do not support transactions by default (see the multi-row transaction section for the exception that applies to INSERT and CREATE TABLE AS SELECT statements), the effects of any INSERT, UPDATE, or DELETE statement are immediately visible. For example, you cannot do a sequence of UPDATE statements and only make the changes visible after all the statements are finished. Also, if a DML statement fails partway through, any rows that were already inserted, deleted, or changed remain in the table; there is no rollback mechanism to undo the changes.
In particular, an INSERT ... SELECT statement that refers to the table being inserted into might insert more rows than expected, because the SELECT part of the statement sees some of the new rows being inserted and processes them again.
The LOAD DATA statement, which involves manipulation of HDFS data files, does not apply to Kudu tables.
Starting from Impala 2.9, INSERT or UPSERT operations into Kudu tables automatically add an exchange and a sort node to the plan that partition and sort the rows according to the partitioning/primary key scheme of the target table (unless the number of rows to be inserted is small enough to trigger single node execution). Since Kudu partitions and sorts rows on write, pre-partitioning and sorting takes some of the load off of Kudu and helps large INSERT operations to complete without timing out. However, this default behavior may slow down the end-to-end performance of the INSERT or UPSERT operations. Starting from Impala 2.10, you can use the /* +NOCLUSTERED */ and /* +NOSHUFFLE */ hints together to disable partitioning and sorting before the rows are sent to Kudu. Additionally, since sorting may consume a large amount of memory, consider setting the MEM_LIMIT query option for those queries.
Multi-row Transactions for Kudu Tables
When you use Impala to query Kudu tables, you can insert multiple rows into a Kudu table in a single transaction. This broader transactional support between Kudu and Impala is available to you at a query level and at a session level.
Using Multi-row Transaction Capability
You can control the multi-row transaction feature with the following query option, which you can set at the per-query or per-session level. When the option is enabled for a session, Impala opens one Kudu transaction for each INSERT or CTAS statement.
set ENABLE_KUDU_TRANSACTION=true
The following example shows how to insert three rows into a table in a single transaction.
Example:
- Create table kudu_test_tbl_1.
create table kudu_test_tbl_1 (a int primary key, b string) partition by hash(a) partitions 8 stored as kudu;
- Enable the multi-row transaction feature at the query level.
set ENABLE_KUDU_TRANSACTION=true;
- Insert three rows into the newly created table in a single transaction.
insert into kudu_test_tbl_1 values (0, 'a'), (1, 'b'), (2, 'c');
- Verify the number of rows of this table.
select count(*) from kudu_test_tbl_1;
Note:
If you insert multiple rows with duplicate keys into a table, the transaction is aborted. To ignore conflicts with duplicate keys during the transaction, start the Impala daemons with the flag --kudu_ignore_conflicts_in_transaction=true. This flag is set to false by default, and it takes effect only if the flag --kudu_ignore_conflicts is set to true, which is its default value.
When you enable the ENABLE_KUDU_TRANSACTION option, each Impala statement is executed in a newly opened transaction. If the statement is executed successfully, the Impala coordinator commits the transaction. If Kudu returns an error, Impala aborts the transaction.
This applies to the following statements:
- INSERT
- CREATE TABLE AS SELECT
Advantages of Using This Capability
You can now easily build and manage Kudu applications, especially when Impala is used to interact with the data in the Kudu table. With multi-row transactions, you can atomically ingest a large number of rows into a Kudu table with an INSERT ... SELECT or CTAS statement.
Limitation
INSERT and CTAS statements are supported for Kudu tables in the context of a multi-row transaction, but UPDATE, UPSERT, and DELETE statements are not currently supported in multi-row transactions.
Consistency Considerations for Kudu Tables
Kudu tables have consistency characteristics such as uniqueness, controlled by the primary key columns, and non-nullable columns. The emphasis for consistency is on preventing duplicate or incomplete data from being stored in a table.
Currently, Kudu does not enforce strong consistency for order of operations, or data that is read while a write operation is in progress. If multi-row transactions are enabled, the insertion of multiple rows by a single INSERT statement is atomic: it either succeeds completely or fails completely. If multi-row transactions are not enabled, changes are applied atomically to each row, not as a single unit to all rows affected by a multi-row DML statement.
When multi-row transactions are not enabled and some rows are rejected during a DML operation because of duplicate primary key values, NOT NULL constraint violations, and so on, the statement succeeds with a warning. Impala still inserts, deletes, or updates the other rows that are not affected by the constraint violation.
Consequently, the number of rows affected by a DML operation on a Kudu table might be different than you expect.
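The difference between per-row and per-statement atomicity can be made concrete with a toy model in Python. The dictionary stands in for a Kudu table keyed by primary key, and the function is hypothetical; it models only duplicate-key conflicts under the default flag settings and does not call Impala or Kudu.

```python
def insert_rows(table: dict, rows: list, transactional: bool) -> int:
    """Insert (primary_key, value) pairs and return how many rows were inserted."""
    if transactional:
        # All or nothing: stage the changes and discard them on any conflict.
        staged = dict(table)
        for key, value in rows:
            if key in staged:
                return 0  # transaction aborted, table left untouched
            staged[key] = value
        table.update(staged)
        return len(rows)
    # Per-row atomicity: rejected rows are skipped, the rest are applied.
    inserted = 0
    for key, value in rows:
        if key in table:
            continue  # duplicate primary key: this row is rejected
        table[key] = value
        inserted += 1
    return inserted

new_rows = [(0, "a"), (1, "b"), (2, "c")]

per_row = {1: "existing"}
per_row_count = insert_rows(per_row, new_rows, transactional=False)    # 2

all_or_nothing = {1: "existing"}
txn_count = insert_rows(all_or_nothing, new_rows, transactional=True)  # 0
```

In the per-row case the statement affects two rows rather than three, which is the kind of discrepancy in the affected row count that the text above describes.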
Because there is no strong consistency guarantee for information that is inserted into, deleted from, or updated across multiple tables by separate statements, consider denormalizing the data where practical. That is, if you run separate INSERT statements to insert related rows into two different tables, one INSERT might fail while the other succeeds, leaving the data in an inconsistent state. Even if both inserts succeed, a join query might happen during the interval between the completion of the first and second statements, and the query would encounter incomplete or inconsistent data. Denormalizing the data into a single wide table can reduce the possibility of inconsistency due to multi-table operations.
Information about the number of rows affected by a DML operation is reported in impala-shell output, and in the PROFILE output, but is not currently reported to HiveServer2 clients such as JDBC or ODBC applications.
Security Considerations for Kudu Tables
Security for Kudu tables involves:
- Ranger authorization. Access to Kudu tables must be granted to and revoked from principals with the following considerations:
  - Only users with the ALL privilege on SERVER can create external Kudu tables.
  - The ALL privilege on SERVER is required to specify the kudu.master_addresses property in CREATE TABLE statements for managed tables as well as external tables.
  - Access to Kudu tables is enforced at the table level and at the column level.
  - The SELECT- and INSERT-specific permissions are supported.
  - The DELETE, UPDATE, and UPSERT operations require the ALL privilege.
- Kerberos authentication. See Kudu Security for details.
- TLS encryption. See Kudu Security for details.
- Lineage tracking.
- Auditing.
- Redaction of sensitive information from log files.
Impala Query Performance for Kudu Tables
For queries involving Kudu tables, Impala can delegate much of the work of filtering the result set to Kudu, avoiding some of the I/O involved in full table scans of tables containing HDFS data files. This type of optimization is especially effective for partitioned Kudu tables, where the Impala query WHERE clause refers to one or more primary key columns that are also used as partition key columns. For example, if a partitioned Kudu table uses a HASH clause for col1 and a RANGE clause for col2, a query using a clause such as WHERE col1 IN (1,2,3) AND col2 > 100 can determine exactly which tablet servers contain relevant data, and therefore parallelize the query very efficiently.
In Impala 2.11 and higher, Impala can push down additional information to optimize join queries involving Kudu tables. If the join clause contains predicates of the form column = expression, after Impala constructs a hash table of possible matching values for the join columns from the bigger table (either an HDFS table or a Kudu table), Impala can "push down" the minimum and maximum matching column values to Kudu, so that Kudu can more efficiently locate matching rows in the second (smaller) table. These min/max filters are affected by the RUNTIME_FILTER_MODE, RUNTIME_FILTER_WAIT_TIME_MS, and DISABLE_ROW_RUNTIME_FILTERING query options; the min/max filters are not affected by the RUNTIME_BLOOM_FILTER_SIZE, RUNTIME_FILTER_MIN_SIZE, RUNTIME_FILTER_MAX_SIZE, and MAX_NUM_RUNTIME_FILTERS query options.
See EXPLAIN Statement for examples of evaluating the effectiveness of the predicate pushdown for a specific query against a Kudu table.
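The idea behind a min/max filter can be sketched in Python. This is a conceptual model with hypothetical function names, not Impala's implementation: the join-key values collected from one side of the join are reduced to a single range, and the scan of the Kudu table skips rows whose key falls outside that range.

```python
def min_max_filter(join_keys):
    """Reduce the join-key values to a (min, max) pair, or None if there are none."""
    keys = list(join_keys)
    return (min(keys), max(keys)) if keys else None

def scan_with_filter(rows, key_column, bounds):
    """Return only the rows whose key lies inside the pushed-down range."""
    if bounds is None:
        return []  # no join keys at all, so an equality join has no matches
    low, high = bounds
    return [row for row in rows if low <= row[key_column] <= high]

kudu_rows = [{"id": i, "event": f"e{i}"} for i in range(1, 11)]
bounds = min_max_filter([4, 7, 5])                    # (4, 7)
candidates = scan_with_filter(kudu_rows, "id", bounds)
candidate_ids = [row["id"] for row in candidates]     # [4, 5, 6, 7]
```

The filter is deliberately coarse: the row with id 6 passes the range check even though 6 is not among the join keys, and the join itself still discards it. The benefit is that rows outside the range never have to leave the storage layer.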
The TABLESAMPLE clause of the SELECT statement does not apply to a table reference derived from a view, a subquery, or anything other than a real base table. This clause only works for tables backed by HDFS or HDFS-like data files, therefore it does not apply to Kudu or HBase tables.