Queries involving join operations often require more tuning than queries that refer to only one table. The maximum size of the result set from a join query is the product of the number of rows in all the joined tables. When joining several tables with millions or billions of rows, any missed opportunity to filter the result set, or other inefficiency in the query, could lead to an operation that does not finish in a practical time and has to be cancelled.
The simplest technique for tuning an Impala join query is to collect statistics on each table involved in the join using the COMPUTE STATS statement, and then let Impala automatically optimize the query based on the size of each table, number of distinct values of each column, and so on. The COMPUTE STATS statement and the join optimization are new features introduced in Impala 1.2.2. For accurate statistics about each table, issue the COMPUTE STATS statement after loading the data into that table, and again if the amount of data changes substantially due to an INSERT, LOAD DATA, adding a partition, and so on.
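As a minimal sketch of that workflow, the statements below refresh statistics right after a substantial load. The table name sales_fact and the staging path are hypothetical, chosen only for illustration.

```sql
-- Hypothetical table and HDFS staging path, for illustration only.
LOAD DATA INPATH '/user/etl/staging/sales_batch' INTO TABLE sales_fact;

-- Recompute table and column statistics so the planner sees the new data volume.
COMPUTE STATS sales_fact;
```

Running COMPUTE STATS as the last step of a load job is one way to keep the statistics from drifting away from the actual data.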
If statistics are not available for all the tables in the join query, or if Impala chooses a join order that is not the most efficient, you can override the automatic join order optimization by specifying the STRAIGHT_JOIN keyword immediately after the SELECT and any DISTINCT or ALL keywords. In this case, Impala uses the order the tables appear in the query to guide how the joins are processed.
When you use the STRAIGHT_JOIN technique, you must order the tables in the join query manually instead of relying on the Impala optimizer. The optimizer uses sophisticated techniques to estimate the size of the result set at each stage of the join. For manual ordering, use this heuristic approach to start with, and then experiment to fine-tune the order: specify the largest table first, then the smallest table, then the next smallest, and so on, so that the intermediate result set stays small as each later table is joined.
For example, if you had tables BIG, MEDIUM, SMALL, and TINY, the logical join order to try would be BIG, TINY, SMALL, MEDIUM.
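Written out as a query, that ordering might look like the following sketch. The id join column and the COUNT(*) select list are assumptions made for illustration; only the order of the table references matters here.

```sql
-- Hypothetical join columns; the table order is the point of the example.
-- STRAIGHT_JOIN tells Impala to keep the tables in the order written:
-- largest first, then smallest, then next smallest, and so on.
SELECT STRAIGHT_JOIN COUNT(*)
FROM big
  JOIN tiny   ON big.id = tiny.id
  JOIN small  ON big.id = small.id
  JOIN medium ON big.id = medium.id;
```

Treat this order as a starting point, and compare it against other orders with EXPLAIN and timed runs.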
The terms "largest" and "smallest" refer to the size of the intermediate result set based on the number of rows and columns from each table that are part of the result set. For example, if you join one table sales with another table customers, a query might find results from 100 different customers who made a total of 5000 purchases. In that case, you would specify SELECT ... FROM sales JOIN customers ..., putting customers on the right side because it is smaller in the context of this query.
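A sketch of such a query follows. The column names (customer_id, id, name, amount, region) and the filter value are hypothetical, since the text does not define the schema of either table.

```sql
-- Hypothetical columns: sales.customer_id, sales.amount,
-- customers.id, customers.name, customers.region.
-- customers goes on the right because fewer of its rows take part in this query.
SELECT STRAIGHT_JOIN c.name, SUM(s.amount) AS total_spent
FROM sales s
  JOIN customers c ON s.customer_id = c.id
WHERE c.region = 'EMEA'
GROUP BY c.name;
```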
The Impala query planner chooses between different techniques for performing join queries, depending on the absolute and relative sizes of the tables. Broadcast joins are the default, where the right-hand table is considered to be smaller than the left-hand table, and its contents are sent to all the other nodes involved in the query. The alternative technique is known as a partitioned join (not related to a partitioned table), which is more suitable for large tables of roughly equal size. With this technique, portions of each table are sent to appropriate other nodes where those subsets of rows can be processed in parallel. The choice of broadcast or partitioned join also depends on statistics being available for all tables in the join, gathered by the COMPUTE STATS statement.
To see which join strategy is used for a particular query, issue an EXPLAIN statement for the query. If you find that a query uses a broadcast join when you know through benchmarking that a partitioned join would be more efficient, or vice versa, add a hint to the query to specify the precise join mechanism to use. See Optimizer Hints for details.
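The check-then-hint workflow might be sketched as follows, using the big and medium tables from the examples later in this topic. The comment-style hint syntax is an assumption about the release in use; older releases write the hint in square brackets, for example JOIN [SHUFFLE].

```sql
-- Inspect the plan first; the join line reports BROADCAST or PARTITIONED.
EXPLAIN SELECT COUNT(*) FROM big JOIN medium ON big.id = medium.id;

-- Request a partitioned join. STRAIGHT_JOIN keeps the table order fixed,
-- so the hint applies to the join exactly as written.
SELECT STRAIGHT_JOIN COUNT(*)
FROM big JOIN /* +SHUFFLE */ medium ON big.id = medium.id;

-- Request a broadcast join of the right-hand table instead.
SELECT STRAIGHT_JOIN COUNT(*)
FROM big JOIN /* +BROADCAST */ medium ON big.id = medium.id;
```

Benchmark both forms on representative data before leaving a hint in production queries, because a hint keeps applying even after the data distribution changes.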
If table or column statistics are not available for some tables in a join, Impala still reorders the tables using the information that is available. Tables with statistics are placed on the left side of the join order, in descending order of cost based on overall size and cardinality. Tables without statistics are treated as zero-size, that is, they are always placed on the right side of the join order.
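To find out which tables in a join fall into the "without statistics" group, one option is to inspect the stored statistics directly. The sketch below uses the big table from the examples later in this topic; the exact output columns can vary by release.

```sql
-- A #Rows value of -1 indicates that table statistics have not been computed.
SHOW TABLE STATS big;

-- A #Distinct Values of -1 indicates that column statistics are missing.
SHOW COLUMN STATS big;
```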
If an Impala join query is inefficient because of outdated statistics or unexpected data distribution, you can keep Impala from reordering the joined tables by using the STRAIGHT_JOIN keyword immediately after the SELECT and any DISTINCT or ALL keywords. The STRAIGHT_JOIN keyword turns off the reordering of join clauses that Impala does internally, and produces a plan that relies on the join clauses being ordered optimally in the query text.
The STRAIGHT_JOIN hint affects the join order of table references in the query block containing the hint. It does not affect the join order of nested queries, such as views, inline views, or WHERE-clause subqueries. To use this hint for performance tuning of complex queries, apply the hint to all query blocks that need a fixed join order.
In this example, the subselect from the BIG table produces a very small result set, but the table might still be treated as if it were the biggest and placed first in the join order. Using STRAIGHT_JOIN for the last join clause prevents the final table from being reordered, keeping it as the rightmost table in the join order.
select straight_join x from medium join small join (select * from big where c1 < 10) as big
where medium.id = small.id and small.id = big.id;
-- If the query contains [DISTINCT | ALL], the hint goes after those keywords.
select distinct straight_join x from medium join small join (select * from big where c1 < 10) as big
where medium.id = small.id and small.id = big.id;
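Because the hint applies only to the query block that contains it, a query with an inline view needs the keyword in each block whose join order should stay fixed. The following is a sketch with hypothetical tables t1, t2, and t3 and hypothetical columns id and amount.

```sql
-- Hypothetical tables and columns. Each query block carries its own hint:
-- the outer STRAIGHT_JOIN fixes the order of t1 and the inline view v,
-- and the inner STRAIGHT_JOIN fixes the order of t2 and t3.
SELECT STRAIGHT_JOIN t1.id, v.total
FROM t1
  JOIN (
    SELECT STRAIGHT_JOIN t2.id, SUM(t3.amount) AS total
    FROM t2 JOIN t3 ON t2.id = t3.id
    GROUP BY t2.id
  ) v ON t1.id = v.id;
```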
Here are examples showing joins between tables with 1 billion, 200 million, and 1 million rows. (In this case, the tables are unpartitioned and using Parquet format.) The smaller tables contain subsets of data from the largest one, for convenience of joining on the unique ID column. The smallest table only contains a subset of columns from the others.
[localhost:21000] > create table big stored as parquet as select * from raw_data;
+----------------------------+
| summary |
+----------------------------+
| Inserted 1000000000 row(s) |
+----------------------------+
Returned 1 row(s) in 671.56s
[localhost:21000] > desc big;
+-----------+---------+---------+
| name      | type    | comment |
+-----------+---------+---------+
| id        | int     |         |
| val       | int     |         |
| zfill     | string  |         |
| name      | string  |         |
| assertion | boolean |         |
+-----------+---------+---------+
Returned 5 row(s) in 0.01s
[localhost:21000] > create table medium stored as parquet as select * from big limit 200 * floor(1e6);
+---------------------------+
| summary |
+---------------------------+
| Inserted 200000000 row(s) |
+---------------------------+
Returned 1 row(s) in 138.31s
[localhost:21000] > create table small stored as parquet as select id,val,name from big where assertion = true limit 1 * floor(1e6);
+-------------------------+
| summary |
+-------------------------+
| Inserted 1000000 row(s) |
+-------------------------+
Returned 1 row(s) in 6.32s
For any kind of performance experimentation, use the EXPLAIN statement to see how any expensive query will be performed without actually running it, and enable verbose EXPLAIN plans containing more performance-oriented detail. The most interesting plan lines are the ones that report table stats, column stats, and cardinality as unavailable, showing that without statistics for the joined tables, Impala cannot make a good estimate of the number of rows involved at each stage of processing, and is likely to stick with the BROADCAST join mechanism that sends a complete copy of one of the tables to each node.
[localhost:21000] > set explain_level=verbose;
EXPLAIN_LEVEL set to verbose
[localhost:21000] > explain select count(*) from big join medium where big.id = medium.id;
+----------------------------------------------------------+
| Explain String |
+----------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=2.10GB VCores=2 |
| |
| PLAN FRAGMENT 0 |
| PARTITION: UNPARTITIONED |
| |
| 6:AGGREGATE (merge finalize) |
| | output: SUM(COUNT(*)) |
| | cardinality: 1 |
| | per-host memory: unavailable |
| | tuple ids: 2 |
| | |
| 5:EXCHANGE |
| cardinality: 1 |
| per-host memory: unavailable |
| tuple ids: 2 |
| |
| PLAN FRAGMENT 1 |
| PARTITION: RANDOM |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 5 |
| UNPARTITIONED |
| |
| 3:AGGREGATE |
| | output: COUNT(*) |
| | cardinality: 1 |
| | per-host memory: 10.00MB |
| | tuple ids: 2 |
| | |
| 2:HASH JOIN |
| | join op: INNER JOIN (BROADCAST) |
| | hash predicates: |
| | big.id = medium.id |
| | cardinality: unavailable |
| | per-host memory: 2.00GB |
| | tuple ids: 0 1 |
| | |
| |----4:EXCHANGE |
| | cardinality: unavailable |
| | per-host memory: 0B |
| | tuple ids: 1 |
| | |
| 0:SCAN HDFS |
| table=join_order.big #partitions=1/1 size=23.12GB |
| table stats: unavailable |
| column stats: unavailable |
| cardinality: unavailable |
| per-host memory: 88.00MB |
| tuple ids: 0 |
| |
| PLAN FRAGMENT 2 |
| PARTITION: RANDOM |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 4 |
| UNPARTITIONED |
| |
| 1:SCAN HDFS |
| table=join_order.medium #partitions=1/1 size=4.62GB |
| table stats: unavailable |
| column stats: unavailable |
| cardinality: unavailable |
| per-host memory: 88.00MB |
| tuple ids: 1 |
+----------------------------------------------------------+
Returned 64 row(s) in 0.04s
Gathering statistics for all the tables is straightforward, one COMPUTE STATS statement per table:
[localhost:21000] > compute stats small;
+-----------------------------------------+
| summary |
+-----------------------------------------+
| Updated 1 partition(s) and 3 column(s). |
+-----------------------------------------+
Returned 1 row(s) in 4.26s
[localhost:21000] > compute stats medium;
+-----------------------------------------+
| summary |
+-----------------------------------------+
| Updated 1 partition(s) and 5 column(s). |
+-----------------------------------------+
Returned 1 row(s) in 42.11s
[localhost:21000] > compute stats big;
+-----------------------------------------+
| summary |
+-----------------------------------------+
| Updated 1 partition(s) and 5 column(s). |
+-----------------------------------------+
Returned 1 row(s) in 165.44s
With statistics in place, Impala can choose a more effective join order rather than following the left-to-right sequence of tables in the query, and can choose BROADCAST or PARTITIONED join strategies based on the overall sizes and number of rows in the tables:
[localhost:21000] > explain select count(*) from medium join big where big.id = medium.id;
Query: explain select count(*) from medium join big where big.id = medium.id
+-----------------------------------------------------------+
| Explain String |
+-----------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=937.23MB VCores=2 |
| |
| PLAN FRAGMENT 0 |
| PARTITION: UNPARTITIONED |
| |
| 6:AGGREGATE (merge finalize) |
| | output: SUM(COUNT(*)) |
| | cardinality: 1 |
| | per-host memory: unavailable |
| | tuple ids: 2 |
| | |
| 5:EXCHANGE |
| cardinality: 1 |
| per-host memory: unavailable |
| tuple ids: 2 |
| |
| PLAN FRAGMENT 1 |
| PARTITION: RANDOM |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 5 |
| UNPARTITIONED |
| |
| 3:AGGREGATE |
| | output: COUNT(*) |
| | cardinality: 1 |
| | per-host memory: 10.00MB |
| | tuple ids: 2 |
| | |
| 2:HASH JOIN |
| | join op: INNER JOIN (BROADCAST) |
| | hash predicates: |
| | big.id = medium.id |
| | cardinality: 1443004441 |
| | per-host memory: 839.23MB |
| | tuple ids: 1 0 |
| | |
| |----4:EXCHANGE |
| | cardinality: 200000000 |
| | per-host memory: 0B |
| | tuple ids: 0 |
| | |
| 1:SCAN HDFS |
| table=join_order.big #partitions=1/1 size=23.12GB |
| table stats: 1000000000 rows total |
| column stats: all |
| cardinality: 1000000000 |
| per-host memory: 88.00MB |
| tuple ids: 1 |
| |
| PLAN FRAGMENT 2 |
| PARTITION: RANDOM |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 4 |
| UNPARTITIONED |
| |
| 0:SCAN HDFS |
| table=join_order.medium #partitions=1/1 size=4.62GB |
| table stats: 200000000 rows total |
| column stats: all |
| cardinality: 200000000 |
| per-host memory: 88.00MB |
| tuple ids: 0 |
+-----------------------------------------------------------+
Returned 64 row(s) in 0.04s
[localhost:21000] > explain select count(*) from small join big where big.id = small.id;
Query: explain select count(*) from small join big where big.id = small.id
+-----------------------------------------------------------+
| Explain String |
+-----------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=101.15MB VCores=2 |
| |
| PLAN FRAGMENT 0 |
| PARTITION: UNPARTITIONED |
| |
| 6:AGGREGATE (merge finalize) |
| | output: SUM(COUNT(*)) |
| | cardinality: 1 |
| | per-host memory: unavailable |
| | tuple ids: 2 |
| | |
| 5:EXCHANGE |
| cardinality: 1 |
| per-host memory: unavailable |
| tuple ids: 2 |
| |
| PLAN FRAGMENT 1 |
| PARTITION: RANDOM |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 5 |
| UNPARTITIONED |
| |
| 3:AGGREGATE |
| | output: COUNT(*) |
| | cardinality: 1 |
| | per-host memory: 10.00MB |
| | tuple ids: 2 |
| | |
| 2:HASH JOIN |
| | join op: INNER JOIN (BROADCAST) |
| | hash predicates: |
| | big.id = small.id |
| | cardinality: 1000000000 |
| | per-host memory: 3.15MB |
| | tuple ids: 1 0 |
| | |
| |----4:EXCHANGE |
| | cardinality: 1000000 |
| | per-host memory: 0B |
| | tuple ids: 0 |
| | |
| 1:SCAN HDFS |
| table=join_order.big #partitions=1/1 size=23.12GB |
| table stats: 1000000000 rows total |
| column stats: all |
| cardinality: 1000000000 |
| per-host memory: 88.00MB |
| tuple ids: 1 |
| |
| PLAN FRAGMENT 2 |
| PARTITION: RANDOM |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 4 |
| UNPARTITIONED |
| |
| 0:SCAN HDFS |
| table=join_order.small #partitions=1/1 size=17.93MB |
| table stats: 1000000 rows total |
| column stats: all |
| cardinality: 1000000 |
| per-host memory: 32.00MB |
| tuple ids: 0 |
+-----------------------------------------------------------+
Returned 64 row(s) in 0.03s
When queries like these are actually run, the execution times are relatively consistent regardless of the table order in the query text. Here are examples using both the unique ID column and the VAL column containing duplicate values:
[localhost:21000] > select count(*) from big join small on (big.id = small.id);
Query: select count(*) from big join small on (big.id = small.id)
+----------+
| count(*) |
+----------+
| 1000000 |
+----------+
Returned 1 row(s) in 21.68s
[localhost:21000] > select count(*) from small join big on (big.id = small.id);
Query: select count(*) from small join big on (big.id = small.id)
+----------+
| count(*) |
+----------+
| 1000000 |
+----------+
Returned 1 row(s) in 20.45s
[localhost:21000] > select count(*) from big join small on (big.val = small.val);
+------------+
| count(*) |
+------------+
| 2000948962 |
+------------+
Returned 1 row(s) in 108.85s
[localhost:21000] > select count(*) from small join big on (big.val = small.val);
+------------+
| count(*) |
+------------+
| 2000948962 |
+------------+
Returned 1 row(s) in 100.76s