diff --git a/Cargo.lock b/Cargo.lock index e564d37474703..b9d671525b57b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11055,6 +11055,7 @@ dependencies = [ "prost-reflect", "prost-types 0.13.4", "protox", + "reqwest 0.12.4", "risingwave_common", "risingwave_pb", "rust_decimal", @@ -11063,6 +11064,7 @@ dependencies = [ "thiserror-ext", "time", "tracing", + "url", "workspace-hack", ] diff --git a/src/connector/codec/Cargo.toml b/src/connector/codec/Cargo.toml index 50b3a53887404..ce559c0fc1692 100644 --- a/src/connector/codec/Cargo.toml +++ b/src/connector/codec/Cargo.toml @@ -24,6 +24,7 @@ prost = { workspace = true, features = ["no-recursion-limit"] } prost-reflect = { version = "0.14", features = ["serde"] } prost-types = "0.13" protox = "0.7" +reqwest = { version = "0.12.2", features = ["json"] } risingwave_common = { workspace = true } risingwave_pb = { workspace = true } rust_decimal = "1" @@ -32,6 +33,7 @@ thiserror = "1" thiserror-ext = { workspace = true } time = "0.3.30" tracing = "0.1" +url = "2" [dev-dependencies] expect-test = "1" diff --git a/src/connector/codec/src/decoder/json/mod.rs b/src/connector/codec/src/decoder/json/mod.rs index 4dec2e43b8367..2d470c62a86cf 100644 --- a/src/connector/codec/src/decoder/json/mod.rs +++ b/src/connector/codec/src/decoder/json/mod.rs @@ -12,11 +12,181 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+// The MIT License (MIT) +// +// Copyright (c) 2021 David Raznick +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+ +use std::collections::HashMap; +use std::fs; + use anyhow::Context; use risingwave_pb::plan_common::ColumnDesc; +use serde_json::Value; +use thiserror::Error; +use url::Url; use super::avro::{avro_schema_to_column_descs, MapHandling}; +#[derive(Debug, Error, thiserror_ext::ContextInto)] +pub enum Error { + #[error("could not open schema from {filename}")] + SchemaFromFile { + filename: String, + source: std::io::Error, + }, + #[error("parse error for url {url}")] + UrlParse { + url: String, + source: url::ParseError, + }, + #[error("schema from {url} not valid JSON")] + SchemaNotJson { url: String, source: std::io::Error }, + #[error("request error")] + Request { url: String, source: reqwest::Error }, + #[error("schema from {url} not valid JSON")] + SchemaNotJsonSerde { + url: String, + source: serde_json::Error, + }, + #[error("ref `{ref_string}` can not be resolved as pointer, `{ref_fragment}` can not be found in the schema")] + JsonRefPointerNotFound { + ref_string: String, + ref_fragment: String, + }, + #[error("json ref error")] + JsonRef { + #[from] + source: std::io::Error, + }, + #[error("need url to be a file or a http based, got {url}")] + UnsupportedUrl { url: String }, + #[error(transparent)] + Uncategorized( + #[from] + #[backtrace] + anyhow::Error, + ), +} + +type Result<T> = std::result::Result<T, Error>; + +#[derive(Debug)] +pub struct JsonRef { + schema_cache: HashMap<String, Value>, +} + +impl JsonRef { + fn new() -> JsonRef { + JsonRef { + schema_cache: HashMap::new(), + } + } + + async fn deref_value(&mut self, value: &mut Value, retrieval_url: &Url) -> Result<()> { + self.schema_cache + .insert(retrieval_url.to_string(), value.clone()); + self.deref(value, retrieval_url, &vec![]).await?; + Ok(()) + } + + async fn deref( + &mut self, + value: &mut Value, + base_url: &Url, + used_refs: &Vec<String>, + ) -> Result<()> { + if let Some(obj) = value.as_object_mut() + && let Some(ref_value) = obj.remove("$ref") + && let Some(ref_string) = ref_value.as_str() + { + let ref_url =
base_url.join(ref_string).into_url_parse(ref_string)?; + let mut ref_url_no_fragment = ref_url.clone(); + ref_url_no_fragment.set_fragment(None); + let url_schema = ref_url_no_fragment.scheme(); + let ref_no_fragment = ref_url_no_fragment.to_string(); + + let mut schema = match self.schema_cache.get(&ref_no_fragment) { + Some(cached_schema) => cached_schema.clone(), + None => { + if url_schema == "http" || url_schema == "https" { + reqwest::get(ref_url_no_fragment.clone()) + .await + .into_request(&ref_no_fragment)? + .json() + .await + .into_request(&ref_no_fragment)? + } else if url_schema == "file" { + let file_path = ref_url_no_fragment.to_file_path().map_err(|_| { + anyhow::anyhow!( + "could not convert url {} to file path", + &ref_url_no_fragment + ) + })?; + let file = + fs::File::open(file_path).into_schema_from_file(&ref_no_fragment)?; + serde_json::from_reader(file) + .into_schema_not_json_serde(ref_no_fragment.clone())? + } else { + return Err(Error::UnsupportedUrl { + url: ref_no_fragment, + }); + } + } + }; + + if !self.schema_cache.contains_key(&ref_no_fragment) { + self.schema_cache + .insert(ref_no_fragment.clone(), schema.clone()); + } + + let ref_url_string = ref_url.to_string(); + if let Some(ref_fragment) = ref_url.fragment() { + schema = schema + .pointer(ref_fragment) + .ok_or(Error::JsonRefPointerNotFound { + ref_string: ref_string.to_owned(), + ref_fragment: ref_fragment.to_owned(), + })? + .clone(); + } + // Do not deref a url twice to prevent infinite loops + if used_refs.contains(&ref_url_string) { + return Ok(()); + } + let mut new_used_refs = used_refs.clone(); + new_used_refs.push(ref_url_string); + Box::pin(self.deref(&mut schema, &ref_url_no_fragment, &new_used_refs)).await?; + + *value = schema; + } + + if let Some(obj) = value.as_object_mut() { + for obj_value in obj.values_mut() { + Box::pin(self.deref(obj_value, base_url, used_refs)).await? 
+ } + } + Ok(()) + } +} + impl crate::JsonSchema { /// FIXME: when the JSON schema is invalid, it will panic. /// @@ -26,7 +196,13 @@ impl crate::JsonSchema { /// /// /// TODO: examine other stuff like `oneOf`, `patternProperties`, etc. - pub fn json_schema_to_columns(&self) -> anyhow::Result<Vec<ColumnDesc>> { + pub async fn json_schema_to_columns( + &mut self, + retrieval_url: Url, + ) -> anyhow::Result<Vec<ColumnDesc>> { + JsonRef::new() + .deref_value(&mut self.0, &retrieval_url) + .await?; let avro_schema = jst::convert_avro(&self.0, jst::Context::default()).to_string(); let schema = apache_avro::Schema::parse_str(&avro_schema).context("failed to parse avro schema")?; diff --git a/src/connector/codec/tests/integration_tests/json.rs b/src/connector/codec/tests/integration_tests/json.rs new file mode 100644 index 0000000000000..32abd1fd640f2 --- /dev/null +++ b/src/connector/codec/tests/integration_tests/json.rs @@ -0,0 +1,234 @@ +// Copyright 2025 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +use std::fs; + +use risingwave_common::catalog::ColumnDesc; +use risingwave_connector_codec::JsonSchema; +use url::Url; + +use crate::utils::{ColumnDescTestDisplay, *}; + +#[tokio::test] +async fn test_json_schema_parse() { + let test_id_type_file_path = fs::canonicalize("tests/test_data/id_type.json") + .unwrap() + .to_string_lossy() + .to_string(); + let test_id_type_http_url = "https://gist.githubusercontent.com/yuhao-su/a1b23e4073b4f1ca4e614c89a785575d/raw/ec8ccd6b3bcf6fafe5c57173a4fdf4129c63625d/idType.txt"; + + let schema = format!( + r##" {{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "definitions": {{ + "stringType": {{ + "type": "string" + }}, + "marketObj": {{ + "type": "object", + "additionalProperties": {{ + "$ref": "{test_id_type_http_url}#/definitions/idType" + }} + }}, + "marketArray": {{ + "type": "object", + "additionalProperties": {{ + "type": "array", + "items": {{ + "type": "string" + }} + }} + }}, + "recurrentType": {{ + "type": "object", + "properties": {{ + "id": {{ + "$ref": "file://{test_id_type_file_path}#/definitions/idType" + }}, + "next": {{ + "$ref": "file://{test_id_type_file_path}#/definitions/recurrentType" + }} + }} + }} + }}, + "properties": {{ + "id": {{ + "$ref": "file://{test_id_type_file_path}#/definitions/idType" + }}, + "name": {{ + "$ref": "#/definitions/marketObj", + "description": "Name of the market subject" + }}, + "cats": {{ + "$ref": "#/definitions/marketArray" + }}, + "meta": {{ + "type": "object", + "properties": {{ + "active": {{ + "$ref": "#/definitions/marketObj" + }}, + "tags": {{ + "$ref": "#/definitions/marketArray" + }} + }} + }}, + "recurrent": {{ + "$ref": "#/definitions/recurrentType" + }} + }}, + "required": [ + "id", + "name" + ] + }}"## + ); + + let mut json_schema = JsonSchema::parse_str(&schema).unwrap(); + + let columns = json_schema + .json_schema_to_columns(Url::parse("http://test_schema_uri.test").unwrap()) + .await + .unwrap() + .into_iter() + 
.map(ColumnDesc::from) + .collect_vec(); + let column_display = columns.iter().map(ColumnDescTestDisplay).collect_vec(); + + expect![[r#" + { + "$schema": "http://json-schema.org/draft-07/schema#", + "definitions": { + "marketArray": { + "additionalProperties": { + "items": { + "type": "string" + }, + "type": "array" + }, + "type": "object" + }, + "marketObj": { + "additionalProperties": { + "type": "string" + }, + "type": "object" + }, + "recurrentType": { + "properties": { + "id": { + "type": "string" + }, + "next": { + "properties": { + "id": { + "type": "string" + }, + "next": {} + }, + "type": "object" + } + }, + "type": "object" + }, + "stringType": { + "type": "string" + } + }, + "properties": { + "cats": { + "additionalProperties": { + "items": { + "type": "string" + }, + "type": "array" + }, + "type": "object" + }, + "id": { + "type": "string" + }, + "meta": { + "properties": { + "active": { + "additionalProperties": { + "type": "string" + }, + "type": "object" + }, + "tags": { + "additionalProperties": { + "items": { + "type": "string" + }, + "type": "array" + }, + "type": "object" + } + }, + "type": "object" + }, + "name": { + "additionalProperties": { + "type": "string" + }, + "type": "object" + }, + "recurrent": { + "properties": { + "id": { + "type": "string" + }, + "next": { + "properties": { + "id": { + "type": "string" + }, + "next": {} + }, + "type": "object" + } + }, + "type": "object" + } + }, + "required": [ + "id", + "name" + ], + "type": "object" + }"#]] + .assert_eq(&serde_json::to_string_pretty(&json_schema.0).unwrap()); + + expect![[r#" + [ + cats(#1): Jsonb, + id(#2): Varchar, + meta(#3): Struct { + active: Jsonb, + tags: Jsonb, + }, + name(#4): Jsonb, + recurrent(#5): Struct { + id: Varchar, + next: Struct { + id: Varchar, + next: Varchar, + }, + }, + ] + "#]] + .assert_debug_eq(&column_display); +} diff --git a/src/connector/codec/tests/integration_tests/main.rs b/src/connector/codec/tests/integration_tests/main.rs index 
23800219a39ce..14dc94776c3ce 100644 --- a/src/connector/codec/tests/integration_tests/main.rs +++ b/src/connector/codec/tests/integration_tests/main.rs @@ -37,6 +37,7 @@ //! Because the expected data type for data mapping comes from the schema mapping. mod avro; +mod json; mod protobuf; pub mod utils; diff --git a/src/connector/codec/tests/test_data/id_type.json b/src/connector/codec/tests/test_data/id_type.json new file mode 100644 index 0000000000000..758e5469f13e0 --- /dev/null +++ b/src/connector/codec/tests/test_data/id_type.json @@ -0,0 +1,18 @@ +{ + "definitions": { + "idType": { + "type": "string" + }, + "recurrentType": { + "type": "object", + "properties": { + "id": { + "$ref": "#/definitions/idType" + }, + "next": { + "$ref": "#/definitions/recurrentType" + } + } + } + } +} \ No newline at end of file diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs index 5b397aca34d0a..03d6020c8287b 100644 --- a/src/connector/src/parser/json_parser.rs +++ b/src/connector/src/parser/json_parser.rs @@ -84,8 +84,8 @@ pub async fn fetch_json_schema_and_map_to_columns( props: &BTreeMap<String, String>, ) -> ConnectorResult<Vec<ColumnDesc>> { let url = handle_sr_list(schema_location)?; - let json_schema = if let Some(schema_registry_auth) = schema_registry_auth { - let client = Client::new(url, &schema_registry_auth)?; + let mut json_schema = if let Some(schema_registry_auth) = schema_registry_auth { + let client = Client::new(url.clone(), &schema_registry_auth)?; let topic = get_kafka_topic(props)?; let schema = client .get_schema_by_subject(&format!("{}-value", topic)) @@ -96,7 +96,10 @@ pub async fn fetch_json_schema_and_map_to_columns( let bytes = bytes_from_url(url, None).await?; JsonSchema::parse_bytes(&bytes)? }; - json_schema.json_schema_to_columns().map_err(Into::into) + json_schema + .json_schema_to_columns(url.first().unwrap().clone()) + .await + .map_err(Into::into) } #[cfg(test)]