Cassandra and Pig Tutorial
Exploring Data in Cassandra using Pig with DataStax Enterprise 3.1
Introduction
Often my wife and I like to play a little guessing game. One of us will think up an odd question like “How many sidewalk squares are in Oakland?” or “How much bread was eaten in California today?” and then we’ll work together to get a first-order estimate of what that number might be. Usually we agree and move on to another question, but sometimes we need to find an exact answer to settle who was the better guesser. Luckily, we live in an age of data, and it is possible to find the answers to some problems with great precision.
Take for example the question “What portion of California is covered by libraries?” My guess would be that 10^-6 of California is covered by libraries [1], but can we know for sure? Thanks to the Institute of Museum and Library Services, this question can be answered quite accurately and with high precision. While the data set and the computation required to determine the answer are both relatively small, they provide a general example of how one would go about using Cassandra and Pig for analysis.
I’ll be doing the actual computation using DataStax Enterprise 3.1.2 (free for development purposes, so feel free to follow along!). I'll be running it on a standard MacBook Pro as a stand-alone process. We’ll be using Pig, so make sure to start your cluster with Analytics enabled.
Loading the data into Cassandra
The first thing to do is download the latest library data file. I chose the CSV version to demonstrate the COPY function available in Cassandra, although we could have used the other versions after converting them to JSON. We also need to do a quick encoding change from Latin-1 to UTF-8, the default encoding for text in Cassandra.
From a shell prompt:
unzip pupld11a_csv.zip
iconv -f LATIN1 -t UTF-8 puout11a.csv > puout11a.utf8.csv
Now that our file is prepared, we open cqlsh, the Python-based shell for Cassandra.
[~/BlogPosts/CassPig_Libraries]cqlsh
Connected to Test Cluster at localhost:9160.
[cqlsh 3.1.2 | Cassandra 1.2.6.3 | CQL spec 3.0.0 | Thrift protocol 19.36.0]
Use HELP for help.
cqlsh>
Now let's make a keyspace to hold our data. Since we are running locally and won’t be using this for any production purposes, I'll use SimpleStrategy with a replication factor of 1. This means there will be only a single copy of the data in our datacenter; since our datacenter is only a single machine, all of the data we insert will be held locally. We then use the USE command to access our newly configured keyspace.
cqlsh> CREATE KEYSPACE libdata WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
cqlsh> USE libdata;
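If you want to double-check the replication settings, cqlsh can echo the keyspace definition back (an optional sanity check):

cqlsh:libdata> DESCRIBE KEYSPACE libdata;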
Now we’ll load in the file containing all library service outlets (including “central, branch, bookmobile, and books-by-mail-only outlets”). Since our data file is small (only ~18k records), it is appropriate to use the cqlsh COPY command. If we had more data, we would use a bulk loading process. To prepare Cassandra for our data, we build a table using the CREATE TABLE command. Once this scaffolding is in place, the data is upserted into the table with the COPY command.
You’ll notice we set the PRIMARY KEY in the table to the combination of FSCSKEY and FSCS_SEQ. Together, these two codes uniquely identify an individual library branch/outlet or bookmobile. If we used a non-unique combination for our primary key, duplicate keys would overwrite previously inserted values (a quick demonstration of this upsert behavior follows the COPY below). I also account for every column in the CSV in the table creation, but this isn't necessary.
cqlsh:libdata> CREATE TABLE libout ("STABR" TEXT,"FSCSKEY" TEXT,"FSCS_SEQ" TEXT,"LIBID" TEXT,
                 "LIBNAME" TEXT,"ADDRESS" TEXT,"CITY" TEXT,"ZIP" TEXT,"ZIP4" TEXT,"CNTY" TEXT,
                 "PHONE" TEXT,"C_OUT_TY" TEXT,"C_MSA" TEXT,"SQ_FEET" INT,"F_SQ_FT" TEXT,
                 "L_NUM_BM" INT,"F_BKMOB" TEXT,"HOURS" INT,"F_HOURS" TEXT,"WKS_OPEN" INT,
                 "F_WKSOPN" TEXT,"YR_SUB" INT,"STATSTRU" INT,"STATNAME" INT,"STATADDR" INT,
                 "LONGITUD" FLOAT,"LATITUDE" FLOAT,"FIPSST" INT,"FIPSCO" INT,"FIPSPLAC" INT,
                 "CNTYPOP" INT,"LOCALE" TEXT,"CENTRACT" FLOAT,"CENBLOCK" INT,"CDCODE" TEXT,
                 "MAT_CENT" TEXT,"MAT_TYPE" INT,"CBSA" INT,"MICROF" TEXT,
                 PRIMARY KEY ("FSCSKEY","FSCS_SEQ"));
cqlsh:libdata>
cqlsh:libdata> COPY libout ("STABR","FSCSKEY","FSCS_SEQ","LIBID","LIBNAME","ADDRESS","CITY",
                 "ZIP","ZIP4","CNTY","PHONE","C_OUT_TY","C_MSA","SQ_FEET","F_SQ_FT",
                 "L_NUM_BM","F_BKMOB","HOURS","F_HOURS","WKS_OPEN","F_WKSOPN","YR_SUB",
                 "STATSTRU","STATNAME","STATADDR","LONGITUD","LATITUDE","FIPSST",
                 "FIPSCO","FIPSPLAC","CNTYPOP","LOCALE","CENTRACT","CENBLOCK","CDCODE",
                 "MAT_CENT","MAT_TYPE","CBSA","MICROF") FROM 'puout11a.utf8.csv' WITH HEADER=TRUE;

17598 rows imported in 24.118 seconds.
cqlsh:libdata>
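As promised, here is a small, purely illustrative sketch of why key uniqueness matters (XX9999/001 is a made-up key, not one from the data set): a second INSERT with the same FSCSKEY/FSCS_SEQ silently replaces the first rather than creating a duplicate row.

cqlsh:libdata> INSERT INTO libout ("FSCSKEY","FSCS_SEQ","CITY") VALUES ('XX9999','001','FIRST');
cqlsh:libdata> INSERT INTO libout ("FSCSKEY","FSCS_SEQ","CITY") VALUES ('XX9999','001','SECOND');
cqlsh:libdata> SELECT "CITY" FROM libout WHERE "FSCSKEY" = 'XX9999';

 CITY
--------
 SECOND

cqlsh:libdata> DELETE FROM libout WHERE "FSCSKEY" = 'XX9999'; -- remove the test row

If you try this, the final DELETE keeps the row counts below accurate.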
To check that all the rows are there, we can do a SELECT from cqlsh. Note that we have to specify LIMIT 20000 to override the default maximum of 10000 returned rows in cqlsh.
cqlsh:libdata> SELECT count(*) FROM libdata.libout LIMIT 20000;

 count
-------
 17598
All of the data is in Cassandra! We can now run queries like:
cqlsh:libdata> SELECT "CITY","C_OUT_TY" FROM libout LIMIT 10;

 CITY           | C_OUT_TY
----------------+----------
 MANTI          | CE
 GREENSBURG     | BR
 CLINTON        | CE
 JACKSON        | BR
 CLINTON        | BS
 OGDEN          | CE
 FREDERICKSBURG | CE
 CRESWELL       | CE
 SHERIDAN       | CE
 FITCHBURG      | CE
But you’ll notice that CQL3 won’t let us run queries with filters and the like on non-indexed columns. For example:
cqlsh:libdata> SELECT * FROM libout WHERE "CITY" = 'HIGHLAND PARK';
Bad Request: No indexed columns present in by-columns clause with Equal operator
We could fix this by indexing columns or by changing the structure of our table schema (a sketch of the index route follows), but instead we’ll move over to Pig and perform our more advanced queries there. For more information on how to model your data in Cassandra, watch "The Data Model is Dead, Long Live the Data Model". It and other helpful webinars can be found in the resources section at www.datastax.com.
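For completeness, here is a minimal sketch of the indexing route we just passed over. Creating a secondary index on CITY makes the equality query above legal:

cqlsh:libdata> CREATE INDEX ON libout ("CITY");
cqlsh:libdata> SELECT "STABR","CITY","LIBNAME" FROM libout WHERE "CITY" = 'HIGHLAND PARK';

Secondary indexes are really only suited to low-cardinality columns, though, so for anything more involved we would rather reshape the table or, as we do next, use Pig.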
Querying Cassandra Using Pig
We'll begin by starting the Pig client through DataStax Enterprise. This requires no setup beyond having started the cluster in Analytics mode.
(14:52:17)[~/BlogPosts/CassPig_Libraries]dse pig
2013-08-26 14:52:27,166 [main] INFO  org.apache.pig.Main - Logging error messages to: /Users/russellspitzer/BlogPosts/CassPig_Libraries/pig_1377553947163.log
2013-08-26 14:52:27,421 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: cfs://127.0.0.1/
2013-08-26 14:52:27.488 java[64588:1503] Unable to load realm info from SCDynamicStore
2013-08-26 14:52:28,348 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: 127.0.0.1:8012
grunt>
Next we construct our Pig commands, starting with loading our data from Cassandra. We’ll be using the cql:// URL and the CqlStorage() connector. The format of the command is basically load ‘cql://keyspace/table’. More information on using CQL3 with Pig is available in the DataStax documentation.
grunt> libdata = load 'cql://libdata/libout' USING CqlStorage();
grunt> DESCRIBE libdata;
libdata: {FSCSKEY: (name: chararray,value: chararray),ADDRESS: (name: chararray,value: chararray),CBSA: (name: chararray,value: int),CDCODE: (name: chararray,value: chararray),CENBLOCK: (name: chararray,value: int),CENTRACT: (name: chararray,value: float),CITY: (name: chararray,value: chararray),CNTY: (name: chararray,value: chararray),CNTYPOP: (name: chararray,value: int),C_MSA: (name: chararray,value: chararray),C_OUT_TY: (name: chararray,value: chararray),FIPSCO: (name: chararray,value: int),FIPSPLAC: (name: chararray,value: int),FIPSST: (name: chararray,value: int),FSCS_SEQ: (name: chararray,value: chararray),F_BKMOB: (name: chararray,value: chararray),F_HOURS: (name: chararray,value: chararray),F_SQ_FT: (name: chararray,value: chararray),F_WKSOPN: (name: chararray,value: chararray),HOURS: (name: chararray,value: int),LATITUDE: (name: chararray,value: float),LIBID: (name: chararray,value: chararray),LIBNAME: (name: chararray,value: chararray),LOCALE: (name: chararray,value: chararray),LONGITUD: (name: chararray,value: float),L_NUM_BM: (name: chararray,value: int),MAT_CENT: (name: chararray,value: chararray),MAT_TYPE: (name: chararray,value: int),MICROF: (name: chararray,value: chararray),PHONE: (name: chararray,value: chararray),SQ_FEET: (name: chararray,value: int),STABR: (name: chararray,value: chararray),STATADDR: (name: chararray,value: int),STATNAME: (name: chararray,value: int),STATSTRU: (name: chararray,value: int),WKS_OPEN: (name: chararray,value: int),YR_SUB: (name: chararray,value: int),ZIP: (name: chararray,value: chararray),ZIP4: (name: chararray,value: chararray)}
Currently CqlStorage() returns tuples of the form (name:chararray, value:type), so whenever we want the value of a column we need to append “.value” to the column name. We can now perform almost any query on the data, regardless of indices. For example, suppose we want to figure out how many libraries in our database are books-by-mail only. We can check how many entries have their outlet type column (C_OUT_TY) set to book by mail (“BM”).
DSE 3.1.4 includes a newer version of CqlStorage which uses a different format for outputting data. In this version, rows are returned as tuples with the following schema: (keyname:value colname1:value colname2:value ...). This makes it simpler to access values and takes up less space. The only difference in accessing the data is that you no longer need to append a .value when obtaining the value of a column.
grunt> BookByMail = FILTER libdata BY C_OUT_TY.value == 'BM';

DSE 3.1.4

grunt> BookByMail = FILTER libdata BY C_OUT_TY == 'BM';
You’ll notice that this doesn’t produce any output. That's because Pig won’t actually run anything until you tell it to save or write data somewhere; this laziness allows Pig to group and optimize operations. Since we aren't doing anything else with this query, let’s tell Pig to print to the screen with the DUMP command.
grunt> DUMP BookByMail;
...
Input(s):
Successfully read 17598 records from: "cql://libdata/libout"

Output(s):
Successfully stored 3 records in: "cfs://127.0.0.1/tmp/temp-426125012/tmp-1452303022"

Counters:
Total records written : 3
Total bytes written : 0
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_201308261035_0005

2013-08-26 15:32:12,831 [main] WARN  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Encountered Warning ACCESSING_NON_EXISTENT_FIELD 1 time(s).
2013-08-26 15:32:12,831 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2013-08-26 15:32:12,836 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2013-08-26 15:32:12,836 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
((FSCSKEY,OR0115),(FSCS_SEQ,002),(ADDRESS,1988 NEWMARK, TIOGA 3RD FLOOR),(CBSA,18300),(CDCODE,4104),(CENBLOCK,2015),(CENTRACT,5.04),(CITY,COOS BAY),(CNTY,COOS),(CNTYPOP,62795),(C_MSA,NO),(C_OUT_TY,BM),(FIPSCO,11),(FIPSPLAC,15250),(FIPSST,41),(F_BKMOB,R_11),(F_HOURS,R_11),(F_SQ_FT,U_11),(F_WKSOPN,R_11),(HOURS,1981),(LATITUDE,43.38449),(LIBID,OR0115),(LIBNAME,COOS COUNTY LIBRARY SERVICE DISTRICT),(LOCALE,33),(LONGITUD,-124.25983),(L_NUM_BM,0),(MAT_CENT,4),(MAT_TYPE,14),(MICROF,1),(PHONE,5418887393),(SQ_FEET,-3),(STABR,OR),(STATADDR,0),(STATNAME,0),(STATSTRU,0),(WKS_OPEN,52),(YR_SUB,2012),(ZIP,97420),(ZIP4,2971))
((FSCSKEY,AZ0042),(FSCS_SEQ,014),(ADDRESS,2153 GORDON AVE, STE. F),(CBSA,29420),(CDCODE,0404),(CENBLOCK,1027),(CENTRACT,9539.0),(CITY,KINGMAN),(CNTY,MOHAVE),(CNTYPOP,202592),(C_MSA,NC),(C_OUT_TY,BM),(FIPSCO,15),(FIPSPLAC,0),(FIPSST,4),(F_BKMOB,R_11),(F_HOURS,R_11),(F_SQ_FT,U_11),(F_WKSOPN,R_11),(HOURS,2806),(LATITUDE,35.239407),(LIBID,BM-BOOKS),(LIBNAME,BOOKS BY MAIL),(LOCALE,33),(LONGITUD,-114.02741),(L_NUM_BM,0),(MAT_CENT,0),(MAT_TYPE,1),(MICROF,0),(PHONE,8005258987),(SQ_FEET,-3),(STABR,AZ),(STATADDR,0),(STATNAME,0),(STATSTRU,0),(WKS_OPEN,0),(YR_SUB,2012),(ZIP,86409),(ZIP4,2561))
((FSCSKEY,MO0004),(FSCS_SEQ,037),(ADDRESS,15616 E. US HWY 24),(CBSA,28140),(CDCODE,2905),(CENBLOCK,1019),(CENTRACT,113.0),(CITY,INDEPENDENCE),(CNTY,JACKSON),(CNTYPOP,675300),(C_MSA,NC),(C_OUT_TY,BM),(FIPSCO,95),(FIPSPLAC,35000),(FIPSST,29),(F_BKMOB,R_11),(F_HOURS,R_11),(F_SQ_FT,U_11),(F_WKSOPN,R_11),(HOURS,2080),(LATITUDE,39.111526),(LIBID,MO0004-037),(LIBNAME,BOOKS BY MAIL),(LOCALE,21),(LONGITUD,-94.39066),(L_NUM_BM,0),(MAT_CENT,0),(MAT_TYPE,1),(MICROF,0),(PHONE,8168365200),(SQ_FEET,-3),(STABR,MO),(STATADDR,0),(STATNAME,0),(STATSTRU,0),(WKS_OPEN,52),(YR_SUB,2012),(ZIP,64050),(ZIP4,2057))
There we go! We see that there were only 3 books-by-mail-only locations in the 2011 survey. But if we look at the square footage for these records, we see (SQ_FEET,-3). These locations don’t actually have any physical library space, so the survey uses -3 to mean "Not Applicable". We’ll have to make sure we filter out these negative values.
grunt> libdata_buildings = FILTER libdata BY SQ_FEET.value > 0;
DSE 3.1.4
grunt> libdata_buildings = FILTER libdata BY SQ_FEET > 0;
As a quick aside, we can also run the earlier query that wasn't compatible with our Cassandra schema. In addition, we can drop columns from our output if we wish.
grunt> HP_Library = FILTER libdata BY CITY.value == 'HIGHLAND PARK';
grunt> HP_Names = FOREACH HP_Library GENERATE STABR.value as State, CITY.value as City, LIBNAME.value as Name;
grunt> dump HP_Names;
...
(TX,HIGHLAND PARK,HIGHLAND PARK LIBRARY)
(IL,HIGHLAND PARK,HIGHLAND PARK PUBLIC LIBRARY)
(NJ,HIGHLAND PARK,HIGHLAND PARK PUBLIC LIBRARY)
Now let's get down to computing how many square feet of each state are libraries. This requires a few steps:
- Extract from each row tuple only the values for state and square footage
- Group those tuples into bags by state
- Sum the square footage for each state
- Print the results
grunt> state_flat = FOREACH libdata_buildings GENERATE STABR.value as State, SQ_FEET.value as SquareFeet;
grunt> state_grouped = GROUP state_flat BY State;
grunt> state_footage = FOREACH state_grouped GENERATE group as State, SUM(state_flat.SquareFeet) as TotalFeet:int;
grunt> dump state_footage;
...
Input(s):
Successfully read 17598 records from: "cql://libdata/libout"

Output(s):
Successfully stored 55 records in: "cfs://127.0.0.1/tmp/temp-426125012/tmp-2106217306"

Counters:
Total records written : 55
Total bytes written : 0
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_201308261035_0007

2013-08-26 16:06:40,666 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2013-08-26 16:06:40,672 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2013-08-26 16:06:40,672 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
(AK,570178)
...
(AZ,2849136)
(CA,15975597)
(CO,3648347)
(CT,3849651)
(DC,842633)
...
(WY,724821)
grunt>
DSE 3.1.4
grunt> state_flat = FOREACH libdata_buildings GENERATE STABR as State, SQ_FEET as SquareFeet;
grunt> state_grouped = GROUP state_flat BY State;
grunt> state_footage = FOREACH state_grouped GENERATE group as State, SUM(state_flat.SquareFeet) as TotalFeet:int;
grunt> dump state_footage;
Looks like California has a grand total of 15,975,597 square feet of libraries. According to Google, the state has an area of 163,696 square miles, or about 4.56 × 10^12 square feet, which means that approximately 0.0000035 (3.5 × 10^-6) of California is libraries. My initial guess of 10^-6 panned out!
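For the record, here is the back-of-the-envelope arithmetic (one square mile is 5,280^2 = 27,878,400 square feet):

163,696 sq mi × 27,878,400 sq ft/sq mi ≈ 4.5636 × 10^12 sq ft
15,975,597 sq ft ÷ 4.5636 × 10^12 sq ft ≈ 3.5 × 10^-6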
Update: Inserting Into Cassandra Using Pig
Having the data only in Pig is fun, but what if I wanted to persist it forever? CqlStorage will let us save our data back to Cassandra, but to do so we first need to build a table in cqlsh.
CREATE TABLE libsqft (
  year int,
  state text,
  sqft bigint,
  PRIMARY KEY (year, state)
);
CqlStorage in Pig uses CQL prepared statements to move data from Pig back to Cassandra. Half of the prepared statement is written by CqlStorage, and the other portion is specified by the user as an 'output_query'. The automatic portion is the 'WHERE KEY =' clause, which specifies which key the statement applies to. The remaining arguments, the "?"s in the prepared statement, are filled in with whatever values are paired with that key in Pig. This requires your data to be in the following format:
((PartitionKey_Name,Value),(ClusteringKey_1_name,Value)...)(ArgValue1,ArgValue2,ArgValue3,...)
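To make that concrete, my understanding is that for our libsqft table the user-supplied half and the generated half combine into a single prepared statement roughly like this (the WHERE clause being built from the key tuples):

UPDATE libdata.libsqft SET sqft = ? WHERE year = ? AND state = ?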
For our example, I can use the following code to put the information into that format:
insert_format = FOREACH state_footage GENERATE TOTUPLE(TOTUPLE('year',2011),TOTUPLE('state',State)), TOTUPLE(TotalFeet);
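As a sanity check, the California row from state_footage should come out of this statement looking like:

((year,2011),(state,CA)),(15975597)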
Now I can create a prepared statement which will insert the values into the Cassandra table. The query needs to be URL-encoded so that its special characters will not disrupt the Pig URL.
UPDATE libdata.libsqft SET sqft = ?

becomes

UPDATE%20libdata.libsqft%20SET%20sqft%20%3D%20%3F
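If you'd rather not encode by hand, one quick way to generate the encoded string is with Python's urllib from the shell (assuming a Python 2 interpreter, standard on machines of this era):

python -c "import urllib; print urllib.quote('UPDATE libdata.libsqft SET sqft = ?')"
UPDATE%20libdata.libsqft%20SET%20sqft%20%3D%20%3F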
A Pig STORE statement is used to actually insert the information back into Cassandra. The URL looks identical to the original LOAD URL but adds the 'output_query' option. As stated before, this is only the portion of the prepared statement prior to the 'WHERE key ...' clause, because that portion is filled in automatically by CqlStorage.
store insert_format INTO 'cql://libdata/libsqft?output_query=UPDATE%20libdata.libsqft%20SET%20sqft%20%3D%20%3F' USING CqlStorage;
Now we can view the results back in cqlsh:
SELECT * FROM libdata.libsqft;

 year | state | sqft
------+-------+---------
 2011 | AK    |  570178
 2011 | AL    | 2792246
 ...
 2011 | WI    | 5661236
 2011 | WV    | 1075356
 2011 | WY    |  724821
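And because year and state make up the primary key of libsqft, the per-state equality lookup that failed against libout earlier now works directly:

SELECT * FROM libdata.libsqft WHERE year = 2011 AND state = 'CA';

 year | state | sqft
------+-------+----------
 2011 | CA    | 15975597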
I hope this has given you a general overview of how to manipulate data in Cassandra with Pig, and that you will be able to take advantage of these technologies in the future.
Commands Used In This Tutorial
CQL3 Commands
CREATE KEYSPACE libdata WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };

USE libdata;

CREATE TABLE libout ("STABR" TEXT,"FSCSKEY" TEXT,"FSCS_SEQ" TEXT,"LIBID" TEXT,"LIBNAME" TEXT,
  "ADDRESS" TEXT,"CITY" TEXT,"ZIP" TEXT,"ZIP4" TEXT,"CNTY" TEXT,"PHONE" TEXT,"C_OUT_TY" TEXT,
  "C_MSA" TEXT,"SQ_FEET" INT,"F_SQ_FT" TEXT,"L_NUM_BM" INT,"F_BKMOB" TEXT,"HOURS" INT,
  "F_HOURS" TEXT,"WKS_OPEN" INT,"F_WKSOPN" TEXT,"YR_SUB" INT,"STATSTRU" INT,"STATNAME" INT,
  "STATADDR" INT,"LONGITUD" FLOAT,"LATITUDE" FLOAT,"FIPSST" INT,"FIPSCO" INT,"FIPSPLAC" INT,
  "CNTYPOP" INT,"LOCALE" TEXT,"CENTRACT" FLOAT,"CENBLOCK" INT,"CDCODE" TEXT,"MAT_CENT" TEXT,
  "MAT_TYPE" INT,"CBSA" INT,"MICROF" TEXT,
  PRIMARY KEY ("FSCSKEY","FSCS_SEQ"));

SELECT count(*) FROM libdata.libout LIMIT 20000;

SELECT "CITY","C_OUT_TY" FROM libout LIMIT 10;
CQLSH Commands
COPY libout ("STABR","FSCSKEY","FSCS_SEQ","LIBID","LIBNAME","ADDRESS","CITY","ZIP","ZIP4", "CNTY","PHONE","C_OUT_TY","C_MSA","SQ_FEET","F_SQ_FT","L_NUM_BM","F_BKMOB","HOURS", "F_HOURS","WKS_OPEN","F_WKSOPN","YR_SUB","STATSTRU","STATNAME","STATADDR", "LONGITUD","LATITUDE","FIPSST","FIPSCO","FIPSPLAC","CNTYPOP", "LOCALE","CENTRACT","CENBLOCK","CDCODE","MAT_CENT","MAT_TYPE","CBSA","MICROF") FROM 'puout11a.utf8.csv' WITH HEADER=TRUE;
Pig Commands
libdata = load 'cql://libdata/libout' USING CqlStorage();
book_by_mail = FILTER libdata BY C_OUT_TY.value == 'BM';
DUMP book_by_mail;
libdata_buildings = FILTER libdata BY SQ_FEET.value > 0;
state_flat = FOREACH libdata_buildings GENERATE STABR.value as State, SQ_FEET.value as SquareFeet;
state_grouped = GROUP state_flat BY State;
state_footage = FOREACH state_grouped GENERATE group as State, SUM(state_flat.SquareFeet) as TotalFeet:int;
dump state_footage;
1. My guessing-game estimate would go something like this. California is big and mostly empty, so I’ll assume only about 1% of it is actually urban. Within cities, libraries also tend to occupy a small amount of land relative to the entire city. Oakland has about 10 libraries for a city of ~300,000 people (roughly 100,000 homes), so about 1 library per 10,000 homes. To first order, I assume libraries are about the same size as houses (10 times seems too much) and that cities are practically all houses. Therefore I estimated 10^-2 city land / California land × 10^-4 library land / city land = 10^-6 library land / California land. Final guess: one millionth of California is libraries.
UPDATE: 9/10/2013 Added Pig -> Cassandra Example
UPDATE: 9/27/2013 Added note about changed format in DSE 3.1.4/ AC 1.2.10