KillrVideo Python Pt. 6— Cassandra with Python: Simple to Complex
This is the latest installment of a series about building a Python application with Apache Cassandra — specifically a Python implementation of the KillrVideo microservice tier. In previous posts I shared what motivated this project, how I started with infrastructure including GRPC and Etcd, the testing approach, and most recently, how I began implementing data access using Cassandra.
In this post we’ll look at some additional examples of data access using the DataStax Python Driver, ranging from the simple to the complex. (I’ll be referring to some driver and mapper concepts from the previous post, so you may want to review that if you haven’t already.)
Keeping it simple
In the last post, I shared how easy it was to implement the data access for the User Management Service using Apache Cassandra and the DataStax Python Driver. Using the cqlengine mapper was a big factor in my productivity. Because of that productivity boost, when implementing the other services I started with the mapper and only used other approaches when the complexity of the data access required it.
In fact, a couple of additional services were implemented entirely using the mapper: the Statistics Service and the Ratings Service. These demonstrate Cassandra features including counters and batches.
Simple example 1: Counters in the Statistics Service
The Statistics Service (statistics_service.py) stores counts of how many times each video has been viewed. Counting statistics is one of the relatively few use cases for which the Cassandra counter type is a good fit, which makes this service a good example of how to manipulate counters with the mapper. The model class we used to track statistics is shown here:
from cassandra.cqlengine import columns
from cassandra.cqlengine.models import Model

class VideoPlaybackStatsModel(Model):
    """Model class that maps to the video_playback_stats table"""
    __table_name__ = 'video_playback_stats'
    video_id = columns.UUID(primary_key=True, db_field='videoid')
    views = columns.Counter()
Note the use of the columns.Counter() type for the views column.
Incrementing the count of views in the record_playback_started() operation is very simple:
VideoPlaybackStatsModel(video_id=video_id).update(views=1)
Note that for a counter column the value views=1 is the amount to increment by, so it represents a single view of the video.
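For readers who think in CQL rather than in mapper calls, the increment above corresponds roughly to a counter UPDATE. The following is a minimal sketch under that assumption, not code from the KillrVideo repository: the helper name increment_views is hypothetical, and its session argument is assumed to be a connected cassandra.cluster.Session with the killrvideo keyspace already selected.

```python
# Hypothetical helper, not part of KillrVideo: a hand-written CQL counter update.
INCREMENT_VIEWS_CQL = (
    "UPDATE video_playback_stats SET views = views + %s WHERE videoid = %s"
)


def increment_views(session, video_id, amount=1):
    """Add `amount` to the views counter for one video.

    `session` is assumed to behave like cassandra.cluster.Session, whose
    execute() accepts a CQL string with %s placeholders plus a tuple of values.
    """
    # Counters are only ever modified relative to their current value,
    # so this statement increments rather than sets an absolute number.
    return session.execute(INCREMENT_VIEWS_CQL, (amount, video_id))
```

The mapper version is shorter, which is the point of using it here; the sketch is only meant to show what kind of statement ends up being sent.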
Simple example 2: Batch Writes in the Ratings Service
The Ratings Service (ratings_service.py) stores user ratings of videos and allows retrieval of the ratings either by user or by video. One interesting aspect of this service is the use of the mapper as part of a Cassandra batch when storing ratings, in order to support writes to two denormalized tables supporting the “by user” and “by video” queries mentioned above. The code block below shows the data access code from the rate_video() operation:
now = datetime.utcnow()

# create and execute batch statement to insert into multiple tables
batch_query = BatchQuery(timestamp=now)
VideoRatingsByUserModel.batch(batch_query)\
    .create(video_id=video_id, user_id=user_id, rating=rating)

# updating counter columns rating_counter and rating_total
# values are interpreted as amount to increment
VideoRatingsModel(video_id=video_id).\
    update(rating_counter=1, rating_total=rating).batch(batch_query)
batch_query.execute()
In this example, we use two different mapper classes to write to two different tables. Note the use of each mapper’s batch() operation to add a statement to the batch_query, which we can then execute. The first statement created is a CQL INSERT into the killrvideo.video_ratings_by_user table via the VideoRatingsByUserModel.
If you’re looking closely, you may have also noticed something about that second write: we’re using the VideoRatingsModel, which goes to the killrvideo.video_ratings table, to do a CQL UPDATE. This table is another interesting use of counters. In this case, two counter columns are used. The rating_counter tracks the number of times a video has been rated, while the rating_total tracks the sum of all ratings for the video. The average rating can then be calculated by the client via simple division (rating_total/rating_counter).
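The client-side division is simple, but a video that has never been rated needs a guard against dividing by zero. Here is a minimal sketch of how a client might do that; the function name average_rating and the choice to return 0.0 for unrated videos are my assumptions for illustration, not something defined by KillrVideo.

```python
def average_rating(rating_total, rating_counter):
    """Return the mean rating, or 0.0 for a video that has no ratings yet."""
    if not rating_counter:
        # Covers both 0 and None (no counter value read back); avoid dividing by zero.
        return 0.0
    return rating_total / float(rating_counter)


print(average_rating(13, 4))  # 3.25
```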
When simple doesn’t cut it
There were a few other services that involved more complex data access where the mapper couldn’t fully address my needs. These included the Video Catalog Service, the Comments Service, and the Search Service.
Complex example 1: Paging in the Comments Service
Although it’s not “YouTube scale”, the KillrVideo application is designed to support a very large number of videos, users, ratings, and other data types, in order to demonstrate best practices for data modeling and driver usage found in real-world applications.
With this in mind, imagine a user being presented with screens of videos and comments in a web browser on a client device. It would be infeasible and unnecessary to return the entire video catalog or the entire comment history for a popular video to the client at once, so of course, some form of paging is required. However, paging is a feature that isn’t supported by the cqlengine mapper provided with the DataStax Python Driver. So, we need to fall back to other methods.
A good example of the approach is found in the Comments Service (comments_service.py). Since paging only comes into effect on reads, we can use the mapper for writes, and then use regular CQL statements on the reads to get control over paging.
As an example, to access comments by user, we first create prepared statements in the constructor for the class:
# Prepared statements for get_user_comments()
self.userComments_startingPointPrepared = \
    session.prepare('SELECT * FROM comments_by_user WHERE userid = ? AND (commentid) <= (?)')
self.userComments_noStartingPointPrepared = \
    session.prepare('SELECT * FROM comments_by_user WHERE userid = ?')
Then, in the get_user_comments() operation, we use one of the prepared statements to create a bound statement which we can then execute:
bound_statement = None
if starting_comment_id:
    bound_statement = self.userComments_startingPointPrepared\
        .bind([user_id, starting_comment_id])
else:
    bound_statement = self.userComments_noStartingPointPrepared\
        .bind([user_id])
bound_statement.fetch_size = page_size

result_set = None
if paging_state:
    # see below where we encode paging state to hex before returning
    result_set = self.session.execute(bound_statement,
                                      paging_state=paging_state.decode('hex'))
else:
    result_set = self.session.execute(bound_statement)
Note where the fetch_size of the bound statement is set to the page_size requested by the client. The code after this iterates over the rows in the result_set to build up a list of comments to return to the client. If page_size rows were returned, then the Cassandra paging state is extracted from the result set and returned to the client:
if len(results) == page_size:
    # Use hex encoding since paging state is raw bytes that won't encode to UTF-8
    next_page_state = result_set.paging_state.encode('hex')
This allows the client to pass back the paging state on a subsequent call to get_user_comments() to retrieve the next page. Note that we encode/decode the paging state to a hex string, which works well over the Protobuf message format we’re using on our service interfaces.
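One caveat: the 'hex' codec on strings is a Python 2 idiom and is not available on Python 3. If you are adapting this approach to Python 3, the same round trip can be done with bytes.hex() and bytes.fromhex(). The sketch below assumes that variation; the helper names encode_paging_state and decode_paging_state are hypothetical and do not appear in the KillrVideo code.

```python
def encode_paging_state(paging_state):
    """Turn the driver's raw paging-state bytes into a hex string ('' if there is none)."""
    return paging_state.hex() if paging_state else ""


def decode_paging_state(hex_string):
    """Turn a hex string from the client back into bytes (None means first page)."""
    return bytes.fromhex(hex_string) if hex_string else None


state = b"\x00\x1f\xff"            # stand-in for result_set.paging_state
token = encode_paging_state(state)
print(token)                        # 001fff
assert decode_paging_state(token) == state
```

The decoded bytes would then be passed as the paging_state argument to session.execute(), in the same way as in the code above.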
Complex example 2: Full-text Search in the Search Service
The Search Service presents a different sort of problem. If you’re familiar with Cassandra data modeling practices, you’ll be aware that Cassandra doesn’t support arbitrary searches, and the secondary index implementation that comes with Cassandra is known to perform poorly over large data sets.
Instead, the best practice is to design a table per query, with primary keys based on the attributes you will specify in each query. However, your options are limited when you don’t know the exact key you are looking for: there is only limited support for range queries and no support for text search features like prefix/postfix or fuzzy matching.
Even so, we do have a couple of cases where we need text search features in the Search Service, in both the get_query_suggestions() operation, which performs a typeahead search for common search terms, and the search_videos() operation, which is used to search for videos containing a search term in the title or description.
Following the pattern set in the other language implementations of the KillrVideo services, the Python implementation of the Search Service uses DataStax Enterprise Search. First, we need a search index on the videos table (extracted from the CQL for the search schema):
CREATE SEARCH INDEX IF NOT EXISTS on videos;
Then, we create a prepared statement in the constructor of the Search Service:
self.search_videos_prepared = \
    session.prepare('SELECT * FROM videos WHERE solr_query = ?')
Finally, in the search_videos() operation, we use the solr_query syntax supported by DSE Search to create a bound statement containing our desired CQL query:
solr_query = '{"q":"name:(' + query + ')^4 OR tags:(' + query + \
    ')^2 OR description:(' + query + ')", "paging":"driver"}'
bound_statement = self.search_videos_prepared.bind([solr_query])
We can then execute the bound statement and iterate over the results (not shown).
The solr_query defined above searches for the provided search term (query) in the name, tags, or description columns. The boosts in the solr_query place additional weight on the name (^4) and tags (^2) columns, so that videos with the query appearing in the name column will appear the highest in search results, followed by those that have the search term in the tags column, and then those that match only in the description column.
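Building the JSON by string concatenation works for simple terms, but a search term containing a double quote or backslash would produce invalid JSON. One possible variation, sketched below, lets json.dumps handle that escaping. The function name build_solr_query is hypothetical, and this sketch does not attempt to escape characters that have special meaning in Solr query syntax itself.

```python
import json


def build_solr_query(query):
    """Build the JSON string passed to solr_query for a video search."""
    # Same boosts as the concatenated version: name^4, tags^2, description unboosted.
    q = "name:({0})^4 OR tags:({0})^2 OR description:({0})".format(query)
    # json.dumps escapes quotes and backslashes in the search term so the
    # result stays valid JSON. It does NOT escape Solr query syntax.
    return json.dumps({"q": q, "paging": "driver"})


print(build_solr_query("cassandra"))
```

The returned string can be bound to the prepared statement exactly as before, for example with self.search_videos_prepared.bind([build_solr_query(query)]).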
And now for something completely different
In the past two posts, I’ve taken you on a guided tour of the data access code for most of the KillrVideo Python services. The remaining service we haven’t discussed yet is the Suggested Videos Service. As you may have guessed, this involves building a recommender, which we’re doing using DataStax Enterprise Graph since that is built on top of Cassandra.
In upcoming posts I’ll share more about this recommender, starting with how we shared data between services using Kafka in order to populate data into a graph used for recommendations.