Getting Started with Spark Streaming, Python, and Kafka
Last month I wrote a series of articles in which I looked at the use of Spark for performing data transformation and manipulation. This was in the context of replatforming an existing Oracle-based ETL and datawarehouse solution onto cheaper and more elastic alternatives. The processing that I wrote was very much batch-focussed; read a set of files from block storage ('disk'), process and enrich the data, and write it back to block storage.
In this article I am going to look at Spark Streaming. This is one of several libraries that the Spark platform provides (others include Spark SQL, Spark MLlib, and Spark GraphX). Spark Streaming provides a way of processing "unbounded" data - commonly referred to as "streaming" data. It does this by breaking it up into microbatches, and supporting windowing capabilities for processing across multiple batches. You can read more in the excellent Streaming Programming Guide.
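To make the microbatch idea concrete, here is a minimal pure-Python sketch (not Spark code) that groups timestamped events into fixed-length batches. The helper name `to_microbatches` and the sample events are hypothetical and exist only for illustration; Spark Streaming does the equivalent for you based on the batch duration.

```python
from collections import defaultdict

def to_microbatches(events, batch_seconds):
    """Group (timestamp_seconds, payload) pairs into fixed-length batches.

    Hypothetical helper for illustration only; it is not part of Spark.
    """
    batches = defaultdict(list)
    for ts, payload in events:
        # Integer division assigns each event to the batch covering its timestamp
        batch_start = (ts // batch_seconds) * batch_seconds
        batches[batch_start].append(payload)
    # Return the batches in time order
    return [(start, batches[start]) for start in sorted(batches)]

# Made-up events: (seconds since start, payload)
events = [(1, "a"), (4, "b"), (5, "c"), (11, "d")]
for start, payloads in to_microbatches(events, batch_seconds=5):
    print(start, payloads)
```

Each batch is then processed as a small, bounded data set, which is what lets batch-style code run against an unbounded stream.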
Why Stream Processing?
Processing unbounded data sets, or "stream processing", is a new way of looking at what has always been done as batch in the past. Whilst intra-day ETL and frequent batch executions have brought latencies down, they are still independent executions with optional bespoke code in place to handle intra-batch accumulations. With a platform such as Spark Streaming we have a framework that natively supports processing both within-batch and across-batch (windowing).
By taking a stream processing approach we can benefit in several ways. The most obvious is reducing latency between an event occurring and taking an action driven by it, whether automatic or via analytics presented to a human. Another benefit is a smoother resource consumption profile: we can avoid the very 'spiky' demands on CPU, memory, and so on every time a batch runs by instead processing the same volume of data in smaller intervals. Finally, given that most data we process is actually unbounded ("life doesn't happen in batches"), designing new systems to be batch driven - with streaming seen as an exception - is actually an anachronism with roots in technology limitations that are rapidly becoming moot. Stream processing doesn't have to imply, or require, "fast data" or "big data". It can just mean processing data continually as it arrives, and not artificially splitting it into batches.
For more details and discussion of streaming in depth and some of its challenges, I would recommend:
Use-Case and Development Environment
So with that case made above for stream processing, I'm actually going to go back to a very modest example. The use-case I'm going to put together is - almost inevitably for a generic unbounded data example - using Twitter, read from an Apache Kafka topic. We'll start simply, counting the number of tweets per user within each batch and doing some very simple string manipulations. After that we'll see how to do the same but over a period of time (windowing). In the next blog we'll extend this further into a more useful example, still based on Twitter but demonstrating how to satisfy some real-world requirements in the processing.
I developed all of this code using Jupyter Notebooks. I've written before about how awesome notebooks are (along with Jupyter, there's Apache Zeppelin). As well as providing a superb development environment in which both the code and the generated results can be seen, Jupyter gives the option to download a Notebook to Markdown. This blog runs on Ghost, which uses Markdown as its native syntax for composing posts - so in fact what you're reading here comes directly from the notebook in which I developed the code. Pretty cool.

If you want, you can view the notebook online here, and from there download it and run it live on your own Jupyter instance.
I used the Docker image all-spark-notebook to provide both Jupyter and the Spark runtime environment. By using Docker I don't really have to worry about provisioning the platform on which I want to develop the code - I can just dive straight in and start coding. As and when I'm ready to deploy the code to a 'real' execution environment (for example EMR), then I can start to worry about that. The only external aspect was an Apache Kafka cluster that I had already, with tweets from the live Twitter feed on an Apache Kafka topic imaginatively called twitter.
Preparing the Environment
We need to make sure that the packages we're going to use are available to Spark. Instead of downloading jar files and worrying about paths, we can use the --packages option and specify the group/artifact/version based on what's available on Maven, and Spark will handle the downloading. We specify PYSPARK_SUBMIT_ARGS for this to get passed correctly when executing from within Jupyter.
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
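If you need more than one package, the same string can be assembled from its parts. This is a small sketch, assuming a hypothetical helper called `submit_args`; it only builds the value shown above, relying on the fact that --packages accepts a comma-separated list of Maven coordinates.

```python
import os

def submit_args(packages):
    """Build a PYSPARK_SUBMIT_ARGS value from Maven coordinates.

    `packages` is a list of (group, artifact, version) tuples.
    Hypothetical helper: it just formats the string used in this article.
    """
    coords = ",".join("%s:%s:%s" % p for p in packages)
    return "--packages %s pyspark-shell" % coords

args = submit_args([("org.apache.spark", "spark-streaming-kafka-0-8_2.11", "2.0.2")])
os.environ["PYSPARK_SUBMIT_ARGS"] = args
print(args)
```

In the artifact name, the _2.11 suffix is the Scala version and 2.0.2 is the Spark version, so both should match the Spark runtime you are actually running against.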
Import dependencies
We need to import the necessary pySpark modules for Spark, Spark Streaming, and Spark Streaming with Kafka. We also need the Python json module for parsing the inbound Twitter data.
# Spark
from pyspark import SparkContext
# Spark Streaming
from pyspark.streaming import StreamingContext
# Kafka
from pyspark.streaming.kafka import KafkaUtils
# json parsing
import json
Create Spark context
The Spark context is the primary object under which everything else is called. The setLogLevel call is optional, but saves a lot of noise on stdout that otherwise can swamp the actual outputs from the job.
sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")
sc.setLogLevel("WARN")
Create Streaming Context
We pass the Spark context (from above) along with the batch duration which here is set to 60 seconds.
See the API reference and programming guide for more details.
ssc = StreamingContext(sc, 60)
Connect to Kafka
Using the native Spark Streaming Kafka capabilities, we use the streaming context from above to connect to our Kafka cluster. The topic connected to is twitter, from consumer group spark-streaming. The latter is an arbitrary name that can be changed as required.
For more information see the documentation.
kafkaStream = KafkaUtils.createStream(ssc, '', 'spark-streaming', {'twitter':1})
Message Processing
Parse the inbound message as json
The inbound stream is a DStream, which supports various built-in transformations such as map, which is used here to parse the inbound messages from their native JSON format.
Note that this will fail horribly if the inbound message isn't valid JSON.
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
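One way to avoid failing on invalid JSON is to wrap the parse so that bad messages are dropped rather than raising. The sketch below is plain Python on made-up messages; `parse_or_none` is a hypothetical helper, not something the code in this article uses.

```python
import json

def parse_or_none(raw):
    """Return the decoded JSON value, or None if `raw` is not valid JSON."""
    try:
        return json.loads(raw)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return None

# Made-up (key, value) pairs shaped like the Kafka messages above
messages = [(None, '{"text": "hello"}'), (None, 'not json')]
parsed_ok = [p for p in (parse_or_none(v[1]) for v in messages) if p is not None]
print(parsed_ok)
```

On a DStream the same idea would presumably be a map with this function followed by a filter that discards the None values, though I have not run that variant against the Kafka stream here.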
Count number of tweets in the batch
The DStream object provides native functions to count the number of messages in the batch, and to print them to the output. We use the map function to add in some text explaining the value printed.
Note that nothing gets written to output from the Spark Streaming context and descendant objects until the Spark Streaming Context is started, which happens later in the code. Also note that pprint by default only prints the first 10 values.
parsed.count().map(lambda x:'Tweets in this batch: %s' % x).pprint()
If you jump ahead and try to use Windowing at this point, for example to count the number of tweets in the last hour using the countByWindow function, it'll fail. This is because we've not set up the streaming context with a checkpoint directory yet. You'll get the error: java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint(). See later on in the blog for details about how to do this.
Extract Author name from each tweet
Tweets come through in a JSON structure, of which you can see an example here. We're going to analyse tweets by author, which is accessible in the JSON structure at user.screen_name.
The lambda anonymous function is used to apply the map to each RDD within the DStream. The result is a DStream holding just the author's screenname for each tweet in the original DStream.
authors_dstream = parsed.map(lambda tweet: tweet['user']['screen_name'])
Count the number of tweets per author
With our authors DStream, we can now count them using the countByValue function. This is conceptually the same as this quasi-SQL statement:
SELECT AUTHOR, COUNT(*)
FROM DSTREAM
GROUP BY AUTHOR
Using countByValue is a more legible way of doing the same thing that you'll see done in tutorials elsewhere with a map / reduceBy.
author_counts = authors_dstream.countByValue()
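To see why the two approaches are equivalent, here is a plain-Python comparison on a made-up list of screen names (no Spark involved). The Counter stands in for countByValue, and the explicit pairs-then-sum version stands in for the map / reduce-by-key style.

```python
from collections import Counter
from functools import reduce

authors = ["alice", "bob", "alice", "carol", "alice"]  # made-up screen names

# countByValue style: count occurrences of each distinct value
by_value = Counter(authors)

# map / reduce-by-key style: emit (author, 1) pairs, then sum the 1s per key
def add_pair(totals, pair):
    key, n = pair
    totals[key] = totals.get(key, 0) + n
    return totals

by_map_reduce = reduce(add_pair, [(a, 1) for a in authors], {})

print(dict(by_value) == by_map_reduce)
```

Both produce the same (author, count) pairs; the first simply says what it means in one call.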
Sort the author count
If you try and use the sortBy function directly against the DStream you get an error:
'TransformedDStream' object has no attribute 'sortBy'
This is because sort is not a built-in DStream function. Instead we use the transform function to access sortBy from pySpark.
To use sortBy you specify a lambda function to define the sort order. Here we're going to do it based on the number of tweets per author (index 1 of each element in the RDD). You'll see this index referenced in the sortBy lambda function as x[1], negated to reverse the sort order.
Here I'm using \ as a line continuation character to make the code more legible.
author_counts_sorted_dstream = author_counts.transform(\
(lambda foo:foo\
.sortBy(lambda x:( -x[1]))))
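The negated-key trick is easy to check on an ordinary Python list, since sorted takes a key function in the same way. The data below is made up for illustration.

```python
author_counts = [("alice", 3), ("bob", 7), ("carol", 1)]  # made-up (author, count) pairs

# Negating the count turns the default ascending sort into descending order
by_count_desc = sorted(author_counts, key=lambda x: -x[1])
print(by_count_desc)
```

PySpark's RDD sortBy also accepts an ascending flag, so passing ascending=False with an un-negated key should be an alternative way to get the same ordering.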
Get top 5 authors by tweet count
To display just the top five authors, based on number of tweets in the batch period, we'll use the take function. My first attempt at this failed with:
AttributeError: 'list' object has no attribute '_jrdd'
Per my woes on StackOverflow, a parallelize is necessary to return the values into a DStream form.
top_five_authors = author_counts_sorted_dstream.transform\
(lambda rdd:sc.parallelize(rdd.take(5)))
Get authors with more than one tweet, or whose username starts with 'rm'
Let's get a bit more fancy now - filtering the resulting list of authors to only show the ones who have tweeted more than once in our batch window, or - arbitrarily - whose screenname begins with rm.
filtered_authors = author_counts.filter(lambda x:\
x[1]>1 \
or \
x[0].startswith('rm'))
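The filter condition described above (more than one tweet, or a screenname beginning with rm) can be tried out on a plain Python list before wiring it into a stream. The function name `keep_author` and the sample data are hypothetical.

```python
def keep_author(pair):
    """True if the author tweeted more than once or the name starts with 'rm'."""
    author, count = pair
    return count > 1 or author.startswith("rm")

author_counts = [("alice", 1), ("rmoff", 1), ("bob", 4)]  # made-up (author, count) pairs
kept = [p for p in author_counts if keep_author(p)]
print(kept)
```

Pulling the predicate out into a named function like this also makes it straightforward to test on its own.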
We'll print this list of authors matching the criteria, sorted by the number of tweets. Note how the sort is being done inline to the calling of the pprint function. Assigning variables and then pprinting them as I've done above is only done for clarity. It also makes sense if you're going to subsequently reuse the derived stream variable (such as with author_counts in this code).
filtered_authors.transform\
(lambda rdd:rdd\
.sortBy(lambda x:-x[1]))\
.pprint()
List the most common words in the tweets
Every example has to have a version of wordcount, right? Here's an all-in-one with line continuations to make it clearer what's going on. Note that whilst it makes for tidier code, it also makes it harder to debug...
parsed.flatMap(lambda tweet:tweet['text'].split(" "))\
.countByValue()\
.transform(lambda rdd:rdd.sortBy(lambda x:-x[1]))\
.pprint()
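For comparison, here is the same word count written in plain Python with each step held in its own variable, which is the kind of unrolling that helps when a chained version needs debugging. The tweets are made up, and this is an analogy for the steps involved rather than Spark code.

```python
from collections import Counter

# Made-up tweets with just the field we need
tweets = [{"text": "spark streaming with kafka"}, {"text": "kafka and spark"}]

# flatMap equivalent: split every tweet into words and flatten into one list
words = [w for t in tweets for w in t["text"].split(" ")]

# countByValue equivalent: one (word, count) pair per distinct word
counts = Counter(words)

# sortBy equivalent: order by descending count
word_counts = sorted(counts.items(), key=lambda x: -x[1])
for word, n in word_counts:
    print(word, n)
```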
Start the streaming context
Having defined the streaming context, now we're ready to actually start it! When you run this cell, the program will start, and you'll see the result of all the pprint functions above appear in the output to this cell below. If you're running it outside of Jupyter (via spark-submit) then you'll see the output on stdout.
ssc.start()
ssc.awaitTermination()
Time: 2017-01-11 15:34:00
Tweets in this batch: 188
Time: 2017-01-11 15:34:00
You can see the full output from the job in the notebook here.
So there we have it, a very simple Spark Streaming application doing some basic processing against an inbound data stream from Kafka.
Windowed Stream Processing
Now let's have a look at how we can do windowed processing. This is where data is processed based on a 'window' which is a multiple of the batch duration that we worked with above. So instead of counting how many tweets there are in every batch (say, 5 seconds), we could count how many there are per minute. Here, a minute (60 seconds) is the window interval. We can perform this count potentially every time the batch runs; how frequently we do the count is known as the slide interval.
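The relationship between batch duration, window interval, and slide interval can be sketched in a few lines of plain Python. The function `windowed_counts` and its inputs are hypothetical; it assumes, as Spark Streaming requires, that both the window and the slide are multiples of the batch duration.

```python
def windowed_counts(batch_counts, batch_secs, window_secs, slide_secs):
    """Sum per-batch counts over a sliding window, returning one total per slide.

    Assumes window_secs and slide_secs are multiples of batch_secs.
    """
    per_window = window_secs // batch_secs   # batches covered by one window
    step = slide_secs // batch_secs          # batches between successive totals
    totals = []
    for end in range(step, len(batch_counts) + 1, step):
        start = max(0, end - per_window)     # early windows are only partly filled
        totals.append(sum(batch_counts[start:end]))
    return totals

# Made-up counts for six 5-second batches
counts = [10, 20, 30, 40, 50, 60]
print(windowed_counts(counts, batch_secs=5, window_secs=15, slide_secs=5))
print(windowed_counts(counts, batch_secs=5, window_secs=15, slide_secs=10))
```

With a 5-second slide a total is produced for every batch; with a 10-second slide only every second batch yields a total, but each total still covers the last 15 seconds.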
Image credit, and more details about window processing, here.
The first thing to do to enable windowed processing in Spark Streaming is to launch the Streaming Context with a checkpoint directory configured. This is used to store information between batches if necessary, and also to recover from failures. You need to rework your code into the pattern shown here. All the code to be executed by the streaming context goes in a function - which makes it less easy to present in a step-by-step form in a notebook as I have above.
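The "build it in a function, but reuse a checkpoint if one exists" pattern can be illustrated without Spark. The sketch below is only an analogy, assuming a hypothetical `get_or_create` helper that pickles a small dictionary; it is not how Spark implements checkpointing, but it shows why the setup function is not called when a checkpoint is already present.

```python
import os
import pickle
import tempfile

def get_or_create(checkpoint_path, factory):
    """Reuse state saved at checkpoint_path if present, else build it with factory()."""
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path, "rb") as f:
            return pickle.load(f)       # factory is NOT called on this path
    state = factory()
    with open(checkpoint_path, "wb") as f:
        pickle.dump(state, f)
    return state

path = os.path.join(tempfile.mkdtemp(), "checkpoint_demo")
first = get_or_create(path, lambda: {"version": 1})
second = get_or_create(path, lambda: {"version": 2})  # ignored: checkpoint exists
print(first, second)
```

The second call returns the state saved by the first, even though a different factory was supplied.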
Reset the Environment
If you're running this code in the same session as above, first go to the Jupyter Kernel menu and select Restart.
Prepare the environment
These are the same steps as above.
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
Define the stream processing code
def createContext():
    sc = SparkContext(appName="PythonSparkStreamingKafka_RM_02")
    ssc = StreamingContext(sc, 5)
    # Define Kafka Consumer
    kafkaStream = KafkaUtils.createStream(ssc, '', 'spark-streaming2', {'twitter':1})
    ## --- Processing
    # Extract tweets
    parsed = kafkaStream.map(lambda v: json.loads(v[1]))
    # Count number of tweets in the batch
    count_this_batch = kafkaStream.count().map(lambda x:('Tweets this batch: %s' % x))
    # Count by windowed time period
    count_windowed = kafkaStream.countByWindow(60,5).map(lambda x:('Tweets total (One minute rolling count): %s' % x))
    # Get authors
    authors_dstream = parsed.map(lambda tweet: tweet['user']['screen_name'])
    # Count each value and number of occurrences
    count_values_this_batch = authors_dstream.countByValue()\
        .transform(lambda rdd:rdd\
            .sortBy(lambda x:-x[1]))\
        .map(lambda x:"Author counts this batch:\tValue %s\tCount %s" % (x[0],x[1]))
    # Count each value and number of occurrences in the batch windowed
    count_values_windowed = authors_dstream.countByValueAndWindow(60,5)\
        .transform(lambda rdd:rdd\
            .sortBy(lambda x:-x[1]))\
        .map(lambda x:"Author counts (One minute rolling):\tValue %s\tCount %s" % (x[0],x[1]))
    # Write total tweet counts to stdout
    # Done with a union here instead of two separate pprint statements just to make it cleaner to display
    count_this_batch.union(count_windowed).pprint()
    # Write tweet author counts to stdout
    count_values_this_batch.pprint(5)
    count_values_windowed.pprint(5)
    return ssc
Launch the stream processing
This uses local disk to store the checkpoint data. In a Production deployment this would be on resilient storage such as HDFS.
Note that, by design, if you restart this code using the same checkpoint folder, it will execute the previous code - so if you need to amend the code being executed, specify a different checkpoint folder.
ssc = StreamingContext.getOrCreate('/tmp/checkpoint_v01',lambda: createContext())
ssc.start()
ssc.awaitTermination()
Time: 2017-01-11 17:08:55
Tweets this batch: 782
Tweets total (One minute rolling count): 782
Time: 2017-01-11 17:08:55
You can see the full output from the job in the notebook here. Let's take some extracts and walk through them.
Total tweet counts
First, the total tweet counts. In the first slide window, they're the same, since we only have one batch of data so far:
Time: 2017-01-11 17:08:55
Tweets this batch: 782
Tweets total (One minute rolling count): 782
Five seconds later, we have 25 tweets in the current batch - giving us a total of 807 (782 + 25):
Time: 2017-01-11 17:09:00
Tweets this batch: 25
Tweets total (One minute rolling count): 807
Fast forward just over a minute and we see that the windowed count for a minute is not just going up - in some cases it goes down - since our window is now not simply the full duration of the inbound data stream, but is shifting along and giving a total count for the last 60 seconds only.
Time: 2017-01-11 17:09:50
Tweets this batch: 28
Tweets total (One minute rolling count): 1012
Time: 2017-01-11 17:09:55
Tweets this batch: 24
Tweets total (One minute rolling count): 254
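The drop from 1012 to 254 is consistent with the very first batch leaving the window. With 5-second batches, a 60-second window holds twelve batches: at 17:09:50 that is 17:08:55 through 17:09:50, and at 17:09:55 it is 17:09:00 through 17:09:55. The arithmetic, using the figures printed above:

```python
previous_total = 1012   # rolling count reported at 17:09:50
expired_batch = 782     # the 17:08:55 batch, which has now left the 60-second window
new_batch = 24          # the batch reported at 17:09:55

new_total = previous_total - expired_batch + new_batch
print(new_total)
```

The result matches the rolling count of 254 reported at 17:09:55.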
Count by Author
In the first batch, as with the total tweets, the batch tally is the same as the windowed one:
Time: 2017-01-11 17:08:55
Author counts this batch: Value AnnaSabryan Count 8
Author counts this batch: Value KHALILSAFADO Count 7
Author counts this batch: Value socialvidpress Count 6
Author counts this batch: Value SabSad_ Count 5
Author counts this batch: Value CooleeBravo Count 5
Time: 2017-01-11 17:08:55
Author counts (One minute rolling): Value AnnaSabryan Count 8
Author counts (One minute rolling): Value KHALILSAFADO Count 7
Author counts (One minute rolling): Value socialvidpress Count 6
Author counts (One minute rolling): Value SabSad_ Count 5
Author counts (One minute rolling): Value CooleeBravo Count 5
But notice in subsequent batches the rolling totals are accumulating for each author. Here we can see KHALILSAFADO (with a previous rolling total of 7, as above) has another tweet in this batch, giving a rolling total of 8:
Time: 2017-01-11 17:09:00
Author counts this batch: Value DawnExperience Count 1
Author counts this batch: Value KHALILSAFADO Count 1
Author counts this batch: Value Alchemister5 Count 1
Author counts this batch: Value uused2callme Count 1
Author counts this batch: Value comfyjongin Count 1
Time: 2017-01-11 17:09:00
Author counts (One minute rolling): Value AnnaSabryan Count 9
Author counts (One minute rolling): Value KHALILSAFADO Count 8
Author counts (One minute rolling): Value socialvidpress Count 6
Author counts (One minute rolling): Value SabSad_ Count 5
Author counts (One minute rolling): Value CooleeBravo Count 5
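The way per-author totals accumulate across the window can be mimicked in plain Python by keeping the last twelve batch tallies and summing them. This is a sketch rather than what Spark does internally. The counts are taken from the output above, except that AnnaSabryan's extra tweet in the second batch is an assumption inferred from her rolling total rising to 9 (she does not appear in that batch's top five).

```python
from collections import Counter, deque

window = deque(maxlen=12)   # 60-second window / 5-second batches

batch_1 = Counter({"AnnaSabryan": 8, "KHALILSAFADO": 7, "socialvidpress": 6})
# AnnaSabryan's count here is assumed; only KHALILSAFADO's is visible in the output
batch_2 = Counter({"KHALILSAFADO": 1, "AnnaSabryan": 1, "DawnExperience": 1})

rolling = Counter()
for batch in (batch_1, batch_2):
    window.append(batch)               # the oldest batch drops out once 12 are held
    rolling = sum(window, Counter())   # per-author totals across the window

print(rolling.most_common(3))
```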
What I've put together is a very rudimentary example, simply to get started with the concepts. In the examples in this article I used Spark Streaming because of its native support for Python, and the previous work I'd done with Spark. Jupyter Notebooks are a fantastic environment in which to prototype code, and for a local environment providing both Jupyter and Spark you can't beat the Docker image all-spark-notebook.
There are other stream processing frameworks and languages out there, including Apache Flink, Kafka Streams, and Apache Beam, to name but three. Apache Storm and Apache Samza are also relevant, but whilst they were early to the party they seem to crop up less frequently in stream processing discussions and literature nowadays.
In the next blog we'll see how to extend this Spark Streaming example further with processing that includes:
- Matching tweet contents to a predefined list of filter terms, and filtering out retweets
- Including only tweets that include URLs, and comparing those URLs to a whitelist of domains
- Sending tweets matching a given condition to a Kafka topic
- Keeping a tally of tweet counts per batch and over a longer period of time, as well as counts for terms matched within the tweets