DataStax Enterprise GraphFrames: Best Practices
This guide highlights best practices for loading data with the DseGraphFrame package, which provides a Spark API for bulk operations and analytics on DataStax Graph. It is inspired by Databricks’ GraphFrame library and supports a subset of the Apache TinkerPop™ Gremlin graph traversal language. The package supports reading DataStax Graph data into a GraphFrame and writing data from any format supported by Spark into DataStax Graph. For a review of our initial offering and more introductory examples, see our Introducing DataStax GraphFrames blog post. Technical documentation on general usage of DataStax Enterprise GraphFrames (DGF) can be found on our DSE GraphFrame overview page.
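For example, in the DSE Spark shell a graph can be pulled into a DseGraphFrame and inspected in a few lines; this is only a minimal sketch, and the graph name below is a placeholder:
scala
// A minimal sketch, assuming the DSE Spark shell and an existing graph named "test"
val g = spark.dseGraph("test")

// Vertices and edges are exposed as DataFrame-backed traversals
g.V().show()
g.E().show()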
Common Pitfalls
Null unset issue can cause excessive tombstones
In versions prior to 6.0.7 and 6.7.3, if a user omitted columns during DSE GraphFrames edge updates, the missing columns were implicitly written to DSE with null values, causing unintended deletions, tombstone build-up, and ultimately excessive stress on the system.
The workaround at the time was to set spark.setCassandraConf(Map("spark.cassandra.output.ignoreNulls" -> "true")), which ignores unset or null-valued columns and avoids creating unintended deletions on the server side. In DSE versions 6.0.7, 6.7.3, and higher, the default value for ignoreNulls is true.
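For clusters on older versions, a minimal sketch of the workaround looks like the following; partialEdges is a hypothetical DataFrame containing only the edge identifier columns plus the properties being changed:
scala
// Tell the connector to skip unset/null columns instead of writing them as
// deletes (tombstones). Assumes the DSE Spark shell, with g an existing DseGraphFrame.
spark.setCassandraConf(Map("spark.cassandra.output.ignoreNulls" -> "true"))

// partialEdges: hypothetical DataFrame with src, dst, ~label, id and only the
// columns we actually want to change; omitted columns are left untouched.
g.updateEdges(partialEdges)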
Unintended caching can lead to OOM exceptions
Prior to DSE versions 5.1.14, 6.0.5, and 6.7.1, DataStax GraphFrame bulk loading jobs used the Spark cache by default but never explicitly emptied it, which led to OutOfMemory (OOM) errors and other issues. The spark.dse.graphframes.update.persistLevel parameter was introduced to allow better control over the Spark persistence level used during updates.
Additionally, a new cache parameter was introduced in the multi-label update methods; it can be used as a workaround if the user wishes to explicitly uncache data after use. More details on this are coming soon.
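As a sketch, the persistence level can be supplied at submit time like any other Spark property. The storage level value shown here is an assumption (any Spark storage level name should apply), and the job class and jar are reused from the spark-submit example later in this guide:
shell-console
dse spark-submit \
  --conf "spark.dse.graphframes.update.persistLevel=MEMORY_AND_DISK" \
  --class com.datastax.DataImport target/data-import-1.0-SNAPSHOT.jar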
How to work around Materialized Views during bulk loading
When indexing with Materialized Views is desired, it is often recommended to enable it only after the data has been loaded, because maintaining MVs significantly affects insertion performance. We expect about a 10% performance penalty per MV, and there are some subtleties to be aware of when defining the data model; see the Materialized View Performance in Cassandra 3.x blog post for more details.
Recommended steps for bulk loading data:
- Drop all indices
- Bulk load data
- Recreate indices
After the data is loaded and indexing is re-enabled, how do we know when the view builds are done? The nodetool viewbuildstatus command accomplishes exactly this.
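For example (the keyspace and view names here are placeholders):
shell-console
nodetool viewbuildstatus my_graph_keyspace my_view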
How to manage multi/meta-properties
Here is an example of updating vertex multi-properties and meta-properties. Suppose we add a multi-property called nicknames, which itself has meta-properties named time and date. The person vertex label will be defined with the nicknames property.
groovy
schema.propertyKey("time").Timestamp().single().create() schema.propertyKey("date").Date().single().create() schema.propertyKey("nicknames").Text().multiple().create() schema.propertyKey("nicknames").properties("time", "date").add() schema.vertexLabel("person"). partitionKey("name", "ssn"). clusteringKey("age"). properties("address", "coffeePerDay", "nicknames"). create()
We'll start with 2 person vertices, 2 software vertices, and 2 created edges. Notice none of the person vertices have nicknames set yet.
scala
scala> g.V().show(false)
+-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+---------+
|id |~label |name |version|lang |temp |static_property|ssn |age |address |coffeePerDay|nicknames|
+-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+---------+
|software:AAAABXRpbWVyAAAABAAAAAIAAAAGZ3Jvb3Z5 |software|timer |2 |groovy|100 |alpha |null |null|null |null |null |
|software:AAAABGNoYXQAAAAEAAAAAQAAAAVzY2FsYQ== |software|chat |1 |scala |mambo|beta |null |null|null |null |null |
|person:AAAACGlzYWJlbGxhAAAACzExMS0xMS0xMTExAAAABAAAAAI=|person |isabella|null |null |null |null |111-11-1111|2 |2420 P Street |50 |null |
|person:AAAABXJvY2NvAAAACzIyMi0yMi0yMjIyAAAABAAAABw= |person |rocco |null |null |null |null |222-22-2222|28 |1017 Mohar Street|25 |null |
+-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+---------+

scala> g.E().show(false)
+-------------------------------------------------------+---------------------------------------------+-------+------------------------------------+------+
|src |dst |~label |id |weight|
+-------------------------------------------------------+---------------------------------------------+-------+------------------------------------+------+
|person:AAAACGlzYWJlbGxhAAAACzExMS0xMS0xMTExAAAABAAAAAI=|software:AAAABGNoYXQAAAAEAAAAAQAAAAVzY2FsYQ==|created|296d28c0-e62c-11e9-ace5-5b43d7c0da8d|1.0 |
|person:AAAABXJvY2NvAAAACzIyMi0yMi0yMjIyAAAABAAAABw= |software:AAAABXRpbWVyAAAABAAAAAIAAAAGZ3Jvb3Z5|created|f4774f00-e62c-11e9-ace5-5b43d7c0da8d|1.0 |
+-------------------------------------------------------+---------------------------------------------+-------+------------------------------------+------+
Now let’s add nicknames and its meta-properties. First construct a DataFrame consisting of the id of the vertex named rocco, along with the nicknames property and meta-properties we wish to update.
scala
scala> val df = Seq(("person:AAAABXJvY2NvAAAACzIyMi0yMi0yMjIyAAAABAAAABw=", "docRoc", java.sql.Date.valueOf("2017-01-01"), new java.sql.Timestamp(100L))).toDF("id", "nicknames", "date", "time")
df: org.apache.spark.sql.DataFrame = [id: string, nicknames: string ... 2 more fields]

scala> df.show(false)
+---------------------------------------------------+---------+----------+---------------------+
|id |nicknames|date |time |
+---------------------------------------------------+---------+----------+---------------------+
|person:AAAABXJvY2NvAAAACzIyMi0yMi0yMjIyAAAABAAAABw=|docRoc |2017-01-01|1970-01-01 00:00:00.1|
+---------------------------------------------------+---------+----------+---------------------+
Now we create a new DataFrame consisting of just the id of the vertex and the nicknames property to update.
scala
scala> val updateDF = df.select(col("id"), array(struct($"nicknames", $"date", $"time")) as "nicknames")
updateDF: org.apache.spark.sql.DataFrame = [id: string, nicknames: array<struct<nicknames:string,date:date,time:timestamp>>]
Notice how we construct the nicknames field in this DataFrame: it is an array of struct type.
scala
scala> updateDF.printSchema
root
 |-- id: string (nullable = true)
 |-- nicknames: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- nicknames: string (nullable = true)
 |    |    |-- date: date (nullable = true)
 |    |    |-- time: timestamp (nullable = true)

scala> updateDF.show(false)
+---------------------------------------------------+-------------------------------------------+
|id |nicknames |
+---------------------------------------------------+-------------------------------------------+
|person:AAAABXJvY2NvAAAACzIyMi0yMi0yMjIyAAAABAAAABw=|[[docRoc,2017-01-01,1970-01-01 00:00:00.1]]|
+---------------------------------------------------+-------------------------------------------+
Now we update vertices using this updated DataFrame. Notice we are using the multi-vertex label API of the vertex update method.
scala
scala> g.updateVertices(updateDF)

scala> g.V().show(false)
+-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+-------------------------------------------+
|id |~label |name |version|lang |temp |static_property|ssn |age |address |coffeePerDay|nicknames |
+-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+-------------------------------------------+
|software:AAAABXRpbWVyAAAABAAAAAIAAAAGZ3Jvb3Z5 |software|timer |2 |groovy|100 |alpha |null |null|null |null |null |
|software:AAAABGNoYXQAAAAEAAAAAQAAAAVzY2FsYQ== |software|chat |1 |scala |mambo|beta |null |null|null |null |null |
|person:AAAACGlzYWJlbGxhAAAACzExMS0xMS0xMTExAAAABAAAAAI=|person |isabella|null |null |null |null |111-11-1111|2 |2420 P Street |50 |null |
|person:AAAABXJvY2NvAAAACzIyMi0yMi0yMjIyAAAABAAAABw= |person |rocco |null |null |null |null |222-22-2222|28 |1017 Mohar Street|25 |[[docRoc,1970-01-01 00:00:00.1,2017-01-01]]|
+-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+-------------------------------------------+
Alternatively, we could use the Apache TinkerPop™ property update syntax with DataStax GraphFrames:
scala
scala> g.V().has("name", "rocco").property("nicknames", "docRoc", "date", java.sql.Date.valueOf("2017-01-01"), "time", new java.sql.Timestamp(100L)).iterate()
It is worth noting that regardless of the approach used, updateVertices or the Apache TinkerPop™ update syntax with DGF, multi-properties are append-only. For example, if we look at the vertex named rocco after executing the second update with the Apache TinkerPop™ syntax, we'll see two entries in the nicknames column.
scala
scala> g.V().show(false)
+-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+--------------------------------------------------------------------------------------+
|id |~label |name |version|lang |temp |static_property|ssn |age |address |coffeePerDay|nicknames |
+-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+--------------------------------------------------------------------------------------+
|software:AAAABXRpbWVyAAAABAAAAAIAAAAGZ3Jvb3Z5 |software|timer |2 |groovy|100 |alpha |null |null|null |null |null |
|software:AAAABGNoYXQAAAAEAAAAAQAAAAVzY2FsYQ== |software|chat |1 |scala |mambo|beta |null |null|null |null |null |
|person:AAAACGlzYWJlbGxhAAAACzExMS0xMS0xMTExAAAABAAAAAI=|person |isabella|null |null |null |null |111-11-1111|2 |2420 P Street |50 |null |
|person:AAAABXJvY2NvAAAACzIyMi0yMi0yMjIyAAAABAAAABw= |person |rocco |null |null |null |null |222-22-2222|28 |1017 Mohar Street|25 |[[docRoc,1970-01-01 00:00:00.1,2017-01-01], [docRoc,1970-01-01 00:00:00.1,2017-01-01]]|
+-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+--------------------------------------------------------------------------------------+
How to carry out idempotent edge updates
When updating edges, users should provide a valid and unique UUID for the id column.
Suppose we start with the following graph schema; our examples will look at updates to the lives edge label.
groovy
// truncated example for brevity
schema.propertyKey("nicknames").Text().multiple().create()
schema.propertyKey("reason").Text().single().create()
schema.propertyKey("age").Int().single().create()
schema.propertyKey("name").Text().single().create()
schema.propertyKey("date").Date().single().create()
schema.propertyKey("nicknames").properties("time", "date").add()
schema.vertexLabel("location").properties("name").create()
schema.vertexLabel("god").properties("name", "age", "nicknames").create()
schema.vertexLabel("god").index("god_age_index").secondary().by("age").add()
schema.vertexLabel("god").index("god_name_index").secondary().by("name").add()
schema.edgeLabel("lives").multiple().properties("reason").create()
schema.edgeLabel("lives").connection("god", "location").add()

// add data
Vertex neptune = graph.addVertex(T.label, "god", "name", "neptune", "age", 4500);
Vertex sea = graph.addVertex(T.label, "location", "name", "sea");
neptune.addEdge("lives", sea).property("reason", "loves waves");
The edge table maintains data for the lives edge in the src, dst, ~label, id, and reason columns.
java
DseGraphFrame gf = DseGraphFrameBuilder.dseGraph(keyspace, spark);
gf.E().df().show(false);

+------------------------+-------------------------+-------+------------------------------------+-----------------------+----------------------------------+-------------------+
|src |dst |~label |id |time |name |reason |
+------------------------+-------------------------+-------+------------------------------------+-----------------------+----------------------------------+-------------------+
|god:RnS4AAAAAAAAAAAE |location:RnS4AAAAAAAAAAAJ|lives |f695a6b0-4500-11e9-8e88-fb68397e4bea|null |null |loves waves |
+------------------------+-------------------------+-------+------------------------------------+-----------------------+----------------------------------+-------------------+
Now let's grab the edge that has the value loves waves in the reason column, then overwrite it with the word New.
java
DseGraphFrame gf = DseGraphFrameBuilder.dseGraph(keyspace, spark);
Dataset<Row> u = gf.gf().edges()
    .filter("reason = 'loves waves'")
    .drop("time").drop("reason").drop("name")
    .withColumn("reason", functions.lit("New"));
This gives us the following DataFrame that can be used to update the edge. Notice that because we are using an existing row and simply reinserting it with a new reason, we get the unique id for free.
java
u.show(false);

+--------------------+-------------------------+------+------------------------------------+------+
|src |dst |~label|id |reason|
+--------------------+-------------------------+------+------------------------------------+------+
|god:RnS4AAAAAAAAAAAE|location:RnS4AAAAAAAAAAAJ|lives |f695a6b0-4500-11e9-8e88-fb68397e4bea|New |
+--------------------+-------------------------+------+------------------------------------+------+
We can now update the edge using the updateEdges method, and as expected we see that the edge with id value f695a6b0-4500-11e9-8e88-fb68397e4bea has the reason column set to the new value New.
java
gf.updateEdges(u);
gf.E().df().show(false);

+------------------------+-------------------------+-------+------------------------------------+-----------------------+----------------------------------+-------------------+
|src |dst |~label |id |time |name |reason |
+------------------------+-------------------------+-------+------------------------------------+-----------------------+----------------------------------+-------------------+
|god:RnS4AAAAAAAAAAAE |location:RnS4AAAAAAAAAAAJ|lives |f695a6b0-4500-11e9-8e88-fb68397e4bea|null |null |New |
+------------------------+-------------------------+-------+------------------------------------+-----------------------+----------------------------------+-------------------+
In general, it’s important to note that when updating edges you must include the id column in the dataset for the existing edges to be updated. If a user omits the id column and instead only supplies src, dst, and ~label, they will end up duplicating edges with auto-generated IDs.
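One way to keep re-runs idempotent is to reuse the ids of edges that are already stored rather than generating new ones. The sketch below assumes each (src, dst, ~label) combination identifies a single existing edge; incomingEdges is a hypothetical DataFrame of updates that lacks an id column:
scala
// Identifiers of edges already stored in the graph
val existingIds = g.E().df().select("src", "dst", "~label", "id")

// Join the updates onto the stored ids so updateEdges overwrites the existing
// edges instead of inserting duplicates with auto-generated ids.
val idempotentEdges = incomingEdges.join(existingIds, Seq("src", "dst", "~label"))

g.updateEdges(idempotentEdges)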
Key order matters when using the idColumn method
The idColumn(label: String, idColumns: Column*): Column method is a utility for generating GraphFrame-compatible IDs; see the API documentation for more details. When using idColumn for vertices that have multiple key columns, it is important to provide the key columns to idColumn in the same order in which they are defined in the schema.
Suppose we have the following name vertex label:
groovy
gremlin> schema.vertexLabel("name").
......1>   partitionKey("internal_party_id").
......2>   clusteringKey(
......3>     "prefx_nm",
......4>     "first_nm",
......5>     "mdl_nm",
......6>     "last_nm",
......7>     "sufx_nm",
......8>     "name_line_desc"
......9>   ).create()
When passing the keys to the idColumn method, the order must match the order defined in the schema. Notice in the example below the order of keys provided when constructing the dst column.
scala
scala> val hasNameEdges = nameVertices
         .drop(col("~label"))
         .withColumn("src", nameVertices.col("partyId"))
         .withColumn("dst", g.idColumn(
           lit("name"),
           nameVertices.col("internal_party_id"),
           nameVertices.col("prefx_nm"),
           nameVertices.col("first_nm"),
           nameVertices.col("mdl_nm"),
           nameVertices.col("last_nm"),
           nameVertices.col("sufx_nm"),
           nameVertices.col("name_line_desc")
         ))
         .withColumn("~label", lit("has_name"))
         .drop(col("internal_party_id"))
         .drop(col("partyId"))
         .drop(col("first_nm"))
         .drop(col("last_nm"))
         .drop(col("mdl_nm"))
         .drop(col("name_line_desc"))
         .drop(col("prefx_nm"))
         .drop(col("sufx_nm"))

scala> g.updateEdges(hasNameEdges)
The new API for updating single labels was introduced to address this issue and simplify the user experience.
Considerations for Loading Big Graphs
Spark Cassandra Connector tuning parameters still apply with DataStax Enterprise GraphFrames
To increase write performance during DataStax Enterprise GraphFrames bulk loading, remember that our existing Spark Cassandra Connector tuning parameters still apply.
For example, spark.cassandra.output.concurrent.writes has been found to be one of the most intuitive and effective parameters to tune during load testing. Other parameters such as spark.cassandra.output.throughputMBPerSec (formerly spark.cassandra.output.throughput_mb_per_sec) can be very helpful as well. In cases where one expects a long insertion workload, it may be wise to down-tune spark.cassandra.output.throughputMBPerSec appropriately to avoid overwhelming the database cluster.
The spark.cassandra.connection.keepAliveMS parameter may also be useful in scenarios with long-running insertion workloads where connections may experience longer than expected periods of inactivity, a potential side effect of periodic delays while processing insertions/updates on the server.
Here are examples of using these parameters:
shell-console
dse spark-submit \
  --conf "spark.cassandra.output.concurrent.writes=100" \
  --conf "spark.cassandra.connection.keepAliveMS=120000" \
  --conf "spark.cassandra.output.throughputMBPerSec=50" \
  --class com.datastax.DataImport target/data-import-1.0-SNAPSHOT.jar \
  newapi
Avoid over-tuning your application on a small dataset
Be careful when tuning with a small dataset: parameters tuned for a short insertion workload will very likely not behave the same way for longer, more intensive workloads. A longer sustained insertion workload leads to more data and more severe effects from background tasks such as memtable flushing, compaction, query routing, etc. In short, an incremental approach is recommended when loading large datasets. Try loading, say, 10-20% of the data, making note of the parameters, cluster size, and overall node health during the process (e.g. look out for obvious things like timeout exceptions).
Also, increasing the cluster size can be an effective strategy for reducing individual node stress and improving overall ingestion performance. Again, there is no one-size-fits-all solution here, but an incremental approach with reasonably chosen tuning parameters and environment setup works well.
How to copy a graph from one cluster to another
In DSE versions 5.1.15+, 6.0.8+, and 6.7.4+, a user can specify which host a DseGraphFrame object should connect to. This allows a user to read graph contents from one cluster and write to another. Suppose we want to copy vertices and edges from a remote cluster to the local cluster; here is a small example showing how to accomplish this.
scala
import com.datastax.spark.connector.cql.CassandraConnectorConf

spark.setCassandraConf("cluster1", CassandraConnectorConf.ConnectionHostParam.option("10.0.0.1"))
spark.setCassandraConf("cluster2", CassandraConnectorConf.ConnectionHostParam.option("10.0.0.2"))

spark.conf.set("cluster", "cluster1")
val src = spark.dseGraph("srcGraph")

spark.conf.set("cluster", "cluster2")
val dst = spark.dseGraph("dstGraph")

dst.updateVertices(src.V)
dst.updateEdges(src.E)