Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(webhook): webhook waiting for persistency. #20164

Merged
merged 27 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions e2e_test/webhook/check_1.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ select data ->> 'source', data->> 'auth_algo' from segment_hmac_sha1;
segment hmac_sha1

query TT
select data ->> 'source', data->> 'auth_algo' from hubspot_sha256_v2;
select data ->> 'source', data->> 'auth_algo' from test_primary_key;
----
hubspot sha256_v2
github hmac_sha1
7 changes: 6 additions & 1 deletion e2e_test/webhook/check_2.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,9 @@ query TT
select data ->> 'source', data->> 'auth_algo' from hubspot_sha256_v2;
----
hubspot sha256_v2
hubspot sha256_v2
hubspot sha256_v2

query TT
select data ->> 'source', data->> 'auth_algo' from test_primary_key;
----
github hmac_sha1
7 changes: 6 additions & 1 deletion e2e_test/webhook/check_3.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,9 @@ select data ->> 'source', data->> 'auth_algo' from hubspot_sha256_v2;
----
hubspot sha256_v2
hubspot sha256_v2
hubspot sha256_v2
hubspot sha256_v2

query TT
select data ->> 'source', data->> 'auth_algo' from test_primary_key;
----
github hmac_sha1
13 changes: 13 additions & 0 deletions e2e_test/webhook/create_table.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,16 @@ create table hubspot_sha256_v2 (
, 'UTF8')
), 'hex')
);

statement ok
create table test_primary_key (
data JSONB PRIMARY KEY
) WITH (
connector = 'webhook',
) VALIDATE SECRET test_secret AS secure_compare(
headers->>'x-hub-signature',
'sha1=' || encode(hmac(test_secret, data, 'sha1'), 'hex')
);

statement error Adding/dropping a column of a table with webhook has not been implemented.
ALTER TABLE github_hmac_sha1 ADD COLUMN new_col int;
3 changes: 3 additions & 0 deletions e2e_test/webhook/drop_table.slt.part
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
DROP TABLE test_primary_key;

statement ok
DROP TABLE hubspot_sha256_v2;

Expand Down
19 changes: 19 additions & 0 deletions e2e_test/webhook/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,22 @@ def send_github_hmac_sha1(secret):
send_webhook(url, headers, payload_json)


def send_test_primary_key(secret):
payload = message
payload['source'] = "github"
payload['auth_algo'] = "hmac_sha1"
url = SERVER_URL + "test_primary_key"

payload_json = json.dumps(payload)
signature = generate_signature_hmac(secret, payload_json, 'sha1', "sha1=")
# Webhook message headers
headers = {
"Content-Type": "application/json",
"X-Hub-Signature": signature # Custom signature header
}
send_webhook(url, headers, payload_json)


def send_github_hmac_sha256(secret):
payload = message
payload['source'] = "github"
Expand Down Expand Up @@ -143,3 +159,6 @@ def send_hubspot_sha256_v2(secret):
send_segment_hmac_sha1(secret)
# hubspot
send_hubspot_sha256_v2(secret)

# ensure the single column can still work as normal
send_test_primary_key(secret)
2 changes: 2 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ message StreamSourceInfo {
message WebhookSourceInfo {
secret.SecretRef secret_ref = 1;
expr.ExprNode signature_expr = 2;
// Return until the data is persisted in the storage layer or not. Default is true.
bool wait_for_persistence = 3;
}

message Source {
Expand Down
31 changes: 31 additions & 0 deletions proto/task_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,35 @@ message GetDataResponse {
data.DataChunk record_batch = 2;
}

message FastInsertRequest {
// Id of the table to perform inserting.
uint32 table_id = 1;
// Version of the table.
uint64 table_version_id = 2;
repeated uint32 column_indices = 3;
data.DataChunk data_chunk = 4;

// An optional field and will be `None` for tables without user-defined pk.
// The `BatchInsertExecutor` should add a column with NULL value which will
// be filled in streaming.
optional uint32 row_id_index = 5;

// Use this number to assign the insert req to different worker nodes and dml channels.
uint32 request_id = 6;
bool wait_for_persistence = 7;
// TODO(kexiang): add support for default columns. plan_common.ExprContext expr_context is needed for it.
}

message FastInsertResponse {
enum Status {
UNSPECIFIED = 0;
SUCCEEDED = 1;
DML_FAILED = 2;
}
Status status = 1;
string error_message = 2;
}

message ExecuteRequest {
batch_plan.TaskId task_id = 1;
batch_plan.PlanFragment plan = 2;
Expand All @@ -73,6 +102,8 @@ service TaskService {
// Cancel an already-died (self execution-failure, previous aborted, completed) task will still succeed.
rpc CancelTask(CancelTaskRequest) returns (CancelTaskResponse);
rpc Execute(ExecuteRequest) returns (stream GetDataResponse);
// A lightweight version insert, only for non-pgwire insert, such as inserts from webhooks and websockets.
rpc FastInsert(FastInsertRequest) returns (FastInsertResponse);
}

message GetDataRequest {
Expand Down
1 change: 1 addition & 0 deletions src/batch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ repository = { workspace = true }

[dependencies]
anyhow = "1"
assert_matches = "1"
async-recursion = "1"
async-trait = "0.1"
either = "1"
Expand Down
2 changes: 1 addition & 1 deletion src/batch/executors/src/executor/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ mod tests {
assert_eq!(*chunk.columns()[2], array);
});

assert_matches!(reader.next().await.unwrap()?, TxnMsg::End(_));
assert_matches!(reader.next().await.unwrap()?, TxnMsg::End(..));
let epoch = u64::MAX;
let full_range = (Bound::Unbounded, Bound::Unbounded);
let store_content = store
Expand Down
Loading
Loading