Configuration of Data Synchronization
Parameter Configuration
To use the Kafka consumer, configure the parameters as follows. Parameters that can be tuned are described in the notes after each configuration block.
GCluster Configuration
Edit the file: /opt/gcluster/config/gbase_8a_gcluster.cnf
_gbase_transaction_disable=1 (Ensure this is not set to 0)
gcluster_lock_level=10 (setting this to 2 is not recommended)
_gcluster_insert_cache_buffer_flag=1
gcluster_assign_kafka_topic_period=20
gcluster_kafka_max_message_size=1000000
gcluster_kafka_batch_commit_dml_count=100000
gcluster_kafka_local_queue_size=210000
gcluster_kafka_consume_batch=100
gcluster_kafka_user_allowed_max_latency=15
Adjustable Parameters:
- gcluster_assign_kafka_topic_period: Time period in seconds for automatic consumer takeover. Min: 20s, Max: 120s.
- gcluster_kafka_max_message_size: Maximum size in bytes of a message read from the Kafka topic. Must be >= the Kafka server's message.max.bytes.
- gcluster_kafka_batch_commit_dml_count: Number of DML operations per commit. Adjust based on table count and performance.
- gcluster_kafka_user_allowed_max_latency: Maximum latency in milliseconds for message caching in the 8a cluster layer.
- gcluster_kafka_local_queue_size: Length of the DML operation queue. Recommended to be more than double gcluster_kafka_batch_commit_dml_count; see the sketch after this list.
- gcluster_kafka_consume_batch: Number of Kafka messages consumed at once. Recommended range: 10-1000.
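The defaults above already respect that ratio: gcluster_kafka_local_queue_size=210000 is a little more than double gcluster_kafka_batch_commit_dml_count=100000. A minimal sketch of a retuned gbase_8a_gcluster.cnf fragment for larger commits (the concrete values are illustrative assumptions, not recommendations):
# commit more DML operations per transaction (illustrative value)
gcluster_kafka_batch_commit_dml_count=200000
# keep the queue at more than double the commit count
gcluster_kafka_local_queue_size=420000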
GNode Configuration
Edit the file: /opt/gnode/config/gbase_8a_gbase.cnf
_gbase_transaction_disable=1 (Ensure this is not set to 0)
gbase_tx_log_mode=ONLY_SPECIFY_USE (do not set this to USE,STANDARD_TRANS)
gbase_buffer_insert=1024M
gbase_tx_log_flush_time=5
Adjustable Parameters:
- gbase_buffer_insert: Size of the insert buffer. Adjust based on data volume and the number of consumer tasks; see the sketch after this list.
- gbase_tx_log_flush_time: Frequency in seconds for flushing in-memory data. Recommended: 5 seconds.
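As a sketch, a node that hosts several consumer tasks with large messages might raise the insert buffer while keeping the recommended flush interval. The 2048M figure is an assumption for illustration; size it against the node's available memory:
# larger insert buffer for heavier ingest (illustrative value)
gbase_buffer_insert=2048M
# keep the recommended 5-second flush of in-memory data
gbase_tx_log_flush_time=5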
Note: Ensure all nodes have consistent configurations to avoid unknown errors.
Commands for Starting and Stopping
Execute the following commands on any Coordinator node using gccli.
Creating a Consumer Task
CREATE KAFKA CONSUMER <consumer_name> TRANSACTION TOPIC <kafka_topic_name> BROKERS 'ip:port, ip:port,…';
- consumer_name: Unique name for the consumer task.
- kafka_topic_name: Name of the Kafka topic to consume.
- ip:port: Kafka broker's IP and port.
Example:
CREATE KAFKA CONSUMER test1 TRANSACTION TOPIC topic_1 BROKERS '10.10.10.10:9092,10.10.10.11:9092';
Deleting a Consumer Task
DROP KAFKA CONSUMER <consumer_name>;
Ensure the consumer task is stopped before deletion.
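For example, to remove the test1 consumer created above, stop it first and then drop it:
STOP KAFKA CONSUMER test1;
DROP KAFKA CONSUMER test1;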
Viewing Consumer Task Properties
SHOW KAFKA CONSUMER <consumer_name>;
SHOW TRANSACTION CONSUMER;
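For example, to view the test1 task defined earlier:
SHOW KAFKA CONSUMER test1;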
Starting a Consumer Task
START KAFKA CONSUMER <consumer_name>;
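For example, to start the test1 task created earlier:
START KAFKA CONSUMER test1;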
Stopping a Consumer Task
STOP KAFKA CONSUMER <consumer_name>;
Status Query
SELECT * FROM information_schema.kafka_consumer_status;
Fields (a sample query over these columns follows the list):
- Consumer: Name of the consumer.
- IP: Node IP where the consumer task is running.
- Topic: Kafka topic name.
- Status: Whether the task is started or stopped.
- Min_offset: Minimum offset in the Kafka queue.
- Max_offset: Maximum offset in the Kafka queue.
- Cur_offset: Current offset being processed by 8a.
- Process_offset: Offset of the message currently being synchronized.
- Commit_offset: Offset of the last committed message.
- Exception: Description of the last error encountered.
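The backlog of a task can be estimated from the offset columns. A minimal sketch, assuming the fields above are exposed as same-named numeric columns of the status table (the test1 filter is just the running example):
SELECT consumer, status, max_offset - commit_offset AS pending_messages
FROM information_schema.kafka_consumer_status
WHERE consumer = 'test1';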
Handling Exceptions in Consumer Tasks
JSON Parsing Errors
If a JSON message parsing error occurs, the consumer task enters a sleep state. Check the Exception field of the status query for details, then use Kafka's consumer tool to inspect the problematic message:
$kafka_home/bin/kafka-simple-consumer-shell.sh --broker-list 192.168.103.74:9092 --topic test3 --offset 797 --max-messages 1 --partition 0
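The offending offset and error text can also be read from the status table shown earlier; a sketch assuming the Exception and Cur_offset fields are queryable as columns (test1 again as the running example):
SELECT consumer, cur_offset, exception
FROM information_schema.kafka_consumer_status
WHERE consumer = 'test1';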
Cluster Lock Failure or Node Offline
If cluster lock failure or node offline occurs, the consumer task will exit and wait for task scheduler reassignment. Check the status for status=waiting start, IP=unknown
.
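To spot tasks waiting for reassignment, the status table can be filtered; a sketch assuming the literal values match the status output exactly:
SELECT consumer, ip, status
FROM information_schema.kafka_consumer_status
WHERE status = 'waiting start';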
GBase Single Node Execution Error
If an error occurs during single node execution, the consumer task will enter a sleep state. Address the issue and restart the consumer task.
Common Exception Descriptions
- JSON Parsing Issues: Indicates a malformed JSON message.
- Missing Primary Key: Indicates a missing primary key in the JSON message.
- Invalid Primary Key: Indicates an invalid primary key in the JSON message.
- Target Table Does Not Exist: Indicates the target table is missing in GBase 8a.
- Table Definition Mismatch: Indicates a mismatch in table definitions.
- Primary Key Changed: Indicates a change in the primary key.
- Data Insertion Error: Indicates an error during data insertion, such as data truncation.
Note: Always ensure the JSON messages and table definitions are correct to avoid synchronization issues.