Learn the details about using Impala alongside Kudu.
Kudu (currently in beta), the new storage layer for the Apache Hadoop ecosystem, is tightly integrated with Impala, allowing you to insert, query, update, and delete data from Kudu tablets using Impala’s SQL syntax, as an alternative to using the Kudu APIs to build a custom Kudu application. In addition, you can use JDBC or ODBC to connect existing or new applications written in any language, framework, or business intelligence tool to your Kudu data, using Impala as the broker. This integration relies on features that released versions of Impala do not have yet, as of Impala 2.3, which is expected to ship in CDH 5.5. In the interim, you need to install a fork of Impala called Impala_Kudu.
In this post, you will learn about the various ways to create and partition tables as well as currently supported SQL operators. This post assumes a successful install of the Impala_Kudu package via Cloudera Manager or command line; see the docs for instructions. Note these prerequisites:
- Impala_Kudu depends upon CDH 5.4 or later. To use Cloudera Manager with Impala_Kudu, you need Cloudera Manager 5.4.3 or later. Cloudera Manager 5.4.7 is recommended, as it adds support for collecting metrics from Kudu.
- If you have an existing Impala instance on your cluster, you can install Impala_Kudu alongside the existing Impala instance if you use parcels. The new instance does not share configurations with the existing instance and is completely independent. A script is provided to automate this type of installation.
- It is especially important that the cluster has adequate unreserved RAM for the Impala_Kudu instance.
- Consider shutting down the original Impala service when testing Impala_Kudu if you want to be sure it is not impacted.
- Before installing Impala_Kudu, you must have already installed and configured services for HDFS, Apache Hive, and Kudu. You may need Apache HBase, YARN, Apache Sentry, and Apache ZooKeeper services as well.
Using Impala with Kudu
Neither Kudu nor Impala needs special configuration for you to use the Impala Shell or the Impala API to insert, update, delete, or query Kudu data using Impala. However, you do need to create a mapping between the Impala and Kudu tables. For an existing Kudu table, the Kudu web UI provides the Impala statement that creates this mapping.
- Be sure you are using the impala-shell binary provided by the Impala_Kudu package, rather than the default CDH Impala binary. The following shows how to verify this using the alternatives command on a RHEL 6 host. Do not copy and paste the alternatives --set command directly, because the file names are likely to differ.
$ sudo alternatives --display impala-shell
impala-shell - status is auto.
 link currently points to /opt/cloudera/parcels/CDH-5.4.6-1.cdh5.4.6.p0.1007/bin/impala-shell
/opt/cloudera/parcels/CDH-5.4.6-1.cdh5.4.6.p0.1007/bin/impala-shell - priority 10
/opt/cloudera/parcels/IMPALA_KUDU-2.3.0-1.cdh5.4.6.p0.119/bin/impala-shell - priority 5
Current `best' version is /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.6.0.p0.1007/bin/impala-shell.
$ sudo alternatives --set impala-shell /opt/cloudera/parcels/IMPALA_KUDU-2.3.0-1.cdh5.4.6.p0.119/bin/impala-shell
- Start Impala Shell using the impala-shell command. By default, impala-shell attempts to connect to the Impala daemon on localhost on port 21000. To connect to a different host, use the -i <host:port> option. To automatically connect to a specific Impala database, use the -d <database> option. For instance, if all your Kudu tables are in Impala in the database impala_kudu, use -d impala_kudu to use this database.
- To quit the Impala Shell, use the following command:
quit;
Internal and External Impala Tables
When creating a new Kudu table using Impala, you can create the table as an internal table or an external table.
- Internal: An internal table (created by CREATE TABLE) is managed by Impala, and can be dropped by Impala. When you create a new table using Impala, it is generally an internal table.
- External: An external table (created by CREATE EXTERNAL TABLE) is not managed by Impala, and dropping such a table does not drop the table from its source location (here, Kudu). Instead, it only removes the mapping between Impala and Kudu. This is the mode used in the syntax provided by Kudu for mapping an existing table to Impala.
See the Impala documentation for more information about internal and external tables.
Querying an Existing Kudu Table In Impala
- Go to http://kudu-master.example.com:8051/tables/, where kudu-master.example.com is the address of your Kudu master.
- Click the table ID link for the relevant table.
- Scroll to the bottom of the page, or search for the text Impala CREATE TABLE statement. Copy the entire statement.
- Paste the statement into Impala Shell. Impala now has a mapping to your Kudu table.
Creating a New Kudu Table From Impala
Creating a new table in Kudu from Impala is similar to mapping an existing Kudu table to an Impala table, except that you need to write the CREATE statement yourself. Use the following example as a guideline. Impala first creates the table, then creates the mapping.
This example does not use a partitioning schema. However, you will almost always want to define a schema to pre-split your table.
CREATE TABLE `my_first_table` (
  `id` BIGINT,
  `name` STRING
)
TBLPROPERTIES(
  'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
  'kudu.table_name' = 'my_first_table',
  'kudu.master_addresses' = 'kudu-master.example.com:7051',
  'kudu.key_columns' = 'id'
);
In the CREATE TABLE statement, the columns that comprise the primary key must be listed first. Additionally, primary key columns are implicitly marked NOT NULL.
The following table properties are required, and the kudu.key_columns property must contain at least one column.
- storage_handler: the mechanism used by Impala to determine the type of data source. For Kudu tables, this must be com.cloudera.kudu.hive.KuduStorageHandler.
- kudu.table_name: the name of the table that Impala will create (or map to) in Kudu
- kudu.master_addresses: the list of Kudu masters with which Impala should communicate
- kudu.key_columns: the comma-separated list of primary key columns, whose contents should not be nullable
CREATE TABLE AS SELECT. You can create a table by querying any other table or tables in Impala, using a CREATE TABLE AS SELECT query.
The following example imports all rows from an existing table old_table into a Kudu table new_table. The columns in new_table will have the same names and types as the columns in old_table, but you need to populate the kudu.key_columns property. In this example, the primary key columns are ts and name.
CREATE TABLE new_table
TBLPROPERTIES(
  'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
  'kudu.table_name' = 'new_table',
  'kudu.master_addresses' = 'kudu-master.example.com:7051',
  'kudu.key_columns' = 'ts, name'
)
AS SELECT * FROM old_table;
You can refine the SELECT statement to only match the rows and columns you want to be inserted into the new table. You can also rename the columns by using syntax like SELECT name AS new_name.
Tables are partitioned into tablets according to a partition schema on the primary key columns. Each tablet is served by at least one tablet server. Ideally, a table should be split into tablets that are distributed across a number of tablet servers to maximize parallel operations. The details of the partitioning schema you use will depend entirely on the type of data you store and how you access it.
Kudu currently has no mechanism for splitting or merging tablets after the table has been created. Until this feature has been implemented, you must provide a partition schema for your table when you create it. When designing your tables, consider using primary keys that will allow you to partition your table into tablets that grow at similar rates.
You can partition your table using Impala’s DISTRIBUTE BY keyword, which supports distribution by either RANGE or HASH. The partition scheme can contain zero or more HASH definitions, followed by an optional RANGE definition. The RANGE definition can refer to one or more primary key columns. Examples of basic and advanced partitioning are shown below. Note: Impala keywords, such as group, are enclosed by back-tick characters when they are used as identifiers, rather than as keywords.
DISTRIBUTE BY RANGE. You can specify split rows for one or more primary key columns that contain integer or string values. Range partitioning in Kudu allows splitting a table based on the lexicographic order of its primary keys. This allows you to balance parallelism in writes with scan efficiency.
The split row does not need to exist. It defines an exclusive bound in the form of:
(START_KEY, SplitRow), [SplitRow, STOP_KEY)
In other words, the split row, if it exists, is included in the tablet after the split point. For instance, if you specify a split row abc, a row abca would be in the second tablet, while a row abb would be in the first.
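The split-row rule above can be modeled in a few lines of Python. This is only a sketch of the bounds described in the text, not Kudu's implementation: the function name tablet_index is hypothetical, and Python's string comparison stands in for Kudu's key ordering (the two agree for plain ASCII values).

```python
from bisect import bisect_right


def tablet_index(split_rows, key):
    """Return the 0-based index of the tablet that would hold `key`.

    `split_rows` must be sorted. A key equal to a split row lands in the
    tablet that starts at that split row (inclusive lower bound).
    """
    return bisect_right(split_rows, key)


splits = ["abc"]
for key in ("abb", "abc", "abca"):
    print(key, "-> tablet", tablet_index(splits, key))
```

With the single split row abc, the key abb maps to the first tablet (index 0), while abc and abca map to the second (index 1).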
Suppose you have a table that has columns state, name, and purchase_count. The following example creates 50 tablets, one per US state. Note: If you partition by range on a column whose values are monotonically increasing, the last tablet will grow much larger than the others. Additionally, all data being inserted will be written to a single tablet at a time, limiting the scalability of data ingest. In that case, consider distributing by HASH instead of, or in addition to, RANGE.
CREATE TABLE customers (
  state STRING,
  name STRING,
  purchase_count INT
)
DISTRIBUTE BY RANGE(state)
SPLIT ROWS(('al'), ('ak'), ('ar'), .., ('wv'), ('wy'))
TBLPROPERTIES(
  'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
  'kudu.table_name' = 'customers',
  'kudu.master_addresses' = 'kudu-master.example.com:7051',
  'kudu.key_columns' = 'state, name'
);
DISTRIBUTE BY HASH. Instead of distributing by an explicit range, or in combination with range distribution, you can distribute into a specific number of “buckets” by hash. You specify the primary key columns you want to partition by, and the number of buckets you want to use. Rows are distributed by hashing the specified key columns. Assuming that the values being hashed do not themselves exhibit significant skew, this will serve to distribute the data evenly across buckets.
You can specify multiple definitions, and you can specify definitions which use compound primary keys. However, one column cannot be mentioned in multiple hash definitions. Consider two columns, a and b:
- HASH(a), HASH(b) — will succeed
- HASH(a,b) — will succeed
- HASH(a), HASH(a,b) — will fail
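The rule behind these three cases can be expressed as a small check. The sketch below is an illustration of the rule as stated, not the validation code Impala runs; validate_hash_definitions is a hypothetical name, and each HASH definition is represented as a tuple of column names.

```python
def validate_hash_definitions(definitions):
    """Raise ValueError if a column is used more than once across HASH definitions.

    `definitions` is a list of tuples of column names, one tuple per HASH(...).
    """
    seen = set()
    for columns in definitions:
        for column in columns:
            if column in seen:
                raise ValueError(f"column {column!r} is used more than once")
            seen.add(column)
    return True


print(validate_hash_definitions([("a",), ("b",)]))   # HASH(a), HASH(b)
print(validate_hash_definitions([("a", "b")]))       # HASH(a,b)
try:
    validate_hash_definitions([("a",), ("a", "b")])  # HASH(a), HASH(a,b)
except ValueError as error:
    print("rejected:", error)
```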
DISTRIBUTE BY HASH with no column specified is a shortcut to create the desired number of buckets by hashing all primary key columns.
Hash partitioning is a reasonable approach if primary key values are evenly distributed in their domain and no data skew is apparent, such as timestamps or serial IDs.
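To see why hashing spreads non-skewed keys evenly, the following sketch assigns 10,000 sequential IDs to 16 buckets. It assumes a stand-in hash (SHA-256 from the Python standard library); Kudu's actual hash function is not described in this post, and bucket_for is a hypothetical helper.

```python
import hashlib
from collections import Counter


def bucket_for(key_values, num_buckets):
    """Map a tuple of key column values to a bucket (illustrative hash only)."""
    encoded = "\x1f".join(str(value) for value in key_values).encode("utf-8")
    digest = hashlib.sha256(encoded).digest()
    return int.from_bytes(digest[:8], "big") % num_buckets


# Count how many of the sequential ids fall into each of the 16 buckets.
counts = Counter(bucket_for((row_id,), 16) for row_id in range(10_000))
for bucket in sorted(counts):
    print(bucket, counts[bucket])
```

Each bucket should receive a similar share of the rows, which is the property the paragraph above relies on.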
The following example creates 16 tablets by hashing the id column. A maximum of 16 tablets can be written to in parallel. In this example, a query for a range of sku values is likely to need to read from all 16 tablets, so this may not be the optimum schema for this table. See Advanced Partitioning for an extended example.
CREATE TABLE cust_behavior (
  id BIGINT,
  sku STRING,
  salary STRING,
  edu_level INT,
  usergender STRING,
  `group` STRING,
  city STRING,
  postcode STRING,
  last_purchase_price FLOAT,
  last_purchase_date BIGINT,
  category STRING,
  rating INT,
  fulfilled_date BIGINT
)
DISTRIBUTE BY HASH (id) INTO 16 BUCKETS
TBLPROPERTIES(
  'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
  'kudu.table_name' = 'cust_behavior',
  'kudu.master_addresses' = 'kudu-master.example.com:7051',
  'kudu.key_columns' = 'id, sku'
);
You can use zero or more HASH definitions, followed by zero or one RANGE definitions to partition a table. Each definition can encompass one or more columns. While every possible distribution schema is out of the scope of this document, a few demonstrations follow.
DISTRIBUTE BY RANGE Using Compound Split Rows. This example creates 100 tablets, two for each US state. Per state, the first tablet holds names starting with characters before m, and the second tablet holds names starting with m-z. At least 50 tablets (and up to 100) can be written to in parallel. A query for a range of names in a given state is likely to only need to read from one tablet, while a query for a range of names across every state will likely only read from 50 tablets.
CREATE TABLE customers (
  state STRING,
  name STRING,
  purchase_count INT
)
DISTRIBUTE BY RANGE(state, name)
SPLIT ROWS(('al', ''), ('al', 'm'), ('ak', ''), ('ak', 'm'), .., ('wy', ''), ('wy', 'm'))
TBLPROPERTIES(
  'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
  'kudu.table_name' = 'customers',
  'kudu.master_addresses' = 'kudu-master.example.com:7051',
  'kudu.key_columns' = 'state, name'
);
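As a rough model of how compound split rows limit the tablets a range scan touches, the sketch below compares (state, name) tuples against a short, sorted list of split rows. The helper names, the sample names, and the two-state split list are assumptions made for illustration; this is not how Kudu or Impala plans a scan.

```python
from bisect import bisect_right

# Sorted compound split rows for two states only (illustrative subset).
SPLITS = [("ak", ""), ("ak", "m"), ("al", ""), ("al", "m")]


def tablet_index(split_rows, key):
    """0-based tablet holding `key`; a key equal to a split row starts a tablet."""
    return bisect_right(split_rows, key)


def tablets_for_range(split_rows, low, high):
    """Tablets that can contain keys in the closed range [low, high]."""
    first = tablet_index(split_rows, low)
    last = tablet_index(split_rows, high)
    return list(range(first, last + 1))


# Names n..p within one state fall inside a single tablet.
print(tablets_for_range(SPLITS, ("ak", "n"), ("ak", "p")))
# Names a..z within one state span both of that state's tablets.
print(tablets_for_range(SPLITS, ("ak", "a"), ("ak", "z")))
```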
DISTRIBUTE BY HASH and RANGE. Let’s go back to the hashing example above. If you often query for a range of sku values, you can optimize the example by combining hash partitioning with range partitioning. The following example still creates 16 tablets, by first hashing the `id` column into 4 buckets, and then applying range partitioning to split each bucket into four tablets, based upon the value of the sku string. At least four tablets (and possibly up to 16) can be written to in parallel, and when you query for a contiguous range of sku values, you have a good chance of only needing to read from 1/4 of the tablets to fulfill the query.
CREATE TABLE cust_behavior (
  id BIGINT,
  sku STRING,
  salary STRING,
  edu_level INT,
  usergender STRING,
  `group` STRING,
  city STRING,
  postcode STRING,
  last_purchase_price FLOAT,
  last_purchase_date BIGINT,
  category STRING,
  rating INT,
  fulfilled_date BIGINT
)
DISTRIBUTE BY HASH (id) INTO 4 BUCKETS,
RANGE (sku) SPLIT ROWS(('g'), ('o'), ('u'))
TBLPROPERTIES(
  'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
  'kudu.table_name' = 'cust_behavior',
  'kudu.master_addresses' = 'kudu-master.example.com:7051',
  'kudu.key_columns' = 'id, sku'
);
DISTRIBUTE BY HASH Definitions. Again expanding the example above, suppose that the query pattern will be unpredictable, but you want to maximize parallelism of writes. You can achieve even distribution across the entire primary key by hashing on both primary key columns.
CREATE TABLE cust_behavior (
  id BIGINT,
  sku STRING,
  salary STRING,
  edu_level INT,
  usergender STRING,
  `group` STRING,
  city STRING,
  postcode STRING,
  last_purchase_price FLOAT,
  last_purchase_date BIGINT,
  category STRING,
  rating INT,
  fulfilled_date BIGINT
)
DISTRIBUTE BY HASH (id) INTO 4 BUCKETS,
HASH (sku) INTO 4 BUCKETS
TBLPROPERTIES(
  'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
  'kudu.table_name' = 'cust_behavior',
  'kudu.master_addresses' = 'kudu-master.example.com:7051',
  'kudu.key_columns' = 'id, sku'
);
The example creates 16 buckets. You could also use HASH (id, sku) INTO 16 BUCKETS. However, a scan for sku values would almost always impact all 16 buckets, rather than possibly being limited to 4.
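The difference between the two schemas can be sketched by enumerating which buckets a lookup on a single sku value could touch when the id is unknown. The hash function and helper names below are stand-ins chosen for illustration, and the sku value is made up; the point is only the counting argument.

```python
import hashlib


def stable_bucket(value, num_buckets):
    """Illustrative stand-in for a partition hash; not Kudu's hash function."""
    digest = hashlib.sha256(str(value).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_buckets


def candidates_two_level(sku, id_buckets=4, sku_buckets=4):
    """HASH(id) x HASH(sku): the sku fixes one sku bucket, the id bucket is unknown."""
    sku_bucket = stable_bucket(sku, sku_buckets)
    return {(id_bucket, sku_bucket) for id_bucket in range(id_buckets)}


def candidates_single_level(sku, buckets=16):
    """HASH(id, sku): the bucket depends on id too, so every bucket is a candidate."""
    return set(range(buckets))


print(len(candidates_two_level("sku-1234")))     # 4 candidate buckets
print(len(candidates_single_level("sku-1234")))  # 16 candidate buckets
```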
Impala Database Containment Model
Impala uses a database containment model. You can create a table within a specific scope, referred to as a database. To create the database, use a CREATE DATABASE statement. To use the database for further Impala operations such as CREATE TABLE, use the USE statement. For example, to create a table in a database called impala_kudu, use the following statements:
CREATE DATABASE impala_kudu;
USE impala_kudu;
CREATE TABLE my_first_table ( ...
The my_first_table table is created within the impala_kudu database. To refer to this table in the future without first issuing a USE statement, you can use <database>.<table> syntax. For example, to specify the my_first_table table in database impala_kudu, as opposed to any other table with the same name in another database, refer to the table as impala_kudu.my_first_table. This also applies to INSERT, UPDATE, DELETE, and DROP statements.
(Warning: Currently, Kudu does not encode the Impala database into the table name in any way. This means that even though you can create Kudu tables within Impala databases, the actual Kudu table names need to be unique within Kudu. For example, if you create database_1.my_kudu_table and database_2.my_kudu_table, you will have a naming collision within Kudu, even though this would not cause a problem in Impala.)
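A quick way to reason about this warning is to group planned tables by their kudu.table_name value and look for names claimed by more than one Impala table. The helper below is a hypothetical planning aid, not part of Impala or Kudu.

```python
from collections import defaultdict


def find_kudu_name_collisions(mappings):
    """Return Kudu table names claimed by more than one Impala table.

    `mappings` is an iterable of (impala_database, impala_table, kudu_table_name).
    """
    owners = defaultdict(list)
    for database, table, kudu_name in mappings:
        owners[kudu_name].append((database, table))
    return {name: tables for name, tables in owners.items() if len(tables) > 1}


planned = [
    ("database_1", "my_kudu_table", "my_kudu_table"),
    ("database_2", "my_kudu_table", "my_kudu_table"),
    ("database_2", "other_table", "other_table"),
]
print(find_kudu_name_collisions(planned))
```

One possible convention, offered only as a suggestion, is to include the database name in the kudu.table_name value so that each Kudu name stays unique.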
Impala Keywords Not Supported for Kudu Tables
The following Impala keywords are not supported for Kudu tables:
Understanding SQL Operators and Kudu
If your query includes the operators =, <=, or >=, Kudu evaluates the condition directly and only returns the relevant results. Kudu does not yet support <, >, !=, or any other operator not listed.
For these unsupported operations, Kudu returns all results regardless of the condition, and Impala performs the filtering. Since Impala must receive a larger amount of data from Kudu, these operations are less efficient. In some cases, creating and periodically updating materialized views may be the right solution to work around these inefficiencies.
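The cost difference can be illustrated with a toy model in which the storage layer filters rows only for the operators it supports, and otherwise hands every row to the query engine. The operator sets mirror the paragraph above; the scan function and its return shape are assumptions for illustration, not an API of Kudu or Impala.

```python
import operator

# Operators evaluated by the storage layer, per the text above.
PUSHED_DOWN = {"=": operator.eq, "<=": operator.le, ">=": operator.ge}
# Operators the query engine must evaluate after receiving all rows.
ENGINE_ONLY = {"<": operator.lt, ">": operator.gt, "!=": operator.ne}


def scan(rows, column, op, value):
    """Return (matching_rows, number_of_rows_transferred_from_storage)."""
    compare = PUSHED_DOWN.get(op) or ENGINE_ONLY[op]
    matched = [row for row in rows if compare(row[column], value)]
    transferred = len(matched) if op in PUSHED_DOWN else len(rows)
    return matched, transferred


rows = [{"id": i} for i in range(1, 11)]
print(scan(rows, "id", "<=", 3)[1])  # only the 3 matching rows are transferred
print(scan(rows, "id", "<", 4)[1])   # all 10 rows are transferred, then filtered
```

Both calls return the same three rows. For integer columns, rewriting id < 4 as id <= 3 is one way to stay within the supported operators.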
Inserting a Row
The syntax for inserting one or more rows using Impala is shown below.
INSERT INTO my_first_table VALUES (99, "sarah");
INSERT INTO my_first_table VALUES (1, "john"), (2, "jane"), (3, "jim");
The primary key must not be null.
Inserting in Bulk
When inserting in bulk, there are at least three common choices. Each may have advantages and disadvantages, depending on your data and circumstances.
- Multiple single INSERT statements: This approach has the advantage of being easy to understand and implement. However, it is likely to be inefficient because Impala has a high query start-up cost compared to Kudu’s insertion performance. This will lead to relatively high latency and poor throughput.
- Single INSERT statement with multiple VALUES subclauses: If you include more than 1024 VALUES statements, Impala batches them into groups of 1024 (or the value of batch_size) before sending the requests to Kudu. This approach may perform slightly better than multiple sequential INSERT statements by amortizing the query start-up penalties on the Impala side. To set the batch size for the current Impala Shell session, use this syntax: set batch_size=10000; (Note: Increasing the Impala batch size causes Impala to use more memory. You should verify the impact on your cluster and tune accordingly.)
- Batch insert: The approach that usually performs best, from the standpoint of both Impala and Kudu, is to import the data using a SELECT FROM subclause in Impala.
- If your data is not already in Impala, one strategy is to import it from a text file, such as a TSV or CSV file.
- Create the Kudu table, being mindful that the columns designated as primary keys cannot have null values.
- Insert values into the Kudu table by querying the table containing the original data, as in the following example:
INSERT INTO my_kudu_table SELECT * FROM legacy_data_import_table;
- Ingest using the C++ or Java API: In many cases, the appropriate ingest path is to use the C++ or Java API to insert directly into Kudu tables. Unlike other Impala tables, data inserted into Kudu tables via the API becomes available for query in Impala without the need for any INVALIDATE METADATA statements or other statements needed for other Impala storage types.
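The grouping described for a multi-row INSERT (groups of 1024, or the value of batch_size) can be pictured as simple chunking. The sketch below only models the arithmetic; the batches function is hypothetical and says nothing about how Impala implements batching internally.

```python
def batches(rows, batch_size=1024):
    """Yield consecutive groups of at most `batch_size` rows."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


rows = [(i, f"name-{i}") for i in range(2500)]
print([len(batch) for batch in batches(rows)])         # [1024, 1024, 452]
print([len(batch) for batch in batches(rows, 10000)])  # [2500]
```

A larger batch size means fewer requests but more rows held in memory at once, which is the trade-off the note about batch_size refers to.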
INSERT and the IGNORE Keyword
Normally, if you try to insert a row that has already been inserted, the insertion will fail because the primary key would be duplicated (see “Failures During INSERT, UPDATE, and DELETE Operations”). If an insert fails part of the way through, you can re-run the insert, using the IGNORE keyword, which will ignore only those errors returned from Kudu indicating a duplicate key.
The first example will cause an error if a row with the primary key `99` already exists. The second example will still not insert the row, but will ignore any error and continue on to the next SQL statement.
INSERT INTO my_first_table VALUES (99, "sarah");
INSERT IGNORE INTO my_first_table VALUES (99, "sarah");
Updating a Row
The syntax for updating one or more rows using Impala is shown below.
UPDATE my_first_table SET name="bob" where id = 3;
You cannot change or null the primary key value. (Important: The UPDATE statement only works in Impala when the underlying data source is Kudu.)
Updating in Bulk
You can update in bulk using the same approaches outlined in “Inserting in Bulk” above.
UPDATE and the IGNORE Keyword
Similar to INSERT and the IGNORE Keyword, you can use the IGNORE operation to ignore an UPDATE which would otherwise fail. For instance, a row may be deleted while you are attempting to update it. In Impala, this would cause an error. The IGNORE keyword causes the error to be ignored.
UPDATE IGNORE my_first_table SET name="bob" WHERE id = 3;
Deleting a Row
You can delete Kudu rows in near real time using Impala. You can even use more complex joins when deleting.
DELETE FROM my_first_table WHERE id < 3;
DELETE c FROM my_second_table c, stock_symbols s WHERE c.name = s.symbol;
Important: The DELETE statement only works in Impala when the underlying data source is Kudu.
Deleting in Bulk
You can delete in bulk using the same approaches outlined in “Inserting in Bulk” above.
DELETE and the IGNORE Keyword
Similar to INSERT and the IGNORE Keyword, you can use the IGNORE operation to ignore a DELETE which would otherwise fail. For instance, a row may be deleted by another process while you are attempting to delete it. In Impala, this would cause an error. The IGNORE keyword causes the error to be ignored.
DELETE IGNORE FROM my_first_table WHERE id < 3;
Failures During INSERT, UPDATE, and DELETE Operations
INSERT, UPDATE, and DELETE statements cannot be considered transactional as a whole. If one of these operations fails part of the way through, the keys may have already been created (in the case of INSERT) or the records may have already been modified or removed by another process (in the case of UPDATE or DELETE). You should design your application with this in mind. See INSERT and the IGNORE Keyword.
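The consequence of non-transactional statements can be shown with a toy table keyed by primary key. In the sketch below a multi-row insert fails on its second row, the first row stays written, and a re-run that ignores duplicate-key errors completes the load. The class and function names are hypothetical, and a duplicate key is used as the failure only because it is easy to reproduce.

```python
class DuplicateKeyError(Exception):
    """Stands in for the duplicate-key error returned by the storage layer."""


def insert_rows(table, rows, ignore=False):
    """Apply rows one at a time; rows applied before a failure stay applied."""
    for key, value in rows:
        if key in table:
            if ignore:
                continue  # skip only duplicate-key errors
            raise DuplicateKeyError(f"key {key} already exists")
        table[key] = value


table = {2: "jane"}  # key 2 is already present
rows = [(1, "john"), (2, "jane"), (3, "jim")]

try:
    insert_rows(table, rows)
except DuplicateKeyError as error:
    print("failed part of the way through:", error)
print(sorted(table))  # [1, 2]: row 1 was written before the failure

insert_rows(table, rows, ignore=True)  # re-run, skipping duplicates
print(sorted(table))  # [1, 2, 3]
```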
Altering Table Properties
You can change Impala’s metadata relating to a given Kudu table by altering the table’s properties. These properties include the table name, the list of Kudu master addresses, and whether the table is managed by Impala (internal) or externally. You cannot modify a table’s split rows after table creation. (Important: Altering table properties only changes Impala’s metadata about the table, not the underlying table itself. These statements do not modify any Kudu data.)
Rename a Table
ALTER TABLE my_table RENAME TO my_new_table;
Change the Kudu Master Addresses
ALTER TABLE my_table SET TBLPROPERTIES('kudu.master_addresses' = 'kudu-original-master.example.com:7051,kudu-new-master.example.com:7051');
Change an Internally-Managed Table to External
ALTER TABLE my_table SET TBLPROPERTIES('EXTERNAL' = 'TRUE');
Dropping a Table
If the table was created as an internal table in Impala, using CREATE TABLE, the standard DROP TABLE syntax drops the underlying Kudu table and all its data. If the table was created as an external table, using CREATE EXTERNAL TABLE, the mapping between Impala and Kudu is dropped, but the Kudu table is left intact, with all its data. To change an external table to internal, or vice versa, see Altering Table Properties.
DROP TABLE my_first_table;
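The difference between dropping an internal and an external table can be modeled with two dictionaries, one for Impala's mappings and one for the tables stored in Kudu. All names below are made up for illustration, and the sketch describes the documented outcome only, not how Impala carries it out.

```python
def drop_table(impala_catalog, kudu_tables, impala_name):
    """Remove the Impala mapping; remove the Kudu table only for internal tables."""
    entry = impala_catalog.pop(impala_name)
    if not entry["external"]:
        del kudu_tables[entry["kudu_table_name"]]


kudu_tables = {"events": ["row1"], "customers": ["row2"]}
impala_catalog = {
    "events_internal": {"kudu_table_name": "events", "external": False},
    "customers_external": {"kudu_table_name": "customers", "external": True},
}

drop_table(impala_catalog, kudu_tables, "events_internal")     # data is removed
drop_table(impala_catalog, kudu_tables, "customers_external")  # data is kept
print(impala_catalog)  # {}
print(kudu_tables)     # {'customers': ['row2']}
```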
The examples above have only explored a fraction of what you can do with Impala Shell. Read about Impala internals or learn how to contribute to Impala on the Impala Wiki.
Misty Stanley-Jones is a Technical Writer at Cloudera, and an Apache HBase committer.