Kafka migration from on-prem to Confluent

SADA
The SADA Engineering Blog
21 min read · Nov 8, 2023


Author: Shreya Kumari, Data Engineer, SADA

Overview

The purpose of this technical blog is to guide readers through the migration from on-prem Kafka to Confluent Kafka and to explain how data is ingested and transformed through data pipelines before finally being stored in BigQuery. The blog provides insights, guidance, and best practices for successfully migrating to and operating Confluent Kafka clusters, a widely used distributed event streaming platform. Migrations are complex tasks that require careful planning and execution.

Objective

  • Build an end-to-end migration framework from on-prem to Confluent Kafka.
  • The process includes the following stages:
  1. Migration from on-prem Kafka to Confluent Kafka clusters, followed by Schema Registry updates.
  2. Repointing existing data sources to Confluent Kafka for ingestion/transformation, with the data ultimately stored in BigQuery tables.
  3. For new data sources, ingestion/transformation can be done directly from Confluent Kafka to BigQuery once the migration has completed successfully.

Tech stack

  • Cloud Storage — Used for storing the built Dataflow templates.
  • Dataflow — For building the data pipelines that run those templates, ingesting and transforming data from Confluent Kafka and finally storing it in BigQuery.
  • BigQuery — A data warehouse for storing and reading data in tabular form.
  • Schema Registry — For managing and updating the schemas of Confluent Kafka topics.
  • Confluent Kafka — For creating the Confluent Kafka cluster to which data is migrated from on-prem Kafka.
  • Cloud Composer — For orchestrating the entire workflow.
  • Bitbucket — A version-controlled code hosting and collaboration platform.
  • Slack — A messaging application, used here for alerting.

Problem statement and solution

User story

Modernizing data infrastructure for improved scalability and real-time processing: XYZ Corporation wants to modernize its on-premises data infrastructure to improve scalability and enhance real-time data processing capabilities.

Acceptance criteria:

1. Problem statement:

The current on-premises data infrastructure is struggling to keep up with the increasing data volume and real-time processing demands, causing performance issues and frequent downtime. Integrating Confluent Kafka is a strategic move to address these challenges.

2. Integration with existing systems:

  • Problem: The existing on-prem Kafka data infrastructure lacks seamless integration with various systems, including databases, data warehouses, and analytics platforms.
  • Solution: Ensure that Confluent Kafka seamlessly integrates with various systems by establishing reliable data flows, synchronization mechanisms, and robust strategies for data consistency.

3. Data migration strategy:

  • Problem: The migration process from on-prem to Confluent Kafka must be meticulously planned to minimize downtime and data loss and ensure proper monitoring.
  • Solution: Develop a comprehensive data migration strategy that includes topic and data migration from on-prem Kafka to Confluent Kafka through replication and real-time alerts to track the migration progress, ensuring minimal disruption to the operations.

4. Data volume and throughput:

  • Problem: The current on-prem Kafka infrastructure cannot handle the growing volume and throughput of data streams without incurring significant infrastructure costs or compromising performance.
  • Solution: Optimize data processing within Confluent Kafka, including scaling considerations, to efficiently manage data volume and throughput without sacrificing performance or incurring excessive costs, and then ingest from Confluent Kafka through GCP Dataflow to store the data in BigQuery tables.

Definition of done:

  • Successful integration of Confluent Kafka with existing systems, enabling seamless data flow and synchronization.
  • Implementation of a robust data migration strategy that ensures minimal downtime and data loss during the transition.
  • Implementation of data cleaning and transformation processes to prepare data for ingestion from Confluent Kafka to BigQuery.
  • Deployment of real-time alerts to monitor the progress of data migration, as well as data validations with respect to data getting ingested from Confluent Kafka to BigQuery.
  • Optimization of Confluent Kafka to handle increased data volume and throughput without compromising performance.
  • Successful migration from the on-premises data infrastructure to Confluent Kafka, improving scalability and real-time data processing capabilities.

The user story encapsulates the organization’s need to modernize its data infrastructure using Confluent Kafka while addressing specific problems with the existing on-premises deployment.

Benefits with respect to user story

By migrating to Confluent Kafka, the aim is to address these critical issues. Here are the key benefits tied back to the user story:

  • Enhanced management tools: Confluent provides a set of management and monitoring tools, such as Confluent Control Center, that offer a more user-friendly and comprehensive way to manage and monitor Kafka clusters. This can alleviate some of the operational burdens faced in an on-premises Kafka setup.
  • Schema Registry: Confluent’s Schema Registry helps manage data schema evolution, ensuring data compatibility and reducing the risk of issues related to data format changes, which can be a challenge in on-premises Kafka.
  • KSQL and Kafka Streams: Confluent offers KSQL and Kafka Streams for real-time stream processing, which simplifies the development of real-time applications and analytics, addressing challenges related to data processing and integration.
  • Connectors ecosystem: Confluent provides a wide range of pre-built connectors (e.g., for databases, cloud services) that simplify data integration tasks, making it easier to connect to various data sources and sinks.
  • Managed services: Confluent Cloud, a fully managed cloud-based Kafka service, can offload much of the infrastructure management burden, reducing the need for extensive on-premises hardware and simplifying scalability.
  • Security features: Confluent Kafka offers robust security features, including role-based access control (RBAC), encryption, and authentication mechanisms, addressing concerns related to data security.
  • Managed upgrades: Confluent offers managed upgrades and support, making it easier to keep the Kafka infrastructure up to date with the latest features and bug fixes.
  • Scalability: Confluent Kafka is designed to scale horizontally, allowing organizations to handle growing data volumes and throughput requirements more effectively.
  • Community and support: Confluent offers professional support and a strong community, providing access to expertise and resources to address technical challenges.
  • Integration with Confluent Platform: Confluent Kafka integrates seamlessly with other components of the Confluent Platform, such as Confluent Replicator and Confluent Tiered Storage, to address data migration, backup, and storage management concerns.

While moving to Confluent Kafka can significantly alleviate many challenges faced by on-premises Kafka, it’s essential to conduct a thorough assessment of the organization’s specific requirements and constraints before making the transition. Additionally, a well-planned migration strategy and adequate training for the migration team will be crucial for a successful transition to Confluent Kafka.

Delivery semantics objective

The objective for delivery semantics in this user story is to ensure that data is reliably and consistently delivered to its intended destinations, even in the face of network failures or system errors. This will ensure that the business stakeholders can trust the data they receive and can make decisions based on accurate and up-to-date information.

Data has to arrive reliably and maintain its order and consistency. In the context of this migration, it’s crucial to specify how data will be delivered from the on-premises system to Confluent Kafka which includes factors like:

  • Delivery guarantees: A delivery guarantee such as at-least-once delivery has to be maintained.
  • Message ordering: Message ordering should be preserved so that data arrives in the correct sequence, particularly if ordering is critical for any applications.
  • Reliability mechanisms: To ensure data consistency and reliability, it is imperative to use mechanisms for acknowledgments, retries, and error handling; these safeguard against data loss caused by network issues or system failures (see the configuration sketch after this list).
  • Monitoring and alerts: It is crucial to generate real-time alerts to track the migration progress, enabling prompt issue identification, including cases where data delivery failures or errors occur.
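
To make these delivery semantics concrete, here is a minimal Java sketch, assuming String-serialized topics, of producer and consumer settings that target at-least-once delivery; the class name, group ID, and topic name are illustrative placeholders. The producer waits for acknowledgment from all in-sync replicas and retries transient failures with idempotence enabled, while the consumer disables auto-commit and commits offsets only after records have been processed.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class AtLeastOnceConfigExample {
    public static void main(String[] args) {
        // Producer side of at-least-once: wait for all in-sync replicas to acknowledge,
        // retry transient failures, and enable idempotence so retries do not create duplicates.
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
        producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        // (bootstrap servers, serializers, and security settings as in the client examples later in this post)

        // Consumer side of at-least-once: disable auto-commit and commit offsets only
        // after records have been processed, so a failure causes reprocessing, not data loss.
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Bootstrap Servers>");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "migration-validation-group");
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(Collections.singletonList("topic-name"));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        // ... process the polled records, then commit so they are not re-delivered ...
        consumer.commitSync();
        consumer.close();
    }
}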

The objective related to delivery semantics, as part of the user story, makes it clear that delivering data reliably and consistently is a fundamental aspect of the migration process. It ensures that the migration is not just about moving data but doing so in a way that guarantees data integrity and minimizes disruption to operations during the transition to Confluent Kafka.

Solution approach for migration

Migration process:

Assessment and planning:

  • Assess the current Kafka deployment: Begin by understanding the existing Kafka deployment: the current Kafka version, configuration settings, topics, partitions, consumers, and producers.
  • Define migration goals: The goals of the migration need to be defined, including an understanding of the specific benefits expected from Confluent Kafka, such as improved performance, security, or ease of management.
  • Select a Confluent Kafka version: Determine which version of Confluent Kafka best suits the requirements. The Confluent documentation can be used to understand the features and changes in the chosen version.

Confluent Cloud provides Kafka as a cloud service, so there is no need to install, upgrade, or patch Kafka server components. For detailed knowledge of the various aspects of Confluent Kafka, refer here: https://docs.confluent.io/cloud/current/overview.html

Environment setup:

  • Provision a new cluster: A new Confluent Kafka cluster needs to be provisioned. Rather than deploying Confluent Platform on one's own infrastructure, Confluent Cloud can be used.
  1. Sign up and log in: If you already have an existing Confluent Cloud account, log in. If not, sign up to create one.
  2. Create a new environment: In Confluent Cloud, environments are used to isolate resources. A new environment for the cluster can be created by following these steps in the Confluent Cloud account:

i. Click on “Environments” in the left sidebar.

ii. Click the “Create Environment” button.

iii. Enter a name and description for the environment.

iv. Click “Create.”

3. Provision a new Kafka cluster: Now, a new Kafka cluster within the environment needs to be created, steps for which are as follows:

i. In the Confluent dashboard, click on the environment created.

ii. Click the “Add Cluster” button.

iii. Choose the preferred cloud provider and region, and configure the cluster settings.

iv. Click “Create Cluster.”

For more information on the quick start with Confluent Cloud using Basic Kafka cluster and environment setup refer here: https://docs.confluent.io/cloud/current/get-started/index.html

  • Configure the cluster: The Confluent Kafka cluster has to be configured according to the requirements. Pay special attention to security settings, replication factors, and other configuration parameters that may differ from the existing Kafka setup.

Once the cluster is created, certain access and security settings need to be configured. Obtain the cluster-specific configuration details:

i. Click on the new cluster in the Confluent Cloud dashboard.

ii. Under "Overview," the cluster's connection settings include the bootstrap servers and API endpoints. For more information on the API refer here: https://docs.confluent.io/cloud/current/kafka-rest/krest-qs.html

iii. Under "Security," create API keys and secrets so that applications can connect to the cluster securely (an example client configuration is shown below).
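
As an illustration of where these values end up, here is a minimal Java sketch of the client connection properties for a Confluent Cloud cluster; the class and method names are hypothetical, and the bootstrap servers, API key, and API secret are the values obtained in the steps above.

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Properties;

public class ConfluentCloudClientConfig {
    // Builds the connection properties shared by producers, consumers, and admin clients.
    public static Properties build(String bootstrapServers, String apiKey, String apiSecret) {
        Properties props = new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Confluent Cloud clusters are reached over SASL_SSL, using the API key/secret as credentials.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + apiKey + "\" password=\"" + apiSecret + "\";");
        return props;
    }
}

The same properties can then be reused by the producers, consumers, and admin clients shown later in this post.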

  • Network configuration: Ensuring proper network connectivity between the existing Kafka cluster (if any) and the new Confluent Kafka cluster is the next step after cluster configuration. This may involve setting up firewall rules or VPN connections.

Confluent Cloud supports both public internet connectivity and a range of private networking solutions. For more information refer here: https://docs.confluent.io/cloud/current/networking/overview.html

Data migration:

  • Topic data migration: Migrating the existing Kafka topics and their data to the new Confluent Kafka cluster requires tools like Confluent Replicator to replicate data between Kafka clusters. For that, Confluent Replicator on the on-premises Kafka cluster has to be configured to replicate data to the Confluent Cloud cluster. Here’s an example of how to set up Confluent Replicator on the on-premises Kafka cluster:

i. Install Confluent Replicator on the on-premises servers.

ii. Create a Replicator configuration file (say replicator.properties) specifying the source (on-premises) and destination (Confluent Cloud) Kafka clusters. Example:

name=my-replicator
connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector
tasks.max=1
# Replicator copies raw bytes, so the byte-array converter preserves message contents as-is
key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
value.converter=io.confluent.connect.replicator.util.ByteArrayConverter
src.kafka.bootstrap.servers=On-prem-broker1:9092,On-prem-broker2:9092
src.kafka.security.protocol=PLAINTEXT
dest.kafka.bootstrap.servers=<Confluent_Cloud_Bootstrap_Servers>
dest.kafka.security.protocol=SASL_SSL
dest.kafka.sasl.mechanism=PLAIN
dest.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USER_API_KEY" password="USER_API_SECRET";
topic.whitelist=topic1,topic2

Start Confluent Replicator by loading this connector configuration on a Kafka Connect worker, for example in standalone mode (connect-worker.properties is a placeholder for the Connect worker configuration):

connect-standalone connect-worker.properties replicator.properties

This will start the replication process, copying data from the on-premises Kafka cluster to the Confluent cluster. Confluent Replicator allows you to easily and reliably replicate topics from one Kafka cluster to another. In addition to copying the messages, Replicator will create topics as needed, preserving the topic configuration in the source cluster. For more information on Confluent Replicator refer here:

https://docs.confluent.io/platform/current/multi-dc-deployments/replicator/index.html#replicator-detail
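
As a rough way to check replication progress, the end offsets of the replicated topics on the source and destination clusters can be compared. Below is a hedged Java sketch using the standard Kafka consumer API; the broker addresses and topic name are placeholders, security settings are omitted, and it assumes Replicator's default behavior of preserving topic names and partition counts.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

public class ReplicationLagCheck {

    public static void main(String[] args) {
        Map<TopicPartition, Long> source = endOffsets("On-prem-broker1:9092", "topic1");
        Map<TopicPartition, Long> dest = endOffsets("<Confluent_Cloud_Bootstrap_Servers>", "topic1");
        source.forEach((tp, srcEnd) -> {
            long destEnd = dest.getOrDefault(tp, 0L);
            System.out.printf("%s source=%d destination=%d lag=%d%n", tp, srcEnd, destEnd, srcEnd - destEnd);
        });
    }

    // Returns the end offset of every partition of the given topic on the given cluster.
    static Map<TopicPartition, Long> endOffsets(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // For Confluent Cloud, also add the SASL_SSL settings shown earlier in this post.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toList());
            return consumer.endOffsets(partitions);
        }
    }
}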

  • Schema Registry: Schema Registry provides a centralized repository for managing and validating the schemas of topic message data, and for serialization and deserialization of that data over the network. The schemas and associated data have to be migrated to the Confluent Cloud Schema Registry. Schema Registry is a critical component for managing Avro schemas in a Kafka ecosystem: Avro schemas are applied on the producer side, which provides greater control over schema evolution, because data is guaranteed at produce time to adhere to a specific version of the schema. This helps prevent data compatibility issues when consumers are updated.

Below are the steps to migrate schemas:

  1. Export schemas from the on-premises Schema Registry:

Exporting existing schemas from the on-premises Kafka Schema Registry typically involves using the Schema Registry's REST API to retrieve them. Here's an example using curl to export the latest schema version for a specific subject (the response is a JSON object whose "schema" field contains the schema definition):

curl -X GET http://On-prem-schema-registry:8081/subjects/<subject-name>/versions/latest

2. Store exported schemas:

Save the exported schemas in a safe location. They can be stored in files or in a version control system to ensure no schema definitions are lost.

3. Import schemas into Confluent Cloud Schema Registry:

Using the Confluent Cloud Schema Registry's API, import the schemas that were exported earlier. Here's an example using curl (Confluent Cloud Schema Registry requires basic authentication with a Schema Registry API key and secret, and the request body must be a JSON object with a "schema" field containing the escaped schema string):

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
-u <SR_API_KEY>:<SR_API_SECRET> \
--data @<path-to-schema-payload>.json \
https://<confluent-cloud-schema-registry-url>/subjects/<subject-name>/versions

Replace <SR_API_KEY>, <SR_API_SECRET>, <path-to-schema-payload>, <confluent-cloud-schema-registry-url>, and <subject-name> with the appropriate values for one's migration.

Repeat this process for each schema exported from the on-premises registry.
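
The same export/import flow can also be scripted. Below is a rough Java sketch using the JDK's built-in HTTP client and the Jackson library (an added dependency) that copies the latest schema version of one subject from the on-premises registry to Confluent Cloud Schema Registry; the URLs, subject name, and API key/secret are placeholders, and error handling is omitted.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Base64;

public class SchemaCopy {
    public static void main(String[] args) throws Exception {
        String subject = "<subject-name>";
        HttpClient client = HttpClient.newHttpClient();
        ObjectMapper mapper = new ObjectMapper();

        // 1. Export: fetch the latest schema version for the subject from the on-prem registry.
        HttpRequest export = HttpRequest.newBuilder(
                URI.create("http://On-prem-schema-registry:8081/subjects/" + subject + "/versions/latest"))
                .GET()
                .build();
        String exported = client.send(export, HttpResponse.BodyHandlers.ofString()).body();

        // 2. Wrap the "schema" field from the export response into the payload
        //    expected by the register endpoint: {"schema": "<escaped schema string>"}.
        String schema = mapper.readTree(exported).get("schema").asText();
        ObjectNode payload = mapper.createObjectNode();
        payload.put("schema", schema);

        // 3. Import: register the schema under the same subject in Confluent Cloud Schema Registry,
        //    authenticating with a Schema Registry API key/secret via HTTP basic auth.
        String credentials = Base64.getEncoder()
                .encodeToString("<SR_API_KEY>:<SR_API_SECRET>".getBytes());
        HttpRequest register = HttpRequest.newBuilder(
                URI.create("https://<confluent-cloud-schema-registry-url>/subjects/" + subject + "/versions"))
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .header("Authorization", "Basic " + credentials)
                .POST(HttpRequest.BodyPublishers.ofString(payload.toString()))
                .build();
        HttpResponse<String> response = client.send(register, HttpResponse.BodyHandlers.ofString());
        System.out.println("Registered " + subject + ": " + response.body());
    }
}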

For more information on schema registries refer here: https://docs.confluent.io/cloud/current/sr/index.html

Consumer and producer updates:

  • Update client libraries: A Kafka client library provides the functions, classes, and utilities that allow developers to create Kafka producer clients (producers) and consumer clients (consumers) in various programming languages. If Kafka clients are used in any applications, they must be updated to Confluent-supported client libraries, which are compatible with Apache Kafka but offer additional features and improvements. First, identify the versions of the Kafka client libraries currently used in the on-premises Kafka deployment, for both producers and consumers.

Visit Confluent’s website or documentation to identify the Confluent-supported versions of the Kafka client libraries that are compatible with Confluent Cloud. These versions are typically listed in Confluent’s documentation.

Compatibility:

Version 7.5 is a major release of Confluent Platform that provides Apache Kafka® 3.5, the latest stable version of Kafka.

When upgrading to a major release like Confluent Platform 7.5, it’s essential to consider client SDK compatibility, including sourceCompatibility and targetCompatibility. Here are some caveats and considerations for this upgrade:

  • Client SDK versions: Check the compatibility of the existing Kafka client SDK versions with Confluent Platform 7.5. Major Confluent releases may introduce changes or deprecated features that could impact the code.
  • Java compatibility: For Confluent Platform 7.5, ensure that the sourceCompatibility and targetCompatibility settings align with the recommended Java version. For example, if Java 11 is recommended, set sourceCompatibility and targetCompatibility to 11.

For more information refer to https://www.confluent.io/ and https://docs.confluent.io/platform/current/release-notes/index.html

Depending on the programming language and build system, the dependencies have to be updated for the respective project to use the Confluent-supported versions of the Kafka client libraries.

Below are the examples in Java:

Java (Maven): As a first step, update the pom.xml file with the appropriate dependencies:

<!-- Kafka client (Confluent distributes the standard Apache Kafka client) -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>KAFKA_CLIENTS_VERSION</version>
</dependency>
<!-- Optional: Confluent Avro serializer for use with Schema Registry -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>CONFLUENT_VERSION</version>
</dependency>

Replace KAFKA_CLIENTS_VERSION and CONFLUENT_VERSION with versions that match the Confluent Platform release in use; the io.confluent artifacts are available from the Confluent Maven repository (https://packages.confluent.io/maven/).

For more information on building client applications for Confluent Kafka refer to: https://docs.confluent.io/platform/current/clients/index.html

  • Configuration updates: The consumer and producer configurations need to be modified to align with any changes or improvements in Confluent Kafka. The producer and consumer code that interacts with Kafka needs to be updated to use the Confluent-compatible Kafka client libraries. To build the examples below, Maven or Gradle needs to be installed, depending on the project's build system. Here is sample producer and consumer code in Java:

Producer Code (Java):

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Confluent Cloud Bootstrap Servers>");
        // Confluent Cloud requires SASL_SSL authentication with an API key and secret
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"<API_KEY>\" password=\"<API_SECRET>\";");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        try {
            // Send a single record to the target topic
            producer.send(new ProducerRecord<>("topic-name", "key", "value"));
        } finally {
            producer.close();
        }
    }
}

Consumer Code (Java):

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Confluent Cloud Bootstrap Servers>");
        // Confluent Cloud requires SASL_SSL authentication with an API key and secret
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"<API_KEY>\" password=\"<API_SECRET>\";");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("topic-name"));

        while (true) {
            // poll(long) is deprecated; poll(Duration) is the current API
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            // Process records here
        }
    }
}

Replace <Confluent Cloud Bootstrap Servers>, <API_KEY>, and <API_SECRET> with the actual values provided by Confluent Cloud.

For further reference please refer to: https://docs.confluent.io/kafka-clients/java/current/overview.html

Security and access control:

  • Security configuration: Security settings need to be configured, such as SSL/TLS for encryption, SASL for authentication, and ACLs (Access Control Lists) for authorization. Since Confluent Kafka is frequently used to store mission-critical data, enabling security features is crucial.

For more information refer to: https://docs.confluent.io/platform/current/security/general-overview.html

  • User and ACL migration: User accounts and access control rules should also be migrated to Confluent's security framework, if applicable (a sketch of creating ACLs programmatically is shown below).
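
Where topic-level ACLs need to be re-created on the new cluster rather than carried over by tooling, they can be defined programmatically with the Kafka Admin API. Here is a hedged sketch; the principals, topic, and connection settings are placeholders, and note that on Confluent Cloud, service accounts and RBAC role bindings managed through the Confluent CLI or Cloud Console are a common alternative.

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import java.util.Arrays;
import java.util.Properties;

public class AclMigrationExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "<Confluent Cloud Bootstrap Servers>");
        // Add the SASL_SSL security settings shown earlier for Confluent Cloud.

        try (Admin admin = Admin.create(props)) {
            // Allow the application principals to read and write the migrated topic.
            ResourcePattern topic = new ResourcePattern(ResourceType.TOPIC, "topic1", PatternType.LITERAL);
            AclBinding read = new AclBinding(topic,
                    new AccessControlEntry("User:app-consumer", "*", AclOperation.READ, AclPermissionType.ALLOW));
            AclBinding write = new AclBinding(topic,
                    new AccessControlEntry("User:app-producer", "*", AclOperation.WRITE, AclPermissionType.ALLOW));
            admin.createAcls(Arrays.asList(read, write)).all().get();
        }
    }
}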

Monitoring and management:

  • Set up monitoring: Implement monitoring solutions (e.g., Confluent Control Center or other monitoring tools such as Grafana) to keep track of the Confluent Kafka cluster's health and performance.

For more information refer to: https://docs.confluent.io/cloud/current/monitoring/metrics-api.html

  • Backup and recovery: Establishing backup and recovery procedures for ensuring data durability and disaster recovery is another important step.

For more information refer to: https://docs.confluent.io/platform/current/kafka/post-deployment.html#backup-and-restoration

Testing:

  • Testing phase: Thorough testing has to be performed to ensure that data is flowing correctly, consumers and producers are working as expected, and applications are functioning properly with the new Confluent Kafka cluster.

For more information refer to: https://blogs.halodoc.io/kafka-integration-testing-automation/

Documentation and training:

  • Documentation: Make sure to update the documentation to reflect the changes in the Kafka environment.
  • Training: It’s vital to provide training to the team members so as to familiarize them with the new Confluent Kafka setup and any additional tools provided by Confluent.

Rollout and monitoring:

  • Gradual rollout: Gradually transition traffic and workloads to the new Confluent Kafka cluster, closely monitoring its performance and reliability during the transition.

For more information refer to: https://docs.confluent.io/platform/current/kafka/post-deployment.html#

Post-migration optimization:

  • Performance tuning: Fine-tuning the Confluent Kafka cluster for optimal performance has to be done based on real-world usage patterns.
  • Scaling: Adjusting the cluster size to accommodate increased workloads.

For more information refer to: https://docs.confluent.io/platform/current/kafka/post-deployment.html#performance-tips

Ongoing maintenance:

  • Regular updates are to be performed on Confluent Kafka and its components to stay up to date with the latest features and security patches.

Monitor and manage the Confluent Platform environment with Confluent Health+. This ensures the health of the clusters and minimizes business disruption with intelligent alerts, monitoring, and proactive support based on best practices created by the inventors of Apache Kafka. For more information refer to: https://docs.confluent.io/platform/current/health-plus/index.html

Migrating to Confluent Kafka is a significant undertaking that requires careful planning and testing. It’s essential to involve all relevant teams, including developers, operations, and security, in the migration process to ensure a successful transition.

Solution approach for data ingestion and transformation

Block diagram and explanation

Figure 1: Here A through H depict:

A: Signifies data ingestion from new on-prem sources and replication from existing on-prem sources to Confluent Kafka.

B: Signifies the Dataflow job used for data ingestion and intermediate transformations.

C: Signifies the ingested/transformed data being stored in BigQuery tables.

D: Signifies the SQL validation scripts controlled through Cloud Composer.

E, F, G: Signify the end users of this data, including Looker dashboards, Google Sheets exports, and other logical layers of the data warehouse built on top of this data.

H: Signifies Slack, used to trigger alert notifications if any validation failures occur in the Cloud Composer-controlled validation scripts.

Data ingestion and transformation

  • A JSON config file can be prepared containing the destination dataset, table name, and schema to be ingested, along with the transformation logic to be applied to the data received from Confluent Kafka topics. The Dataflow pipeline uses this config for ingestion and transformation and stores the data in structured tabular form in the logical layers of the data warehouse in BigQuery.

Here is a sample JSON config that can be used for reference:

{
"table_config": [
{
"partition_field": "<PARTITIONING FIELD>",
"row_transformer": "<TRANSFORMATION FILE PATH>",
"topic_name": "<TOPIC NAME>",
"table_name": "<TABLE NAME>",
"metadata": "<YES/NO>",
"table_schema": "<TABLE SCHEMA>",
"table_dataset": "<TABLE DATASET>"
}
]
}
  • The transformation logic needs to be specified by giving the path of a transformer file containing generic or specific transformations (such as converting epoch time to a BigQuery timestamp, calculating row counts, or adding a timestamp to track when data is processed in the pipeline in real time), or any other transformations needed; an illustrative transformer is sketched after the pipeline example below.
  • Build and package with Maven
mvn clean package

This will create a JAR file containing the application and its dependencies.

Assumption: Here we are going ahead with building the pipelines using Dataflow. Other alternatives, such as Data Fusion, could be used as well. The main reason for using Dataflow in this case is that we are considering scenarios where the volume of data will be in TBs, so the pipelines should be robust enough to handle it, and if discrepancies arise at the infrastructure level, we can change the machine type to keep data ingestion/transformation flowing through the pipelines.

Create a Dataflow template:

  • To create a Dataflow template, one can use the Apache Beam library, which is compatible with Google Cloud Dataflow and other cloud-based data processing services. The necessary Apache Beam dependencies have to be added to the pom.xml file:
<dependencies>
<!-- Apache Beam Dependencies -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>2.36.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>2.36.0</version>
</dependency>
<!-- Kafka and BigQuery I/O connectors used by the pipeline -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<version>2.36.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>2.36.0</version>
</dependency>
<!-- Other Dependencies -->
</dependencies>
  • Creating a Dataflow pipeline that runs the Kafka consumer as a template is the intermediate step in this end-to-end migration process; the pipeline reads messages from Kafka using the KafkaIO connector and processes them as needed.

Here’s a simplified example of a Dataflow pipeline:

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaToDataflowTemplate {

    public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
        options.setRunner(DataflowRunner.class);

        Pipeline pipeline = Pipeline.create(options);

        pipeline
            .apply("Read from Kafka", KafkaIO.<String, String>read()
                .withBootstrapServers("kafka-broker-1:9092,kafka-broker-2:9092")
                .withTopic("your-kafka-topic")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class))
            .apply("Process Messages", ParDo.of(new KafkaMessageProcessor()));

        pipeline.run();
    }

    public static class KafkaMessageProcessor extends DoFn<KafkaRecord<String, String>, Void> {
        @ProcessElement
        public void processElement(ProcessContext context) {
            // Process the Kafka message here
            String messageValue = context.element().getKV().getValue();
            // Your processing logic
        }
    }
}
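
To illustrate the kind of row transformation described earlier (for example, converting an epoch value carried in the Kafka message into a BigQuery timestamp and adding a processing-time column) together with the final write to BigQuery, here is a rough sketch of how the processing step could be extended. The field names, message format, and table reference are illustrative placeholders, and it assumes the beam-sdks-java-io-google-cloud-platform dependency listed above and that the destination table already exists.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import java.time.Instant;

public class KafkaToBigQueryTransform {

    // Illustrative row transformer: assumes the message value is "<id>,<epoch_millis>",
    // converts the epoch value into a BigQuery-compatible timestamp string, and adds
    // a processing-time column to track when the row went through the pipeline.
    public static class ToTableRowFn extends DoFn<KafkaRecord<String, String>, TableRow> {
        @ProcessElement
        public void processElement(ProcessContext context) {
            String[] fields = context.element().getKV().getValue().split(",");
            context.output(new TableRow()
                    .set("id", fields[0])
                    .set("event_ts", Instant.ofEpochMilli(Long.parseLong(fields[1])).toString())
                    .set("processed_at", Instant.now().toString()));
        }
    }

    // Attaches the transformation and the BigQuery write to the Kafka records read by the pipeline.
    public static void writeToBigQuery(PCollection<KafkaRecord<String, String>> kafkaRecords, String table) {
        kafkaRecords
                .apply("To TableRow", ParDo.of(new ToTableRowFn()))
                .apply("Write to BigQuery", BigQueryIO.writeTableRows()
                        .to(table)  // e.g. "your-project:your_dataset.your_table"
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
    }
}

In the pipeline above, the "Process Messages" step would then be replaced by a call such as KafkaToBigQueryTransform.writeToBigQuery(...) applied to the PCollection returned by the KafkaIO read.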
  • The deployment script that builds the Dataflow template, passing the project, staging locations, and (in a full implementation) the JSON config, Confluent Kafka bootstrap servers, and Schema Registry URL as pipeline options, is as follows:
mvn compile exec:java \
-Dexec.mainClass=com.example.KafkaToDataflowTemplate \
-Dexec.args="--project=your-project-id --region=your-region --tempLocation=gs://your-bucket/tmp --templateLocation=gs://your-bucket/templates/KafkaToDataflowTemplate"

The com.example.KafkaToDataflowTemplate should be replaced with the actual package and class name of the Dataflow pipeline. With --templateLocation set, this command stages the template in Cloud Storage rather than running the job immediately.

  • Once the template is built and stored in Google Cloud Storage, it can be deployed and executed on Google Cloud Dataflow using the gcloud command-line tool or the web console to ingest data into BigQuery.
Figure 2: Depicting the end-to-end flow of operations.

Data de-duplication and validation

  • At the BigQuery level, a scheduled merge can be performed from the raw tables (into which Kafka data is ingested through Dataflow) into a new final table to remove duplicates, if any; a sketch of such a merge appears after this list. After a successful merge, a scheduled cleanup should be performed on the raw table so that redundant data does not accumulate. This workflow can be managed via Cloud Composer Airflow DAGs.
  • Numerous scheduled SQL validation checks can be performed to verify whether all the business rules on the data are being satisfied. These scheduled SQL validation checks, as well as the workflow orchestration, can be controlled via Cloud Composer Airflow DAGs.
  • Slack and email alerts can be triggered in case of failures in business validation rules, for immediate escalation and resolution.
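
For reference, below is a rough sketch of what such a de-duplication merge could look like, run here through the BigQuery Java client; the project, dataset, table, and column names are illustrative, and in practice the statement would typically be executed as a BigQuery scheduled query or a Cloud Composer task rather than a standalone program.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;

public class DedupMergeExample {
    public static void main(String[] args) throws Exception {
        // Keep only the latest row per business key from the raw (Kafka-ingested) table
        // and merge it into the final table; afterwards the raw table can be cleaned up.
        String sql =
                "MERGE `your-project.your_dataset.final_table` T\n"
              + "USING (\n"
              + "  SELECT * EXCEPT(rn) FROM (\n"
              + "    SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY processed_at DESC) AS rn\n"
              + "    FROM `your-project.your_dataset.raw_table`\n"
              + "  ) WHERE rn = 1\n"
              + ") S\n"
              + "ON T.id = S.id\n"
              + "WHEN MATCHED THEN UPDATE SET T.event_ts = S.event_ts, T.processed_at = S.processed_at\n"
              + "WHEN NOT MATCHED THEN INSERT (id, event_ts, processed_at) VALUES (S.id, S.event_ts, S.processed_at)";

        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        bigquery.query(QueryJobConfiguration.newBuilder(sql).build());
    }
}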

Decommission and cut-over

Decommissioning of on-prem Kafka topics

When no applications are actively using the topics and all data has been consumed, one can proceed with the decommissioning process. This typically involves using the Kafka command-line tools or an administrative library to delete the topics.

For example, using the Kafka command-line tool (older Kafka versions use --zookeeper <zookeeper-connection-string> instead of --bootstrap-server):

kafka-topics --bootstrap-server <broker-connection-string> --delete --topic <topic-name>
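
Alternatively, taking the administrative-library route, topics can be deleted programmatically with the Kafka Admin API. A minimal sketch, with the broker address and topic names as placeholders:

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import java.util.Arrays;
import java.util.Properties;

public class DeleteTopicsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "On-prem-broker1:9092");

        try (Admin admin = Admin.create(props)) {
            // Deletes the decommissioned topics; this is irreversible, so confirm
            // that no producers or consumers still reference them.
            admin.deleteTopics(Arrays.asList("topic1", "topic2")).all().get();
        }
    }
}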

Decommissioning of on-prem Kafka Dataflow pipelines in GCP

Assumption: the on-prem data is also being stored on GCP.

1. Identify and document Dataflow pipelines:

  • List all the Dataflow pipelines that need to be decommissioned and identify them by their names, job IDs, and any related details.

2. Stop Dataflow jobs:

  • Use the Google Cloud Console, the Cloud SDK (gcloud), or the Dataflow API to stop the Dataflow jobs associated with the pipelines being decommissioned (streaming jobs can be drained to let in-flight data finish processing, or cancelled to stop immediately). For example, a job can be cancelled with the following gcloud command:
gcloud dataflow jobs cancel JOB_ID

Verify that the jobs have been stopped and are no longer running.

3. Check for pending data:

  • After stopping the Dataflow jobs, check for any pending data in the Kafka topics that were the source of these pipelines, to ensure that all data has been processed and there are no unprocessed messages.

4. Export and backup data (if needed):

  • If the data processed by these pipelines needs to be retained for archival or other purposes, consider exporting and backing it up from the target storage (e.g., BigQuery, Google Cloud Storage) before proceeding with decommissioning.

5. Clean up Dataflow resources:

  • Dataflow jobs cannot be deleted; once cancelled or drained, they simply remain in the job history. After the jobs have been stopped and all data has been processed, clean up the resources associated with them instead, such as staging and temporary locations in Cloud Storage and any stored templates that are no longer needed.

Decommissioning of on-prem Kafka tables at BigQuery:

Assumption: the on-prem data is also being stored on GCP.

The tables containing on-prem Kafka data need to be identified along with their dependencies, so as to understand the downstream impact on consumers before decommissioning. The whole decommissioning plan then has to be circulated to the team and stakeholders. Old tables have to be decommissioned, and users should be repointed to new tables that receive data from Confluent Kafka.

Updating the consumers to stop consuming data from the deprecated tables and switching to alternative sources, if necessary, should be the final closure step, coupled with monitoring the impact of the changes made to ensure that consumers are functioning correctly with the new configurations.
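
When the old on-prem-fed tables are finally retired, they can be archived and dropped programmatically as well. Below is a minimal sketch using the BigQuery Java client; the dataset and table names are placeholders, and copying to an archive dataset before deletion is optional but prudent.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.CopyJobConfiguration;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.TableId;

public class DecommissionOldTable {
    public static void main(String[] args) throws Exception {
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        TableId oldTable = TableId.of("your_dataset", "onprem_kafka_table");
        TableId archive = TableId.of("archive_dataset", "onprem_kafka_table_archive");

        // Optional: keep an archived copy before dropping the old table.
        Job copyJob = bigquery.create(JobInfo.of(CopyJobConfiguration.of(archive, oldTable)));
        copyJob.waitFor();

        // Drop the decommissioned table once consumers have been repointed.
        boolean deleted = bigquery.delete(oldTable);
        System.out.println("Deleted " + oldTable + ": " + deleted);
    }
}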

Best practices

Migrating from an on-premises Kafka cluster to Confluent Kafka, and subsequently ingesting data from Confluent Kafka into BigQuery using Dataflow, involves several steps and best practices to ensure a smooth and successful transition. Below are some best practices to consider during this migration process:

1. Comprehensive planning: Start with a detailed migration plan that outlines the scope, timeline, and resources required for the migration. Identify all Kafka topics, data producers, consumers, and dependencies in the on-premises environment.

2. Confluent Kafka setup: Set up Confluent Kafka clusters following best practices, including proper broker sizing, replication, and partitioning strategies. Ensure adequate monitoring and alerting are in place for Confluent Kafka using tools like Confluent Control Center.

3. Data schema evolution: If the data schemas evolve during migration, use Avro or another schema registry to manage schema changes and ensure compatibility between producers and consumers.

4. Data replication: Use Confluent Replicator or MirrorMaker to replicate data from on-premises Kafka to Confluent Kafka. Configure it to replicate topics incrementally while the migration is ongoing. Monitor replication lag to ensure data consistency between the two clusters.

5. Kafka optimization theorem: The main actors involved in an event flow are a producer, a consumer, and a topic, and the opposing goals to optimize for are throughput versus latency and durability versus availability. Given these parameters, the Kafka optimization theorem states that any Kafka data flow makes tradeoffs along these two axes, as shown in Figure 3.

Figure 3. Kafka performance involves two orthogonal axes: Availability versus durability and latency versus throughput.

For more information refer here: https://developers.redhat.com/articles/2022/05/03/fine-tune-kafka-performance-kafka-optimization-theorem#

6. Data validation: Implement data validation checks during replication to ensure data integrity between the source and destination clusters.

7. BigQuery setup: Set up and configure BigQuery datasets and tables to receive the Kafka data. Consider partitioning and clustering tables for better query performance and cost optimization in BigQuery.

8. Dataflow configuration: Develop Dataflow pipelines to ingest data from Confluent Kafka topics into BigQuery, using the KafkaIO connector for Dataflow. Implement error handling and retry mechanisms in Dataflow to handle transient issues during data ingestion. Optimize the Dataflow pipeline for performance by adjusting resources, windowing strategies, parallelism, and autoscaling to match the data volume and velocity. Monitor Dataflow job metrics and adjust as needed to maintain performance. If necessary, perform data transformation within Dataflow before ingesting data into BigQuery; this can include data enrichment, aggregation, or schema transformations.

9. Monitoring and alerts: Set up monitoring and alerting for your Dataflow jobs using Grafana and Cloud Logging to ensure you are promptly alerted to any issues.

10. Testing and validation: Thoroughly test the migration process in a staging environment to identify and address any issues before performing the actual migration.

11. Post-migration validation: After migration, validate the data in BigQuery against the source Kafka data to confirm data integrity.

12. Rollback plan: Develop a rollback plan in case any issues arise during migration. This plan should include procedures for reverting consumers and data replication.

13. Documentation: Document the entire migration process, including configurations, dependencies, and best practices. This documentation will be invaluable for future reference and troubleshooting.

By following these best practices and taking a systematic approach, the risks and challenges associated with migrating from an on-premises Kafka cluster to Confluent Kafka can be minimized, and data can be successfully ingested into BigQuery using Dataflow. Regular monitoring and optimization of the environment post-migration will help ensure continued performance and reliability.

Prospects for refinement

Possible future iterations could fully adopt the GCP ecosystem by leveraging Kafka-to-Pub/Sub connectors to make data sources available to different GCP services.

Pub/Sub is a managed service for sending and receiving messages asynchronously. As with Kafka, you can use Pub/Sub to communicate between components in your cloud architecture. The Pub/Sub Group Kafka Connector allows you to integrate these two systems. The following connectors are packaged in the Connector JAR:

  • The sink connector reads records from one or more Kafka topics and publishes them to Pub/Sub.
  • The source connector reads messages from a Pub/Sub topic and publishes them to Kafka.

For more information on this, refer here: https://cloud.google.com/pubsub/docs/connect_kafka

About Shreya Kumari

Shreya Kumari, a Data Engineer at SADA, specializes in crafting robust data infrastructure through best practices. She is proficient in designing and implementing data solutions that empower businesses to make informed decisions.
