Technology, July 22, 2013

Support CQL3 tables in Hadoop, Pig and Hive

Alex Liu

The evolution of the Cassandra querying mechanism

 

Thrift API

 

The first generation is the Thrift API, with methods such as batch_mutate, get, get_slice, insert, multiget_slice, remove and others. It is a low-level querying mechanism that works directly against Cassandra's underlying storage structures, such as ColumnOrSuperColumn, key, SlicePredicate, ColumnParent and more. That makes it inefficient to develop applications on top of it, although a few Cassandra client libraries simplify the experience of querying Cassandra storage.

 

CQL

 

The first step in addressing the inefficiency of the low-level Thrift querying API was CQL, introduced in the Cassandra 0.8 release. It provides a modern query language with a schema, and it is fully backward compatible with Thrift column families. Because it is a higher-level, database-like query language, traditional database developers find it much easier to adopt Cassandra.

 

CQL3

 

The third generation is CQL3. It "transposes" wide rows and unpacks them into named columns. Under the hood, CQL3 uses composite columns and composite keys. CQL3 also adds other features, such as collections and the binary CQL protocol.

 

The first generation of Cassandra Hadoop driver

 

The first generation is built on the Thrift column family API, the first generation of the Cassandra querying mechanism. Because CQL is backward compatible with Thrift column families, the Thrift-based Hadoop support works well with CQL. But it doesn't provide any higher-level abstraction for composite keys and composite columns. Decomposing composite keys or composite columns has to be done in client-side code such as a UDF. To do that, the developer must use Cassandra's internal data type Java classes, which is hard for many application developers. The second generation of the Cassandra Hadoop driver addresses this issue by using CQL3 as a high-level abstraction layer for accessing Cassandra storage.

 

The second generation of Cassandra Hadoop driver

 

The second generation is based on CQL3. Decomposing composite keys and composite columns is done by CQL3 under the hood. It also uses a CQL3 paging mechanism to page through wide rows in a more natural way.

We created a few new classes: CqlPagingInputFormat, CqlPagingRecordReader, CqlOutputFormat and CqlRecordWriter.

 

Input format

 

The input format is Map<String, ByteBuffer>, Map<String, ByteBuffer>. The first map is the name-to-value mapping of the partition key columns and clustering columns. The second map is the name-to-value mapping of the other columns.
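As a rough illustration of what one input record looks like, the sketch below models the two maps as Python dictionaries whose values are raw bytes (standing in for Java ByteBuffer). The column names m, n and score are made up for the example, and the decoders assume the usual Cassandra encodings of a CQL int (4-byte big-endian) and CQL text (UTF-8).

```python
import struct


def decode_int(raw):
    """Decode a CQL int, assumed to be a 4-byte big-endian signed integer."""
    return struct.unpack(">i", raw)[0]


def decode_text(raw):
    """Decode a CQL text value, assumed to be UTF-8 bytes."""
    return raw.decode("utf-8")


# Hypothetical record: the first map holds partition and clustering columns,
# the second map holds the remaining columns.
keys = {"m": struct.pack(">i", 7), "n": "2013-07".encode("utf-8")}
columns = {"score": struct.pack(">i", 42)}

print(decode_int(keys["m"]), decode_text(keys["n"]), decode_int(columns["score"]))
```

A mapper would receive both maps for every row and decode only the columns it needs.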

 

CQL3 pagination

 

Up to the 1.2.6 release, CQL3 doesn't provide automatic pagination, so I use the following algorithm to page through CQL3 tables. The basic idea is to run a CQL3 query restricted on the partition columns and clustering columns, store the values of those columns when the page ends, and then use those values to compose the CQL3 query for the next page.

Let's say we have a table with the following keys

PRIMARY KEY (m, n, o, p)
where m is the partition column.
n, o, p are the clustering columns


We want to run the following query
select * from test

The first query is

SELECT * from test
WHERE token(m) > start_token_of_split
AND token(m) < end_token_of_split LIMIT 1000

We store the last value of m, n, o, p as m_end, n_end, o_end and p_end after the first query is done.

The next query is

SELECT * from test
WHERE token(m) = token(m_end)
AND n = n_end
AND o = o_end
AND p > p_end
LIMIT 1000

If it reaches the end of p, the next query is

SELECT * from test
WHERE token(m) = token(m_end)
AND n = n_end
AND o > o_end
LIMIT 1000


Otherwise, we use the next value of p as p_end1 for the next query

SELECT * from test
WHERE token(m) = token(m_end)
AND n = n_end
AND o = o_end
AND p > p_end1
LIMIT 1000


This continues until it reaches the end of n. Then the next query is

SELECT * from test
WHERE token(m) > token(m_end)
AND token(m) < end_token_of_split LIMIT 1000

Then we continue the same loop until it reaches the end of the split.

For tables that have more than one partition column
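The queries used inside one partition can be sketched in Python as a small query builder. This only illustrates the idea and is not the CqlPagingRecordReader code: the function names are hypothetical, the <column>_end placeholders stand for the stored values, and the query on n alone is inferred from the description rather than shown above.

```python
def next_page_query(table, partition_col, clustering_cols, depth, limit=1000):
    """Build the query that resumes inside the current partition.

    depth is the index of the clustering column compared with '>'.
    Columns before it are pinned with '=', columns after it are dropped.
    """
    clauses = [f"token({partition_col}) = token({partition_col}_end)"]
    for col in clustering_cols[:depth]:
        clauses.append(f"{col} = {col}_end")
    last = clustering_cols[depth]
    clauses.append(f"{last} > {last}_end")
    return f"SELECT * FROM {table} WHERE " + " AND ".join(clauses) + f" LIMIT {limit}"


def page_query_sequence(table, partition_col, clustering_cols, limit=1000):
    """List the queries in the order they are tried: deepest column first."""
    depths = range(len(clustering_cols) - 1, -1, -1)
    return [next_page_query(table, partition_col, clustering_cols, d, limit)
            for d in depths]


for query in page_query_sequence("test", "m", ["n", "o", "p"]):
    print(query)
```

Each query is repeated with fresh end values until it returns no more rows, at which point the reader falls back to the next query in the sequence and finally moves on to the next partition by token.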

PRIMARY KEY((m, n), o, p)
where m and n are partition columns.
o and p are clustering columns.


We use the following query

SELECT * from test
WHERE token(m, n) > token(m_end, n_end)
AND token(m, n) < end_token_of_split LIMIT 1000

If clustering columns are defined in descending order, the queries above should use the less-than operator (<) instead of the greater-than operator (>) on those columns.

CQL3 auto paging through the native protocol has been developed, so we will have this auto paging mechanism plugged into CqlPagingRecordReader soon.
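A minimal Python sketch of the token-range query, generalized to any number of partition columns, together with the operator choice for descending clustering columns. The helper names are hypothetical, and start_token_of_split, end_token_of_split and the <column>_end names are the same placeholders used in the queries above.

```python
def token_range_query(table, partition_cols, resume=False, limit=1000):
    """Build the query that scans a split by token range.

    With resume=False the scan starts at the beginning of the split;
    with resume=True it continues after the last partition that was read.
    """
    cols = ", ".join(partition_cols)
    if resume:
        lower = "token(" + ", ".join(f"{c}_end" for c in partition_cols) + ")"
    else:
        lower = "start_token_of_split"
    return (f"SELECT * FROM {table} WHERE token({cols}) > {lower} "
            f"AND token({cols}) < end_token_of_split LIMIT {limit}")


def clustering_operator(descending):
    """Pick the comparison used to move past the last value of a clustering column."""
    return "<" if descending else ">"


print(token_range_query("test", ["m", "n"], resume=True))
print(clustering_operator(descending=True))
```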

 

Input parameters


Some input parameters can be configured through CqlConfigHelper.

CqlConfigHelper.setInputColumns -- select specific columns
CqlConfigHelper.setInputCQLPageRowSize -- the number of rows per page
CqlConfigHelper.setInputWhereClauses -- the where clauses on the indexed columns

 

 

Output format

 

The output format is Map<String, ByteBuffer>, List<ByteBuffer>. The map is the name-to-value mapping of the partition key columns and clustering columns. The list holds the values of the other columns. CqlRecordWriter takes the column values and binds them to the prepared CQL query.
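To make the binding step more concrete, here is a rough Python sketch of how one output record could be turned into a prepared statement plus an ordered list of bind values. It assumes the configured query is an UPDATE with ? markers for the non-key columns and that the key columns are appended as a WHERE clause. That is an assumption made for illustration, not a description of CqlRecordWriter itself, and the function, keyspace, table and column names are hypothetical.

```python
def bind_output_record(update_cql, keys, values):
    """Combine one output record with the configured query.

    keys   -- name-to-value map of partition and clustering columns
    values -- values for the '?' markers already present in update_cql
    Returns the full statement and the bind values in marker order.
    """
    # Assumption: every key column becomes an equality clause in WHERE.
    where = " AND ".join(f"{name} = ?" for name in keys)
    statement = f"{update_cql} WHERE {where}"
    bound = list(values) + list(keys.values())
    return statement, bound


statement, bound = bind_output_record(
    "UPDATE ks.t SET a = ?, b = ?",
    {"m": b"\x01", "n": b"\x02"},
    [b"x", b"y"],
)
print(statement)
print(bound)
```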

 

Output parameters


The output CQL query can be configured through CqlConfigHelper.

CqlConfigHelper.setOutputCql

 

 

The first generation of Pig Cassandra driver

 

The first generation of the Pig Cassandra driver is based on the Thrift column family based first generation of the Hadoop Cassandra driver. It uses the CassandraStorage class.

 

CQL3 table support

 

The first generation uses the Thrift describe_keyspace call to get the metadata of the column families. Because CQL3 tables are not shown in the result list of the describe_keyspace Thrift API call, the first generation Pig Cassandra driver couldn't query CQL3 tables. To fix that, we query the system tables system.schema_columnfamilies and system.schema_columns to retrieve the metadata of CQL3 tables. Because CQL3 uses composite columns and composite keys under the hood, it is still not efficient to use the first generation Pig Cassandra driver for CQL3 tables.

 

The second generation of Pig Cassandra driver

 

The second generation is based on the CQL3-based second generation of the Hadoop Cassandra driver. It uses the CqlStorage class.

 

CQL3 table support

 

Because the second generation of the Hadoop Cassandra driver uses CQL3 under the hood, we can easily decompose the composite keys and composite columns by using CQL3 queries.

The URL format for CqlStorage is

cql://[username:password@]<keyspace>/<columnfamily>[?[page_size=<size>]
[&columns=<col1,col2>][&output_query=<prepared_statement_query>][&where_clause=<clause>]
[&split_size=<size>][&partitioner=<partitioner>][&use_secondary=true|false]]


Where

page_size -- the number of rows per page
columns -- the select columns of CQL query
where_clause -- the where clause on the indexed columns; it needs URL encoding
split_size -- number of rows per split
partitioner -- Cassandra partitioner
use_secondary -- enable Pig partition filter push down
output_query -- the CQL query for writes, in prepared statement format
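A small Python sketch of assembling such a URL may help, particularly for the encoding step. The helper name is hypothetical, and the choice to percent-encode only where_clause and output_query is an assumption based on the note above that the where clause needs URL encoding. The keyspace, table and column names are invented.

```python
from urllib.parse import quote

# Assumption: only these parameters carry free-form CQL and need encoding.
ENCODED_PARAMS = {"where_clause", "output_query"}


def build_cql_url(keyspace, table, **params):
    """Assemble a cql:// URL for CqlStorage from keyword parameters."""
    url = f"cql://{keyspace}/{table}"
    parts = []
    for name, value in params.items():
        text = str(value)
        if name in ENCODED_PARAMS:
            # Percent-encode spaces, operators and quotes in the CQL fragment.
            text = quote(text, safe="")
        parts.append(f"{name}={text}")
    if parts:
        url += "?" + "&".join(parts)
    return url


print(build_cql_url("ks", "users", page_size=100, columns="name,age",
                    where_clause="age > 30"))
```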

 

Schema


Input schema is

(value, value, value), where each value's schema takes its name from the column name and its value from the column value.

Output schema is

(((name, value), (name, value)), (value ... value), (value...value))
where the first tuple is the map of partition columns and clustering columns.
the remaining tuples are the lists of bound values for the output prepared CQL query
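The output shape can be pictured with nested Python tuples. This is only a model of the structure described above, not Pig code. The function name and the sample column names and values are hypothetical.

```python
def to_output_tuple(keys, *bound_rows):
    """Arrange key columns and bound values in the output schema shape.

    keys       -- name-to-value map of partition and clustering columns
    bound_rows -- one sequence of bound values per remaining tuple
    """
    key_part = tuple((name, value) for name, value in keys.items())
    return (key_part,) + tuple(tuple(row) for row in bound_rows)


record = to_output_tuple({"m": 1, "n": 2}, [10, 20])
print(record)
```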

 

 

Pig partition filter push down

 

Set use_secondary to true to enable it.
We generate the where_clause from the Pig partition filter queries and pass it to CqlConfigHelper.setInputWhereClauses. The partition filter queries are pushed down to CqlPagingRecordReader, which sends less data back to Pig.

 

The first generation of Hive Cassandra driver

 

The first generation is based on the first generation of the Hadoop Cassandra driver, which uses Thrift column families. We need to use the second generation of the Hadoop Cassandra driver to improve queries on the composite columns that CQL3 tables use under the hood.

 

The second generation of Hive Cassandra driver

 

The second generation uses the second generation of the Hadoop Cassandra driver to query CQL3 tables. Basically, it sets the input and output CQL queries and maps the input and output values to Hive data types.
All metadata is retrieved from the system tables system.schema_columnfamilies and system.schema_columns.
All CQL3 tables have auto-generated Hive tables that use CqlStorageHandler, which has the following parameters


cassandra.page.size -- the number of rows per page for CqlPagingRecordReader
cassandra.cql3.type -- the CQL data types for the mapped Hive columns
cassandra.columns.mapping -- the CQL columns for the mapped Hive columns

The push down condition will be implemented in a similar way to the Pig partition filter push down. We will also expand the default mappings to include collections.