Cloud-Native Event Streaming With Pravega | by Alex | Nov, 2022

Build an Efficient Streaming System

Image by author | Logo by Python and Pravega

So, recently I built a little FastAPI chatroom using WebSockets, and I love it. However, it got me wondering about WebSockets and how much work has gone into streaming services over the years. Are WebSockets still the way to go? If they aren’t, what new tools are out there to help facilitate that process?

I’ve used Kafka before, and I was a bit familiar with RabbitMQ, but what else?!

So, I headed over to the CNCF sandbox to see if I could find something new to play with. Something with storage, an easy interface, and hopefully some Python bindings that I can add relatively easily. I found two very attractive streaming services in the CNCF sandbox that fit that bill:

I landed on Pravega because it was extremely easy to get up and running, has a standalone and distributed mode, seems incredibly efficient, and has auto-scaling (I’m an SRE and a sucker for auto-scaling). Plus there are tons of other neat features that I encourage you to take a look at.

So, why a chatroom? Well, as soon as I send a message, the other user should be alerted about the message and should then have the ability to reply. This sounds like a perfect use case for an event-based or streaming system! So let’s start.

So, what is Pravega? Pravega is an open-source streaming and storage utility that promises unbeatable performance, long-term storage, and autoscaling. Why not use it for a chatroom?

In our chatroom, we will use it in place of plain sockets, WebSockets, or other popular chatroom transports.

There are many ways to run Pravega, but I only ran it as a standalone container. There are also many ways to configure Pravega, such as distributing different shards in different containers for reliability purposes (which I recommend in a production system). However, we’re going to simply run it in standalone mode:
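Standalone mode is a single container. Something like the following should work (the image tag and port mappings follow Pravega’s published docker instructions; adjust to your setup):

```shell
# Run Pravega in standalone mode, exposing the controller (9090)
# and segment store (12345) ports.
docker run -it --rm \
  -p 9090:9090 \
  -p 12345:12345 \
  pravega/pravega:latest standalone
```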

That’s it. You will now see Pravega running in a terminal, ready to accept connections!

StreamCli is what we are going to name our Python app. It will be an easy-to-run CLI that connects to the Pravega broker in both read and write capacities. Slack also inspired me to include some “slash-commands” to help us reduce typing for common tasks (greetings, jokes, etc.).

For all this to happen, we will need the following:

  • Something to read from the Pravega stream and act on the messages
  • Something to take user input and write it to a Pravega stream
  • Something to manage the reader and the writer

It sounds like a lot, but I promise it’s quick, easy, and only 200 lines of code.

Let’s start with the reader because it’s the most complicated. After that, it’s all downhill. Before actually sending the messages, we must agree on the format of the messages. This way, whatever is being sent can be processed properly on the receiving end of the stream. For that, we can use a Pydantic model:

It defines a Python object that has two string properties:

  • from: the ID of the sending user
  • message: the content of the message

Pydantic models come with a ton of helpers, such as:

  • JSON serialization
  • parsing and validation
  • much more

Now that we’ve agreed on a message format, we can start writing our reader. Our reader will need two features:

  • an ID, so it knows who it is reading for
  • a Pravega reader instance, so it can connect to the stream

Its definition and init are simple. Here’s the code:
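A minimal sketch (the class name and comments are illustrative; the real reader instance would come from the Pravega stream manager, shown later):

```python
class ChatReader:
    """Reads chat messages on behalf of a single user."""

    def __init__(self, user_id: str, reader):
        # user_id: who we are reading for
        # reader: a Pravega reader instance, e.g. created via
        #   manager.create_reader_group(...).create_reader(...)
        self.user_id = user_id
        self.reader = reader
```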

Logically, we need methods to:

  • read from the stream
  • act on each message
  • tell the stream when it has stopped listening (close the connection)

We can read from the stream using our Pravega reader instance. Pravega organizes a stream’s messages into segments, and readers consume them in chunks called slices. Once we have read a slice from a segment, we need to inform Pravega that it has been processed and should not be delivered again to the same reader group. Finally, we need to process each message and turn it into something actionable. So, in code, our read function looks like this:
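Here is a sketch of that read method, assuming the pravega_client reader API (get_segment_slice_async, release_segment, and event.data()); class and helper names are my own:

```python
import asyncio
import json

class ChatReader:
    def __init__(self, user_id, reader):
        self.user_id = user_id
        self.reader = reader  # a pravega_client stream reader

    async def read(self):
        # Pull the next slice of a stream segment.
        slice_ = await self.reader.get_segment_slice_async()
        lines = []
        for event in slice_:
            # event.data() returns the raw bytes we originally wrote.
            line = self._process_message(event.data())
            if line:
                lines.append(line)
        # Tell Pravega this reader group has processed the slice, so its
        # events are not delivered to this group again.
        self.reader.release_segment(slice_)
        return lines

    def _process_message(self, raw: bytes):
        # Minimal version; the article expands on this shortly.
        body = json.loads(raw)
        if body["from_user"] == self.user_id:
            return None  # skip messages we sent ourselves
        return f"{body['from_user']}: {body['message']}"
```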

Note that we first read a slice from the stream using get_segment_slice_async. Once we have read all of the data from the slice, we tell Pravega via release_segment that this reader group has processed those events and they should not be delivered to it again. Finally, we process each message with _process_message (more on that shortly).

Now that we’ve read a message, we need to act on it. This is where _process_message comes into play. Logically, our _process_message must:

  • Parse the message from its format to our message format
  • process it somehow
  • show it to the receiving user

It’s worth noting that I wanted to add two special commands to my chat system:

  • a greet command, which should greet the receiving user by saying “says hi!”
  • a joke command, which should tell the receiving user a random joke

So our _process_message function becomes the following:
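A sketch of that logic, written as a free function for self-containment (the “/greet” and “/joke” spellings, the jokes list, and the helper names are my assumptions):

```python
import json
import random

JOKES = [
    "Why do programmers prefer dark mode? Because light attracts bugs.",
    "I would tell you a UDP joke, but you might not get it.",
]

def _greet(sender: str) -> str:
    return f"{sender} says hi!"

def _tell_joke() -> str:
    return random.choice(JOKES)

def process_message(raw: bytes, my_id: str):
    """Turn a raw stream event into a printable line (or None to skip it)."""
    body = json.loads(raw)  # events arrive as JSON in our Message format
    sender, text = body["from_user"], body["message"]
    if sender == my_id:
        return None  # we also read our own writes, so drop them
    if text.strip() == "/greet":
        text = _greet(sender)
    elif text.strip() == "/joke":
        text = _tell_joke()
    return f"{sender}: {text}"
```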

Note how we load the message from its JSON format first. It will have been sent (as discussed below) in our agreed message format. Next, because we are both reading from and writing to the same stream, we make sure we are not reacting to our own messages. Finally, we process the message and return the result to our caller. For completeness, the _tell_joke and _greet functions are shown below:

Finally, we need a close method that tells Pravega, “This listener is done and should be removed from the chatroom!”:
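Assuming the pravega_client reader API, that amounts to calling reader_offline:

```python
class ChatReader:
    def __init__(self, user_id, reader):
        self.user_id = user_id
        self.reader = reader  # a pravega_client stream reader

    def close(self):
        # Mark this reader offline so Pravega removes it from the
        # reader group, i.e. this user leaves the chatroom.
        self.reader.reader_offline()
```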

And that’s all! Now we have an object that can read from a stream and perform semi-intelligent actions based on the messages it receives.

The writer is much simpler than the reader. It needs two pieces of information:

  • its ID
  • a Pravega writer instance

Its definition and init are below:
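A minimal sketch (names illustrative, as with the reader):

```python
class ChatWriter:
    """Writes one user's chat messages to the stream."""

    def __init__(self, user_id: str, writer):
        # user_id: the ID of this (sending) user
        # writer: a Pravega writer instance, e.g. from
        #   manager.create_writer(scope, stream)
        self.user_id = user_id
        self.writer = writer
```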

It only needs two methods:

  • one to frame a message
  • one to send a message

Let’s see how to frame the message first:

So, given the user’s input, we build a Pydantic model (called Message) and fill in the required information. Finally, we return it as a JSON string to send over the stream. As a side note, we briefly looked at the Message Pydantic model when discussing the reader.

Finally, we can send messages. Here’s what it looks like:
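A sketch of send, framing the message inline with json for self-containment, and assuming the pravega_client writer’s write_event call:

```python
import json

class ChatWriter:
    def __init__(self, user_id, writer):
        self.user_id = user_id
        self.writer = writer  # a pravega_client stream writer

    def send(self, text: str):
        # Frame the input in our Message format, then hand the JSON
        # string to the Pravega writer.
        payload = json.dumps({"from_user": self.user_id, "message": text})
        self.writer.write_event(payload)
```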

We see that we frame our message and then hand it to the Pravega writer instance.

So, we have our reader, and we have our writer. To chat, we need to combine the two into one neat object. We will call that object our Chatter (see its definition and init below):
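A minimal sketch:

```python
class Chatter:
    """Ties a reader and a writer together into one chat session."""

    def __init__(self, reader, writer):
        self.reader = reader   # our reader wrapper
        self.writer = writer   # our writer wrapper
        self.running = True    # flips to False when the user quits
```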

Our Chatter object holds its reader and writer and tells them when to close via its running flag (more on that shortly).

Conceptually, our Chatter needs the same functions as its components. Namely:

  • a read function to tell the reader to do its work
  • a write function to tell the writer to do its work
  • a close function to gracefully shut down its pieces

We also don’t want a blocking system (forced to read before writing), so we’ll use separate threads for reader and writer.

The read function in the Chatter object is much simpler than the one in the Reader object, as it only needs to await the reader.read method, format the result, and print it:
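A sketch of that loop (the reader here is the wrapper class from earlier, whose names are my own):

```python
import asyncio

class Chatter:
    def __init__(self, reader, writer):
        self.reader = reader
        self.writer = writer
        self.running = True

    async def read(self):
        # Keep asking our reader for messages until we stop running,
        # printing whatever it hands back.
        while self.running:
            for line in await self.reader.read():
                print(line)
        # Once the loop ends, close the underlying reader too.
        self.reader.close()
```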

You’ll see that when we finish running (self.running is False), we automatically close our reader.

The write method is similar in spirit. All it has to do is take user input and hand it to the writer. Here’s what it looks like:

Our close method simply has to end those loops, and the reader will shut down gracefully:
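Which is a one-liner:

```python
class Chatter:
    def __init__(self, reader, writer):
        self.reader = reader
        self.writer = writer
        self.running = True

    def close(self):
        # Flip the flag; the read and write loops check it and exit,
        # and the read loop closes the underlying reader on its way out.
        self.running = False
```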

We can tie all these pieces together into a nice, concurrent package with threads:

If we analyze this a bit further, we can do the following:

  • Create a reader thread. Since read is an async function (so it doesn’t block), we wrap our thread target in asyncio.run; this thread runs the async coroutine.
  • Create a writer thread. Writing is not asynchronous, since we have to wait for user input, so we can use a plain thread target here.
  • Start our threads!

Finally, we bundle it all together and package it as a CLI. There are lots of great Python argument-parsing utilities out there. Here are three great ones:

I chose docopt for today because it’s easy to use and takes a lot of the load off the programmer. docopt will parse our docstring and then generate the argument-handling logic from it. Our docstring looks something like this:
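A sketch of such a docstring (the exact wording and options live in the repo; shown here as a string so it stands alone):

```python
USAGE = """StreamCli: chat over a Pravega stream.

Usage:
    streamcli.py <controller_uri> <user_id>
    streamcli.py (-h | --help)

Options:
    -h --help    Show this screen.
"""
# In streamcli.py this text is the module docstring, and docopt turns
# it into an argument parser with:  arguments = docopt(__doc__)
```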

So docopt will parse it, build a dictionary of the possible arguments, and then we can act on them accordingly.

Our main function does just that. main will:

  • Parse our docstring and the user-supplied arguments
  • Create a Pravega stream manager, scope, stream, reader, and writer
  • Create our reader and writer wrappers
  • Create our Chatter
  • Start chatting

The Pravega API calls are the important part. First we create a StreamManager, which lets us perform administrative tasks against our Pravega cluster. Then we create (or reuse) a scope, which is similar to a namespace. Then we create a stream within that scope, and a reader group within that stream and scope. Each user gets its own reader group, which means that when userA reads from the stream, userB and userC are unaffected. Finally, we explicitly create the reader and writer that let us read from and write to the stream.

Now we are ready to run!

In one terminal, make sure your Pravega Docker image is running! If not, you can run the following:
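The same standalone command from earlier in the article (image tag illustrative):

```shell
docker run -it --rm -p 9090:9090 -p 12345:12345 pravega/pravega:latest standalone
```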

Now, open two other terminals. In one of them, run python streamcli.py tcp://127.0.0.1:9090 alex, and in the other, run python streamcli.py tcp://127.0.0.1:9090 tim, and start chatting back and forth!


All code can be found in my public repo at the following link: https://github.com/afoley587/cloud-native-streaming
