Skip to content

Commit

Permalink
IGNITE-24033 Sql schema. Extend Compute API with supporting qualified…
Browse files Browse the repository at this point in the history
… names (#5102)
  • Loading branch information
zstan authored Jan 27, 2025
1 parent 00dbd45 commit 0b3fc07
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,10 @@ static BroadcastJobTarget table(String tableName) {
* Creates a job target for partitioned execution. For each partition in the provided table the job will be executed on a node that
* holds the primary replica.
*
* @param tableName Table name.
* @param tableName QualifiedName name.
* @return Job target.
*/
static BroadcastJobTarget table(QualifiedName tableName) {
// TODO IGNITE-24033 Compute API must use QualifiedName.
return new TableJobTarget(tableName.objectName());
return new TableJobTarget(tableName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,36 @@
package org.apache.ignite.compute;

import java.util.Objects;
import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
import org.jetbrains.annotations.Nullable;

/**
* Colocated job execution target. Indicates a node that hosts the data for the specified key in the provided table.
* Colocated job execution target. Indicates a node that hosts the data for the specified key in the provided {@link QualifiedName}.
*/
public class ColocatedJobTarget implements JobTarget {
private final String tableName;
private final QualifiedName name;

private final Object key;

private final @Nullable Mapper<?> keyMapper;

ColocatedJobTarget(String tableName, Object key, @Nullable Mapper<?> keyMapper) {
Objects.requireNonNull(tableName);
ColocatedJobTarget(QualifiedName name, Object key, @Nullable Mapper<?> keyMapper) {
Objects.requireNonNull(name);
Objects.requireNonNull(key);

if (keyMapper == null && !(key instanceof Tuple)) {
throw new IllegalArgumentException("Key must be an instance of Tuple when keyMapper is not provided.");
}

this.tableName = tableName;
this.name = name;
this.key = key;
this.keyMapper = keyMapper;
}

public String tableName() {
return tableName;
public QualifiedName tableName() {
return name;
}

public Object key() {
Expand Down
18 changes: 8 additions & 10 deletions modules/api/src/main/java/org/apache/ignite/compute/JobTarget.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,16 @@ static JobTarget colocated(String tableName, Tuple key) {
}

/**
* Creates a colocated job target for a specific table and key.
* Creates a colocated job target for a specific {@link QualifiedName} and key.
*
* <p>This target determines that a job should be executed on the same node that hosts the data for a given key of provided table.
*
* @param tableName Table name.
* @param name QualifiedName name.
* @param key Key.
* @return Job target.
*/
static JobTarget colocated(QualifiedName tableName, Tuple key) {
// TODO IGNITE-24033 Compute API must use QualifiedName.
return new ColocatedJobTarget(tableName.objectName(), key, null);
static JobTarget colocated(QualifiedName name, Tuple key) {
return new ColocatedJobTarget(name, key, null);
}

/**
Expand All @@ -123,16 +122,15 @@ static <K> JobTarget colocated(String tableName, K key, Mapper<K> keyMapper) {
}

/**
* Creates a colocated job target for a specific table and key with mapper.
* Creates a colocated job target for a specific {@link QualifiedName} and key with mapper.
*
* <p>This target determines that a job should be executed on the same node that hosts the data for a given key of provided table.
*
* @param tableName Table name.
* @param name QualifiedName name.
* @param key Key.
* @return Job target.
*/
static <K> JobTarget colocated(QualifiedName tableName, K key, Mapper<K> keyMapper) {
// TODO IGNITE-24033 Compute API must use QualifiedName.
return new ColocatedJobTarget(tableName.objectName(), key, keyMapper);
static <K> JobTarget colocated(QualifiedName name, K key, Mapper<K> keyMapper) {
return new ColocatedJobTarget(name, key, keyMapper);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,22 @@
package org.apache.ignite.compute;

import java.util.Objects;
import org.apache.ignite.table.QualifiedName;

/**
* Partitioned broadcast execution target. Indicates that the job will be executed on nodes that hold primary replicas of the provided
* table's partitions.
*/
public class TableJobTarget implements BroadcastJobTarget {
private final String tableName;
private final QualifiedName tableName;

TableJobTarget(String tableName) {
TableJobTarget(QualifiedName tableName) {
Objects.requireNonNull(tableName);

this.tableName = tableName;
}

public String tableName() {
public QualifiedName tableName() {
return tableName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
import org.apache.ignite.internal.client.table.PartitionAwarenessProvider;
import org.apache.ignite.internal.compute.BroadcastJobExecutionImpl;
import org.apache.ignite.internal.compute.FailedExecution;
import org.apache.ignite.internal.sql.SqlCommon;
import org.apache.ignite.internal.table.partition.HashPartition;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.ViewUtils;
Expand All @@ -75,7 +74,6 @@
import org.apache.ignite.lang.TableNotFoundException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.QualifiedNameHelper;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
import org.apache.ignite.table.partition.Partition;
Expand Down Expand Up @@ -153,7 +151,7 @@ public <T, R> CompletableFuture<BroadcastExecution<R>> submitAsync(
return mapSubmitFutures(futures, descriptor, cancellationToken);
} else if (target instanceof TableJobTarget) {
TableJobTarget tableJobTarget = (TableJobTarget) target;
String tableName = tableJobTarget.tableName();
QualifiedName tableName = tableJobTarget.tableName();
return getTable(tableName)
.thenCompose(table -> table.partitionManager().primaryReplicasAsync())
.thenCompose(replicas -> {
Expand Down Expand Up @@ -216,10 +214,12 @@ private <T, R> CompletableFuture<SubmitResult> submit0(JobTarget target, JobDesc
ColocatedJobTarget colocatedTarget = (ColocatedJobTarget) target;
var mapper = (Mapper<? super Object>) colocatedTarget.keyMapper();

QualifiedName qualifiedName = colocatedTarget.tableName();

if (mapper != null) {
return doExecuteColocatedAsync(colocatedTarget.tableName(), colocatedTarget.key(), mapper, descriptor, arg);
return doExecuteColocatedAsync(qualifiedName, colocatedTarget.key(), mapper, descriptor, arg);
} else {
return doExecuteColocatedAsync(colocatedTarget.tableName(), (Tuple) colocatedTarget.key(), descriptor, arg);
return doExecuteColocatedAsync(qualifiedName, (Tuple) colocatedTarget.key(), descriptor, arg);
}
}

Expand Down Expand Up @@ -247,7 +247,7 @@ public <T, R> Collection<R> execute(
}

private <T, R> CompletableFuture<SubmitResult> doExecuteColocatedAsync(
String tableName,
QualifiedName tableName,
Tuple key,
JobDescriptor<T, R> descriptor,
T arg
Expand All @@ -264,7 +264,7 @@ private <T, R> CompletableFuture<SubmitResult> doExecuteColocatedAsync(
}

private <K, T, R> CompletableFuture<SubmitResult> doExecuteColocatedAsync(
String tableName,
QualifiedName tableName,
K key,
Mapper<K> keyMapper,
JobDescriptor<T, R> descriptor,
Expand Down Expand Up @@ -410,7 +410,7 @@ private static <T, R> CompletableFuture<SubmitResult> executeColocatedInternal(
}

private <T, R> CompletableFuture<SubmitResult> doExecutePartitionedAsync(
String tableName,
QualifiedName tableName,
Partition partition,
JobDescriptor<T, R> descriptor,
@Nullable T arg
Expand Down Expand Up @@ -450,19 +450,17 @@ private static <T, R> CompletableFuture<SubmitResult> executePartitioned(
);
}

// TODO IGNITE-24033 Compute API should use QualifiedName.
private CompletableFuture<ClientTable> getTable(String tableName) {
private CompletableFuture<ClientTable> getTable(QualifiedName tableName) {
// Cache tables by name to avoid extra network call on every executeColocated.
QualifiedName qualifiedName = QualifiedNameHelper.fromNormalized(SqlCommon.DEFAULT_SCHEMA_NAME, tableName);
var cached = tableCache.get(qualifiedName);
var cached = tableCache.get(tableName);

if (cached != null) {
return completedFuture(cached);
}

return tables.tableAsync(qualifiedName).thenApply(t -> {
return tables.tableAsync(tableName).thenApply(t -> {
if (t == null) {
throw new TableNotFoundException(qualifiedName);
throw new TableNotFoundException(tableName);
}

ClientTable clientTable = (ClientTable) t;
Expand All @@ -473,7 +471,7 @@ private CompletableFuture<ClientTable> getTable(String tableName) {
}

private CompletableFuture<SubmitResult> handleMissingTable(
String tableName,
QualifiedName tableName,
SubmitResult res,
Throwable err,
Supplier<CompletableFuture<SubmitResult>> retry
Expand All @@ -487,9 +485,7 @@ private CompletableFuture<SubmitResult> handleMissingTable(

if (clientEx.code() == TABLE_ID_NOT_FOUND_ERR) {
// Table was dropped - remove from cache.
// TODO IGNITE-24033 Make Client API use QualifiedName.
QualifiedName qualifiedName = QualifiedNameHelper.fromNormalized(SqlCommon.DEFAULT_SCHEMA_NAME, tableName);
tableCache.remove(qualifiedName);
tableCache.remove(tableName);

return retry.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
*/
@SuppressWarnings("AssignmentToStaticFieldFromInstanceMethod")
public class ClientComputeTest extends BaseIgniteAbstractTest {
private static final String TABLE_NAME = "tbl1";
private static final String TABLE_NAME = "TBL1";

private FakeIgnite ignite1;
private FakeIgnite ignite2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public <T, R> CompletableFuture<JobExecution<R>> submitAsync(
if (target instanceof ColocatedJobTarget) {
ColocatedJobTarget colocatedTarget = (ColocatedJobTarget) target;
var mapper = (Mapper<? super Object>) colocatedTarget.keyMapper();
String tableName = colocatedTarget.tableName();
QualifiedName tableName = colocatedTarget.tableName();
Object key = colocatedTarget.key();

CompletableFuture<JobExecution<ComputeJobDataHolder>> jobFut;
Expand Down Expand Up @@ -440,13 +440,11 @@ public CompletableFuture<JobExecution<ComputeJobDataHolder>> submitPartitionedIn
));
}

private CompletableFuture<TableViewInternal> requiredTable(String tableName) {
QualifiedName qualifiedName = QualifiedName.fromSimple(tableName);

return tables.tableViewAsync(qualifiedName)
private CompletableFuture<TableViewInternal> requiredTable(QualifiedName tableName) {
return tables.tableViewAsync(tableName)
.thenApply(table -> {
if (table == null) {
throw new TableNotFoundException(qualifiedName);
throw new TableNotFoundException(tableName);
}
return table;
});
Expand Down

0 comments on commit 0b3fc07

Please sign in to comment.