diff --git a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e index 9ec54ebede1d2..1ee0876bc431f 100644 --- a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e +++ b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e @@ -72,6 +72,8 @@ Constructor (org.apache.flink.connector.datagen.source.GeneratorFunction, long, org.apache.flink.api.common.typeinfo.TypeInformation)> calls method in (DataGeneratorSource.java:120) Constructor (org.apache.flink.connector.datagen.source.GeneratorFunction, long, org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy, org.apache.flink.api.common.typeinfo.TypeInformation)> calls method in (DataGeneratorSource.java:141) Constructor (org.apache.flink.connector.datagen.source.GeneratorFunction, long, org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy, org.apache.flink.api.common.typeinfo.TypeInformation)> has parameter of type in (DataGeneratorSource.java:0) +Constructor (org.apache.flink.api.connector.source.SourceReaderContext, org.apache.flink.connector.datagen.source.GeneratorFunction)> calls method in (DoubleEmittingSourceReaderWithCheckpointsInBetween.java:73) +Constructor (org.apache.flink.api.connector.source.SourceReaderContext, org.apache.flink.connector.datagen.source.GeneratorFunction, java.util.function.BooleanSupplier)> calls method in (DoubleEmittingSourceReaderWithCheckpointsInBetween.java:66) Constructor (org.apache.flink.api.connector.source.SourceReaderContext, org.apache.flink.connector.datagen.source.GeneratorFunction)> calls method in (GeneratingIteratorSourceReader.java:46) Constructor (org.apache.flink.connector.datagen.source.GeneratorFunction, org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy)> calls method in (GeneratorSourceReaderFactory.java:54) Constructor (org.apache.flink.connector.datagen.source.GeneratorFunction, org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy)> calls method in (GeneratorSourceReaderFactory.java:55) @@ -299,6 +301,8 @@ Method is annotated with in (FromElementsGeneratorFunction.java:0) Method calls method in (FromElementsGeneratorFunction.java:97) Method calls method in (FromElementsGeneratorFunction.java:151) +Method is annotated with in (IndexLookupGeneratorFunction.java:0) +Method calls method in (IndexLookupGeneratorFunction.java:128) Method is annotated with in (DataGeneratorSource.java:0) Method calls constructor (org.apache.flink.api.connector.source.SourceReader, org.apache.flink.api.connector.source.util.ratelimit.RateLimiter)> in (GeneratorSourceReaderFactory.java:63) Method calls method in (GeneratorSourceReaderFactory.java:62) @@ -311,30 +315,30 @@ Method has return type in (RandomGeneratorVisitor.java:0) Method has return type in (RandomGeneratorVisitor.java:0) Method has return type in (RandomGeneratorVisitor.java:0) -Method calls method in (RandomGeneratorVisitor.java:364) -Method calls method in (RandomGeneratorVisitor.java:236) -Method calls method in (RandomGeneratorVisitor.java:237) -Method calls method in (RandomGeneratorVisitor.java:126) -Method calls method in (RandomGeneratorVisitor.java:126) -Method calls method in (RandomGeneratorVisitor.java:141) -Method calls method in 
(RandomGeneratorVisitor.java:305) -Method calls method in (RandomGeneratorVisitor.java:306) -Method calls method in (RandomGeneratorVisitor.java:262) -Method calls method in (RandomGeneratorVisitor.java:263) -Method calls method in (RandomGeneratorVisitor.java:249) -Method calls method in (RandomGeneratorVisitor.java:250) -Method calls method in (RandomGeneratorVisitor.java:223) -Method calls method in (RandomGeneratorVisitor.java:224) -Method calls method in (RandomGeneratorVisitor.java:343) -Method calls method in (RandomGeneratorVisitor.java:428) -Method calls method in (RandomGeneratorVisitor.java:390) -Method calls method in (RandomGeneratorVisitor.java:388) -Method calls method in (RandomGeneratorVisitor.java:209) -Method calls method in (RandomGeneratorVisitor.java:211) -Method calls method in (RandomGeneratorVisitor.java:319) -Method calls method in (RandomGeneratorVisitor.java:195) -Method calls method in (RandomGeneratorVisitor.java:197) -Method calls method in (RandomGeneratorVisitor.java:203) +Method calls method in (RandomGeneratorVisitor.java:352) +Method calls method in (RandomGeneratorVisitor.java:231) +Method calls method in (RandomGeneratorVisitor.java:232) +Method calls method in (RandomGeneratorVisitor.java:137) +Method calls method in (RandomGeneratorVisitor.java:137) +Method calls method in (RandomGeneratorVisitor.java:144) +Method calls method in (RandomGeneratorVisitor.java:300) +Method calls method in (RandomGeneratorVisitor.java:301) +Method calls method in (RandomGeneratorVisitor.java:257) +Method calls method in (RandomGeneratorVisitor.java:258) +Method calls method in (RandomGeneratorVisitor.java:244) +Method calls method in (RandomGeneratorVisitor.java:245) +Method calls method in (RandomGeneratorVisitor.java:218) +Method calls method in (RandomGeneratorVisitor.java:219) +Method calls method in (RandomGeneratorVisitor.java:338) +Method calls method in (RandomGeneratorVisitor.java:402) +Method calls method in (RandomGeneratorVisitor.java:371) +Method calls method in (RandomGeneratorVisitor.java:369) +Method calls method in (RandomGeneratorVisitor.java:204) +Method calls method in (RandomGeneratorVisitor.java:206) +Method calls method in (RandomGeneratorVisitor.java:314) +Method calls method in (RandomGeneratorVisitor.java:190) +Method calls method in (RandomGeneratorVisitor.java:192) +Method calls method in (RandomGeneratorVisitor.java:179) Method calls method in (RandomGeneratorVisitor.java:158) Method calls method in (RandomGeneratorVisitor.java:172) Method calls method in (RandomGeneratorVisitor.java:292) @@ -361,6 +365,9 @@ Method has return type in (FileSink.java:0) Method calls constructor (org.apache.flink.core.fs.RecoverableWriter, org.apache.flink.api.common.serialization.BulkWriter$Factory)> in (FileSink.java:674) Method has return type in (FileSink.java:0) +Method calls method in (FileSink.java:638) +Method calls method in (FileSink.java:639) +Method calls method in (FileSink.java:640) Method calls method in (FileSink.java:590) Method calls method in (FileSink.java:591) Method calls method in (FileSink.java:668) @@ -464,20 +471,20 @@ Method has return type in (CompactService.java:0) Method calls constructor (java.lang.String)> in (CompactService.java:70) Method calls method in (CompactService.java:70) -Method calls constructor ([B)> in (CompactorOperator.java:299) -Method calls method in (CompactorOperator.java:323) -Method calls method in (CompactorOperator.java:326) +Method calls constructor ([B)> in (CompactorOperator.java:301) +Method calls 
method in (CompactorOperator.java:325) +Method calls method in (CompactorOperator.java:328) Method has parameter of type in (CompactorOperator.java:0) -Method calls constructor (int)> in (CompactorOperator.java:290) -Method calls method in (CompactorOperator.java:293) -Method calls method in (CompactorOperator.java:291) -Method calls method in (CompactorOperator.java:313) -Method calls method in (CompactorOperator.java:315) +Method calls constructor (int)> in (CompactorOperator.java:292) +Method calls method in (CompactorOperator.java:295) +Method calls method in (CompactorOperator.java:293) +Method calls method in (CompactorOperator.java:315) +Method calls method in (CompactorOperator.java:317) Method has parameter of type in (CompactorOperator.java:0) Method calls constructor (int, int, java.lang.Long, int, int, int)> in (CompactorOperator.java:254) -Method calls constructor (java.lang.Object, java.lang.Long, int)> in (CompactorOperator.java:260) +Method calls constructor (java.lang.Object, java.lang.Long, int)> in (CompactorOperator.java:262) Method calls constructor (java.lang.Object)> in (CompactorOperator.java:256) -Method calls constructor (java.lang.Object)> in (CompactorOperator.java:261) +Method calls constructor (java.lang.Object)> in (CompactorOperator.java:263) Method calls method in (CompactorOperator.java:250) Method calls method in (CompactorOperator.java:251) Method calls method in (CompactorOperator.java:262) @@ -719,8 +726,8 @@ Method calls method in (BatchCompactCoordinator.java:83) Method has generic parameter type > with type argument depending on in (BatchCompactCoordinator.java:0) Method has parameter of type in (BatchCompactCoordinator.java:0) -Method calls method in (BatchCompactOperator.java:141) -Method calls constructor (java.lang.Object)> in (BatchCompactOperator.java:124) +Method calls method in (BatchCompactOperator.java:142) +Method calls constructor (java.lang.Object)> in (BatchCompactOperator.java:125) Method calls method in (BatchCompactOperator.java:94) Method calls method in (BatchCompactOperator.java:94) Method calls method in (BatchCompactOperator.java:103) @@ -737,29 +744,29 @@ Method calls method in (BatchFileWriter.java:116) Method has generic parameter type > with type argument depending on in (BatchFileWriter.java:0) Method has parameter of type in (BatchFileWriter.java:0) -Method calls method in (AbstractStreamingWriter.java:104) +Method calls method in (AbstractStreamingWriter.java:106) Method has generic parameter type > with type argument depending on in (AbstractStreamingWriter.java:0) Method has parameter of type in (AbstractStreamingWriter.java:0) -Method calls method in (AbstractStreamingWriter.java:109) +Method calls method in (AbstractStreamingWriter.java:111) Method has generic parameter type > with type argument depending on in (AbstractStreamingWriter.java:0) Method has parameter of type in (AbstractStreamingWriter.java:0) -Method calls method in (AbstractStreamingWriter.java:165) +Method calls method in (AbstractStreamingWriter.java:167) Method calls method in (AbstractStreamingWriter.java:90) -Method calls method in (AbstractStreamingWriter.java:155) -Method calls method in (AbstractStreamingWriter.java:156) -Method calls constructor (org.apache.flink.streaming.api.functions.sink.filesystem.Buckets, boolean, org.apache.flink.api.common.state.OperatorStateStore, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long)> in (AbstractStreamingWriter.java:120) -Method calls method in (AbstractStreamingWriter.java:99) 
-Method calls method in (AbstractStreamingWriter.java:113) -Method calls method in (AbstractStreamingWriter.java:120) +Method calls method in (AbstractStreamingWriter.java:157) +Method calls method in (AbstractStreamingWriter.java:158) +Method calls constructor (org.apache.flink.streaming.api.functions.sink.filesystem.Buckets, boolean, org.apache.flink.api.common.state.OperatorStateStore, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long)> in (AbstractStreamingWriter.java:122) +Method calls method in (AbstractStreamingWriter.java:101) +Method calls method in (AbstractStreamingWriter.java:115) +Method calls method in (AbstractStreamingWriter.java:122) Method calls method in (AbstractStreamingWriter.java:98) -Method calls method in (AbstractStreamingWriter.java:140) -Method calls method in (AbstractStreamingWriter.java:143) -Method calls method in (AbstractStreamingWriter.java:141) -Method calls method in (AbstractStreamingWriter.java:143) -Method calls method in (AbstractStreamingWriter.java:142) +Method calls method in (AbstractStreamingWriter.java:142) +Method calls method in (AbstractStreamingWriter.java:145) +Method calls method in (AbstractStreamingWriter.java:143) +Method calls method in (AbstractStreamingWriter.java:145) +Method calls method in (AbstractStreamingWriter.java:144) Method has generic parameter type > with type argument depending on in (AbstractStreamingWriter.java:0) Method has parameter of type in (AbstractStreamingWriter.java:0) -Method calls method in (AbstractStreamingWriter.java:129) +Method calls method in (AbstractStreamingWriter.java:131) Method has parameter of type in (PartitionCommitTrigger.java:0) Method calls method in (PartitionCommitter.java:167) Method calls method in (PartitionCommitter.java:172) @@ -784,8 +791,8 @@ Method calls method in (CompactBucketWriter.java:49) Method has generic parameter type , java.io.IOException>> with type argument depending on in (CompactBucketWriter.java:0) Method calls method in (CompactBucketWriter.java:44) -Method calls constructor (java.lang.Object)> in (CompactCoordinator.java:186) -Method calls constructor (java.lang.Object)> in (CompactCoordinator.java:194) +Method calls constructor (java.lang.Object)> in (CompactCoordinator.java:190) +Method calls constructor (java.lang.Object)> in (CompactCoordinator.java:198) Method calls constructor (org.apache.flink.api.common.typeutils.TypeSerializer)> in (CompactCoordinator.java:116) Method calls constructor (org.apache.flink.api.common.typeutils.TypeSerializer, org.apache.flink.api.common.typeutils.TypeSerializer)> in (CompactCoordinator.java:116) Method calls constructor (java.lang.Class, org.apache.flink.api.common.serialization.SerializerConfig)> in (CompactCoordinator.java:118) @@ -815,7 +822,7 @@ Method has generic parameter type > with type argument depending on in (CompactOperator.java:0) Method has parameter of type in (CompactOperator.java:0) Method calls method in (CompactFileUtils.java:117) -Method calls method in (PrintTableSinkFactory.java:187) +Method calls method in (PrintTableSinkFactory.java:189) Method calls method in (PrintTableSinkFactory.java:180) Method calls method in (PrintTableSinkFactory.java:181) Method calls method in (PrintTableSinkFactory.java:182) @@ -833,6 +840,3 @@ Static Initializer ()> calls constructor (org.apache.flink.api.common.typeutils.TypeSerializer, org.apache.flink.api.common.typeutils.TypeSerializer)> in (ProcTimeCommitTrigger.java:47) Static Initializer ()> gets field in (ProcTimeCommitTrigger.java:47) Static 
Initializer ()> gets field in (ProcTimeCommitTrigger.java:47) -Method calls method in (FileSink.java:638) -Method calls method in (FileSink.java:639) -Method calls method in (FileSink.java:640) diff --git a/flink-connectors/flink-connector-datagen/pom.xml b/flink-connectors/flink-connector-datagen/pom.xml index a29b6feebb9a0..2a7ff88892e60 100644 --- a/flink-connectors/flink-connector-datagen/pom.xml +++ b/flink-connectors/flink-connector-datagen/pom.xml @@ -42,4 +42,20 @@ + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + + diff --git a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/IndexLookupGeneratorFunction.java b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/IndexLookupGeneratorFunction.java new file mode 100644 index 0000000000000..7809f1aae713c --- /dev/null +++ b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/IndexLookupGeneratorFunction.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.datagen.functions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.NoSuchElementException; + +/** + * A stream generator function that returns elements from the collection based on their index. + * + *
* <p>
This generator function serializes the elements using Flink's type information. That way, any + * object transport using Java serialization will not be affected by the serializability of the + * elements. + * + * @param The type of elements returned by this function. + */ +@Internal +public class IndexLookupGeneratorFunction implements GeneratorFunction { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(IndexLookupGeneratorFunction.class); + + /** The (de)serializer to be used for the data elements. */ + private final TypeSerializer serializer; + + /** The actual data elements, in serialized form. */ + private byte[] elementsSerialized; + + private int numElements; + + private transient DataInputView input; + + private transient Map lookupMap; + + public IndexLookupGeneratorFunction(TypeInformation typeInfo, Iterable elements) { + this(typeInfo, new ExecutionConfig(), elements); + } + + public IndexLookupGeneratorFunction( + TypeInformation typeInfo, ExecutionConfig config, Iterable elements) { + // must not have null elements and mixed elements + checkIterable(elements, typeInfo.getTypeClass()); + this.serializer = typeInfo.createSerializer(config); + trySerialize(elements); + } + + @VisibleForTesting + @Nullable + public TypeSerializer getSerializer() { + return serializer; + } + + @Override + public void open(SourceReaderContext readerContext) throws Exception { + ByteArrayInputStream bais = new ByteArrayInputStream(elementsSerialized); + this.input = new DataInputViewStreamWrapper(bais); + lookupMap = new HashMap<>(); + buildLookup(); + } + + @Override + public OUT map(Long index) throws Exception { + return lookupMap.get(index); + } + + /** + * Verifies that all elements in the iterable are non-null, and are of the given class, or a + * subclass thereof. + * + * @param elements The iterable to check. + * @param viewedAs The class to which the elements must be assignable to. + */ + private void checkIterable(Iterable elements, Class viewedAs) { + for (OUT elem : elements) { + numElements++; + if (elem == null) { + throw new IllegalArgumentException("The collection contains a null element"); + } + + if (!viewedAs.isAssignableFrom(elem.getClass())) { + throw new IllegalArgumentException( + "The elements in the collection are not all subclasses of " + + viewedAs.getCanonicalName()); + } + } + } + + private void serializeElements(Iterable elements) throws IOException { + Preconditions.checkState(serializer != null, "serializer not set"); + LOG.info("Serializing elements using {}", serializer); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos); + + try { + for (OUT element : elements) { + serializer.serialize(element, wrapper); + } + } catch (Exception e) { + throw new IOException("Serializing the source elements failed: " + e.getMessage(), e); + } + this.elementsSerialized = baos.toByteArray(); + } + + private OUT tryDeserialize() throws IOException { + try { + return serializer.deserialize(input); + } catch (EOFException eof) { + throw new NoSuchElementException( + "Reached the end of the collection. This could be caused by issues with the " + + "serializer or by calling the map() function more times than there " + + "are elements in the collection. 
Make sure that you set the number " + + "of records to be produced by the DataGeneratorSource equal to the " + + "number of elements in the collection."); + } catch (Exception e) { + throw new IOException( + "Failed to deserialize an element from the source. " + + "If you are using user-defined serialization (Value and Writable " + + "types), check the serialization functions.\nSerializer is " + + serializer, + e); + } + } + + private void buildLookup() throws IOException { + for (long i = 0; i < numElements; i++) { + lookupMap.put(i, tryDeserialize()); + } + } + + private void trySerialize(Iterable elements) { + try { + serializeElements(elements); + } catch (IOException e) { + throw new RuntimeException(e.getMessage(), e); + } + } +} diff --git a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java index 3d2416c1e16d4..9260c8cc5a351 100644 --- a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java +++ b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java @@ -142,7 +142,7 @@ public DataGeneratorSource( rateLimiterStrategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); } - private DataGeneratorSource( + DataGeneratorSource( SourceReaderFactory sourceReaderFactory, GeneratorFunction generatorFunction, long count, diff --git a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DoubleEmittingSourceReaderWithCheckpointsInBetween.java b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DoubleEmittingSourceReaderWithCheckpointsInBetween.java new file mode 100644 index 0000000000000..fdad0e4440e70 --- /dev/null +++ b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DoubleEmittingSourceReaderWithCheckpointsInBetween.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.datagen.source; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceReaderBase; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.util.Iterator; +import java.util.concurrent.CompletableFuture; +import java.util.function.BooleanSupplier; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link SourceReader} that synchronizes emission of N elements on the arrival of the checkpoint + * barriers. It 1) emits a list of elements without checkpoints in-between, 2) then waits for two + * checkpoints to complete, 3) then re-emits the same elements before 4) waiting for another two + * checkpoints and 5) exiting. + * +
* <p>
This lockstep execution is possible because {@code pollNext} and {@code snapshotState} are + * executed in the same thread, and because {@code pollNext} can emit N elements at once. This + * reader is meant to be used solely for testing purposes, as a substitute for {@code + * FiniteTestSource}, which implements the deprecated {@code SourceFunction} API. + */ +@Experimental +public class DoubleEmittingSourceReaderWithCheckpointsInBetween< + E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, SplitT>> + extends IteratorSourceReaderBase<E, O, IterT, SplitT> { + + private final GeneratorFunction<E, O> generatorFunction; + + private BooleanSupplier allowedToExit; + private int snapshotsCompleted; + private int snapshotsToWaitFor; + private boolean done; + + public DoubleEmittingSourceReaderWithCheckpointsInBetween( + SourceReaderContext context, + GeneratorFunction<E, O> generatorFunction, + @Nullable BooleanSupplier allowedToExit) { + super(context); + this.generatorFunction = checkNotNull(generatorFunction); + this.allowedToExit = allowedToExit; + } + + public DoubleEmittingSourceReaderWithCheckpointsInBetween( + SourceReaderContext context, GeneratorFunction<E, O> generatorFunction) { + super(context); + this.generatorFunction = checkNotNull(generatorFunction); + } + + // ------------------------------------------------------------------------ + + @Override + public void start(SourceReaderContext context) { + try { + generatorFunction.open(context); + } catch (Exception e) { + throw new FlinkRuntimeException("Failed to open the GeneratorFunction", e); + } + } + + @Override + public InputStatus pollNext(ReaderOutput<O> output) { + // This is the termination path after the test data has been emitted twice + if (done) { + if (allowedToExit != null) { // Termination is controlled externally + return allowedToExit.getAsBoolean() + ?
InputStatus.END_OF_INPUT + : InputStatus.NOTHING_AVAILABLE; + } else { + return InputStatus.END_OF_INPUT; + } + } + // This is the initial path + if (currentSplit == null) { + InputStatus inputStatus = tryMoveToNextSplit(); + switch (inputStatus) { + case MORE_AVAILABLE: + emitElements(output); + break; + case END_OF_INPUT: + // This can happen if the source parallelism is larger than the number of + // available splits + return inputStatus; + } + } else { + // This is the path that emits the same split the second time + emitElements(output); + done = true; + } + availability = new CompletableFuture<>(); + return InputStatus.NOTHING_AVAILABLE; + } + + private void emitElements(ReaderOutput output) { + iterator = currentSplit.getIterator(); + while (iterator.hasNext()) { + E next = iterator.next(); + O converted = convert(next); + output.collect(converted); + } + // Always wait for two snapshots after emitting the elements + snapshotsToWaitFor = 2; + snapshotsCompleted = 0; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + snapshotsCompleted++; + if (snapshotsCompleted >= snapshotsToWaitFor) { + availability.complete(null); + } + + if (allowedToExit != null) { + if (allowedToExit.getAsBoolean()) { + availability.complete(null); + } + } + } + + @Override + protected O convert(E value) { + try { + return generatorFunction.map(value); + } catch (Exception e) { + String message = + String.format( + "A user-provided generator function threw an exception on this input: %s", + value.toString()); + throw new FlinkRuntimeException(message, e); + } + } +} diff --git a/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/TestDataGenerators.java b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/TestDataGenerators.java new file mode 100644 index 0000000000000..182828f38c1dc --- /dev/null +++ b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/TestDataGenerators.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.datagen.source; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.SourceReaderFactory; +import org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit; +import org.apache.flink.connector.datagen.functions.IndexLookupGeneratorFunction; + +import java.util.Collection; +import java.util.function.BooleanSupplier; + +/** A collection of factory methods for creating data generator-based sources. 
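The IndexLookupGeneratorFunction added earlier in this patch avoids Java serialization of the elements by round-tripping them through Flink's TypeSerializer: the constructor writes every element into a byte array, and open() replays that array into an index-to-element map that map(Long) then serves. A minimal standalone sketch of that round-trip, assuming a String element type (illustration only, not part of this patch):

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SerializerRoundTripSketch {
    public static void main(String[] args) throws Exception {
        List<String> elements = Arrays.asList("a", "b", "c");
        TypeSerializer<String> serializer =
                Types.STRING.createSerializer(new ExecutionConfig());

        // Serialize all elements into one byte array (what the constructor does).
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
        for (String element : elements) {
            serializer.serialize(element, out);
        }
        byte[] elementsSerialized = baos.toByteArray();

        // Replay the bytes into an index-to-element map (what open() does).
        DataInputViewStreamWrapper in =
                new DataInputViewStreamWrapper(new ByteArrayInputStream(elementsSerialized));
        Map<Long, String> lookupMap = new HashMap<>();
        for (long i = 0; i < elements.size(); i++) {
            lookupMap.put(i, serializer.deserialize(in));
        }
        System.out.println(lookupMap.get(1L)); // prints "b"
    }
}
```

Because only the byte array travels with the function, the function itself stays Java-serializable even when the elements are not.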
*/ +@Experimental +public class TestDataGenerators { + + /** + * Creates a source that emits provided {@code data}, waits for two checkpoints and emits the + * same {@code data } again. It is intended only to be used for test purposes. See {@link + * DoubleEmittingSourceReaderWithCheckpointsInBetween} for details. + * + * @param typeInfo The type information of the elements. + * @param data The collection of elements to create the stream from. + * @param The type of the returned data stream. + */ + public static DataGeneratorSource fromDataWithSnapshotsLatch( + Collection data, TypeInformation typeInfo) { + IndexLookupGeneratorFunction generatorFunction = + new IndexLookupGeneratorFunction<>(typeInfo, data); + + return new DataGeneratorSource<>( + (SourceReaderFactory) + (readerContext) -> + new DoubleEmittingSourceReaderWithCheckpointsInBetween<>( + readerContext, generatorFunction), + generatorFunction, + data.size(), + typeInfo); + } + + /** + * Creates a source that emits provided {@code data}, waits for two checkpoints and emits the + * same {@code data } again. It is intended only to be used for test purposes. See {@link + * DoubleEmittingSourceReaderWithCheckpointsInBetween} for details. + * + * @param typeInfo The type information of the elements. + * @param data The collection of elements to create the stream from. + * @param The type of the returned data stream. + * @param allowedToExit The boolean supplier that makes it possible to delay termination based + * on external conditions. + */ + public static DataGeneratorSource fromDataWithSnapshotsLatch( + Collection data, TypeInformation typeInfo, BooleanSupplier allowedToExit) { + IndexLookupGeneratorFunction generatorFunction = + new IndexLookupGeneratorFunction<>(typeInfo, data); + + return new DataGeneratorSource<>( + (SourceReaderFactory) + (readerContext) -> + new DoubleEmittingSourceReaderWithCheckpointsInBetween<>( + readerContext, generatorFunction, allowedToExit), + generatorFunction, + data.size(), + typeInfo); + } +} diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml index 4efcce668de37..2acd7f070decc 100644 --- a/flink-connectors/flink-connector-hive/pom.xml +++ b/flink-connectors/flink-connector-hive/pom.xml @@ -694,6 +694,14 @@ under the License. 
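Every test migration in the rest of this patch follows the same recipe: replace `env.addSource(new FiniteTestSource<>(data), typeInfo)` with `env.fromSource` over the new factory method. A condensed sketch of the replacement, assuming a simple String pipeline:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.datagen.source.TestDataGenerators;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.List;

public class FromDataWithSnapshotsLatchSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing must be enabled: the source only terminates after two
        // checkpoints complete following each of its two emissions of the data.
        env.enableCheckpointing(100);

        List<String> data = Arrays.asList("first", "second", "third");
        DataStream<String> stream =
                env.fromSource(
                        TestDataGenerators.fromDataWithSnapshotsLatch(data, Types.STRING),
                        WatermarkStrategy.noWatermarks(),
                        "Test Source");

        stream.print(); // each element is expected to appear twice
        env.execute();
    }
}
```

No explicit TypeInformation is passed to fromSource because DataGeneratorSource reports its result type itself; the typeInfo argument goes to the factory method instead.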
test + + org.apache.flink + flink-connector-datagen + ${project.version} + test-jar + test + + org.apache.flink diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java index 7d5ca401c6182..3131279eca170 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java @@ -18,14 +18,15 @@ package org.apache.flink.connectors.hive; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.connector.datagen.source.TestDataGenerators; import org.apache.flink.connector.file.table.FileSystemConnectorOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.util.FiniteTestSource; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.ExplainDetail; import org.apache.flink.table.api.Expressions; @@ -792,15 +793,21 @@ private void testStreamingWriteWithCustomPartitionCommitPolicy( Row.of(3, "x", "y", "2020-05-03", "9"), Row.of(4, "x", "y", "2020-05-03", "10"), Row.of(5, "x", "y", "2020-05-03", "11")); + RowTypeInfo rowTypeInfo = + new RowTypeInfo( + Types.INT, + Types.STRING, + Types.STRING, + Types.STRING, + Types.STRING); + DataStream stream = - env.addSource( - new FiniteTestSource<>(data), - new RowTypeInfo( - Types.INT, - Types.STRING, - Types.STRING, - Types.STRING, - Types.STRING)); + env.fromSource( + TestDataGenerators.fromDataWithSnapshotsLatch( + data, rowTypeInfo), + WatermarkStrategy.noWatermarks(), + "Test Source"); + tEnv.createTemporaryView( "my_table", stream, $("a"), $("b"), $("c"), $("d"), $("e")); @@ -886,17 +893,21 @@ private void testStreamingWrite( Row.of(3, "x", "y", "2020-05-03", "9"), Row.of(4, "x", "y", "2020-05-03", "10"), Row.of(5, "x", "y", "2020-05-03", "11")); + RowTypeInfo rowTypeInfo = + new RowTypeInfo( + Types.INT, + Types.STRING, + Types.STRING, + Types.STRING, + Types.STRING); + DataStream stream = - env.addSource( - new FiniteTestSource<>(data), - new RowTypeInfo( - Types.INT, - Types.STRING, - Types.STRING, - Types.STRING, - Types.STRING)); - /*tEnv.createTemporaryView( - "my_table", stream, $("a"), $("b"), $("c"), $("d"), $("e"));*/ + env.fromSource( + TestDataGenerators.fromDataWithSnapshotsLatch( + data, rowTypeInfo), + WatermarkStrategy.noWatermarks(), + "Test Source"); + tEnv.createTemporaryView( "my_table", stream, diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java index fe46926d05f8d..402ca54e50f5e 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.connectors.hive; +import 
org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; @@ -25,11 +26,11 @@ import org.apache.flink.api.java.typeutils.MapTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.datagen.source.TestDataGenerators; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.util.FiniteTestSource; import org.apache.flink.table.HiveVersionTestUtil; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.SqlDialect; @@ -979,25 +980,28 @@ public void testReadParquetWithNullableComplexType() throws Exception { List rows = generateRows(); List expectedRows = generateExpectedRows(rows); + + RowTypeInfo typeInfo = + new RowTypeInfo( + new TypeInformation[] { + Types.INT, + Types.STRING, + new RowTypeInfo( + new TypeInformation[] {Types.STRING, Types.INT, Types.INT}, + new String[] {"c1", "c2", "c3"}), + new MapTypeInfo<>(Types.STRING, Types.STRING), + Types.OBJECT_ARRAY(Types.STRING), + Types.STRING + }, + new String[] {"a", "b", "c", "d", "e", "f"}); + DataStream stream = - env.addSource( - new FiniteTestSource<>(rows), - new RowTypeInfo( - new TypeInformation[] { - Types.INT, - Types.STRING, - new RowTypeInfo( - new TypeInformation[] { - Types.STRING, Types.INT, Types.INT - }, - new String[] {"c1", "c2", "c3"}), - new MapTypeInfo<>(Types.STRING, Types.STRING), - Types.OBJECT_ARRAY(Types.STRING), - Types.STRING - }, - new String[] {"a", "b", "c", "d", "e", "f"})) + env.fromSource( + TestDataGenerators.fromDataWithSnapshotsLatch(rows, typeInfo), + WatermarkStrategy.noWatermarks(), + "Test Source") .filter((FilterFunction) value -> true) - .setParallelism(3); // to parallel tasks + .setParallelism(3); tEnv.createTemporaryView("my_table", stream); assertResults(executeAndGetResult(tEnv), expectedRows); diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java index 1e6237881a5d9..308ed93a92626 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java @@ -58,19 +58,19 @@ public abstract class IteratorSourceReaderBase< private final SourceReaderContext context; /** The availability future. This reader is available as soon as a split is assigned. */ - private CompletableFuture availability; + protected CompletableFuture availability; /** * The iterator producing data. Non-null after a split has been assigned. This field is null or * non-null always together with the {@link #currentSplit} field. */ - @Nullable private IterT iterator; + @Nullable protected IterT iterator; /** * The split whose data we return. Non-null after a split has been assigned. This field is null * or non-null always together with the {@link #iterator} field. 
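The IteratorSourceReaderBase hunks here and just below widen `availability`, `iterator`, `currentSplit`, `finishSplit()`, and `tryMoveToNextSplit()` from private to protected so that readers like the one above can drive the split-iteration loop themselves. A minimal sketch of what a subclass can now do; the class name and the re-emit-once behavior are illustrative only, condensed from the reader introduced in this patch:

```java
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceReaderBase;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
import org.apache.flink.core.io.InputStatus;

import java.util.Iterator;
import java.util.concurrent.CompletableFuture;

// Hypothetical subclass: emits the current split twice before finishing.
public abstract class EchoingReader<
                E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, SplitT>>
        extends IteratorSourceReaderBase<E, O, IterT, SplitT> {

    private boolean echoed;

    protected EchoingReader(SourceReaderContext context) {
        super(context);
    }

    @Override
    public InputStatus pollNext(ReaderOutput<O> output) {
        if (currentSplit == null) {
            InputStatus status = tryMoveToNextSplit(); // protected since this change
            if (status != InputStatus.MORE_AVAILABLE) {
                return status; // no split assigned yet, or no splits at all
            }
        }
        iterator = currentSplit.getIterator(); // restart the split from the beginning
        while (iterator.hasNext()) {
            output.collect(convert(iterator.next()));
        }
        if (echoed) {
            return InputStatus.END_OF_INPUT;
        }
        echoed = true;
        availability = new CompletableFuture<>(); // re-arm; complete it from a checkpoint callback
        return InputStatus.NOTHING_AVAILABLE;
    }
}
```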
*/ - @Nullable private SplitT currentSplit; + @Nullable protected SplitT currentSplit; /** The remaining splits that were assigned but not yet processed. */ private final Queue remainingSplits; @@ -115,7 +115,7 @@ public InputStatus pollNext(ReaderOutput output) { protected abstract O convert(E value); - private void finishSplit() { + protected void finishSplit() { iterator = null; currentSplit = null; @@ -127,7 +127,7 @@ private void finishSplit() { } } - private InputStatus tryMoveToNextSplit() { + protected InputStatus tryMoveToNextSplit() { currentSplit = remainingSplits.poll(); if (currentSplit != null) { iterator = currentSplit.getIterator(); diff --git a/flink-formats/flink-avro/pom.xml b/flink-formats/flink-avro/pom.xml index ac01d84e4dfb2..4121f4c500d9b 100644 --- a/flink-formats/flink-avro/pom.xml +++ b/flink-formats/flink-avro/pom.xml @@ -126,6 +126,14 @@ under the License. test-jar + + org.apache.flink + flink-connector-datagen + ${project.version} + test-jar + test + + diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroStreamingFileSinkITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroStreamingFileSinkITCase.java index e2752aa75e548..4f68b7c82c60b 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroStreamingFileSinkITCase.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroStreamingFileSinkITCase.java @@ -18,7 +18,9 @@ package org.apache.flink.formats.avro; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.datagen.source.TestDataGenerators; import org.apache.flink.core.fs.Path; import org.apache.flink.formats.avro.generated.Address; import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; @@ -26,7 +28,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.UniqueBucketAssigner; -import org.apache.flink.streaming.util.FiniteTestSource; import org.apache.flink.test.util.AbstractTestBase; import org.apache.avro.Schema; @@ -77,7 +78,12 @@ public void testWriteAvroSpecific() throws Exception { AvroWriterFactory
<Address>
avroWriterFactory = AvroWriters.forSpecificRecord(Address.class); DataStream
stream = - env.addSource(new FiniteTestSource<>(data), TypeInformation.of(Address.class)); + env.fromSource( + TestDataGenerators.fromDataWithSnapshotsLatch( + data, TypeInformation.of(Address.class)), + WatermarkStrategy.noWatermarks(), + "Test Source"); + stream.addSink( StreamingFileSink.forBulkFormat(Path.fromLocalFile(folder), avroWriterFactory) .withBucketAssigner(new UniqueBucketAssigner<>("test")) @@ -100,7 +106,12 @@ public void testWriteAvroGeneric() throws Exception { AvroWriterFactory avroWriterFactory = AvroWriters.forGenericRecord(schema); DataStream stream = - env.addSource(new FiniteTestSource<>(data), new GenericRecordAvroTypeInfo(schema)); + env.fromSource( + TestDataGenerators.fromDataWithSnapshotsLatch( + data, new GenericRecordAvroTypeInfo(schema)), + WatermarkStrategy.noWatermarks(), + "Test Source"); + stream.addSink( StreamingFileSink.forBulkFormat(Path.fromLocalFile(folder), avroWriterFactory) .withBucketAssigner(new UniqueBucketAssigner<>("test")) @@ -121,8 +132,14 @@ public void testWriteAvroReflect() throws Exception { env.enableCheckpointing(100); AvroWriterFactory avroWriterFactory = AvroWriters.forReflectRecord(Datum.class); + DataStream stream = - env.addSource(new FiniteTestSource<>(data), TypeInformation.of(Datum.class)); + env.fromSource( + TestDataGenerators.fromDataWithSnapshotsLatch( + data, TypeInformation.of(Datum.class)), + WatermarkStrategy.noWatermarks(), + "Test Source"); + stream.addSink( StreamingFileSink.forBulkFormat(Path.fromLocalFile(folder), avroWriterFactory) .withBucketAssigner(new UniqueBucketAssigner<>("test")) diff --git a/flink-formats/flink-compress/pom.xml b/flink-formats/flink-compress/pom.xml index 2772a2c0fbdd3..3fa0800f57fa1 100644 --- a/flink-formats/flink-compress/pom.xml +++ b/flink-formats/flink-compress/pom.xml @@ -91,6 +91,14 @@ under the License. 
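The pom.xml hunks in this patch render here with their XML tags stripped. Reconstructed in conventional Maven form from the element values that survive in the flattened text (indentation and element order are assumptions): the datagen connector pom adds a maven-jar-plugin execution that publishes a test-jar, and each consuming module (hive, avro, compress, and the rest below) adds a matching test-scoped dependency.

```xml
<!-- flink-connector-datagen/pom.xml: publish the connector's test classes -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-jar-plugin</artifactId>
    <executions>
        <execution>
            <goals>
                <goal>test-jar</goal>
            </goals>
        </execution>
    </executions>
</plugin>

<!-- consuming modules: depend on that test-jar -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-datagen</artifactId>
    <version>${project.version}</version>
    <type>test-jar</type>
    <scope>test</scope>
</dependency>
```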
test-jar + + org.apache.flink + flink-connector-datagen + ${project.version} + test-jar + test + + diff --git a/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java b/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java index c2d08112f6712..83d819440b181 100644 --- a/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java +++ b/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java @@ -18,14 +18,15 @@ package org.apache.flink.formats.compress; -import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.connector.datagen.source.TestDataGenerators; import org.apache.flink.core.fs.Path; import org.apache.flink.formats.compress.extractor.DefaultExtractor; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.UniqueBucketAssigner; -import org.apache.flink.streaming.util.FiniteTestSource; import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.hadoop.conf.Configuration; @@ -68,7 +69,10 @@ void testWriteCompressedFile(@TempDir java.nio.file.Path tmpDir) throws Exceptio env.enableCheckpointing(100); DataStream stream = - env.addSource(new FiniteTestSource<>(testData), TypeInformation.of(String.class)); + env.fromSource( + TestDataGenerators.fromDataWithSnapshotsLatch(testData, Types.STRING), + WatermarkStrategy.noWatermarks(), + "Test Source"); stream.map(str -> str) .addSink( diff --git a/flink-formats/flink-csv/pom.xml b/flink-formats/flink-csv/pom.xml index ad6c09687dbfe..587ae392ab9f7 100644 --- a/flink-formats/flink-csv/pom.xml +++ b/flink-formats/flink-csv/pom.xml @@ -109,6 +109,14 @@ under the License. test + + org.apache.flink + flink-connector-datagen + ${project.version} + test-jar + test + + diff --git a/flink-formats/flink-hadoop-bulk/pom.xml b/flink-formats/flink-hadoop-bulk/pom.xml index 98b9fb12ac6f1..22ff715222ff7 100644 --- a/flink-formats/flink-hadoop-bulk/pom.xml +++ b/flink-formats/flink-hadoop-bulk/pom.xml @@ -163,6 +163,14 @@ under the License. 
+ + org.apache.flink + flink-connector-datagen + ${project.version} + test-jar + test + + diff --git a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterITCase.java b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterITCase.java index ee476fa1c7f7e..1aba3ca334a90 100644 --- a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterITCase.java +++ b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterITCase.java @@ -18,13 +18,14 @@ package org.apache.flink.formats.hadoop.bulk; -import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.connector.datagen.source.TestDataGenerators; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder; import org.apache.flink.streaming.api.functions.sink.filesystem.TestStreamingFileSinkFactory; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner; -import org.apache.flink.streaming.util.FiniteTestSource; import org.apache.flink.test.util.AbstractTestBase; import org.apache.hadoop.conf.Configuration; @@ -79,10 +80,14 @@ public void testWriteFile() throws Exception { env.setParallelism(1); env.enableCheckpointing(100); - // FiniteTestSource will generate two elements with a checkpoint trigger in between the two - // elements + // This data generator source will emit data elements twice with two checkpoints completed + // in between DataStream stream = - env.addSource(new FiniteTestSource<>(data), TypeInformation.of(String.class)); + env.fromSource( + TestDataGenerators.fromDataWithSnapshotsLatch(data, Types.STRING), + WatermarkStrategy.noWatermarks(), + "Test Source"); + Configuration configuration = new Configuration(); // Elements from source are going to be assigned to one bucket HadoopPathBasedBulkFormatBuilder builder = diff --git a/flink-formats/flink-json/pom.xml b/flink-formats/flink-json/pom.xml index 002dbd4c519ac..94d0b1bdf297d 100644 --- a/flink-formats/flink-json/pom.xml +++ b/flink-formats/flink-json/pom.xml @@ -116,6 +116,14 @@ under the License. test-jar + + org.apache.flink + flink-connector-datagen + ${project.version} + test-jar + test + + org.apache.flink diff --git a/flink-formats/flink-orc/pom.xml b/flink-formats/flink-orc/pom.xml index 0e76c5e64676a..3d35b0c277445 100644 --- a/flink-formats/flink-orc/pom.xml +++ b/flink-formats/flink-orc/pom.xml @@ -182,6 +182,14 @@ under the License. 
test-jar + + org.apache.flink + flink-connector-datagen + ${project.version} + test-jar + test + + diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterITCase.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterITCase.java index aefcf188b2321..49b9a66cb74b5 100644 --- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterITCase.java +++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterITCase.java @@ -18,7 +18,9 @@ package org.apache.flink.orc.writer; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.datagen.source.TestDataGenerators; import org.apache.flink.core.fs.Path; import org.apache.flink.orc.data.Record; import org.apache.flink.orc.util.OrcBulkWriterTestUtil; @@ -27,7 +29,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.UniqueBucketAssigner; -import org.apache.flink.streaming.util.FiniteTestSource; import org.apache.hadoop.conf.Configuration; import org.junit.jupiter.api.Test; @@ -59,7 +60,12 @@ void testOrcBulkWriter(@TempDir File outDir) throws Exception { env.enableCheckpointing(100); DataStream stream = - env.addSource(new FiniteTestSource<>(testData), TypeInformation.of(Record.class)); + env.fromSource( + TestDataGenerators.fromDataWithSnapshotsLatch( + testData, TypeInformation.of(Record.class)), + WatermarkStrategy.noWatermarks(), + "Test Source"); + stream.map(str -> str) .addSink( StreamingFileSink.forBulkFormat(new Path(outDir.toURI()), factory) diff --git a/flink-formats/flink-parquet/pom.xml b/flink-formats/flink-parquet/pom.xml index 9baa9687f17b1..c8f94ed1a5784 100644 --- a/flink-formats/flink-parquet/pom.xml +++ b/flink-formats/flink-parquet/pom.xml @@ -249,6 +249,14 @@ under the License. 
test + + org.apache.flink + flink-connector-datagen + ${project.version} + test-jar + test + + diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetStreamingFileSinkITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetStreamingFileSinkITCase.java index 91de5fbc4fe99..b2f00180297ec 100644 --- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetStreamingFileSinkITCase.java +++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetStreamingFileSinkITCase.java @@ -18,7 +18,9 @@ package org.apache.flink.formats.parquet.avro; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.datagen.source.TestDataGenerators; import org.apache.flink.core.fs.Path; import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; import org.apache.flink.formats.parquet.generated.Address; @@ -26,7 +28,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.UniqueBucketAssigner; -import org.apache.flink.streaming.util.FiniteTestSource; import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.avro.Schema; @@ -76,7 +77,11 @@ void testWriteParquetAvroSpecific(@TempDir File folder) throws Exception { env.enableCheckpointing(100); DataStream
stream = - env.addSource(new FiniteTestSource<>(data), TypeInformation.of(Address.class)); + env.fromSource( + TestDataGenerators.fromDataWithSnapshotsLatch( + data, TypeInformation.of(Address.class)), + WatermarkStrategy.noWatermarks(), + "Test Source"); stream.addSink( StreamingFileSink.forBulkFormat( @@ -102,7 +107,11 @@ void testWriteParquetAvroGeneric(@TempDir File folder) throws Exception { env.enableCheckpointing(100); DataStream stream = - env.addSource(new FiniteTestSource<>(data), new GenericRecordAvroTypeInfo(schema)); + env.fromSource( + TestDataGenerators.fromDataWithSnapshotsLatch( + data, new GenericRecordAvroTypeInfo(schema)), + WatermarkStrategy.noWatermarks(), + "Test Source"); stream.addSink( StreamingFileSink.forBulkFormat( @@ -132,7 +141,11 @@ void testWriteParquetAvroReflect(@TempDir File folder) throws Exception { env.enableCheckpointing(100); DataStream stream = - env.addSource(new FiniteTestSource<>(data), TypeInformation.of(Datum.class)); + env.fromSource( + TestDataGenerators.fromDataWithSnapshotsLatch( + data, TypeInformation.of(Datum.class)), + WatermarkStrategy.noWatermarks(), + "Test Source"); stream.addSink( StreamingFileSink.forBulkFormat( diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoStreamingFileSinkITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoStreamingFileSinkITCase.java index 3b70aa158c388..13360060d9f0c 100644 --- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoStreamingFileSinkITCase.java +++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoStreamingFileSinkITCase.java @@ -18,13 +18,14 @@ package org.apache.flink.formats.parquet.protobuf; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.datagen.source.TestDataGenerators; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.UniqueBucketAssigner; -import org.apache.flink.streaming.util.FiniteTestSource; import org.apache.flink.test.junit5.MiniClusterExtension; import com.google.protobuf.Message; @@ -66,8 +67,11 @@ void testParquetProtoWriters(@TempDir File folder) throws Exception { env.enableCheckpointing(100); DataStream stream = - env.addSource( - new FiniteTestSource<>(data), TypeInformation.of(SimpleProtoRecord.class)); + env.fromSource( + TestDataGenerators.fromDataWithSnapshotsLatch( + data, TypeInformation.of(SimpleProtoRecord.class)), + WatermarkStrategy.noWatermarks(), + "Test Source"); stream.addSink( StreamingFileSink.forBulkFormat( diff --git a/flink-formats/flink-sequence-file/pom.xml b/flink-formats/flink-sequence-file/pom.xml index bb42b88cf0fb8..cd65f1b98413d 100644 --- a/flink-formats/flink-sequence-file/pom.xml +++ b/flink-formats/flink-sequence-file/pom.xml @@ -97,6 +97,14 @@ under the License. 
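The sequence-file test below relies on the TypeHint idiom, whose generic arguments were stripped in this rendering. A small sketch of that idiom, assuming the Long/String pair the original test works with (the concrete type arguments are an assumption recovered from context):

```java
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;

public class TypeHintSketch {
    public static void main(String[] args) {
        // The anonymous TypeHint subclass pins the generic arguments so they
        // survive erasure and yield a complete TypeInformation.
        TypeInformation<Tuple2<Long, String>> typeInfo =
                TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {});
        System.out.println(typeInfo);
    }
}
```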
test + + org.apache.flink + flink-connector-datagen + ${project.version} + test-jar + test + + diff --git a/flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SequenceStreamingFileSinkITCase.java b/flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SequenceStreamingFileSinkITCase.java index 8b8ba2f3ef3a0..cb3bf66355148 100644 --- a/flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SequenceStreamingFileSinkITCase.java +++ b/flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SequenceStreamingFileSinkITCase.java @@ -18,16 +18,17 @@ package org.apache.flink.formats.sequencefile; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.datagen.source.TestDataGenerators; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.UniqueBucketAssigner; -import org.apache.flink.streaming.util.FiniteTestSource; import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.hadoop.conf.Configuration; @@ -67,9 +68,12 @@ void testWriteSequenceFile(@TempDir final File folder) throws Exception { env.enableCheckpointing(100); DataStream> stream = - env.addSource( - new FiniteTestSource<>(testData), - TypeInformation.of(new TypeHint>() {})); + env.fromSource( + TestDataGenerators.fromDataWithSnapshotsLatch( + testData, + TypeInformation.of(new TypeHint>() {})), + WatermarkStrategy.noWatermarks(), + "Test Source"); stream.map( new MapFunction, Tuple2>() { diff --git a/flink-table/flink-table-planner/pom.xml b/flink-table/flink-table-planner/pom.xml index 8104c865a88fd..184120ca896bf 100644 --- a/flink-table/flink-table-planner/pom.xml +++ b/flink-table/flink-table-planner/pom.xml @@ -227,7 +227,15 @@ under the License. 
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
-
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-datagen</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/CompactionITCaseBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/CompactionITCaseBase.java
index 5232062d52a46..c5ea97325658d 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/CompactionITCaseBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/CompactionITCaseBase.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.table.planner.runtime.stream.sql;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.connector.datagen.source.TestDataGenerators;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.scala.DataStream;
-import org.apache.flink.streaming.util.FiniteTestSource;
 import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.testutils.junit.utils.TempDirUtils;
@@ -81,16 +82,19 @@ protected void init() throws IOException {
         this.expectedRows.addAll(rows);
         this.expectedRows.sort(Comparator.comparingInt(o -> (Integer) o.getField(0)));
 
+        RowTypeInfo rowTypeInfo =
+                new RowTypeInfo(
+                        new TypeInformation[] {Types.INT, Types.STRING, Types.STRING},
+                        new String[] {"a", "b", "c"});
+
         DataStream<Row> stream =
                 new DataStream<>(
                         env().getJavaEnv()
-                                .addSource(
-                                        new FiniteTestSource<>(rows),
-                                        new RowTypeInfo(
-                                                new TypeInformation[] {
-                                                    Types.INT, Types.STRING, Types.STRING
-                                                },
-                                                new String[] {"a", "b", "c"}))
+                                .fromSource(
+                                        TestDataGenerators.fromDataWithSnapshotsLatch(
+                                                rows, rowTypeInfo),
+                                        WatermarkStrategy.noWatermarks(),
+                                        "Test Source"))
                         .filter((FilterFunction<Row>) value -> true)
                         .setParallelism(3); // to parallel tasks
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 8c46a28d0a7d4..4e7063d6a673e 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -78,6 +78,8 @@ under the License.
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-connector-datagen</artifactId>
 			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
 		</dependency>
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java
index 2eaa88b045c17..6fbdbca636ede 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java
@@ -18,11 +18,13 @@
 package org.apache.flink.test.streaming.runtime;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.connector.datagen.source.TestDataGenerators;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.runtime.operators.sink.TestSink;
-import org.apache.flink.streaming.util.FiniteTestSource;
 import org.apache.flink.test.util.AbstractTestBase;
 
 import org.junit.Before;
@@ -116,18 +118,21 @@ public void init() {
 
     @Test
     public void writerAndCommitterAndGlobalCommitterExecuteInStreamingMode() throws Exception {
         final StreamExecutionEnvironment env = buildStreamEnv();
-        final FiniteTestSource<Integer> source =
-                new FiniteTestSource<>(BOTH_QUEUE_RECEIVE_ALL_DATA, SOURCE_DATA);
 
-        env.addSource(source, IntegerTypeInfo.INT_TYPE_INFO)
-                .sinkTo(
-                        TestSink.newBuilder()
-                                .setDefaultCommitter(
-                                        (Supplier<Queue<String>> & Serializable) () -> COMMIT_QUEUE)
-                                .setGlobalCommitter(
-                                        (Supplier<Queue<String>> & Serializable)
-                                                () -> GLOBAL_COMMIT_QUEUE)
-                                .build());
+        final DataStream<Integer> stream =
+                env.fromSource(
+                        TestDataGenerators.fromDataWithSnapshotsLatch(
+                                SOURCE_DATA, Types.INT, BOTH_QUEUE_RECEIVE_ALL_DATA),
+                        WatermarkStrategy.noWatermarks(),
+                        "Test Source");
+
+        stream.sinkTo(
+                TestSink.newBuilder()
+                        .setDefaultCommitter(
+                                (Supplier<Queue<String>> & Serializable) () -> COMMIT_QUEUE)
+                        .setGlobalCommitter(
+                                (Supplier<Queue<String>> & Serializable) () -> GLOBAL_COMMIT_QUEUE)
+                        .build());
 
         env.execute();
@@ -174,15 +179,19 @@ public void writerAndCommitterAndGlobalCommitterExecuteInBatchMode() throws Exce
 
     @Test
     public void writerAndCommitterExecuteInStreamingMode() throws Exception {
         final StreamExecutionEnvironment env = buildStreamEnv();
-        final FiniteTestSource<Integer> source =
-                new FiniteTestSource<>(COMMIT_QUEUE_RECEIVE_ALL_DATA, SOURCE_DATA);
 
-        env.addSource(source, IntegerTypeInfo.INT_TYPE_INFO)
-                .sinkTo(
-                        TestSink.newBuilder()
-                                .setDefaultCommitter(
-                                        (Supplier<Queue<String>> & Serializable) () -> COMMIT_QUEUE)
-                                .build());
+        final DataStream<Integer> stream =
+                env.fromSource(
+                        TestDataGenerators.fromDataWithSnapshotsLatch(
+                                SOURCE_DATA, Types.INT, COMMIT_QUEUE_RECEIVE_ALL_DATA),
+                        WatermarkStrategy.noWatermarks(),
+                        "Test Source");
+
+        stream.sinkTo(
+                TestSink.newBuilder()
+                        .setDefaultCommitter(
+                                (Supplier<Queue<String>> & Serializable) () -> COMMIT_QUEUE)
+                        .build());
 
         env.execute();
         assertThat(
                 COMMIT_QUEUE,
@@ -207,18 +216,20 @@ public void writerAndCommitterExecuteInBatchMode() throws Exception {
 
     @Test
     public void writerAndGlobalCommitterExecuteInStreamingMode() throws Exception {
         final StreamExecutionEnvironment env = buildStreamEnv();
-        final FiniteTestSource<Integer> source =
-                new FiniteTestSource<>(GLOBAL_COMMIT_QUEUE_RECEIVE_ALL_DATA, SOURCE_DATA);
 
-        env.addSource(source, IntegerTypeInfo.INT_TYPE_INFO)
-                .sinkTo(
-                        TestSink.newBuilder()
-                                .setCommittableSerializer(
-                                        TestSink.StringCommittableSerializer.INSTANCE)
-                                .setGlobalCommitter(
-                                        (Supplier<Queue<String>> & Serializable)
-                                                () -> GLOBAL_COMMIT_QUEUE)
-                                .build());
+        final DataStream<Integer> stream =
+                env.fromSource(
+                        TestDataGenerators.fromDataWithSnapshotsLatch(
+                                SOURCE_DATA, Types.INT, GLOBAL_COMMIT_QUEUE_RECEIVE_ALL_DATA),
+                        WatermarkStrategy.noWatermarks(),
+                        "Test Source");
+
+        stream.sinkTo(
+                TestSink.newBuilder()
+                        .setCommittableSerializer(TestSink.StringCommittableSerializer.INSTANCE)
+                        .setGlobalCommitter(
+                                (Supplier<Queue<String>> & Serializable) () -> GLOBAL_COMMIT_QUEUE)
+                        .build());
 
         env.execute();
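
Reviewer note (not part of the patch): every call site above follows the same mechanical rewrite, replacing the legacy SourceFunction-based FiniteTestSource with the FLIP-27 Source returned by TestDataGenerators.fromDataWithSnapshotsLatch. The condensed sketch below is illustrative only; the class name and sample data are hypothetical, and it assumes the two-argument overload used in the format tests (SinkITCase uses the three-argument overload, whose extra BooleanSupplier gates source completion on the commit queues).

// Illustrative sketch only -- mirrors the hunks above, not code added by this patch.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.datagen.source.TestDataGenerators;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.List;

public class FiniteSourceMigrationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // The snapshots-latch source coordinates with checkpoints before
        // finishing, so checkpointing must be enabled, as in the tests above.
        env.enableCheckpointing(100);

        List<Integer> data = Arrays.asList(1, 2, 3); // hypothetical test data

        // Before (removed throughout this patch):
        //     env.addSource(new FiniteTestSource<>(data), Types.INT);
        // After (FLIP-27 Source API):
        DataStream<Integer> stream =
                env.fromSource(
                        TestDataGenerators.fromDataWithSnapshotsLatch(data, Types.INT),
                        WatermarkStrategy.noWatermarks(),
                        "Test Source");

        stream.print();
        env.execute("finite-source-migration-sketch");
    }
}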