Portworx & Red Hat Hands-on Labs Register Now

Apache Kafka is a stream processing platform maintained by the Apache Foundation. Developers can use it with many different programming languages, like Node and Python. It acts as a message broker and transmits messages or streams of data.

The Apache Kafka message broker helps decouple sending and receiving services from logic. These services use the publisher-subscriber (pub-sub) pattern. The publisher publishes a message on a messaging queue, and the subscriber consumes it by listening to the queue. Kafka also helps retain the messages in queue until the subscribers or consumers consume them, reducing failures and retries.

It comes packed with the pub-sub and queuing features together, maintains message insertion history, and isn’t affected by network latency and errors. It also enables broadcasting. Apache Kafka is a reliable, scalable, low-latency message queuing and brokering system, making it an excellent choice as a distributed queue for event-driven architectures.

Kafka can transmit data to and from databases such as Apache Cassandra. Cassandra is a high-performance distributed database for handling vast amounts of data. Its peer-to-peer NoSQL structure makes it fast and easy to scale with the system architecture.

Cassandra’s data replication strategy makes it a highly available database. Its nodal-peer architecture also makes it highly fault-tolerant. It supports hybrid cloud architecture, too.

In this tutorial, we’ll explore how to integrate Apache Kafka with Cassandra in a project. We’ll then run this project on a Kubernetes cluster.

Integrating Kafka and Cassandra

Apache Kafka and Cassandra complement each other well. When integrated, they form a dynamic duo that’s highly available, robust, fault-tolerant, scalable, and flexible.

Kafka and Cassandra can work together in multiple ways. First, Kafka can act as a go-between for microservices. In this integration method, Cassandra stores data generated from event processing. Kafka drives the events where the consumer consumes the producer’s data stream, and Cassandra then stores the resultant data.

You can also create a Cassandra Sink for Kafka. The DataStax Apache Kafka Connector inserts Kafka-generated data into Cassandra. This method is handy for distributed systems requiring multi-datacenter replication and multiple events. We’ll explore it later in this article.

Finally, you can capture change data from Cassandra via plugins using the Kafka Connect framework.

A good example of Apache Kafka and Cassandra being used together is the Instaclustr Anomaly Detection application. According to Instaclustr, after upgrading their system to use Kafka and Cassandra together, “[t]he system achieved a peak Kafka writes of 2.3Million/s, while the rest of the pipeline ran at a sustainable anomaly check of 220,000/s.The system processed a massive 19 Billion events per day (19 x 10^9), nearly 500 times higher throughput with lower real-time latency compared to any previously published benchmarks for the Anomaly detection system.”

Building a Kafka-Cassandra Sink Application

To understand how to use a Cassandra Sink for Apache Kafka, let’s build a sample Python project. The sample project has two parts. First, the producer reads sales data from a CSV and creates a data stream. Then, we feed this data stream to a Cassandra Sink.

Prerequisites

You should have Apache Kafka, Cassandra, a running Kubernetes cluster, and Docker running on your system to follow this tutorial. Note that Apache Kafka requires that Java version 8 or higher be installed on your system.

Pushing the Data into Kafka

We use a simple record_parser_kafka.py file to read the data from the CSV and make it available to Kafka. We build a Kafka producer, read the CSV data, and feed the data to Kafka using the send method.

The code in the file is as follows:

bootstrap_servers = ['localhost']
topicname = 'test.salesdata'
producer = KafkaProducer(bootstrap_servers = bootstrap_servers,api_version=(0,1,0))
producer = KafkaProducer()

with open('SalesRecords.csv','r') as read_obj:
csv_dict_reader = DictReader(read_obj)
for row in csv_dict_reader:
     ack = producer.send(topicname, json.dumps(row).encode('utf-8'))
     result = ack.get()
    # print(result)

We need a consumer to pull these messages from the topic and save them row-by-row in a Cassandra table. But first, we need to prepare the Cassandra cluster by creating a keyspace and a table where we’ll insert our records.

Setting up Cassandra

First, we set up a keyspace in our Cassandra cluster. A keyspace tells Cassandra a replication strategy for any tables it creates. To create a keyspace, we first need to open the Cassandra query language shell (cqlsh).

To open cqlsh, type this command in a terminal:

cqlsh

This command should open the cqlsh prompt — that is, cqlsh>.

Then, we create a keyspace called sales_data_kafka2cassandra using this command:

CREATE KEYSPACE sales_data_kafka2cassandra WITH REPLICATION={'class': 'SimpleStrategy', 'replication_factor':'3'} AND durable_writes =true;

contentblog

Next, we switch over to using the keyspace, then create the table salesdata, as follows:

use sales_data_kafka2cassandra;
 CREATE TABLE sales_data_kafka2cassandra.salesdata (
                              ... data_id timeuuid,
                              ... region text,
                              ... country text,
                              ... item_type text,
                              ... sales_channel text,
                              ... order_priority text,
                              ... order_date text,
                              ... order_id text,
                              ... ship_date text,
                              ... units_sold text,
                              ... unit_price text,
                              ... unit_cost text,
                              ... total_revenue text,
                              ... total_cost text,
                              ... total_profit text,
                              ...  PRIMARY KEY (data_id)
                              ... ) ;

We’ve added a column called data_id in the table and assigned it a timeuuid type. Every time something inserts a record in the Cassandra table, we get a uniquely auto-generated ID for the record.

Now that we’ve set up our Cassandra tables, we’re ready to code the consumer and Sink.

Using Kafka Consumer with the Cassandra Data Sink

We need the DataStax Python Driver for Apache Cassandra to insert data into the Cassandra table. We also need a Kafka consumer to pull the messages from the topic for the Cassandra driver to pick up. Here’s the code for the Kafka consumer and table insertion:

kafka_consumer_sink.py
from kafka import KafkaConsumer
import json
import sys
from cassandra.cluster import Cluster
bootstrap_servers = ['localhost:9092']
topicname = 'sales_data_kafka2cassandra.salesdata'
consumer = KafkaConsumer(topicname, bootstrap_servers = bootstrap_servers,auto_offset_reset='earliest',group_id="test-consumer-group")
cluster = Cluster(['127.0.0.1'])
session = cluster.connect('sales_data_kafka2cassandra')
try:
for message in consumer:
     entry = json.loads(json.loads(message.value))['salesdata']
        session.execute(
            """
        INSERT INTO salesdata (data_id,region,country,item_type,sales_channel,order_priority,order_date,order_id,ship_date,units_sold,unit_price,unit_cost,total_revenue,total_cost,total_profit)
        VALUES (now(),%s, %s, %s, %s, %s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
           """,(entry['Region'],entry['Country'],entry['Item Type'],entry['Sales Channel'],entry['Order Priority'],entry['Order Date'],entry['Order ID'],entry['Ship Date'],entry['Units Sold'],entry['Unit Price'],entry['Unit Cost'],entry['Total Revenue'],entry['Total Cost'],entry['Total Profit']))
except KeyboardInterrupt:
sys.exit()

Notice that we make the Cassandra cluster connection using the following lines:

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('sales_data_kafka2cassandra')

The code then inserts the data as a JSON record.

Deploying the Application to Kubernetes

We’ll deploy our consumer application to a Kubernetes cluster. We ensure that the consumer runs continuously by deploying a Docker container to run the script for us. The Dockerfile is as follows:

FROM python:3
WORKDIR /usr/src/app
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
COPY...
CMD [ "python", "-u", "./kafka_consumer_sink.py" ]

Next, we need to build the Docker container and push it to our Kubernetes cluster’s Docker registry. Once this is complete, we need to create a YAML file for the Kubernetes management system. Our YAML file contains the following:

kafka2cassandra.yaml
apiVersion: v1
kind: Pod
metadata:
  name: kafka2cassandra
spec:
  containers:
- name: kafka2cassandra
   image: testsalesdata/kafka2cassandra
   stdin: true
   tty: true

The image name should match the name we configured for the Docker container. When our YAML file is complete, we can deploy our pod and check the logs to confirm everything is working correctly.

To deploy our pod, we run this command:

kubectl create -f kafka2cassandra.yaml

When we deploy services to the Kubernetes cluster, we should keep a few things in mind to ensure the most efficient cluster performance:

  • Ensure we are running the latest version of Kubernetes, so we have all the security patches and platform upgrades.
  • Use Namespaces. Although optional, namespaces help ensure logical resource separation in a Kubernetes cluster in a distributed multiple-team environment.
  • Ensure the cluster image contains only the libraries and packages we require. This ensures better use of available resources.
  • In a multi-team environment, it is better to set limits on cluster resources like CPU and memory.
  • Always configure Kubernetes probes like the liveness and readiness probes. These probes are similar to a health check and help avoid unnecessary cluster downtime and failures.
  • Monitor the audit logs to ensure there are no security threats or breaches and that the clusters are running at optimal capacity and performance.
  • Deploy a role-based access control (RBAC) to ensure only members with the correct authorizations can access the clusters.
  • Use Kubernetes’ horizontal pod autoscaling and cluster autoscaling services to ensure the application scales dynamically in real time.

Once our application is deployed, we can confirm it is running by checking the Kubernetes logs or using the Cassandra query language shell. We can start a Cassandra query language shell using the cqlsh command.

We run a count query to confirm our application dumped the entries in the Cassandra table:

Select count(*) from sales_data_kafka2cassandra.salesdata;

image

Cloud-Native Storage Options

Apache Kafka and Cassandra work well together and are the best option for distributed systems. Most distributed systems are cloud-native, and their database should also be cloud-native to harness the potential of cloud-native applications.

A cloud-native database is scalable, elastic, and resilient. It can also be automated. Kubernetes provides persistent storage and stateful states and also offers APIs, the ability to scale, and inherent security for the Cassandra layer and Kafka layers.

Apache Kafka must be reliable, healthy, scalable, elastic, and secure, and a Kubernetes cluster helps with these needs. A cloud-native platform like Kubernetes also encrypts all our data by default, helps automate configuration, and ensures network communications by default.

Deploying our Apache Kafka and Cassandra distributed application on a Kubernetes cluster makes sense for all these reasons. However, Kafka and Cassandra need persistent data storage. Unfortunately, Kubernetes doesn’t provide it ​​— and persistent data storage is what makes a cloud-native database powerful.

Portworx provides persistent data storage and much more. In addition to running stateful applications, Postworx is packed with features like disaster recovery, data security, data scaling, performance, backup for all our Kubernetes applications, and more.

Conclusion

In this article, we integrated Apache Kafka with Cassandra into our sample application using Kafka Connector and Cassandra Sink. We also deployed our application to a Kubernetes cluster because cloud-native storage helps distributed systems reach their full potential. But, as we learned, Kubernetes doesn’t provide us with the storage we need. Portworx provides a persistent data storage service for our Kubernetes applications integrating Kafka and Cassandra. To understand the full potential of Portworx, explore Portworx Data Services and learn how it integrates with Kafka and Cassandra to power your distributed applications.

Share
Subscribe for Updates

About Us
Portworx is the leader in cloud native storage for containers.

ryan

Ryan Wallner

Portworx | Technical Marketing Manager
Explore Related Content:
  • cassandra
  • kafka
  • kubernetes
link
doors
February 8, 2022 Technical Insights
Kubernetes Operator Options for Apache Kafka
Ryan Wallner
Ryan Wallner
link
Graphic-46
March 27, 2019 How To
Run HA Kafka on IBM Cloud Kubernetes Service
Janakiram MSV
Janakiram MSV
link
Graphic-28
February 6, 2019 How To
How to Run HA Kafka on Red Hat OpenShift
Janakiram MSV
Janakiram MSV