diff --git a/src/connector/src/source/mqtt/source/reader.rs b/src/connector/src/source/mqtt/source/reader.rs index 74e315b6cd656..ed919639901a1 100644 --- a/src/connector/src/source/mqtt/source/reader.rs +++ b/src/connector/src/source/mqtt/source/reader.rs @@ -51,10 +51,9 @@ impl SplitReader for MqttSplitReader { source_ctx: SourceContextRef, _columns: Option>, ) -> Result { - let (client, eventloop) = properties.common.build_client( - source_ctx.source_info.actor_id, - source_ctx.source_info.fragment_id as u64, - )?; + let (client, eventloop) = properties + .common + .build_client(source_ctx.actor_id, source_ctx.fragment_id as u64)?; let qos = properties.common.qos(); diff --git a/src/connector/src/with_options_test.rs b/src/connector/src/with_options_test.rs index 4ead1685244d8..fd234e880e469 100644 --- a/src/connector/src/with_options_test.rs +++ b/src/connector/src/with_options_test.rs @@ -38,6 +38,10 @@ fn common_mod_path() -> PathBuf { connector_crate_path().join("src").join("common.rs") } +fn mqtt_common_mod_path() -> PathBuf { + connector_crate_path().join("src").join("mqtt_common.rs") +} + pub fn generate_with_options_yaml_source() -> String { generate_with_options_yaml_inner(&source_mod_path()) } @@ -63,6 +67,7 @@ fn generate_with_options_yaml_inner(path: &Path) -> String { for entry in walkdir::WalkDir::new(path) .into_iter() .chain(walkdir::WalkDir::new(common_mod_path())) + .chain(walkdir::WalkDir::new(mqtt_common_mod_path())) { let entry = entry.expect("Failed to read directory entry"); if entry.path().extension() == Some("rs".as_ref()) {