Kafka Streams in Python Using Faust Streaming

By Adrian Gonciarz, QA / SRE Director

What is Kafka Stream processing?

Stream processing is a programming paradigm according to which input and output data is viewed as a continuous stream manipulated by the processor. Confluent Inc. has introduced the Kafka stream processing concept along with the Kafka Streams client - for Java/Scala applications. The idea of a Kafka stream "represents an unbounded, continuously updating data set.” ¹ Kafka messages sent to topics are an excellent and basic example for such a stream of data. They are ordered (partition-wise), re-playable, and fault-tolerant, as demanded by the definition.


A data stream can either be an input (source) or an output (sink) to any number of stream processors. Processors are applications which use the input data to do computations such as aggregation or data enrichment, etc. and produce output to another stream. One application’s output stream can be another application’s input and so on. A stream processing topology can be complex, but here is a simple example to illustrate it.


An example of stream processing

Imagine your system handles orders from customers. Each order created in the system is posted by some microservice to the Kafka topic called order-events. The event contains data about the order including: timestamp, customer email, order ID, and the list of order items. It is represented by the following JSON:

{
  "created": "2022-11-08T10:24:22.601603",
  "email": "kate@example.com",
  "items": [
    {
      "name": "t-shirt",
      "price": 20,
      "quantity": 1
    },
    {
      "name": "socks",
      "price": 8,
      "quantity": 3
    },
    {
      "name": "shoes",
      "price": 50,
      "quantity": 3
    }
  ],
  "id": "d85b9ee8-f4ba-496b-99c9-699a64c874f7"
}

For some reason, no one included the order total in the message and you would perhaps want to have the information in your database. Stream processing can be used to help achieve that goal, by attaching a processor to the input stream of order-events, calculating the total for each order, and sending a message containing order ID and total to a downstream topic (sink), called order-totals:

{
    "id": "d85b9ee8-f4ba-496b-99c9-699a64c874f7",
    "total": 194.0
}

This is an intermediate data point that can be used in various ways, like periodic calculation of average order total or total sales per hour.


Implementing the idea of Kafka Streams in Python

Although, as mentioned earlier, Kafka Streams is a Java/Scala library, the concept has been ported to other languages, including Python. Faust, developed by Robin Hood Inc., is a popular and reliable package providing similar functionality. However, at the time of writing this article, it has not seen active development for over two years. The community forked it as Faust Streaming, and this properly maintained and supported library, is the one that we will be using. We will not deep-dive into every concept behind the library. The documentation provided by Faust Streaming is very good and explains most of the concepts in detail. Instead, this post will focus on the implementation of the example described above using Faust Streaming, with a step-by-step explanation.


For more great implementation examples, visit the Faust Streaming repository.


Running a Kafka Broker

In order to implement Kafka stream solution, even as an exercise, you'd need a running Kafka Broker instance. The simplest way, in my humble opinion, is to run it using Docker Compose. Using the above solution, you can pass an environment variable to automatically create topics. You need to create the topics below in order to follow the exercise.

  • order-events

  • order-totals

Creating a project

Let’s start by creating a new Python project, i.e. a new directory and a virtual environment. (keep in mind I am using Mac OS X and commands may vary depending on the system you use)

$ mkdir faust-orders
$ cd faust-orders
$ virtualenv venv -p python3
$ source venv/bin/activate
$ which python

The last command should return the path of your current directory, which will indicate you’re using a local Python virtual environment. It’s surprising how often we forget to actually activate the venv, right?

Next, we’ll install faust-streaming by running the following command:

pip install faust-streaming

Now, we’re ready to start some coding!

Faust Streaming application

We’ll create the file main.py and inside it we’ll initialize the Faust application. I have my Kafka Broker running on localhost:9092 - and if you are like me, you’ll want your 3rd party addresses to be held under an environment variable.

import faust
import os

kafka_broker = os.getenv('KAFKA_BROKER', 'localhost:9092')

app = faust.App(
    'orders-processor',
    broker=kafka_broker,
    value_serializer='json'
)

if __name__ == '__main__':
   app.main()

The first parameter (‘orders-processor’) is the application name, the others are self explanatory.

In order to run our application, we use the following command:

python main.py worker

So far, no logic has been implemented, thus, you should only see the Faust Streaming startup logs, including the information about topics created or used by our app.

Reading the stream

1. Data models

The first step we take to process the data is reading the input stream. We will first need a model for deserializing the messages:

from faust import Record
import datetime
from typing import List

class OrderItem(Record, serializer='json'):
   name: str
   price: float
   quantity: int

class Order(Record, coerce=True, serializer='json'):
   email: str
   created: datetime.datetime
   id: str
   items: List[OrderItem]

These classes inherit from the Faust Streaming Record class, and are simple data models we’ll use to deserialize the messages from the input stream - later on we’ll define a similar model to serialize messages for the output. As you can see, we first defined the model for OrderItem, which we then use in the Order class to define a list-type field called items. In order to deserialize the datetime field automatically, the coerce=True flag is set in the Order class definition parameters.


2. Topic

Next, we need to define the topic which we’re going to read the stream from:

input_topic = app.topic('order-events', value_type=Order)
3. Agent function

Finally, we need a function to process the input stream of orders and calculate the order total for each of the incoming orders:

@app.agent(input_topic)
async def calculate_total(orders):
   async for order in orders:
       total = sum([i.price*i.quantity for i in order.items])
       print(total)

There are three important things to note regarding the defined function:

  • It is an asynchronous function; since Faust Streaming uses Python’s native asyncio mechanism for providing reliable, highly efficient processing.

  • It is decorated with the @app.agent annotation, along with the reference to the input topic.

  • There is an asynchronous for-loop defined inside the function. The loop body defines the actual logic of the processing executed for each message in the stream.

For the time being, we have just implemented a simple calculation of the order total (calculating the sum of item price multiplied by item quantity for each of the order items) and printing the total to the standard output. Let’s see how this works, but before that, let me post the code we have put in main.py so far:

import datetime
import os
from typing import List

import faust
from faust import Record

kafka_broker = os.getenv('KAFKA_BROKER', 'localhost:9092')

app = faust.App(
   'orders-processor',
   broker=kafka_broker,
   value_serializer='json'
)


class OrderItem(Record, serializer='json'):
   name: str
   price: float
   quantity: int


class Order(Record, coerce=True, serializer='json'):
   email: str
   created: datetime.datetime
   id: str
   items: List[OrderItem]


input_topic = app.topic('order-events', value_type=Order)


@app.agent(input_topic)
async def calculate_total(orders):
   async for order in orders:
       total = sum([i.price * i.quantity for i in order.items])
       print(total)


if __name__ == '__main__':
   app.main()

A simple processor in action

First, let’s run the application as we did before:

python main.py worker

Now, let’s send two messages to the order-events topic:

{"created": "2022-11-10T09:57:26.148598", "email": "kate@example.com", "items": [{"name": "Hat", "price": 15.0, "quantity": 2}, {"name": "shoes", "price": 50.0, "quantity": 2}, {"name": "t-shirt", "price": 20.0, "quantity": 3}, {"name": "hoodie", "price": 30.0, "quantity": 2}], "id": "488aa8f3-2974-43eb-b43a-37e6fff66ed3"}

{"created": "2022-11-10T09:57:26.152157", "email": "adam@example.com", "items": [{"name": "t-shirt", "price": 20.0, "quantity": 1}, {"name": "socks", "price": 8.0, "quantity": 3}, {"name": "Hat", "price": 15.0, "quantity": 2}], "id": "4986a07f-7061-4f6d-afb9-df1c73f5346d"}

In the output of terminal running the processor, we should see something like this:

[2022-11-10 09:57:26,177] [47331] [WARNING] 250.0 
[2022-11-10 09:57:26,178] [47331] [WARNING] 74.0 

Which means, for each order posted to the order-events topic, the app has calculated the total of the order properly and printed it out.

Sending to a sink

Since we’ve already covered the input of the processor, we need to add the second part, namely sending messages to the output stream (commonly referred to as the “sink”). For each processed order, we’d like to output a message with the order ID and its total:

{"id": "488aa8f3-2974-43eb-b43a-37e6fff66ed3", "amount": 74.0}

First, let’s define a serializer class for output message:

class OrderTotal(Record, serializer='json'):
   id: str
   amount: float

Then, we’ll define the output topic the same way as we did in case of the input above:

sink = app.topic('order-totals', value_type=OrderTotal)

And finally, let’s modify the code of the processing function by creating an OrderTotal instance, filling in the fields, and sending the message to the sink topic:

@app.agent(input_topic)
async def calculate_total(orders):
   async for order in orders:
       total = sum([i.price * i.quantity for i in order.items])
       print(total)
       order_total = OrderTotal(id=order.id, amount=total)
       await sink.send(key=order_total.id, value=order_total)

Let’s run our app again with the updated code and observe the effects in the sink topic.


Running the processor with an output stream

Again, start the app and feed some messages to the input topic, order-events:

{"created": "2022-11-10T12:18:54.075909", "email": "adam@example.com", "items": [{"name": "hoodie", "price": 30.0, "quantity": 3}, {"name": "t-shirt", "price": 20.0, "quantity": 3}, {"name": "shoes", "price": 50.0, "quantity": 2}, {"name": "socks", "price": 8.0, "quantity": 1}, {"name": "Hat", "price": 15.0, "quantity": 2}], "id": "b67ee2fe-7b08-4cbb-91dd-c3b743045c02"}
{"created": "2022-11-10T12:18:54.077766", "email": "mary@example.com", "items": [{"name": "socks", "price": 8.0, "quantity": 3}, {"name": "hoodie", "price": 30.0, "quantity": 1}, {"name": "t-shirt", "price": 20.0, "quantity": 2}, {"name": "Hat", "price": 15.0, "quantity": 1}, {"name": "shoes", "price": 50.0, "quantity": 2}], "id": "2c4b3a97-544b-4672-a9ed-7351cffe3e3e"}

When you read the messages in the order-totals topic, you’ll see the following:

{"id":"b67ee2fe-7b08-4cbb-91dd-c3b743045c02","amount":288.0,"__faust":{"ns":"main.OrderTotal"}}
{"id":"2c4b3a97-544b-4672-a9ed-7351cffe3e3e","amount":209.0,"__faust":{"ns":"main.OrderTotal"}}

The produced messages also contain an automatically added __faust key with information about the namespace which includes the package and model used for serialization.

We've now successfully written a stream processing application which reads an input stream and produces calculated data for each message in it.


Summary

Faust Streaming, an actively maintained fork of the Faust library, is a great tool for implementing the Kafka Streams concept in Python. It has proven to be fast, reliable, and easy to use in our production environment.


If you found this article interesting, Faust Streaming has really good user documentation that is well worth reading.


Give this idea a try on some of your own problems!




Sources:

1: https://kafka.apache.org/26/documentation/streams/core-concepts