This is a brief write-up on how to send rows inserted into a CockroachDB table as messages to an SNS topic.
CockroachDB Change Data Capture (CDC) supports several sinks (see the docs for the full list); however, AWS SNS is not yet one of them.
We can work around this limitation by combining the webhook sink, which publishes to an HTTP endpoint, with API Gateway, which provides such an HTTP endpoint and integrates natively with other AWS services, including SNS.
At a high level, the pipeline works as follows:
- A Changefeed is created on the CockroachDB table.
- The Changefeed watches for INSERTs (any write operation, to be precise) on the table and emits messages to the API Gateway endpoint.
- API Gateway receives the message and publishes the payload to the SNS topic via an internal AWS integration.
- Subscribers of the SNS topic get a notification.
We will build this pipeline backwards, starting with the SNS topic and ending with the Changefeed.
Create SNS topic
This is the easiest step of all, since the AWS Console makes it straightforward.
- Open the AWS Console and head to Simple Notification Service.
- You'll be welcomed to a page where you are prompted to create a new topic.
- Create a topic named "events". Keep all the default settings; for this test we won't bother with certs and keys, we want to make sure the pipeline works first.
- Create a subscriber. In my example, I chose "Email" as the "Protocol" and entered my email address as the "Endpoint".
- Finally, and very importantly, test it out directly from the Console by using the Publish message button.
- Check your inbox: you should have received a request to verify your email address, and the message you published.
Jot down your topic ARN; in my case it is arn:aws:sns:ca-central-1:3xxx8:events
Good, you have created the topic and validated it's working.
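If you would rather script this step than click through the Console, the sketch below shows roughly the same actions in Python. It is not part of the original walkthrough: it assumes the boto3 SDK, and the helper names `create_topic_with_email` and `publish_test_message` are made up for illustration. The `sns` argument is expected to be a boto3 SNS client.

```python
# Hypothetical helpers; `sns` is assumed to be a boto3 SNS client, e.g.
#   import boto3
#   sns = boto3.client("sns", region_name="ca-central-1")

def create_topic_with_email(sns, topic_name, email):
    """Create the topic and request an email subscription; return the topic ARN."""
    topic_arn = sns.create_topic(Name=topic_name)["TopicArn"]
    # The subscription stays pending until the recipient confirms it by email.
    sns.subscribe(TopicArn=topic_arn, Protocol="email", Endpoint=email)
    return topic_arn


def publish_test_message(sns, topic_arn, text):
    """Publish a plain-text message, like the Console's Publish message button."""
    return sns.publish(TopicArn=topic_arn, Message=text)["MessageId"]
```

Call `publish_test_message` only after the verification email has been confirmed, otherwise the test message will not reach the inbox.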
Create AWS Role
API Gateway needs an AWS role with permission to publish to the SNS topic we just created.
- Go to AWS IAM.
- Go to Roles and "Create role". Select AWS Service as 'Trusted Entity Type' and "API Gateway" as the 'Use case'.
- Continue to Step 3. Name it "events_role" and create it.
- Your role is now created, but it doesn't have the permission it needs. Find the role in the list of roles and select it.
- In the Permissions section, click Add permissions and choose "AmazonSNSFullAccess" for simplicity, then save.
Jot down the ARN of the role, in my case it's arn:aws:iam::3xxx8:role/events_role
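"AmazonSNSFullAccess" is fine for a functional test, but it grants far more than the pipeline needs. As a hedged alternative that is not part of the original setup, the sketch below builds two standard IAM policy documents: a trust policy that lets API Gateway assume the role, and a permission policy that allows only `sns:Publish` on the one topic. The function names are illustrative, and the account ID in the ARN is the redacted one from this post.

```python
import json

# Topic ARN as jotted down earlier (account ID redacted in this post).
TOPIC_ARN = "arn:aws:sns:ca-central-1:3xxx8:events"


def apigateway_trust_policy():
    """Trust policy letting the API Gateway service assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "apigateway.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }


def publish_only_policy(topic_arn):
    """Permission policy restricted to publishing on a single topic."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": "sns:Publish",
            "Resource": topic_arn,
        }],
    }


print(json.dumps(publish_only_policy(TOPIC_ARN), indent=2))
```

The printed JSON can be pasted into an inline policy on the role in place of the managed policy.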
Create API Gateway endpoint
- From the AWS Console, head to API Gateway.
- Click the Create API button and select a 'REST API'.
- Name it "events", and set 'API Endpoint type' to "Regional".
- With the "events" API selected, click the Create Resource button and choose / as Resource Path and events as Resource Name. No need to enable CORS or the Proxy Resource.
- Once in the Resource Detail page for the /events/ path, choose Create Method.
Choose:
- Method type = POST
- Integration type = AWS service
- AWS Region = ca-central-1, which is the region I've used to deploy the SNS topic
- AWS Service = Simple Notification Service
- AWS Subdomain = keep it blank
- HTTP method = POST
- Action Type = Use path override
- Path override = arn:aws:apigateway:ca-central-1:sns:path// <-- substitute with your region
- Execution role = the role ARN you jotted down previously
Leave the rest at the defaults, then click "Create method".
The /events resource should now list a POST method.
Next, we need to create the piece that takes the incoming HTTP payload and publishes it to the SNS topic.
- Click the "Integration Request" tab, then Edit.
- Scroll to the "URL request headers parameters" section and add:
  Name = Content-Type
  Mapped from = 'application/x-www-form-urlencoded'
- Go to "Mapping templates" and set:
  Content type = application/json
  Template body =
  #set($topic="arn:aws:sns:ca-central-1:3xxx8:events")
  #set($msg=$input.path('$.payload'))
  Action=Publish&TopicArn=$util.urlEncode($topic)&Message=$util.urlEncode($msg)
  Substitute my value with your SNS topic ARN.
- Save your changes.
Finally, click the Test tab, and in the Request Body enter
{"payload":"test_event_api"}
You should get a confirmation that the message was published, and soon you should receive the same SNS email in your inbox.
Wow, it's working, getting there!
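To make the mapping template less opaque, here is a small Python sketch of the form-encoded body it renders for the test payload. It is only an approximation: `quote_plus` stands in for `$util.urlEncode`, and the two may differ on a few special characters. The function name `render_publish_body` is made up for illustration.

```python
from urllib.parse import quote_plus


def render_publish_body(topic_arn, message):
    """Approximate the request body the mapping template sends to SNS."""
    return (
        "Action=Publish"
        "&TopicArn=" + quote_plus(topic_arn)
        + "&Message=" + quote_plus(message)
    )


body = render_publish_body("arn:aws:sns:ca-central-1:3xxx8:events", "test_event_api")
print(body)
# Action=Publish&TopicArn=arn%3Aaws%3Asns%3Aca-central-1%3A3xxx8%3Aevents&Message=test_event_api
```

This is why the Content-Type header is set to 'application/x-www-form-urlencoded': the integration forwards a form-encoded SNS Publish call, not the original JSON.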
Now we need to publish this API, and create a usage plan.
Deploy API
- On the main page for this API, click "Deploy API"
- At the prompt, choose New stage and call it DEV.
- Click 'Deploy'.
You'll be redirected to the Stages section.
Jot down the Invoke URL, in my case, it's https://72eebc6w0k.execute-api.ca-central-1.amazonaws.com/DEV
Create usage plan
- On the main page for this API, click 'Usage Plans'.
- Create a new usage plan. I added these sample values for this functional test:
  Name = events_plan
  Rate = 10
  Burst = 10
  Requests = 100 per day
- Click 'Create usage plan'.
- Re-select the newly created events_plan usage plan, and add a Stage.
- At the prompt, choose:
  API = events
  Stage = DEV
- Save your changes.
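For intuition about the Rate and Burst values: API Gateway documents its throttling as a token bucket, where Rate is the refill speed in requests per second and Burst is the bucket size. The toy model below is only a sketch of that idea with the values from this plan. It is not AWS code, it ignores the daily quota, and the `TokenBucket` class is purely illustrative.

```python
class TokenBucket:
    """Toy token bucket: `rate` tokens are added per second, up to `burst`."""

    def __init__(self, rate, burst):
        self.rate = rate
        self.capacity = burst
        self.tokens = float(burst)  # start with a full bucket
        self.last = 0.0             # timestamp of the previous request

    def allow(self, now):
        """Return True if a request arriving at time `now` (seconds) is admitted."""
        elapsed = now - self.last
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


bucket = TokenBucket(rate=10, burst=10)
# Twelve requests arriving at the same instant: the bucket holds only ten tokens.
results = [bucket.allow(now=0.0) for _ in range(12)]
print(results.count(True))  # prints 10
```

Half a second later the bucket has refilled by five tokens, so new requests are admitted again.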
Test locally
Now that the API is live and has a usage plan attached, we are ready to test whether it can be invoked by an external client.
From your laptop, invoke it using curl:
$ curl -X POST https://72eebc6w0k.execute-api.ca-central-1.amazonaws.com/DEV/events --json '{"payload":"hello_from_curl"}' | jq
{
"PublishResponse": {
"PublishResult": {
"MessageId": "7a3e9555-6031-5d2b-b72a-cad5c8ddf3b4",
"SequenceNumber": null
},
"ResponseMetadata": {
"RequestId": "a63ca446-743e-520b-b689-fbc6dcd2bc94"
}
}
}
And sure enough I got my email with the same text.
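The same call can be made from Python using only the standard library. This is a sketch rather than part of the original test: the function name `build_event_request` is illustrative, and the URL is the Invoke URL from this post, so substitute your own. The request is only built here; the lines that send it are left commented out because they need network access and a deployed API.

```python
import json
import urllib.request


def build_event_request(endpoint, payload):
    """Build a POST request with the same JSON shape used in the curl test."""
    data = json.dumps({"payload": payload}).encode("utf-8")
    return urllib.request.Request(
        endpoint,
        data=data,
        method="POST",
        headers={"Content-Type": "application/json", "Accept": "application/json"},
    )


req = build_event_request(
    "https://72eebc6w0k.execute-api.ca-central-1.amazonaws.com/DEV/events",
    "hello_from_python",
)
print(req.get_method(), req.full_url)

# To actually send it:
# with urllib.request.urlopen(req, timeout=10) as resp:
#     print(json.load(resp))
```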
We are now ready to have CockroachDB send data to our endpoint.
Create test table and configure the changefeed
Create a simple events table such as:
CREATE TABLE events (
id UUID NOT NULL DEFAULT gen_random_uuid(),
payload JSONB NULL,
ts TIMESTAMPTZ NULL DEFAULT now():::TIMESTAMPTZ,
CONSTRAINT pk PRIMARY KEY (id ASC)
);
Create the changefeed on the newly created events table.
You will need to update the endpoint hostname with yours.
-- enable rangefeed
SET CLUSTER SETTING kv.rangefeed.enabled = true;
CREATE CHANGEFEED FOR TABLE events INTO 'webhook-https://72eebc6w0k.execute-api.ca-central-1.amazonaws.com/DEV/events?insecure_tls_skip_verify=true'
WITH updated, resolved='20s', diff, schema_change_policy=backfill, initial_scan=no,
webhook_sink_config = '{ "Flush": {"Messages": 100, "Frequency": "5s"}, "Retry": { "Max": 4, "Backoff": "10s"} }'
;
-- job_id
-- -----------------------
-- 1010627173653250049
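The sink URI in the statement above is simply the Invoke URL plus the resource path, prefixed with `webhook-`. A small helper like the one below can assemble it from the values you jotted down. The name `webhook_sink_uri` is made up for illustration, and the `insecure_tls_skip_verify` flag is included only to mirror the statement above.

```python
from urllib.parse import urlencode


def webhook_sink_uri(invoke_url, resource, skip_verify=False):
    """Build a CockroachDB webhook sink URI from an API Gateway Invoke URL."""
    uri = "webhook-" + invoke_url.rstrip("/") + "/" + resource.strip("/")
    if skip_verify:
        uri += "?" + urlencode({"insecure_tls_skip_verify": "true"})
    return uri


uri = webhook_sink_uri(
    "https://72eebc6w0k.execute-api.ca-central-1.amazonaws.com/DEV",
    "events",
    skip_verify=True,
)
print(uri)
```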
Check that the changefeed is running and posting a resolved timestamp:
SELECT running_status FROM [SHOW CHANGEFEED JOBS] WHERE job_id = 1010627173653250049;
-- running_status
-- --------------------------------------------
-- running: resolved=1728489668.321914000,0
Good, it's running so we're finally ready to test by inserting a row into the table
insert into events (payload) values ('{"my_key" : "my_value"}');
Check your inbox; the change event for the new row should arrive as an SNS email.
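The email content comes from the JSON envelope that the webhook sink posts to API Gateway. As far as I recall from the CockroachDB webhook sink documentation, row changes arrive as an object with a `payload` array and a `length` count, which is why the mapping template reads `$.payload`, while resolved timestamps arrive as a separate object with a `resolved` field. The sketch below separates the two under that assumption. The sample body is hand-written for illustration, not captured output, and `split_webhook_body` is a hypothetical helper.

```python
import json


def split_webhook_body(body):
    """Return (row_events, resolved_timestamp) for one webhook POST body."""
    doc = json.loads(body)
    rows = doc.get("payload", [])   # list of row change events, if any
    resolved = doc.get("resolved")  # present only on resolved messages
    return rows, resolved


# Hand-written sample; field values are placeholders, not real output.
sample = json.dumps({
    "payload": [{
        "after": {"id": "hypothetical-uuid", "payload": {"my_key": "my_value"}},
        "key": ["hypothetical-uuid"],
        "topic": "events",
    }],
    "length": 1,
})

rows, resolved = split_webhook_body(sample)
print(rows[0]["after"]["payload"])
```

If that assumption holds, resolved messages carry no `payload` field, so the mapping template would have nothing to publish for them.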