Skip to content

Commit

Permalink
move shouldReplaceOnComparisonTie to base class to be more reusable (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
klsince authored Jun 11, 2024
1 parent ad5ca34 commit d09cd0c
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
Expand Down Expand Up @@ -560,6 +562,46 @@ protected void addSegmentWithoutUpsert(ImmutableSegmentImpl segment, ThreadSafeM
addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
}

/**
* <li> When the replacing segment and current segment are of {@link LLCSegmentName} then the PK should resolve to
* row in segment with higher sequence id.
* <li> If either or both are not LLC segment, then resolve based on creation time of segment. If creation time is
* same then prefer uploaded segment if other is LLCSegmentName
* <li> If both are uploaded segment, prefer standard UploadedRealtimeSegmentName, if still a tie, then resolve to
* current segment.
*
* @param segmentName replacing segment name
* @param currentSegmentName current segment name having the record for the given primary key
* @param segmentCreationTimeMs replacing segment creation time
* @param currentSegmentCreationTimeMs current segment creation time
* @return true if the record in replacing segment should replace the record in current segment
*/
protected boolean shouldReplaceOnComparisonTie(String segmentName, String currentSegmentName,
long segmentCreationTimeMs, long currentSegmentCreationTimeMs) {
// resolve using sequence id if both are LLCSegmentName
LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
LLCSegmentName currentLLCSegmentName = LLCSegmentName.of(currentSegmentName);
if (llcSegmentName != null && currentLLCSegmentName != null) {
return llcSegmentName.getSequenceNumber() > currentLLCSegmentName.getSequenceNumber();
}

// either or both are uploaded segments, prefer the latest segment
int creationTimeComparisonRes = Long.compare(segmentCreationTimeMs, currentSegmentCreationTimeMs);
if (creationTimeComparisonRes != 0) {
return creationTimeComparisonRes > 0;
}

// if both are uploaded segment, prefer standard UploadedRealtimeSegmentName, if still a tie, then resolve to
// current segment
if (UploadedRealtimeSegmentName.of(currentSegmentName) != null) {
return false;
}
if (UploadedRealtimeSegmentName.of(segmentName) != null) {
return true;
}
return false;
}

@Override
public boolean addRecord(MutableSegment segment, RecordInfo recordInfo) {
_gotFirstConsumingSegment = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.segment.readers.LazyRow;
import org.apache.pinot.segment.local.utils.HashUtils;
Expand Down Expand Up @@ -158,47 +156,6 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab
}
}

/**
* <li> When the replacing segment and current segment are of {@link LLCSegmentName} then the PK should resolve to
* row in segment with higher sequence id.
* <li> If either or both are not LLC segment, then resolve based on creation time of segment. If creation time is
* same then prefer uploaded segment if other is LLCSegmentName
* <li> If both are uploaded segment, prefer standard UploadedRealtimeSegmentName, if still a tie, then resolve to
* current segment.
*
* @param segmentName replacing segment name
* @param currentSegmentName current segment name having the record for the given primary key
* @param segmentCreationTimeMs replacing segment creation time
* @param currentSegmentCreationTimeMs current segment creation time
* @return true if the record in replacing segment should replace the record in current segment
*/
protected boolean shouldReplaceOnComparisonTie(String segmentName, String currentSegmentName,
long segmentCreationTimeMs, long currentSegmentCreationTimeMs) {

// resolve using sequence id if both are LLCSegmentName
LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
LLCSegmentName currentLLCSegmentName = LLCSegmentName.of(currentSegmentName);
if (llcSegmentName != null && currentLLCSegmentName != null) {
return llcSegmentName.getSequenceNumber() > currentLLCSegmentName.getSequenceNumber();
}

// either or both are uploaded segments, prefer the latest segment
int creationTimeComparisonRes = Long.compare(segmentCreationTimeMs, currentSegmentCreationTimeMs);
if (creationTimeComparisonRes != 0) {
return creationTimeComparisonRes > 0;
}

// if both are uploaded segment, prefer standard UploadedRealtimeSegmentName, if still a tie, then resolve to
// current segment
if (UploadedRealtimeSegmentName.of(currentSegmentName) != null) {
return false;
}
if (UploadedRealtimeSegmentName.of(segmentName) != null) {
return true;
}
return false;
}

@Override
protected void addSegmentWithoutUpsert(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
@Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator) {
Expand Down

0 comments on commit d09cd0c

Please sign in to comment.