From 61b837df3cf371ba12d60fe9ba7c6e1cb99ec87f Mon Sep 17 00:00:00 2001 From: Will Donnelly Date: Tue, 25 Feb 2025 17:53:04 -0600 Subject: [PATCH 1/5] source-bigquery-batch: Add /_meta/row_id and op --- source-bigquery-batch/driver.go | 109 +++++++++++++++++++++++++---- source-bigquery-batch/main.go | 16 +++++ source-bigquery-batch/main_test.go | 4 +- 3 files changed, 115 insertions(+), 14 deletions(-) diff --git a/source-bigquery-batch/driver.go b/source-bigquery-batch/driver.go index 041b34a8cb..5e257ad3d0 100644 --- a/source-bigquery-batch/driver.go +++ b/source-bigquery-batch/driver.go @@ -111,10 +111,19 @@ func (r Resource) Validate() error { type documentMetadata struct { Polled time.Time `json:"polled" jsonschema:"title=Polled Timestamp,description=The time at which the update query which produced this document as executed."` Index int `json:"index" jsonschema:"title=Result Index,description=The index of this document within the query execution which produced it."` + RowID int64 `json:"row_id" jsonschema:"title=Row ID,description=Row ID of the Document, counting up from zero."` + Op string `json:"op,omitempty" jsonschema:"title=Change Operation,description=Operation type (c: Create / u: Update / d: Delete),enum=c,enum=u,enum=d,default=u"` } -// The fallback collection key just refers to the polling iteration and result index of each document. -var fallbackKey = []string{"/_meta/polled", "/_meta/index"} +var ( + // The fallback key of discovered collections when the source table has no primary key. + fallbackKey = []string{"/_meta/row_id"} + + // Old captures used a different fallback key which included a value identifying + // the specific polling iteration which produced the document. This proved less + // than ideal for full-refresh bindings on keyless tables. + fallbackKeyOld = []string{"/_meta/polled", "/_meta/index"} +) func generateCollectionSchema(keyColumns []string, columnTypes map[string]*jsonschema.Schema) (json.RawMessage, error) { // Generate schema for the metadata via reflection @@ -244,9 +253,12 @@ func (drv *BatchSQLDriver) Discover(ctx context.Context, req *pc.Request_Discove } // Try to generate a useful collection schema, but on error fall back to the - // minimal schema with the default key [/_meta/polled, /_meta/index]. + // minimal schema with a fallback collection key which is always present. var collectionSchema = minimalSchema var collectionKey = fallbackKey + if !cfg.Advanced.parsedFeatureFlags["keyless_row_id"] { + collectionKey = fallbackKeyOld + } if tableKey, ok := keysByTable[tableID]; ok { if generatedSchema, err := generateCollectionSchema(tableKey.Columns, tableKey.ColumnTypes); err == nil { collectionSchema = generatedSchema @@ -451,9 +463,10 @@ func (drv *BatchSQLDriver) Pull(open *pc.Request_Open, stream *boilerplate.PullO return fmt.Errorf("parsing resource config: %w", err) } bindings = append(bindings, bindingInfo{ - resource: &res, - index: idx, - stateKey: boilerplate.StateKey(binding.StateKey), + resource: &res, + index: idx, + stateKey: boilerplate.StateKey(binding.StateKey), + collectionKey: binding.Collection.Key, }) } @@ -519,9 +532,10 @@ type capture struct { } type bindingInfo struct { - resource *Resource - index int - stateKey boilerplate.StateKey + resource *Resource + index int + stateKey boilerplate.StateKey + collectionKey []string // The key of the output collection, as an array of JSON pointers. } type captureState struct { @@ -529,9 +543,10 @@ type captureState struct { } type streamState struct { - CursorNames []string - CursorValues []any - LastPolled time.Time + CursorNames []string + CursorValues []any + LastPolled time.Time + DocumentCount int64 // A count of the number of documents emitted since the last full refresh started. } func (s *captureState) Validate() error { @@ -589,6 +604,17 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template var cursorNames = state.CursorNames var cursorValues = state.CursorValues + // If the key of the output collection for this binding is the Row ID then we + // can automatically provide useful `/_meta/op` values and inferred deletions. + var isRowIDKey = len(binding.collectionKey) == 1 && binding.collectionKey[0] == "/_meta/row_id" + + // Two distinct concepts: + // - If we have no resume cursor _columns_ then every query is a full refresh. + // - If we have no resume cursor _values_ then this is a backfill query, which + // could either be a full refresh or the initial query of an incremental binding. + var isFullRefresh = len(cursorNames) == 0 + var isInitialBackfill = len(cursorValues) == 0 + var quotedCursorNames []string for _, cursorName := range cursorNames { quotedCursorNames = append(quotedCursorNames, quoteIdentifier(cursorName)) @@ -633,9 +659,18 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template } var query = queryBuf.String() + // For incremental updates of a binding with a cursor, continue counting from where + // we left off. For initial backfills (which includes the first query of an incremental + // binding as well as every query of a full-refresh binding) restart from zero. + var nextRowID = state.DocumentCount + if isInitialBackfill { + nextRowID = 0 + } + log.WithFields(log.Fields{ "query": query, "args": fmt.Sprintf("%#v", cursorValues), + "rowID": nextRowID, }).Info("executing query") var pollTime = time.Now().UTC() @@ -699,10 +734,24 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template // plus the `_meta` property we add. rowValues = make([]any, len(row)+1) } - rowValues[0] = &documentMetadata{ + var metadata = &documentMetadata{ + RowID: nextRowID, Polled: pollTime, Index: count, } + if isRowIDKey { + // When the output key of a binding is the row ID, we can provide useful + // create/update change operation values based on that row ID. This logic + // will always set the operation to "c" for a cursor-incremental binding + // with row ID key since the row ID is always increasing. + if nextRowID < state.DocumentCount { + metadata.Op = "u" + } else { + metadata.Op = "c" + } + } + rowValues[0] = metadata + for idx, val := range row { var translatedValue, err = translateBigQueryValue(val, rows.Schema[idx].Type) if err != nil { @@ -729,7 +778,20 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template state.CursorValues = cursorValues count++ + nextRowID++ + if count%documentsPerCheckpoint == 0 { + // When a full-refresh binding outputs into a collection with key `/_meta/row_id` + // we rely on the persisted DocumentCount not being updated until after the whole + // update query completes successfully, so that we can infer deletions of any rows + // between the last rowID of the latest query and the persisted DocumentCount. + // + // But when emitting partial-progress updates on a _non_ full-refresh binding, we + // need to update the persisted DocumentCount on each partial progress checkpoint + // so that the rowID the next poll resumes from will match the persisted cursor. + if !isFullRefresh { + state.DocumentCount = nextRowID + } if err := c.streamStateCheckpoint(stateKey, state); err != nil { return err } @@ -738,6 +800,7 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template log.WithFields(log.Fields{ "name": res.Name, "count": count, + "rowID": nextRowID, }).Info("processing query results") } } @@ -746,8 +809,28 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template "name": res.Name, "query": query, "count": count, + "rowID": nextRowID, + "docs": state.DocumentCount, }).Info("polling complete") + + // A full-refresh binding whose output collection uses the key /_meta/row_id can + // infer deletions whenever a refresh yields fewer rows than last time. + if isRowIDKey && isFullRefresh { + var pollTimestamp = pollTime.Format(time.RFC3339Nano) + for i := int64(0); i < state.DocumentCount-nextRowID; i++ { + // These inferred-deletion documents are simple enough that we can generate + // them with a simple Sprintf rather than going through a whole JSON encoder. + var doc = fmt.Sprintf( + `{"_meta":{"polled":%q,"index":%d,"row_id":%d,"op":"d"}}`, + pollTimestamp, count+int(i), nextRowID+i) + if err := c.Output.Documents(binding.index, json.RawMessage(doc)); err != nil { + return fmt.Errorf("error emitting document: %w", err) + } + } + } + state.LastPolled = pollTime + state.DocumentCount = nextRowID // Always update persisted count on successful completion if err := c.streamStateCheckpoint(stateKey, state); err != nil { return err } diff --git a/source-bigquery-batch/main.go b/source-bigquery-batch/main.go index f7bc9c52df..9f00d7f4a0 100644 --- a/source-bigquery-batch/main.go +++ b/source-bigquery-batch/main.go @@ -11,6 +11,7 @@ import ( "cloud.google.com/go/bigquery" "cloud.google.com/go/civil" + "github.com/estuary/connectors/go/common" "github.com/estuary/connectors/go/schedule" schemagen "github.com/estuary/connectors/go/schema-gen" boilerplate "github.com/estuary/connectors/source-boilerplate" @@ -18,6 +19,12 @@ import ( "google.golang.org/api/option" ) +var featureFlagDefaults = map[string]bool{ + // When true, the fallback collection key for keyless source tables will be + // ["/_meta/row_id"] instead of ["/_meta/polled", "/_meta/index"]. + "keyless_row_id": false, +} + // Config tells the connector how to connect to and interact with the source database. type Config struct { ProjectID string `json:"project_id" jsonschema:"title=Project ID,description=Google Cloud Project ID that owns the BigQuery dataset(s)." jsonschema_extras:"order=0"` @@ -29,6 +36,9 @@ type Config struct { type advancedConfig struct { PollSchedule string `json:"poll,omitempty" jsonschema:"title=Default Polling Schedule,description=When and how often to execute fetch queries. Accepts a Go duration string like '5m' or '6h' for frequency-based polling or a string like 'daily at 12:34Z' to poll at a specific time (specified in UTC) every day. Defaults to '24h' if unset." jsonschema_extras:"pattern=^([-+]?([0-9]+([.][0-9]+)?(h|m|s|ms))+|daily at [0-9][0-9]?:[0-9]{2}Z)$"` + FeatureFlags string `json:"feature_flags,omitempty" jsonschema:"title=Feature Flags,description=This property is intended for Estuary internal use. You should only modify this field as directed by Estuary support."` + + parsedFeatureFlags map[string]bool // Parsed feature flags setting with defaults applied } // Validate checks that the configuration possesses all required properties. @@ -54,6 +64,12 @@ func (c *Config) Validate() error { return fmt.Errorf("invalid default polling schedule %q: %w", c.Advanced.PollSchedule, err) } } + // Strictly speaking this feature-flag parsing isn't validation at all, but it's a convenient + // method that we can be sure always gets called before the config is used. + c.Advanced.parsedFeatureFlags = common.ParseFeatureFlags(c.Advanced.FeatureFlags, featureFlagDefaults) + if c.Advanced.FeatureFlags != "" { + log.WithField("flags", c.Advanced.parsedFeatureFlags).Info("parsed feature flags") + } return nil } diff --git a/source-bigquery-batch/main_test.go b/source-bigquery-batch/main_test.go index 15dcf887f4..cbf0f96753 100644 --- a/source-bigquery-batch/main_test.go +++ b/source-bigquery-batch/main_test.go @@ -45,6 +45,8 @@ var ( "testdata", "The dataset (schema) to create test tables in", ) + + testFeatureFlags = flag.String("feature_flags", "", "Feature flags to apply to all test captures.") ) func TestMain(m *testing.M) { @@ -74,13 +76,13 @@ func testCaptureSpec(t testing.TB) *st.CaptureSpec { Dataset: *testDataset, Advanced: advancedConfig{ PollSchedule: "200ms", + FeatureFlags: *testFeatureFlags, }, } var sanitizers = make(map[string]*regexp.Regexp) sanitizers[`"polled":""`] = regexp.MustCompile(`"polled":"[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.[0-9]+)?(Z|[+-][0-9]+:[0-9]+)"`) sanitizers[`"LastPolled":""`] = regexp.MustCompile(`"LastPolled":"[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.[0-9]+)?(Z|[+-][0-9]+:[0-9]+)"`) - sanitizers[`"index":999`] = regexp.MustCompile(`"index":[0-9]+`) return &st.CaptureSpec{ Driver: bigqueryDriver, From c381b6a833e240f6958046e15e0ad7d4d9ab41b5 Mon Sep 17 00:00:00 2001 From: Will Donnelly Date: Tue, 25 Feb 2025 18:13:30 -0600 Subject: [PATCH 2/5] source-bigquery-batch: Update test snapshots --- .../.snapshots/TestBinaryTypes-Capture | 14 +++++----- .../.snapshots/TestBinaryTypes-Discovery | 21 +++++++++++++-- .../TestCaptureWithDatetimeCursor-Capture | 22 ++++++++-------- .../TestCaptureWithDatetimeCursor-Discovery | 21 +++++++++++++-- .../TestCaptureWithEmptyPoll-Capture | 22 ++++++++-------- .../TestCaptureWithEmptyPoll-Discovery | 21 +++++++++++++-- .../TestCaptureWithModifications-Capture | 22 ++++++++-------- .../TestCaptureWithModifications-Discovery | 21 +++++++++++++-- .../TestCaptureWithNullCursor-Capture1 | 12 ++++----- .../TestCaptureWithNullCursor-Capture2 | 6 ++--- .../TestCaptureWithNullCursor-Discovery | 21 +++++++++++++-- .../TestCaptureWithTwoColumnCursor-Capture | 26 +++++++++---------- .../TestCaptureWithTwoColumnCursor-Discovery | 21 +++++++++++++-- .../TestCaptureWithUpdatedAtCursor-Capture | 22 ++++++++-------- .../TestCaptureWithUpdatedAtCursor-Discovery | 21 +++++++++++++-- .../.snapshots/TestCompositeTypes-Capture | 8 +++--- .../.snapshots/TestCompositeTypes-Discovery | 21 +++++++++++++-- .../.snapshots/TestFullRefresh-Capture1 | 10 +++---- .../.snapshots/TestFullRefresh-Capture2 | 18 ++++++------- .../.snapshots/TestFullRefresh-Discovery | 19 +++++++++++++- .../.snapshots/TestIntegerTypes-Capture | 12 ++++----- .../.snapshots/TestIntegerTypes-Discovery | 21 +++++++++++++-- .../.snapshots/TestJSONType-Capture | 18 ++++++------- .../.snapshots/TestJSONType-Discovery | 21 +++++++++++++-- .../.snapshots/TestNumericTypes-Capture | 12 ++++----- .../.snapshots/TestNumericTypes-Discovery | 21 +++++++++++++-- .../TestQueryTemplateOverride-Capture | 22 ++++++++-------- .../TestQueryTemplateOverride-Discovery | 21 +++++++++++++-- .../.snapshots/TestSimpleCapture-Capture | 18 ++++++------- .../.snapshots/TestSimpleCapture-Discovery | 19 +++++++++++++- source-bigquery-batch/.snapshots/TestSpec | 5 ++++ .../.snapshots/TestStringTypes-Capture | 14 +++++----- .../.snapshots/TestStringTypes-Discovery | 21 +++++++++++++-- .../.snapshots/TestTemporalTypes-Capture | 12 ++++----- .../.snapshots/TestTemporalTypes-Discovery | 21 +++++++++++++-- 35 files changed, 452 insertions(+), 175 deletions(-) diff --git a/source-bigquery-batch/.snapshots/TestBinaryTypes-Capture b/source-bigquery-batch/.snapshots/TestBinaryTypes-Capture index 117e93d3c3..8f2b96480d 100644 --- a/source-bigquery-batch/.snapshots/TestBinaryTypes-Capture +++ b/source-bigquery-batch/.snapshots/TestBinaryTypes-Capture @@ -1,14 +1,14 @@ # ================================ # Collection "acmeCo/test/binarytypes_537491": 6 Documents # ================================ -{"_meta":{"polled":"","index":999},"bool_val":true,"bytes_val":"aGVsbG8gd29ybGQ=","id":1} -{"_meta":{"polled":"","index":999},"bool_val":false,"bytes_val":"AAECAw==","id":2} -{"_meta":{"polled":"","index":999},"bool_val":null,"bytes_val":null,"id":3} -{"_meta":{"polled":"","index":999},"bool_val":true,"bytes_val":null,"id":4} -{"_meta":{"polled":"","index":999},"bool_val":false,"bytes_val":"qqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqg==","id":5} -{"_meta":{"polled":"","index":999},"bool_val":true,"bytes_val":"SGVsbG8sIOS4lueVjCE=","id":6} +{"_meta":{"polled":"","index":0,"row_id":0},"bool_val":true,"bytes_val":"aGVsbG8gd29ybGQ=","id":1} +{"_meta":{"polled":"","index":1,"row_id":1},"bool_val":false,"bytes_val":"AAECAw==","id":2} +{"_meta":{"polled":"","index":2,"row_id":2},"bool_val":null,"bytes_val":null,"id":3} +{"_meta":{"polled":"","index":3,"row_id":3},"bool_val":true,"bytes_val":null,"id":4} +{"_meta":{"polled":"","index":4,"row_id":4},"bool_val":false,"bytes_val":"qqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqg==","id":5} +{"_meta":{"polled":"","index":5,"row_id":5},"bool_val":true,"bytes_val":"SGVsbG8sIOS4lueVjCE=","id":6} # ================================ # Final State Checkpoint # ================================ -{"bindingStateV1":{"binarytypes_537491":{"CursorNames":["id"],"CursorValues":[6],"LastPolled":""}}} +{"bindingStateV1":{"binarytypes_537491":{"CursorNames":["id"],"CursorValues":[6],"LastPolled":"","DocumentCount":6}}} diff --git a/source-bigquery-batch/.snapshots/TestBinaryTypes-Discovery b/source-bigquery-batch/.snapshots/TestBinaryTypes-Discovery index 64410793c1..9eb2219bfc 100644 --- a/source-bigquery-batch/.snapshots/TestBinaryTypes-Discovery +++ b/source-bigquery-batch/.snapshots/TestBinaryTypes-Discovery @@ -21,7 +21,7 @@ Binding 0: ], "properties": { "_meta": { - "$schema": "http://json-schema.org/draft/2020-12/schema", + "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", "properties": { "polled": { @@ -34,12 +34,29 @@ Binding 0: "type": "integer", "title": "Result Index", "description": "The index of this document within the query execution which produced it." + }, + "row_id": { + "type": "integer", + "title": "Row ID", + "description": "Row ID of the Document" + }, + "op": { + "type": "string", + "enum": [ + "c", + "u", + "d" + ], + "title": "Change Operation", + "description": "Operation type (c: Create / u: Update / d: Delete)", + "default": "u" } }, "type": "object", "required": [ "polled", - "index" + "index", + "row_id" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithDatetimeCursor-Capture b/source-bigquery-batch/.snapshots/TestCaptureWithDatetimeCursor-Capture index faac47173c..65ccc93fa9 100644 --- a/source-bigquery-batch/.snapshots/TestCaptureWithDatetimeCursor-Capture +++ b/source-bigquery-batch/.snapshots/TestCaptureWithDatetimeCursor-Capture @@ -1,18 +1,18 @@ # ================================ # Collection "acmeCo/test/capturewithdatetimecursor_877736": 10 Documents # ================================ -{"_meta":{"polled":"","index":999},"data":"Value for row 0","id":0,"updated_at":"2025-02-13T12:00:00.000000"} -{"_meta":{"polled":"","index":999},"data":"Value for row 1","id":1,"updated_at":"2025-02-13T12:01:00.000000"} -{"_meta":{"polled":"","index":999},"data":"Value for row 2","id":2,"updated_at":"2025-02-13T12:02:00.000000"} -{"_meta":{"polled":"","index":999},"data":"Value for row 3","id":3,"updated_at":"2025-02-13T12:03:00.000000"} -{"_meta":{"polled":"","index":999},"data":"Value for row 4","id":4,"updated_at":"2025-02-13T12:04:00.000000"} -{"_meta":{"polled":"","index":999},"data":"Value for row 5","id":5,"updated_at":"2025-02-13T12:10:00.000000"} -{"_meta":{"polled":"","index":999},"data":"Value for row 6","id":6,"updated_at":"2025-02-13T12:11:00.000000"} -{"_meta":{"polled":"","index":999},"data":"Value for row 7","id":7,"updated_at":"2025-02-13T12:12:00.000000"} -{"_meta":{"polled":"","index":999},"data":"Value for row 8","id":8,"updated_at":"2025-02-13T12:13:00.000000"} -{"_meta":{"polled":"","index":999},"data":"Value for row 9","id":9,"updated_at":"2025-02-13T12:14:00.000000"} +{"_meta":{"polled":"","index":0,"row_id":0},"data":"Value for row 0","id":0,"updated_at":"2025-02-13T12:00:00.000000"} +{"_meta":{"polled":"","index":1,"row_id":1},"data":"Value for row 1","id":1,"updated_at":"2025-02-13T12:01:00.000000"} +{"_meta":{"polled":"","index":2,"row_id":2},"data":"Value for row 2","id":2,"updated_at":"2025-02-13T12:02:00.000000"} +{"_meta":{"polled":"","index":3,"row_id":3},"data":"Value for row 3","id":3,"updated_at":"2025-02-13T12:03:00.000000"} +{"_meta":{"polled":"","index":4,"row_id":4},"data":"Value for row 4","id":4,"updated_at":"2025-02-13T12:04:00.000000"} +{"_meta":{"polled":"","index":0,"row_id":5},"data":"Value for row 5","id":5,"updated_at":"2025-02-13T12:10:00.000000"} +{"_meta":{"polled":"","index":1,"row_id":6},"data":"Value for row 6","id":6,"updated_at":"2025-02-13T12:11:00.000000"} +{"_meta":{"polled":"","index":2,"row_id":7},"data":"Value for row 7","id":7,"updated_at":"2025-02-13T12:12:00.000000"} +{"_meta":{"polled":"","index":3,"row_id":8},"data":"Value for row 8","id":8,"updated_at":"2025-02-13T12:13:00.000000"} +{"_meta":{"polled":"","index":4,"row_id":9},"data":"Value for row 9","id":9,"updated_at":"2025-02-13T12:14:00.000000"} # ================================ # Final State Checkpoint # ================================ -{"bindingStateV1":{"capturewithdatetimecursor_877736":{"CursorNames":["updated_at"],"CursorValues":["2025-02-13T12:14:00.000000"],"LastPolled":""}}} +{"bindingStateV1":{"capturewithdatetimecursor_877736":{"CursorNames":["updated_at"],"CursorValues":["2025-02-13T12:14:00.000000"],"DocumentCount":10,"LastPolled":""}}} diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithDatetimeCursor-Discovery b/source-bigquery-batch/.snapshots/TestCaptureWithDatetimeCursor-Discovery index c04a2060b4..27fd4ce4c2 100644 --- a/source-bigquery-batch/.snapshots/TestCaptureWithDatetimeCursor-Discovery +++ b/source-bigquery-batch/.snapshots/TestCaptureWithDatetimeCursor-Discovery @@ -21,7 +21,7 @@ Binding 0: ], "properties": { "_meta": { - "$schema": "http://json-schema.org/draft/2020-12/schema", + "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", "properties": { "polled": { @@ -34,12 +34,29 @@ Binding 0: "type": "integer", "title": "Result Index", "description": "The index of this document within the query execution which produced it." + }, + "row_id": { + "type": "integer", + "title": "Row ID", + "description": "Row ID of the Document" + }, + "op": { + "type": "string", + "enum": [ + "c", + "u", + "d" + ], + "title": "Change Operation", + "description": "Operation type (c: Create / u: Update / d: Delete)", + "default": "u" } }, "type": "object", "required": [ "polled", - "index" + "index", + "row_id" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithEmptyPoll-Capture b/source-bigquery-batch/.snapshots/TestCaptureWithEmptyPoll-Capture index ae9c0d67b0..1aa3dd9c78 100644 --- a/source-bigquery-batch/.snapshots/TestCaptureWithEmptyPoll-Capture +++ b/source-bigquery-batch/.snapshots/TestCaptureWithEmptyPoll-Capture @@ -1,18 +1,18 @@ # ================================ # Collection "acmeCo/test/capturewithemptypoll_890703": 10 Documents # ================================ -{"_meta":{"polled":"","index":999},"data":"Value for row 0","id":0,"updated_at":"2025-02-13T12:00:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 1","id":1,"updated_at":"2025-02-13T12:01:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 2","id":2,"updated_at":"2025-02-13T12:02:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 3","id":3,"updated_at":"2025-02-13T12:03:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 4","id":4,"updated_at":"2025-02-13T12:04:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 5","id":5,"updated_at":"2025-02-13T12:10:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 6","id":6,"updated_at":"2025-02-13T12:11:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 7","id":7,"updated_at":"2025-02-13T12:12:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 8","id":8,"updated_at":"2025-02-13T12:13:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 9","id":9,"updated_at":"2025-02-13T12:14:00Z"} +{"_meta":{"polled":"","index":0,"row_id":0},"data":"Value for row 0","id":0,"updated_at":"2025-02-13T12:00:00Z"} +{"_meta":{"polled":"","index":1,"row_id":1},"data":"Value for row 1","id":1,"updated_at":"2025-02-13T12:01:00Z"} +{"_meta":{"polled":"","index":2,"row_id":2},"data":"Value for row 2","id":2,"updated_at":"2025-02-13T12:02:00Z"} +{"_meta":{"polled":"","index":3,"row_id":3},"data":"Value for row 3","id":3,"updated_at":"2025-02-13T12:03:00Z"} +{"_meta":{"polled":"","index":4,"row_id":4},"data":"Value for row 4","id":4,"updated_at":"2025-02-13T12:04:00Z"} +{"_meta":{"polled":"","index":0,"row_id":5},"data":"Value for row 5","id":5,"updated_at":"2025-02-13T12:10:00Z"} +{"_meta":{"polled":"","index":1,"row_id":6},"data":"Value for row 6","id":6,"updated_at":"2025-02-13T12:11:00Z"} +{"_meta":{"polled":"","index":2,"row_id":7},"data":"Value for row 7","id":7,"updated_at":"2025-02-13T12:12:00Z"} +{"_meta":{"polled":"","index":3,"row_id":8},"data":"Value for row 8","id":8,"updated_at":"2025-02-13T12:13:00Z"} +{"_meta":{"polled":"","index":4,"row_id":9},"data":"Value for row 9","id":9,"updated_at":"2025-02-13T12:14:00Z"} # ================================ # Final State Checkpoint # ================================ -{"bindingStateV1":{"capturewithemptypoll_890703":{"CursorNames":["updated_at"],"CursorValues":["2025-02-13T12:14:00Z"],"LastPolled":""}}} +{"bindingStateV1":{"capturewithemptypoll_890703":{"CursorNames":["updated_at"],"CursorValues":["2025-02-13T12:14:00Z"],"DocumentCount":10,"LastPolled":""}}} diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithEmptyPoll-Discovery b/source-bigquery-batch/.snapshots/TestCaptureWithEmptyPoll-Discovery index 58d00e01d5..01157a872f 100644 --- a/source-bigquery-batch/.snapshots/TestCaptureWithEmptyPoll-Discovery +++ b/source-bigquery-batch/.snapshots/TestCaptureWithEmptyPoll-Discovery @@ -21,7 +21,7 @@ Binding 0: ], "properties": { "_meta": { - "$schema": "http://json-schema.org/draft/2020-12/schema", + "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", "properties": { "polled": { @@ -34,12 +34,29 @@ Binding 0: "type": "integer", "title": "Result Index", "description": "The index of this document within the query execution which produced it." + }, + "row_id": { + "type": "integer", + "title": "Row ID", + "description": "Row ID of the Document" + }, + "op": { + "type": "string", + "enum": [ + "c", + "u", + "d" + ], + "title": "Change Operation", + "description": "Operation type (c: Create / u: Update / d: Delete)", + "default": "u" } }, "type": "object", "required": [ "polled", - "index" + "index", + "row_id" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithModifications-Capture b/source-bigquery-batch/.snapshots/TestCaptureWithModifications-Capture index 239a6c891f..54dc029023 100644 --- a/source-bigquery-batch/.snapshots/TestCaptureWithModifications-Capture +++ b/source-bigquery-batch/.snapshots/TestCaptureWithModifications-Capture @@ -1,18 +1,18 @@ # ================================ # Collection "acmeCo/test/capturewithmodifications_786099": 10 Documents # ================================ -{"_meta":{"polled":"","index":999},"data":"Initial value for row 0","id":0,"updated_at":"2025-02-13T12:00:00Z"} -{"_meta":{"polled":"","index":999},"data":"Initial value for row 1","id":1,"updated_at":"2025-02-13T12:01:00Z"} -{"_meta":{"polled":"","index":999},"data":"Initial value for row 2","id":2,"updated_at":"2025-02-13T12:02:00Z"} -{"_meta":{"polled":"","index":999},"data":"Initial value for row 3","id":3,"updated_at":"2025-02-13T12:03:00Z"} -{"_meta":{"polled":"","index":999},"data":"Initial value for row 4","id":4,"updated_at":"2025-02-13T12:04:00Z"} -{"_meta":{"polled":"","index":999},"data":"Modified value for row 3","id":3,"updated_at":"2025-02-13T12:15:00Z"} -{"_meta":{"polled":"","index":999},"data":"Modified value for row 4","id":4,"updated_at":"2025-02-13T12:16:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 5","id":5,"updated_at":"2025-02-13T12:20:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 6","id":6,"updated_at":"2025-02-13T12:21:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 7","id":7,"updated_at":"2025-02-13T12:22:00Z"} +{"_meta":{"polled":"","index":0,"row_id":0},"data":"Initial value for row 0","id":0,"updated_at":"2025-02-13T12:00:00Z"} +{"_meta":{"polled":"","index":1,"row_id":1},"data":"Initial value for row 1","id":1,"updated_at":"2025-02-13T12:01:00Z"} +{"_meta":{"polled":"","index":2,"row_id":2},"data":"Initial value for row 2","id":2,"updated_at":"2025-02-13T12:02:00Z"} +{"_meta":{"polled":"","index":3,"row_id":3},"data":"Initial value for row 3","id":3,"updated_at":"2025-02-13T12:03:00Z"} +{"_meta":{"polled":"","index":4,"row_id":4},"data":"Initial value for row 4","id":4,"updated_at":"2025-02-13T12:04:00Z"} +{"_meta":{"polled":"","index":0,"row_id":5},"data":"Modified value for row 3","id":3,"updated_at":"2025-02-13T12:15:00Z"} +{"_meta":{"polled":"","index":1,"row_id":6},"data":"Modified value for row 4","id":4,"updated_at":"2025-02-13T12:16:00Z"} +{"_meta":{"polled":"","index":2,"row_id":7},"data":"Value for row 5","id":5,"updated_at":"2025-02-13T12:20:00Z"} +{"_meta":{"polled":"","index":3,"row_id":8},"data":"Value for row 6","id":6,"updated_at":"2025-02-13T12:21:00Z"} +{"_meta":{"polled":"","index":4,"row_id":9},"data":"Value for row 7","id":7,"updated_at":"2025-02-13T12:22:00Z"} # ================================ # Final State Checkpoint # ================================ -{"bindingStateV1":{"capturewithmodifications_786099":{"CursorNames":["updated_at"],"CursorValues":["2025-02-13T12:22:00Z"],"LastPolled":""}}} +{"bindingStateV1":{"capturewithmodifications_786099":{"CursorNames":["updated_at"],"CursorValues":["2025-02-13T12:22:00Z"],"DocumentCount":10,"LastPolled":""}}} diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithModifications-Discovery b/source-bigquery-batch/.snapshots/TestCaptureWithModifications-Discovery index d5e972743b..ee8d57fdbc 100644 --- a/source-bigquery-batch/.snapshots/TestCaptureWithModifications-Discovery +++ b/source-bigquery-batch/.snapshots/TestCaptureWithModifications-Discovery @@ -21,7 +21,7 @@ Binding 0: ], "properties": { "_meta": { - "$schema": "http://json-schema.org/draft/2020-12/schema", + "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", "properties": { "polled": { @@ -34,12 +34,29 @@ Binding 0: "type": "integer", "title": "Result Index", "description": "The index of this document within the query execution which produced it." + }, + "row_id": { + "type": "integer", + "title": "Row ID", + "description": "Row ID of the Document" + }, + "op": { + "type": "string", + "enum": [ + "c", + "u", + "d" + ], + "title": "Change Operation", + "description": "Operation type (c: Create / u: Update / d: Delete)", + "default": "u" } }, "type": "object", "required": [ "polled", - "index" + "index", + "row_id" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithNullCursor-Capture1 b/source-bigquery-batch/.snapshots/TestCaptureWithNullCursor-Capture1 index 033e59308c..52a171ae15 100644 --- a/source-bigquery-batch/.snapshots/TestCaptureWithNullCursor-Capture1 +++ b/source-bigquery-batch/.snapshots/TestCaptureWithNullCursor-Capture1 @@ -1,13 +1,13 @@ # ================================ # Collection "acmeCo/test/capturewithnullcursor_662607": 5 Documents # ================================ -{"_meta":{"polled":"","index":999},"data":"Another NULL cursor","id":2,"sort_col":null} -{"_meta":{"polled":"","index":999},"data":"Third NULL cursor","id":4,"sort_col":null} -{"_meta":{"polled":"","index":999},"data":"Value with NULL cursor","id":0,"sort_col":null} -{"_meta":{"polled":"","index":999},"data":"Value with cursor 10","id":1,"sort_col":10} -{"_meta":{"polled":"","index":999},"data":"Value with cursor 20","id":3,"sort_col":20} +{"_meta":{"polled":"","index":0,"row_id":0},"data":"Another NULL cursor","id":2,"sort_col":null} +{"_meta":{"polled":"","index":1,"row_id":1},"data":"Value with NULL cursor","id":0,"sort_col":null} +{"_meta":{"polled":"","index":2,"row_id":2},"data":"Third NULL cursor","id":4,"sort_col":null} +{"_meta":{"polled":"","index":3,"row_id":3},"data":"Value with cursor 10","id":1,"sort_col":10} +{"_meta":{"polled":"","index":4,"row_id":4},"data":"Value with cursor 20","id":3,"sort_col":20} # ================================ # Final State Checkpoint # ================================ -{"bindingStateV1":{"capturewithnullcursor_662607":{"CursorNames":["sort_col"],"CursorValues":[20],"LastPolled":""}}} +{"bindingStateV1":{"capturewithnullcursor_662607":{"CursorNames":["sort_col"],"CursorValues":[20],"LastPolled":"","DocumentCount":5}}} diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithNullCursor-Capture2 b/source-bigquery-batch/.snapshots/TestCaptureWithNullCursor-Capture2 index eff1455dc6..4d8befe966 100644 --- a/source-bigquery-batch/.snapshots/TestCaptureWithNullCursor-Capture2 +++ b/source-bigquery-batch/.snapshots/TestCaptureWithNullCursor-Capture2 @@ -1,10 +1,10 @@ # ================================ # Collection "acmeCo/test/capturewithnullcursor_662607": 2 Documents # ================================ -{"_meta":{"polled":"","index":999},"data":"Final value cursor 30","id":9,"sort_col":30} -{"_meta":{"polled":"","index":999},"data":"Value with cursor 25","id":7,"sort_col":25} +{"_meta":{"polled":"","index":0,"row_id":5},"data":"Value with cursor 25","id":7,"sort_col":25} +{"_meta":{"polled":"","index":1,"row_id":6},"data":"Final value cursor 30","id":9,"sort_col":30} # ================================ # Final State Checkpoint # ================================ -{"bindingStateV1":{"capturewithnullcursor_662607":{"CursorNames":["sort_col"],"CursorValues":[30],"LastPolled":""}}} +{"bindingStateV1":{"capturewithnullcursor_662607":{"CursorNames":["sort_col"],"CursorValues":[30],"DocumentCount":7,"LastPolled":""}}} diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithNullCursor-Discovery b/source-bigquery-batch/.snapshots/TestCaptureWithNullCursor-Discovery index 3cacc6ed0f..c230e112dd 100644 --- a/source-bigquery-batch/.snapshots/TestCaptureWithNullCursor-Discovery +++ b/source-bigquery-batch/.snapshots/TestCaptureWithNullCursor-Discovery @@ -21,7 +21,7 @@ Binding 0: ], "properties": { "_meta": { - "$schema": "http://json-schema.org/draft/2020-12/schema", + "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", "properties": { "polled": { @@ -34,12 +34,29 @@ Binding 0: "type": "integer", "title": "Result Index", "description": "The index of this document within the query execution which produced it." + }, + "row_id": { + "type": "integer", + "title": "Row ID", + "description": "Row ID of the Document" + }, + "op": { + "type": "string", + "enum": [ + "c", + "u", + "d" + ], + "title": "Change Operation", + "description": "Operation type (c: Create / u: Update / d: Delete)", + "default": "u" } }, "type": "object", "required": [ "polled", - "index" + "index", + "row_id" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithTwoColumnCursor-Capture b/source-bigquery-batch/.snapshots/TestCaptureWithTwoColumnCursor-Capture index fab387e5c1..5fd061298b 100644 --- a/source-bigquery-batch/.snapshots/TestCaptureWithTwoColumnCursor-Capture +++ b/source-bigquery-batch/.snapshots/TestCaptureWithTwoColumnCursor-Capture @@ -1,20 +1,20 @@ # ================================ # Collection "acmeCo/test/capturewithtwocolumncursor_321285": 12 Documents # ================================ -{"_meta":{"polled":"","index":999},"col1":1,"col2":1,"data":"Value for row 0","id":0} -{"_meta":{"polled":"","index":999},"col1":1,"col2":2,"data":"Value for row 1","id":1} -{"_meta":{"polled":"","index":999},"col1":1,"col2":3,"data":"Value for row 2","id":2} -{"_meta":{"polled":"","index":999},"col1":2,"col2":1,"data":"Value for row 3","id":3} -{"_meta":{"polled":"","index":999},"col1":2,"col2":2,"data":"Value for row 4","id":4} -{"_meta":{"polled":"","index":999},"col1":2,"col2":3,"data":"Value for row 5","id":5} -{"_meta":{"polled":"","index":999},"col1":3,"col2":1,"data":"Value for row 8","id":8} -{"_meta":{"polled":"","index":999},"col1":3,"col2":2,"data":"Value for row 9","id":9} -{"_meta":{"polled":"","index":999},"col1":3,"col2":3,"data":"Value for row 10","id":10} -{"_meta":{"polled":"","index":999},"col1":4,"col2":1,"data":"Value for row 11","id":11} -{"_meta":{"polled":"","index":999},"col1":4,"col2":2,"data":"Value for row 12","id":12} -{"_meta":{"polled":"","index":999},"col1":4,"col2":3,"data":"Value for row 13","id":13} +{"_meta":{"polled":"","index":0,"row_id":0},"col1":1,"col2":1,"data":"Value for row 0","id":0} +{"_meta":{"polled":"","index":1,"row_id":1},"col1":1,"col2":2,"data":"Value for row 1","id":1} +{"_meta":{"polled":"","index":2,"row_id":2},"col1":1,"col2":3,"data":"Value for row 2","id":2} +{"_meta":{"polled":"","index":3,"row_id":3},"col1":2,"col2":1,"data":"Value for row 3","id":3} +{"_meta":{"polled":"","index":4,"row_id":4},"col1":2,"col2":2,"data":"Value for row 4","id":4} +{"_meta":{"polled":"","index":5,"row_id":5},"col1":2,"col2":3,"data":"Value for row 5","id":5} +{"_meta":{"polled":"","index":0,"row_id":6},"col1":3,"col2":1,"data":"Value for row 8","id":8} +{"_meta":{"polled":"","index":1,"row_id":7},"col1":3,"col2":2,"data":"Value for row 9","id":9} +{"_meta":{"polled":"","index":2,"row_id":8},"col1":3,"col2":3,"data":"Value for row 10","id":10} +{"_meta":{"polled":"","index":3,"row_id":9},"col1":4,"col2":1,"data":"Value for row 11","id":11} +{"_meta":{"polled":"","index":4,"row_id":10},"col1":4,"col2":2,"data":"Value for row 12","id":12} +{"_meta":{"polled":"","index":5,"row_id":11},"col1":4,"col2":3,"data":"Value for row 13","id":13} # ================================ # Final State Checkpoint # ================================ -{"bindingStateV1":{"capturewithtwocolumncursor_321285":{"CursorNames":["col1","col2"],"CursorValues":[4,3],"LastPolled":""}}} +{"bindingStateV1":{"capturewithtwocolumncursor_321285":{"CursorNames":["col1","col2"],"CursorValues":[4,3],"DocumentCount":12,"LastPolled":""}}} diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithTwoColumnCursor-Discovery b/source-bigquery-batch/.snapshots/TestCaptureWithTwoColumnCursor-Discovery index 0ffa4330f0..c6ceb73b07 100644 --- a/source-bigquery-batch/.snapshots/TestCaptureWithTwoColumnCursor-Discovery +++ b/source-bigquery-batch/.snapshots/TestCaptureWithTwoColumnCursor-Discovery @@ -22,7 +22,7 @@ Binding 0: ], "properties": { "_meta": { - "$schema": "http://json-schema.org/draft/2020-12/schema", + "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", "properties": { "polled": { @@ -35,12 +35,29 @@ Binding 0: "type": "integer", "title": "Result Index", "description": "The index of this document within the query execution which produced it." + }, + "row_id": { + "type": "integer", + "title": "Row ID", + "description": "Row ID of the Document" + }, + "op": { + "type": "string", + "enum": [ + "c", + "u", + "d" + ], + "title": "Change Operation", + "description": "Operation type (c: Create / u: Update / d: Delete)", + "default": "u" } }, "type": "object", "required": [ "polled", - "index" + "index", + "row_id" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithUpdatedAtCursor-Capture b/source-bigquery-batch/.snapshots/TestCaptureWithUpdatedAtCursor-Capture index b4be37a09b..e22088724e 100644 --- a/source-bigquery-batch/.snapshots/TestCaptureWithUpdatedAtCursor-Capture +++ b/source-bigquery-batch/.snapshots/TestCaptureWithUpdatedAtCursor-Capture @@ -1,18 +1,18 @@ # ================================ # Collection "acmeCo/test/capturewithupdatedatcursor_792371": 10 Documents # ================================ -{"_meta":{"polled":"","index":999},"data":"Value for row 0","id":0,"updated_at":"2025-02-13T12:00:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 1","id":1,"updated_at":"2025-02-13T12:01:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 2","id":2,"updated_at":"2025-02-13T12:02:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 3","id":3,"updated_at":"2025-02-13T12:03:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 4","id":4,"updated_at":"2025-02-13T12:04:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 5","id":5,"updated_at":"2025-02-13T12:10:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 6","id":6,"updated_at":"2025-02-13T12:11:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 7","id":7,"updated_at":"2025-02-13T12:12:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 8","id":8,"updated_at":"2025-02-13T12:13:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 9","id":9,"updated_at":"2025-02-13T12:14:00Z"} +{"_meta":{"polled":"","index":0,"row_id":0},"data":"Value for row 0","id":0,"updated_at":"2025-02-13T12:00:00Z"} +{"_meta":{"polled":"","index":1,"row_id":1},"data":"Value for row 1","id":1,"updated_at":"2025-02-13T12:01:00Z"} +{"_meta":{"polled":"","index":2,"row_id":2},"data":"Value for row 2","id":2,"updated_at":"2025-02-13T12:02:00Z"} +{"_meta":{"polled":"","index":3,"row_id":3},"data":"Value for row 3","id":3,"updated_at":"2025-02-13T12:03:00Z"} +{"_meta":{"polled":"","index":4,"row_id":4},"data":"Value for row 4","id":4,"updated_at":"2025-02-13T12:04:00Z"} +{"_meta":{"polled":"","index":0,"row_id":5},"data":"Value for row 5","id":5,"updated_at":"2025-02-13T12:10:00Z"} +{"_meta":{"polled":"","index":1,"row_id":6},"data":"Value for row 6","id":6,"updated_at":"2025-02-13T12:11:00Z"} +{"_meta":{"polled":"","index":2,"row_id":7},"data":"Value for row 7","id":7,"updated_at":"2025-02-13T12:12:00Z"} +{"_meta":{"polled":"","index":3,"row_id":8},"data":"Value for row 8","id":8,"updated_at":"2025-02-13T12:13:00Z"} +{"_meta":{"polled":"","index":4,"row_id":9},"data":"Value for row 9","id":9,"updated_at":"2025-02-13T12:14:00Z"} # ================================ # Final State Checkpoint # ================================ -{"bindingStateV1":{"capturewithupdatedatcursor_792371":{"CursorNames":["updated_at"],"CursorValues":["2025-02-13T12:14:00Z"],"LastPolled":""}}} +{"bindingStateV1":{"capturewithupdatedatcursor_792371":{"CursorNames":["updated_at"],"CursorValues":["2025-02-13T12:14:00Z"],"DocumentCount":10,"LastPolled":""}}} diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithUpdatedAtCursor-Discovery b/source-bigquery-batch/.snapshots/TestCaptureWithUpdatedAtCursor-Discovery index cb2c8da87a..84c269948e 100644 --- a/source-bigquery-batch/.snapshots/TestCaptureWithUpdatedAtCursor-Discovery +++ b/source-bigquery-batch/.snapshots/TestCaptureWithUpdatedAtCursor-Discovery @@ -21,7 +21,7 @@ Binding 0: ], "properties": { "_meta": { - "$schema": "http://json-schema.org/draft/2020-12/schema", + "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", "properties": { "polled": { @@ -34,12 +34,29 @@ Binding 0: "type": "integer", "title": "Result Index", "description": "The index of this document within the query execution which produced it." + }, + "row_id": { + "type": "integer", + "title": "Row ID", + "description": "Row ID of the Document" + }, + "op": { + "type": "string", + "enum": [ + "c", + "u", + "d" + ], + "title": "Change Operation", + "description": "Operation type (c: Create / u: Update / d: Delete)", + "default": "u" } }, "type": "object", "required": [ "polled", - "index" + "index", + "row_id" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestCompositeTypes-Capture b/source-bigquery-batch/.snapshots/TestCompositeTypes-Capture index 178d51f206..bb8d83c9ad 100644 --- a/source-bigquery-batch/.snapshots/TestCompositeTypes-Capture +++ b/source-bigquery-batch/.snapshots/TestCompositeTypes-Capture @@ -1,11 +1,11 @@ # ================================ # Collection "acmeCo/test/compositetypes_483529": 3 Documents # ================================ -{"_meta":{"polled":"","index":999},"array_struct":[[1,"first"],[2,"second"]],"id":1,"int_array":[1,2,3],"string_array":["a","b","c"],"struct_val":["Alice",25,[90.5,85,92.3]]} -{"_meta":{"polled":"","index":999},"array_struct":null,"id":2,"int_array":null,"string_array":null,"struct_val":["Bob",30,null]} -{"_meta":{"polled":"","index":999},"array_struct":null,"id":3,"int_array":null,"string_array":null,"struct_val":null} +{"_meta":{"polled":"","index":0,"row_id":0},"array_struct":[[1,"first"],[2,"second"]],"id":1,"int_array":[1,2,3],"string_array":["a","b","c"],"struct_val":["Alice",25,[90.5,85,92.3]]} +{"_meta":{"polled":"","index":1,"row_id":1},"array_struct":null,"id":2,"int_array":null,"string_array":null,"struct_val":["Bob",30,null]} +{"_meta":{"polled":"","index":2,"row_id":2},"array_struct":null,"id":3,"int_array":null,"string_array":null,"struct_val":null} # ================================ # Final State Checkpoint # ================================ -{"bindingStateV1":{"compositetypes_483529":{"CursorNames":["id"],"CursorValues":[3],"LastPolled":""}}} +{"bindingStateV1":{"compositetypes_483529":{"CursorNames":["id"],"CursorValues":[3],"LastPolled":"","DocumentCount":3}}} diff --git a/source-bigquery-batch/.snapshots/TestCompositeTypes-Discovery b/source-bigquery-batch/.snapshots/TestCompositeTypes-Discovery index c639196250..68c6a0cee6 100644 --- a/source-bigquery-batch/.snapshots/TestCompositeTypes-Discovery +++ b/source-bigquery-batch/.snapshots/TestCompositeTypes-Discovery @@ -21,7 +21,7 @@ Binding 0: ], "properties": { "_meta": { - "$schema": "http://json-schema.org/draft/2020-12/schema", + "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", "properties": { "polled": { @@ -34,12 +34,29 @@ Binding 0: "type": "integer", "title": "Result Index", "description": "The index of this document within the query execution which produced it." + }, + "row_id": { + "type": "integer", + "title": "Row ID", + "description": "Row ID of the Document" + }, + "op": { + "type": "string", + "enum": [ + "c", + "u", + "d" + ], + "title": "Change Operation", + "description": "Operation type (c: Create / u: Update / d: Delete)", + "default": "u" } }, "type": "object", "required": [ "polled", - "index" + "index", + "row_id" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestFullRefresh-Capture1 b/source-bigquery-batch/.snapshots/TestFullRefresh-Capture1 index 660f21e432..dd73e9a382 100644 --- a/source-bigquery-batch/.snapshots/TestFullRefresh-Capture1 +++ b/source-bigquery-batch/.snapshots/TestFullRefresh-Capture1 @@ -1,12 +1,12 @@ # ================================ # Collection "acmeCo/test/fullrefresh_902536": 4 Documents # ================================ -{"_meta":{"polled":"","index":999},"data":"Value for row 0","id":0} -{"_meta":{"polled":"","index":999},"data":"Value for row 1","id":1} -{"_meta":{"polled":"","index":999},"data":"Value for row 2","id":2} -{"_meta":{"polled":"","index":999},"data":"Value for row 3","id":3} +{"_meta":{"polled":"","index":0,"row_id":0},"data":"Value for row 3","id":3} +{"_meta":{"polled":"","index":1,"row_id":1},"data":"Value for row 1","id":1} +{"_meta":{"polled":"","index":2,"row_id":2},"data":"Value for row 2","id":2} +{"_meta":{"polled":"","index":3,"row_id":3},"data":"Value for row 0","id":0} # ================================ # Final State Checkpoint # ================================ -{"bindingStateV1":{"fullrefresh_902536":{"CursorNames":null,"CursorValues":null,"LastPolled":""}}} +{"bindingStateV1":{"fullrefresh_902536":{"CursorNames":null,"CursorValues":null,"LastPolled":"","DocumentCount":4}}} diff --git a/source-bigquery-batch/.snapshots/TestFullRefresh-Capture2 b/source-bigquery-batch/.snapshots/TestFullRefresh-Capture2 index c4faa2476b..565c65e8d1 100644 --- a/source-bigquery-batch/.snapshots/TestFullRefresh-Capture2 +++ b/source-bigquery-batch/.snapshots/TestFullRefresh-Capture2 @@ -1,16 +1,16 @@ # ================================ # Collection "acmeCo/test/fullrefresh_902536": 8 Documents # ================================ -{"_meta":{"polled":"","index":999},"data":"Value for row 0","id":0} -{"_meta":{"polled":"","index":999},"data":"Value for row 1","id":1} -{"_meta":{"polled":"","index":999},"data":"Value for row 2","id":2} -{"_meta":{"polled":"","index":999},"data":"Value for row 3","id":3} -{"_meta":{"polled":"","index":999},"data":"Value for row 4","id":4} -{"_meta":{"polled":"","index":999},"data":"Value for row 5","id":5} -{"_meta":{"polled":"","index":999},"data":"Value for row 6","id":6} -{"_meta":{"polled":"","index":999},"data":"Value for row 7","id":7} +{"_meta":{"polled":"","index":0,"row_id":0},"data":"Value for row 2","id":2} +{"_meta":{"polled":"","index":1,"row_id":1},"data":"Value for row 0","id":0} +{"_meta":{"polled":"","index":2,"row_id":2},"data":"Value for row 6","id":6} +{"_meta":{"polled":"","index":3,"row_id":3},"data":"Value for row 7","id":7} +{"_meta":{"polled":"","index":4,"row_id":4},"data":"Value for row 5","id":5} +{"_meta":{"polled":"","index":5,"row_id":5},"data":"Value for row 1","id":1} +{"_meta":{"polled":"","index":6,"row_id":6},"data":"Value for row 4","id":4} +{"_meta":{"polled":"","index":7,"row_id":7},"data":"Value for row 3","id":3} # ================================ # Final State Checkpoint # ================================ -{"bindingStateV1":{"fullrefresh_902536":{"LastPolled":""}}} +{"bindingStateV1":{"fullrefresh_902536":{"DocumentCount":8,"LastPolled":""}}} diff --git a/source-bigquery-batch/.snapshots/TestFullRefresh-Discovery b/source-bigquery-batch/.snapshots/TestFullRefresh-Discovery index 51b8e6eebe..aeeeea1193 100644 --- a/source-bigquery-batch/.snapshots/TestFullRefresh-Discovery +++ b/source-bigquery-batch/.snapshots/TestFullRefresh-Discovery @@ -31,12 +31,29 @@ Binding 0: "type": "integer", "title": "Result Index", "description": "The index of this document within the query execution which produced it." + }, + "row_id": { + "type": "integer", + "title": "Row ID", + "description": "Row ID of the Document" + }, + "op": { + "type": "string", + "enum": [ + "c", + "u", + "d" + ], + "title": "Change Operation", + "description": "Operation type (c: Create / u: Update / d: Delete)", + "default": "u" } }, "type": "object", "required": [ "polled", - "index" + "index", + "row_id" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestIntegerTypes-Capture b/source-bigquery-batch/.snapshots/TestIntegerTypes-Capture index e51173e80f..af28f60bcd 100644 --- a/source-bigquery-batch/.snapshots/TestIntegerTypes-Capture +++ b/source-bigquery-batch/.snapshots/TestIntegerTypes-Capture @@ -1,13 +1,13 @@ # ================================ # Collection "acmeCo/test/integertypes_795898": 5 Documents # ================================ -{"_meta":{"polled":"","index":999},"bigint_val":-9223372036854775808,"byteint_val":-9223372036854775808,"id":1,"int_val":-9223372036854775808,"integer_val":-9223372036854775808,"smallint_val":-9223372036854775808,"tinyint_val":-9223372036854775808} -{"_meta":{"polled":"","index":999},"bigint_val":9223372036854775807,"byteint_val":9223372036854775807,"id":2,"int_val":9223372036854775807,"integer_val":9223372036854775807,"smallint_val":9223372036854775807,"tinyint_val":9223372036854775807} -{"_meta":{"polled":"","index":999},"bigint_val":0,"byteint_val":0,"id":3,"int_val":0,"integer_val":0,"smallint_val":0,"tinyint_val":0} -{"_meta":{"polled":"","index":999},"bigint_val":null,"byteint_val":null,"id":4,"int_val":null,"integer_val":null,"smallint_val":null,"tinyint_val":null} -{"_meta":{"polled":"","index":999},"bigint_val":789,"byteint_val":34,"id":5,"int_val":42,"integer_val":-456,"smallint_val":123,"tinyint_val":-12} +{"_meta":{"polled":"","index":0,"row_id":0},"bigint_val":-9223372036854775808,"byteint_val":-9223372036854775808,"id":1,"int_val":-9223372036854775808,"integer_val":-9223372036854775808,"smallint_val":-9223372036854775808,"tinyint_val":-9223372036854775808} +{"_meta":{"polled":"","index":1,"row_id":1},"bigint_val":9223372036854775807,"byteint_val":9223372036854775807,"id":2,"int_val":9223372036854775807,"integer_val":9223372036854775807,"smallint_val":9223372036854775807,"tinyint_val":9223372036854775807} +{"_meta":{"polled":"","index":2,"row_id":2},"bigint_val":0,"byteint_val":0,"id":3,"int_val":0,"integer_val":0,"smallint_val":0,"tinyint_val":0} +{"_meta":{"polled":"","index":3,"row_id":3},"bigint_val":null,"byteint_val":null,"id":4,"int_val":null,"integer_val":null,"smallint_val":null,"tinyint_val":null} +{"_meta":{"polled":"","index":4,"row_id":4},"bigint_val":789,"byteint_val":34,"id":5,"int_val":42,"integer_val":-456,"smallint_val":123,"tinyint_val":-12} # ================================ # Final State Checkpoint # ================================ -{"bindingStateV1":{"integertypes_795898":{"CursorNames":["id"],"CursorValues":[5],"LastPolled":""}}} +{"bindingStateV1":{"integertypes_795898":{"CursorNames":["id"],"CursorValues":[5],"LastPolled":"","DocumentCount":5}}} diff --git a/source-bigquery-batch/.snapshots/TestIntegerTypes-Discovery b/source-bigquery-batch/.snapshots/TestIntegerTypes-Discovery index 2de58074d5..541d173acd 100644 --- a/source-bigquery-batch/.snapshots/TestIntegerTypes-Discovery +++ b/source-bigquery-batch/.snapshots/TestIntegerTypes-Discovery @@ -21,7 +21,7 @@ Binding 0: ], "properties": { "_meta": { - "$schema": "http://json-schema.org/draft/2020-12/schema", + "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", "properties": { "polled": { @@ -34,12 +34,29 @@ Binding 0: "type": "integer", "title": "Result Index", "description": "The index of this document within the query execution which produced it." + }, + "row_id": { + "type": "integer", + "title": "Row ID", + "description": "Row ID of the Document" + }, + "op": { + "type": "string", + "enum": [ + "c", + "u", + "d" + ], + "title": "Change Operation", + "description": "Operation type (c: Create / u: Update / d: Delete)", + "default": "u" } }, "type": "object", "required": [ "polled", - "index" + "index", + "row_id" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestJSONType-Capture b/source-bigquery-batch/.snapshots/TestJSONType-Capture index 24d2591be9..674fb134fa 100644 --- a/source-bigquery-batch/.snapshots/TestJSONType-Capture +++ b/source-bigquery-batch/.snapshots/TestJSONType-Capture @@ -1,16 +1,16 @@ # ================================ # Collection "acmeCo/test/jsontype_519262": 8 Documents # ================================ -{"_meta":{"polled":"","index":999},"id":1,"json_val":{"age":30,"name":"Alice"}} -{"_meta":{"polled":"","index":999},"id":2,"json_val":[1,2,3,"four",true,null]} -{"_meta":{"polled":"","index":999},"id":3,"json_val":{"orders":[{"id":1,"items":["apple","banana"]},{"id":2,"items":["orange"]}],"user":{"email":"bob@example.com","name":"Bob"}}} -{"_meta":{"polled":"","index":999},"id":4,"json_val":null} -{"_meta":{"polled":"","index":999},"id":5,"json_val":null} -{"_meta":{"polled":"","index":999},"id":6,"json_val":{}} -{"_meta":{"polled":"","index":999},"id":7,"json_val":[]} -{"_meta":{"polled":"","index":999},"id":8,"json_val":{"message":"Hello, 世界!\nNew line\"Quotes\"\\Backslash"}} +{"_meta":{"polled":"","index":0,"row_id":0},"id":1,"json_val":{"age":30,"name":"Alice"}} +{"_meta":{"polled":"","index":1,"row_id":1},"id":2,"json_val":[1,2,3,"four",true,null]} +{"_meta":{"polled":"","index":2,"row_id":2},"id":3,"json_val":{"orders":[{"id":1,"items":["apple","banana"]},{"id":2,"items":["orange"]}],"user":{"email":"bob@example.com","name":"Bob"}}} +{"_meta":{"polled":"","index":3,"row_id":3},"id":4,"json_val":null} +{"_meta":{"polled":"","index":4,"row_id":4},"id":5,"json_val":null} +{"_meta":{"polled":"","index":5,"row_id":5},"id":6,"json_val":{}} +{"_meta":{"polled":"","index":6,"row_id":6},"id":7,"json_val":[]} +{"_meta":{"polled":"","index":7,"row_id":7},"id":8,"json_val":{"message":"Hello, 世界!\nNew line\"Quotes\"\\Backslash"}} # ================================ # Final State Checkpoint # ================================ -{"bindingStateV1":{"jsontype_519262":{"CursorNames":["id"],"CursorValues":[8],"LastPolled":""}}} +{"bindingStateV1":{"jsontype_519262":{"CursorNames":["id"],"CursorValues":[8],"LastPolled":"","DocumentCount":8}}} diff --git a/source-bigquery-batch/.snapshots/TestJSONType-Discovery b/source-bigquery-batch/.snapshots/TestJSONType-Discovery index aa38f09f59..b58b294daa 100644 --- a/source-bigquery-batch/.snapshots/TestJSONType-Discovery +++ b/source-bigquery-batch/.snapshots/TestJSONType-Discovery @@ -21,7 +21,7 @@ Binding 0: ], "properties": { "_meta": { - "$schema": "http://json-schema.org/draft/2020-12/schema", + "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", "properties": { "polled": { @@ -34,12 +34,29 @@ Binding 0: "type": "integer", "title": "Result Index", "description": "The index of this document within the query execution which produced it." + }, + "row_id": { + "type": "integer", + "title": "Row ID", + "description": "Row ID of the Document" + }, + "op": { + "type": "string", + "enum": [ + "c", + "u", + "d" + ], + "title": "Change Operation", + "description": "Operation type (c: Create / u: Update / d: Delete)", + "default": "u" } }, "type": "object", "required": [ "polled", - "index" + "index", + "row_id" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestNumericTypes-Capture b/source-bigquery-batch/.snapshots/TestNumericTypes-Capture index 6ae3aeb965..d1b7d5c3fd 100644 --- a/source-bigquery-batch/.snapshots/TestNumericTypes-Capture +++ b/source-bigquery-batch/.snapshots/TestNumericTypes-Capture @@ -1,13 +1,13 @@ # ================================ # Collection "acmeCo/test/numerictypes_559424": 5 Documents # ================================ -{"_meta":{"polled":"","index":999},"bigdecimal_val":"99999999999999999999999999999999999999/1000000000","bignumeric_val":"99999999999999999999999999999999999999/1000000000","decimal_val":"99999999999999999999999999999999999999/1000000000","float64_val":1.7976931348623157e+308,"id":1,"numeric_val":"99999999999999999999999999999999999999/1000000000"} -{"_meta":{"polled":"","index":999},"bigdecimal_val":"-99999999999999999999999999999999999999/1000000000","bignumeric_val":"-99999999999999999999999999999999999999/1000000000","decimal_val":"-99999999999999999999999999999999999999/1000000000","float64_val":-1.7976931348623157e+308,"id":2,"numeric_val":"-99999999999999999999999999999999999999/1000000000"} -{"_meta":{"polled":"","index":999},"bigdecimal_val":"0","bignumeric_val":"0","decimal_val":"0","float64_val":0,"id":3,"numeric_val":"0"} -{"_meta":{"polled":"","index":999},"bigdecimal_val":"5632716/625","bignumeric_val":"6172839/5000","decimal_val":"197253/250","float64_val":3.14159,"id":4,"numeric_val":"15432/125"} -{"_meta":{"polled":"","index":999},"bigdecimal_val":"10000","bignumeric_val":"1000","decimal_val":"100","float64_val":12345,"id":5,"numeric_val":"42"} +{"_meta":{"polled":"","index":0,"row_id":0},"bigdecimal_val":"99999999999999999999999999999999999999/1000000000","bignumeric_val":"99999999999999999999999999999999999999/1000000000","decimal_val":"99999999999999999999999999999999999999/1000000000","float64_val":1.7976931348623157e+308,"id":1,"numeric_val":"99999999999999999999999999999999999999/1000000000"} +{"_meta":{"polled":"","index":1,"row_id":1},"bigdecimal_val":"-99999999999999999999999999999999999999/1000000000","bignumeric_val":"-99999999999999999999999999999999999999/1000000000","decimal_val":"-99999999999999999999999999999999999999/1000000000","float64_val":-1.7976931348623157e+308,"id":2,"numeric_val":"-99999999999999999999999999999999999999/1000000000"} +{"_meta":{"polled":"","index":2,"row_id":2},"bigdecimal_val":"0","bignumeric_val":"0","decimal_val":"0","float64_val":0,"id":3,"numeric_val":"0"} +{"_meta":{"polled":"","index":3,"row_id":3},"bigdecimal_val":"5632716/625","bignumeric_val":"6172839/5000","decimal_val":"197253/250","float64_val":3.14159,"id":4,"numeric_val":"15432/125"} +{"_meta":{"polled":"","index":4,"row_id":4},"bigdecimal_val":"10000","bignumeric_val":"1000","decimal_val":"100","float64_val":12345,"id":5,"numeric_val":"42"} # ================================ # Final State Checkpoint # ================================ -{"bindingStateV1":{"numerictypes_559424":{"CursorNames":["id"],"CursorValues":[5],"LastPolled":""}}} +{"bindingStateV1":{"numerictypes_559424":{"CursorNames":["id"],"CursorValues":[5],"LastPolled":"","DocumentCount":5}}} diff --git a/source-bigquery-batch/.snapshots/TestNumericTypes-Discovery b/source-bigquery-batch/.snapshots/TestNumericTypes-Discovery index dcaa9b2d8b..29a335b33b 100644 --- a/source-bigquery-batch/.snapshots/TestNumericTypes-Discovery +++ b/source-bigquery-batch/.snapshots/TestNumericTypes-Discovery @@ -21,7 +21,7 @@ Binding 0: ], "properties": { "_meta": { - "$schema": "http://json-schema.org/draft/2020-12/schema", + "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", "properties": { "polled": { @@ -34,12 +34,29 @@ Binding 0: "type": "integer", "title": "Result Index", "description": "The index of this document within the query execution which produced it." + }, + "row_id": { + "type": "integer", + "title": "Row ID", + "description": "Row ID of the Document" + }, + "op": { + "type": "string", + "enum": [ + "c", + "u", + "d" + ], + "title": "Change Operation", + "description": "Operation type (c: Create / u: Update / d: Delete)", + "default": "u" } }, "type": "object", "required": [ "polled", - "index" + "index", + "row_id" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestQueryTemplateOverride-Capture b/source-bigquery-batch/.snapshots/TestQueryTemplateOverride-Capture index 73972489f4..e62af538c0 100644 --- a/source-bigquery-batch/.snapshots/TestQueryTemplateOverride-Capture +++ b/source-bigquery-batch/.snapshots/TestQueryTemplateOverride-Capture @@ -1,18 +1,18 @@ # ================================ # Collection "acmeCo/test/querytemplateoverride_638679": 10 Documents # ================================ -{"_meta":{"polled":"","index":999},"data":"Value for row 0","id":0,"updated_at":"2025-02-13T12:00:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 1","id":1,"updated_at":"2025-02-13T12:01:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 2","id":2,"updated_at":"2025-02-13T12:02:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 3","id":3,"updated_at":"2025-02-13T12:03:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 4","id":4,"updated_at":"2025-02-13T12:04:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 5","id":5,"updated_at":"2025-02-13T12:10:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 6","id":6,"updated_at":"2025-02-13T12:11:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 7","id":7,"updated_at":"2025-02-13T12:12:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 8","id":8,"updated_at":"2025-02-13T12:13:00Z"} -{"_meta":{"polled":"","index":999},"data":"Value for row 9","id":9,"updated_at":"2025-02-13T12:14:00Z"} +{"_meta":{"polled":"","index":0,"row_id":0},"data":"Value for row 0","id":0,"updated_at":"2025-02-13T12:00:00Z"} +{"_meta":{"polled":"","index":1,"row_id":1},"data":"Value for row 1","id":1,"updated_at":"2025-02-13T12:01:00Z"} +{"_meta":{"polled":"","index":2,"row_id":2},"data":"Value for row 2","id":2,"updated_at":"2025-02-13T12:02:00Z"} +{"_meta":{"polled":"","index":3,"row_id":3},"data":"Value for row 3","id":3,"updated_at":"2025-02-13T12:03:00Z"} +{"_meta":{"polled":"","index":4,"row_id":4},"data":"Value for row 4","id":4,"updated_at":"2025-02-13T12:04:00Z"} +{"_meta":{"polled":"","index":0,"row_id":5},"data":"Value for row 5","id":5,"updated_at":"2025-02-13T12:10:00Z"} +{"_meta":{"polled":"","index":1,"row_id":6},"data":"Value for row 6","id":6,"updated_at":"2025-02-13T12:11:00Z"} +{"_meta":{"polled":"","index":2,"row_id":7},"data":"Value for row 7","id":7,"updated_at":"2025-02-13T12:12:00Z"} +{"_meta":{"polled":"","index":3,"row_id":8},"data":"Value for row 8","id":8,"updated_at":"2025-02-13T12:13:00Z"} +{"_meta":{"polled":"","index":4,"row_id":9},"data":"Value for row 9","id":9,"updated_at":"2025-02-13T12:14:00Z"} # ================================ # Final State Checkpoint # ================================ -{"bindingStateV1":{"querytemplateoverride_638679":{"CursorNames":["updated_at"],"CursorValues":["2025-02-13T12:14:00Z"],"LastPolled":""}}} +{"bindingStateV1":{"querytemplateoverride_638679":{"CursorNames":["updated_at"],"CursorValues":["2025-02-13T12:14:00Z"],"DocumentCount":10,"LastPolled":""}}} diff --git a/source-bigquery-batch/.snapshots/TestQueryTemplateOverride-Discovery b/source-bigquery-batch/.snapshots/TestQueryTemplateOverride-Discovery index 77e37c3f86..c64d3623ee 100644 --- a/source-bigquery-batch/.snapshots/TestQueryTemplateOverride-Discovery +++ b/source-bigquery-batch/.snapshots/TestQueryTemplateOverride-Discovery @@ -20,7 +20,7 @@ Binding 0: ], "properties": { "_meta": { - "$schema": "http://json-schema.org/draft/2020-12/schema", + "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", "properties": { "polled": { @@ -33,12 +33,29 @@ Binding 0: "type": "integer", "title": "Result Index", "description": "The index of this document within the query execution which produced it." + }, + "row_id": { + "type": "integer", + "title": "Row ID", + "description": "Row ID of the Document" + }, + "op": { + "type": "string", + "enum": [ + "c", + "u", + "d" + ], + "title": "Change Operation", + "description": "Operation type (c: Create / u: Update / d: Delete)", + "default": "u" } }, "type": "object", "required": [ "polled", - "index" + "index", + "row_id" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestSimpleCapture-Capture b/source-bigquery-batch/.snapshots/TestSimpleCapture-Capture index 9999ea6c69..5fb1dc2057 100644 --- a/source-bigquery-batch/.snapshots/TestSimpleCapture-Capture +++ b/source-bigquery-batch/.snapshots/TestSimpleCapture-Capture @@ -1,16 +1,16 @@ # ================================ # Collection "acmeCo/test/simplecapture_140272": 8 Documents # ================================ -{"_meta":{"polled":"","index":999},"data":"Value for row 1","id":1} -{"_meta":{"polled":"","index":999},"data":"Value for row 2","id":2} -{"_meta":{"polled":"","index":999},"data":"Value for row 3","id":3} -{"_meta":{"polled":"","index":999},"data":"Value for row 4","id":4} -{"_meta":{"polled":"","index":999},"data":"Value for row 5","id":5} -{"_meta":{"polled":"","index":999},"data":"Value for row 6","id":6} -{"_meta":{"polled":"","index":999},"data":"Value for row 7","id":7} -{"_meta":{"polled":"","index":999},"data":"Value for row 8","id":8} +{"_meta":{"polled":"","index":0,"row_id":0},"data":"Value for row 1","id":1} +{"_meta":{"polled":"","index":1,"row_id":1},"data":"Value for row 2","id":2} +{"_meta":{"polled":"","index":2,"row_id":2},"data":"Value for row 3","id":3} +{"_meta":{"polled":"","index":3,"row_id":3},"data":"Value for row 4","id":4} +{"_meta":{"polled":"","index":0,"row_id":4},"data":"Value for row 5","id":5} +{"_meta":{"polled":"","index":1,"row_id":5},"data":"Value for row 6","id":6} +{"_meta":{"polled":"","index":2,"row_id":6},"data":"Value for row 7","id":7} +{"_meta":{"polled":"","index":3,"row_id":7},"data":"Value for row 8","id":8} # ================================ # Final State Checkpoint # ================================ -{"bindingStateV1":{"simplecapture_140272":{"CursorNames":["id"],"CursorValues":[8],"LastPolled":""}}} +{"bindingStateV1":{"simplecapture_140272":{"CursorNames":["id"],"CursorValues":[8],"DocumentCount":8,"LastPolled":""}}} diff --git a/source-bigquery-batch/.snapshots/TestSimpleCapture-Discovery b/source-bigquery-batch/.snapshots/TestSimpleCapture-Discovery index 52a9c52966..031fda8d38 100644 --- a/source-bigquery-batch/.snapshots/TestSimpleCapture-Discovery +++ b/source-bigquery-batch/.snapshots/TestSimpleCapture-Discovery @@ -34,12 +34,29 @@ Binding 0: "type": "integer", "title": "Result Index", "description": "The index of this document within the query execution which produced it." + }, + "row_id": { + "type": "integer", + "title": "Row ID", + "description": "Row ID of the Document" + }, + "op": { + "type": "string", + "enum": [ + "c", + "u", + "d" + ], + "title": "Change Operation", + "description": "Operation type (c: Create / u: Update / d: Delete)", + "default": "u" } }, "type": "object", "required": [ "polled", - "index" + "index", + "row_id" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestSpec b/source-bigquery-batch/.snapshots/TestSpec index f7dbdb8616..26e6e9c272 100644 --- a/source-bigquery-batch/.snapshots/TestSpec +++ b/source-bigquery-batch/.snapshots/TestSpec @@ -30,6 +30,11 @@ "title": "Default Polling Schedule", "description": "When and how often to execute fetch queries. Accepts a Go duration string like '5m' or '6h' for frequency-based polling or a string like 'daily at 12:34Z' to poll at a specific time (specified in UTC) every day. Defaults to '24h' if unset.", "pattern": "^([-+]?([0-9]+([.][0-9]+)?(h|m|s|ms))+|daily at [0-9][0-9]?:[0-9]{2}Z)$" + }, + "feature_flags": { + "type": "string", + "title": "Feature Flags", + "description": "This property is intended for Estuary internal use. You should only modify this field as directed by Estuary support." } }, "additionalProperties": false, diff --git a/source-bigquery-batch/.snapshots/TestStringTypes-Capture b/source-bigquery-batch/.snapshots/TestStringTypes-Capture index 4d7d0aceda..d72c6b461b 100644 --- a/source-bigquery-batch/.snapshots/TestStringTypes-Capture +++ b/source-bigquery-batch/.snapshots/TestStringTypes-Capture @@ -1,14 +1,14 @@ # ================================ # Collection "acmeCo/test/stringtypes_339419": 6 Documents # ================================ -{"_meta":{"polled":"","index":999},"id":1,"string_len_val":"","string_val":""} -{"_meta":{"polled":"","index":999},"id":2,"string_len_val":null,"string_val":null} -{"_meta":{"polled":"","index":999},"id":3,"string_len_val":"Unicode: ñ, é, ü","string_val":"Hello, 世界!"} -{"_meta":{"polled":"","index":999},"id":4,"string_len_val":"Path: C:\\Program Files\\","string_val":"He said \"Hello\""} -{"_meta":{"polled":"","index":999},"id":5,"string_len_val":"Col1\tCol2\tCol3","string_val":"Line 1\nLine 2"} -{"_meta":{"polled":"","index":999},"id":6,"string_len_val":"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb","string_val":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"} +{"_meta":{"polled":"","index":0,"row_id":0},"id":1,"string_len_val":"","string_val":""} +{"_meta":{"polled":"","index":1,"row_id":1},"id":2,"string_len_val":null,"string_val":null} +{"_meta":{"polled":"","index":2,"row_id":2},"id":3,"string_len_val":"Unicode: ñ, é, ü","string_val":"Hello, 世界!"} +{"_meta":{"polled":"","index":3,"row_id":3},"id":4,"string_len_val":"Path: C:\\Program Files\\","string_val":"He said \"Hello\""} +{"_meta":{"polled":"","index":4,"row_id":4},"id":5,"string_len_val":"Col1\tCol2\tCol3","string_val":"Line 1\nLine 2"} +{"_meta":{"polled":"","index":5,"row_id":5},"id":6,"string_len_val":"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb","string_val":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"} # ================================ # Final State Checkpoint # ================================ -{"bindingStateV1":{"stringtypes_339419":{"CursorNames":["id"],"CursorValues":[6],"LastPolled":""}}} +{"bindingStateV1":{"stringtypes_339419":{"CursorNames":["id"],"CursorValues":[6],"LastPolled":"","DocumentCount":6}}} diff --git a/source-bigquery-batch/.snapshots/TestStringTypes-Discovery b/source-bigquery-batch/.snapshots/TestStringTypes-Discovery index c3e109de52..feebe01d7a 100644 --- a/source-bigquery-batch/.snapshots/TestStringTypes-Discovery +++ b/source-bigquery-batch/.snapshots/TestStringTypes-Discovery @@ -21,7 +21,7 @@ Binding 0: ], "properties": { "_meta": { - "$schema": "http://json-schema.org/draft/2020-12/schema", + "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", "properties": { "polled": { @@ -34,12 +34,29 @@ Binding 0: "type": "integer", "title": "Result Index", "description": "The index of this document within the query execution which produced it." + }, + "row_id": { + "type": "integer", + "title": "Row ID", + "description": "Row ID of the Document" + }, + "op": { + "type": "string", + "enum": [ + "c", + "u", + "d" + ], + "title": "Change Operation", + "description": "Operation type (c: Create / u: Update / d: Delete)", + "default": "u" } }, "type": "object", "required": [ "polled", - "index" + "index", + "row_id" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestTemporalTypes-Capture b/source-bigquery-batch/.snapshots/TestTemporalTypes-Capture index 43d2a09cd3..ff76f41092 100644 --- a/source-bigquery-batch/.snapshots/TestTemporalTypes-Capture +++ b/source-bigquery-batch/.snapshots/TestTemporalTypes-Capture @@ -1,13 +1,13 @@ # ================================ # Collection "acmeCo/test/temporaltypes_137023": 5 Documents # ================================ -{"_meta":{"polled":"","index":999},"date_val":"2025-02-18","datetime_val":"2025-02-18T15:04:05.999999","id":1,"time_val":"15:04:05.999999000","timestamp_val":"2025-02-18T15:04:05.999999Z"} -{"_meta":{"polled":"","index":999},"date_val":"0001-01-01","datetime_val":"0001-01-01T00:00:00.000000","id":2,"time_val":"00:00:00","timestamp_val":"0001-01-01T00:00:00Z"} -{"_meta":{"polled":"","index":999},"date_val":null,"datetime_val":null,"id":3,"time_val":null,"timestamp_val":null} -{"_meta":{"polled":"","index":999},"date_val":"9999-12-31","datetime_val":"9999-12-31T23:59:59.999999","id":4,"time_val":"23:59:59.999999000","timestamp_val":"9999-12-31T23:59:59.999999Z"} -{"_meta":{"polled":"","index":999},"date_val":"2025-02-18","datetime_val":"2025-02-18T15:04:05.000000","id":5,"time_val":"15:04:05","timestamp_val":"2025-02-18T23:04:05Z"} +{"_meta":{"polled":"","index":0,"row_id":0},"date_val":"2025-02-18","datetime_val":"2025-02-18T15:04:05.999999","id":1,"time_val":"15:04:05.999999000","timestamp_val":"2025-02-18T15:04:05.999999Z"} +{"_meta":{"polled":"","index":1,"row_id":1},"date_val":"0001-01-01","datetime_val":"0001-01-01T00:00:00.000000","id":2,"time_val":"00:00:00","timestamp_val":"0001-01-01T00:00:00Z"} +{"_meta":{"polled":"","index":2,"row_id":2},"date_val":null,"datetime_val":null,"id":3,"time_val":null,"timestamp_val":null} +{"_meta":{"polled":"","index":3,"row_id":3},"date_val":"9999-12-31","datetime_val":"9999-12-31T23:59:59.999999","id":4,"time_val":"23:59:59.999999000","timestamp_val":"9999-12-31T23:59:59.999999Z"} +{"_meta":{"polled":"","index":4,"row_id":4},"date_val":"2025-02-18","datetime_val":"2025-02-18T15:04:05.000000","id":5,"time_val":"15:04:05","timestamp_val":"2025-02-18T23:04:05Z"} # ================================ # Final State Checkpoint # ================================ -{"bindingStateV1":{"temporaltypes_137023":{"CursorNames":["id"],"CursorValues":[5],"LastPolled":""}}} +{"bindingStateV1":{"temporaltypes_137023":{"CursorNames":["id"],"CursorValues":[5],"LastPolled":"","DocumentCount":5}}} diff --git a/source-bigquery-batch/.snapshots/TestTemporalTypes-Discovery b/source-bigquery-batch/.snapshots/TestTemporalTypes-Discovery index 7e67e0b464..64a562d567 100644 --- a/source-bigquery-batch/.snapshots/TestTemporalTypes-Discovery +++ b/source-bigquery-batch/.snapshots/TestTemporalTypes-Discovery @@ -21,7 +21,7 @@ Binding 0: ], "properties": { "_meta": { - "$schema": "http://json-schema.org/draft/2020-12/schema", + "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", "properties": { "polled": { @@ -34,12 +34,29 @@ Binding 0: "type": "integer", "title": "Result Index", "description": "The index of this document within the query execution which produced it." + }, + "row_id": { + "type": "integer", + "title": "Row ID", + "description": "Row ID of the Document" + }, + "op": { + "type": "string", + "enum": [ + "c", + "u", + "d" + ], + "title": "Change Operation", + "description": "Operation type (c: Create / u: Update / d: Delete)", + "default": "u" } }, "type": "object", "required": [ "polled", - "index" + "index", + "row_id" ] }, "id": { From 1f1e7be7d18ccf7e38a25b3710305ee3b27263e3 Mon Sep 17 00:00:00 2001 From: Will Donnelly Date: Tue, 25 Feb 2025 18:49:34 -0600 Subject: [PATCH 3/5] source-bigquery-batch: New keyless capture tests --- ...stFeatureFlagKeylessRowID-Disabled-Capture | 42 ++++ ...FeatureFlagKeylessRowID-Disabled-Discovery | 69 ++++++ ...estFeatureFlagKeylessRowID-Enabled-Capture | 43 ++++ ...tFeatureFlagKeylessRowID-Enabled-Discovery | 68 ++++++ .../.snapshots/TestKeylessCapture-Capture | 25 +++ .../.snapshots/TestKeylessCapture-Discovery | 72 +++++++ .../TestKeylessFullRefreshCapture-Capture | 43 ++++ .../TestKeylessFullRefreshCapture-Discovery | 69 ++++++ source-bigquery-batch/main_test.go | 201 ++++++++++++++++++ 9 files changed, 632 insertions(+) create mode 100644 source-bigquery-batch/.snapshots/TestFeatureFlagKeylessRowID-Disabled-Capture create mode 100644 source-bigquery-batch/.snapshots/TestFeatureFlagKeylessRowID-Disabled-Discovery create mode 100644 source-bigquery-batch/.snapshots/TestFeatureFlagKeylessRowID-Enabled-Capture create mode 100644 source-bigquery-batch/.snapshots/TestFeatureFlagKeylessRowID-Enabled-Discovery create mode 100644 source-bigquery-batch/.snapshots/TestKeylessCapture-Capture create mode 100644 source-bigquery-batch/.snapshots/TestKeylessCapture-Discovery create mode 100644 source-bigquery-batch/.snapshots/TestKeylessFullRefreshCapture-Capture create mode 100644 source-bigquery-batch/.snapshots/TestKeylessFullRefreshCapture-Discovery diff --git a/source-bigquery-batch/.snapshots/TestFeatureFlagKeylessRowID-Disabled-Capture b/source-bigquery-batch/.snapshots/TestFeatureFlagKeylessRowID-Disabled-Capture new file mode 100644 index 0000000000..417da3f8e4 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestFeatureFlagKeylessRowID-Disabled-Capture @@ -0,0 +1,42 @@ +# ================================ +# Collection "acmeCo/test/featureflagkeylessrowid_disabled_906545": 34 Documents +# ================================ +{"_meta":{"polled":"","index":0,"row_id":0},"data":"Initial row 3","value":3} +{"_meta":{"polled":"","index":1,"row_id":1},"data":"Initial row 0","value":0} +{"_meta":{"polled":"","index":2,"row_id":2},"data":"Initial row 4","value":4} +{"_meta":{"polled":"","index":3,"row_id":3},"data":"Initial row 2","value":2} +{"_meta":{"polled":"","index":4,"row_id":4},"data":"Initial row 1","value":1} +{"_meta":{"polled":"","index":0,"row_id":0},"data":"Initial row 0","value":0} +{"_meta":{"polled":"","index":1,"row_id":1},"data":"Initial row 4","value":4} +{"_meta":{"polled":"","index":2,"row_id":2},"data":"Initial row 3","value":3} +{"_meta":{"polled":"","index":3,"row_id":3},"data":"Additional row 5","value":5} +{"_meta":{"polled":"","index":4,"row_id":4},"data":"Additional row 7","value":7} +{"_meta":{"polled":"","index":5,"row_id":5},"data":"Initial row 2","value":2} +{"_meta":{"polled":"","index":6,"row_id":6},"data":"Additional row 9","value":9} +{"_meta":{"polled":"","index":7,"row_id":7},"data":"Additional row 6","value":6} +{"_meta":{"polled":"","index":8,"row_id":8},"data":"Initial row 1","value":1} +{"_meta":{"polled":"","index":9,"row_id":9},"data":"Additional row 8","value":8} +{"_meta":{"polled":"","index":0,"row_id":0},"data":"Initial row 0","value":0} +{"_meta":{"polled":"","index":1,"row_id":1},"data":"Additional row 5 (updated)","value":5} +{"_meta":{"polled":"","index":2,"row_id":2},"data":"Initial row 3 (updated)","value":3} +{"_meta":{"polled":"","index":3,"row_id":3},"data":"Additional row 9","value":9} +{"_meta":{"polled":"","index":4,"row_id":4},"data":"Initial row 2","value":2} +{"_meta":{"polled":"","index":5,"row_id":5},"data":"Additional row 7 (updated)","value":7} +{"_meta":{"polled":"","index":6,"row_id":6},"data":"Additional row 6 (updated)","value":6} +{"_meta":{"polled":"","index":7,"row_id":7},"data":"Additional row 8","value":8} +{"_meta":{"polled":"","index":8,"row_id":8},"data":"Initial row 4 (updated)","value":4} +{"_meta":{"polled":"","index":9,"row_id":9},"data":"Initial row 1","value":1} +{"_meta":{"polled":"","index":0,"row_id":0},"data":"Initial row 0","value":0} +{"_meta":{"polled":"","index":1,"row_id":1},"data":"Initial row 1","value":1} +{"_meta":{"polled":"","index":2,"row_id":2},"data":"Additional row 5 (updated)","value":5} +{"_meta":{"polled":"","index":3,"row_id":3},"data":"Initial row 3 (updated)","value":3} +{"_meta":{"polled":"","index":4,"row_id":4},"data":"Additional row 9","value":9} +{"_meta":{"polled":"","index":5,"row_id":5},"data":"Initial row 2","value":2} +{"_meta":{"polled":"","index":6,"row_id":6},"data":"Additional row 7 (updated)","value":7} +{"_meta":{"polled":"","index":7,"row_id":7},"data":"New row A","value":20} +{"_meta":{"polled":"","index":8,"row_id":8},"data":"Additional row 8","value":8} +# ================================ +# Final State Checkpoint +# ================================ +{"bindingStateV1":{"featureflagkeylessrowid_disabled_906545":{"DocumentCount":9,"LastPolled":""}}} + diff --git a/source-bigquery-batch/.snapshots/TestFeatureFlagKeylessRowID-Disabled-Discovery b/source-bigquery-batch/.snapshots/TestFeatureFlagKeylessRowID-Disabled-Discovery new file mode 100644 index 0000000000..6ccbcbecc0 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestFeatureFlagKeylessRowID-Disabled-Discovery @@ -0,0 +1,69 @@ +Binding 0: +{ + "resource_config_json": { + "name": "featureflagkeylessrowid_disabled_906545", + "schema": "testdata", + "table": "featureflagkeylessrowid_disabled_906545" + }, + "resource_path": [ + "featureflagkeylessrowid_disabled_906545" + ], + "collection": { + "name": "acmeCo/test/featureflagkeylessrowid_disabled_906545", + "read_schema_json": { + "type": "object", + "required": [ + "_meta" + ], + "properties": { + "_meta": { + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", + "properties": { + "polled": { + "type": "string", + "format": "date-time", + "title": "Polled Timestamp", + "description": "The time at which the update query which produced this document as executed." + }, + "index": { + "type": "integer", + "title": "Result Index", + "description": "The index of this document within the query execution which produced it." + }, + "row_id": { + "type": "integer", + "title": "Row ID", + "description": "Row ID of the Document" + }, + "op": { + "type": "string", + "enum": [ + "c", + "u", + "d" + ], + "title": "Change Operation", + "description": "Operation type (c: Create / u: Update / d: Delete)", + "default": "u" + } + }, + "type": "object", + "required": [ + "polled", + "index", + "row_id" + ] + } + }, + "x-infer-schema": true + }, + "key": [ + "/_meta/polled", + "/_meta/index" + ], + "projections": null + }, + "state_key": "featureflagkeylessrowid_disabled_906545" + } + diff --git a/source-bigquery-batch/.snapshots/TestFeatureFlagKeylessRowID-Enabled-Capture b/source-bigquery-batch/.snapshots/TestFeatureFlagKeylessRowID-Enabled-Capture new file mode 100644 index 0000000000..1a46e6b9e5 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestFeatureFlagKeylessRowID-Enabled-Capture @@ -0,0 +1,43 @@ +# ================================ +# Collection "acmeCo/test/featureflagkeylessrowid_enabled_905157": 35 Documents +# ================================ +{"_meta":{"polled":"","index":0,"row_id":0,"op":"c"},"data":"Initial row 4","value":4} +{"_meta":{"polled":"","index":1,"row_id":1,"op":"c"},"data":"Initial row 3","value":3} +{"_meta":{"polled":"","index":2,"row_id":2,"op":"c"},"data":"Initial row 2","value":2} +{"_meta":{"polled":"","index":3,"row_id":3,"op":"c"},"data":"Initial row 1","value":1} +{"_meta":{"polled":"","index":4,"row_id":4,"op":"c"},"data":"Initial row 0","value":0} +{"_meta":{"polled":"","index":0,"row_id":0,"op":"u"},"data":"Additional row 5","value":5} +{"_meta":{"polled":"","index":1,"row_id":1,"op":"u"},"data":"Additional row 6","value":6} +{"_meta":{"polled":"","index":2,"row_id":2,"op":"u"},"data":"Additional row 8","value":8} +{"_meta":{"polled":"","index":3,"row_id":3,"op":"u"},"data":"Additional row 9","value":9} +{"_meta":{"polled":"","index":4,"row_id":4,"op":"u"},"data":"Initial row 1","value":1} +{"_meta":{"polled":"","index":5,"row_id":5,"op":"c"},"data":"Additional row 7","value":7} +{"_meta":{"polled":"","index":6,"row_id":6,"op":"c"},"data":"Initial row 3","value":3} +{"_meta":{"polled":"","index":7,"row_id":7,"op":"c"},"data":"Initial row 0","value":0} +{"_meta":{"polled":"","index":8,"row_id":8,"op":"c"},"data":"Initial row 2","value":2} +{"_meta":{"polled":"","index":9,"row_id":9,"op":"c"},"data":"Initial row 4","value":4} +{"_meta":{"polled":"","index":0,"row_id":0,"op":"u"},"data":"Additional row 7 (updated)","value":7} +{"_meta":{"polled":"","index":1,"row_id":1,"op":"u"},"data":"Initial row 3 (updated)","value":3} +{"_meta":{"polled":"","index":2,"row_id":2,"op":"u"},"data":"Additional row 5 (updated)","value":5} +{"_meta":{"polled":"","index":3,"row_id":3,"op":"u"},"data":"Initial row 4 (updated)","value":4} +{"_meta":{"polled":"","index":4,"row_id":4,"op":"u"},"data":"Initial row 2","value":2} +{"_meta":{"polled":"","index":5,"row_id":5,"op":"u"},"data":"Additional row 8","value":8} +{"_meta":{"polled":"","index":6,"row_id":6,"op":"u"},"data":"Initial row 0","value":0} +{"_meta":{"polled":"","index":7,"row_id":7,"op":"u"},"data":"Additional row 6 (updated)","value":6} +{"_meta":{"polled":"","index":8,"row_id":8,"op":"u"},"data":"Additional row 9","value":9} +{"_meta":{"polled":"","index":9,"row_id":9,"op":"u"},"data":"Initial row 1","value":1} +{"_meta":{"polled":"","index":0,"row_id":0,"op":"u"},"data":"Additional row 7 (updated)","value":7} +{"_meta":{"polled":"","index":1,"row_id":1,"op":"u"},"data":"Additional row 5 (updated)","value":5} +{"_meta":{"polled":"","index":2,"row_id":2,"op":"u"},"data":"Initial row 3 (updated)","value":3} +{"_meta":{"polled":"","index":3,"row_id":3,"op":"u"},"data":"Additional row 8","value":8} +{"_meta":{"polled":"","index":4,"row_id":4,"op":"u"},"data":"Initial row 2","value":2} +{"_meta":{"polled":"","index":5,"row_id":5,"op":"u"},"data":"New row A","value":20} +{"_meta":{"polled":"","index":6,"row_id":6,"op":"u"},"data":"Initial row 1","value":1} +{"_meta":{"polled":"","index":7,"row_id":7,"op":"u"},"data":"Initial row 0","value":0} +{"_meta":{"polled":"","index":8,"row_id":8,"op":"u"},"data":"Additional row 9","value":9} +{"_meta":{"polled":"","index":9,"row_id":9,"op":"d"}} +# ================================ +# Final State Checkpoint +# ================================ +{"bindingStateV1":{"featureflagkeylessrowid_enabled_905157":{"DocumentCount":9,"LastPolled":""}}} + diff --git a/source-bigquery-batch/.snapshots/TestFeatureFlagKeylessRowID-Enabled-Discovery b/source-bigquery-batch/.snapshots/TestFeatureFlagKeylessRowID-Enabled-Discovery new file mode 100644 index 0000000000..5247c40e1e --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestFeatureFlagKeylessRowID-Enabled-Discovery @@ -0,0 +1,68 @@ +Binding 0: +{ + "resource_config_json": { + "name": "featureflagkeylessrowid_enabled_905157", + "schema": "testdata", + "table": "featureflagkeylessrowid_enabled_905157" + }, + "resource_path": [ + "featureflagkeylessrowid_enabled_905157" + ], + "collection": { + "name": "acmeCo/test/featureflagkeylessrowid_enabled_905157", + "read_schema_json": { + "type": "object", + "required": [ + "_meta" + ], + "properties": { + "_meta": { + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", + "properties": { + "polled": { + "type": "string", + "format": "date-time", + "title": "Polled Timestamp", + "description": "The time at which the update query which produced this document as executed." + }, + "index": { + "type": "integer", + "title": "Result Index", + "description": "The index of this document within the query execution which produced it." + }, + "row_id": { + "type": "integer", + "title": "Row ID", + "description": "Row ID of the Document" + }, + "op": { + "type": "string", + "enum": [ + "c", + "u", + "d" + ], + "title": "Change Operation", + "description": "Operation type (c: Create / u: Update / d: Delete)", + "default": "u" + } + }, + "type": "object", + "required": [ + "polled", + "index", + "row_id" + ] + } + }, + "x-infer-schema": true + }, + "key": [ + "/_meta/row_id" + ], + "projections": null + }, + "state_key": "featureflagkeylessrowid_enabled_905157" + } + diff --git a/source-bigquery-batch/.snapshots/TestKeylessCapture-Capture b/source-bigquery-batch/.snapshots/TestKeylessCapture-Capture new file mode 100644 index 0000000000..0ddeb6726e --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestKeylessCapture-Capture @@ -0,0 +1,25 @@ +# ================================ +# Collection "acmeCo/test/keylesscapture_315710": 17 Documents +# ================================ +{"_meta":{"polled":"","index":0,"row_id":0},"data":"Initial row 0","updated_at":"2025-02-13T12:00:00Z","value":0} +{"_meta":{"polled":"","index":1,"row_id":1},"data":"Initial row 1","updated_at":"2025-02-13T12:01:00Z","value":1} +{"_meta":{"polled":"","index":2,"row_id":2},"data":"Initial row 2","updated_at":"2025-02-13T12:02:00Z","value":2} +{"_meta":{"polled":"","index":3,"row_id":3},"data":"Initial row 3","updated_at":"2025-02-13T12:03:00Z","value":3} +{"_meta":{"polled":"","index":4,"row_id":4},"data":"Initial row 4","updated_at":"2025-02-13T12:04:00Z","value":4} +{"_meta":{"polled":"","index":0,"row_id":5},"data":"Additional row 5","updated_at":"2025-02-13T12:10:00Z","value":5} +{"_meta":{"polled":"","index":1,"row_id":6},"data":"Additional row 6","updated_at":"2025-02-13T12:11:00Z","value":6} +{"_meta":{"polled":"","index":2,"row_id":7},"data":"Additional row 7","updated_at":"2025-02-13T12:12:00Z","value":7} +{"_meta":{"polled":"","index":3,"row_id":8},"data":"Additional row 8","updated_at":"2025-02-13T12:13:00Z","value":8} +{"_meta":{"polled":"","index":4,"row_id":9},"data":"Additional row 9","updated_at":"2025-02-13T12:14:00Z","value":9} +{"_meta":{"polled":"","index":0,"row_id":10},"data":"Additional row 6 (updated)","updated_at":"2025-02-13T12:15:00Z","value":6} +{"_meta":{"polled":"","index":1,"row_id":11},"data":"Initial row 4 (updated)","updated_at":"2025-02-13T12:15:00Z","value":4} +{"_meta":{"polled":"","index":2,"row_id":12},"data":"Initial row 3 (updated)","updated_at":"2025-02-13T12:15:00Z","value":3} +{"_meta":{"polled":"","index":3,"row_id":13},"data":"Additional row 5 (updated)","updated_at":"2025-02-13T12:15:00Z","value":5} +{"_meta":{"polled":"","index":4,"row_id":14},"data":"Additional row 7 (updated)","updated_at":"2025-02-13T12:15:00Z","value":7} +{"_meta":{"polled":"","index":0,"row_id":15},"data":"Reinserted row 6","updated_at":"2025-02-13T12:20:00Z","value":6} +{"_meta":{"polled":"","index":1,"row_id":16},"data":"Reinserted row 4","updated_at":"2025-02-13T12:20:00Z","value":4} +# ================================ +# Final State Checkpoint +# ================================ +{"bindingStateV1":{"keylesscapture_315710":{"CursorNames":["updated_at"],"CursorValues":["2025-02-13T12:20:00Z"],"DocumentCount":17,"LastPolled":""}}} + diff --git a/source-bigquery-batch/.snapshots/TestKeylessCapture-Discovery b/source-bigquery-batch/.snapshots/TestKeylessCapture-Discovery new file mode 100644 index 0000000000..aaf6d666c9 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestKeylessCapture-Discovery @@ -0,0 +1,72 @@ +Binding 0: +{ + "resource_config_json": { + "name": "keylesscapture_315710", + "schema": "testdata", + "table": "keylesscapture_315710", + "cursor": [ + "updated_at" + ] + }, + "resource_path": [ + "keylesscapture_315710" + ], + "collection": { + "name": "acmeCo/test/keylesscapture_315710", + "read_schema_json": { + "type": "object", + "required": [ + "_meta" + ], + "properties": { + "_meta": { + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", + "properties": { + "polled": { + "type": "string", + "format": "date-time", + "title": "Polled Timestamp", + "description": "The time at which the update query which produced this document as executed." + }, + "index": { + "type": "integer", + "title": "Result Index", + "description": "The index of this document within the query execution which produced it." + }, + "row_id": { + "type": "integer", + "title": "Row ID", + "description": "Row ID of the Document" + }, + "op": { + "type": "string", + "enum": [ + "c", + "u", + "d" + ], + "title": "Change Operation", + "description": "Operation type (c: Create / u: Update / d: Delete)", + "default": "u" + } + }, + "type": "object", + "required": [ + "polled", + "index", + "row_id" + ] + } + }, + "x-infer-schema": true + }, + "key": [ + "/_meta/polled", + "/_meta/index" + ], + "projections": null + }, + "state_key": "keylesscapture_315710" + } + diff --git a/source-bigquery-batch/.snapshots/TestKeylessFullRefreshCapture-Capture b/source-bigquery-batch/.snapshots/TestKeylessFullRefreshCapture-Capture new file mode 100644 index 0000000000..c1ca3fa6ac --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestKeylessFullRefreshCapture-Capture @@ -0,0 +1,43 @@ +# ================================ +# Collection "acmeCo/test/keylessfullrefreshcapture_941246": 35 Documents +# ================================ +{"_meta":{"polled":"","index":0,"row_id":0},"data":"Initial row 3","value":3} +{"_meta":{"polled":"","index":1,"row_id":1},"data":"Initial row 2","value":2} +{"_meta":{"polled":"","index":2,"row_id":2},"data":"Initial row 4","value":4} +{"_meta":{"polled":"","index":3,"row_id":3},"data":"Initial row 1","value":1} +{"_meta":{"polled":"","index":4,"row_id":4},"data":"Initial row 0","value":0} +{"_meta":{"polled":"","index":0,"row_id":0},"data":"Initial row 4","value":4} +{"_meta":{"polled":"","index":1,"row_id":1},"data":"Initial row 1","value":1} +{"_meta":{"polled":"","index":2,"row_id":2},"data":"Initial row 0","value":0} +{"_meta":{"polled":"","index":3,"row_id":3},"data":"Additional row 9","value":9} +{"_meta":{"polled":"","index":4,"row_id":4},"data":"Additional row 8","value":8} +{"_meta":{"polled":"","index":5,"row_id":5},"data":"Initial row 3","value":3} +{"_meta":{"polled":"","index":6,"row_id":6},"data":"Additional row 6","value":6} +{"_meta":{"polled":"","index":7,"row_id":7},"data":"Additional row 5","value":5} +{"_meta":{"polled":"","index":8,"row_id":8},"data":"Additional row 7","value":7} +{"_meta":{"polled":"","index":9,"row_id":9},"data":"Initial row 2","value":2} +{"_meta":{"polled":"","index":0,"row_id":0},"data":"Initial row 2","value":2} +{"_meta":{"polled":"","index":1,"row_id":1},"data":"Initial row 1","value":1} +{"_meta":{"polled":"","index":2,"row_id":2},"data":"Initial row 0","value":0} +{"_meta":{"polled":"","index":3,"row_id":3},"data":"Additional row 6 (updated)","value":6} +{"_meta":{"polled":"","index":4,"row_id":4},"data":"Additional row 5 (updated)","value":5} +{"_meta":{"polled":"","index":5,"row_id":5},"data":"Initial row 3 (updated)","value":3} +{"_meta":{"polled":"","index":6,"row_id":6},"data":"Initial row 4 (updated)","value":4} +{"_meta":{"polled":"","index":7,"row_id":7},"data":"Additional row 7 (updated)","value":7} +{"_meta":{"polled":"","index":8,"row_id":8},"data":"Additional row 8","value":8} +{"_meta":{"polled":"","index":9,"row_id":9},"data":"Additional row 9","value":9} +{"_meta":{"polled":"","index":0,"row_id":0},"data":"Additional row 8","value":8} +{"_meta":{"polled":"","index":1,"row_id":1},"data":"Initial row 2","value":2} +{"_meta":{"polled":"","index":2,"row_id":2},"data":"Initial row 1","value":1} +{"_meta":{"polled":"","index":3,"row_id":3},"data":"New row B","value":21} +{"_meta":{"polled":"","index":4,"row_id":4},"data":"Initial row 0","value":0} +{"_meta":{"polled":"","index":5,"row_id":5},"data":"Additional row 7 (updated)","value":7} +{"_meta":{"polled":"","index":6,"row_id":6},"data":"Additional row 9","value":9} +{"_meta":{"polled":"","index":7,"row_id":7},"data":"Additional row 5 (updated)","value":5} +{"_meta":{"polled":"","index":8,"row_id":8},"data":"Initial row 3 (updated)","value":3} +{"_meta":{"polled":"","index":9,"row_id":9},"data":"New row A","value":20} +# ================================ +# Final State Checkpoint +# ================================ +{"bindingStateV1":{"keylessfullrefreshcapture_941246":{"DocumentCount":10,"LastPolled":""}}} + diff --git a/source-bigquery-batch/.snapshots/TestKeylessFullRefreshCapture-Discovery b/source-bigquery-batch/.snapshots/TestKeylessFullRefreshCapture-Discovery new file mode 100644 index 0000000000..c2b9671a64 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestKeylessFullRefreshCapture-Discovery @@ -0,0 +1,69 @@ +Binding 0: +{ + "resource_config_json": { + "name": "keylessfullrefreshcapture_941246", + "schema": "testdata", + "table": "keylessfullrefreshcapture_941246" + }, + "resource_path": [ + "keylessfullrefreshcapture_941246" + ], + "collection": { + "name": "acmeCo/test/keylessfullrefreshcapture_941246", + "read_schema_json": { + "type": "object", + "required": [ + "_meta" + ], + "properties": { + "_meta": { + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", + "properties": { + "polled": { + "type": "string", + "format": "date-time", + "title": "Polled Timestamp", + "description": "The time at which the update query which produced this document as executed." + }, + "index": { + "type": "integer", + "title": "Result Index", + "description": "The index of this document within the query execution which produced it." + }, + "row_id": { + "type": "integer", + "title": "Row ID", + "description": "Row ID of the Document" + }, + "op": { + "type": "string", + "enum": [ + "c", + "u", + "d" + ], + "title": "Change Operation", + "description": "Operation type (c: Create / u: Update / d: Delete)", + "default": "u" + } + }, + "type": "object", + "required": [ + "polled", + "index", + "row_id" + ] + } + }, + "x-infer-schema": true + }, + "key": [ + "/_meta/polled", + "/_meta/index" + ], + "projections": null + }, + "state_key": "keylessfullrefreshcapture_941246" + } + diff --git a/source-bigquery-batch/main_test.go b/source-bigquery-batch/main_test.go index cbf0f96753..723f1870be 100644 --- a/source-bigquery-batch/main_test.go +++ b/source-bigquery-batch/main_test.go @@ -1023,3 +1023,204 @@ func TestQueryTemplateOverride(t *testing.T) { cupaloy.SnapshotT(t, cs.Summary()) }) } + +// TestKeylessCapture exercises discovery and capture from a table without +// a defined primary key, but using a user-specified updated_at cursor. +func TestKeylessCapture(t *testing.T) { + var ctx, cs, control = context.Background(), testCaptureSpec(t), testBigQueryClient(t) + var tableName, uniqueID = testTableName(t, uniqueTableID(t)) + createTestTable(ctx, t, control, tableName, "(data STRING, value INTEGER, updated_at TIMESTAMP)") // No PRIMARY KEY + + cs.Bindings = discoverBindings(ctx, t, cs, regexp.MustCompile(uniqueID)) + setCursorColumns(t, cs.Bindings[0], "updated_at") + t.Run("Discovery", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) }) + + t.Run("Capture", func(t *testing.T) { + setShutdownAfterQuery(t, true) + baseTime := time.Date(2025, 2, 13, 12, 0, 0, 0, time.UTC) + + // Initial batch of data + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s (data, value, updated_at) VALUES (@p0, @p1, @p2)", tableName), + [][]any{ + {fmt.Sprintf("Initial row %d", 0), 0, baseTime.Add(0 * time.Minute)}, + {fmt.Sprintf("Initial row %d", 1), 1, baseTime.Add(1 * time.Minute)}, + {fmt.Sprintf("Initial row %d", 2), 2, baseTime.Add(2 * time.Minute)}, + {fmt.Sprintf("Initial row %d", 3), 3, baseTime.Add(3 * time.Minute)}, + {fmt.Sprintf("Initial row %d", 4), 4, baseTime.Add(4 * time.Minute)}, + })) + cs.Capture(ctx, t, nil) + + // Add more rows with later timestamps + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s (data, value, updated_at) VALUES (@p0, @p1, @p2)", tableName), + [][]any{ + {fmt.Sprintf("Additional row %d", 5), 5, baseTime.Add(10 * time.Minute)}, + {fmt.Sprintf("Additional row %d", 6), 6, baseTime.Add(11 * time.Minute)}, + {fmt.Sprintf("Additional row %d", 7), 7, baseTime.Add(12 * time.Minute)}, + {fmt.Sprintf("Additional row %d", 8), 8, baseTime.Add(13 * time.Minute)}, + {fmt.Sprintf("Additional row %d", 9), 9, baseTime.Add(14 * time.Minute)}, + })) + cs.Capture(ctx, t, nil) + + // Update some rows with new timestamps + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("UPDATE %s SET data = CONCAT(data, ' (updated)'), updated_at = @p0 WHERE value = @p1", tableName), + [][]any{ + {baseTime.Add(15 * time.Minute), 3}, + {baseTime.Add(15 * time.Minute), 4}, + {baseTime.Add(15 * time.Minute), 5}, + {baseTime.Add(15 * time.Minute), 6}, + {baseTime.Add(15 * time.Minute), 7}, + })) + cs.Capture(ctx, t, nil) + + // Delete and reinsert some rows + require.NoError(t, executeSetupQuery(ctx, t, control, + fmt.Sprintf("DELETE FROM %s WHERE value IN (4, 6)", tableName))) + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s (data, value, updated_at) VALUES (@p0, @p1, @p2)", tableName), + [][]any{ + {"Reinserted row 4", 4, baseTime.Add(20 * time.Minute)}, + {"Reinserted row 6", 6, baseTime.Add(20 * time.Minute)}, + })) + cs.Capture(ctx, t, nil) + + cupaloy.SnapshotT(t, cs.Summary()) + }) +} + +// TestKeylessFullRefreshCapture exercises discovery and capture from a table +// without a defined primary key, and with the cursor left empty to test +// full-refresh behavior. +func TestKeylessFullRefreshCapture(t *testing.T) { + var ctx, cs, control = context.Background(), testCaptureSpec(t), testBigQueryClient(t) + var tableName, uniqueID = testTableName(t, uniqueTableID(t)) + createTestTable(ctx, t, control, tableName, "(data STRING, value INTEGER)") // No PRIMARY KEY + + // Discover the table and verify discovery snapshot + cs.Bindings = discoverBindings(ctx, t, cs, regexp.MustCompile(uniqueID)) + t.Run("Discovery", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) }) + + t.Run("Capture", func(t *testing.T) { + setShutdownAfterQuery(t, true) + + // Initial batch of data + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s (data, value) VALUES (@p0, @p1)", tableName), + [][]any{ + {fmt.Sprintf("Initial row %d", 0), 0}, + {fmt.Sprintf("Initial row %d", 1), 1}, + {fmt.Sprintf("Initial row %d", 2), 2}, + {fmt.Sprintf("Initial row %d", 3), 3}, + {fmt.Sprintf("Initial row %d", 4), 4}, + })) + cs.Capture(ctx, t, nil) + + // Add more rows - these should appear in the next full refresh + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s (data, value) VALUES (@p0, @p1)", tableName), + [][]any{ + {fmt.Sprintf("Additional row %d", 5), 5}, + {fmt.Sprintf("Additional row %d", 6), 6}, + {fmt.Sprintf("Additional row %d", 7), 7}, + {fmt.Sprintf("Additional row %d", 8), 8}, + {fmt.Sprintf("Additional row %d", 9), 9}, + })) + cs.Capture(ctx, t, nil) + + // Modify some existing rows - changes should appear in next full refresh + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("UPDATE %s SET data = CONCAT(data, ' (updated)') WHERE value = @p0", tableName), + [][]any{{3}, {4}, {5}, {6}, {7}})) + cs.Capture(ctx, t, nil) + + // Delete some rows and add new ones - changes should appear in next full refresh + require.NoError(t, executeSetupQuery(ctx, t, control, + fmt.Sprintf("DELETE FROM %s WHERE value IN (4, 6)", tableName))) + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s (data, value) VALUES (@p0, @p1)", tableName), + [][]any{{"New row A", 20}, {"New row B", 21}})) + cs.Capture(ctx, t, nil) + + cupaloy.SnapshotT(t, cs.Summary()) + }) +} + +// TestFeatureFlagKeylessRowID exercises discovery and capture from a keyless +// full-refresh table (as in TestKeylessFullRefreshCapture), but with the +// keyless_row_id feature flag explicitly set to true and false in distinct +// subtests. +func TestFeatureFlagKeylessRowID(t *testing.T) { + for _, tc := range []struct { + name string + flag string + }{ + {"Enabled", "keyless_row_id"}, + {"Disabled", "no_keyless_row_id"}, + } { + t.Run(tc.name, func(t *testing.T) { + var ctx, control = context.Background(), testBigQueryClient(t) + var tableName, uniqueID = testTableName(t, uniqueTableID(t)) + createTestTable(ctx, t, control, tableName, "(data STRING, value INTEGER)") // No PRIMARY KEY + + // Create capture spec with specific feature flag + var cs = testCaptureSpec(t) + cs.EndpointSpec.(*Config).Advanced.FeatureFlags = tc.flag + + // Discover the table and verify discovery snapshot + cs.Bindings = discoverBindings(ctx, t, cs, regexp.MustCompile(uniqueID)) + + // Empty cursor forces full refresh behavior + setCursorColumns(t, cs.Bindings[0]) + + t.Run("Discovery", func(t *testing.T) { + cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) + }) + + t.Run("Capture", func(t *testing.T) { + setShutdownAfterQuery(t, true) + + // Initial data batch + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s (data, value) VALUES (@p0, @p1)", tableName), + [][]any{ + {fmt.Sprintf("Initial row %d", 0), 0}, + {fmt.Sprintf("Initial row %d", 1), 1}, + {fmt.Sprintf("Initial row %d", 2), 2}, + {fmt.Sprintf("Initial row %d", 3), 3}, + {fmt.Sprintf("Initial row %d", 4), 4}, + })) + cs.Capture(ctx, t, nil) + + // Add more rows + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s (data, value) VALUES (@p0, @p1)", tableName), + [][]any{ + {fmt.Sprintf("Additional row %d", 5), 5}, + {fmt.Sprintf("Additional row %d", 6), 6}, + {fmt.Sprintf("Additional row %d", 7), 7}, + {fmt.Sprintf("Additional row %d", 8), 8}, + {fmt.Sprintf("Additional row %d", 9), 9}, + })) + cs.Capture(ctx, t, nil) + + // Modify some existing rows + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("UPDATE %s SET data = CONCAT(data, ' (updated)') WHERE value = @p0", tableName), + [][]any{{3}, {4}, {5}, {6}, {7}})) + cs.Capture(ctx, t, nil) + + // Delete and add new rows + require.NoError(t, executeSetupQuery(ctx, t, control, + fmt.Sprintf("DELETE FROM %s WHERE value IN (4, 6)", tableName))) + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s (data, value) VALUES (@p0, @p1)", tableName), + [][]any{{"New row A", 20}})) + cs.Capture(ctx, t, nil) + + cupaloy.SnapshotT(t, cs.Summary()) + }) + }) + } +} From eb7a1fcadb675c42a2a9b962df8a6902372a8a5b Mon Sep 17 00:00:00 2001 From: Will Donnelly Date: Tue, 25 Feb 2025 19:06:14 -0600 Subject: [PATCH 4/5] source-bigquery-batch: Option 'Discover Views' --- .../.snapshots/TestCaptureFromView-Capture | 45 ++++++ .../TestCaptureFromView-DiscoveryWithViews | 137 ++++++++++++++++++ .../TestCaptureFromView-DiscoveryWithoutViews | 69 +++++++++ source-bigquery-batch/.snapshots/TestSpec | 5 + source-bigquery-batch/driver.go | 4 +- source-bigquery-batch/main.go | 29 ++-- source-bigquery-batch/main_test.go | 83 ++++++++++- 7 files changed, 357 insertions(+), 15 deletions(-) create mode 100644 source-bigquery-batch/.snapshots/TestCaptureFromView-Capture create mode 100644 source-bigquery-batch/.snapshots/TestCaptureFromView-DiscoveryWithViews create mode 100644 source-bigquery-batch/.snapshots/TestCaptureFromView-DiscoveryWithoutViews diff --git a/source-bigquery-batch/.snapshots/TestCaptureFromView-Capture b/source-bigquery-batch/.snapshots/TestCaptureFromView-Capture new file mode 100644 index 0000000000..917721419d --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestCaptureFromView-Capture @@ -0,0 +1,45 @@ +# ================================ +# Collection "acmeCo/test/capturefromview_194890": 23 Documents +# ================================ +{"_meta":{"polled":"","index":0,"row_id":0},"id":0,"name":"Row 0","updated_at":"2025-02-13T12:00:00Z","visible":true} +{"_meta":{"polled":"","index":1,"row_id":1},"id":1,"name":"Row 1","updated_at":"2025-02-13T12:01:00Z","visible":false} +{"_meta":{"polled":"","index":2,"row_id":2},"id":2,"name":"Row 2","updated_at":"2025-02-13T12:02:00Z","visible":true} +{"_meta":{"polled":"","index":3,"row_id":3},"id":3,"name":"Row 3","updated_at":"2025-02-13T12:03:00Z","visible":false} +{"_meta":{"polled":"","index":4,"row_id":4},"id":4,"name":"Row 4","updated_at":"2025-02-13T12:04:00Z","visible":true} +{"_meta":{"polled":"","index":5,"row_id":5},"id":5,"name":"Row 5","updated_at":"2025-02-13T12:05:00Z","visible":false} +{"_meta":{"polled":"","index":6,"row_id":6},"id":6,"name":"Row 6","updated_at":"2025-02-13T12:06:00Z","visible":true} +{"_meta":{"polled":"","index":7,"row_id":7},"id":7,"name":"Row 7","updated_at":"2025-02-13T12:07:00Z","visible":false} +{"_meta":{"polled":"","index":8,"row_id":8},"id":8,"name":"Row 8","updated_at":"2025-02-13T12:08:00Z","visible":true} +{"_meta":{"polled":"","index":9,"row_id":9},"id":9,"name":"Row 9","updated_at":"2025-02-13T12:09:00Z","visible":false} +{"_meta":{"polled":"","index":0,"row_id":10},"id":10,"name":"Row 10","updated_at":"2025-02-13T12:10:00Z","visible":true} +{"_meta":{"polled":"","index":1,"row_id":11},"id":11,"name":"Row 11","updated_at":"2025-02-13T12:11:00Z","visible":false} +{"_meta":{"polled":"","index":2,"row_id":12},"id":12,"name":"Row 12","updated_at":"2025-02-13T12:12:00Z","visible":true} +{"_meta":{"polled":"","index":3,"row_id":13},"id":13,"name":"Row 13","updated_at":"2025-02-13T12:13:00Z","visible":false} +{"_meta":{"polled":"","index":4,"row_id":14},"id":14,"name":"Row 14","updated_at":"2025-02-13T12:14:00Z","visible":true} +{"_meta":{"polled":"","index":5,"row_id":15},"id":15,"name":"Row 15","updated_at":"2025-02-13T12:15:00Z","visible":false} +{"_meta":{"polled":"","index":6,"row_id":16},"id":16,"name":"Row 16","updated_at":"2025-02-13T12:16:00Z","visible":true} +{"_meta":{"polled":"","index":7,"row_id":17},"id":17,"name":"Row 17","updated_at":"2025-02-13T12:17:00Z","visible":false} +{"_meta":{"polled":"","index":8,"row_id":18},"id":18,"name":"Row 18","updated_at":"2025-02-13T12:18:00Z","visible":true} +{"_meta":{"polled":"","index":9,"row_id":19},"id":19,"name":"Row 19","updated_at":"2025-02-13T12:19:00Z","visible":false} +{"_meta":{"polled":"","index":0,"row_id":20},"id":3,"name":"Row 3","updated_at":"2025-02-13T12:20:00Z","visible":true} +{"_meta":{"polled":"","index":1,"row_id":21},"id":4,"name":"Row 4","updated_at":"2025-02-13T12:20:00Z","visible":false} +{"_meta":{"polled":"","index":2,"row_id":22},"id":2,"name":"Row 2","updated_at":"2025-02-13T12:20:00Z","visible":false} +# ================================ +# Collection "acmeCo/test/capturefromview_227836": 11 Documents +# ================================ +{"_meta":{"polled":"","index":0,"row_id":0},"id":0,"name":"Row 0","updated_at":"2025-02-13T12:00:00Z"} +{"_meta":{"polled":"","index":1,"row_id":1},"id":2,"name":"Row 2","updated_at":"2025-02-13T12:02:00Z"} +{"_meta":{"polled":"","index":2,"row_id":2},"id":4,"name":"Row 4","updated_at":"2025-02-13T12:04:00Z"} +{"_meta":{"polled":"","index":3,"row_id":3},"id":6,"name":"Row 6","updated_at":"2025-02-13T12:06:00Z"} +{"_meta":{"polled":"","index":4,"row_id":4},"id":8,"name":"Row 8","updated_at":"2025-02-13T12:08:00Z"} +{"_meta":{"polled":"","index":0,"row_id":5},"id":10,"name":"Row 10","updated_at":"2025-02-13T12:10:00Z"} +{"_meta":{"polled":"","index":1,"row_id":6},"id":12,"name":"Row 12","updated_at":"2025-02-13T12:12:00Z"} +{"_meta":{"polled":"","index":2,"row_id":7},"id":14,"name":"Row 14","updated_at":"2025-02-13T12:14:00Z"} +{"_meta":{"polled":"","index":3,"row_id":8},"id":16,"name":"Row 16","updated_at":"2025-02-13T12:16:00Z"} +{"_meta":{"polled":"","index":4,"row_id":9},"id":18,"name":"Row 18","updated_at":"2025-02-13T12:18:00Z"} +{"_meta":{"polled":"","index":0,"row_id":10},"id":3,"name":"Row 3","updated_at":"2025-02-13T12:20:00Z"} +# ================================ +# Final State Checkpoint +# ================================ +{"bindingStateV1":{"capturefromview_194890":{"CursorNames":["updated_at"],"CursorValues":["2025-02-13T12:20:00Z"],"DocumentCount":23,"LastPolled":""},"capturefromview_227836":{"CursorNames":["updated_at"],"CursorValues":["2025-02-13T12:20:00Z"],"DocumentCount":11,"LastPolled":""}}} + diff --git a/source-bigquery-batch/.snapshots/TestCaptureFromView-DiscoveryWithViews b/source-bigquery-batch/.snapshots/TestCaptureFromView-DiscoveryWithViews new file mode 100644 index 0000000000..72993b328c --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestCaptureFromView-DiscoveryWithViews @@ -0,0 +1,137 @@ +Binding 0: +{ + "resource_config_json": { + "name": "capturefromview_194890", + "schema": "testdata", + "table": "capturefromview_194890" + }, + "resource_path": [ + "capturefromview_194890" + ], + "collection": { + "name": "acmeCo/test/capturefromview_194890", + "read_schema_json": { + "type": "object", + "required": [ + "_meta" + ], + "properties": { + "_meta": { + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", + "properties": { + "polled": { + "type": "string", + "format": "date-time", + "title": "Polled Timestamp", + "description": "The time at which the update query which produced this document as executed." + }, + "index": { + "type": "integer", + "title": "Result Index", + "description": "The index of this document within the query execution which produced it." + }, + "row_id": { + "type": "integer", + "title": "Row ID", + "description": "Row ID of the Document" + }, + "op": { + "type": "string", + "enum": [ + "c", + "u", + "d" + ], + "title": "Change Operation", + "description": "Operation type (c: Create / u: Update / d: Delete)", + "default": "u" + } + }, + "type": "object", + "required": [ + "polled", + "index", + "row_id" + ] + } + }, + "x-infer-schema": true + }, + "key": [ + "/_meta/polled", + "/_meta/index" + ], + "projections": null + }, + "state_key": "capturefromview_194890" + } +Binding 1: +{ + "resource_config_json": { + "name": "capturefromview_227836", + "schema": "testdata", + "table": "capturefromview_227836" + }, + "resource_path": [ + "capturefromview_227836" + ], + "collection": { + "name": "acmeCo/test/capturefromview_227836", + "read_schema_json": { + "type": "object", + "required": [ + "_meta" + ], + "properties": { + "_meta": { + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", + "properties": { + "polled": { + "type": "string", + "format": "date-time", + "title": "Polled Timestamp", + "description": "The time at which the update query which produced this document as executed." + }, + "index": { + "type": "integer", + "title": "Result Index", + "description": "The index of this document within the query execution which produced it." + }, + "row_id": { + "type": "integer", + "title": "Row ID", + "description": "Row ID of the Document" + }, + "op": { + "type": "string", + "enum": [ + "c", + "u", + "d" + ], + "title": "Change Operation", + "description": "Operation type (c: Create / u: Update / d: Delete)", + "default": "u" + } + }, + "type": "object", + "required": [ + "polled", + "index", + "row_id" + ] + } + }, + "x-infer-schema": true + }, + "key": [ + "/_meta/polled", + "/_meta/index" + ], + "projections": null + }, + "state_key": "capturefromview_227836" + } + diff --git a/source-bigquery-batch/.snapshots/TestCaptureFromView-DiscoveryWithoutViews b/source-bigquery-batch/.snapshots/TestCaptureFromView-DiscoveryWithoutViews new file mode 100644 index 0000000000..9880da201a --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestCaptureFromView-DiscoveryWithoutViews @@ -0,0 +1,69 @@ +Binding 0: +{ + "resource_config_json": { + "name": "capturefromview_194890", + "schema": "testdata", + "table": "capturefromview_194890" + }, + "resource_path": [ + "capturefromview_194890" + ], + "collection": { + "name": "acmeCo/test/capturefromview_194890", + "read_schema_json": { + "type": "object", + "required": [ + "_meta" + ], + "properties": { + "_meta": { + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", + "properties": { + "polled": { + "type": "string", + "format": "date-time", + "title": "Polled Timestamp", + "description": "The time at which the update query which produced this document as executed." + }, + "index": { + "type": "integer", + "title": "Result Index", + "description": "The index of this document within the query execution which produced it." + }, + "row_id": { + "type": "integer", + "title": "Row ID", + "description": "Row ID of the Document" + }, + "op": { + "type": "string", + "enum": [ + "c", + "u", + "d" + ], + "title": "Change Operation", + "description": "Operation type (c: Create / u: Update / d: Delete)", + "default": "u" + } + }, + "type": "object", + "required": [ + "polled", + "index", + "row_id" + ] + } + }, + "x-infer-schema": true + }, + "key": [ + "/_meta/polled", + "/_meta/index" + ], + "projections": null + }, + "state_key": "capturefromview_194890" + } + diff --git a/source-bigquery-batch/.snapshots/TestSpec b/source-bigquery-batch/.snapshots/TestSpec index 26e6e9c272..da9d3d11ba 100644 --- a/source-bigquery-batch/.snapshots/TestSpec +++ b/source-bigquery-batch/.snapshots/TestSpec @@ -25,6 +25,11 @@ }, "advanced": { "properties": { + "discover_views": { + "type": "boolean", + "title": "Discover Views", + "description": "When set views will be automatically discovered as resources. If unset only tables will be discovered." + }, "poll": { "type": "string", "title": "Default Polling Schedule", diff --git a/source-bigquery-batch/driver.go b/source-bigquery-batch/driver.go index 5e257ad3d0..220d3434a1 100644 --- a/source-bigquery-batch/driver.go +++ b/source-bigquery-batch/driver.go @@ -63,7 +63,7 @@ type BatchSQLDriver struct { ConfigSchema json.RawMessage Connect func(ctx context.Context, cfg *Config) (*bigquery.Client, error) - GenerateResource func(resourceName, schemaName, tableName, tableType string) (*Resource, error) + GenerateResource func(cfg *Config, resourceName, schemaName, tableName, tableType string) (*Resource, error) ExcludedSystemSchemas []string SelectQueryTemplate func(res *Resource) (string, error) } @@ -238,7 +238,7 @@ func (drv *BatchSQLDriver) Discover(ctx context.Context, req *pc.Request_Discove var tableID = table.Schema + "." + table.Name var recommendedName = recommendedCatalogName(table.Name) - var res, err = drv.GenerateResource(recommendedName, table.Schema, table.Name, table.Type) + var res, err = drv.GenerateResource(&cfg, recommendedName, table.Schema, table.Name, table.Type) if err != nil { log.WithFields(log.Fields{ "reason": err, diff --git a/source-bigquery-batch/main.go b/source-bigquery-batch/main.go index 9f00d7f4a0..46e65d6683 100644 --- a/source-bigquery-batch/main.go +++ b/source-bigquery-batch/main.go @@ -35,8 +35,9 @@ type Config struct { } type advancedConfig struct { - PollSchedule string `json:"poll,omitempty" jsonschema:"title=Default Polling Schedule,description=When and how often to execute fetch queries. Accepts a Go duration string like '5m' or '6h' for frequency-based polling or a string like 'daily at 12:34Z' to poll at a specific time (specified in UTC) every day. Defaults to '24h' if unset." jsonschema_extras:"pattern=^([-+]?([0-9]+([.][0-9]+)?(h|m|s|ms))+|daily at [0-9][0-9]?:[0-9]{2}Z)$"` - FeatureFlags string `json:"feature_flags,omitempty" jsonschema:"title=Feature Flags,description=This property is intended for Estuary internal use. You should only modify this field as directed by Estuary support."` + DiscoverViews bool `json:"discover_views,omitempty" jsonschema:"title=Discover Views,description=When set views will be automatically discovered as resources. If unset only tables will be discovered."` + PollSchedule string `json:"poll,omitempty" jsonschema:"title=Default Polling Schedule,description=When and how often to execute fetch queries. Accepts a Go duration string like '5m' or '6h' for frequency-based polling or a string like 'daily at 12:34Z' to poll at a specific time (specified in UTC) every day. Defaults to '24h' if unset." jsonschema_extras:"pattern=^([-+]?([0-9]+([.][0-9]+)?(h|m|s|ms))+|daily at [0-9][0-9]?:[0-9]{2}Z)$"` + FeatureFlags string `json:"feature_flags,omitempty" jsonschema:"title=Feature Flags,description=This property is intended for Estuary internal use. You should only modify this field as directed by Estuary support."` parsedFeatureFlags map[string]bool // Parsed feature flags setting with defaults applied } @@ -168,16 +169,22 @@ func quoteIdentifier(name string) string { return "`" + strings.ReplaceAll(name, "`", "\\`") + "`" } -func generateBigQueryResource(resourceName, schemaName, tableName, tableType string) (*Resource, error) { - if !strings.EqualFold(tableType, "BASE TABLE") { - return nil, fmt.Errorf("discovery will not autogenerate resource configs for entities of type %q, but you may add them manually", tableType) +func generateBigQueryResource(cfg *Config, resourceName, schemaName, tableName, tableType string) (*Resource, error) { + if strings.EqualFold(tableType, "BASE TABLE") { + return &Resource{ + Name: resourceName, + SchemaName: schemaName, + TableName: tableName, + }, nil } - - return &Resource{ - Name: resourceName, - SchemaName: schemaName, - TableName: tableName, - }, nil + if strings.EqualFold(tableType, "VIEW") && cfg.Advanced.DiscoverViews { + return &Resource{ + Name: resourceName, + SchemaName: schemaName, + TableName: tableName, + }, nil + } + return nil, fmt.Errorf("unsupported entity type %q", tableType) } var bigqueryDriver = &BatchSQLDriver{ diff --git a/source-bigquery-batch/main_test.go b/source-bigquery-batch/main_test.go index 723f1870be..65d9dfac63 100644 --- a/source-bigquery-batch/main_test.go +++ b/source-bigquery-batch/main_test.go @@ -260,8 +260,7 @@ func TestSpec(t *testing.T) { // TestQueryTemplate is a unit test which verifies that the default query template produces // the expected output for initial/subsequent polling queries with different cursors. func TestQueryTemplate(t *testing.T) { - res, err := bigqueryDriver.GenerateResource("foobar", "testdata", "foobar", "BASE TABLE") - require.NoError(t, err) + var res = &Resource{Name: "foobar", SchemaName: "testdata", TableName: "foobar"} tmplString, err := bigqueryDriver.SelectQueryTemplate(res) require.NoError(t, err) @@ -1224,3 +1223,83 @@ func TestFeatureFlagKeylessRowID(t *testing.T) { }) } } + +// TestCaptureFromView exercises discovery and capture from a view with an updated_at cursor. +func TestCaptureFromView(t *testing.T) { + var ctx, cs, control = context.Background(), testCaptureSpec(t), testBigQueryClient(t) + var baseTableName, tableID = testTableName(t, uniqueTableID(t)) + var viewName, viewID = testTableName(t, uniqueTableID(t, "view")) + + // Create base table and view + createTestTable(ctx, t, control, baseTableName, `( + id INTEGER NOT NULL, + name STRING, + visible BOOL, + updated_at TIMESTAMP + )`) + require.NoError(t, executeSetupQuery(ctx, t, control, fmt.Sprintf(` + CREATE VIEW %s AS + SELECT id, name, updated_at + FROM %s + WHERE visible = true`, viewName, baseTableName))) + t.Cleanup(func() { + _ = executeSetupQuery(ctx, t, control, fmt.Sprintf("DROP VIEW IF EXISTS %s", viewName)) + }) + + // By default views should not be discovered. + cs.Bindings = discoverBindings(ctx, t, cs, regexp.MustCompile(tableID), regexp.MustCompile(viewID)) + t.Run("DiscoveryWithoutViews", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) }) + + // Enable view discovery and re-discover bindings, then set a cursor for capturing the view. + cs.EndpointSpec.(*Config).Advanced.DiscoverViews = true + cs.Bindings = discoverBindings(ctx, t, cs, regexp.MustCompile(tableID), regexp.MustCompile(viewID)) + t.Run("DiscoveryWithViews", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) }) + + // Update both the base table and the view incrementally using the updated_at column. + setCursorColumns(t, cs.Bindings[0], "updated_at") + setCursorColumns(t, cs.Bindings[1], "updated_at") + + t.Run("Capture", func(t *testing.T) { + setShutdownAfterQuery(t, true) + baseTime := time.Date(2025, 2, 13, 12, 0, 0, 0, time.UTC) + + // First batch: Insert rows into base table, some visible in view + var firstBatch [][]any + for i := 0; i < 10; i++ { + firstBatch = append(firstBatch, []any{ + i, + fmt.Sprintf("Row %d", i), + i%2 == 0, // Even numbered rows are visible + baseTime.Add(time.Duration(i) * time.Minute), + }) + } + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s (id, name, visible, updated_at) VALUES (@p0, @p1, @p2, @p3)", baseTableName), + firstBatch)) + cs.Capture(ctx, t, nil) + + // Second batch: More rows with later timestamps + var secondBatch [][]any + for i := 10; i < 20; i++ { + secondBatch = append(secondBatch, []any{ + i, + fmt.Sprintf("Row %d", i), + i%2 == 0, // Even numbered rows are visible + baseTime.Add(time.Duration(i) * time.Minute), + }) + } + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s (id, name, visible, updated_at) VALUES (@p0, @p1, @p2, @p3)", baseTableName), + secondBatch)) + cs.Capture(ctx, t, nil) + + // Update some rows to change their visibility + updateTime := baseTime.Add(20 * time.Minute) + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("UPDATE %s SET visible = NOT visible, updated_at = @p0 WHERE id IN (@p1, @p2, @p3)", baseTableName), + [][]any{{updateTime, 2, 3, 4}})) + cs.Capture(ctx, t, nil) + + cupaloy.SnapshotT(t, cs.Summary()) + }) +} From 24d37d0ff25be11666db39eeeb6fde2fe4b9c851 Mon Sep 17 00:00:00 2001 From: Will Donnelly Date: Wed, 26 Feb 2025 17:18:17 -0600 Subject: [PATCH 5/5] source-bigquery-batch: Require `row_id` only when `keyless_row_id` --- .../.snapshots/TestBinaryTypes-Discovery | 3 +-- .../TestCaptureWithDatetimeCursor-Discovery | 3 +-- .../TestCaptureWithUpdatedAtCursor-Discovery | 3 +-- .../.snapshots/TestCompositeTypes-Discovery | 3 +-- .../.snapshots/TestFullRefresh-Discovery | 3 +-- .../.snapshots/TestIntegerTypes-Discovery | 3 +-- .../.snapshots/TestJSONType-Discovery | 3 +-- .../.snapshots/TestNumericTypes-Discovery | 3 +-- .../.snapshots/TestSimpleCapture-Discovery | 3 +-- .../.snapshots/TestStringTypes-Discovery | 3 +-- .../.snapshots/TestTemporalTypes-Discovery | 3 +-- source-bigquery-batch/driver.go | 24 +++++++++---------- 12 files changed, 23 insertions(+), 34 deletions(-) diff --git a/source-bigquery-batch/.snapshots/TestBinaryTypes-Discovery b/source-bigquery-batch/.snapshots/TestBinaryTypes-Discovery index 9eb2219bfc..51c2c75a74 100644 --- a/source-bigquery-batch/.snapshots/TestBinaryTypes-Discovery +++ b/source-bigquery-batch/.snapshots/TestBinaryTypes-Discovery @@ -55,8 +55,7 @@ Binding 0: "type": "object", "required": [ "polled", - "index", - "row_id" + "index" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithDatetimeCursor-Discovery b/source-bigquery-batch/.snapshots/TestCaptureWithDatetimeCursor-Discovery index 27fd4ce4c2..934873c25a 100644 --- a/source-bigquery-batch/.snapshots/TestCaptureWithDatetimeCursor-Discovery +++ b/source-bigquery-batch/.snapshots/TestCaptureWithDatetimeCursor-Discovery @@ -55,8 +55,7 @@ Binding 0: "type": "object", "required": [ "polled", - "index", - "row_id" + "index" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithUpdatedAtCursor-Discovery b/source-bigquery-batch/.snapshots/TestCaptureWithUpdatedAtCursor-Discovery index 84c269948e..5d71921047 100644 --- a/source-bigquery-batch/.snapshots/TestCaptureWithUpdatedAtCursor-Discovery +++ b/source-bigquery-batch/.snapshots/TestCaptureWithUpdatedAtCursor-Discovery @@ -55,8 +55,7 @@ Binding 0: "type": "object", "required": [ "polled", - "index", - "row_id" + "index" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestCompositeTypes-Discovery b/source-bigquery-batch/.snapshots/TestCompositeTypes-Discovery index 68c6a0cee6..ceba593a60 100644 --- a/source-bigquery-batch/.snapshots/TestCompositeTypes-Discovery +++ b/source-bigquery-batch/.snapshots/TestCompositeTypes-Discovery @@ -55,8 +55,7 @@ Binding 0: "type": "object", "required": [ "polled", - "index", - "row_id" + "index" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestFullRefresh-Discovery b/source-bigquery-batch/.snapshots/TestFullRefresh-Discovery index aeeeea1193..7091ad1cb0 100644 --- a/source-bigquery-batch/.snapshots/TestFullRefresh-Discovery +++ b/source-bigquery-batch/.snapshots/TestFullRefresh-Discovery @@ -52,8 +52,7 @@ Binding 0: "type": "object", "required": [ "polled", - "index", - "row_id" + "index" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestIntegerTypes-Discovery b/source-bigquery-batch/.snapshots/TestIntegerTypes-Discovery index 541d173acd..17db01e692 100644 --- a/source-bigquery-batch/.snapshots/TestIntegerTypes-Discovery +++ b/source-bigquery-batch/.snapshots/TestIntegerTypes-Discovery @@ -55,8 +55,7 @@ Binding 0: "type": "object", "required": [ "polled", - "index", - "row_id" + "index" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestJSONType-Discovery b/source-bigquery-batch/.snapshots/TestJSONType-Discovery index b58b294daa..939897b75c 100644 --- a/source-bigquery-batch/.snapshots/TestJSONType-Discovery +++ b/source-bigquery-batch/.snapshots/TestJSONType-Discovery @@ -55,8 +55,7 @@ Binding 0: "type": "object", "required": [ "polled", - "index", - "row_id" + "index" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestNumericTypes-Discovery b/source-bigquery-batch/.snapshots/TestNumericTypes-Discovery index 29a335b33b..77e692a05c 100644 --- a/source-bigquery-batch/.snapshots/TestNumericTypes-Discovery +++ b/source-bigquery-batch/.snapshots/TestNumericTypes-Discovery @@ -55,8 +55,7 @@ Binding 0: "type": "object", "required": [ "polled", - "index", - "row_id" + "index" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestSimpleCapture-Discovery b/source-bigquery-batch/.snapshots/TestSimpleCapture-Discovery index 031fda8d38..aa87744bec 100644 --- a/source-bigquery-batch/.snapshots/TestSimpleCapture-Discovery +++ b/source-bigquery-batch/.snapshots/TestSimpleCapture-Discovery @@ -55,8 +55,7 @@ Binding 0: "type": "object", "required": [ "polled", - "index", - "row_id" + "index" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestStringTypes-Discovery b/source-bigquery-batch/.snapshots/TestStringTypes-Discovery index feebe01d7a..8218205ba2 100644 --- a/source-bigquery-batch/.snapshots/TestStringTypes-Discovery +++ b/source-bigquery-batch/.snapshots/TestStringTypes-Discovery @@ -55,8 +55,7 @@ Binding 0: "type": "object", "required": [ "polled", - "index", - "row_id" + "index" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestTemporalTypes-Discovery b/source-bigquery-batch/.snapshots/TestTemporalTypes-Discovery index 64a562d567..69d5b88325 100644 --- a/source-bigquery-batch/.snapshots/TestTemporalTypes-Discovery +++ b/source-bigquery-batch/.snapshots/TestTemporalTypes-Discovery @@ -55,8 +55,7 @@ Binding 0: "type": "object", "required": [ "polled", - "index", - "row_id" + "index" ] }, "id": { diff --git a/source-bigquery-batch/driver.go b/source-bigquery-batch/driver.go index 220d3434a1..6ffc38a022 100644 --- a/source-bigquery-batch/driver.go +++ b/source-bigquery-batch/driver.go @@ -125,13 +125,16 @@ var ( fallbackKeyOld = []string{"/_meta/polled", "/_meta/index"} ) -func generateCollectionSchema(keyColumns []string, columnTypes map[string]*jsonschema.Schema) (json.RawMessage, error) { +func generateCollectionSchema(cfg *Config, keyColumns []string, columnTypes map[string]*jsonschema.Schema) (json.RawMessage, error) { // Generate schema for the metadata via reflection var reflector = jsonschema.Reflector{ ExpandedStruct: true, DoNotReference: true, } var metadataSchema = reflector.ReflectFromType(reflect.TypeOf(documentMetadata{})) + if !cfg.Advanced.parsedFeatureFlags["keyless_row_id"] { // Don't include row_id as required on old captures with keyless_row_id off + metadataSchema.Required = slices.DeleteFunc(metadataSchema.Required, func(s string) bool { return s == "row_id" }) + } metadataSchema.Definitions = nil metadataSchema.AdditionalProperties = nil @@ -167,13 +170,6 @@ func generateCollectionSchema(keyColumns []string, columnTypes map[string]*jsons return json.RawMessage(bs), nil } -var minimalSchema = func() json.RawMessage { - var schema, err = generateCollectionSchema(nil, nil) - if err != nil { - panic(err) - } - return schema -}() // Spec returns metadata about the capture connector. func (drv *BatchSQLDriver) Spec(ctx context.Context, req *pc.Request_Spec) (*pc.Response_Spec, error) { @@ -252,15 +248,19 @@ func (drv *BatchSQLDriver) Discover(ctx context.Context, req *pc.Request_Discove return nil, fmt.Errorf("error serializing resource spec: %w", err) } - // Try to generate a useful collection schema, but on error fall back to the - // minimal schema with a fallback collection key which is always present. - var collectionSchema = minimalSchema + // Start with a minimal schema and a fallback collection key, which will be + // replaced with more useful versions if we have sufficient information. + collectionSchema, err := generateCollectionSchema(&cfg, nil, nil) + if err != nil { + return nil, fmt.Errorf("error generating minimal collection schema: %w", err) + } var collectionKey = fallbackKey if !cfg.Advanced.parsedFeatureFlags["keyless_row_id"] { collectionKey = fallbackKeyOld } + if tableKey, ok := keysByTable[tableID]; ok { - if generatedSchema, err := generateCollectionSchema(tableKey.Columns, tableKey.ColumnTypes); err == nil { + if generatedSchema, err := generateCollectionSchema(&cfg, tableKey.Columns, tableKey.ColumnTypes); err == nil { collectionSchema = generatedSchema collectionKey = nil for _, colName := range tableKey.Columns {