Here are some scenarios in which you might use the Pub/Sub Group Kafka Connector:
- You are migrating a Kafka-based architecture to Google Cloud.
- You have a frontend system that stores events in Kafka outside of Google Cloud, but you also use Google Cloud to run some of your backend services, which need to receive the Kafka events.
- You collect logs from an on-premises Kafka solution and send them to Google Cloud for data analytics.
- You have a frontend system that uses Google Cloud, but you also store data on-premises using Kafka.
As with Kafka, you can use Pub/Sub to communicate between components in your cloud architecture.
The Pub/Sub Group Kafka Connector allows you to integrate these two systems. The following connectors are packaged in the Connector JAR:
- The sink connector reads records from one or more Kafka topics and publishes them to Pub/Sub.
- The source connector reads messages from a Pub/Sub topic and publishes them to Kafka.
This document walks through how to set up the sink connector.
More information
This section walks you through the following tasks:
- Configure the Pub/Sub Group Kafka Connector.
- Send events from Kafka to Pub/Sub.
Authenticate
The Pub/Sub Group Kafka Connector must authenticate with Pub/Sub in order to send Pub/Sub messages. To set up authentication, perform the following steps:
Grant the following IAM role to your Google service account: roles/pubsub.admin
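One way to grant the role is with the gcloud CLI. The following is a minimal sketch rather than the only approach: the project ID `my-project` and the service account name `kafka-connector` are hypothetical placeholders, and it assumes the service account already exists and that gcloud is already authenticated.

```shell
# Hypothetical placeholders: override these with your own values.
PROJECT_ID="${PROJECT_ID:-my-project}"
SA_NAME="${SA_NAME:-kafka-connector}"

# Print the service account's email address, derived from its name and project.
sa_email() {
  printf '%s@%s.iam.gserviceaccount.com\n' "$SA_NAME" "$PROJECT_ID"
}

# Grant roles/pubsub.admin on the project to the service account.
grant_pubsub_role() {
  gcloud projects add-iam-policy-binding "$PROJECT_ID" \
    --member="serviceAccount:$(sa_email)" \
    --role="roles/pubsub.admin"
}
```

After sourcing the snippet, run `grant_pubsub_role` once per project.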
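Clone the connector repository:
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
cd java-pubsub-group-kafka-connector
Download the connector JAR
Copy the configuration files from the repository into your Kafka installation:
cp config/* [path to Kafka installation]/config/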
Update your Kafka Connect configuration
- Navigate to your Kafka directory.
- Open the file named config/connect-standalone.properties in a text editor.
- If the plugin.path property is commented out, uncomment it.
- Update the plugin.path property to include the path to the connector JAR. Example:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
- Set the offset.storage.file.filename property to a local file name. In standalone mode, Kafka uses this file to store offset data. Example:
offset.storage.file.filename=/tmp/connect.offsets
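Before starting Kafka Connect, it can help to confirm that both properties edited above are present and uncommented. The helper below is a hypothetical sketch (the function name `check_connect_props` is not part of Kafka); it assumes each property is written as `key=value` with no spaces around the equals sign.

```shell
# Succeeds only if every listed key appears uncommented as "key=value".
# Usage: check_connect_props config/connect-standalone.properties
check_connect_props() {
  props_file="$1"
  for key in plugin.path offset.storage.file.filename; do
    # Split each line on "=" and compare the left-hand side exactly,
    # so commented lines such as "#plugin.path=..." do not match.
    if ! awk -F= -v k="$key" '$1 == k { found = 1 } END { exit !found }' "$props_file"; then
      echo "missing or commented out: $key" >&2
      return 1
    fi
  done
}
```

The function prints the first missing key to standard error and returns a non-zero status, so it can be used as a guard before launching the connector.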
Forward events from Kafka to Pub/Sub
- Open the file config/cps-sink-connector.properties in a text editor. Add values for the following properties, which are marked "TODO" in the comments:
- topics=KAFKA_TOPICS
- cps.project=PROJECT_ID
- cps.topic=PUBSUB_TOPIC
- gcp.credentials.file.path=PATH
- gcp.credentials.json=JSON_FILE
- Replace the following:
- KAFKA_TOPICS: A comma-separated list of Kafka topics to read from.
- PROJECT_ID: The Google Cloud project that contains your Pub/Sub topic.
- PUBSUB_TOPIC: The Pub/Sub topic that receives the messages from Kafka.
- PATH: Optional. The path to a file that stores Google Cloud credentials for authenticating with Pub/Sub. (The documentation describes this setting for Pub/Sub Lite, but it presumably works for Pub/Sub as well.)
- JSON_FILE: Optional. A JSON blob that contains Google Cloud credentials for authenticating with Pub/Sub. (The documentation describes this setting for Pub/Sub Lite, but it presumably works for Pub/Sub as well.)
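A quick way to catch placeholders that were never replaced is to search the properties file for the placeholder names listed above. The function below is a hypothetical helper, not part of the connector; it only recognizes the exact placeholder spellings used in this list.

```shell
# Print (with line numbers) any property whose value is still one of the
# placeholder names from the list above. Exit status 0 means at least one
# placeholder remains; a non-zero status means none were found.
# Usage: find_placeholders config/cps-sink-connector.properties
find_placeholders() {
  grep -nE '= *(KAFKA_TOPICS|PROJECT_ID|PUBSUB_TOPIC|PATH|JSON_FILE) *$' "$1"
}
```

If the optional credential properties are not needed, their lines would have to be removed or commented out for this check to come back clean.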
More info about settings
File: cps-sink-connector.properties
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Unique name for the Pub/Sub sink connector.
name=CPSSinkConnector
# The Java class for the Pub/Sub sink connector.
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
# The maximum number of tasks that should be created for this connector.
tasks.max=10
# Set the key converter for the Pub/Sub sink connector.
key.converter=org.apache.kafka.connect.storage.StringConverter
# Set the value converter for the Pub/Sub sink connector.
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# A comma-separated list of Kafka topics to use as input for the connector.
# TODO (developer): update to your Kafka topic name(s).
topics=scotia-wealth
cps.project=PROJECT-ID
# TODO (developer): update to the ID of the Pub/Sub topic
# where data should be written.
cps.topic=kafka-topic-consumer
# Optional. A JSON file path and a JSON blob that contain Google Cloud credentials for authenticating with Pub/Sub (documented for Pub/Sub Lite).
gcp.credentials.file.path=PATH
gcp.credentials.json=JSON_FILE
- From the Kafka directory, run the following command:
bin/connect-standalone.sh \
config/connect-standalone.properties \
config/cps-sink-connector.properties
- Follow the steps in the Apache Kafka quickstart to write some events to your Kafka topic.
- Use the gcloud CLI to read the events from Pub/Sub:
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
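The pull command above assumes that the Pub/Sub topic and a subscription attached to it already exist. If they do not, they could be created along these lines. This is a sketch: the topic ID is the one from the example properties file, while the subscription name `kafka-topic-consumer-sub` is a hypothetical placeholder.

```shell
# Topic ID taken from the example properties file; the subscription name
# is a hypothetical placeholder.
PUBSUB_TOPIC="${PUBSUB_TOPIC:-kafka-topic-consumer}"
PUBSUB_SUBSCRIPTION="${PUBSUB_SUBSCRIPTION:-kafka-topic-consumer-sub}"

# Create the topic, then a pull subscription attached to it.
create_pubsub_resources() {
  gcloud pubsub topics create "$PUBSUB_TOPIC" &&
    gcloud pubsub subscriptions create "$PUBSUB_SUBSCRIPTION" \
      --topic="$PUBSUB_TOPIC"
}
```

The subscription has to exist before events are published, because a subscription only receives messages published after it was created.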
more info about kafka
GCP repo: https://github.com/googleapis/java-pubsub-group-kafka-connector
OK, we have tested locally. Now it is time to containerize the Kafka connector and deploy it on GKE.
Deploy the Kafka connector on GKE as follows:
- Build the image from the following base image:
[https://github.com/bitnami/containers/tree/main/bitnami/kafka](https://github.com/bitnami/containers/tree/main/bitnami/kafka)
Build a custom image that includes the Pub/Sub Group Kafka Connector JAR.
Example Dockerfile:
FROM bitnami/kafka:3.4.0-debian-11-r15 AS build-stage
COPY YOUR_pubsubKafkaConnector_FOLDER /opt/bitnami/kafka/
COPY setup /opt/bitnami/kafka/
Create the Pub/Sub resources:
- Topic
- Subscription
Deploy the Helm chart:
Installing the Chart
Build the image from Dockerfile
## Prerequisites
https://cloud.google.com/artifact-registry/docs/docker/pushing-and-pulling
docker build -t LOCATION-docker.pkg.dev/PROJECT-ID/REPOSITORY/IMAGE:TAG .
docker push LOCATION-docker.pkg.dev/PROJECT-ID/REPOSITORY/IMAGE:TAG
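The build and push commands above can be wrapped in a small script so that the image reference is composed in one place. This is a sketch under assumptions: every default value below is a hypothetical placeholder, and the `gcloud auth configure-docker` step reflects the authentication setup described on the linked Artifact Registry page.

```shell
# Hypothetical placeholder values; substitute your own.
LOCATION="${LOCATION:-us-central1}"
PROJECT_ID="${PROJECT_ID:-my-project}"
REPOSITORY="${REPOSITORY:-kafka-images}"
IMAGE="${IMAGE:-kafka-pubsub-connector}"
TAG="${TAG:-1.0.0}"

# Compose the Artifact Registry image reference used by docker build/push.
image_ref() {
  printf '%s-docker.pkg.dev/%s/%s/%s:%s\n' \
    "$LOCATION" "$PROJECT_ID" "$REPOSITORY" "$IMAGE" "$TAG"
}

# Register gcloud as a Docker credential helper for the registry host,
# then build and push the image from the current directory.
build_and_push() {
  gcloud auth configure-docker "$LOCATION-docker.pkg.dev" --quiet &&
    docker build -t "$(image_ref)" . &&
    docker push "$(image_ref)"
}
```

Run `build_and_push` from the directory that contains the Dockerfile.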
## Use Helm to install a Kafka instance with the Pub/Sub connector from the custom image built previously
helm repo add bitnami https://charts.bitnami.com/bitnami
Change the Docker image in the Kafka chart (kafka/values.yaml):
## Bitnami Kafka image version
## ref: https://hub.docker.com/r/bitnami/kafka/tags/
## @param image.registry Kafka image registry
## @param image.repository Kafka image repository
## @param image.tag Kafka image tag (immutable tags are recommended)
## @param image.digest Kafka image digest in the way sha256:aa.... Please note this parameter, if set, will override the tag
## @param image.pullPolicy Kafka image pull policy
## @param image.pullSecrets Specify docker-registry secret names as an array
## @param image.debug Specify if debug values should be set
##
image:
  registry: LOCATION-docker.pkg.dev/PROJECT-ID/REPOSITORY
  repository: IMAGE
  tag: TAG
  digest: ""
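As an alternative to editing values.yaml, the same image fields can be passed on the command line when installing the chart. The following is a sketch only: the release name `kafka` and namespace `kafka` are assumptions chosen to match the Workload Identity example in this post, and the registry, image, and tag defaults are hypothetical placeholders.

```shell
# Hypothetical placeholders for the image pushed earlier.
REGISTRY="${REGISTRY:-us-central1-docker.pkg.dev/my-project/kafka-images}"
IMAGE="${IMAGE:-kafka-pubsub-connector}"
TAG="${TAG:-1.0.0}"

# Install the Bitnami Kafka chart as release "kafka" in namespace "kafka",
# overriding the image fields instead of editing values.yaml.
install_kafka_chart() {
  helm install kafka bitnami/kafka \
    --namespace kafka --create-namespace \
    --set image.registry="$REGISTRY" \
    --set image.repository="$IMAGE" \
    --set image.tag="$TAG"
}
```

Keeping the overrides on the command line leaves the chart's values file untouched, which can make chart upgrades easier to follow.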
After you deploy the Helm chart, your GKE cluster contains the following:
1. StatefulSets
- Kafka instance
- Kafka ZooKeeper
2. K8s service
- Exposed GKE service to publish and consume (Kafka topics and messages)
- Impersonate the GCP service account via Workload Identity
The GCP service account has permission to publish messages. The Kubernetes service account impersonates it via Workload Identity.
More info
See the following example:
## Workload identity to impersonate gcp SA
gcloud iam service-accounts add-iam-policy-binding $GSA \
--role roles/iam.workloadIdentityUser \
--member "serviceAccount:$PROJECT_ID.svc.id.goog[kafka/kafka]"
## Annotate k8s serviceaccount kafka
kubectl annotate serviceaccount kafka \
--namespace kafka \
iam.gke.io/gcp-service-account=$GSA
- Create the ConfigMap from the previously downloaded configuration files.
- In this example, we created a folder called config and saved the configuration files in it.
kubectl create configmap pubsub-connector --from-file=config/ -n kafka
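To review what the ConfigMap will contain before creating it, and to inspect it afterwards, something like the following could be used. This is a sketch with hypothetical function names; it assumes the same `config/` folder, ConfigMap name, and namespace as the command above.

```shell
# Render the ConfigMap locally as YAML without creating it in the cluster.
preview_configmap() {
  kubectl create configmap pubsub-connector --from-file=config/ \
    --namespace kafka --dry-run=client -o yaml
}

# After creating it, show the ConfigMap's data to confirm that each
# configuration file was picked up.
describe_configmap() {
  kubectl describe configmap pubsub-connector --namespace kafka
}
```

Each file in the folder becomes one key in the ConfigMap, named after the file.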