Using GBase 8a Kafka Consumer for Data Synchronization

Configuration of Data Synchronization

Parameter Configuration

To use the Kafka consumer, configure the parameters as follows. The adjustable parameters listed after each configuration block describe which values can be tuned and how.

GCluster Configuration

Edit the file: /opt/gcluster/config/gbase_8a_gcluster.cnf

_gbase_transaction_disable=1          # must not be set to 0
gcluster_lock_level=10                # setting this to 2 is not recommended
_gcluster_insert_cache_buffer_flag=1
gcluster_assign_kafka_topic_period=20
gcluster_kafka_max_message_size=1000000
gcluster_kafka_batch_commit_dml_count=100000
gcluster_kafka_local_queue_size=210000
gcluster_kafka_consume_batch=100
gcluster_kafka_user_allowed_max_latency=15

Adjustable Parameters:

  • gcluster_assign_kafka_topic_period: Time period in seconds for automatic consumer takeover. Min: 20s, Max: 120s.
  • gcluster_kafka_max_message_size: Maximum message size in bytes from Kafka topic. Must be >= Kafka server's message.max.bytes.
  • gcluster_kafka_batch_commit_dml_count: Number of DML operations per commit. Adjust based on table count and performance.
  • gcluster_kafka_user_allowed_max_latency: Maximum latency in milliseconds for message caching in the 8a cluster layer.
  • gcluster_kafka_local_queue_size: Length of the DML operation queue. Recommended to be more than double gcluster_kafka_batch_commit_dml_count.
  • gcluster_kafka_consume_batch: Number of Kafka messages consumed at once. Recommended range: 10-1000.
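
To confirm which values are actually in effect, they can be queried from gccli. This is a minimal sketch that assumes GBase 8a's MySQL-compatible SHOW VARIABLES syntax covers these Kafka-related parameters:

SHOW VARIABLES LIKE 'gcluster_kafka%';
SHOW VARIABLES LIKE 'gcluster_assign_kafka_topic_period';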

GNode Configuration

Edit the file: /opt/gnode/config/gbase_8a_gbase.cnf

_gbase_transaction_disable=1          # must not be set to 0
gbase_tx_log_mode=ONLY_SPECIFY_USE    # do not use USE,STANDARD_TRANS
gbase_buffer_insert=1024M
gbase_tx_log_flush_time=5

Adjustable Parameters:

  • gbase_buffer_insert: Size of the insert buffer. Adjust based on data volume and consumer tasks.
  • gbase_tx_log_flush_time: Frequency in seconds for flushing in-memory data. Recommended: 5 seconds.

Note: Keep the configuration consistent across all nodes; inconsistent settings can lead to unpredictable errors.
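
A quick way to spot configuration drift is to compare the relevant lines of each file across nodes. The loop below is only a sketch: it assumes passwordless SSH to the listed hosts (hypothetical IPs) and the default file paths shown above; a file that does not exist on a given node is silently skipped.

# Hypothetical host list; replace with your coordinator and data node IPs.
for host in 10.10.10.10 10.10.10.11; do
    echo "== $host =="
    ssh "$host" "grep -E 'kafka|transaction_disable|tx_log|buffer|lock_level' \
        /opt/gcluster/config/gbase_8a_gcluster.cnf \
        /opt/gnode/config/gbase_8a_gbase.cnf 2>/dev/null"
done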

Commands for Starting and Stopping

Execute the following commands on any Coordinator node using gccli.

Creating a Consumer Task

CREATE KAFKA CONSUMER <consumer_name> TRANSACTION TOPIC <kafka_topic_name> BROKERS 'ip:port, ip:port,…';
  • consumer_name: Unique name for the consumer task.
  • kafka_topic_name: Name of the Kafka topic to consume.
  • ip:port: Kafka broker's IP and port.

Example:

CREATE KAFKA CONSUMER test1 TRANSACTION TOPIC topic_1 BROKERS '10.10.10.10:9092,10.10.10.11:9092';

Deleting a Consumer Task

DROP KAFKA CONSUMER <consumer_name>;

Ensure the consumer task is stopped before deletion.
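
For example, to remove the task created in the earlier example (test1), stop it first and then drop it:

STOP KAFKA CONSUMER test1;
DROP KAFKA CONSUMER test1;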

Viewing Consumer Task Properties

SHOW KAFKA CONSUMER <consumer_name>;
SHOW TRANSACTION CONSUMER;

Starting a Consumer Task

START KAFKA CONSUMER <consumer_name>;

Stopping a Consumer Task

STOP KAFKA CONSUMER <consumer_name>;

Status Query

SELECT * FROM information_schema.kafka_consumer_status;

Fields:

  • Consumer: Name of the consumer.
  • IP: Node IP where the consumer task is running.
  • Topic: Kafka topic name.
  • Status: Whether the consumer task is started or stopped.
  • Min_offset: Minimum offset in the Kafka queue.
  • Max_offset: Maximum offset in the Kafka queue.
  • Cur_offset: Current offset being processed by 8a.
  • Process_offset: Offset of the message currently being synchronized.
  • Commit_offset: Offset of the last committed message.
  • Exception: Description of the last error encountered.
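
These fields can be combined into a simple lag check. The query below is a sketch that assumes the column names follow the field names listed above (adjust casing if your installation differs), using the test1 task from the earlier example:

SELECT consumer,
       topic,
       status,
       max_offset - commit_offset AS uncommitted_lag,
       exception
FROM information_schema.kafka_consumer_status
WHERE consumer = 'test1';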

Handling Exceptions in Consumer Tasks

JSON Parsing Errors

If a JSON message parsing error occurs, the consumer task will enter a sleep state. Check the exception field for details. Use Kafka's consumer tool to inspect the problematic message:

$kafka_home/bin/kafka-simple-consumer-shell.sh --broker-list 192.168.103.74:9092 --topic test3 --offset 797 --max-messages 1 --partition 0
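
kafka-simple-consumer-shell.sh may not be available in newer Kafka distributions. If it is missing, the standard console consumer can usually fetch the same single message; this is an equivalent sketch using the broker address, topic, and offset from the example above:

$kafka_home/bin/kafka-console-consumer.sh --bootstrap-server 192.168.103.74:9092 --topic test3 --partition 0 --offset 797 --max-messages 1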

Cluster Lock Failure or Node Offline

If a cluster lock failure occurs or a node goes offline, the consumer task exits and waits for the task scheduler to reassign it. In the status query, such a task shows status=waiting start and IP=unknown.

GBase Single Node Execution Error

If an error occurs during single node execution, the consumer task will enter a sleep state. Address the issue and restart the consumer task.
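
After fixing the underlying problem, a typical sequence is to check the Exception field for the task and then restart it. Here test1 is again the task name from the earlier example:

SELECT consumer, status, exception FROM information_schema.kafka_consumer_status WHERE consumer = 'test1';
STOP KAFKA CONSUMER test1;
START KAFKA CONSUMER test1;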

Common Exception Descriptions

  • JSON Parsing Issues: Indicates a malformed JSON message.
  • Missing Primary Key: Indicates a missing primary key in the JSON message.
  • Invalid Primary Key: Indicates an invalid primary key in the JSON message.
  • Target Table Does Not Exist: Indicates the target table is missing in GBase 8a.
  • Table Definition Mismatch: Indicates a mismatch in table definitions.
  • Primary Key Changed: Indicates a change in the primary key.
  • Data Insertion Error: Indicates an error during data insertion, such as data truncation.

Note: Always ensure the JSON messages and table definitions are correct to avoid synchronization issues.
