From a0b03da72a98d635f0fadccf21bb3c279f57eae8 Mon Sep 17 00:00:00 2001 From: rmoff Date: Thu, 31 Oct 2024 13:27:40 +0000 Subject: [PATCH 1/5] WIP --- postgres-to-snowflake-with-cdc/README.adoc | 23 +++ .../docker-compose.yml | 38 ++++ .../shadowtraffic/config.json | 163 ++++++++++++++++++ 3 files changed, 224 insertions(+) create mode 100644 postgres-to-snowflake-with-cdc/README.adoc create mode 100644 postgres-to-snowflake-with-cdc/docker-compose.yml create mode 100644 postgres-to-snowflake-with-cdc/shadowtraffic/config.json diff --git a/postgres-to-snowflake-with-cdc/README.adoc b/postgres-to-snowflake-with-cdc/README.adoc new file mode 100644 index 0000000..af001a9 --- /dev/null +++ b/postgres-to-snowflake-with-cdc/README.adoc @@ -0,0 +1,23 @@ += Docker Compose for running Postgres locally, accessible from the internet using ngrok + +To use this you need to https://dashboard.ngrok.com/signup[create an ngrok account] and add a file called `.env` in this folder with the following entry: + +[source,bash] +---- +NGROK_AUTH_TOKEN= +---- + +Bring up the Postgres and ngrok stack with + +[source,bash] +---- +docker compose up +---- + +Once up, find out your Postgres server host/post that is available on the internet: + +[source,bash] +---- +curl -s localhost:4040/api/tunnels | jq -r '.tunnels[0].public_url' | sed 's/tcp:\/\///g' +---- + diff --git a/postgres-to-snowflake-with-cdc/docker-compose.yml b/postgres-to-snowflake-with-cdc/docker-compose.yml new file mode 100644 index 0000000..4bdf162 --- /dev/null +++ b/postgres-to-snowflake-with-cdc/docker-compose.yml @@ -0,0 +1,38 @@ +--- +services: + ngrok: + image: ngrok/ngrok:latest + container_name: ngrok + # Sign up for an ngrok account at https://dashboard.ngrok.com/signup + # Get your auth-token from https://dashboard.ngrok.com/get-started/your-authtoken + # and either put it directly in the file here, or write it to a .env file in + # the same folder as this Docker Compose file in the form + # 
NGROK_AUTH_TOKEN=your_token_value + command: tcp postgres:5432 --log stdout --authtoken $NGROK_AUTH_TOKEN + ports: + - 4040:4040 # Web dashboard for ngrok + + postgres: + image: postgres:latest + ports: + - 5432:5432 + container_name: postgres + environment: + POSTGRES_PASSWORD: Welcome123 + + shadowtraffic: + # watch 'docker exec shadowtraffic curl -s localhost:9400/metrics |grep events_sent' + image: shadowtraffic/shadowtraffic:latest + container_name: shadowtraffic + env_file: + - shadowtraffic/license.env + volumes: + - ./shadowtraffic:/data + command: --config /data/config.json --with-studio --sample 1000 --watch + ports: + - 8080:8080 + - 9400:9400 + +networks: + default: + name: zaphod diff --git a/postgres-to-snowflake-with-cdc/shadowtraffic/config.json b/postgres-to-snowflake-with-cdc/shadowtraffic/config.json new file mode 100644 index 0000000..f5b0954 --- /dev/null +++ b/postgres-to-snowflake-with-cdc/shadowtraffic/config.json @@ -0,0 +1,163 @@ +{ + "generators": [ + { + "table": "customers", + "row": { + "customer_id": { "_gen": "uuid" }, + "first_name": { "_gen": "string", "expr": "#{Name.firstName}" }, + "last_name": { "_gen": "string", "expr": "#{Name.lastName}" }, + "phone": { "_gen": "string", "expr": "#{PhoneNumber.cellPhone}" }, + "email": { "_gen": "string", "expr": "#{Internet.emailAddress}" }, + "registered_date": { + "_gen": "formatDateTime", + "ms": { + "_gen": "now" + } + } + }, + "localConfigs": { "throttleMs": 5000 } + }, + { + "table": "pets", + "fork": { + "key": { + "_gen": "lookup", + "table": "customers", + "path": ["customer_id"] + }, + "stagger": { "ms": 250 } + }, + "row": { + "pet_id": { "_gen": "uuid" }, + "customer_id": { "_gen": "var", "var": "forkKey" }, + "pet_name": { "_gen": "string", "expr": "#{Artist.name}" }, + "pet_type": { "_gen": "oneOf", "choices": ["Dog", "Cat", "Bird", "Reptile"] } + }, + "localConfigs": { + "repeat": { + "rate": 1, + "times": { + "_gen": "weightedOneOf", + "choices": [ + { "weight": 70, "value": 
1 }, + { "weight": 20, "value": 2 }, + { "weight": 10, "value": 3 } + ] + } + } + } + }, + { + "table": "appointments", + "fork": { + "key": { + "_gen": "lookup", + "table": "pets", + "path": ["pet_id"] + }, + "stagger": { "ms": 500 } + }, + "stateMachine": { + "_gen": "stateMachine", + "initial": "Scheduled", + "transitions": { + "Scheduled": { + "_gen": "weightedOneOf", + "choices": [ + { "weight": 90, "value": "Completed" }, + { "weight": 10, "value": "Cancelled" } + ] + } + }, + "states": { + "Scheduled": { + "row": { + "status": "Scheduled", + "last_updated": { + "_gen": "formatDateTime", + "ms": { + "_gen": "now" + } + } + } + }, + "Completed": { + "row": { + "status": "Completed", + "last_updated": { + "_gen": "formatDateTime", + "ms": { + "_gen": "now" + } + } + } + }, + "Cancelled": { + "row": { + "status": "Cancelled", + "last_updated": { + "_gen": "formatDateTime", + "ms": { + "_gen": "now" + } + } + } + } + } + }, + "row": { + "appointment_id": { "_gen": "uuid" }, + "customer_id": { "_gen": "lookup", "table": "pets", "path": ["pet_id"] }, + "pet_id": { "_gen": "lookup", "table": "pets", "path": ["customer_id"] }, + "service_type": { "_gen": "oneOf", "choices": ["Bathing", "Haircut", "Nail Trim", "Full Grooming"] }, + "appointment_time": { + "_gen": "formatDateTime", + "ms": { + "_gen": "intervals", + "intervals": [ + ["0 9 * * 1-5", { "_gen": "add", "args": [{ "_gen": "now" }, { "_gen": "uniformDistribution", "bounds": [1, 14] }] }] + ], + "defaultValue": { "_gen": "now" } + } + }, + "duration_minutes": { + "_gen": "oneOf", + "choices": [30, 60, 90] + }, + "price": { "_gen": "normalDistribution", "mean": 50, "sd": 10, "decimals": 2 }, + "status": "Scheduled", + "last_updated": { + "_gen": "formatDateTime", + "ms": { + "_gen": "now" + } + } + }, + "localConfigs": { + "repeat": { + "rate": 0.5, + "times": { + "_gen": "uniformDistribution", + "bounds": [ + 1, + 4 + ] + } + }, + "throttleMs": 500 + } + } + ], + "connections": { + "pg": { + "kind": 
"postgres", + "connectionConfigs": { + "host": "postgres", + "port": 5432, + "username": "postgres", + "password": "Welcome123", + "db": "postgres" + } + } + } +} From 69e9a8aa3dede29cc8952af8f80d2959dcee8b2e Mon Sep 17 00:00:00 2001 From: rmoff Date: Mon, 4 Nov 2024 12:32:05 +0000 Subject: [PATCH 2/5] working ingestion --- postgres-to-snowflake-with-cdc/README.adoc | 140 ++++++ .../decodable/omd-pg.yaml | 135 ++++++ .../decodable/pg-secret.yaml | 7 + .../docker-compose.yml | 18 +- .../postgres/add-pk.sql | 14 + .../postgres/postgresql.conf.sample | 16 + .../postgres/replication-config.sql | 3 + .../shadowtraffic/config.json | 410 +++++++++++------- 8 files changed, 590 insertions(+), 153 deletions(-) create mode 100644 postgres-to-snowflake-with-cdc/decodable/omd-pg.yaml create mode 100644 postgres-to-snowflake-with-cdc/decodable/pg-secret.yaml create mode 100644 postgres-to-snowflake-with-cdc/postgres/add-pk.sql create mode 100644 postgres-to-snowflake-with-cdc/postgres/postgresql.conf.sample create mode 100644 postgres-to-snowflake-with-cdc/postgres/replication-config.sql diff --git a/postgres-to-snowflake-with-cdc/README.adoc b/postgres-to-snowflake-with-cdc/README.adoc index af001a9..c81a30f 100644 --- a/postgres-to-snowflake-with-cdc/README.adoc +++ b/postgres-to-snowflake-with-cdc/README.adoc @@ -21,3 +21,143 @@ Once up, find out your Postgres server host/post that is available on the intern curl -s localhost:4040/api/tunnels | jq -r '.tunnels[0].public_url' | sed 's/tcp:\/\///g' ---- +== Steps + +=== Configure Postgres tables for replication + +[source,bash] +---- +docker exec -it postgres psql -h localhost -U postgres -d postgres -f /data/replication-config.sql +---- + +[source,sql] +---- +ALTER TABLE customers REPLICA IDENTITY FULL; +ALTER TABLE pets REPLICA IDENTITY FULL; +ALTER TABLE appointments REPLICA IDENTITY FULL; +---- + +Check their replica status; each should show `f`: + +[source,sql] +---- +SELECT relreplident FROM pg_class + WHERE oid in ( 
'customers'::regclass, 'pets'::regclass, 'appointments'::regclass); +---- + +=== Add PKs to the Postgres tables + +[source,bash] +---- +docker exec -it postgres psql -h localhost -U postgres -d postgres -f /data/add-pk.sql +---- + + +[source,sql] +---- +alter table customers +alter column customer_id set not null; +alter table customers +add constraint pk_customers primary key (customer_id); + +alter table pets +alter column pet_id set not null; +alter table pets +add constraint pk_pets primary key (pet_id); + +alter table appointments +alter column appointment_id set not null; +alter table appointments +add constraint pk_appointments primary key (appointment_id ); +---- + + +=== Store the password + +[source,bash] +---- +decodable apply decodable/pg-secret.yaml +---- + +[source,yaml] +---- +--- +kind: secret +name: omd-pg +id: ee94bd72 +result: updated +• Wrote plaintext values for secret IDs: [ee94bd72] +---- + +=== Generate resource definitions + +[source,bash] +---- +decodable connection scan \ + --name oh-my-dawg-pg \ + --connector postgres-cdc \ + --type source \ + --prop hostname=$(curl -s localhost:4040/api/tunnels | jq -r '.tunnels[0].public_url' | sed 's/tcp:\/\///g' | cut -d':' -f1) \ + --prop port=$(curl -s localhost:4040/api/tunnels | jq -r '.tunnels[0].public_url' | sed 's/tcp:\/\///g' | cut -d':' -f2) \ + --prop database-name=postgres \ + --prop username=postgres \ + --prop password=$(decodable query --name omd-pg --keep-ids | yq '.metadata.id') \ + --include-pattern schema-name=public \ + --output-resource-name-template stream-name="omd-{table-name}" \ + > omd-pg.yaml +---- + +=== Edit resource definitions + + +- set to active + +[source,yaml] +---- +spec_version: v2 +spec: + execution: + active: true +---- + +- add tags + +[source,yaml] +---- +metadata: + tags: + project: "oh-my-dawg" + author: "rmoff" +---- + +=== Apply resource definitions + +[source,bash] +---- +decodable apply omd-pg.yaml +---- + +[source,yaml] +---- +--- +kind: connection 
+name: oh-my-dawg-pg +id: 6d02ba15 +result: created +--- +kind: stream +name: omd-appointments +id: 975bba8d +result: created +--- +kind: stream +name: omd-customers +id: 7885b32e +result: created +--- +kind: stream +name: omd-pets +id: 3cc8e060 +result: created +---- + diff --git a/postgres-to-snowflake-with-cdc/decodable/omd-pg.yaml b/postgres-to-snowflake-with-cdc/decodable/omd-pg.yaml new file mode 100644 index 0000000..f4c9829 --- /dev/null +++ b/postgres-to-snowflake-with-cdc/decodable/omd-pg.yaml @@ -0,0 +1,135 @@ +--- +kind: connection +metadata: + name: oh-my-dawg-pg + description: "" + tags: + project: "oh-my-dawg" + author: "rmoff" +spec_version: v2 +spec: + execution: + active: true + connector: postgres-cdc + properties: + database-name: postgres + hostname: 2.tcp.eu.ngrok.io + password: omd-pg + port: "19366" + username: postgres + stream_mappings: + - stream_name: omd-appointments + external_resource_specifier: + database-name: postgres + schema-name: public + table-name: appointments + - stream_name: omd-customers + external_resource_specifier: + database-name: postgres + schema-name: public + table-name: customers + - stream_name: omd-pets + external_resource_specifier: + database-name: postgres + schema-name: public + table-name: pets + type: source +--- +kind: stream +metadata: + name: omd-appointments + tags: + project: "oh-my-dawg" + author: "rmoff" +spec_version: v1 +spec: + schema_v2: + constraints: + primary_key: + - appointment_id + fields: + - kind: physical + name: appointment_id + type: STRING NOT NULL + - kind: physical + name: duration_minutes + type: BIGINT + - kind: physical + name: appointment_time + type: STRING + - kind: physical + name: status + type: STRING + - kind: physical + name: pet_id + type: STRING + - kind: physical + name: customer_id + type: STRING + - kind: physical + name: service_type + type: STRING + - kind: physical + name: price + type: DOUBLE + - kind: physical + name: last_updated + type: STRING +--- +kind: 
stream +metadata: + name: omd-customers + tags: + project: "oh-my-dawg" + author: "rmoff" +spec_version: v1 +spec: + schema_v2: + constraints: + primary_key: + - customer_id + fields: + - kind: physical + name: customer_id + type: STRING NOT NULL + - kind: physical + name: first_name + type: STRING + - kind: physical + name: last_name + type: STRING + - kind: physical + name: phone + type: STRING + - kind: physical + name: email + type: STRING + - kind: physical + name: registered_date + type: STRING +--- +kind: stream +metadata: + name: omd-pets + tags: + project: "oh-my-dawg" + author: "rmoff" +spec_version: v1 +spec: + schema_v2: + constraints: + primary_key: + - pet_id + fields: + - kind: physical + name: pet_id + type: STRING NOT NULL + - kind: physical + name: customer_id + type: STRING + - kind: physical + name: pet_name + type: STRING + - kind: physical + name: pet_type + type: STRING diff --git a/postgres-to-snowflake-with-cdc/decodable/pg-secret.yaml b/postgres-to-snowflake-with-cdc/decodable/pg-secret.yaml new file mode 100644 index 0000000..7e1c508 --- /dev/null +++ b/postgres-to-snowflake-with-cdc/decodable/pg-secret.yaml @@ -0,0 +1,7 @@ +--- +kind: secret +metadata: + name: omd-pg +spec_version: v1 +spec: + value_literal: Welcome123 \ No newline at end of file diff --git a/postgres-to-snowflake-with-cdc/docker-compose.yml b/postgres-to-snowflake-with-cdc/docker-compose.yml index 4bdf162..5cc4ea4 100644 --- a/postgres-to-snowflake-with-cdc/docker-compose.yml +++ b/postgres-to-snowflake-with-cdc/docker-compose.yml @@ -13,12 +13,17 @@ services: - 4040:4040 # Web dashboard for ngrok postgres: - image: postgres:latest - ports: - - 5432:5432 container_name: postgres + image: quay.io/debezium/example-postgres:2.3 + ports: + - "5432:5432" environment: - POSTGRES_PASSWORD: Welcome123 + - POSTGRES_DB=postgres + - POSTGRES_USER=postgres + - POSTGRES_PASSWORD=Welcome123 + volumes: + - ./postgres/postgresql.conf.sample:/usr/share/postgresql/postgresql.conf.sample 
+ - ${PWD}/postgres:/data shadowtraffic: # watch 'docker exec shadowtraffic curl -s localhost:9400/metrics |grep events_sent' @@ -28,9 +33,10 @@ services: - shadowtraffic/license.env volumes: - ./shadowtraffic:/data - command: --config /data/config.json --with-studio --sample 1000 --watch + command: --config /data/config.json +# --with-studio --sample 20 --watch ports: - - 8080:8080 +# - 8080:8080 - 9400:9400 networks: diff --git a/postgres-to-snowflake-with-cdc/postgres/add-pk.sql b/postgres-to-snowflake-with-cdc/postgres/add-pk.sql new file mode 100644 index 0000000..d1b8ff7 --- /dev/null +++ b/postgres-to-snowflake-with-cdc/postgres/add-pk.sql @@ -0,0 +1,14 @@ +alter table customers +alter column customer_id set not null; +alter table customers +add constraint pk_customers primary key (customer_id); + +alter table pets +alter column pet_id set not null; +alter table pets +add constraint pk_pets primary key (pet_id); + +alter table appointments +alter column appointment_id set not null; +alter table appointments +add constraint pk_appointments primary key (appointment_id ); \ No newline at end of file diff --git a/postgres-to-snowflake-with-cdc/postgres/postgresql.conf.sample b/postgres-to-snowflake-with-cdc/postgres/postgresql.conf.sample new file mode 100644 index 0000000..17db3f1 --- /dev/null +++ b/postgres-to-snowflake-with-cdc/postgres/postgresql.conf.sample @@ -0,0 +1,16 @@ +# LOGGING +# log_min_error_statement = fatal +# log_min_messages = DEBUG1 + +# CONNECTION +listen_addresses = '*' + +# MODULES +shared_preload_libraries = 'decoderbufs' + +# REPLICATION +wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart) +max_wal_senders = 10 # max number of walsender processes (change requires restart) +#wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables +#wal_sender_timeout = 60s # in milliseconds; 0 disables +max_replication_slots = 10 # max number of replication slots (change requires restart) diff --git 
a/postgres-to-snowflake-with-cdc/postgres/replication-config.sql b/postgres-to-snowflake-with-cdc/postgres/replication-config.sql new file mode 100644 index 0000000..368f17f --- /dev/null +++ b/postgres-to-snowflake-with-cdc/postgres/replication-config.sql @@ -0,0 +1,3 @@ +ALTER TABLE customers REPLICA IDENTITY FULL; +ALTER TABLE pets REPLICA IDENTITY FULL; +ALTER TABLE appointments REPLICA IDENTITY FULL; \ No newline at end of file diff --git a/postgres-to-snowflake-with-cdc/shadowtraffic/config.json b/postgres-to-snowflake-with-cdc/shadowtraffic/config.json index f5b0954..05c7fe0 100644 --- a/postgres-to-snowflake-with-cdc/shadowtraffic/config.json +++ b/postgres-to-snowflake-with-cdc/shadowtraffic/config.json @@ -1,163 +1,279 @@ { - "generators": [ - { - "table": "customers", - "row": { - "customer_id": { "_gen": "uuid" }, - "first_name": { "_gen": "string", "expr": "#{Name.firstName}" }, - "last_name": { "_gen": "string", "expr": "#{Name.lastName}" }, - "phone": { "_gen": "string", "expr": "#{PhoneNumber.cellPhone}" }, - "email": { "_gen": "string", "expr": "#{Internet.emailAddress}" }, - "registered_date": { - "_gen": "formatDateTime", - "ms": { - "_gen": "now" - } - } + "generators": [ + { + "table": "customers", + "row": { + "customer_id": { + "_gen": "uuid", + "pgHint": "TEXT" + }, + "first_name": { + "_gen": "string", + "expr": "#{Name.firstName}" + }, + "last_name": { + "_gen": "string", + "expr": "#{Name.lastName}" + }, + "phone": { + "_gen": "string", + "expr": "#{PhoneNumber.cellPhone}" + }, + "email": { + "_gen": "string", + "expr": "#{Internet.emailAddress}" + }, + "registered_date": { + "_gen": "formatDateTime", + "ms": { + "_gen": "now" + } + } + }, + "localConfigs": { + "throttleMs": 1000, + "maxEvents": 500 + } + }, + { + "table": "pets", + "fork": { + "key": { + "_gen": "lookup", + "table": "customers", + "path": [ + "row", + "customer_id" + ] + }, + "oneTimeKeys": true + }, + "row": { + "pet_id": { + "_gen": "uuid", + "pgHint": "TEXT" + }, + 
"customer_id": { + "_gen": "var", + "var": "forkKey" + }, + "pet_name": { + "_gen": "string", + "expr": "#{Artist.name}" + }, + "pet_type": { + "_gen": "oneOf", + "choices": [ + "Dog", + "Cat", + "Bird", + "Reptile" + ] + } + }, + "localConfigs": { + "maxEvents": { + "_gen": "weightedOneOf", + "choices": [ + { + "weight": 70, + "value": 1 }, - "localConfigs": { "throttleMs": 5000 } - }, - { - "table": "pets", - "fork": { - "key": { - "_gen": "lookup", - "table": "customers", - "path": ["customer_id"] - }, - "stagger": { "ms": 250 } + { + "weight": 20, + "value": 2 }, + { + "weight": 10, + "value": 3 + } + ] + } + } + }, + { + "table": "appointments", + "vars": { + "apt_key": { + "_gen": "uuid", + "cast": "string" + } + }, + "fork": { + "key": { + "_gen": "lookup", + "table": "pets", + "path": [ + "row" + ] + }, + "oneTimeKeys": true + }, + "stateMachine": { + "_gen": "stateMachine", + "initial": "Scheduled", + "transitions": { + "Scheduled": { + "_gen": "weightedOneOf", + "choices": [ + { + "weight": 90, + "value": "Completed" + }, + { + "weight": 10, + "value": "Cancelled" + } + ] + } + }, + "states": { + "Scheduled": { "row": { - "pet_id": { "_gen": "uuid" }, - "customer_id": { "_gen": "var", "var": "forkKey" }, - "pet_name": { "_gen": "string", "expr": "#{Artist.name}" }, - "pet_type": { "_gen": "oneOf", "choices": ["Dog", "Cat", "Bird", "Reptile"] } - }, - "localConfigs": { - "repeat": { - "rate": 1, - "times": { - "_gen": "weightedOneOf", - "choices": [ - { "weight": 70, "value": 1 }, - { "weight": 20, "value": 2 }, - { "weight": 10, "value": 3 } - ] - } + "status": "Scheduled", + "last_updated": { + "_gen": "formatDateTime", + "ms": { + "_gen": "now" } + } } - }, - { - "table": "appointments", - "fork": { - "key": { - "_gen": "lookup", - "table": "pets", - "path": ["pet_id"] - }, - "stagger": { "ms": 500 } + }, + "Completed": { + "op": "update", + "where": { + "appointment_id": { + "_gen": "var", + "var": "apt_key" + } }, - "stateMachine": { - "_gen": 
"stateMachine", - "initial": "Scheduled", - "transitions": { - "Scheduled": { - "_gen": "weightedOneOf", - "choices": [ - { "weight": 90, "value": "Completed" }, - { "weight": 10, "value": "Cancelled" } - ] - } - }, - "states": { - "Scheduled": { - "row": { - "status": "Scheduled", - "last_updated": { - "_gen": "formatDateTime", - "ms": { - "_gen": "now" - } - } - } - }, - "Completed": { - "row": { - "status": "Completed", - "last_updated": { - "_gen": "formatDateTime", - "ms": { - "_gen": "now" - } - } - } - }, - "Cancelled": { - "row": { - "status": "Cancelled", - "last_updated": { - "_gen": "formatDateTime", - "ms": { - "_gen": "now" - } - } - } - } + "row": { + "status": "Completed", + "last_updated": { + "_gen": "formatDateTime", + "ms": { + "_gen": "now" } + } + } + }, + "Cancelled": { + "op": "update", + "where": { + "appointment_id": { + "_gen": "var", + "var": "apt_key" + } }, "row": { - "appointment_id": { "_gen": "uuid" }, - "customer_id": { "_gen": "lookup", "table": "pets", "path": ["pet_id"] }, - "pet_id": { "_gen": "lookup", "table": "pets", "path": ["customer_id"] }, - "service_type": { "_gen": "oneOf", "choices": ["Bathing", "Haircut", "Nail Trim", "Full Grooming"] }, - "appointment_time": { - "_gen": "formatDateTime", - "ms": { - "_gen": "intervals", - "intervals": [ - ["0 9 * * 1-5", { "_gen": "add", "args": [{ "_gen": "now" }, { "_gen": "uniformDistribution", "bounds": [1, 14] }] }] - ], - "defaultValue": { "_gen": "now" } - } - }, - "duration_minutes": { - "_gen": "oneOf", - "choices": [30, 60, 90] - }, - "price": { "_gen": "normalDistribution", "mean": 50, "sd": 10, "decimals": 2 }, - "status": "Scheduled", - "last_updated": { - "_gen": "formatDateTime", - "ms": { - "_gen": "now" - } + "status": "Cancelled", + "last_updated": { + "_gen": "formatDateTime", + "ms": { + "_gen": "now" } - }, - "localConfigs": { - "repeat": { - "rate": 0.5, - "times": { - "_gen": "uniformDistribution", - "bounds": [ - 1, - 4 - ] - } - }, - "throttleMs": 500 + } } + 
} } - ], - "connections": { - "pg": { - "kind": "postgres", - "connectionConfigs": { - "host": "postgres", - "port": 5432, - "username": "postgres", - "password": "Welcome123", - "db": "postgres" + }, + "row": { + "appointment_id": { + "_gen": "var", + "pgHint": "TEXT", + "var": "apt_key" + }, + "customer_id": { + "_gen": "var", + "pgHint": "TEXT", + "var": "forkKey", + "path": [ + "customer_id" + ] + }, + "pet_id": { + "_gen": "var", + "pgHint": "TEXT", + "var": "forkKey", + "path": [ + "pet_id" + ] + }, + "service_type": { + "_gen": "oneOf", + "choices": [ + "Bathing", + "Haircut", + "Nail Trim", + "Full Grooming" + ] + }, + "appointment_time": { + "_gen": "formatDateTime", + "ms": { + "_gen": "intervals", + "intervals": [ + [ + "0 9 * * 1-5", + { + "_gen": "add", + "args": [ + { + "_gen": "now" + }, + { + "_gen": "uniformDistribution", + "bounds": [ + 1, + 14 + ] + } + ] + } + ] + ], + "defaultValue": { + "_gen": "now" } + } + }, + "duration_minutes": { + "_gen": "oneOf", + "choices": [ + 30, + 60, + 90 + ] + }, + "price": { + "_gen": "normalDistribution", + "mean": 50, + "sd": 10, + "decimals": 2 + }, + "status": "Scheduled", + "last_updated": { + "_gen": "formatDateTime", + "ms": { + "_gen": "now" + } } + }, + "localConfigs": { + "maxEvents": 2 + } + } + ], + "connections": { + "pg": { + "kind": "postgres", + "connectionConfigs": { + "host": "postgres", + "port": 5432, + "username": "postgres", + "password": "Welcome123", + "db": "postgres" + } } -} + } +} \ No newline at end of file From bb8d5bf1f0d575f576a6c79526b837543c434a77 Mon Sep 17 00:00:00 2001 From: rmoff Date: Mon, 11 Nov 2024 17:51:58 +0000 Subject: [PATCH 3/5] Completed pipeline --- README.md | 9 +- postgres-to-snowflake-with-cdc/README.adoc | 113 ++++++++++-------- .../omd-pg.yaml | 18 +-- .../omd-sf.yaml | 101 ++++++++++++++++ .../decodable/sf-secret.yaml | 34 ++++++ .../docker-compose.yml | 2 +- .../postgres/add-pk.sql | 14 --- .../postgres/replication-config.sql | 9 +- 
.../shadowtraffic/config.json | 29 ++--- 9 files changed, 225 insertions(+), 104 deletions(-) rename postgres-to-snowflake-with-cdc/decodable/{ => sample_connection_resource_definitions}/omd-pg.yaml (91%) create mode 100644 postgres-to-snowflake-with-cdc/decodable/sample_connection_resource_definitions/omd-sf.yaml create mode 100644 postgres-to-snowflake-with-cdc/decodable/sf-secret.yaml delete mode 100644 postgres-to-snowflake-with-cdc/postgres/add-pk.sql diff --git a/README.md b/README.md index dbddc14..15f6fa1 100644 --- a/README.md +++ b/README.md @@ -35,10 +35,11 @@ _Learn more [here](https://decodable.co), and [sign up for a free trial](https:/ ### Data Pipelines -| Example | Description | -|-----------------------------------------------------|--------------------------------------------------------| -| [Opinionated Data Pipelines](opinionated-pipelines) | Building data pipelines with schema on write streams. | -| [Postman](postman) | Building data pipelines with Postman. | +| Example | Description | +|-----------------------------------------------------|----------------------------------------------------------| +| [Opinionated Data Pipelines](opinionated-pipelines) | Building data pipelines with schema on write streams. | +| [Postman](postman) | Building data pipelines with Postman. | +| [Postgres to Snowflake](postgres-to-snowflake-with-cdc) | Getting data from Postgres to Snowflake using Decodable. | ### PyFlink diff --git a/postgres-to-snowflake-with-cdc/README.adoc b/postgres-to-snowflake-with-cdc/README.adoc index c81a30f..36646e2 100644 --- a/postgres-to-snowflake-with-cdc/README.adoc +++ b/postgres-to-snowflake-with-cdc/README.adoc @@ -1,4 +1,8 @@ -= Docker Compose for running Postgres locally, accessible from the internet using ngrok += Postgres CDC to Snowflake + +This is supporting code for a blog about streaming data from Postgres to Snowflake using Decodable. 
+ +It includes a Docker Compose for running Postgres locally, accessible from the internet using ngrok To use this you need to https://dashboard.ngrok.com/signup[create an ngrok account] and add a file called `.env` in this folder with the following entry: @@ -14,14 +18,14 @@ Bring up the Postgres and ngrok stack with docker compose up ---- -Once up, find out your Postgres server host/post that is available on the internet: +Once up, you can find out your Postgres server host/port that is available on the internet: [source,bash] ---- curl -s localhost:4040/api/tunnels | jq -r '.tunnels[0].public_url' | sed 's/tcp:\/\///g' ---- -== Steps +== Source connection: Postgres CDC === Configure Postgres tables for replication @@ -41,38 +45,16 @@ Check their replica status; each should show `f`: [source,sql] ---- -SELECT relreplident FROM pg_class +SELECT oid::regclass, relreplident FROM pg_class WHERE oid in ( 'customers'::regclass, 'pets'::regclass, 'appointments'::regclass); ---- -=== Add PKs to the Postgres tables - -[source,bash] ----- -docker exec -it postgres psql -h localhost -U postgres -d postgres -f /data/add-pk.sql ----- - - -[source,sql] ----- -alter table customers -alter column customer_id set not null; -alter table customers -add constraint pk_customers primary key (customer_id); - -alter table pets -alter column pet_id set not null; -alter table pets -add constraint pk_pets primary key (pet_id); - -alter table appointments -alter column appointment_id set not null; -alter table appointments -add constraint pk_appointments primary key (appointment_id ); ----- +=== Store the password +Ref: -=== Store the password +* https://docs.decodable.co/declarative/apply.html +* https://docs.decodable.co/administer/manage-secrets.html [source,bash] ---- @@ -91,6 +73,8 @@ result: updated === Generate resource definitions +Ref: https://docs.decodable.co/declarative/scan.html + [source,bash] ---- decodable connection scan \ @@ -107,29 +91,6 @@ decodable connection scan \ > 
omd-pg.yaml ---- -=== Edit resource definitions - - -- set to active - -[source,yaml] ----- -spec_version: v2 -spec: - execution: - active: true ----- - -- add tags - -[source,yaml] ----- -metadata: - tags: - project: "oh-my-dawg" - author: "rmoff" ----- - === Apply resource definitions [source,bash] @@ -161,3 +122,49 @@ id: 3cc8e060 result: created ---- +=== Activate the Postgres connection + +[source,bash] +---- +decodable query --name oh-my-dawg-pg -X activate --stabilize +---- + +== Sink connection: Snowflake + +=== Provision Snowflake resources + +Ref: + +* https://docs.decodable.co/connect/sink/snowflake.html +* https://docs.snowflake.com/en/user-guide/key-pair-auth#verify-the-user-s-public-key-fingerprint + +=== Generate & create the Snowflake sink + +_You can pipe from one command to another to streamline the process._ + +[source,bash] +---- +decodable connection scan \ + --name oh-my-dawg-snowflake \ + --connector snowflake \ + --type sink \ + --prop snowflake.database=omd \ + --prop snowflake.schema=omd \ + --prop snowflake.user=decodable \ + --prop snowflake.private-key=$(decodable query --name omd-snowflake --keep-ids | yq '.metadata.id') \ + --prop snowflake.role=load_data \ + --prop snowflake.account-name=- \ + --prop snowflake.warehouse=stg \ + --prop snowflake.merge-interval="1 minute" \ + --include-pattern stream-name='^omd-' \ + | decodable apply - +---- + +NOTE: If you encounter errors then remove the pipe and `decodable apply` and inspect the YAML generated by `decodable connection scan` first. 
+ +=== Activate the Snowflake connection + +[source,bash] +---- +decodable query --name oh-my-dawg-snowflake -X activate --stabilize +---- diff --git a/postgres-to-snowflake-with-cdc/decodable/omd-pg.yaml b/postgres-to-snowflake-with-cdc/decodable/sample_connection_resource_definitions/omd-pg.yaml similarity index 91% rename from postgres-to-snowflake-with-cdc/decodable/omd-pg.yaml rename to postgres-to-snowflake-with-cdc/decodable/sample_connection_resource_definitions/omd-pg.yaml index f4c9829..5ae676d 100644 --- a/postgres-to-snowflake-with-cdc/decodable/omd-pg.yaml +++ b/postgres-to-snowflake-with-cdc/decodable/sample_connection_resource_definitions/omd-pg.yaml @@ -3,9 +3,6 @@ kind: connection metadata: name: oh-my-dawg-pg description: "" - tags: - project: "oh-my-dawg" - author: "rmoff" spec_version: v2 spec: execution: @@ -13,10 +10,10 @@ spec: connector: postgres-cdc properties: database-name: postgres - hostname: 2.tcp.eu.ngrok.io - password: omd-pg - port: "19366" + hostname: + port: username: postgres + password: omd-pg stream_mappings: - stream_name: omd-appointments external_resource_specifier: @@ -38,9 +35,6 @@ spec: kind: stream metadata: name: omd-appointments - tags: - project: "oh-my-dawg" - author: "rmoff" spec_version: v1 spec: schema_v2: @@ -79,9 +73,6 @@ spec: kind: stream metadata: name: omd-customers - tags: - project: "oh-my-dawg" - author: "rmoff" spec_version: v1 spec: schema_v2: @@ -111,9 +102,6 @@ spec: kind: stream metadata: name: omd-pets - tags: - project: "oh-my-dawg" - author: "rmoff" spec_version: v1 spec: schema_v2: diff --git a/postgres-to-snowflake-with-cdc/decodable/sample_connection_resource_definitions/omd-sf.yaml b/postgres-to-snowflake-with-cdc/decodable/sample_connection_resource_definitions/omd-sf.yaml new file mode 100644 index 0000000..9653d71 --- /dev/null +++ b/postgres-to-snowflake-with-cdc/decodable/sample_connection_resource_definitions/omd-sf.yaml @@ -0,0 +1,101 @@ +--- +kind: connection +metadata: + name: 
oh-my-dawg-snowflake + description: "" +spec_version: v2 +spec: + execution: + active: true + connector: snowflake + properties: + snowflake.account-name: - + snowflake.database: omd + snowflake.private-key: omd-snowflake + snowflake.role: load_data + snowflake.schema: omd + snowflake.user: decodable + snowflake.warehouse: stg + snowflake.merge-interval: 1 minute + stream_mappings: + - stream_name: omd-appointments + external_resource_specifier: + snowflake.table: omd-appointments + - stream_name: omd-pets + external_resource_specifier: + snowflake.table: omd-pets + - stream_name: omd-customers + external_resource_specifier: + snowflake.table: omd-customers + type: sink +--- +kind: external-resource +spec_version: v1 +spec: + connection: oh-my-dawg-snowflake + external_resource_specifier: + snowflake.table: omd-appointments + schema: + - name: appointment_id + type: VARCHAR NOT NULL + - name: duration_minutes + type: BIGINT + - name: appointment_time + type: VARCHAR + - name: status + type: VARCHAR + - name: pet_id + type: VARCHAR + - name: customer_id + type: VARCHAR + - name: service_type + type: VARCHAR + - name: price + type: DOUBLE + - name: last_updated + type: VARCHAR + configuration: + primary_key: + - appointment_id +--- +kind: external-resource +spec_version: v1 +spec: + connection: oh-my-dawg-snowflake + external_resource_specifier: + snowflake.table: omd-customers + schema: + - name: customer_id + type: VARCHAR NOT NULL + - name: first_name + type: VARCHAR + - name: last_name + type: VARCHAR + - name: phone + type: VARCHAR + - name: email + type: VARCHAR + - name: registered_date + type: VARCHAR + configuration: + primary_key: + - customer_id +--- +kind: external-resource +spec_version: v1 +spec: + connection: oh-my-dawg-snowflake + external_resource_specifier: + snowflake.table: omd-pets + schema: + - name: pet_id + type: VARCHAR NOT NULL + - name: customer_id + type: VARCHAR + - name: pet_name + type: VARCHAR + - name: pet_type + type: VARCHAR + 
configuration: + primary_key: + - pet_id diff --git a/postgres-to-snowflake-with-cdc/decodable/sf-secret.yaml b/postgres-to-snowflake-with-cdc/decodable/sf-secret.yaml new file mode 100644 index 0000000..3095065 --- /dev/null +++ b/postgres-to-snowflake-with-cdc/decodable/sf-secret.yaml @@ -0,0 +1,34 @@ +--- +kind: secret +metadata: + name: omd-snowflake +spec_version: v1 +spec: +# Don't worry, this is a throwaway private key 😅 + value_literal: | + MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCRxsQUE8CdKQXu + Xz0Hcwjej5JYWAvgS68WsE/P1pwRjhE8bba5w3LmD1ijMlBShh3RvcM2gT9Cj3Lx + 1X8D9pcdcOlwdOT3alEzXZjQlV7C4HC18xkEiEgNMjvWvP312af+Syq09u1UL4Hy + zl2TSI2XBTxttRmN6lY07Uk9ZHlGDjcL7+1ALg0Cq14PKFPNMnibKJ5cOWrtwZad + +HF6xTjyZRBDUpH72irHWIvwouLIM0ANhP6qTTaL5uxiSBxNyVcVKsgmaO2w1D+U + njHGL9lR55HPm1OoG6HOgI4KYDZ5xnVCQ0KCtiBHrrgtFOK00Gh31tBITxvlS7qs + kOGw1hz/AgMBAAECggEAAgJAr0KK0j+ZK6AK81oHXCj5uP27NpCK1AYcjqS9vgb8 + pgRtbHrmgu56mxxO4gw9R44hxtPNUnZilmi9DHA7kQ+X9lF0dEcpR+Sfl5EI+MHm + zul2LBgd3tyupjaXGbbk0aRchswRfvb9B4kGquICXXt+sGGHl1G/qJDahDePU9x9 + KE2WiMvXqbsdcD3U2RgFB8rSVQwx7vxLFH8islezyM3U7Q/G8LhWXtSEDoBR7vWD + uRpQ5+wqgBx8VdxJNt6dBYq5Umjag4FGnNTAEIJLqSfSHalofGMA/WjLjcb/YXth + o/Eyc+9U4+9rw90diM+tcBl3rr3UAHcLzC5WNoeVtQKBgQDIysJ3tcXgCZSs/TfU + aE1bgirtJxgRnvYBXKO1yQij20IsL5UM7YOCFxgzHmmjNuemKDeoCmKcU+tVFguE + ga4q9hhHTX8ZyqecgXUVwdMze67Sbv9T1eoG0smnvA+AjlElP5fKVUy9pBJI5+/5 + +gKYilSfHxAj6N2hTnfp/OO2OwKBgQC525kcFSSNPw4Gt9VfaWlJtk0Oa6lrOn3H + 4q00mYAcao5uIMAny7rW2KZB6YDB6X85y41XzecjOai/mAetNjNMR4KgDDMgP7JQ + yDjfHH8RxFcO86pxUriZeiU8X+dxRQQ7FXP/A5rCHtwZ+pj9YxE0/ZunDYxQmaC8 + sbX7dEfUDQKBgG9aiviKlT9G8O3yzBh+84+xI487pAx5pKJituOkpqcAfLU2eime + OtVVa3VGA32hgFxUZ3FIuSFLJPKd9Cs7I9Ttf89jOf6atdOEs+MqB6/AgtZu+iiL + NGsuUOk10T8RLg1DNDHglluBdyZ5gkuWjAP+iylnt7LCfM7tTnE0bzBrAoGBALSL + zsCpCUjs6AM+sdht3gntPg20KHAx8d4rJXbjZsA0AwiYaBJAps/uxhNhceLtoNnU + Ewooy1A8wuDcHxj0fgCrtwki0MeTGPXAiv6x//6SbL/plLlhUlJFhcaQo5Q1J1b+ + ECC6r6vDrqzN87CyfBSuCHbPgm8Jzkt/lvkejGhBAoGBAIAine7tBZ5G4bJxir3k + 
jA3tyO0TgdXUJz0KIJLiXeNf8K7xn+8CDu6UZ/WuDAMxMkwZdgZydufSWjhBGOXc + icGEqep9T0p/DPD6kz+HTMd1ad+Z+KtHA8G29NPwmn93TDAnVgQRIFh2LOHmwN74 + TK5RPdOw9kN1/BXQpWQOdwzn \ No newline at end of file diff --git a/postgres-to-snowflake-with-cdc/docker-compose.yml b/postgres-to-snowflake-with-cdc/docker-compose.yml index 5cc4ea4..ea34e80 100644 --- a/postgres-to-snowflake-with-cdc/docker-compose.yml +++ b/postgres-to-snowflake-with-cdc/docker-compose.yml @@ -34,7 +34,7 @@ services: volumes: - ./shadowtraffic:/data command: --config /data/config.json -# --with-studio --sample 20 --watch +# --with-studio --sample 20 --watch ports: # - 8080:8080 - 9400:9400 diff --git a/postgres-to-snowflake-with-cdc/postgres/add-pk.sql b/postgres-to-snowflake-with-cdc/postgres/add-pk.sql deleted file mode 100644 index d1b8ff7..0000000 --- a/postgres-to-snowflake-with-cdc/postgres/add-pk.sql +++ /dev/null @@ -1,14 +0,0 @@ -alter table customers -alter column customer_id set not null; -alter table customers -add constraint pk_customers primary key (customer_id); - -alter table pets -alter column pet_id set not null; -alter table pets -add constraint pk_pets primary key (pet_id); - -alter table appointments -alter column appointment_id set not null; -alter table appointments -add constraint pk_appointments primary key (appointment_id ); \ No newline at end of file diff --git a/postgres-to-snowflake-with-cdc/postgres/replication-config.sql b/postgres-to-snowflake-with-cdc/postgres/replication-config.sql index 368f17f..e808dd6 100644 --- a/postgres-to-snowflake-with-cdc/postgres/replication-config.sql +++ b/postgres-to-snowflake-with-cdc/postgres/replication-config.sql @@ -1,3 +1,6 @@ -ALTER TABLE customers REPLICA IDENTITY FULL; -ALTER TABLE pets REPLICA IDENTITY FULL; -ALTER TABLE appointments REPLICA IDENTITY FULL; \ No newline at end of file +ALTER TABLE omd_customers REPLICA IDENTITY FULL; +ALTER TABLE omd_pets REPLICA IDENTITY FULL; +ALTER TABLE omd_appointments REPLICA IDENTITY FULL; + +SELECT 
oid::regclass, relreplident FROM pg_class + WHERE oid in ( 'omd_customers'::regclass, 'omd_pets'::regclass, 'omd_appointments'::regclass); diff --git a/postgres-to-snowflake-with-cdc/shadowtraffic/config.json b/postgres-to-snowflake-with-cdc/shadowtraffic/config.json index 05c7fe0..80109b0 100644 --- a/postgres-to-snowflake-with-cdc/shadowtraffic/config.json +++ b/postgres-to-snowflake-with-cdc/shadowtraffic/config.json @@ -1,11 +1,11 @@ { "generators": [ { - "table": "customers", + "table": "omd_customers", "row": { "customer_id": { "_gen": "uuid", - "pgHint": "TEXT" + "pgHint" : "TEXT PRIMARY KEY" }, "first_name": { "_gen": "string", @@ -31,16 +31,15 @@ } }, "localConfigs": { - "throttleMs": 1000, - "maxEvents": 500 + "throttleMs": 1000 } }, { - "table": "pets", + "table": "omd_pets", "fork": { "key": { "_gen": "lookup", - "table": "customers", + "table": "omd_customers", "path": [ "row", "customer_id" @@ -51,11 +50,12 @@ "row": { "pet_id": { "_gen": "uuid", - "pgHint": "TEXT" + "pgHint" : "TEXT PRIMARY KEY" }, "customer_id": { "_gen": "var", - "var": "forkKey" + "var": "forkKey", + "pgHint" : "TEXT" }, "pet_name": { "_gen": "string", @@ -92,8 +92,8 @@ } }, { - "table": "appointments", - "vars": { + "table": "omd_appointments", + "varsOnce": { "apt_key": { "_gen": "uuid", "cast": "string" @@ -102,8 +102,8 @@ "fork": { "key": { "_gen": "lookup", - "table": "pets", - "path": [ + "table": "omd_pets", + "path": [ "row" ] }, @@ -180,8 +180,8 @@ "row": { "appointment_id": { "_gen": "var", - "pgHint": "TEXT", - "var": "apt_key" + "var": "apt_key", + "pgHint" : "TEXT PRIMARY KEY" }, "customer_id": { "_gen": "var", @@ -267,6 +267,7 @@ "connections": { "pg": { "kind": "postgres", + "tablePolicy": "dropAndCreate", "connectionConfigs": { "host": "postgres", "port": 5432, From aeb92cba48f358a7c17b44712be9ff52263617d6 Mon Sep 17 00:00:00 2001 From: Robin Moffatt Date: Tue, 12 Nov 2024 15:09:07 +0000 Subject: [PATCH 4/5] Update postgres-to-snowflake-with-cdc/docker-compose.yml 
Co-authored-by: Gunnar Morling --- postgres-to-snowflake-with-cdc/docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/postgres-to-snowflake-with-cdc/docker-compose.yml b/postgres-to-snowflake-with-cdc/docker-compose.yml index ea34e80..2b50996 100644 --- a/postgres-to-snowflake-with-cdc/docker-compose.yml +++ b/postgres-to-snowflake-with-cdc/docker-compose.yml @@ -14,7 +14,7 @@ services: postgres: container_name: postgres - image: quay.io/debezium/example-postgres:2.3 + image: quay.io/debezium/example-postgres:3.0 ports: - "5432:5432" environment: From 27475488ec4893258790ef0187180912f51d8359 Mon Sep 17 00:00:00 2001 From: Robin Moffatt Date: Tue, 12 Nov 2024 15:09:47 +0000 Subject: [PATCH 5/5] comment out SELECT --- .../postgres/replication-config.sql | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/postgres-to-snowflake-with-cdc/postgres/replication-config.sql b/postgres-to-snowflake-with-cdc/postgres/replication-config.sql index e808dd6..70c51aa 100644 --- a/postgres-to-snowflake-with-cdc/postgres/replication-config.sql +++ b/postgres-to-snowflake-with-cdc/postgres/replication-config.sql @@ -2,5 +2,6 @@ ALTER TABLE omd_customers REPLICA IDENTITY FULL; ALTER TABLE omd_pets REPLICA IDENTITY FULL; ALTER TABLE omd_appointments REPLICA IDENTITY FULL; -SELECT oid::regclass, relreplident FROM pg_class - WHERE oid in ( 'omd_customers'::regclass, 'omd_pets'::regclass, 'omd_appointments'::regclass); +-- To check that this worked, run: +-- SELECT oid::regclass, relreplident FROM pg_class +-- WHERE oid in ( 'omd_customers'::regclass, 'omd_pets'::regclass, 'omd_appointments'::regclass);