Skip to content

Commit

Permalink
refactor: migrate target
Browse files Browse the repository at this point in the history
migrate abstract target
AdswerveBigQuery
DatamillCoPostgres
GenericTarget
MeltanoSnowflake
Oracle
  • Loading branch information
mgabelle committed Dec 17, 2024
1 parent e86144d commit 74bdd0e
Show file tree
Hide file tree
Showing 12 changed files with 100 additions and 135 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.plugin.singer.targets;

import com.fasterxml.jackson.core.type.TypeReference;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.singer.AbstractPythonSinger;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -37,11 +38,11 @@ public abstract class AbstractPythonTarget extends AbstractPythonSinger {
)
@NotNull
@Valid
private String from;
private Property<String> from;

protected AbstractPythonTarget.Output runTarget(RunContext runContext) throws Exception {
// from
URI from = new URI(runContext.render(this.from));
URI from = new URI(runContext.render(this.from).as(String.class).orElseThrow());
Path tempFile = runContext.workingDir().createTempFile();
Files.copy(runContext.storage().getFile(from), tempFile, StandardCopyOption.REPLACE_EXISTING);

Expand All @@ -68,7 +69,7 @@ protected void tapsSync(Path tempFile, RunContext runContext) throws Exception {
this.runSinger(commands, runContext);
}

protected void runSinger(List<String> commands, RunContext runContext) throws Exception {
protected void runSinger(List<String> commands, RunContext runContext) throws Exception {
Flux
.<String>create(
throwConsumer(emitter -> {
Expand Down
52 changes: 22 additions & 30 deletions src/main/java/io/kestra/plugin/singer/targets/AdswerveBigQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,101 +47,93 @@ public class AdswerveBigQuery extends AbstractPythonTarget implements RunnableTa
@Schema(
title = "The Dataset location."
)
@PluginProperty(dynamic = true)
private String location;
private Property<String> location;

@NotNull
@Schema(
title = "Validate every single record message to the corresponding JSON schema.",
description = "This option is disabled by default and invalid RECORD messages will fail only at load time by " +
"Postgres. Enabling this option will detect invalid records earlier but could cause performance degradation.."
)
@PluginProperty
private final Boolean validateRecords = false;
private final Property<Boolean> validateRecords = Property.of(false);

@Schema(
title = "Add singer Metadata columns.",
description = "Add `_time_extracted` and `_time_loaded` metadata columns."
)
@PluginProperty
@Builder.Default
private final Boolean addMetadataColumns = false;
private final Property<Boolean> addMetadataColumns = Property.of(false);

@Schema(
title = "The replication method, `append` or `truncate`."
)
@PluginProperty
@Builder.Default
private final ReplicationMethod replicationMethod = ReplicationMethod.append;
private final Property<ReplicationMethod> replicationMethod = Property.of(ReplicationMethod.append);

@Schema(
title = "Add prefix to table name."
)
@PluginProperty(dynamic = true)
private String tablePrefix;
private Property<String> tablePrefix;

@Schema(
title = "Add suffix to table name."
)
@PluginProperty(dynamic = true)
private String tableSuffix;
private Property<String> tableSuffix;

@Schema(
title = "Maximum cache size in MB."
)
@PluginProperty
@Builder.Default
private final Integer maxCache = 50;
private final Property<Integer> maxCache = Property.of(50);

@Schema(
title = "The JSON service account key as string."
)
@PluginProperty(dynamic = true)
protected String serviceAccount;
protected Property<String> serviceAccount;

@Schema(
title = "Enable control state flush.",
description = "default: merges multiple state messages from the tap into the state file, if true : uses the last state message as the state file."
)
@PluginProperty
@Builder.Default
protected Boolean mergeStateMessages = false;
protected Property<Boolean> mergeStateMessages = Property.of(false);

@Schema(
title = "Table configs."
)
@PluginProperty
protected Map<String, Object> tableConfigs;
protected Property<Map<String, Object>> tableConfigs;

@SneakyThrows
@Override
public Map<String, Object> configuration(RunContext runContext) throws IllegalVariableEvaluationException {
ImmutableMap.Builder<String, Object> builder = ImmutableMap.<String, Object>builder()
.put("project_id", runContext.render(this.projectId))
.put("dataset_id", runContext.render(this.datasetId))
.put("validate_records", this.validateRecords)
.put("add_metadata_columns", this.addMetadataColumns)
.put("replication_method", this.replicationMethod)
.put("max_cache", this.maxCache);
.put("validate_records", runContext.render(this.validateRecords).as(Boolean.class).orElseThrow())
.put("add_metadata_columns", runContext.render(this.addMetadataColumns).as(Boolean.class).orElseThrow())
.put("replication_method", runContext.render(this.replicationMethod).as(ReplicationMethod.class).orElseThrow())
.put("max_cache", runContext.render(this.maxCache).as(Integer.class).orElseThrow());

if (this.location != null) {
builder.put("location", runContext.render(this.location));
builder.put("location", runContext.render(this.location).as(String.class).orElseThrow());
}

if (this.tablePrefix != null) {
builder.put("table_prefix", runContext.render(this.tablePrefix));
builder.put("table_prefix", runContext.render(this.tablePrefix).as(String.class).orElseThrow());
}

if (this.tableSuffix != null) {
builder.put("table_suffix", runContext.render(this.tableSuffix));
builder.put("table_suffix", runContext.render(this.tableSuffix).as(String.class).orElseThrow());
}

if (this.mergeStateMessages) {
if (runContext.render(this.mergeStateMessages).as(Boolean.class).orElseThrow()) {
builder.put("merge_state_messages", "0");
}

if (this.tableConfigs != null) {
this.writeSingerFiles("table-config.json", runContext.render(this.tableConfigs));
var renderedConfigs = runContext.render(this.tableConfigs).asMap(String.class, Object.class);
if (!renderedConfigs.isEmpty()) {
this.writeSingerFiles("table-config.json", renderedConfigs);
builder.put("table_config", workingDirectory.toAbsolutePath() + "/table-config.json");
}

Expand All @@ -154,7 +146,7 @@ protected Map<String, String> environmentVariables(RunContext runContext) throws
HashMap<String, String> env = new HashMap<>(super.environmentVariables(runContext));

if (this.serviceAccount != null) {
this.writeSingerFiles("google-credentials.json", runContext.render(this.serviceAccount));
this.writeSingerFiles("google-credentials.json", runContext.render(this.serviceAccount).as(String.class).orElseThrow());
env.put("GOOGLE_APPLICATION_CREDENTIALS", workingDirectory.toAbsolutePath() + "/google-credentials.json");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,78 +45,67 @@ public class DatamillCoPostgres extends AbstractPythonTarget implements Runnable
@Schema(
title = "The database user's password."
)
@PluginProperty(dynamic = true)
private String password;
private Property<String> password;

@Schema(
title = "The database name."
)
@PluginProperty(dynamic = true)
private String dbName;
private Property<String> dbName;

@NotNull
@Schema(
title = "The database port"
)
@PluginProperty
private Integer port;
private Property<Integer> port;

@Schema(
title = "The database schema."
)
@PluginProperty(dynamic = true)
@Builder.Default
private final String schema = "public";
private final Property<String> schema = Property.of("public");

@Schema(
title = "Refer to the [libpq](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS) docs for more information about SSL."
)
@PluginProperty(dynamic = true)
@Builder.Default
private final String sslMode = "prefer";
private final Property<String> sslMode = Property.of("prefer");

@Schema(
title = "Crash on invalid records."
)
@PluginProperty
@Builder.Default
private final Boolean invalidRecordsDetect = true;
private final Property<Boolean> invalidRecordsDetect = Property.of(true);

@Schema(
title = "Include a positive value n in your config to allow to encounter at most n invalid records per stream before giving up."
)
@PluginProperty
@Builder.Default
private final Integer invalidRecordsThreshold = 0;
private final Property<Integer> invalidRecordsThreshold = Property.of(0);

@Schema(
title = "The level for logging.",
description = "Set to DEBUG to get things like queries executed, timing of those queries, etc. See Python's Logger Levels for information about valid values."
)
@PluginProperty(dynamic = true)
@Builder.Default
private final String loggingLevel = "INFO";
private final Property<String> loggingLevel = Property.of("INFO");

@Schema(
title = "Whether the Target should create tables which have no records present in Remote."
)
@PluginProperty
@Builder.Default
private final Boolean persistEmptyTables = false;
private final Property<Boolean> persistEmptyTables = Property.of(false);

@Schema(
title = "The maximum number of rows to buffer in memory before writing to the destination table in Postgres."
)
@PluginProperty
@Builder.Default
private final Integer maxBatchRows = 200000;
private final Property<Integer> maxBatchRows = Property.of(200000);

@Schema(
title = "The maximum number of bytes to buffer in memory before writing to the destination table in Postgres."
)
@PluginProperty
@Builder.Default
private final Integer maxBufferSize = 104857600;
private final Property<Integer> maxBufferSize = Property.of(104857600);

@Schema(
title = "How often, in rows received, to count the buffered rows and bytes to check if a flush is necessary.",
Expand All @@ -125,60 +114,56 @@ public class DatamillCoPostgres extends AbstractPythonTarget implements Runnable
" to set as the default is dynamically adjusted to check reasonably often. \n\n" +
"Default is 5000, or 1/40th `maxBatchRows`"
)
@PluginProperty
private Integer batchDetectionThreshold;
private Property<Integer> batchDetectionThreshold;

@Schema(
title = "Whether the Target should create column indexes on the important columns used during data loading.",
description = "These indexes will make data loading slightly slower but the deduplication phase much faster. Defaults to on for better baseline performance."
)
@PluginProperty
@Builder.Default
private final Boolean addUpsertIndexes = true;
private final Property<Boolean> addUpsertIndexes = Property.of(true);

@Schema(
title = "Raw SQL statement(s) to execute as soon as the connection to Postgres is opened by the target.",
description = "Useful for setup like SET ROLE or other connection state that is important."
)
@PluginProperty
private String beforeRunSql;
private Property<String> beforeRunSql;

@Schema(
title = "Raw SQL statement(s) to before closing the connection to Postgres."
)
@PluginProperty
private String afterRunSql;
private Property<String> afterRunSql;

@Override
public Map<String, Object> configuration(RunContext runContext) throws IllegalVariableEvaluationException {
ImmutableMap.Builder<String, Object> builder = ImmutableMap.<String, Object>builder()
.put("postgres_username", runContext.render(this.username))
.put("postgres_password", runContext.render(this.password))
.put("postgres_password", runContext.render(this.password).as(String.class).orElse(null))
.put("postgres_host", runContext.render(this.host))
.put("postgres_port", this.port)
.put("postgres_database", runContext.render(this.dbName))
.put("postgres_schema", runContext.render(this.schema))
.put("postgres_sslmode", this.sslMode)
.put("invalid_records_detect", this.invalidRecordsDetect)
.put("invalid_records_threshold", this.invalidRecordsThreshold)
.put("postgres_port", runContext.render(this.port).as(Integer.class).orElseThrow())
.put("postgres_database", runContext.render(this.dbName).as(String.class).orElse(null))
.put("postgres_schema", runContext.render(this.schema).as(String.class).orElseThrow())
.put("postgres_sslmode", runContext.render(this.sslMode).as(String.class).orElseThrow())
.put("invalid_records_detect", runContext.render(this.invalidRecordsDetect).as(Boolean.class).orElseThrow())
.put("invalid_records_threshold", runContext.render(this.invalidRecordsThreshold).as(Integer.class).orElseThrow())
.put("disable_collection", true)
.put("logging_level", this.loggingLevel)
.put("persist_empty_tables", this.persistEmptyTables)
.put("max_batch_rows", this.maxBatchRows)
.put("max_buffer_size", this.maxBufferSize)
.put("logging_level", runContext.render(this.loggingLevel).as(String.class).orElseThrow())
.put("persist_empty_tables", runContext.render(this.persistEmptyTables).as(Boolean.class).orElseThrow())
.put("max_batch_rows", runContext.render(this.maxBatchRows).as(Integer.class).orElseThrow())
.put("max_buffer_size", runContext.render(this.maxBufferSize).as(Integer.class).orElseThrow())
.put("state_support", true)
.put("add_upsert_indexes", this.addUpsertIndexes);
.put("add_upsert_indexes", runContext.render(this.addUpsertIndexes).as(Boolean.class).orElseThrow());

if (this.batchDetectionThreshold != null) {
builder.put("batch_detection_threshold", this.batchDetectionThreshold);
builder.put("batch_detection_threshold", runContext.render(this.batchDetectionThreshold).as(Integer.class).orElseThrow());
}

if (this.beforeRunSql != null) {
builder.put("before_run_sql", runContext.render(this.beforeRunSql));
builder.put("before_run_sql", runContext.render(this.beforeRunSql).as(String.class).orElseThrow());
}

if (this.afterRunSql != null) {
builder.put("after_run_sql", runContext.render(this.afterRunSql));
builder.put("after_run_sql", runContext.render(this.afterRunSql).as(String.class).orElseThrow());
}

return builder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ public class GenericTarget extends AbstractPythonTarget implements RunnableTask<
description = "Will be save on config.json and used as arguments"
)
@PluginProperty(dynamic = true)
private Map<String, Object> configs;
private Property<Map<String, Object>> configs;

@Override
public Map<String, Object> configuration(RunContext runContext) throws IllegalVariableEvaluationException, IOException {
return runContext.render(this.configs);
return runContext.render(this.configs).asMap(String.class, Object.class);
}

@Override
Expand Down
Loading

0 comments on commit 74bdd0e

Please sign in to comment.