-
Notifications
You must be signed in to change notification settings - Fork 610
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(connector): resolve `$ref` in json schema
#20446
Changes from 2 commits
ba34fed
207a272
183d4dc
86b5082
0f15841
510e13f
b9aa44c
c4ab9d7
94c9b00
f4d076d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,11 +12,170 @@ | |
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
use std::collections::HashMap; | ||
use std::{fs, mem}; | ||
|
||
use anyhow::Context; | ||
use risingwave_pb::plan_common::ColumnDesc; | ||
use serde_json::Value; | ||
use thiserror::Error; | ||
use url::Url; | ||
|
||
use super::avro::{avro_schema_to_column_descs, MapHandling}; | ||
|
||
#[derive(Debug, Error, thiserror_ext::ContextInto)] | ||
pub enum Error { | ||
#[error("could not open schema from {filename}")] | ||
SchemaFromFile { | ||
filename: String, | ||
source: std::io::Error, | ||
}, | ||
#[error("parse error for url {url}")] | ||
UrlParse { | ||
url: String, | ||
source: url::ParseError, | ||
}, | ||
#[error("schema from {url} not valid JSON")] | ||
SchemaNotJson { url: String, source: std::io::Error }, | ||
#[error("request error")] | ||
Request { url: String, source: reqwest::Error }, | ||
#[error("schema from {url} not valid JSON")] | ||
SchemaNotJsonSerde { | ||
url: String, | ||
source: serde_json::Error, | ||
}, | ||
#[error("ref `{ref_string}` can not be resolved as pointer, `{ref_fragment}` can not be found in the schema")] | ||
JsonRefPointerNotFound { | ||
ref_string: String, | ||
ref_fragment: String, | ||
}, | ||
#[error("json ref error")] | ||
JSONRef { | ||
#[from] | ||
source: std::io::Error, | ||
}, | ||
#[error("need url to be a file or a http based, got {url}")] | ||
UnsupportedUrl { url: String }, | ||
#[error(transparent)] | ||
Uncategorized( | ||
#[from] | ||
#[backtrace] | ||
anyhow::Error, | ||
), | ||
} | ||
|
||
type Result<T, E = Error> = std::result::Result<T, E>; | ||
|
||
#[derive(Debug)] | ||
pub struct JsonRef { | ||
schema_cache: HashMap<String, Value>, | ||
reference_key: Option<String>, | ||
} | ||
|
||
impl JsonRef { | ||
fn new() -> JsonRef { | ||
JsonRef { | ||
schema_cache: HashMap::new(), | ||
reference_key: Some("__reference__".to_owned()), | ||
} | ||
} | ||
|
||
async fn deref_value(&mut self, value: &mut Value, id: String) -> Result<()> { | ||
let id_url = Url::parse(&id).into_url_parse(&id)?.to_string(); | ||
yuhao-su marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.schema_cache.insert(id_url.clone(), value.clone()); | ||
self.deref(value, id_url, &vec![]).await?; | ||
Ok(()) | ||
} | ||
|
||
async fn deref( | ||
&mut self, | ||
value: &mut Value, | ||
id: String, | ||
yuhao-su marked this conversation as resolved.
Show resolved
Hide resolved
|
||
used_refs: &Vec<String>, | ||
) -> Result<()> { | ||
if let Some(obj) = value.as_object_mut() { | ||
if let Some(ref_value) = obj.remove("$ref") { | ||
if let Some(ref_string) = ref_value.as_str() { | ||
yuhao-su marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let id_url = Url::parse(&id).into_url_parse(&id)?; | ||
yuhao-su marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let ref_url = id_url.join(ref_string).into_url_parse(ref_string)?; | ||
let mut ref_url_no_fragment = ref_url.clone(); | ||
ref_url_no_fragment.set_fragment(None); | ||
let url_schema = ref_url_no_fragment.scheme(); | ||
let ref_no_fragment = ref_url_no_fragment.to_string(); | ||
|
||
let mut schema = match self.schema_cache.get(&ref_no_fragment) { | ||
Some(cached_schema) => cached_schema.clone(), | ||
None => { | ||
if url_schema == "http" || url_schema == "https" { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are you sure we want to support schema reference from http/fs? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this feature a customer request? Please provide more context, thanks. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. User only requested schema ref resolution without an external link. I thought the HTTP support can be a general approve to support schema registry. But I just realized there is a version segment after the subject. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, we do allow users using http and file. So it won't hurt to add those support. It's just for the schema registry, we need to investigate more into how it works. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
https://json-schema.org/understanding-json-schema/structuring There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
It's true a for a deref library to focus on derefing instead of fetching (in fact it's referencing do). But if we want do do so we will have to ask users to provide all the URL in the schema ref or parse the schema to get all the $ref recurrently before ref. |
||
reqwest::get(ref_url_no_fragment) | ||
.await | ||
.into_request(&ref_no_fragment)? | ||
.json() | ||
.await | ||
.into_request(&ref_no_fragment)? | ||
} else if url_schema == "file" { | ||
let file_path = | ||
ref_url_no_fragment.to_file_path().map_err(|_| { | ||
anyhow::anyhow!( | ||
"could not convert url {} to file path", | ||
&ref_url_no_fragment | ||
) | ||
})?; | ||
let file = fs::File::open(file_path) | ||
.into_schema_from_file(&ref_no_fragment)?; | ||
serde_json::from_reader(file) | ||
.into_schema_not_json_serde(ref_no_fragment.clone())? | ||
} else { | ||
return Err(Error::UnsupportedUrl { | ||
url: ref_no_fragment, | ||
}); | ||
} | ||
} | ||
}; | ||
|
||
if !self.schema_cache.contains_key(&ref_no_fragment) { | ||
self.schema_cache | ||
.insert(ref_no_fragment.clone(), schema.clone()); | ||
} | ||
|
||
let ref_url_string = ref_url.to_string(); | ||
if let Some(ref_fragment) = ref_url.fragment() { | ||
schema = schema | ||
.pointer(ref_fragment) | ||
.ok_or(Error::JsonRefPointerNotFound { | ||
ref_string: ref_string.to_owned(), | ||
ref_fragment: ref_fragment.to_owned(), | ||
})? | ||
.clone(); | ||
} | ||
// Do not deref a url twice to prevent infinite loops | ||
if used_refs.contains(&ref_url_string) { | ||
return Ok(()); | ||
} | ||
let mut new_used_refs = used_refs.clone(); | ||
new_used_refs.push(ref_url_string); | ||
|
||
Box::pin(self.deref(&mut schema, ref_no_fragment, &new_used_refs)).await?; | ||
let old_value = mem::replace(value, schema); | ||
|
||
if let Some(reference_key) = &self.reference_key { | ||
if let Some(new_obj) = value.as_object_mut() { | ||
new_obj.insert(reference_key.clone(), old_value); | ||
} | ||
} | ||
yuhao-su marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
} | ||
|
||
if let Some(obj) = value.as_object_mut() { | ||
for obj_value in obj.values_mut() { | ||
Box::pin(self.deref(obj_value, id.clone(), used_refs)).await? | ||
} | ||
} | ||
Ok(()) | ||
} | ||
} | ||
|
||
impl crate::JsonSchema { | ||
/// FIXME: when the JSON schema is invalid, it will panic. | ||
/// | ||
|
@@ -26,7 +185,8 @@ impl crate::JsonSchema { | |
/// <https://github.com/mozilla/jsonschema-transpiler/blob/fb715c7147ebd52427e0aea09b2bba2d539850b1/src/jsonschema.rs#L228-L280> | ||
/// | ||
/// TODO: examine other stuff like `oneOf`, `patternProperties`, etc. | ||
pub fn json_schema_to_columns(&self) -> anyhow::Result<Vec<ColumnDesc>> { | ||
pub async fn json_schema_to_columns(&mut self, uri: String) -> anyhow::Result<Vec<ColumnDesc>> { | ||
yuhao-su marked this conversation as resolved.
Show resolved
Hide resolved
|
||
JsonRef::new().deref_value(&mut self.0, uri).await?; | ||
let avro_schema = jst::convert_avro(&self.0, jst::Context::default()).to_string(); | ||
let schema = | ||
apache_avro::Schema::parse_str(&avro_schema).context("failed to parse avro schema")?; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
use std::fs; | ||
yuhao-su marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
use risingwave_common::catalog::ColumnDesc; | ||
use risingwave_connector_codec::JsonSchema; | ||
|
||
use crate::utils::{ColumnDescTestDisplay, *}; | ||
|
||
#[tokio::test] | ||
async fn test_json_schema_parse() { | ||
let test_id_type_file_path = fs::canonicalize("tests/test_data/id_type.json") | ||
.unwrap() | ||
.to_string_lossy() | ||
.to_string(); | ||
let test_id_type_http_url = "https://gist.githubusercontent.com/yuhao-su/a1b23e4073b4f1ca4e614c89a785575d/raw/ec8ccd6b3bcf6fafe5c57173a4fdf4129c63625d/idType.txt"; | ||
|
||
let schema = format!( | ||
r##" {{ | ||
"$schema": "http://json-schema.org/draft-07/schema#", | ||
"type": "object", | ||
"definitions": {{ | ||
"stringType": {{ | ||
"type": "string" | ||
}}, | ||
"marketObj": {{ | ||
"type": "object", | ||
"additionalProperties": {{ | ||
"$ref": "{test_id_type_http_url}#/definitions/idType" | ||
}} | ||
}}, | ||
"marketArray": {{ | ||
"type": "object", | ||
"additionalProperties": {{ | ||
"type": "array", | ||
"items": {{ | ||
"type": "string" | ||
}} | ||
}} | ||
}} | ||
}}, | ||
"properties": {{ | ||
"id": {{ | ||
"$ref": "file://{test_id_type_file_path}#/definitions/idType" | ||
}}, | ||
"name": {{ | ||
"$ref": "#/definitions/marketObj", | ||
"description": "Name of the market subject" | ||
}}, | ||
"cats": {{ | ||
"$ref": "#/definitions/marketArray" | ||
}}, | ||
"meta": {{ | ||
"type": "object", | ||
"properties": {{ | ||
"active": {{ | ||
"$ref": "#/definitions/marketObj" | ||
}}, | ||
"tags": {{ | ||
"$ref": "#/definitions/marketArray" | ||
}} | ||
}} | ||
}} | ||
}}, | ||
"required": [ | ||
"id", | ||
"name" | ||
] | ||
}}"## | ||
); | ||
|
||
let mut json_schema = JsonSchema::parse_str(&schema).unwrap(); | ||
|
||
let columns = json_schema | ||
.json_schema_to_columns("http://test_schema_uri.test".to_string()) | ||
.await | ||
.unwrap() | ||
.into_iter() | ||
.map(|c| ColumnDesc::from(c)) | ||
.collect_vec(); | ||
let column_display = columns | ||
.iter() | ||
.map(|c| ColumnDescTestDisplay(c)) | ||
.collect_vec(); | ||
|
||
expect![[r#" | ||
{ | ||
"$schema": "http://json-schema.org/draft-07/schema#", | ||
"definitions": { | ||
"marketArray": { | ||
"additionalProperties": { | ||
"items": { | ||
"type": "string" | ||
}, | ||
"type": "array" | ||
}, | ||
"type": "object" | ||
}, | ||
"marketObj": { | ||
"additionalProperties": { | ||
"__reference__": {}, | ||
"type": "string" | ||
}, | ||
"type": "object" | ||
}, | ||
"stringType": { | ||
"type": "string" | ||
} | ||
}, | ||
"properties": { | ||
"cats": { | ||
"__reference__": {}, | ||
"additionalProperties": { | ||
"items": { | ||
"type": "string" | ||
}, | ||
"type": "array" | ||
}, | ||
"type": "object" | ||
}, | ||
"id": { | ||
"__reference__": {}, | ||
"type": "string" | ||
}, | ||
"meta": { | ||
"properties": { | ||
"active": { | ||
"__reference__": {}, | ||
"additionalProperties": { | ||
"__reference__": {}, | ||
"type": "string" | ||
}, | ||
"type": "object" | ||
}, | ||
"tags": { | ||
"__reference__": {}, | ||
"additionalProperties": { | ||
"items": { | ||
"type": "string" | ||
}, | ||
"type": "array" | ||
}, | ||
"type": "object" | ||
} | ||
}, | ||
"type": "object" | ||
}, | ||
"name": { | ||
"__reference__": { | ||
"description": "Name of the market subject" | ||
}, | ||
"additionalProperties": { | ||
"__reference__": {}, | ||
"type": "string" | ||
}, | ||
"type": "object" | ||
} | ||
}, | ||
"required": [ | ||
"id", | ||
"name" | ||
], | ||
"type": "object" | ||
}"#]] | ||
.assert_eq(&serde_json::to_string_pretty(&json_schema.0).unwrap()); | ||
|
||
expect![[r#" | ||
[ | ||
cats(#1): Jsonb, | ||
id(#2): Varchar, | ||
meta(#3): Struct { | ||
active: Jsonb, | ||
tags: Jsonb, | ||
}, | ||
name(#4): Jsonb, | ||
] | ||
"#]] | ||
.assert_debug_eq(&column_display); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Acknowledge that `$id` and `$anchor` are currently ignored and unsupported. To be honest, I feel this should be supported by the JSON schema library rather than by a hack in the caller that replaces the schema.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did check the jsonschema library. It is designed more for validation, and it is very hard to use for dereferencing the schema file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we move the impl to a standalone crate? I feel that the kernel should not touch the logic of JsonRef.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. We may need to hack it to support confluent schema registry. The confluent schema registry should be handled differently. IIUC they can use reference name to retrieve a reference instead of URI.