Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(connector): resolve $ref in json schema #20446

Merged
merged 10 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/connector/codec/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ prost = { workspace = true, features = ["no-recursion-limit"] }
prost-reflect = { version = "0.14", features = ["serde"] }
prost-types = "0.13"
protox = "0.7"
reqwest = { version = "0.12.2", features = ["json"] }
risingwave_common = { workspace = true }
risingwave_pb = { workspace = true }
rust_decimal = "1"
Expand All @@ -32,6 +33,7 @@ thiserror = "1"
thiserror-ext = { workspace = true }
time = "0.3.30"
tracing = "0.1"
url = "2"

[dev-dependencies]
expect-test = "1"
Expand Down
162 changes: 161 additions & 1 deletion src/connector/codec/src/decoder/json/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,170 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::{fs, mem};

use anyhow::Context;
use risingwave_pb::plan_common::ColumnDesc;
use serde_json::Value;
use thiserror::Error;
use url::Url;

use super::avro::{avro_schema_to_column_descs, MapHandling};

#[derive(Debug, Error, thiserror_ext::ContextInto)]
pub enum Error {
#[error("could not open schema from {filename}")]
SchemaFromFile {
filename: String,
source: std::io::Error,
},
#[error("parse error for url {url}")]
UrlParse {
url: String,
source: url::ParseError,
},
#[error("schema from {url} not valid JSON")]
SchemaNotJson { url: String, source: std::io::Error },
#[error("request error")]
Request { url: String, source: reqwest::Error },
#[error("schema from {url} not valid JSON")]
SchemaNotJsonSerde {
url: String,
source: serde_json::Error,
},
#[error("ref `{ref_string}` can not be resolved as pointer, `{ref_fragment}` can not be found in the schema")]
JsonRefPointerNotFound {
ref_string: String,
ref_fragment: String,
},
#[error("json ref error")]
JSONRef {
#[from]
source: std::io::Error,
},
#[error("need url to be a file or a http based, got {url}")]
UnsupportedUrl { url: String },
#[error(transparent)]
Uncategorized(
#[from]
#[backtrace]
anyhow::Error,
),
}

type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(Debug)]
pub struct JsonRef {
Comment on lines +91 to +92
Copy link
Contributor

@xiangjinwu xiangjinwu Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledge that $id and $anchor are currently ignored and unsupported. To be honest I feel like this shall be supported by the json schema library rather than by a hack in the caller that replaces the schema.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did check the jsonschema library. It's more like designed for validation and is vary hard to use it for dereffing the schema file.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we move the impl to a standalone crate? I feel that the kernel should not touch the logic of JsonRef.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. We may need to hack it to support confluent schema registry. The confluent schema registry should be handled differently. IIUC they can use reference name to retrieve a reference instead of URI.

schema_cache: HashMap<String, Value>,
reference_key: Option<String>,
}

impl JsonRef {
fn new() -> JsonRef {
JsonRef {
schema_cache: HashMap::new(),
reference_key: Some("__reference__".to_owned()),
}
}

async fn deref_value(&mut self, value: &mut Value, id: String) -> Result<()> {
let id_url = Url::parse(&id).into_url_parse(&id)?.to_string();
self.schema_cache.insert(id_url.clone(), value.clone());
self.deref(value, id_url, &vec![]).await?;
Ok(())
}

async fn deref(
&mut self,
value: &mut Value,
id: String,
used_refs: &Vec<String>,
) -> Result<()> {
if let Some(obj) = value.as_object_mut() {
if let Some(ref_value) = obj.remove("$ref") {
if let Some(ref_string) = ref_value.as_str() {
let id_url = Url::parse(&id).into_url_parse(&id)?;
let ref_url = id_url.join(ref_string).into_url_parse(ref_string)?;
let mut ref_url_no_fragment = ref_url.clone();
ref_url_no_fragment.set_fragment(None);
let url_schema = ref_url_no_fragment.scheme();
let ref_no_fragment = ref_url_no_fragment.to_string();

let mut schema = match self.schema_cache.get(&ref_no_fragment) {
Some(cached_schema) => cached_schema.clone(),
None => {
if url_schema == "http" || url_schema == "https" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure we want to support schema reference from http/fs?
Should we support retrieve references from schema registry instead? https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-json.html#schema-references-in-json-schemas

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this feature a customer request? Please provide more context, thanks.

Copy link
Contributor Author

@yuhao-su yuhao-su Feb 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

User only requested schema ref resolution without an external link.

I thought the HTTP support can be a general approve to support schema registry. But I just realized there is a version segment after the subject. I'll change this pr to only support schema resolution without external resource for now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, we do allow users using http and file. So it won't hurt to add those support.

It's just for the schema registry, we need to investigate more into how it works.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though schemas are identified by URIs, those identifiers are not necessarily network-addressable. They are just identifiers. Generally, implementations don't make HTTP requests (https://) or read from the file system (file://) to fetch schemas. Instead, they provide a way to load schemas into an internal schema database. When a schema is referenced by it's URI identifier, the schema is retrieved from the internal schema database.

https://json-schema.org/understanding-json-schema/structuring

Copy link
Contributor Author

@yuhao-su yuhao-su Feb 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they provide a way to load schemas into an internal schema database

It's true a for a deref library to focus on derefing instead of fetching (in fact it's referencing do). But if we want do do so we will have to ask users to provide all the URL in the schema ref or parse the schema to get all the $ref recurrently before ref.

reqwest::get(ref_url_no_fragment)
.await
.into_request(&ref_no_fragment)?
.json()
.await
.into_request(&ref_no_fragment)?
} else if url_schema == "file" {
let file_path =
ref_url_no_fragment.to_file_path().map_err(|_| {
anyhow::anyhow!(
"could not convert url {} to file path",
&ref_url_no_fragment
)
})?;
let file = fs::File::open(file_path)
.into_schema_from_file(&ref_no_fragment)?;
serde_json::from_reader(file)
.into_schema_not_json_serde(ref_no_fragment.clone())?
} else {
return Err(Error::UnsupportedUrl {
url: ref_no_fragment,
});
}
}
};

if !self.schema_cache.contains_key(&ref_no_fragment) {
self.schema_cache
.insert(ref_no_fragment.clone(), schema.clone());
}

let ref_url_string = ref_url.to_string();
if let Some(ref_fragment) = ref_url.fragment() {
schema = schema
.pointer(ref_fragment)
.ok_or(Error::JsonRefPointerNotFound {
ref_string: ref_string.to_owned(),
ref_fragment: ref_fragment.to_owned(),
})?
.clone();
}
// Do not deref a url twice to prevent infinite loops
if used_refs.contains(&ref_url_string) {
return Ok(());
}
let mut new_used_refs = used_refs.clone();
new_used_refs.push(ref_url_string);

Box::pin(self.deref(&mut schema, ref_no_fragment, &new_used_refs)).await?;
let old_value = mem::replace(value, schema);

if let Some(reference_key) = &self.reference_key {
if let Some(new_obj) = value.as_object_mut() {
new_obj.insert(reference_key.clone(), old_value);
}
}
}
}
}

if let Some(obj) = value.as_object_mut() {
for obj_value in obj.values_mut() {
Box::pin(self.deref(obj_value, id.clone(), used_refs)).await?
}
}
Ok(())
}
}

impl crate::JsonSchema {
/// FIXME: when the JSON schema is invalid, it will panic.
///
Expand All @@ -26,7 +185,8 @@ impl crate::JsonSchema {
/// <https://github.com/mozilla/jsonschema-transpiler/blob/fb715c7147ebd52427e0aea09b2bba2d539850b1/src/jsonschema.rs#L228-L280>
///
/// TODO: examine other stuff like `oneOf`, `patternProperties`, etc.
pub fn json_schema_to_columns(&self) -> anyhow::Result<Vec<ColumnDesc>> {
pub async fn json_schema_to_columns(&mut self, uri: String) -> anyhow::Result<Vec<ColumnDesc>> {
JsonRef::new().deref_value(&mut self.0, uri).await?;
let avro_schema = jst::convert_avro(&self.0, jst::Context::default()).to_string();
let schema =
apache_avro::Schema::parse_str(&avro_schema).context("failed to parse avro schema")?;
Expand Down
177 changes: 177 additions & 0 deletions src/connector/codec/tests/integration_tests/json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
use std::fs;

use risingwave_common::catalog::ColumnDesc;
use risingwave_connector_codec::JsonSchema;

use crate::utils::{ColumnDescTestDisplay, *};

#[tokio::test]
async fn test_json_schema_parse() {
let test_id_type_file_path = fs::canonicalize("tests/test_data/id_type.json")
.unwrap()
.to_string_lossy()
.to_string();
let test_id_type_http_url = "https://gist.githubusercontent.com/yuhao-su/a1b23e4073b4f1ca4e614c89a785575d/raw/ec8ccd6b3bcf6fafe5c57173a4fdf4129c63625d/idType.txt";

let schema = format!(
r##" {{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"definitions": {{
"stringType": {{
"type": "string"
}},
"marketObj": {{
"type": "object",
"additionalProperties": {{
"$ref": "{test_id_type_http_url}#/definitions/idType"
}}
}},
"marketArray": {{
"type": "object",
"additionalProperties": {{
"type": "array",
"items": {{
"type": "string"
}}
}}
}}
}},
"properties": {{
"id": {{
"$ref": "file://{test_id_type_file_path}#/definitions/idType"
}},
"name": {{
"$ref": "#/definitions/marketObj",
"description": "Name of the market subject"
}},
"cats": {{
"$ref": "#/definitions/marketArray"
}},
"meta": {{
"type": "object",
"properties": {{
"active": {{
"$ref": "#/definitions/marketObj"
}},
"tags": {{
"$ref": "#/definitions/marketArray"
}}
}}
}}
}},
"required": [
"id",
"name"
]
}}"##
);

let mut json_schema = JsonSchema::parse_str(&schema).unwrap();

let columns = json_schema
.json_schema_to_columns("http://test_schema_uri.test".to_string())
.await
.unwrap()
.into_iter()
.map(|c| ColumnDesc::from(c))
.collect_vec();
let column_display = columns
.iter()
.map(|c| ColumnDescTestDisplay(c))
.collect_vec();

expect![[r#"
{
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"marketArray": {
"additionalProperties": {
"items": {
"type": "string"
},
"type": "array"
},
"type": "object"
},
"marketObj": {
"additionalProperties": {
"__reference__": {},
"type": "string"
},
"type": "object"
},
"stringType": {
"type": "string"
}
},
"properties": {
"cats": {
"__reference__": {},
"additionalProperties": {
"items": {
"type": "string"
},
"type": "array"
},
"type": "object"
},
"id": {
"__reference__": {},
"type": "string"
},
"meta": {
"properties": {
"active": {
"__reference__": {},
"additionalProperties": {
"__reference__": {},
"type": "string"
},
"type": "object"
},
"tags": {
"__reference__": {},
"additionalProperties": {
"items": {
"type": "string"
},
"type": "array"
},
"type": "object"
}
},
"type": "object"
},
"name": {
"__reference__": {
"description": "Name of the market subject"
},
"additionalProperties": {
"__reference__": {},
"type": "string"
},
"type": "object"
}
},
"required": [
"id",
"name"
],
"type": "object"
}"#]]
.assert_eq(&serde_json::to_string_pretty(&json_schema.0).unwrap());

expect![[r#"
[
cats(#1): Jsonb,
id(#2): Varchar,
meta(#3): Struct {
active: Jsonb,
tags: Jsonb,
},
name(#4): Jsonb,
]
"#]]
.assert_debug_eq(&column_display);
}
1 change: 1 addition & 0 deletions src/connector/codec/tests/integration_tests/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
//! Because the expected data type for data mapping comes from the schema mapping.

mod avro;
mod json;
mod protobuf;

pub mod utils;
Loading
Loading