This section includes tutorial scenarios that demonstrate how to begin using Impala once the software is installed. It focuses on techniques for loading data, because once you have some data in tables and can query that data, you can quickly progress to more advanced Impala features.
Where practical, the tutorials take you from "ground zero" to having the desired Impala tables and data. In some cases, you might need to download additional files from outside sources, set up additional software components, modify commands or scripts to fit your own configuration, or substitute your own sample data.
Before trying these tutorial lessons, install Impala using one of these procedures:
These tutorials demonstrate the basics of using Impala. They are intended for first-time users, and for trying out Impala on any new cluster to make sure the major components are working correctly.
This tutorial demonstrates techniques for finding your way around the tables and databases of an unfamiliar (possibly empty) Impala instance.
When you connect to an Impala instance for the first time, you use the SHOW DATABASES
and SHOW TABLES
statements to view the most common types of objects. Also, call the
version()
function to confirm which version of Impala you are running; the version
number is important when consulting documentation and dealing with support issues.
A completely empty Impala instance contains no tables, but still has two databases:
default
, where new tables are created when you do not specify any other database.
_impala_builtins
, a system database used to hold all the built-in functions.
The following example shows how to see the available databases, and the tables in each. If the list of databases or tables is long, you can use wildcard notation to locate specific databases or tables based on their names.
$ impala-shell -i localhost --quiet
Starting Impala Shell without Kerberos authentication
Welcome to the Impala shell. Press TAB twice to see a list of available commands.
...
(Shell
build version: Impala Shell v3.4.x (hash) built on
date)
[localhost:21000] > select version();
+-------------------------------------------
| version()
+-------------------------------------------
| impalad version ...
| Built on ...
+-------------------------------------------
[localhost:21000] > show databases;
+--------------------------+
| name |
+--------------------------+
| _impala_builtins |
| ctas |
| d1 |
| d2 |
| d3 |
| default |
| explain_plans |
| external_table |
| file_formats |
| tpc |
+--------------------------+
[localhost:21000] > select current_database();
+--------------------+
| current_database() |
+--------------------+
| default |
+--------------------+
[localhost:21000] > show tables;
+-------+
| name |
+-------+
| ex_t |
| t1 |
+-------+
[localhost:21000] > show tables in d3;
[localhost:21000] > show tables in tpc;
+------------------------+
| name |
+------------------------+
| city |
| customer |
| customer_address |
| customer_demographics |
| household_demographics |
| item |
| promotion |
| store |
| store2 |
| store_sales |
| ticket_view |
| time_dim |
| tpc_tables |
+------------------------+
[localhost:21000] > show tables in tpc like 'customer*';
+-----------------------+
| name |
+-----------------------+
| customer |
| customer_address |
| customer_demographics |
+-----------------------+
Once you know what tables and databases are available, you descend into a database with the
USE
statement. To understand the structure of each table, you use the
DESCRIBE
command. Once inside a database, you can issue statements such as
INSERT
and SELECT
that operate on particular tables.
The following example explores a database named TPC
whose name we learned in the
previous example. It shows how to filter the table names within a database based on a search string,
examine the columns of a table, and run queries to examine the characteristics of the table data. For
example, for an unfamiliar table you might want to know the number of rows, the number of different
values for a column, and other properties such as whether the column contains any NULL
values. When sampling the actual data values from a table, use a LIMIT
clause to avoid
excessive output if the table contains more rows or distinct values than you expect.
[localhost:21000] > use tpc;
[localhost:21000] > show tables like '*view*';
+-------------+
| name |
+-------------+
| ticket_view |
+-------------+
[localhost:21000] > describe city;
+-------------+--------+---------+
| name | type | comment |
+-------------+--------+---------+
| id | int | |
| name | string | |
| countrycode | string | |
| district | string | |
| population | int | |
+-------------+--------+---------+
[localhost:21000] > select count(*) from city;
+----------+
| count(*) |
+----------+
| 0 |
+----------+
[localhost:21000] > desc customer;
+------------------------+--------+---------+
| name | type | comment |
+------------------------+--------+---------+
| c_customer_sk | int | |
| c_customer_id | string | |
| c_current_cdemo_sk | int | |
| c_current_hdemo_sk | int | |
| c_current_addr_sk | int | |
| c_first_shipto_date_sk | int | |
| c_first_sales_date_sk | int | |
| c_salutation | string | |
| c_first_name | string | |
| c_last_name | string | |
| c_preferred_cust_flag | string | |
| c_birth_day | int | |
| c_birth_month | int | |
| c_birth_year | int | |
| c_birth_country | string | |
| c_login | string | |
| c_email_address | string | |
| c_last_review_date | string | |
+------------------------+--------+---------+
[localhost:21000] > select count(*) from customer;
+----------+
| count(*) |
+----------+
| 100000 |
+----------+
[localhost:21000] > select count(distinct c_birth_month) from customer;
+-------------------------------+
| count(distinct c_birth_month) |
+-------------------------------+
| 12 |
+-------------------------------+
[localhost:21000] > select count(*) from customer where c_email_address is null;
+----------+
| count(*) |
+----------+
| 0 |
+----------+
[localhost:21000] > select distinct c_salutation from customer limit 10;
+--------------+
| c_salutation |
+--------------+
| Mr. |
| Ms. |
| Dr. |
| |
| Miss |
| Sir |
| Mrs. |
+--------------+
When you graduate from read-only exploration, you use statements such as CREATE DATABASE
and CREATE TABLE
to set up your own database objects.
The following example demonstrates creating a new database holding a new table. Although the last example
ended inside the TPC
database, the new EXPERIMENTS
database is not
nested inside TPC
; all databases are arranged in a single top-level list.
[localhost:21000] > create database experiments;
[localhost:21000] > show databases;
+--------------------------+
| name |
+--------------------------+
| _impala_builtins |
| ctas |
| d1 |
| d2 |
| d3 |
| default |
| experiments |
| explain_plans |
| external_table |
| file_formats |
| tpc |
+--------------------------+
[localhost:21000] > show databases like 'exp*';
+---------------+
| name |
+---------------+
| experiments |
| explain_plans |
+---------------+
The following example creates a new table, T1
. To illustrate a common mistake, it creates this table inside
the wrong database, the TPC
database where the previous example ended. The ALTER
TABLE
statement lets you move the table to the intended database, EXPERIMENTS
, as part of a rename operation.
The USE
statement is always needed to switch to a new database, and the
current_database()
function confirms which database the session is in, to avoid these
kinds of mistakes.
[localhost:21000] > create table t1 (x int);
[localhost:21000] > show tables;
+------------------------+
| name |
+------------------------+
| city |
| customer |
| customer_address |
| customer_demographics |
| household_demographics |
| item |
| promotion |
| store |
| store2 |
| store_sales |
| t1 |
| ticket_view |
| time_dim |
| tpc_tables |
+------------------------+
[localhost:21000] > select current_database();
+--------------------+
| current_database() |
+--------------------+
| tpc |
+--------------------+
[localhost:21000] > alter table t1 rename to experiments.t1;
[localhost:21000] > use experiments;
[localhost:21000] > show tables;
+------+
| name |
+------+
| t1 |
+------+
[localhost:21000] > select current_database();
+--------------------+
| current_database() |
+--------------------+
| experiments |
+--------------------+
For your initial experiments with tables, you can use ones with just a few columns and a few rows, and text-format data files.
LOAD DATA
or INSERT ... SELECT
statements to operate on millions or billions of rows at once.
The following example sets up a couple of simple tables with a few rows, and performs queries involving sorting, aggregate functions and joins.
[localhost:21000] > insert into t1 values (1), (3), (2), (4);
[localhost:21000] > select x from t1 order by x desc;
+---+
| x |
+---+
| 4 |
| 3 |
| 2 |
| 1 |
+---+
[localhost:21000] > select min(x), max(x), sum(x), avg(x) from t1;
+--------+--------+--------+--------+
| min(x) | max(x) | sum(x) | avg(x) |
+--------+--------+--------+--------+
| 1 | 4 | 10 | 2.5 |
+--------+--------+--------+--------+
[localhost:21000] > create table t2 (id int, word string);
[localhost:21000] > insert into t2 values (1, "one"), (3, "three"), (5, 'five');
[localhost:21000] > select word from t1 join t2 on (t1.x = t2.id);
+-------+
| word |
+-------+
| one |
| three |
+-------+
After completing this tutorial, you should now know:
This scenario illustrates how to create some very small tables, suitable for first-time users to
experiment with Impala SQL features. TAB1
and TAB2
are loaded with data
from files in HDFS. A subset of data is copied from TAB1
into TAB3
.
Populate HDFS with the data you want to query. To begin this process, create one or more new
subdirectories underneath your user directory in HDFS. The data for each table resides in a separate
subdirectory. Substitute your own username for username
where appropriate. This example
uses the -p
option with the mkdir
operation to create any necessary
parent directories if they do not already exist.
$ whoami
username
$ hdfs dfs -ls /user
Found 3 items
drwxr-xr-x - username username 0 2013-04-22 18:54 /user/username
drwxrwx--- - mapred mapred 0 2013-03-15 20:11 /user/history
drwxr-xr-x - hue supergroup 0 2013-03-15 20:10 /user/hive
$ hdfs dfs -mkdir -p /user/username/sample_data/tab1 /user/username/sample_data/tab2
Here is some sample data, for two tables named TAB1
and TAB2
.
Copy the following content to .csv
files in your local filesystem:
tab1.csv:
1,true,123.123,2012-10-24 08:55:00
2,false,1243.5,2012-10-25 13:40:00
3,false,24453.325,2008-08-22 09:33:21.123
4,false,243423.325,2007-05-12 22:32:21.33454
5,true,243.325,1953-04-22 09:11:33
tab2.csv:
1,true,12789.123
2,false,1243.5
3,false,24453.325
4,false,2423.3254
5,true,243.325
60,false,243565423.325
70,true,243.325
80,false,243423.325
90,true,243.325
Put each .csv
file into a separate HDFS directory using commands like the following,
which use paths available in the Impala Demo VM:
$ hdfs dfs -put tab1.csv /user/username/sample_data/tab1
$ hdfs dfs -ls /user/username/sample_data/tab1
Found 1 items
-rw-r--r-- 1 username username 192 2013-04-02 20:08 /user/username/sample_data/tab1/tab1.csv
$ hdfs dfs -put tab2.csv /user/username/sample_data/tab2
$ hdfs dfs -ls /user/username/sample_data/tab2
Found 1 items
-rw-r--r-- 1 username username 158 2013-04-02 20:09 /user/username/sample_data/tab2/tab2.csv
The name of each data file is not significant. In fact, when Impala examines the contents of the data directory for the first time, it considers all files in the directory to make up the data of the table, regardless of how many files there are or what the files are named.
To understand what paths are available within your own HDFS filesystem and what the permissions are for
the various directories and files, issue hdfs dfs -ls /
and work your way down the tree
doing -ls
operations for the various directories.
Use the impala-shell
command to create tables, either interactively or through a SQL
script.
The following example shows creating three tables. For each table, the example shows creating columns
with various attributes such as Boolean or integer types. The example also includes commands that provide
information about how the data is formatted, such as rows terminating with commas, which makes sense in
the case of importing data from a .csv
file. Where we already have .csv
files containing data in the HDFS directory tree, we specify the location of the directory containing the
appropriate .csv
file. Impala considers all the data from all the files in that
directory to represent the data for the table.
DROP TABLE IF EXISTS tab1;
-- The EXTERNAL clause means the data is located outside the central location
-- for Impala data files and is preserved when the associated Impala table is dropped.
-- We expect the data to already exist in the directory specified by the LOCATION clause.
CREATE EXTERNAL TABLE tab1
(
id INT,
col_1 BOOLEAN,
col_2 DOUBLE,
col_3 TIMESTAMP
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/username/sample_data/tab1';
DROP TABLE IF EXISTS tab2;
-- TAB2 is an external table, similar to TAB1.
CREATE EXTERNAL TABLE tab2
(
id INT,
col_1 BOOLEAN,
col_2 DOUBLE
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/username/sample_data/tab2';
DROP TABLE IF EXISTS tab3;
-- Leaving out the EXTERNAL clause means the data will be managed
-- in the central Impala data directory tree. Rather than reading
-- existing data files when the table is created, we load the
-- data after creating the table.
CREATE TABLE tab3
(
id INT,
col_1 BOOLEAN,
col_2 DOUBLE,
month INT,
day INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
CREATE TABLE
statements successfully is an important validation
step to confirm everything is configured correctly with the Hive metastore and HDFS permissions. If you
receive any errors during the CREATE TABLE
statements:
hive.metastore.warehouse.dir
property points to a directory that
Impala can write to. The ownership should be hive:hive
, and the
impala
user should also be a member of the hive
group.
A convenient way to set up data for Impala to access is to use an external table, where the data already
exists in a set of HDFS files and you just point the Impala table at the directory containing those
files. For example, you might run in impala-shell
a *.sql
file with
contents similar to the following, to create an Impala table that accesses an existing data file used by
Hive.
The following examples set up 2 tables, referencing the paths and sample data from the sample TPC-DS kit for Impala. For historical reasons, the data physically resides in an HDFS directory tree under /user/hive, although this particular data is entirely managed by Impala rather than Hive. When we create an external table, we specify the directory containing one or more data files, and Impala queries the combined content of all the files inside that directory. Here is how we examine the directories and files within the HDFS filesystem:
$ cd ~/username/datasets
$ ./tpcds-setup.sh
... Downloads and unzips the kit, builds the data and loads it into HDFS ...
$ hdfs dfs -ls /user/hive/tpcds/customer
Found 1 items
-rw-r--r-- 1 username supergroup 13209372 2013-03-22 18:09 /user/hive/tpcds/customer/customer.dat
$ hdfs dfs -cat /user/hive/tpcds/customer/customer.dat | more
1|AAAAAAAABAAAAAAA|980124|7135|32946|2452238|2452208|Mr.|Javier|Lewis|Y|9|12|1936|CHILE||Javie
r.Lewis@VFAxlnZEvOx.org|2452508|
2|AAAAAAAACAAAAAAA|819667|1461|31655|2452318|2452288|Dr.|Amy|Moses|Y|9|4|1966|TOGO||Amy.Moses@
Ovk9KjHH.com|2452318|
3|AAAAAAAADAAAAAAA|1473522|6247|48572|2449130|2449100|Miss|Latisha|Hamilton|N|18|9|1979|NIUE||
Latisha.Hamilton@V.com|2452313|
4|AAAAAAAAEAAAAAAA|1703214|3986|39558|2450030|2450000|Dr.|Michael|White|N|7|6|1983|MEXICO||Mic
hael.White@i.org|2452361|
5|AAAAAAAAFAAAAAAA|953372|4470|36368|2449438|2449408|Sir|Robert|Moran|N|8|5|1956|FIJI||Robert.
Moran@Hh.edu|2452469|
...
Here is a SQL script to set up Impala tables pointing to some of these data files in HDFS. (The script in the VM sets up tables like this through Hive; ignore those tables for purposes of this demonstration.) Save the following as customer_setup.sql:
--
-- store_sales fact table and surrounding dimension tables only
--
create database tpcds;
use tpcds;
drop table if exists customer;
create external table customer
(
c_customer_sk int,
c_customer_id string,
c_current_cdemo_sk int,
c_current_hdemo_sk int,
c_current_addr_sk int,
c_first_shipto_date_sk int,
c_first_sales_date_sk int,
c_salutation string,
c_first_name string,
c_last_name string,
c_preferred_cust_flag string,
c_birth_day int,
c_birth_month int,
c_birth_year int,
c_birth_country string,
c_login string,
c_email_address string,
c_last_review_date string
)
row format delimited fields terminated by '|'
location '/user/hive/tpcds/customer';
drop table if exists customer_address;
create external table customer_address
(
ca_address_sk int,
ca_address_id string,
ca_street_number string,
ca_street_name string,
ca_street_type string,
ca_suite_number string,
ca_city string,
ca_county string,
ca_state string,
ca_zip string,
ca_country string,
ca_gmt_offset float,
ca_location_type string
)
row format delimited fields terminated by '|'
location '/user/hive/tpcds/customer_address';
impala-shell -i localhost -f customer_setup.sql
Now that you have updated the database metadata that Impala caches, you can confirm that the expected
tables are accessible by Impala and examine the attributes of one of the tables. We created these tables
in the database named default
. If the tables were in a database other than the default,
we would issue a command use db_name
to switch to that database
before examining or querying its tables. We could also qualify the name of a table by prepending the
database name, for example default.customer
and default.customer_name
.
[impala-host:21000] > show databases
Query finished, fetching results ...
default
Returned 1 row(s) in 0.00s
[impala-host:21000] > show tables
Query finished, fetching results ...
customer
customer_address
Returned 2 row(s) in 0.00s
[impala-host:21000] > describe customer_address
+------------------+--------+---------+
| name | type | comment |
+------------------+--------+---------+
| ca_address_sk | int | |
| ca_address_id | string | |
| ca_street_number | string | |
| ca_street_name | string | |
| ca_street_type | string | |
| ca_suite_number | string | |
| ca_city | string | |
| ca_county | string | |
| ca_state | string | |
| ca_zip | string | |
| ca_country | string | |
| ca_gmt_offset | float | |
| ca_location_type | string | |
+------------------+--------+---------+
Returned 13 row(s) in 0.01
You can query data contained in the tables. Impala coordinates the query execution across a single node or multiple nodes depending on your configuration, without the overhead of running MapReduce jobs to perform the intermediate processing.
There are a variety of ways to execute queries on Impala:
impala-shell
command in interactive mode:
$ impala-shell -i impala-host
Connected to localhost:21000
[impala-host:21000] > select count(*) from customer_address;
50000
Returned 1 row(s) in 0.37s
$ impala-shell -i impala-host -f myquery.sql
Connected to localhost:21000
50000
Returned 1 row(s) in 0.19s
impala-shell
command. The query is executed, the
results are returned, and the shell exits. Make sure to quote the command, preferably with single
quotation marks to avoid shell expansion of characters such as *
.
$ impala-shell -i impala-host -q 'select count(*) from customer_address'
Connected to localhost:21000
50000
Returned 1 row(s) in 0.29s
This section describes how to create some sample tables and load data into them. These tables can then be queried using the Impala shell.
Loading data involves:
.csv
files.
To run these sample queries, create a SQL query file query.sql
, copy and paste each
query into the query file, and then run the query file using the shell. For example, to run
query.sql
on impala-host
, you might use the command:
impala-shell.sh -i impala-host -f query.sql
The examples and results below assume you have loaded the sample data into the tables as described above.
Let's start by verifying that the tables do contain the data we expect. Because Impala often deals
with tables containing millions or billions of rows, when examining tables of unknown size, include
the LIMIT
clause to avoid huge amounts of unnecessary output, as in the final query.
(If your interactive query starts displaying an unexpected volume of data, press
Ctrl-C
in impala-shell
to cancel the query.)
SELECT * FROM tab1;
SELECT * FROM tab2;
SELECT * FROM tab2 LIMIT 5;
Results:
+----+-------+------------+-------------------------------+
| id | col_1 | col_2 | col_3 |
+----+-------+------------+-------------------------------+
| 1 | true | 123.123 | 2012-10-24 08:55:00 |
| 2 | false | 1243.5 | 2012-10-25 13:40:00 |
| 3 | false | 24453.325 | 2008-08-22 09:33:21.123000000 |
| 4 | false | 243423.325 | 2007-05-12 22:32:21.334540000 |
| 5 | true | 243.325 | 1953-04-22 09:11:33 |
+----+-------+------------+-------------------------------+
+----+-------+---------------+
| id | col_1 | col_2 |
+----+-------+---------------+
| 1 | true | 12789.123 |
| 2 | false | 1243.5 |
| 3 | false | 24453.325 |
| 4 | false | 2423.3254 |
| 5 | true | 243.325 |
| 60 | false | 243565423.325 |
| 70 | true | 243.325 |
| 80 | false | 243423.325 |
| 90 | true | 243.325 |
+----+-------+---------------+
+----+-------+-----------+
| id | col_1 | col_2 |
+----+-------+-----------+
| 1 | true | 12789.123 |
| 2 | false | 1243.5 |
| 3 | false | 24453.325 |
| 4 | false | 2423.3254 |
| 5 | true | 243.325 |
+----+-------+-----------+
SELECT tab1.col_1, MAX(tab2.col_2), MIN(tab2.col_2)
FROM tab2 JOIN tab1 USING (id)
GROUP BY col_1 ORDER BY 1 LIMIT 5;
Results:
+-------+-----------------+-----------------+
| col_1 | max(tab2.col_2) | min(tab2.col_2) |
+-------+-----------------+-----------------+
| false | 24453.325 | 1243.5 |
| true | 12789.123 | 243.325 |
+-------+-----------------+-----------------+
SELECT tab2.*
FROM tab2,
(SELECT tab1.col_1, MAX(tab2.col_2) AS max_col2
FROM tab2, tab1
WHERE tab1.id = tab2.id
GROUP BY col_1) subquery1
WHERE subquery1.max_col2 = tab2.col_2;
Results:
+----+-------+-----------+
| id | col_1 | col_2 |
+----+-------+-----------+
| 1 | true | 12789.123 |
| 3 | false | 24453.325 |
+----+-------+-----------+
INSERT OVERWRITE TABLE tab3
SELECT id, col_1, col_2, MONTH(col_3), DAYOFMONTH(col_3)
FROM tab1 WHERE YEAR(col_3) = 2012;
Query TAB3
to check the result:
SELECT * FROM tab3;
Results:
+----+-------+---------+-------+-----+
| id | col_1 | col_2 | month | day |
+----+-------+---------+-------+-----+
| 1 | true | 123.123 | 10 | 24 |
| 2 | false | 1243.5 | 10 | 25 |
+----+-------+---------+-------+-----+
These tutorials walk you through advanced scenarios or specialized features.
This tutorial shows how you might set up a directory tree in HDFS, put data files into the lowest-level subdirectories, and then use an Impala external table to query the data files from their original locations.
The tutorial uses a table with web log data, with separate subdirectories for the year, month, day, and host. For simplicity, we use a tiny amount of CSV data, loading the same data into each partition.
First, we make an Impala partitioned table for CSV data, and look at the underlying HDFS directory
structure to understand the directory structure to re-create elsewhere in HDFS. The columns
field1
, field2
, and field3
correspond to the contents
of the CSV data files. The year
, month
, day
, and
host
columns are all represented as subdirectories within the table structure, and are
not part of the CSV files. We use STRING
for each of these columns so that we can
produce consistent subdirectory names, with leading zeros for a consistent length.
create database external_partitions;
use external_partitions;
create table logs (field1 string, field2 string, field3 string)
partitioned by (year string, month string , day string, host string)
row format delimited fields terminated by ',';
insert into logs partition (year="2013", month="07", day="28", host="host1") values ("foo","foo","foo");
insert into logs partition (year="2013", month="07", day="28", host="host2") values ("foo","foo","foo");
insert into logs partition (year="2013", month="07", day="29", host="host1") values ("foo","foo","foo");
insert into logs partition (year="2013", month="07", day="29", host="host2") values ("foo","foo","foo");
insert into logs partition (year="2013", month="08", day="01", host="host1") values ("foo","foo","foo");
Back in the Linux shell, we examine the HDFS directory structure. (Your Impala data directory might be in
a different location; for historical reasons, it is sometimes under the HDFS path
/user/hive/warehouse.) We use the hdfs dfs -ls
command to examine
the nested subdirectories corresponding to each partitioning column, with separate subdirectories at each
level (with =
in their names) representing the different values for each partitioning
column. When we get to the lowest level of subdirectory, we use the hdfs dfs -cat
command to examine the data file and see CSV-formatted data produced by the INSERT
statement in Impala.
$ hdfs dfs -ls /user/impala/warehouse/external_partitions.db
Found 1 items
drwxrwxrwt - impala hive 0 2013-08-07 12:24 /user/impala/warehouse/external_partitions.db/logs
$ hdfs dfs -ls /user/impala/warehouse/external_partitions.db/logs
Found 1 items
drwxr-xr-x - impala hive 0 2013-08-07 12:24 /user/impala/warehouse/external_partitions.db/logs/year=2013
$ hdfs dfs -ls /user/impala/warehouse/external_partitions.db/logs/year=2013
Found 2 items
drwxr-xr-x - impala hive 0 2013-08-07 12:23 /user/impala/warehouse/external_partitions.db/logs/year=2013/month=07
drwxr-xr-x - impala hive 0 2013-08-07 12:24 /user/impala/warehouse/external_partitions.db/logs/year=2013/month=08
$ hdfs dfs -ls /user/impala/warehouse/external_partitions.db/logs/year=2013/month=07
Found 2 items
drwxr-xr-x - impala hive 0 2013-08-07 12:22 /user/impala/warehouse/external_partitions.db/logs/year=2013/month=07/day=28
drwxr-xr-x - impala hive 0 2013-08-07 12:23 /user/impala/warehouse/external_partitions.db/logs/year=2013/month=07/day=29
$ hdfs dfs -ls /user/impala/warehouse/external_partitions.db/logs/year=2013/month=07/day=28
Found 2 items
drwxr-xr-x - impala hive 0 2013-08-07 12:21 /user/impala/warehouse/external_partitions.db/logs/year=2013/month=07/day=28/host=host1
drwxr-xr-x - impala hive 0 2013-08-07 12:22 /user/impala/warehouse/external_partitions.db/logs/year=2013/month=07/day=28/host=host2
$ hdfs dfs -ls /user/impala/warehouse/external_partitions.db/logs/year=2013/month=07/day=28/host=host1
Found 1 items
-rw-r--r-- 3 impala hive 12 2013-08-07 12:21 /user/impala/warehouse/external_partiti
ons.db/logs/year=2013/month=07/day=28/host=host1/3981726974111751120--8907184999369517436_822630111_data.0
$ hdfs dfs -cat /user/impala/warehouse/external_partitions.db/logs/year=2013/month=07/day=28/\
host=host1/3981726974111751120--8 907184999369517436_822630111_data.0
foo,foo,foo
Still in the Linux shell, we use hdfs dfs -mkdir
to create several data directories
outside the HDFS directory tree that Impala controls (/user/impala/warehouse in this
example, maybe different in your case). Depending on your configuration, you might need to log in as a
user with permission to write into this HDFS directory tree; for example, the commands shown here were
run while logged in as the hdfs
user.
$ hdfs dfs -mkdir -p /user/impala/data/logs/year=2013/month=07/day=28/host=host1
$ hdfs dfs -mkdir -p /user/impala/data/logs/year=2013/month=07/day=28/host=host2
$ hdfs dfs -mkdir -p /user/impala/data/logs/year=2013/month=07/day=28/host=host1
$ hdfs dfs -mkdir -p /user/impala/data/logs/year=2013/month=07/day=29/host=host1
$ hdfs dfs -mkdir -p /user/impala/data/logs/year=2013/month=08/day=01/host=host1
We make a tiny CSV file, with values different than in the INSERT
statements used
earlier, and put a copy within each subdirectory that we will use as an Impala partition.
$ cat >dummy_log_data
bar,baz,bletch
$ hdfs dfs -mkdir -p /user/impala/data/external_partitions/year=2013/month=08/day=01/host=host1
$ hdfs dfs -mkdir -p /user/impala/data/external_partitions/year=2013/month=07/day=28/host=host1
$ hdfs dfs -mkdir -p /user/impala/data/external_partitions/year=2013/month=07/day=28/host=host2
$ hdfs dfs -mkdir -p /user/impala/data/external_partitions/year=2013/month=07/day=29/host=host1
$ hdfs dfs -put dummy_log_data /user/impala/data/logs/year=2013/month=07/day=28/host=host1
$ hdfs dfs -put dummy_log_data /user/impala/data/logs/year=2013/month=07/day=28/host=host2
$ hdfs dfs -put dummy_log_data /user/impala/data/logs/year=2013/month=07/day=29/host=host1
$ hdfs dfs -put dummy_log_data /user/impala/data/logs/year=2013/month=08/day=01/host=host1
Back in the impala-shell interpreter, we move the original Impala-managed table aside,
and create a new external table with a LOCATION
clause pointing to the directory
under which we have set up all the partition subdirectories and data files.
use external_partitions;
alter table logs rename to logs_original;
create external table logs (field1 string, field2 string, field3 string)
partitioned by (year string, month string, day string, host string)
row format delimited fields terminated by ','
location '/user/impala/data/logs';
Because partition subdirectories and data files come and go during the data lifecycle, you must identify
each of the partitions through an ALTER TABLE
statement before Impala recognizes the
data files they contain.
alter table logs add partition (year="2013",month="07",day="28",host="host1")
alter table log_type add partition (year="2013",month="07",day="28",host="host2");
alter table log_type add partition (year="2013",month="07",day="29",host="host1");
alter table log_type add partition (year="2013",month="08",day="01",host="host1");
We issue a REFRESH
statement for the table, always a safe practice when data files have
been manually added, removed, or changed. Then the data is ready to be queried. The SELECT
*
statement illustrates that the data from our trivial CSV file was recognized in each of the
partitions where we copied it. Although in this case there are only a few rows, we include a
LIMIT
clause on this test query just in case there is more data than we expect.
refresh log_type;
select * from log_type limit 100;
+--------+--------+--------+------+-------+-----+-------+
| field1 | field2 | field3 | year | month | day | host |
+--------+--------+--------+------+-------+-----+-------+
| bar | baz | bletch | 2013 | 07 | 28 | host1 |
| bar | baz | bletch | 2013 | 08 | 01 | host1 |
| bar | baz | bletch | 2013 | 07 | 29 | host1 |
| bar | baz | bletch | 2013 | 07 | 28 | host2 |
+--------+--------+--------+------+-------+-----+-------+
Sometimes, you might find it convenient to switch to the Hive shell to perform some data loading or transformation operation, particularly on file formats such as RCFile, SequenceFile, and Avro that Impala currently can query but not write to.
Whenever you create, drop, or alter a table or other kind of object through Hive, the next time you
switch back to the impala-shell interpreter, issue a one-time INVALIDATE
METADATA
statement so that Impala recognizes the new or changed object.
Whenever you load, insert, or change data in an existing table through Hive (or even through manual HDFS
operations such as the hdfs command), the next time you switch back to the
impala-shell interpreter, issue a one-time REFRESH
table_name
statement so that Impala recognizes the new or changed data.
For examples showing how this process works for the REFRESH
statement, look at the
examples of creating RCFile and SequenceFile tables in Impala, loading data through Hive, and then
querying the data through Impala. See Using the RCFile File Format with Impala Tables and
Using the SequenceFile File Format with Impala Tables for those examples.
For examples showing how this process works for the INVALIDATE METADATA
statement, look
at the example of creating and loading an Avro table in Hive, and then querying the data through Impala.
See Using the Avro File Format with Impala Tables for that example.
Originally, Impala did not support UDFs, but this feature is available in Impala starting in Impala
1.2. Some INSERT ... SELECT
transformations that you originally did through Hive can
now be done through Impala. See User-Defined Functions (UDFs) for details.
Prior to Impala 1.2, the REFRESH
and INVALIDATE METADATA
statements
needed to be issued on each Impala node to which you connected and issued queries. In Impala 1.2 and
higher, when you issue either of those statements on any Impala node, the results are broadcast to all
the Impala nodes in the cluster, making it truly a one-step operation after each round of DDL or ETL
operations in Hive.
Originally, Impala restricted join queries so that they had to include at least one equality comparison between the columns of the tables on each side of the join operator. With the huge tables typically processed by Impala, any miscoded query that produced a full Cartesian product as a result set could consume a huge amount of cluster resources.
In Impala 1.2.2 and higher, this restriction is lifted when you use the CROSS JOIN
operator in the query. You still cannot remove all WHERE
clauses from a query like
SELECT * FROM t1 JOIN t2
to produce all combinations of rows from both tables. But you
can use the CROSS JOIN
operator to explicitly request such a Cartesian product.
Typically, this operation is applicable for smaller tables, where the result set still fits within the
memory of a single Impala node.
The following example sets up data for use in a series of comic books where characters battle each other. At first, we use an equijoin query, which only allows characters from the same time period and the same planet to meet.
[localhost:21000] > create table heroes (name string, era string, planet string);
[localhost:21000] > create table villains (name string, era string, planet string);
[localhost:21000] > insert into heroes values
> ('Tesla','20th century','Earth'),
> ('Pythagoras','Antiquity','Earth'),
> ('Zopzar','Far Future','Mars');
Inserted 3 rows in 2.28s
[localhost:21000] > insert into villains values
> ('Caligula','Antiquity','Earth'),
> ('John Dillinger','20th century','Earth'),
> ('Xibulor','Far Future','Venus');
Inserted 3 rows in 1.93s
[localhost:21000] > select concat(heroes.name,' vs. ',villains.name) as battle
> from heroes join villains
> where heroes.era = villains.era and heroes.planet = villains.planet;
+--------------------------+
| battle |
+--------------------------+
| Tesla vs. John Dillinger |
| Pythagoras vs. Caligula |
+--------------------------+
Returned 2 row(s) in 0.47s
Readers demanded more action, so we added elements of time travel and space travel so that any hero could face any villain. Prior to Impala 1.2.2, this type of query was impossible because all joins had to reference matching values between the two tables:
[localhost:21000] > -- Cartesian product not possible in Impala 1.1.
> select concat(heroes.name,' vs. ',villains.name) as battle from heroes join villains;
ERROR: NotImplementedException: Join between 'heroes' and 'villains' requires at least one conjunctive equality predicate between the two tables
With Impala 1.2.2, we rewrite the query slightly to use CROSS JOIN
rather than
JOIN
, and now the result set includes all combinations:
[localhost:21000] > -- Cartesian product available in Impala 1.2.2 with the CROSS JOIN syntax.
> select concat(heroes.name,' vs. ',villains.name) as battle from heroes cross join villains;
+-------------------------------+
| battle |
+-------------------------------+
| Tesla vs. Caligula |
| Tesla vs. John Dillinger |
| Tesla vs. Xibulor |
| Pythagoras vs. Caligula |
| Pythagoras vs. John Dillinger |
| Pythagoras vs. Xibulor |
| Zopzar vs. Caligula |
| Zopzar vs. John Dillinger |
| Zopzar vs. Xibulor |
+-------------------------------+
Returned 9 row(s) in 0.33s
The full combination of rows from both tables is known as the Cartesian product. This type of result set
is often used for creating grid data structures. You can also filter the result set by including
WHERE
clauses that do not explicitly compare columns between the two tables. The
following example shows how you might produce a list of combinations of year and quarter for use in a
chart, and then a shorter list with only selected quarters.
[localhost:21000] > create table x_axis (x int);
[localhost:21000] > create table y_axis (y int);
[localhost:21000] > insert into x_axis values (1),(2),(3),(4);
Inserted 4 rows in 2.14s
[localhost:21000] > insert into y_axis values (2010),(2011),(2012),(2013),(2014);
Inserted 5 rows in 1.32s
[localhost:21000] > select y as year, x as quarter from x_axis cross join y_axis;
+------+---------+
| year | quarter |
+------+---------+
| 2010 | 1 |
| 2011 | 1 |
| 2012 | 1 |
| 2013 | 1 |
| 2014 | 1 |
| 2010 | 2 |
| 2011 | 2 |
| 2012 | 2 |
| 2013 | 2 |
| 2014 | 2 |
| 2010 | 3 |
| 2011 | 3 |
| 2012 | 3 |
| 2013 | 3 |
| 2014 | 3 |
| 2010 | 4 |
| 2011 | 4 |
| 2012 | 4 |
| 2013 | 4 |
| 2014 | 4 |
+------+---------+
Returned 20 row(s) in 0.38s
[localhost:21000] > select y as year, x as quarter from x_axis cross join y_axis where x in (1,3);
+------+---------+
| year | quarter |
+------+---------+
| 2010 | 1 |
| 2011 | 1 |
| 2012 | 1 |
| 2013 | 1 |
| 2014 | 1 |
| 2010 | 3 |
| 2011 | 3 |
| 2012 | 3 |
| 2013 | 3 |
| 2014 | 3 |
+------+---------+
Returned 10 row(s) in 0.39s
As data pipelines start to include more aspects such as NoSQL or loosely specified schemas, you might encounter situations where you have data files (particularly in Parquet format) where you do not know the precise table definition. This tutorial shows how you can build an Impala table around data that comes from non-Impala or even non-SQL sources, where you do not have control of the table layout and might not be familiar with the characteristics of the data.
The data used in this tutorial represents airline on-time arrival statistics, from October 1987 through April 2008. See the details on the 2009 ASA Data Expo web site. You can also see the explanations of the columns; for purposes of this exercise, wait until after following the tutorial before examining the schema, to better simulate a real-life situation where you cannot rely on assumptions and assertions about the ranges and representations of data values.
First, we download and unpack the data files. There are 8 files totalling 1.4 GB.
$ wget -O airlines_parquet.tar.gz https://home.apache.org/~arodoni/airlines_parquet.tar.gz
$ wget https://home.apache.org/~arodoni/airlines_parquet.tar.gz.sha512
$ shasum -a 512 -c airlines_parquet.tar.gz.sha512
airlines_parquet.tar.gz: OK
$ tar xvzf airlines_parquet.tar.gz
$ cd airlines_parquet/
$ du -kch *.parq
253M 4345e5eef217aa1b-c8f16177f35fd983_1150363067_data.0.parq
14M 4345e5eef217aa1b-c8f16177f35fd983_1150363067_data.1.parq
253M 4345e5eef217aa1b-c8f16177f35fd984_501176748_data.0.parq
64M 4345e5eef217aa1b-c8f16177f35fd984_501176748_data.1.parq
184M 4345e5eef217aa1b-c8f16177f35fd985_1199995767_data.0.parq
241M 4345e5eef217aa1b-c8f16177f35fd986_2086627597_data.0.parq
212M 4345e5eef217aa1b-c8f16177f35fd987_1048668565_data.0.parq
152M 4345e5eef217aa1b-c8f16177f35fd988_1432111844_data.0.parq
1.4G total
Next, we put the Parquet data files in HDFS, all together in a single
directory, with permissions on the directory and the files so that the
impala
user will be able to read them.
After unpacking, we saw the largest Parquet file was 253 MB. When
copying Parquet files into HDFS for Impala to use, for maximum query
performance, make sure that each file resides in a single HDFS data
block. Therefore, we pick a size larger than any single file and
specify that as the block size, using the argument
-Ddfs.block.size=253m
on the hdfs dfs
-put
command.
$ sudo -u hdfs hdfs dfs -mkdir -p /user/impala/staging/airlines
$ sudo -u hdfs hdfs dfs -Ddfs.block.size=253m -put *.parq /user/impala/staging/airlines
$ sudo -u hdfs hdfs dfs -ls /user/impala/staging
Found 1 items
$ sudo -u hdfs hdfs dfs -ls /user/impala/staging/airlines
Found 8 items
CREATE EXTERNAL
syntax and the
LOCATION
attribute point Impala at the
appropriate HDFS directory.LIKE PARQUET
'path_to_any_parquet_file'
clause
means we skip the list of column names and types; Impala
automatically gets the column names and data types straight from
the data files. (Currently, this technique only works for Parquet
files.) READ_WRITE
access to the files in HDFS; the impala
user can
read the files, which will be sufficient for us to experiment with
queries and perform some copy and transform operations into other
tables. $ impala-shell
> CREATE DATABASE airlines_data;
USE airlines_data;
CREATE EXTERNAL TABLE airlines_external
LIKE PARQUET 'hdfs:staging/airlines/4345e5eef217aa1b-c8f16177f35fd983_1150363067_data.0.parq'
STORED AS PARQUET LOCATION 'hdfs:staging/airlines';
WARNINGS: Impala does not have READ_WRITE access to path 'hdfs://myhost.com:8020/user/impala/staging'
SHOW TABLE STATS
statement gives a very
high-level summary of the table, showing how many files and how
much total data it contains. Also, it confirms that the table is
expecting all the associated data files to be in Parquet format.
(The ability to work with all kinds of HDFS data files in
different formats means that it is possible to have a mismatch
between the format of the data files, and the format that the
table expects the data files to be in.) SHOW FILES
statement confirms that the data
in the table has the expected number, names, and sizes of the
original Parquet files.DESCRIBE
statement (or its abbreviation
DESC
) confirms the names and types of the
columns that Impala automatically created after reading that
metadata from the Parquet file. DESCRIBE FORMATTED
statement prints out
some extra detail along with the column definitions. The pieces we
care about for this exercise are: > SHOW TABLE STATS airlines_external;
+-------+--------+--------+--------------+-------------------+---------+-------------------+-------------------------------+-----------+
| #Rows | #Files | Size | Bytes Cached | Cache Replication | Format | Incremental stats | Location | EC Policy |
+-------+--------+--------+--------------+-------------------+---------+-------------------+-------------------------------+-----------+
| -1 | 8 | 1.34GB | NOT CACHED | NOT CACHED | PARQUET | false | /user/impala/staging/airlines | NONE |
+-------+--------+--------+--------------+-------------------+---------+-------------------+-------------------------------+-----------+
> SHOW FILES IN airlines_external;
+----------------------------------------------------------------------------------------+----------+-----------+-----------+
| Path | Size | Partition | EC Policy |
+----------------------------------------------------------------------------------------+----------+-----------+-----------+
| /user/impala/staging/airlines/4345e5eef217aa1b-c8f16177f35fd983_1150363067_data.0.parq | 252.99MB | | NONE |
| /user/impala/staging/airlines/4345e5eef217aa1b-c8f16177f35fd983_1150363067_data.1.parq | 13.43MB | | NONE |
| /user/impala/staging/airlines/4345e5eef217aa1b-c8f16177f35fd984_501176748_data.0.parq | 252.84MB | | NONE |
| /user/impala/staging/airlines/4345e5eef217aa1b-c8f16177f35fd984_501176748_data.1.parq | 63.92MB | | NONE |
| /user/impala/staging/airlines/4345e5eef217aa1b-c8f16177f35fd985_1199995767_data.0.parq | 183.64MB | | NONE |
| /user/impala/staging/airlines/4345e5eef217aa1b-c8f16177f35fd986_2086627597_data.0.parq | 240.04MB | | NONE |
| /user/impala/staging/airlines/4345e5eef217aa1b-c8f16177f35fd987_1048668565_data.0.parq | 211.35MB | | NONE |
| /user/impala/staging/airlines/4345e5eef217aa1b-c8f16177f35fd988_1432111844_data.0.parq | 151.46MB | | NONE |
+----------------------------------------------------------------------------------------+----------+-----------+-----------+
> DESCRIBE airlines_external;
+---------------------+--------+-----------------------------+
| name | type | comment |
+---------------------+--------+-----------------------------+
| year | int | Inferred from Parquet file. |
| month | int | Inferred from Parquet file. |
| day | int | Inferred from Parquet file. |
| dayofweek | int | Inferred from Parquet file. |
| dep_time | int | Inferred from Parquet file. |
| crs_dep_time | int | Inferred from Parquet file. |
| arr_time | int | Inferred from Parquet file. |
| crs_arr_time | int | Inferred from Parquet file. |
| carrier | string | Inferred from Parquet file. |
| flight_num | int | Inferred from Parquet file. |
| tail_num | int | Inferred from Parquet file. |
| actual_elapsed_time | int | Inferred from Parquet file. |
| crs_elapsed_time | int | Inferred from Parquet file. |
| airtime | int | Inferred from Parquet file. |
| arrdelay | int | Inferred from Parquet file. |
| depdelay | int | Inferred from Parquet file. |
| origin | string | Inferred from Parquet file. |
| dest | string | Inferred from Parquet file. |
| distance | int | Inferred from Parquet file. |
| taxi_in | int | Inferred from Parquet file. |
| taxi_out | int | Inferred from Parquet file. |
| cancelled | int | Inferred from Parquet file. |
| cancellation_code | string | Inferred from Parquet file. |
| diverted | int | Inferred from Parquet file. |
| carrier_delay | int | Inferred from Parquet file. |
| weather_delay | int | Inferred from Parquet file. |
| nas_delay | int | Inferred from Parquet file. |
| security_delay | int | Inferred from Parquet file. |
| late_aircraft_delay | int | Inferred from Parquet file. |
+---------------------+--------+-----------------------------+
> DESCRIBE FORMATTED airlines_external;
+------------------------------+-------------------------------
| name | type
+------------------------------+-------------------------------
...
| # Detailed Table Information | NULL
| Database: | airlines_data
| Owner: | impala
...
| Location: | /user/impala/staging/airlines
| Table Type: | EXTERNAL_TABLE
...
| # Storage Information | NULL
| SerDe Library: | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
| InputFormat: | org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputForma
| OutputFormat: | org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
...
Now that we are confident that the connections are solid between the Impala table and the underlying Parquet files, we run some initial queries to understand the characteristics of the data: the overall number of rows, and the ranges and how many different values are in certain columns.
> SELECT COUNT(*) FROM airlines_external;
+-----------+
| count(*) |
+-----------+
| 123534969 |
+-----------+
The NDV()
function returns a number of distinct values,
which, for performance reasons, is an estimate when there are lots of
different values in the column, but is precise when the cardinality is
less than 16 K. Use NDV()
function for this kind of
exploration rather than COUNT(DISTINCT
colname)
, because Impala can evaluate
multiple NDV()
functions in a single query, but only
a single instance of COUNT DISTINCT
.
> SElECT NDV(carrier), NDV(flight_num), NDV(tail_num),
NDV(origin), NDV(dest) FROM airlines_external;
+--------------+-----------------+---------------+-------------+-----------+
| ndv(carrier) | ndv(flight_num) | ndv(tail_num) | ndv(origin) | ndv(dest) |
+--------------+-----------------+---------------+-------------+-----------+
| 29 | 8463 | 3 | 342 | 349 |
+--------------+-----------------+---------------+-------------+-----------+
> SELECT tail_num, COUNT(*) AS howmany FROM airlines_external
GROUP BY tail_num;
+----------+-----------+
| tail_num | howmany |
+----------+-----------+
| NULL | 123122001 |
| 715 | 1 |
| 0 | 406405 |
| 112 | 6562 |
+----------+-----------+
> SELECT DISTINCT dest FROM airlines_external
WHERE dest NOT IN (SELECT origin FROM airlines_external);
+------+
| dest |
+------+
| CBM |
| SKA |
| LAR |
| RCA |
| LBF |
+------+
> SELECT DISTINCT dest FROM airlines_external
WHERE dest NOT IN (SELECT DISTINCT origin FROM airlines_external);
+------+
| dest |
+------+
| CBM |
| SKA |
| LAR |
| RCA |
| LBF |
+------+
> SELECT DISTINCT origin FROM airlines_external
WHERE origin NOT IN (SELECT DISTINCT dest FROM airlines_external);
Fetched 0 row(s) in 2.63
With the above queries, we see that there are modest numbers of
different airlines, flight numbers, and origin and destination
airports. Two things jump out from this query: the number of
tail_num
values is much smaller than we might have
expected, and there are more destination airports than origin
airports. Let's dig further. What we find is that most
tail_num
values are NULL
. It looks
like this was an experimental column that wasn't filled in accurately.
We make a mental note that if we use this data as a starting point,
we'll ignore this column. We also find that certain airports are
represented in the ORIGIN
column but not the
DEST
column; now we know that we cannot rely on the
assumption that those sets of airport codes are identical.
SELECT DISTINCT DEST
query takes
almost 40 seconds. We expect all queries on such a small data set,
less than 2 GB, to take a few seconds at most. The reason is because
the expression NOT IN (SELECT origin FROM
airlines_external)
produces an intermediate result set of
123 million rows, then runs 123 million comparisons on each data node
against the tiny set of destination airports. The way the NOT
IN
operator works internally means that this intermediate
result set with 123 million rows might be transmitted across the
network to each data node in the cluster. Applying another
DISTINCT
inside the NOT IN
subquery means that the intermediate result set is only 340 items,
resulting in much less network traffic and fewer comparison
operations. The more efficient query with the added
DISTINCT
is approximately 7 times as fast. Next, we try doing a simple calculation, with results broken down by year.
This reveals that some years have no data in the
airtime
column. That means we might be able to use
that column in queries involving certain date ranges, but we cannot
count on it to always be reliable. The question of whether a column
contains any NULL
values, and if so what is their
number, proportion, and distribution, comes up again and again when
doing initial exploration of a data set.
> SELECT year, SUM(airtime) FROM airlines_external
GROUP BY year ORDER BY year DESC;
+------+--------------+
| year | sum(airtime) |
+------+--------------+
| 2008 | 713050445 |
| 2007 | 748015545 |
| 2006 | 720372850 |
| 2005 | 708204026 |
| 2004 | 714276973 |
| 2003 | 665706940 |
| 2002 | 549761849 |
| 2001 | 590867745 |
| 2000 | 583537683 |
| 1999 | 561219227 |
| 1998 | 538050663 |
| 1997 | 536991229 |
| 1996 | 519440044 |
| 1995 | 513364265 |
| 1994 | NULL |
| 1993 | NULL |
| 1992 | NULL |
| 1991 | NULL |
| 1990 | NULL |
| 1989 | NULL |
| 1988 | NULL |
| 1987 | NULL |
+------+--------------+
With the notion of NULL
values in mind, let's come back to
the tail_num
column that we discovered had a lot of
NULL
s. Let's quantify the NULL
and
non-NULL
values in that column for better
understanding. First, we just count the overall number of rows versus
the non-NULL
values in that column. That initial
result gives the appearance of relatively few
non-NULL
values, but we can break it down more
clearly in a single query. Once we have the COUNT(*)
and the COUNT(colname)
numbers, we
can encode that initial query in a WITH
clause, then
run a follow-on query that performs multiple arithmetic operations on
those values. Seeing that only one-third of one percent of all rows
have non-NULL
values for the
tail_num
column clearly illustrates that column is
not of much use.
> SELECT COUNT(*) AS 'rows', COUNT(tail_num) AS 'non-null tail numbers'
FROM airlines_external;
+-----------+-----------------------+
| rows | non-null tail numbers |
+-----------+-----------------------+
| 123534969 | 412968 |
+-----------+-----------------------+
> WITH t1 AS
(SELECT COUNT(*) AS 'rows', COUNT(tail_num) AS 'nonnull'
FROM airlines_external)
SELECT `rows`, `nonnull`, `rows` - `nonnull` AS 'nulls',
(`nonnull` / `rows`) * 100 AS 'percentage non-null'
FROM t1;
+-----------+---------+-----------+---------------------+
| rows | nonnull | nulls | percentage non-null |
+-----------+---------+-----------+---------------------+
| 123534969 | 412968 | 123122001 | 0.3342923897119365 |
+-----------+---------+-----------+---------------------+
By examining other columns using these techniques, we can form a mental
picture of the way data is distributed throughout the table, and which
columns are most significant for query purposes. For this tutorial, we
focus mostly on the fields likely to hold discrete values, rather than
columns such as actual_elapsed_time
whose names
suggest they hold measurements. We would dig deeper into those columns
once we had a clear picture of which questions were worthwhile to ask,
and what kinds of trends we might look for. For the final piece of
initial exploration, let's look at the year
column. A
simple GROUP BY
query shows that it has a
well-defined range, a manageable number of distinct values, and
relatively even distribution of rows across the different years.
> SELECT MIN(year), MAX(year), NDV(year) FROM airlines_external;
+-----------+-----------+-----------+
| min(year) | max(year) | ndv(year) |
+-----------+-----------+-----------+
| 1987 | 2008 | 22 |
+-----------+-----------+-----------+
> SELECT year, COUNT(*) howmany FROM airlines_external
GROUP BY year ORDER BY year DESC;
+------+---------+
| year | howmany |
+------+---------+
| 2008 | 7009728 |
| 2007 | 7453215 |
| 2006 | 7141922 |
| 2005 | 7140596 |
| 2004 | 7129270 |
| 2003 | 6488540 |
| 2002 | 5271359 |
| 2001 | 5967780 |
| 2000 | 5683047 |
| 1999 | 5527884 |
| 1998 | 5384721 |
| 1997 | 5411843 |
| 1996 | 5351983 |
| 1995 | 5327435 |
| 1994 | 5180048 |
| 1993 | 5070501 |
| 1992 | 5092157 |
| 1991 | 5076925 |
| 1990 | 5270893 |
| 1989 | 5041200 |
| 1988 | 5202096 |
| 1987 | 1311826 |
+------+---------+
We could go quite far with the data in this initial raw format, just as we
downloaded it from the web. If the data set proved to be useful and
worth persisting in Impala for extensive queries, we might want to
copy it to an internal table, letting Impala manage the data files and
perhaps reorganizing a little for higher efficiency. In this next
stage of the tutorial, we copy the original data into a partitioned
table, still in Parquet format. Partitioning based on the
year
column lets us run queries with clauses such
as WHERE year = 2001
or WHERE year BETWEEN
1989 AND 1999
, which can dramatically cut down on I/O by
ignoring all the data from years outside the desired range. Rather
than reading all the data and then deciding which rows are in the
matching years, Impala can zero in on only the data files from
specific year
partitions. To do this, Impala
physically reorganizes the data files, putting the rows from each year
into data files in a separate HDFS directory for each
year
value. Along the way, we'll also get rid of
the tail_num
column that proved to be almost entirely
NULL
.
The first step is to create a new table with a layout very similar to the
original airlines_external
table. We'll do that by
reverse-engineering a CREATE TABLE
statement for the
first table, then tweaking it slightly to include a PARTITION
BY
clause for year
, and excluding the
tail_num
column. The SHOW CREATE
TABLE
statement gives us the starting point.
Although we could edit that output into a new SQL statement, all the ASCII box characters
make such editing inconvenient. To get a more stripped-down CREATE TABLE
to start with, we restart the impala-shell command with the
-B
option, which turns off the box-drawing behavior.
$ impala-shell -i localhost -B -d airlines_data;
> SHOW CREATE TABLE airlines_external;
"CREATE EXTERNAL TABLE airlines_data.airlines_external (
year INT COMMENT 'inferred from: optional int32 year',
month INT COMMENT 'inferred from: optional int32 month',
day INT COMMENT 'inferred from: optional int32 day',
dayofweek INT COMMENT 'inferred from: optional int32 dayofweek',
dep_time INT COMMENT 'inferred from: optional int32 dep_time',
crs_dep_time INT COMMENT 'inferred from: optional int32 crs_dep_time',
arr_time INT COMMENT 'inferred from: optional int32 arr_time',
crs_arr_time INT COMMENT 'inferred from: optional int32 crs_arr_time',
carrier STRING COMMENT 'inferred from: optional binary carrier',
flight_num INT COMMENT 'inferred from: optional int32 flight_num',
tail_num INT COMMENT 'inferred from: optional int32 tail_num',
actual_elapsed_time INT COMMENT 'inferred from: optional int32 actual_elapsed_time',
crs_elapsed_time INT COMMENT 'inferred from: optional int32 crs_elapsed_time',
airtime INT COMMENT 'inferred from: optional int32 airtime',
arrdelay INT COMMENT 'inferred from: optional int32 arrdelay',
depdelay INT COMMENT 'inferred from: optional int32 depdelay',
origin STRING COMMENT 'inferred from: optional binary origin',
dest STRING COMMENT 'inferred from: optional binary dest',
distance INT COMMENT 'inferred from: optional int32 distance',
taxi_in INT COMMENT 'inferred from: optional int32 taxi_in',
taxi_out INT COMMENT 'inferred from: optional int32 taxi_out',
cancelled INT COMMENT 'inferred from: optional int32 cancelled',
cancellation_code STRING COMMENT 'inferred from: optional binary cancellation_code',
diverted INT COMMENT 'inferred from: optional int32 diverted',
carrier_delay INT COMMENT 'inferred from: optional int32 carrier_delay',
weather_delay INT COMMENT 'inferred from: optional int32 weather_delay',
nas_delay INT COMMENT 'inferred from: optional int32 nas_delay',
security_delay INT COMMENT 'inferred from: optional int32 security_delay',
late_aircraft_delay INT COMMENT 'inferred from: optional int32 late_aircraft_delay'
)
STORED AS PARQUET
LOCATION 'hdfs://a1730.example.com:8020/user/impala/staging/airlines'
TBLPROPERTIES ('numFiles'='0', 'COLUMN_STATS_ACCURATE'='false',
'transient_lastDdlTime'='1439425228', 'numRows'='-1', 'totalSize'='0',
'rawDataSize'='-1')"
After copying and pasting the CREATE TABLE
statement into a text editor for fine-tuning, we quit and restart impala-shell
without the -B
option, to switch back to regular
output.
Next we run the CREATE TABLE
statement that we adapted from
the SHOW CREATE TABLE
output. We kept the
STORED AS PARQUET
clause because we want to
rearrange the data somewhat but still keep it in the high-performance
Parquet format. The LOCATION
and
TBLPROPERTIES
clauses are not relevant for this new
table, so we edit those out. Because we are going to partition the new
table based on the year
column, we move that column
name (and its type) into a new PARTITIONED BY
clause.
> CREATE TABLE airlines_data.airlines
(month INT,
day INT,
dayofweek INT,
dep_time INT,
crs_dep_time INT,
arr_time INT,
crs_arr_time INT,
carrier STRING,
flight_num INT,
actual_elapsed_time INT,
crs_elapsed_time INT,
airtime INT,
arrdelay INT,
depdelay INT,
origin STRING,
dest STRING,
distance INT,
taxi_in INT,
taxi_out INT,
cancelled INT,
cancellation_code STRING,
diverted INT,
carrier_delay INT,
weather_delay INT,
nas_delay INT,
security_delay INT,
late_aircraft_delay INT)
PARTITIONED BY (year INT)
STORED AS PARQUET
;
Next, we copy all the rows from the original table into this new one with an
INSERT
statement. (We edited the CREATE
TABLE
statement to make an INSERT
statement with the column names in the same order.) The only change is
to add a PARTITION(year)
clause, and move the
year
column to the very end of the
SELECT
list of the INSERT
statement. Specifying PARTITION(year)
, rather than a
fixed value such as PARTITION(year=2000)
, means that
Impala figures out the partition value for each row based on the value
of the very last column in the SELECT
list. This is
the first SQL statement that legitimately takes any substantial time,
because the rows from different years are shuffled around the cluster;
the rows that go into each partition are collected on one node, before
being written to one or more new data files.
> INSERT INTO airlines_data.airlines
PARTITION (year)
SELECT
month,
day,
dayofweek,
dep_time,
crs_dep_time,
arr_time,
crs_arr_time,
carrier,
flight_num,
actual_elapsed_time,
crs_elapsed_time,
airtime,
arrdelay,
depdelay,
origin,
dest,
distance,
taxi_in,
taxi_out,
cancelled,
cancellation_code,
diverted,
carrier_delay,
weather_delay,
nas_delay,
security_delay,
late_aircraft_delay,
year
FROM airlines_data.airlines_external;
Once partitioning or join queries come into play, it's important to have statistics
that Impala can use to optimize queries on the corresponding tables.
The COMPUTE INCREMENTAL STATS
statement is the way to collect
statistics for partitioned tables.
Then the SHOW TABLE STATS
statement confirms that the statistics
are in place for each partition, and also illustrates how many files and how much raw data
is in each partition.
> COMPUTE INCREMENTAL STATS airlines;
+-------------------------------------------+
| summary |
+-------------------------------------------+
| Updated 22 partition(s) and 27 column(s). |
+-------------------------------------------+
> SHOW TABLE STATS airlines;
+-------+-----------+--------+----------+--------------+-------------------+---------+-------------------+-------------------------------------------------------------------------------+-----------+
| year | #Rows | #Files | Size | Bytes Cached | Cache Replication | Format | Incremental stats | Location | EC Policy |
+-------+-----------+--------+----------+--------------+-------------------+---------+-------------------+-------------------------------------------------------------------------------+-----------+
| 1987 | 1311826 | 1 | 11.75MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=1987 | NONE |
| 1988 | 5202096 | 1 | 44.04MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=1988 | NONE |
| 1989 | 5041200 | 1 | 46.07MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=1989 | NONE |
| 1990 | 5270893 | 1 | 46.25MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=1990 | NONE |
| 1991 | 5076925 | 1 | 46.77MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=1991 | NONE |
| 1992 | 5092157 | 1 | 48.21MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=1992 | NONE |
| 1993 | 5070501 | 1 | 47.46MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=1993 | NONE |
| 1994 | 5180048 | 1 | 47.47MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=1994 | NONE |
| 1995 | 5327435 | 1 | 62.40MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=1995 | NONE |
| 1996 | 5351983 | 1 | 62.93MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=1996 | NONE |
| 1997 | 5411843 | 1 | 65.05MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=1997 | NONE |
| 1998 | 5384721 | 1 | 62.21MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=1998 | NONE |
| 1999 | 5527884 | 1 | 65.10MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=1999 | NONE |
| 2000 | 5683047 | 1 | 67.68MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=2000 | NONE |
| 2001 | 5967780 | 1 | 74.03MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=2001 | NONE |
| 2002 | 5271359 | 1 | 74.00MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=2002 | NONE |
| 2003 | 6488540 | 1 | 99.35MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=2003 | NONE |
| 2004 | 7129270 | 1 | 123.29MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=2004 | NONE |
| 2005 | 7140596 | 1 | 120.72MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=2005 | NONE |
| 2006 | 7141922 | 1 | 121.88MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=2006 | NONE |
| 2007 | 7453215 | 1 | 130.87MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=2007 | NONE |
| 2008 | 7009728 | 1 | 123.14MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=2008 | NONE |
| Total | 123534969 | 22 | 1.55GB | 0B | | | | | |
+-------+-----------+--------+----------+--------------+-------------------+---------+-------------------+-------------------------------------------------------------------------------+-----------+
At this point, we sanity check the partitioning we did. All the partitions
have exactly one file, which is on the low side. A query that includes
a clause WHERE year=2004
will only read a single data
block; that data block will be read and processed by a single data
node; therefore, for a query targeting a single year, all the other
nodes in the cluster will sit idle while all the work happens on a
single machine. It's even possible that by chance (depending on HDFS
replication factor and the way data blocks are distributed across the
cluster), that multiple year partitions selected by a filter such as
WHERE year BETWEEN 1999 AND 2001
could all be read
and processed by the same data node. The more data files each
partition has, the more parallelism you can get and the less
probability of "hotspots" occurring on particular nodes,
therefore a bigger performance boost by having a big cluster.
However, the more data files, the less data goes in each one. The overhead of dividing the work in a parallel query might not be worth it if each node is only reading a few megabytes. 50 or 100 megabytes is a decent size for a Parquet data block; 9 or 37 megabytes is on the small side. Which is to say, the data distribution we ended up with based on this partitioning scheme is on the borderline between sensible (reasonably large files) and suboptimal (few files in each partition). The way to see how well it works in practice is to run the same queries against the original flat table and the new partitioned table, and compare times.
Spoiler: in this case, with my particular 4-node cluster with its specific
distribution of data blocks and my particular exploratory queries,
queries against the partitioned table do consistently run faster than
the same queries against the unpartitioned table. But I could not be
sure that would be the case without some real measurements. Here are
some queries I ran to draw that conclusion, first against
airlines_external
(no partitioning), then against
AIRLINES
(partitioned by year). The
AIRLINES
queries are consistently faster. Changing
the volume of data, changing the size of the cluster, running queries
that did or didn't refer to the partition key columns, or other
factors could change the results to favor one table layout or the
other.
> SELECT SUM(airtime) FROM airlines_external;
+--------------+
| 8662859484 |
+--------------+
> SELECT SUM(airtime) FROM airlines;
+--------------+
| 8662859484 |
+--------------+
> SELECT SUM(airtime) FROM airlines_external WHERE year = 2005;
+--------------+
| 708204026 |
+--------------+
> SELECT SUM(airtime) FROM airlines WHERE year = 2005;
+--------------+
| 708204026 |
+--------------+
Now we can finally analyze this data set that from the raw data files and we
didn't know what columns they contained. Let's see whether the
airtime
of a flight tends to be different depending
on the day of the week. We can see that the average is a little higher
on day number 6; perhaps Saturday is a busy flying day and planes have
to circle for longer at the destination airport before landing.
> SELECT dayofweek, AVG(airtime) FROM airlines
GROUP BY dayofweek ORDER BY dayofweek;
+-----------+-------------------+
| dayofweek | avg(airtime) |
+-----------+-------------------+
| 1 | 102.1560425016671 |
| 2 | 102.1582931538807 |
| 3 | 102.2170009256653 |
| 4 | 102.37477661846 |
| 5 | 102.2697358763511 |
| 6 | 105.3627448363705 |
| 7 | 103.4144351202054 |
+-----------+-------------------+
To see if the apparent trend holds up over time, let's do the same breakdown by day of week, but also
split up by year. Now we can see that day number 6 consistently has a higher average air time in each
year. We can also see that the average air time increased over time across the board. And the presence
of NULL
for this column in years 1987 to 1994 shows that queries involving this column
need to be restricted to a date range of 1995 and higher.
> SELECT year, dayofweek, AVG(airtime) FROM airlines
GROUP BY year, dayofweek ORDER BY year DESC, dayofweek;
+------+-----------+-------------------+
| year | dayofweek | avg(airtime) |
+------+-----------+-------------------+
| 2008 | 1 | 103.1821651651355 |
| 2008 | 2 | 103.2149301386094 |
| 2008 | 3 | 103.0585076622796 |
| 2008 | 4 | 103.4671383539038 |
| 2008 | 5 | 103.5575385182659 |
| 2008 | 6 | 107.4006306562128 |
| 2008 | 7 | 104.8648851041755 |
| 2007 | 1 | 102.2196114337825 |
| 2007 | 2 | 101.9317791906348 |
| 2007 | 3 | 102.0964767689043 |
| 2007 | 4 | 102.6215927201686 |
| 2007 | 5 | 102.4289399000661 |
| 2007 | 6 | 105.1477448215756 |
| 2007 | 7 | 103.6305945644095 |
...
| 1996 | 1 | 99.33860750862108 |
| 1996 | 2 | 99.54225446396656 |
| 1996 | 3 | 99.41129336113134 |
| 1996 | 4 | 99.5110373340348 |
| 1996 | 5 | 99.22120745027595 |
| 1996 | 6 | 101.1717447111921 |
| 1996 | 7 | 99.95410136133704 |
| 1995 | 1 | 96.93779698300494 |
| 1995 | 2 | 96.93458674589712 |
| 1995 | 3 | 97.00972311337051 |
| 1995 | 4 | 96.90843832024412 |
| 1995 | 5 | 96.78382115425562 |
| 1995 | 6 | 98.70872826057003 |
| 1995 | 7 | 97.85570478374616 |
| 1994 | 1 | NULL |
| 1994 | 2 | NULL |
| 1994 | 3 | NULL |
...
| 1987 | 5 | NULL |
| 1987 | 6 | NULL |
| 1987 | 7 | NULL |
+------+-----------+-------------------+