Skip to content

Commit

Permalink
[Flink] Fix flink source enum (#573)
Browse files Browse the repository at this point in the history
* add more log

Signed-off-by: chenxu <chenxu@dmetasoul.com>

* fix task assign logics

Signed-off-by: chenxu <chenxu@dmetasoul.com>

---------

Signed-off-by: chenxu <chenxu@dmetasoul.com>
Co-authored-by: chenxu <chenxu@dmetasoul.com>
  • Loading branch information
xuchen-plus and dmetasoul01 authored Jan 17, 2025
1 parent ebb8f45 commit fe80113
Showing 1 changed file with 7 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class LakeSoulAllPartitionDynamicSplitEnumerator implements SplitEnumerat
private long startTime;
private long nextStartTime;
private int hashBucketNum = -1;
String fullTableName;

public LakeSoulAllPartitionDynamicSplitEnumerator(SplitEnumeratorContext<LakeSoulPartitionSplit> context, LakeSoulDynSplitAssigner splitAssigner, RowType rowType, long discoveryInterval, long startTime, String tableId, String hashBucketNum, List<String> partitionColumns, Plan partitionFilters) {
this.context = context;
Expand All @@ -66,6 +67,9 @@ public LakeSoulAllPartitionDynamicSplitEnumerator(SplitEnumeratorContext<LakeSou
this.partitionArrowSchema = new Schema(partitionFields);
this.partitionFilters = partitionFilters;
tableInfo = DataOperation.dbManager().getTableInfoByTableId(tableId);
fullTableName = tableInfo.getTableNamespace() + "." + tableInfo.getTableName();
LOG.info("Create Dyn enumerator for table name {}, tableId {}, context {}",
fullTableName, tableId, System.identityHashCode(context));
}

@Override
Expand All @@ -83,6 +87,8 @@ public synchronized void handleSplitRequest(int subtaskId, @Nullable String requ
}
int tasksSize = context.registeredReaders().size();
if (tasksSize == 0) {
LOG.info("handleSplitRequest: Task size is 0 for subtaskId {} for table {}", subtaskId, fullTableName);
taskIdsAwaitingSplit.add(subtaskId);
return;
}
Optional<LakeSoulPartitionSplit> nextSplit = this.splitAssigner.getNext(subtaskId, tasksSize);
Expand Down Expand Up @@ -134,10 +140,10 @@ private synchronized void processDiscoveredSplits(
LOG.info("Process discovered splits {}, taskSize {}, oid {}, tid {}", splits,
tasksSize, System.identityHashCode(this),
Thread.currentThread().getId());
this.splitAssigner.addSplits(splits);
if (tasksSize == 0) {
return;
}
this.splitAssigner.addSplits(splits);
Iterator<Integer> iter = taskIdsAwaitingSplit.iterator();
while (iter.hasNext()) {
int taskId = iter.next();
Expand Down

0 comments on commit fe80113

Please sign in to comment.