Technology | October 27, 2015

Using Brian’s cassandra-loader/unloader to migrate C* Maps for DSE Search compatibility

Intro

Using map collections in DSE Search takes advantage of dynamic fields in Apache Solr™ for indexing. For this to work, every key in your map has to be prefixed with the name of the collection. Using an example, this article aims to demonstrate:

  1. How to create and populate map collections that are compatible with DSE Search
  2. How to use generateResources to generate the schema and index maps as dynamic fields, and
  3. How to perform a data migration using Brian's cassandra-loader/unloader for existing data that lacks the prefix required by DSE Search

Note: This same methodology (cassandra-unloader|awk|cassandra-loader) can be used in many different ETL workloads. The map migration is just one common example of the larger group of situations where it comes in handy.

Note: This blog post was written targeting DSE 4.8. Please refer to the DataStax documentation for your specific version of DSE if different.

Something to watch out for: Dynamic fields, like Cassandra collections, are not meant to store large amounts of data. The odds are, if you are misusing Apache Cassandra® collections, you will also have problems on the search side with dynamic fields because they tend to create significant heap pressure due to their memory footprint.

Creating and Populating the maps

If you are using a map named contact_info_ to store contact information, you may have the following table definition:

CREATE TABLE autogeneratedtest.customers_by_channel (  
    customer_id uuid,
    customer_type text,
    channel_id text,
    contact_info_ map<text, text>,
    country_code text,
    PRIMARY KEY ((customer_id), channel_id)
);

and you may have some rows as follows:

insert into autogeneratedtest.customers_by_channel (  
    customer_id,
    customer_type,
    channel_id,
    contact_info_,        
    country_code
)
VALUES (  
    uuid(), 
    'subscription', 
    'web-direct',
    {
        'email': 'betrio@gmail.com',
        'first_name': 'Bill',
        'last_name': 'Evans'
    },
    'USA'
);

insert into autogeneratedtest.customers_by_channel (  
    customer_id,
    customer_type,
    channel_id,
    contact_info_,
    country_code
) 
VALUES (  
    uuid(),
    'subscription',
    'web-direct',
    {
        'email': 'messengers@gmail.com',
        'first_name': 'Art',
        'last_name': 'Blakey'
    },
    'USA'
);

To index the map with DSE Search, every key in the map must include the prefix contact_info_, as follows:

{
    'contact_info_email': 'messengers@gmail.com', 
    'contact_info_first_name': 'Art',
    'contact_info_last_name': 'Blakey'
}

Note: for existing systems, adding a prefix to the map's key will require changes in your application code.

Indexing the field with generateResources

In previous versions of DSE Search, users had to manually create and upload their own schema.xml and solrconfig.xml files before they could index their tables. This process was rather painful because hand-crafting XML files is quite error prone. DSP-5373 (released with DSE 4.6.8 and 4.7.1) made it possible to index a table with a single API call, and DSE takes care of generating both your schema.xml and your solrconfig.xml automagically.

Use dsetool or curl to index a core for the table in one fell swoop:

dsetool create_core autogeneratedtest.customers_by_channel generateResources=true

or

curl "http://<host>:8983/solr/admin/cores?action=CREATE&name=autogeneratedtest.customers_by_channel&generateResources=true"

Protip: If you're using Cassandra authentication, dsetool does not yet work and you'll have to use the curl command.
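If you create cores for more than one table, it may help to build the URL from its parts so that stray whitespace cannot sneak into the query string. Here is a minimal sketch: core_create_url is a hypothetical helper name, and the port and parameters are simply the ones from the curl command above.

```shell
#!/bin/sh
# Hypothetical helper: print the core-creation URL for a keyspace and table.
# Port 8983 and the query parameters are copied from the curl example.
core_create_url() {
    host=$1 keyspace=$2 table=$3
    printf 'http://%s:8983/solr/admin/cores?action=CREATE&name=%s.%s&generateResources=true\n' \
        "$host" "$keyspace" "$table"
}

# Print the URL first to inspect it; pass it to curl once it looks right:
#   curl "$(core_create_url localhost autogeneratedtest customers_by_channel)"
core_create_url localhost autogeneratedtest customers_by_channel
```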

Data Migration with cassandra-loader/unloader

If your data set is very large, a Spark job is a good way of migrating your data (here's an example by Ryan Svihla). That is a topic for another post.

This post will focus on small to medium datasets and simple transformations that can be implemented in awk. Because the tools can read from stdin and write to stdout, the combination of the loader, the unloader, and some sed/awk magic can be used as a quick and dirty ETL tool.
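To get a feel for the awk step on its own, you can try the transformation on a single hand-written row before pointing it at a cluster. The sketch below makes some loud assumptions: prefix_map_keys is a hypothetical helper, the map is assumed to arrive in column 4 looking like {"k1" : "v1", "k2" : "v2"}, and keys and values are assumed to contain neither "," nor "|". Check what your version of the unloader actually prints before relying on it.

```shell
#!/bin/sh
# Hypothetical helper: prefix every key of the map in column 4.
# Assumes the map field looks like {"k1" : "v1", "k2" : "v2"} and that
# keys and values contain neither "," nor "|".
prefix_map_keys() {
    awk -F '|' -v OFS='|' -v prefix="$1" '{
        body = substr($4, 2, length($4) - 2)   # drop the outer { }
        n = split(body, pairs, ",")
        out = ""; sep = ""
        for (i = 1; i <= n; i++) {
            pair = pairs[i]
            sub(/^ *"/, "", pair)              # drop blanks and the opening quote of the key
            out = out sep "\"" prefix pair     # put the quote back, followed by the prefix
            sep = ", "
        }
        $4 = "{" out "}"                       # rebuilds the row using OFS
        print
    }'
}

# Sample row, made up for illustration.
printf '%s\n' '42|subscription|web-direct|{"email" : "messengers@gmail.com", "first_name" : "Art"}|USA' |
    prefix_map_keys contact_info_
```

Because the function only reads stdin and writes stdout, it can sit between any producer and consumer of delimited rows, which is all the ETL pattern really needs.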

Brian's cassandra-loader and cassandra-unloader are a pair of Java applications (built using the DataStax Java driver). They are easy-to-use, full-featured tools for bulk loading and unloading delimited data, built following Cassandra and Java driver best practices.

Note: Use this source code as a reference architecture when building Java (and other) applications that interact with Cassandra.

First download the binaries and set permissions:

wget "https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.17/cassandra-loader"

wget "https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.17/cassandra-unloader"

sudo chmod +x cassandra*

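Thanks to Brian for helping optimize the awk script so that we can pipe directly from the unloader to awk to the loader. This way the entire dataset never has to fit in RAM.

Here's how you would run it (the destination table, customers_by_channel2, is assumed to already exist with the same columns as the source table):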

./cassandra-unloader -f stdout \
    -delim "|" \
    -host localhost \
    -schema "autogeneratedtest.customers_by_channel \
    ( \
        customer_id, \
        customer_type, \
        channel_id, \
        contact_info_, \
        country_code \
    )" | \
awk -F "|" '{
    # Drop the first two and last two characters of the map field ($4).
    a = substr($4, 3, length($4) - 4)
    nb = split(a, b, ",")
    d = ""; sep = ""
    # Walk the pieces two at a time: b[i] is the key, b[i+1] the value.
    for (i = 1; i <= nb; i += 2) {
        c = substr(b[i], 2)                  # key without its first character
        b[i] = "\"contact_info_" c           # re-quote the key with the prefix
        d = d sep b[i] " : " b[i+1]
        sep = ", "
    }
    # Columns before the map are passed through unchanged.
    for (i = 1; i <= 3; i++) {
        printf("%s|", $i)
    }
    printf("%s", d)
    # Columns after the map are passed through unchanged.
    for (i = 5; i <= NF; i++) {
        printf("|%s", $i)
    }
    printf("\n")
}' | \
./cassandra-loader \
    -f stdin \
    -delim "|" \
    -host localhost \
    -schema "autogeneratedtest.customers_by_channel2( \
    customer_id, \
    customer_type, \
    channel_id, \
    contact_info_, \
    country_code \
)"

The result is a second table, customers_by_channel2, in which the map keys are prefixed with the name of the map column, contact_info_.
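Before pointing DSE Search at the new table, a quick sanity check for leftover un-prefixed keys can save some head scratching. This is only a sketch: unprefixed_rows is a hypothetical helper, and it assumes the map sits in column 4 of a "|"-delimited row and looks like {"k1" : "v1", "k2" : "v2"}, with no "," or "|" inside keys or values. Adjust it to whatever your unloader output really looks like.

```shell
#!/bin/sh
# Hypothetical check: print every row whose map (column 4) still has a key
# that does not start with the given prefix.
# Assumes the map field looks like {"k1" : "v1", "k2" : "v2"} and that
# keys and values contain neither "," nor "|".
unprefixed_rows() {
    awk -F '|' -v prefix="$1" '{
        body = substr($4, 2, length($4) - 2)   # drop the outer { }
        n = split(body, pairs, ",")
        for (i = 1; i <= n; i++) {
            key = pairs[i]
            sub(/^ *"/, "", key)               # drop blanks and the opening quote
            if (index(key, prefix) != 1) {     # key does not start with the prefix
                print
                next
            }
        }
    }'
}

# Sample run on made-up rows: only the second row has a key without the prefix.
printf '%s\n' \
    '1|a|web|{"contact_info_email" : "x@y.z"}|USA' \
    '2|a|web|{"contact_info_email" : "q@r.s", "first_name" : "Art"}|USA' |
    unprefixed_rows contact_info_
```

In practice you would pipe the output of cassandra-unloader for the new table (with -f stdout and -delim "|", as in the migration command) into unprefixed_rows contact_info_. No output means every key carries the prefix.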

By default, the loader and unloader each use one thread per CPU core on your box and handle up to 1000 in-flight futures. These and other advanced options are configurable, but the defaults should work fine (especially if you run this from a separate box).

Enjoy!
