Using Brian’s cassandra-loader/unloader to migrate C* Maps for DSE Search compatibility
Intro
Using map collections in DSE Search takes advantage of dynamic fields in Apache Solr™ for indexing. For this to work, every key in your map has to be prefixed with the name of the map column. Using a worked example, this article aims to demonstrate:
- How to create and populate map collections that are compatible with DSE Search
- How to use generateResources to generate the schema and index maps as dynamic fields, and
- How to perform a data migration using Brian's cassandra-loader/unloader for existing data that lacks the prefix required by DSE Search
Note: This same methodology (cassandra-unloader | awk | cassandra-loader) can be used in many different ETL workloads; this is just one common example of the larger group of situations where it comes in handy.
Note: This blog post was written targeting DSE 4.8. Please refer to the DataStax documentation for your specific version of DSE if different.
Something to watch out for: Dynamic fields, like Cassandra collections, are not meant to store large amounts of data. The odds are, if you are misusing Apache Cassandra® collections, you will also have problems on the search side, because dynamic fields tend to create significant heap pressure.
Creating and Populating the maps
If you are using a map to store contact information and your map column is named contact_info_, you may have the following table definition:
CREATE TABLE autogeneratedtest.customers_by_channel (
    customer_id uuid,
    customer_type text,
    channel_id text,
    contact_info_ map<text, text>,
    country_code text,
    PRIMARY KEY ((customer_id), channel_id)
);
and you may have some rows as follows:
INSERT INTO autogeneratedtest.customers_by_channel (
    customer_id, customer_type, channel_id, contact_info_, country_code
) VALUES (
    uuid(), 'subscription', 'web-direct',
    { 'email': 'betrio@gmail.com', 'first_name': 'Bill', 'last_name': 'Evans' },
    'USA'
);

INSERT INTO autogeneratedtest.customers_by_channel (
    customer_id, customer_type, channel_id, contact_info_, country_code
) VALUES (
    uuid(), 'subscription', 'web-direct',
    { 'email': 'messengers@gmail.com', 'first_name': 'Art', 'last_name': 'Blakey' },
    'USA'
);
In order to index the map with DSE Search, the keys in the map would have to include the prefix contact_info_ as follows:
{
    'contact_info_email': 'messengers@gmail.com',
    'contact_info_first_name': 'Art',
    'contact_info_last_name': 'Blakey'
}
Note: for existing systems, adding a prefix to the map's keys will require changes in your application code.
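For new writes after the change, the application simply supplies the prefixed keys itself. A minimal sketch reusing the table and sample values from above (the uuid() and literal values are illustrative):

INSERT INTO autogeneratedtest.customers_by_channel (
    customer_id, customer_type, channel_id, contact_info_, country_code
) VALUES (
    uuid(), 'subscription', 'web-direct',
    -- every key carries the contact_info_ prefix expected by DSE Search
    {
      'contact_info_email': 'messengers@gmail.com',
      'contact_info_first_name': 'Art',
      'contact_info_last_name': 'Blakey'
    },
    'USA'
);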
Indexing the field with generateResources
In previous versions of DSE Search, users had to manually create and upload their own schema.xml and solrconfig.xml files with which to index their tables. This process was rather painful because hand-crafting XML files is quite error-prone. DSP-5373 (released with DSE 4.6.8 and 4.7.1) made it so that you can index a table with a single API call, and DSE will take care of generating both your schema.xml and your solrconfig.xml automagically.
Use dsetool or curl to index a core for the table in one fell swoop:
dsetool create_core autogeneratedtest.customers_by_channel generateResources=true
or
curl "http://<host>:8983/solr/admin/cores?action=CREATE&name=autogeneratedtest.customers_by_channel &generateResources=true"
Protip: If you're using Cassandra authentication, dsetool does not yet work and you'll have to use the curl command.
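Once the core exists and the rows carry prefixed keys, a quick way to check that the dynamic fields are actually being indexed is a solr_query from cqlsh. A sketch, using a field and value from the sample rows above:

SELECT customer_id, contact_info_
FROM autogeneratedtest.customers_by_channel
WHERE solr_query = 'contact_info_last_name:Blakey';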
Data Migration with cassandra-loader/unloader
If your data set is very large, a Spark job is a good way of migrating your data (here's an example by Ryan Svihla). That is a topic for another post.
This post will focus on small to medium datasets and simple transformations that can be implemented in awk. Because the tools read from stdin and write to stdout, the combination of the unloader, the loader, and some sed/awk magic can be used as a quick-and-dirty ETL tool.
Brian's cassandra-loader and cassandra-unloader are a pair of Java applications (built using the DataStax Java driver). They are easy-to-use, full-featured delimited bulk loading/unloading tools, built following Cassandra and Java driver best practices.
Note: Use this source code as a reference architecture when building Java (and other) applications that interact with Cassandra.
First download the binaries and set permissions:
wget "https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.17/cassandra-loader" wget "https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.17/cassandra-unloader" sudo chmod +x cassandra*
Thanks to Brian for helping optimize the awk script so that we can pipe directly from the unloader through awk into the loader; this means we don't have to fit the entire dataset in RAM.
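One prerequisite: the loader below writes into a new table, customers_by_channel2, which must exist before the pipeline runs. A minimal sketch that simply mirrors the original schema:

CREATE TABLE autogeneratedtest.customers_by_channel2 (
    customer_id uuid,
    customer_type text,
    channel_id text,
    contact_info_ map<text, text>,
    country_code text,
    PRIMARY KEY ((customer_id), channel_id)
);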
Here's how you would run it:
./cassandra-unloader -f stdout \
  -delim "|" \
  -host localhost \
  -schema "autogeneratedtest.customers_by_channel \
    ( \
      customer_id, \
      customer_type, \
      channel_id, \
      contact_info_, \
      country_code \
    )" | \
awk -F "|" '{
  # strip the outer wrapper characters from the map field (column 4)
  a = substr($4, 3, length($4) - 4);
  nb = split(a, b, ",");
  d = ""; sep = "";
  # walk the key/value tokens, prefixing each key with contact_info_
  for (i = 1; i <= nb; i += 2) {
    c = substr(b[i], 2);
    b[i] = "\"contact_info_" c;
    d = d sep b[i] " : " b[i+1];
    sep = ", ";
  }
  # emit columns 1-3 unchanged, then the rewritten map, then the rest
  for (i = 1; i <= 3; i++) {
    printf("%s|", $i);
  }
  printf("%s", d);
  for (i = 5; i <= NF; i++) {
    printf("|%s", $i);
  }
  printf("\n");
}' | \
./cassandra-loader \
  -f stdin \
  -delim "|" \
  -host localhost \
  -schema "autogeneratedtest.customers_by_channel2 \
    ( \
      customer_id, \
      customer_type, \
      channel_id, \
      contact_info_, \
      country_code \
    )"
The result is a new table with the map keys prefixed by the name of the map column, contact_info_.
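A quick spot check in cqlsh (a sketch) shows whether the keys picked up the prefix:

SELECT contact_info_
FROM autogeneratedtest.customers_by_channel2
LIMIT 5;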
The loader and unloader will use a number of threads equal to the number of CPU cores in your box and will keep up to 1000 futures in flight. These and other advanced options are configurable, but the defaults should work fine (especially if you run this from a separate box).
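If you do need to tune them, both tools take command-line options. A sketch, assuming the -numThreads and -numFutures flags documented for v0.0.17 (run either tool without arguments to see the usage text for your version):

# the tuning values below are illustrative; the defaults usually suffice
./cassandra-loader -f stdin \
  -delim "|" \
  -host localhost \
  -schema "autogeneratedtest.customers_by_channel2 \
    (customer_id, customer_type, channel_id, contact_info_, country_code)" \
  -numThreads 8 \
  -numFutures 500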
Enjoy!