Skip to content

Commit

Permalink
check response
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang committed Jan 19, 2025
1 parent 787a9f8 commit 0e058d1
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 25 deletions.
9 changes: 7 additions & 2 deletions proto/task_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,13 @@ message FastInsertRequest {
}

message FastInsertResponse {
// Optional error message for failed task.
string error_message = 1;
enum Status {
UNSPECIFIED = 0;
SUCCEEDED = 1;
DML_FAILED = 2;
}
Status status = 1;
string error_message = 2;
}

message CancelTaskRequest {
Expand Down
1 change: 0 additions & 1 deletion src/batch/src/executor/fast_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ mod tests {

let json_value = json!(header_map);
let jsonb_val = JsonbVal::from(json_value);
// Add 4 ListValues to ArrayBuilder
builder.append(Some(jsonb_val.as_scalar_ref()));

// Use builder to obtain a single (List) column DataChunk
Expand Down
18 changes: 14 additions & 4 deletions src/batch/src/rpc/service/task_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use risingwave_common::util::tracing::TracingContext;
use risingwave_pb::batch_plan::{FastInsertNode, TaskOutputId};
use risingwave_pb::task_service::task_service_server::TaskService;
use risingwave_pb::task_service::{
CancelTaskRequest, CancelTaskResponse, CreateTaskRequest, ExecuteRequest, FastInsertRequest,
FastInsertResponse, GetDataResponse, TaskInfoResponse,
fast_insert_response, CancelTaskRequest, CancelTaskResponse, CreateTaskRequest, ExecuteRequest,
FastInsertRequest, FastInsertResponse, GetDataResponse, TaskInfoResponse,
};
use risingwave_storage::dispatch_state_store;
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -132,9 +132,19 @@ impl TaskService for BatchServiceImpl {
let res = self.do_fast_insert(insert_node, req.wait_epoch).await;
match res {
Ok(_) => Ok(Response::new(FastInsertResponse {
error_message: String::from("success"),
status: fast_insert_response::Status::Succeeded.into(),
error_message: "".to_string(),
})),
Err(e) => Err(e.into()),
Err(e) => match e {
BatchError::Dml(e) => Ok(Response::new(FastInsertResponse {
status: fast_insert_response::Status::DmlFailed.into(),
error_message: format!("{}", e.as_report()),
})),
_ => {
error!(error = %e.as_report(), "failed to fast insert");
Err(e.into())
}
},
}
}
}
Expand Down
5 changes: 0 additions & 5 deletions src/frontend/src/scheduler/distributed/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,6 @@ impl QueryExecution {
compute_client_pool: ComputeClientPoolRef,
catalog_reader: CatalogReader,
) -> HashMap<StageId, Arc<StageExecution>> {
// println!(
// "WKXLOG self.query.stage_graph.stages.len(): {}, self.query.stage_graph.stages: {:?}",
// self.query.stage_graph.stages.len(),
// self.query.stage_graph.stages
// );
let mut stage_executions: HashMap<StageId, Arc<StageExecution>> =
HashMap::with_capacity(self.query.stage_graph.stages.len());

Expand Down
7 changes: 3 additions & 4 deletions src/frontend/src/scheduler/fast_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_pb::batch_plan::FastInsertNode;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::task_service::FastInsertRequest;
use risingwave_pb::task_service::{FastInsertRequest, FastInsertResponse};

use crate::catalog::TableId;
use crate::scheduler::{SchedulerError, SchedulerResult};
Expand Down Expand Up @@ -55,7 +55,7 @@ impl FastInsertExecution {
}
}

pub async fn my_execute(self) -> SchedulerResult<()> {
pub async fn my_execute(self) -> SchedulerResult<FastInsertResponse> {
let workers = self.choose_worker(
&TableId::new(self.fast_insert_node.table_id),
self.fast_insert_node.session_id,
Expand All @@ -67,8 +67,7 @@ impl FastInsertExecution {
wait_epoch: self.wait_for_persistence,
};
let response = client.fast_insert(request).await?;
println!("WKXLOG response: {:?}", response);
Ok(())
Ok(response)
}

#[inline(always)]
Expand Down
4 changes: 0 additions & 4 deletions src/frontend/src/scheduler/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,6 @@ impl LocalQueryExecution {
let plan_fragment = self.create_plan_fragment()?;
let plan_node = plan_fragment.root.unwrap();

// println!("WKXLOG plan_node: {:?}", plan_node);

let executor = ExecutorBuilder::new(
&plan_node,
&task_id,
Expand All @@ -123,8 +121,6 @@ impl LocalQueryExecution {

let executor = executor.build().await?;

// println!("WKXLOG executor: {:?}", executor);

#[for_await]
for chunk in executor.execute() {
yield chunk?;
Expand Down
20 changes: 15 additions & 5 deletions src/frontend/src/webhook/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ pub(super) mod handlers {
use risingwave_pb::batch_plan::FastInsertNode;
use risingwave_pb::catalog::WebhookSourceInfo;
use risingwave_pb::plan_common::DefaultColumns;
use risingwave_pb::task_service::fast_insert_response;
// use risingwave_sqlparser::ast::{Query, SetExpr, Statement, Value, Values};
use utils::{header_map_to_json, verify_signature};

Expand Down Expand Up @@ -133,10 +134,13 @@ pub(super) mod handlers {
}

let mut builder = JsonbArrayBuilder::with_type(1, DataType::Jsonb);
// TODO(kexiang): handle errors
let json_value = Value::from_text(&body.to_vec()).unwrap();
let json_value = Value::from_text(&body.to_vec()).map_err(|e| {
err(
anyhow!(e).context("Failed to parse body"),
StatusCode::UNPROCESSABLE_ENTITY,
)
})?;
let jsonb_val = JsonbVal::from(json_value);
// Add 4 ListValues to ArrayBuilder
builder.append(Some(jsonb_val.as_scalar_ref()));

// Use builder to obtain a single (List) column DataChunk
Expand All @@ -150,8 +154,14 @@ pub(super) mod handlers {
session.clone(),
);
let res = execution.my_execute().await.unwrap();

Ok(())
if res.status == fast_insert_response::Status::Succeeded as i32 {
Ok(())
} else {
Err(err(
anyhow!("Failed to fast insert: {}", res.error_message),
StatusCode::INTERNAL_SERVER_ERROR,
))
}
}

async fn acquire_table_info(
Expand Down

0 comments on commit 0e058d1

Please sign in to comment.