June 2, 2014

Powers of Ten – Part II

"'Curiouser and curiouser!' cried Alice (she was so much surprised, that for the moment she quite forgot how to speak good English); 'now I'm opening out like the largest telescope that ever was!'"
-- Lewis Carroll, Alice's Adventures in Wonderland

It is sometimes surprising to see just how much data is available. Much like Alice's sudden growth in Lewis Carroll's famous story, data can grow upward quite quickly, and the opportunity to build a multi-billion edge graph can appear almost overnight. Luckily, Titan is capable of scaling to accommodate graphs of that size, and with the right strategies for loading the data, development efforts can shift more quickly to the rewards of massive-scale graph analytics.
This article is the second installment of the two-part Powers of Ten series, which discusses bulk loading data into Titan at varying scales. For the purposes of this series, "scale" is determined by the number of edges to be loaded. As it happens, bulk loading strategies tend to change as the scale increases by powers of ten, which gives a memorable way to categorize them. Part I of this series looked at strategies for loading millions and tens of millions of edges, using Gremlin to do so. This part covers hundreds of millions and billions of edges, using Faunus as the loading tool.

Note: By Titan 0.5.0, Faunus will be pulled into the Titan project under the name Titan/Hadoop.

100 Million

As a reminder from this article's predecessor, loading tens of millions of edges was best handled with BatchGraph. BatchGraph may still be useful in the low hundreds of millions of edges, provided that the load time, and its effect on development iteration, is not a problem. It is at this point, however, that Faunus becomes a good candidate for loading.

Faunus is a Hadoop-based graph analytics engine. In addition to its role as an analytic tool, Faunus provides ETL-related functions for managing large-scale graphs. By taking advantage of Hadoop's parallelism, Faunus can load hundreds of millions of edges in less time than a single-threaded approach with BatchGraph.
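Faunus's speed-up over a single-threaded loader comes from Hadoop cutting the input into splits and running many map tasks at once. As a rough, single-JVM analogy only (this is not how Faunus or Hadoop is implemented, and `ParallelEdgeCount` is a hypothetical name), the sketch below counts edge records in an edge list once sequentially and once in parallel chunks on a thread pool. Both paths should produce the same answer; only the parallel one can use more than one core.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration of split-level parallelism; not Faunus or Hadoop code.
final class ParallelEdgeCount {

    // Sequential baseline: a single thread walks every line of an edge list.
    static long countSequential(List<String> lines) {
        long edges = 0;
        for (String line : lines) {
            if (!line.trim().isEmpty()) {
                edges++; // one non-blank line = one edge record
            }
        }
        return edges;
    }

    // Parallel version: cut the input into chunks (loosely like input splits)
    // and process each chunk as an independent task (loosely like a map task).
    static long countParallel(List<String> lines, int workers) {
        if (workers < 1) {
            throw new IllegalArgumentException("workers must be >= 1");
        }
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            int chunkSize = Math.max(1, (lines.size() + workers - 1) / workers);
            List<Future<Long>> partials = new ArrayList<>();
            for (int start = 0; start < lines.size(); start += chunkSize) {
                List<String> chunk = lines.subList(start, Math.min(start + chunkSize, lines.size()));
                partials.add(pool.submit(() -> countSequential(chunk)));
            }
            long total = 0;
            for (Future<Long> partial : partials) {
                total += partial.get(); // combine the per-chunk results
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

The chunks share nothing, which is what makes the work safe to parallelize; Hadoop applies the same idea across machines rather than threads.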

The DocGraph data set "shows how healthcare providers team to provide care". DocGraph was introduced in the previous installment of the Powers of Ten series, where the smallest version of the data set was used. As a quick reminder, vertices in this network represent healthcare providers and edges represent shared interactions between two providers. This section uses the "365-day Window" version, which consists of approximately 1 million vertices and 154 million edges.

[Figure: DocGraph schema]

Graphs in the low hundreds of millions of edges, like DocGraph, can often be loaded using a single Hadoop node running in pseudo-distributed mode. This gains the advantage of parallelism while keeping configuration complexity and resource requirements as low as possible. In developing this example, a single m2.xlarge EC2 instance was used to host both Hadoop and Cassandra as a single-machine cluster. The example assumes that the following prerequisites are in place:

Once the prerequisites have been established, download the DocGraph data set and unzip it into $FAUNUS_HOME/:

$ curl -L -O http://downloads.cms.gov/foia/physician-referrals-2012-2013-days365.zip

$ unzip physician-referrals-2012-2013-days365.zip

One of the patterns established in the previous Powers of Ten post was the need to always create the Titan type definitions first. This is most directly accomplished by connecting to Cassandra with the Titan Gremlin REPL (i.e. $TITAN_HOME/bin/gremlin.sh), which will automatically create the Titan keyspace. Place the following code in a file called schema.groovy at the root of $TITAN_HOME:

g = com.thinkaurelius.titan.core.TitanFactory.open("conf/titan-cassandra.properties")

// npi: unique vertex key with a vertex index, used to look up providers
g.makeKey("npi").dataType(String.class).single().unique().indexed(Vertex.class).make()

// property keys for the "shares" edges
sharedTxCount = g.makeKey("sharedTxCount").dataType(Integer.class).make()
patientTotal = g.makeKey("patientTotal").dataType(Integer.class).make()
sameDayTotal = g.makeKey("sameDayTotal").dataType(Integer.class).make()

// "shares" edge label, with the three edge keys declared as its signature
g.makeLabel("shares").signature(sharedTxCount, patientTotal, sameDayTotal).make()

g.commit()

This file can be executed in the Titan Gremlin REPL as follows:

gremlin> \. schema.groovy
The DocGraph data is formatted as a CSV file, so the Faunus input format must be capable of processing that structure. Faunus provides a number of out-of-the-box formats, and the one to use in this case is ScriptInputFormat. This format allows an arbitrary Gremlin script to be specified that builds a FaunusVertex from each line of input, where the FaunusVertex is the object understood by the various output formats that Faunus supports.
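To make the per-line contract concrete, here is a plain-Java sketch of the same idea. `SketchVertex`, `SketchEdge` and `DocGraphLineReader` are hypothetical stand-ins written for this illustration, not Faunus classes, and `reuse()` here simply assumes that reusing a vertex means clearing it and assigning a new id. The column order (two NPIs followed by sharedTxCount, patientTotal and sameDayTotal) and the base-11 id encoding over the digits 0-9 plus D are taken from the Groovy parsing script later in this article.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an edge; not the Faunus API.
final class SketchEdge {
    final String label;
    final long inVertexId;
    final Map<String, Object> properties = new LinkedHashMap<>();

    SketchEdge(String label, long inVertexId) {
        this.label = label;
        this.inVertexId = inVertexId;
    }
}

// Hypothetical stand-in for FaunusVertex; not the Faunus API.
final class SketchVertex {
    long id;
    final Map<String, Object> properties = new LinkedHashMap<>();
    final List<SketchEdge> outEdges = new ArrayList<>();

    // Assumed semantics: drop any previous state and take on a new id.
    void reuse(long newId) {
        id = newId;
        properties.clear();
        outEdges.clear();
    }

    SketchEdge addOutEdge(String label, long inVertexId) {
        SketchEdge edge = new SketchEdge(label, inVertexId);
        outEdges.add(edge);
        return edge;
    }
}

final class DocGraphLineReader {
    // Same alphabet as the Groovy script: the digits 0-9 plus 'D', i.e. base 11.
    private static final String ID_CHARACTERS = "0123456789D";

    // Read the NPI string as a base-11 number so it can serve as a long vertex id.
    static long encodeId(String id) {
        long acc = 0L;
        for (char c : id.toCharArray()) {
            int digit = ID_CHARACTERS.indexOf(c);
            if (digit < 0) {
                throw new IllegalArgumentException("unexpected character in id: " + id);
            }
            acc = acc * ID_CHARACTERS.length() + digit;
        }
        return acc;
    }

    // One CSV line -> the first NPI as the vertex, plus one outgoing "shares" edge.
    static boolean read(SketchVertex vertex, String line) {
        String[] f = line.split(",");
        for (int i = 0; i < f.length; i++) {
            f[i] = f[i].trim();
        }
        vertex.reuse(encodeId(f[0]));
        vertex.properties.put("npi", f[0]);

        SketchEdge edge = vertex.addOutEdge("shares", encodeId(f[1]));
        edge.properties.put("sharedTxCount", Integer.valueOf(f[2]));
        edge.properties.put("patientTotal", Integer.valueOf(f[3]));
        edge.properties.put("sameDayTotal", Integer.valueOf(f[4]));
        return true;
    }
}
```

One deliberate difference: the Groovy encodeId folds an unknown character into the id as -1 without complaint, whereas this sketch throws, which makes bad input easier to spot. Malformed lines with fewer than five fields are not handled in either version.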

The diagram below visualizes the process: the script given to ScriptInputFormat is executed in parallel against each line of the CSV file, parsing the line into a FaunusVertex and its related edges, which form an adjacency list. That adjacency list can then be written to Cassandra with TitanCassandraOutputFormat.

[Figure: the Faunus loading process]

Save the following script, which parses the data from the CSV file, as $FAUNUS_HOME/NPIScriptInput.groovy:

import com.thinkaurelius.faunus.FaunusVertex
import com.tinkerpop.blueprints.Direction

// NPIs are encoded over the alphabet 0-9 plus 'D' (base 11)
ID_CHARACTERS = ['0'..'9','D'].flatten()
NUM_CHARACTERS = ID_CHARACTERS.size()

// Turn an NPI string into a long that can serve as the Faunus vertex id
def long encodeId(String id) {
  id.inject(0L, { acc, c ->
    acc * NUM_CHARACTERS + ID_CHARACTERS.indexOf(c)
  })
}

// Called once per CSV line: the first NPI becomes the vertex, and the line
// becomes a single outgoing "shares" edge to the second NPI
def boolean read(FaunusVertex vertex, String line) {
    def (id1,
         id2,
         sharedTxCount,
         patientTotal,
         sameDayTotal) = line.split(',')*.trim()

    vertex.reuse(encodeId(id1))
    vertex.setProperty("npi", id1)

    def edge = vertex.addEdge(Direction.OUT, "shares", encodeId(id2))
    edge.setProperty("sharedTxCount", sharedTxCount as Integer)
    edge.setProperty("patientTotal", patientTotal as Integer)
    edge.setProperty("sameDayTotal", sameDayTotal as Integer)

    return true
}

The most important part of the script above is the read function, which is fed the FaunusVertex and a single line from the CSV file. It splits the line on the comma separator, reuses the supplied FaunusVertex with a long id derived from the first NPI by encodeId, sets the npi property on it, and creates the "shares" edge (with its three properties) that the line represents. Once the script for handling the input file is in place, attention should turn to the Faunus properties file, named $FAUNUS_HOME/faunus.properties:

# input graph parameters
faunus.graph.input.format=com.thinkaurelius.faunus.formats.script.ScriptInputFormat
faunus.input.location=docgraph/Physician-Referrals-2012-2013-DAYS365.txt
faunus.graph.input.script.file=docgraph/NPIScriptInput.groovy
faunus.graph.input.edge-copy.direction=OUT

# output data (graph or statistic) parameters
faunus.graph.output.format=com.thinkaurelius.faunus.formats.titan.cassandra.TitanCassandraOutputFormat
faunus.graph.output.titan.storage.backend=cassandra
faunus.graph.output.titan.storage.hostname=localhost
faunus.graph.output.titan.storage.port=9160
faunus.graph.output.titan.storage.keyspace=titan
faunus.graph.output.titan.storage.batch-loading=true
faunus.graph.output.titan.infer-schema=false

mapred.task.timeout=5400000
mapred.max.split.size=5242880
mapred.reduce.tasks=2
mapred.map.child.java.opts=-Xmx8G
mapred.reduce.child.java.opts=-Xmx8G
mapred.job.reuse.jvm.num.tasks=-1

faunus.sideeffect.output.format=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
faunus.output.location=output
faunus.output.location.overwrite=true

The properties file above defines the settings Faunus uses to execute the loading process. The faunus.graph.input.* and faunus.input.location settings specify the input format, the parsing script and where the source data is located. Note that these file locations refer to Hadoop's distributed file system, HDFS, and not the local file system. The faunus.graph.output.* settings configure the output format, which writes to a TitanGraph backed by Cassandra. These are mostly standard Titan settings prefixed with faunus.graph.output.titan. and, as with the bulk loading examples in Part I of this series, storage.batch-loading is set to true. The mapred.* entries are ordinary Hadoop job settings: a 90-minute task timeout (the value is in milliseconds), a 5 MB maximum input split size, two reduce tasks, an 8 GB heap for each map and reduce child JVM, and unlimited JVM reuse across tasks.
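One input setting worth a closer look is faunus.graph.input.edge-copy.direction=OUT. Our understanding, consistent with the EdgeCopyMapReduce job that runs first in the job log below, is that because the script only creates out-edges, Faunus runs an extra MapReduce pass that gives each target vertex the matching in-edge. The minimal in-memory sketch below shows that idea; `EdgeCopySketch` and `copyOutEdges` are hypothetical names, and plain maps of vertex ids stand in for FaunusVertex objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory illustration of edge copying; not Faunus code.
final class EdgeCopySketch {

    // out: vertex id -> ids of the vertices it has out-edges to.
    // Returns: vertex id -> ids of the vertices that have out-edges to it.
    // MapReduce view: the "map" side emits (target, source) for every out-edge,
    // and the "reduce" side groups those pairs by target.
    static Map<Long, List<Long>> copyOutEdges(Map<Long, List<Long>> out) {
        Map<Long, List<Long>> in = new TreeMap<>();
        for (Map.Entry<Long, List<Long>> entry : out.entrySet()) {
            for (Long target : entry.getValue()) {
                in.computeIfAbsent(target, k -> new ArrayList<>()).add(entry.getKey());
            }
        }
        return in;
    }
}
```

Grouping by the target id is what lets each reducer see every incoming edge for a vertex at once, without any single machine holding the whole graph.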

It is now possible to execute the load through the Faunus Gremlin REPL, which can be started with $FAUNUS_HOME/bin/gremlin.sh. The first step is to make sure that the data and script files are available to Faunus in HDFS. Faunus has built-in helpers for interacting with that distributed file system (the hdfs object used below), allowing files to be copied, directories to be created and so on.

gremlin> hdfs.mkdir("docgraph")
==>null
gremlin> hdfs.copyFromLocal('Physician-Referrals-2012-2013-DAYS365.txt','docgraph/Physician-Referrals-2012-2013-DAYS365.txt')
==>null
gremlin> hdfs.copyFromLocal("NPIScriptInput.groovy","docgraph/NPIScriptInput.groovy")
==>null

Now that HDFS has those files available, execute the Faunus job that will load the data as shown below:

gremlin> g = FaunusFactory.open("faunus.properties")
==>faunusgraph[scriptinputformat->titancassandraoutputformat]
gremlin> g._()
13:55:05 INFO mapreduce.FaunusCompiler: Generating job chain: g._()
13:55:05 WARN mapreduce.FaunusCompiler: Using the distribution Faunus job jar: lib/faunus-0.4.2-job.jar
13:55:05 INFO mapreduce.FaunusCompiler: Compiled to 3 MapReduce job(s)
13:55:05 INFO mapreduce.FaunusCompiler: Executing job 1 out of 3: MapSequence[com.thinkaurelius.faunus.formats.EdgeCopyMapReduce.Map, com.thinkaurelius.faunus.formats.EdgeCopyMapReduce.Reduce]
...
17:55:25 INFO input.FileInputFormat: Total input paths to process : 2
17:55:25 INFO mapred.JobClient: Running job: job_201405141319_0004
17:55:26 INFO mapred.JobClient:  map 0% reduce 0%
17:56:23 INFO mapred.JobClient:  map 1% reduce 0%
...
02:06:46 INFO mapred.JobClient:  map 100% reduce 0%
...
18:54:05 INFO mapred.JobClient:   com.thinkaurelius.faunus.formats.BlueprintsGraphOutputMapReduce$Counters
18:54:05 INFO mapred.JobClient:     EDGE_PROPERTIES_WRITTEN=463706751
18:54:05 INFO mapred.JobClient:     EDGES_WRITTEN=154568917
18:54:05 INFO mapred.JobClient:     SUCCESSFUL_TRANSACTIONS=624
...
18:54:05 INFO mapred.JobClient:     SPLIT_RAW_BYTES=77376

The first statement creates the FaunusGraph instance, configured by the faunus.properties file. The second, g._(), executes the job with that configuration; _() is Gremlin's identity step, so the job simply reads the input and writes it to the output format. The job output follows, culminating in EDGES_WRITTEN=154568917, which is the number of edges expected from this data set.
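The counters also make a cheap sanity check. The schema declares three keys for shares edges (sharedTxCount, patientTotal and sameDayTotal) and the script sets all three on every edge, so EDGE_PROPERTIES_WRITTEN should be exactly three times EDGES_WRITTEN. The snippet below does that arithmetic on the values copied from the log, along with the average number of edges per successful transaction. `DocGraphCounterCheck` is a hypothetical name, and the per-transaction figure is only an average, since the log does not show how edges were spread across transactions.

```java
// Arithmetic on the job counters copied from the DocGraph log; hypothetical helper.
final class DocGraphCounterCheck {
    static final long EDGES_WRITTEN = 154_568_917L;
    static final long EDGE_PROPERTIES_WRITTEN = 463_706_751L;
    static final long SUCCESSFUL_TRANSACTIONS = 624L;

    // Every "shares" edge carries three properties, so the totals should agree exactly.
    static boolean propertiesMatchSchema() {
        return EDGE_PROPERTIES_WRITTEN == 3 * EDGES_WRITTEN;
    }

    // Average number of edges per committed transaction (integer division).
    static long edgesPerTransaction() {
        return EDGES_WRITTEN / SUCCESSFUL_TRANSACTIONS;
    }
}
```

Both checks come out cleanly: 3 × 154,568,917 = 463,706,751, and the average is roughly 247,700 edges per transaction.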

At this scale, the decision to use Faunus for loading is generally a trade-off between load time and the complexity of handling parallelism in a custom way. In other words, BatchGraph and custom parallel loaders may still be good strategies if time isn't a big factor or if parallelism can easily be managed without Hadoop. Of course, using Faunus from the beginning allows the same load to scale up easily: moving from a single-machine pseudo-cluster to a high-powered, multi-node cluster isn't difficult and requires no code changes.

1 Billion

In terms of loading mechanics, the approach to loading billions of edges is not so different from that of the previous section. The loading strategy still centers on Faunus; however, a single-machine pseudo-cluster is likely under-powered for a job of this magnitude, and a higher degree of parallelism is required for it to execute in a reasonable time frame. Loading billions of edges will also likely require some trial-and-error "knob-turning" of Hadoop and the target backend store (e.g. Cassandra).
The Friendster social network data set represents a graph with 117 million vertices and 2.5 billion edges. The graph is stored as an adjacency list: each line holds a vertex id (a long), a colon, and a comma-separated list of the ids of that vertex's friends, as the parsing script below shows. As with the previous DocGraph example, ScriptInputFormat provides the most convenient way to process this file.

In this case, a four-node Hadoop cluster was created using m2.4xlarge EC2 instances. Each instance was configured with eight mappers and six reducers, for a total of thirty-two mappers and twenty-four reducers across the cluster. Compared to the single-machine pseudo-cluster used in the last section, which had just two mappers and two reducers, this fully distributed cluster has a much higher degree of parallelism. As in the previous section, Hadoop and Cassandra were co-located, with Cassandra running on each of the four nodes.

Because the primary difference between loading data at this scale and the previous one is the use of a fully distributed Hadoop cluster instead of a pseudo-cluster, this section skips much of the explanation of executing the load and of the configurations and scripts involved. The script for processing each line of the Friendster data set looks like this:

import com.thinkaurelius.faunus.FaunusVertex
import static com.tinkerpop.blueprints.Direction.OUT

// Each line looks like "id:friend1,friend2,..."
def boolean read(final FaunusVertex v, final String line) {
    def parts = line.split(':')
    v.reuse(Long.valueOf(parts[0]))
    // split() drops trailing empty strings, so a vertex with no friends has no second part
    if (parts.size() > 1) {
        parts[1].split(',').each({
            v.addEdge(OUT, 'friend', Long.valueOf(it))
        })
    }
    return true
}
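For checking the parsing rules outside of Faunus, the sketch below re-expresses that read function in plain Java. `FriendsterLineReader` and its `Adjacency` holder are hypothetical names; instead of adding edges to a FaunusVertex, the sketch just returns the source id and the friend ids. Like the Groovy version (whose `split` call here is Java's `String.split`), it relies on `split` dropping trailing empty strings, so a line such as `42:` yields no friends.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java mirror of the Friendster read() script; not Faunus code.
final class FriendsterLineReader {

    // Source vertex id plus the ids it has "friend" edges to.
    static final class Adjacency {
        final long source;
        final List<Long> friends;

        Adjacency(long source, List<Long> friends) {
            this.source = source;
            this.friends = friends;
        }
    }

    // Parse "id:friend1,friend2,..." into an Adjacency.
    static Adjacency parse(String line) {
        String[] parts = line.split(":");
        long source = Long.parseLong(parts[0]);
        List<Long> friends = new ArrayList<>();
        // String.split drops trailing empty strings, so "42:" gives a single part.
        if (parts.length > 1) {
            for (String friend : parts[1].split(",")) {
                friends.add(Long.valueOf(friend));
            }
        }
        return new Adjacency(source, friends);
    }
}
```

As in the original script, ids are not trimmed, so stray whitespace around a friend id would cause a NumberFormatException.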

The properties file (friendster.properties in the session below) isn't really any different from the one in the previous example, except that its input format section now points to the Friendster-related files in HDFS. Finally, as with every loading strategy discussed so far, make sure the Titan schema is established before loading. The job can be executed as follows:

gremlin> hdfs.copyFromLocal("/tmp/FriendsterInput.groovy","FriendsterInput.groovy")
==>null
gremlin> g = FaunusFactory.open("bin/friendster.properties")
==>faunusgraph[scriptinputformat->titancassandraoutputformat]
gremlin> g._()
18:28:46 WARN mapreduce.FaunusCompiler: Using the distribution Faunus job jar: lib/faunus-0.4.4-job.jar
18:28:46 INFO mapreduce.FaunusCompiler: Compiled to 3 MapReduce job(s)
18:28:46 INFO mapreduce.FaunusCompiler: Executing job 1 out of 3: MapSequence[com.thinkaurelius.faunus.formats.EdgeCopyMapReduce.Map, com.thinkaurelius.faunus.formats.EdgeCopyMapReduce.Reduce]
...
18:28:47 INFO input.FileInputFormat: Total input paths to process : 125
18:28:47 INFO mapred.JobClient: Running job: job_201405111636_0005
18:28:48 INFO mapred.JobClient:  map 0% reduce 0%
18:29:39 INFO mapred.JobClient:  map 1% reduce 0%
...
02:06:46 INFO mapred.JobClient:  map 100% reduce 0%
...
02:06:57 INFO mapred.JobClient:   File Input Format Counters
02:06:57 INFO mapred.JobClient:     Bytes Read=79174658355
02:06:57 INFO mapred.JobClient:   com.thinkaurelius.faunus.formats.BlueprintsGraphOutputMapReduce$Counters
02:06:57 INFO mapred.JobClient:     SUCCESSFUL_TRANSACTIONS=15094
02:06:57 INFO mapred.JobClient:     EDGES_WRITTEN=2586147869
02:06:57 INFO mapred.JobClient:   FileSystemCounters
02:06:57 INFO mapred.JobClient:     HDFS_BYTES_READ=79189272471
02:06:57 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=1754590920
...
02:06:57 INFO mapred.JobClient:     Bytes Written=0

The billion-edge data load did not introduce any new loading techniques, but it did show that the technique used at the hundred-million-edge scale carries over in a straightforward manner to the billion-edge scale, without any major changes to the mechanics of loading. In other words, scaling up a Faunus data load can largely be thought of as adding more Hadoop nodes to the cluster.

Conclusion

Over the course of this two-post series, a number of strategies have been presented for loading data at different scales. Some patterns, like creating the Titan schema before loading and enabling storage.batch-loading, carry through from the smallest graph to the largest and can be thought of as "common strategies". Just as there are similarities, there are also vast differences, ranging from single-threaded loads that take a few seconds to massively parallel loads that can take hours or days. Note that the driver for these variations is the data itself and that, aside from the "common strategies", the loading approaches presented should be thought of as guidelines that must be adapted to the data and the domain.

Real-world schemas will undoubtedly be more complex than the examples presented in this series. The loading approach may actually consist of several separate load operations, drawing on strategies from each of the sections presented. By understanding all of these loading patterns as a whole, it is possible to tailor the process to the data available, thus enabling the graph exploration adventure.

Acknowledgments

Dr. Vadas Gintautas originally foresaw the need to better document bulk loading strategies and recognized that such strategies seemed to divide nicely into powers of ten.
