Hadoop MapReduce in the Cassandra Cluster
NOTE: This post pre-dates our Brisk announcement. Brisk is a new distribution that enhances the Hadoop and Hive platform with scalable low-latency data capabilities.
In our December post, we introduced the class ColumnFamilyInputFormat and fleshed it out in the context of a detailed MapReduce example. If you are interested in running MapReduce with Cassandra and haven’t read that post yet, it is highly recommended. The word count example available from the Cassandra source download in ‘contrib/word_count’ is an excellent resource for getting started.
However, when the time comes to run your own MapReduce job in a large Cassandra cluster, you may find that you still have some questions about configuring all the moving parts. This post describes how to deploy and distribute the required Hadoop components within your Cassandra cluster in a way that maximizes gains in efficiency and processing times.
Hadoop/Cassandra Cluster Configuration
The recommended cluster configuration essentially overlays Hadoop over Cassandra. This involves installing a Hadoop TaskTracker on each Cassandra node. Also – and this is important – one server in the cluster should be dedicated to the following Hadoop components:
- JobTracker
- DataNode
- NameNode
This dedicated server is required because Hadoop uses HDFS to store JAR dependencies for your job, static data, and other required information. In the overall context of your cluster, this is a very small amount of data, but it is critical to running a MapReduce job.
Hadoop TaskTrackers and Cassandra Nodes
Running a Hadoop TaskTracker on a Cassandra node requires you to update the HADOOP_CLASSPATH in <hadoop>/conf/hadoop-env.sh to include the Cassandra libraries. For example, add an entry like the following to hadoop-env.sh on each of the TaskTracker nodes:
export HADOOP_CLASSPATH=/opt/cassandra/lib/*:$HADOOP_CLASSPATH
When a Hadoop TaskTracker runs on the same servers as the Cassandra nodes, each TaskTracker is sent tasks only for data belonging to the token range of the local Cassandra node. This allows tremendous gains in efficiency and processing times, because the Cassandra nodes receive only queries for which they are the primary replica, so reads are served locally instead of being forwarded to other nodes across the network.
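To make the locality idea concrete, here is a minimal sketch in plain Java. It is a toy model, not Hadoop or Cassandra code: the `Split` type, the `assign` method, the token values, and the host names (`cass1`, `cass2`, `cass3`) are all hypothetical and exist only for illustration. It simply groups token-range splits by the host that owns them, which is roughly the outcome you want from co-locating TaskTrackers with Cassandra nodes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of locality-aware task assignment.
// None of these types are Hadoop or Cassandra classes; all names are hypothetical.
final class LocalTaskAssignment {

    // One input split: a token range plus the host holding it as primary replica.
    static final class Split {
        final String startToken;
        final String endToken;
        final String primaryHost;

        Split(String startToken, String endToken, String primaryHost) {
            this.startToken = startToken;
            this.endToken = endToken;
            this.primaryHost = primaryHost;
        }
    }

    // Group splits by owning host so that each TaskTracker only receives
    // work for the Cassandra node running on the same server.
    static Map<String, List<Split>> assign(List<Split> splits, List<String> taskTrackerHosts) {
        Map<String, List<Split>> plan = new LinkedHashMap<>();
        for (String host : taskTrackerHosts) {
            plan.put(host, new ArrayList<Split>());
        }
        for (Split split : splits) {
            List<Split> local = plan.get(split.primaryHost);
            if (local == null) {
                // This sketch has no fallback to a remote tracker; it just
                // flags a node that is missing its TaskTracker.
                throw new IllegalStateException("No TaskTracker on " + split.primaryHost);
            }
            local.add(split);
        }
        return plan;
    }

    public static void main(String[] args) {
        List<Split> splits = Arrays.asList(
            new Split("0", "100", "cass1"),
            new Split("100", "200", "cass2"),
            new Split("200", "300", "cass3"));
        Map<String, List<Split>> plan =
            assign(splits, Arrays.asList("cass1", "cass2", "cass3"));
        for (Map.Entry<String, List<Split>> entry : plan.entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue().size() + " local split(s)");
        }
    }
}
```

The exception in `assign` mirrors the deployment advice above: if a Cassandra node has no TaskTracker, its token range cannot be processed locally in this model.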
Handling Input and Output from Cassandra
As we noted in the December post, the input/output formats introduced in 0.6 and 0.7 are central to Cassandra’s Hadoop support. The class org.apache.cassandra.hadoop.ColumnFamilyInputFormat allows you to read data stored in Cassandra from a Hadoop MapReduce job, and its companion class org.apache.cassandra.hadoop.ColumnFamilyOutputFormat allows you to write the results back into Cassandra. Set these two classes in your job code as the format class for input and/or output:
job.setInputFormatClass(ColumnFamilyInputFormat.class);
job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
In the MapReduce job, Cassandra rows or row fragments (pairs of key + SortedMap of columns) can be input to Map tasks for processing, as specified by a SlicePredicate that describes which columns to fetch from each row. For example:
ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);
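To picture what a Map task receives after the predicate is applied, the following plain-Java sketch may help. It is an in-memory stand-in only: `SlicePredicateSketch`, `slice`, and `countWords` are hypothetical names, strings replace Cassandra's byte-oriented keys and columns, and nothing here calls the real Cassandra or Hadoop APIs. It shows a row as a SortedMap of columns, a name-based filter that keeps only the requested columns, and a word-count style function applied to the resulting row fragment.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

// In-memory illustration of a name-based column slice.
// Hypothetical names throughout; not the Cassandra SlicePredicate API.
final class SlicePredicateSketch {

    // Keep only the named columns of one row, preserving sorted column order.
    static SortedMap<String, String> slice(SortedMap<String, String> row, Set<String> columnNames) {
        SortedMap<String, String> fragment = new TreeMap<>();
        for (Map.Entry<String, String> column : row.entrySet()) {
            if (columnNames.contains(column.getKey())) {
                fragment.put(column.getKey(), column.getValue());
            }
        }
        return fragment;
    }

    // Stand-in for a map function: count words across the values of a row fragment.
    static Map<String, Integer> countWords(SortedMap<String, String> fragment) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String value : fragment.values()) {
            for (String word : value.trim().split("\\s+")) {
                if (word.isEmpty()) {
                    continue; // an empty column value yields one empty token
                }
                Integer seen = counts.get(word);
                counts.put(word, seen == null ? 1 : seen + 1);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // One row: the row key is omitted here; the columns are a SortedMap.
        SortedMap<String, String> row = new TreeMap<>();
        row.put("author", "someone");
        row.put("text", "to be or not to be");

        Set<String> wanted = new HashSet<>(Arrays.asList("text"));
        SortedMap<String, String> fragment = slice(row, wanted);

        System.out.println(fragment.keySet());
        System.out.println(countWords(fragment));
    }
}
```

Fetching only the columns a job needs keeps the row fragments handed to each Map task small, which is the practical reason to set a narrow predicate.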
Beyond MapReduce
In addition to MapReduce, Cassandra currently supports Pig with its own implementation of LoadFunc. For an example of this functionality, see the ‘contrib/pig’ example in the Cassandra source download.
Also, work is underway in the Cassandra project to add support for Hive. Though the details are still TBD, Hive integration looks very promising for the future.
In fact, your author would not be surprised to soon see Hadoop/Cassandra cluster configuration packaged and automated in ways that make the setup tasks much simpler than what we have described above. This post, like so much in the fast-moving Cassandra world, is racing against obsolescence.