TL;DR:
To cancel a running Incremental Snapshot, you can manually push a crafted message to the Kafka Connect internal ...-offsets topic, with value.incremental_snapshot_primary_key set equal to value.incremental_snapshot_maximum_key taken from the latest "offset" message.
Long story:
Sometimes you need to snapshot some already-tracked tables once again, and Debezium has the Incremental Snapshots feature exactly for that purpose. You send a "signal" (write a new row into the signaling DB table) that instructs Debezium to re-read a table. But what if you want to cancel an already running Incremental Snapshot?
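For context, that signal is just a row inserted into the signaling table the connector watches (configured via signal.data.collection). A minimal sketch of triggering an incremental snapshot from Python follows; the table name, connection details and signal id are my own placeholders, not anything from the original setup:

```python
import json
import pyodbc

# Connection details are placeholders -- point this at your own SQL Server.
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 18 for SQL Server};"
    "SERVER=mssql-host;DATABASE=prod;UID=debezium;PWD=changeme;"
    "TrustServerCertificate=yes"
)

# Standard Debezium "execute-snapshot" signal: the connector polls the signaling
# table and starts an incremental snapshot of the listed collections.
signal_payload = json.dumps({
    "data-collections": ["prod.dbo.InvoiceLines"],
    "type": "incremental",
})

cur = conn.cursor()
cur.execute(
    # dbo.debezium_signal is an assumed name -- use whatever table you configured
    # in signal.data.collection; it needs the id/type/data columns.
    "INSERT INTO dbo.debezium_signal (id, type, data) VALUES (?, ?, ?)",
    ("adhoc-invoicelines-1", "execute-snapshot", signal_payload),
)
conn.commit()
```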
We faced a situation where an Incremental Snapshot of a huge table was started, but the additional filter conditions were not applied! So instead of re-reading 30k rows, Debezium started reading all 20 million records. We didn't want that much data to be produced: it would flood the data topic, and the latest changes (the ones we actually needed snapshotted) wouldn't be pushed for hours. So we needed to stop this snapshot.
As far as I could find, Debezium had no way to stop an already running snapshot with some sort of signal. Restarting Kafka Connect doesn't affect the snapshot either: it just continues from the last processed offset. So I dug into the internal Kafka Connect topics, in particular the "...-offsets" one, and there it was: Debezium stores its running snapshot offsets there. Example message for a running snapshot (a small consumer sketch for browsing the topic yourself follows it):
    {
      "key": [
        "dbz_prod",
        {
          "server": "mssql"
        }
      ],
      "value": {
        "transaction_id": null,
        "event_serial_no": 2,
        "incremental_snapshot_maximum_key": "6e2166716a6c4b5310027575000decac616e672e4f626a6563743b90ce589f1073296c020000787000000001737200116a6176612e6c616e672e496e746567657212e2a0a4f781873802000149000576616c7565787200106a6176612e6c616e672e4e756d62657286ac951d0b94e08b02000078700142f017",
        "commit_lsn": "0006593e:000287c8:0003",
        "change_lsn": "0006593e:000287c8:0002",
        "incremental_snapshot_collections": "prod.dbo.InvoiceLines",
        "incremental_snapshot_primary_key": "6e2166716a6c4b5310027575000decac616e672e4f626a6563743b90ce589f1073296c020000787000000001737200116a6176612e6c616e672e496e746567657212e2a0a4f781873802000149000576616c7565787200106a6176612e6c616e672e4e756d62657286ac951d0b94e08b020000787000016862"
      },
      "headers": [],
      "exceededFields": null
    }
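If you want to browse this topic yourself instead of going through a UI, here is a minimal read-only consumer sketch with kafka-python. The topic name and broker address are assumptions; check offset.storage.topic in your Connect worker config:

```python
import json
from kafka import KafkaConsumer

# Read the Kafka Connect internal offsets topic from the beginning.
# "connect-offsets" and the broker address are assumptions -- use the
# offset.storage.topic value from your Connect worker configuration.
consumer = KafkaConsumer(
    "connect-offsets",
    bootstrap_servers="kafka:9092",
    auto_offset_reset="earliest",
    enable_auto_commit=False,
    consumer_timeout_ms=5000,  # stop iterating once the topic is drained
)

for record in consumer:
    key = json.loads(record.key)  # e.g. ["dbz_prod", {"server": "mssql"}]
    value = json.loads(record.value) if record.value else None
    if key[0] == "dbz_prod":      # connector name, as in the message above
        print(record.partition, record.offset, json.dumps(value, indent=2))
```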
In that offset message we see two valuable keys:
- incremental_snapshot_maximum_key
- incremental_snapshot_primary_key
It looks like the snapshot stops when the current snapshot offset (incremental_snapshot_primary_key) becomes equal to the maximum primary key (incremental_snapshot_maximum_key, the largest key the table contained when the snapshot was started). You can see that these two values differ only in the last few characters, and those characters are the hexadecimal values of the current offset and the max primary key (142f017 in decimal is 21,164,055).
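A quick sanity check of that decoding in Python, using the tails of the two hex strings from the offset message above:

```python
# Tail of incremental_snapshot_maximum_key: the max primary key the table
# had when the snapshot started.
print(int("142f017", 16))  # 21164055 -> roughly 21 million rows to read

# Tail of incremental_snapshot_primary_key: the snapshot's current position.
print(int("0016862", 16))  # 92258
```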
So I tried pushing the same message to the ...-offsets topic, but with incremental_snapshot_primary_key set equal to incremental_snapshot_maximum_key. And it worked for me: the snapshot was marked as "finished" and the data flood stopped. A scripted sketch of that push follows the example message below.
"Finished" message:
    {
      "key": [
        "dbz_prod",
        {
          "server": "mssql"
        }
      ],
      "value": {
        "transaction_id": null,
        "event_serial_no": 2,
        "incremental_snapshot_maximum_key": "6e2166716a6c4b5310027575000decac616e672e4f626a6563743b90ce589f1073296c020000787000000001737200116a6176612e6c616e672e496e746567657212e2a0a4f781873802000149000576616c7565787200106a6176612e6c616e672e4e756d62657286ac951d0b94e08b02000078700142f017",
        "commit_lsn": "0006593e:000287c8:0003",
        "change_lsn": "0006593e:000287c8:0002",
        "incremental_snapshot_collections": "prod.dbo.InvoiceLines",
        "incremental_snapshot_primary_key": "6e2166716a6c4b5310027575000decac616e672e4f626a6563743b90ce589f1073296c020000787000000001737200116a6176612e6c616e672e496e746567657212e2a0a4f781873802000149000576616c7565787200106a6176612e6c616e672e4e756d62657286ac951d0b94e08b02000078700142f017"
      },
      "headers": [],
      "exceededFields": null
    }
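If you want to script the push rather than use a console producer or a UI, here is a minimal sketch with kafka-python. A few assumptions up front: the key/value/headers wrapper shown above looks like a topic-browser export, so the actual Kafka record key is the array and the record value is the object; the topic name, brokers and partition below are placeholders; and the safest route is to copy the key and value verbatim from the latest offset record and only overwrite incremental_snapshot_primary_key:

```python
import json
from kafka import KafkaProducer

# Key and value copied from the latest offset record for this connector; the only
# change is incremental_snapshot_primary_key = incremental_snapshot_maximum_key.
key = ["dbz_prod", {"server": "mssql"}]

value = {
    "transaction_id": None,
    "event_serial_no": 2,
    "incremental_snapshot_maximum_key": "<hex blob from the original offset record>",
    "commit_lsn": "0006593e:000287c8:0003",
    "change_lsn": "0006593e:000287c8:0002",
    "incremental_snapshot_collections": "prod.dbo.InvoiceLines",
    # Setting the current key equal to the maximum key marks the snapshot as done.
    "incremental_snapshot_primary_key": "<same hex blob as incremental_snapshot_maximum_key>",
}

producer = KafkaProducer(
    bootstrap_servers="kafka:9092",  # assumption -- your Connect cluster's brokers
    # Compact separators to mimic the JSON that Connect itself writes to this topic.
    key_serializer=lambda v: json.dumps(v, separators=(",", ":")).encode("utf-8"),
    value_serializer=lambda v: json.dumps(v, separators=(",", ":")).encode("utf-8"),
)

# Send to the same partition the original offset record lives in, so the
# connector and log compaction both treat this as the newest offset.
producer.send("connect-offsets", key=key, value=value, partition=0)
producer.flush()
```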
Just in case, I stopped Kafka Connect before pushing the custom "finish" message to the topic. I don't think it was strictly necessary, though.