
How to Move Data from Relational Databases to DataStax Enterprise / Cassandra using Sqoop

By Robin Schumacher | March 21, 2012

When I'm at conferences, I always have the same conversation. People come up to me who are excited about and sold on Cassandra, but they want to migrate part or all of a particular RDBMS to Cassandra and don't know how to go about it. In the past, I've felt bad that I never had a great answer for them, but with the release of DataStax Enterprise 2.0, things have gotten much easier. DataStax Enterprise 2.0 includes support for Sqoop, a tool designed to transfer data between an RDBMS and Hadoop. And because DataStax Enterprise combines Cassandra, Hadoop, and Solr into one big data platform, you can now use Sqoop to move data not only into a Hadoop system, but into Cassandra as well. Let me show you how it works.

Setting up

Sqoop works via JDBC, so really the only prerequisite you'll have to deal with is downloading the JDBC driver for your source RDBMS (e.g., Oracle, MySQL, SQL Server) and putting it in a directory where Sqoop has access to it (we recommend the /sqoop subdirectory of the main DataStax Enterprise installation). For this exercise, I'm going to migrate data from a MySQL database over to DataStax Enterprise, so I downloaded the JDBC driver from the MySQL website, unzipped it, and put it in my /sqoop subdirectory:

robinsmac:sqoop robin$ pwd
/Users/robin/dev/dse-2.0/resources/sqoop
robinsmac:sqoop robin$ ls -l
total 10296
-rw-r--r--@  1 robin  staff     4132 Mar  2 17:05 CHANGES.txt
-rw-r--r--@  1 robin  staff      719 Mar  2 17:05 DISCLAIMER.txt
-rw-r--r--@  1 robin  staff    15760 Mar  2 17:05 LICENSE.txt
-rw-r--r--@  1 robin  staff      251 Mar  2 17:05 NOTICE.txt
-rw-r--r--@  1 robin  staff     1096 Mar  2 17:05 README.txt
drwxr-xr-x@  6 robin  staff      204 Mar  2 17:05 bin
drwxr-xr-x@  3 robin  staff      102 Mar  2 17:05 conf
drwxr-xr-x@  3 robin  staff      102 Mar  2 17:05 lib
drwxr-xr-x@ 10 robin  staff      340 Oct  3 04:44 mysql-connector-java-5.1.18
-rw-r--r--@  1 robin  staff   789885 Oct  3 04:44 mysql-connector-java-5.1.18-bin.jar
-rw-r--r--@  1 robin  staff  3834947 Mar  5 16:42 mysql-connector-java-5.1.18.tar.gz
-rw-r--r--@  1 robin  staff   604406 Mar  2 17:05 sqoop-1.4.1-dse-20120216.054945-6.jar
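
For reference, getting the driver into place amounts to a couple of shell commands. This is a minimal sketch, assuming you've already downloaded the Connector/J 5.1.18 tarball from the MySQL website into your current directory; adjust the DSE install path to match your own environment:

# Unpack the Connector/J tarball and copy the driver jar into DSE's sqoop directory
tar -xzf mysql-connector-java-5.1.18.tar.gz
cp mysql-connector-java-5.1.18/mysql-connector-java-5.1.18-bin.jar \
   /Users/robin/dev/dse-2.0/resources/sqoop/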

Migrating Schema and Data

The MySQL source table that I'm migrating to DataStax Enterprise has a little over 100,000 rows in it and looks like this:

CREATE TABLE `npa_nxx` (
  `npa_nxx_key` varchar(16) NOT NULL,
  `npa` varchar(3) DEFAULT NULL,
  `nxx` varchar(3) DEFAULT NULL,
  `lat` varchar(8) DEFAULT NULL,
  `lon` varchar(8) DEFAULT NULL,
  `linetype` varchar(1) DEFAULT NULL,
  `state` varchar(2) DEFAULT NULL,
  `city` varchar(36) DEFAULT NULL,
  PRIMARY KEY (`npa_nxx_key`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
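
Before kicking off the migration, it's worth grabbing a row count from the source so there's something concrete to verify against afterwards. A quick way to do that from the shell, assuming the same root account and dev database used throughout this post:

mysql -u root dev -e "SELECT COUNT(*) FROM npa_nxx"

For my table this reports 105,291 rows, which is the number I'll be looking for on the Cassandra side once the import finishes.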

The command I'll use to migrate both the table and data to a Cassandra column family is the following:

./dse sqoop import --connect jdbc:mysql://127.0.0.1/dev \
      --username root \
      --table npa_nxx \
      --cassandra-keyspace dev \
      --cassandra-column-family npa_nxx_cf \
      --cassandra-row-key npa_nxx_key \
      --cassandra-thrift-host 127.0.0.1 \
      --cassandra-create-schema
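
One aside before walking through the options: my MySQL root account has no password, so none is passed above. For a password-protected account, Sqoop's standard --password and -P options go alongside --username; -P is the safer choice, since it prompts for the password at runtime rather than leaving it on the command line and in your shell history. A minimal sketch, with the remaining options elided:

./dse sqoop import --connect jdbc:mysql://127.0.0.1/dev \
      --username root -P \
      --table npa_nxx \
      ...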

The dse command is located in the /bin directory of the DataStax Enterprise install. I first pass the IP address and database of the MySQL server I want to use, followed by the username (I'm not using a password for the MySQL superuser right now; yes, I know, bad practice…). I then indicate which MySQL table I want to migrate. With the MySQL parameters set, I designate a new Cassandra keyspace to use, followed by the name I want to give my new column family. Lastly, I tell Sqoop what the row key of the column family will be, the IP address of the Cassandra node to connect to, and pass a flag telling Sqoop to create my new keyspace (note that you can use an existing keyspace if you'd like). The submission produces the following output and end result:

12/03/06 08:58:56 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
12/03/06 08:58:56 INFO tool.CodeGenTool: Beginning code generation
12/03/06 08:58:56 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `npa_nxx` AS t LIMIT 1
12/03/06 08:58:56 INFO orm.CompilationManager: HADOOP_HOME is /Users/robin/dev/dse-2.0-EAP3-SNAPSHOT/resources/hadoop/bin/..
Note: /tmp/sqoop-robin/compile/2e2b8b85fba83ccf1f52a8ee77c3b12f/npa_nxx.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
12/03/06 08:58:56 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-robin/compile/2e2b8b85fba83ccf1f52a8ee77c3b12f/npa_nxx.jar
12/03/06 08:58:56 WARN manager.MySQLManager: It looks like you are importing from mysql.
12/03/06 08:58:56 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
12/03/06 08:58:56 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
12/03/06 08:58:56 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
12/03/06 08:58:57 INFO mapreduce.ImportJobBase: Beginning import of npa_nxx
12/03/06 08:58:58 INFO cfs.CassandraFileSystem: CassandraFileSystem.uri : cfs:///
12/03/06 08:58:58 INFO config.DatabaseDescriptor: Loading settings from file:/Users/robin/dev/dse-2.0-EAP3-SNAPSHOT/resources/cassandra/conf/cassandra.yaml
12/03/06 08:58:58 INFO config.DatabaseDescriptor: DiskAccessMode 'auto' determined to be mmap, indexAccessMode is mmap
12/03/06 08:58:58 INFO config.DatabaseDescriptor: Global memtable threshold is enabled at 329MB
12/03/06 08:58:58 INFO snitch.DseDelegateSnitch: Setting my role to Cassandra
12/03/06 08:58:58 INFO config.DseConfig: Loading settings from file:/Users/robin/dev/dse-2.0-EAP3-SNAPSHOT/resources/dse/conf/dse.yaml
12/03/06 08:58:58 INFO config.DseConfig: Load of settings is done.
12/03/06 08:58:58 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`npa_nxx_key`), MAX(`npa_nxx_key`) FROM `npa_nxx`
12/03/06 08:58:58 WARN db.TextSplitter: Generating splits for a textual index column.
12/03/06 08:58:58 WARN db.TextSplitter: If your database sorts in a case-insensitive order, this may result in a partial import or duplicate records.
12/03/06 08:58:58 WARN db.TextSplitter: You are strongly encouraged to choose an integral split column.
12/03/06 08:58:58 INFO mapred.JobClient: Running job: job_201203051624_0002
12/03/06 08:58:59 INFO mapred.JobClient:  map 0% reduce 0%
12/03/06 08:59:05 INFO mapred.JobClient:  map 25% reduce 0%
12/03/06 08:59:06 INFO mapred.JobClient:  map 50% reduce 0%
12/03/06 08:59:07 INFO mapred.JobClient:  map 75% reduce 0%
12/03/06 08:59:08 INFO mapred.JobClient:  map 100% reduce 0%
12/03/06 08:59:08 INFO mapred.JobClient: Job complete: job_201203051624_0002
12/03/06 08:59:08 INFO mapred.JobClient: Counters: 14
12/03/06 08:59:08 INFO mapred.JobClient:   Job Counters 
12/03/06 08:59:08 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=13439
12/03/06 08:59:08 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
12/03/06 08:59:08 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
12/03/06 08:59:08 INFO mapred.JobClient:     Launched map tasks=4
12/03/06 08:59:08 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0
12/03/06 08:59:08 INFO mapred.JobClient:   File Output Format Counters 
12/03/06 08:59:08 INFO mapred.JobClient:     Bytes Written=0
12/03/06 08:59:08 INFO mapred.JobClient:   FileSystemCounters
12/03/06 08:59:08 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=88472
12/03/06 08:59:08 INFO mapred.JobClient:     CFS_BYTES_READ=587
12/03/06 08:59:08 INFO mapred.JobClient:   File Input Format Counters 
12/03/06 08:59:08 INFO mapred.JobClient:     Bytes Read=0
12/03/06 08:59:08 INFO mapred.JobClient:   Map-Reduce Framework
12/03/06 08:59:08 INFO mapred.JobClient:     Map input records=105291
12/03/06 08:59:08 INFO mapred.JobClient:     Spilled Records=0
12/03/06 08:59:08 INFO mapred.JobClient:     Total committed heap usage (bytes)=340000768
12/03/06 08:59:08 INFO mapred.JobClient:     Map output records=105291
12/03/06 08:59:08 INFO mapred.JobClient:     SPLIT_RAW_BYTES=587
12/03/06 08:59:08 INFO mapreduce.ImportJobBase: Transferred 0 bytes in 10.724 seconds (0 bytes/sec)
12/03/06 08:59:08 INFO mapreduce.ImportJobBase: Retrieved 105291 records
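
Notice the warning in the middle of the output: Sqoop suggests its --direct option, which swaps the generic JDBC path for a MySQL-specific fast path based on mysqldump. I haven't tested how --direct interacts with the --cassandra-* options here, so treat the following as something to experiment with rather than a recommendation:

./dse sqoop import --connect jdbc:mysql://127.0.0.1/dev \
      --username root \
      --table npa_nxx \
      --direct \
      ...

Also note the TextSplitter warnings: because npa_nxx_key is a varchar, Sqoop has to split the import ranges on a text column, and it recommends an integral split column where one is available.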

I can then log into the Cassandra CQL utility and check that my column family and data are there:

robinsmac:bin robin$ ./cqlsh
Connected to Test Cluster at localhost:9160.
[cqlsh 2.0.0 | Cassandra 1.0.8 | CQL spec 2.0.0 | Thrift protocol 19.20.0]
Use HELP for help.
cqlsh> use dev;
cqlsh:dev> select count(*) from npa_nxx_cf limit 200000;
 count
--------
 105291
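
That count matches the 105,291 records Sqoop reported retrieving, so nothing was lost in transit. If you'd rather eyeball actual column values than trust a count, a small bounded query works too (output omitted here):

cqlsh:dev> select * from npa_nxx_cf limit 5;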

Conclusions

That's it. There are a lot more parameters you can use with Sqoop; typing ./dse sqoop import --help will list them all for you. To try out DataStax Enterprise with Sqoop, download a copy of the software; it's completely free for development use. Thanks for your support of DataStax and Cassandra!
