Skip to content

Commit

Permalink
Merge pull request #30 from decodableco/postgres-to-snowflake-with-cdc
Browse files Browse the repository at this point in the history
postgres to snowflake with cdc
  • Loading branch information
rmoff authored Nov 12, 2024
2 parents 4345a7c + 2747548 commit fdcc438
Show file tree
Hide file tree
Showing 10 changed files with 787 additions and 4 deletions.
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ _Learn more [here](https://decodable.co), and [sign up for a free trial](https:/

### Data Pipelines

| Example | Description |
|-----------------------------------------------------|--------------------------------------------------------|
| [Opinionated Data Pipelines](opinionated-pipelines) | Building data pipelines with schema on write streams. |
| [Postman](postman) | Building data pipelines with Postman. |
| Example | Description |
|-----------------------------------------------------|----------------------------------------------------------|
| [Opinionated Data Pipelines](opinionated-pipelines) | Building data pipelines with schema on write streams. |
| [Postman](postman) | Building data pipelines with Postman. |
| [Postgres to Snowflake](postgres-to-snowflake-with-cdc) | Getting data from Postgres to Snowflake using Decodable. |

### PyFlink

Expand Down
170 changes: 170 additions & 0 deletions postgres-to-snowflake-with-cdc/README.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
= Postgres CDC to Snowflake

This is supporting code for a blog about streaming data from Postgres to Snowflake using Decodable.

It includes a Docker Compose for running Postgres locally, accessible from the internet using ngrok

To use this you need to https://dashboard.ngrok.com/signup[create an ngrok account] and add a file called `.env` in this folder with the following entry:

[source,bash]
----
NGROK_AUTH_TOKEN=<your_token>
----

Bring up the Postgres and ngrok stack with

[source,bash]
----
docker compose up
----

Once up, you can find out your Postgres server host/post that is available on the internet:

[source,bash]
----
curl -s localhost:4040/api/tunnels | jq -r '.tunnels[0].public_url' | sed 's/tcp:\/\///g'
----

== Source connection: Postgres CDC

=== Configure Postgres tables for replication

[source,bash]
----
docker exec -it postgres psql -h localhost -U postgres -d postgres -f /data/replication-config.sql
----

[source,sql]
----
ALTER TABLE customers REPLICA IDENTITY FULL;
ALTER TABLE pets REPLICA IDENTITY FULL;
ALTER TABLE appointments REPLICA IDENTITY FULL;
----

Check their replica status; each should show `f`:

[source,sql]
----
SELECT oid::regclass, relreplident FROM pg_class
WHERE oid in ( 'customers'::regclass, 'pets'::regclass, 'appointments'::regclass);
----

=== Store the password

Ref:

* https://docs.decodable.co/declarative/apply.html
* https://docs.decodable.co/administer/manage-secrets.html

[source,bash]
----
decodable apply decodable/pg-secret.yaml
----

[source,yaml]
----
---
kind: secret
name: omd-pg
id: ee94bd72
result: updated
• Wrote plaintext values for secret IDs: [ee94bd72]
----

=== Generate resource definitions

Ref: https://docs.decodable.co/declarative/scan.html

[source,bash]
----
decodable connection scan \
--name oh-my-dawg-pg \
--connector postgres-cdc \
--type source \
--prop hostname=$(curl -s localhost:4040/api/tunnels | jq -r '.tunnels[0].public_url' | sed 's/tcp:\/\///g' | cut -d':' -f1) \
--prop port=$(curl -s localhost:4040/api/tunnels | jq -r '.tunnels[0].public_url' | sed 's/tcp:\/\///g' | cut -d':' -f2) \
--prop database-name=postgres \
--prop username=postgres \
--prop password=$(decodable query --name omd-pg --keep-ids | yq '.metadata.id') \
--include-pattern schema-name=public \
--output-resource-name-template stream-name="omd-{table-name}" \
> omd-pg.yaml
----

=== Apply resource definitions

[source,bash]
----
decodable apply omd-pg.yaml
----

[source,yaml]
----
---
kind: connection
name: oh-my-dawg-pg
id: 6d02ba15
result: created
---
kind: stream
name: omd-appointments
id: 975bba8d
result: created
---
kind: stream
name: omd-customers
id: 7885b32e
result: created
---
kind: stream
name: omd-pets
id: 3cc8e060
result: created
----

=== Activate the Postgres connection

[source,bash]
----
decodable query --name oh-my-dawg-pg -X activate --stabilize
----

== Sink connection: Snowflake

=== Provision Snowflake resources

Ref:

* https://docs.decodable.co/connect/sink/snowflake.html
* https://docs.snowflake.com/en/user-guide/key-pair-auth#verify-the-user-s-public-key-fingerprint

=== Generate & create the Snowflake sink

_You can pipe from one command to another to streamline the process._

[source,bash]
----
decodable connection scan \
--name oh-my-dawg-snowflake \
--connector snowflake \
--type sink \
--prop snowflake.database=omd \
--prop snowflake.schema=omd \
--prop snowflake.user=decodable \
--prop snowflake.private-key=$(decodable query --name omd-snowflake --keep-ids | yq '.metadata.id') \
--prop snowflake.role=load_data \
--prop snowflake.account-name=<org>-<account> \
--prop snowflake.warehouse=stg \
--prop snowflake.merge-interval="1 minute" \
--include-pattern stream-name='^omd-' \
| decodable apply -
----

NOTE: If you encounter errors then remove the pipe and `decodable apply` and inspect the YAML generated by `decodable connection scan` first.

=== Activate the Snowflake connection

[source,bash]
----
decodable query --name oh-my-dawg-snowflake -X activate --stabilize
----
7 changes: 7 additions & 0 deletions postgres-to-snowflake-with-cdc/decodable/pg-secret.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
kind: secret
metadata:
name: omd-pg
spec_version: v1
spec:
value_literal: Welcome123
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
---
kind: connection
metadata:
name: oh-my-dawg-pg
description: ""
spec_version: v2
spec:
execution:
active: true
connector: postgres-cdc
properties:
database-name: postgres
hostname: <hostname>
port: <port>
username: postgres
password: omd-pg
stream_mappings:
- stream_name: omd-appointments
external_resource_specifier:
database-name: postgres
schema-name: public
table-name: appointments
- stream_name: omd-customers
external_resource_specifier:
database-name: postgres
schema-name: public
table-name: customers
- stream_name: omd-pets
external_resource_specifier:
database-name: postgres
schema-name: public
table-name: pets
type: source
---
kind: stream
metadata:
name: omd-appointments
spec_version: v1
spec:
schema_v2:
constraints:
primary_key:
- appointment_id
fields:
- kind: physical
name: appointment_id
type: STRING NOT NULL
- kind: physical
name: duration_minutes
type: BIGINT
- kind: physical
name: appointment_time
type: STRING
- kind: physical
name: status
type: STRING
- kind: physical
name: pet_id
type: STRING
- kind: physical
name: customer_id
type: STRING
- kind: physical
name: service_type
type: STRING
- kind: physical
name: price
type: DOUBLE
- kind: physical
name: last_updated
type: STRING
---
kind: stream
metadata:
name: omd-customers
spec_version: v1
spec:
schema_v2:
constraints:
primary_key:
- customer_id
fields:
- kind: physical
name: customer_id
type: STRING NOT NULL
- kind: physical
name: first_name
type: STRING
- kind: physical
name: last_name
type: STRING
- kind: physical
name: phone
type: STRING
- kind: physical
name: email
type: STRING
- kind: physical
name: registered_date
type: STRING
---
kind: stream
metadata:
name: omd-pets
spec_version: v1
spec:
schema_v2:
constraints:
primary_key:
- pet_id
fields:
- kind: physical
name: pet_id
type: STRING NOT NULL
- kind: physical
name: customer_id
type: STRING
- kind: physical
name: pet_name
type: STRING
- kind: physical
name: pet_type
type: STRING
Loading

0 comments on commit fdcc438

Please sign in to comment.