How to Move Data from Relational Databases to DataStax Enterprise / Cassandra using Sqoop
When I'm at conferences, I always have the same conversation. People come up to me who are excited about and sold on using Cassandra, but they want to migrate part or all of a particular RDBMS to Cassandra and don't know how to go about it. In the past, I've always felt bad that I've never had a great answer for them, but with the release of DataStax Enterprise 2.0, things have gotten much easier. DataStax Enterprise 2.0 includes support for Sqoop, a tool designed to transfer data between an RDBMS and Hadoop. Given that DataStax Enterprise combines Cassandra, Hadoop, and Solr together into one big data platform, you can now use Sqoop to move data not only into Hadoop, but into Cassandra as well. Let me show you how it works.
Setting up
Sqoop works via JDBC, so really the only prerequisite you'll have to deal with is downloading the JDBC driver for your source RDBMS (Oracle, MySQL, SQL Server, etc.) and putting it in a directory where Sqoop has access to it (we recommend the /sqoop subdirectory of the main DataStax Enterprise installation). For this exercise, I'm going to migrate data from a MySQL database over to DataStax Enterprise, so I downloaded the JDBC driver from the MySQL website, unzipped it, and put it in my /sqoop subdirectory:
robinsmac:sqoop robin$ pwd
/Users/robin/dev/dse-2.0/resources/sqoop
robinsmac:sqoop robin$ ls -l
total 10296
-rw-r--r--@  1 robin  staff     4132 Mar  2 17:05 CHANGES.txt
-rw-r--r--@  1 robin  staff      719 Mar  2 17:05 DISCLAIMER.txt
-rw-r--r--@  1 robin  staff    15760 Mar  2 17:05 LICENSE.txt
-rw-r--r--@  1 robin  staff      251 Mar  2 17:05 NOTICE.txt
-rw-r--r--@  1 robin  staff     1096 Mar  2 17:05 README.txt
drwxr-xr-x@  6 robin  staff      204 Mar  2 17:05 bin
drwxr-xr-x@  3 robin  staff      102 Mar  2 17:05 conf
drwxr-xr-x@  3 robin  staff      102 Mar  2 17:05 lib
drwxr-xr-x@ 10 robin  staff      340 Oct  3 04:44 mysql-connector-java-5.1.18
-rw-r--r--@  1 robin  staff   789885 Oct  3 04:44 mysql-connector-java-5.1.18-bin.jar
-rw-r--r--@  1 robin  staff  3834947 Mar  5 16:42 mysql-connector-java-5.1.18.tar.gz
-rw-r--r--@  1 robin  staff   604406 Mar  2 17:05 sqoop-1.4.1-dse-20120216.054945-6.jar
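In case it helps, getting the driver into place is just a matter of extracting the archive and copying the jar. Here's a minimal sketch, assuming the same download and paths shown in the listing above:

cd /Users/robin/dev/dse-2.0/resources/sqoop
# Extract the Connector/J archive downloaded from the MySQL website
tar -xzf mysql-connector-java-5.1.18.tar.gz
# Copy the driver jar into the sqoop directory so DSE's Sqoop can find it
cp mysql-connector-java-5.1.18/mysql-connector-java-5.1.18-bin.jar .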
Migrating Schema and Data
The MySQL source table that I'm migrating to DataStax Enterprise has a little over 100,000 rows in it and looks like this:
CREATE TABLE `npa_nxx` (
  `npa_nxx_key` varchar(16) NOT NULL,
  `npa` varchar(3) DEFAULT NULL,
  `nxx` varchar(3) DEFAULT NULL,
  `lat` varchar(8) DEFAULT NULL,
  `lon` varchar(8) DEFAULT NULL,
  `linetype` varchar(1) DEFAULT NULL,
  `state` varchar(2) DEFAULT NULL,
  `city` varchar(36) DEFAULT NULL,
  PRIMARY KEY (`npa_nxx_key`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
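Before running the import, it's worth capturing a row count on the MySQL side so there's a number to compare against once the data lands in Cassandra. A quick way to do that from the shell (assuming the table lives in the dev database and the same passwordless root login I use below):

mysql -u root dev -e "SELECT COUNT(*) FROM npa_nxx;"

In my case that's 105,291 rows, which is the figure to look for again at the end.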
The command I'll use to migrate both the table and data to a Cassandra column family is the following:
./dse sqoop import --connect jdbc:mysql://127.0.0.1/dev \
  --username root \
  --table npa_nxx \
  --cassandra-keyspace dev \
  --cassandra-column-family npa_nxx_cf \
  --cassandra-row-key npa_nxx_key \
  --cassandra-thrift-host 127.0.0.1 \
  --cassandra-create-schema
The dse command is located in the /bin directory of the DataStax Enterprise install. I first pass the IP address and database name of the MySQL server I want to use, followed by the username (I'm not using a password for the super user on MySQL right now; yes, I know, bad practice…). I then indicate which MySQL table I want to migrate. Once all the MySQL parameters are set, I designate a new Cassandra keyspace to use, followed by the name I want to give my new column family. Lastly, I tell Sqoop what the row key of the column family will be and the IP address of the Cassandra node I want to connect to, and I pass a final parameter telling Sqoop to create my new keyspace (note that you can use an existing keyspace if you'd like).
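On that last point: if the keyspace and column family already exist, you should be able to drop the --cassandra-create-schema flag and import straight into them, with everything else unchanged. A sketch of what that would look like (assuming the dev keyspace and npa_nxx_cf column family were created ahead of time):

./dse sqoop import --connect jdbc:mysql://127.0.0.1/dev \
  --username root \
  --table npa_nxx \
  --cassandra-keyspace dev \
  --cassandra-column-family npa_nxx_cf \
  --cassandra-row-key npa_nxx_key \
  --cassandra-thrift-host 127.0.0.1

For this walkthrough, though, I'll let Sqoop create the schema for me. Submitting the original command produces the following output and end result: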
12/03/06 08:58:56 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
12/03/06 08:58:56 INFO tool.CodeGenTool: Beginning code generation
12/03/06 08:58:56 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `npa_nxx` AS t LIMIT 1
12/03/06 08:58:56 INFO orm.CompilationManager: HADOOP_HOME is /Users/robin/dev/dse-2.0-EAP3-SNAPSHOT/resources/hadoop/bin/..
Note: /tmp/sqoop-robin/compile/2e2b8b85fba83ccf1f52a8ee77c3b12f/npa_nxx.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
12/03/06 08:58:56 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-robin/compile/2e2b8b85fba83ccf1f52a8ee77c3b12f/npa_nxx.jar
12/03/06 08:58:56 WARN manager.MySQLManager: It looks like you are importing from mysql.
12/03/06 08:58:56 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
12/03/06 08:58:56 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
12/03/06 08:58:56 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
12/03/06 08:58:57 INFO mapreduce.ImportJobBase: Beginning import of npa_nxx
12/03/06 08:58:58 INFO cfs.CassandraFileSystem: CassandraFileSystem.uri : cfs:///
12/03/06 08:58:58 INFO config.DatabaseDescriptor: Loading settings from file:/Users/robin/dev/dse-2.0-EAP3-SNAPSHOT/resources/cassandra/conf/cassandra.yaml
12/03/06 08:58:58 INFO config.DatabaseDescriptor: DiskAccessMode 'auto' determined to be mmap, indexAccessMode is mmap
12/03/06 08:58:58 INFO config.DatabaseDescriptor: Global memtable threshold is enabled at 329MB
12/03/06 08:58:58 INFO snitch.DseDelegateSnitch: Setting my role to Cassandra
12/03/06 08:58:58 INFO config.DseConfig: Loading settings from file:/Users/robin/dev/dse-2.0-EAP3-SNAPSHOT/resources/dse/conf/dse.yaml
12/03/06 08:58:58 INFO config.DseConfig: Load of settings is done.
12/03/06 08:58:58 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`npa_nxx_key`), MAX(`npa_nxx_key`) FROM `npa_nxx`
12/03/06 08:58:58 WARN db.TextSplitter: Generating splits for a textual index column.
12/03/06 08:58:58 WARN db.TextSplitter: If your database sorts in a case-insensitive order, this may result in a partial import or duplicate records.
12/03/06 08:58:58 WARN db.TextSplitter: You are strongly encouraged to choose an integral split column.
12/03/06 08:58:58 INFO mapred.JobClient: Running job: job_201203051624_0002
12/03/06 08:58:59 INFO mapred.JobClient:  map 0% reduce 0%
12/03/06 08:59:05 INFO mapred.JobClient:  map 25% reduce 0%
12/03/06 08:59:06 INFO mapred.JobClient:  map 50% reduce 0%
12/03/06 08:59:07 INFO mapred.JobClient:  map 75% reduce 0%
12/03/06 08:59:08 INFO mapred.JobClient:  map 100% reduce 0%
12/03/06 08:59:08 INFO mapred.JobClient: Job complete: job_201203051624_0002
12/03/06 08:59:08 INFO mapred.JobClient: Counters: 14
12/03/06 08:59:08 INFO mapred.JobClient:   Job Counters
12/03/06 08:59:08 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=13439
12/03/06 08:59:08 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
12/03/06 08:59:08 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
12/03/06 08:59:08 INFO mapred.JobClient:     Launched map tasks=4
12/03/06 08:59:08 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0
12/03/06 08:59:08 INFO mapred.JobClient:   File Output Format Counters
12/03/06 08:59:08 INFO mapred.JobClient:     Bytes Written=0
12/03/06 08:59:08 INFO mapred.JobClient:   FileSystemCounters
12/03/06 08:59:08 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=88472
12/03/06 08:59:08 INFO mapred.JobClient:     CFS_BYTES_READ=587
12/03/06 08:59:08 INFO mapred.JobClient:   File Input Format Counters
12/03/06 08:59:08 INFO mapred.JobClient:     Bytes Read=0
12/03/06 08:59:08 INFO mapred.JobClient:   Map-Reduce Framework
12/03/06 08:59:08 INFO mapred.JobClient:     Map input records=105291
12/03/06 08:59:08 INFO mapred.JobClient:     Spilled Records=0
12/03/06 08:59:08 INFO mapred.JobClient:     Total committed heap usage (bytes)=340000768
12/03/06 08:59:08 INFO mapred.JobClient:     Map output records=105291
12/03/06 08:59:08 INFO mapred.JobClient:     SPLIT_RAW_BYTES=587
12/03/06 08:59:08 INFO mapreduce.ImportJobBase: Transferred 0 bytes in 10.724 seconds (0 bytes/sec)
12/03/06 08:59:08 INFO mapreduce.ImportJobBase: Retrieved 105291 records
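Two things in that output are worth acting on. Sqoop points out that imports from MySQL can use a faster, MySQL-specific path via --direct, and the TextSplitter warnings note that splitting work across mappers on a textual key like npa_nxx_key can misbehave if the database sorts case-insensitively (--split-by lets you pick a different split column, and -m controls the number of map tasks). Both are standard Sqoop options; a sketch of what a tuned run might look like (I haven't verified how --direct interacts with the Cassandra-specific options, so treat this as a starting point to test rather than a recipe):

./dse sqoop import --connect jdbc:mysql://127.0.0.1/dev \
  --username root \
  --table npa_nxx \
  --direct \
  -m 4 \
  --cassandra-keyspace dev \
  --cassandra-column-family npa_nxx_cf \
  --cassandra-row-key npa_nxx_key \
  --cassandra-thrift-host 127.0.0.1 \
  --cassandra-create-schema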
I can then start the Cassandra CQL utility (cqlsh) and check that my column family and data are there:
robinsmac:bin robin$ ./cqlsh
Connected to Test Cluster at localhost:9160.
[cqlsh 2.0.0 | Cassandra 1.0.8 | CQL spec 2.0.0 | Thrift protocol 19.20.0]
Use HELP for help.
cqlsh> use dev;
cqlsh:dev> select count(*) from npa_nxx_cf limit 200000;
 count
--------
 105291
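The count matches the 105,291 records Sqoop reported. Beyond counting, it's also worth eyeballing a few rows to confirm the individual columns made it across intact; still inside cqlsh, something like:

cqlsh:dev> select * from npa_nxx_cf limit 3;

Each row should come back with its npa, nxx, lat, lon, linetype, state, and city columns populated from the MySQL source.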
Conclusions
That's it. There are a lot more parameters you can use with Sqoop; typing ./dse sqoop import --help will list them all for you. To try out DataStax Enterprise with Sqoop, download a copy of the software; it's completely free for development use. Thanks for your support of DataStax and Cassandra!