Skip to content

Commit 0264a7b

Browse files
authored
fix(issues1784): fix upload burst cause object index out of order (AutoMQ#1785)
Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
1 parent 242e7b3 commit 0264a7b

File tree

4 files changed

+59
-6
lines changed

4 files changed

+59
-6
lines changed

s3stream/src/main/java/com/automq/stream/s3/DeltaWALUploadTask.java

-4
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,6 @@ public void burst() {
111111
}
112112

113113
private CompletableFuture<Void> acquireLimiter(int size) {
114-
if (this.burst) {
115-
return CompletableFuture.completedFuture(null);
116-
}
117-
118114
return limiter.acquire(size);
119115
}
120116

s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java

+33-2
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ public interface ObjectWriter {
3838
// TODO: first n bit is the compressed flag
3939
byte DATA_BLOCK_DEFAULT_FLAG = 0x02;
4040

41-
static ObjectWriter writer(long objectId, ObjectStorage objectStorage, int blockSizeThreshold, int partSizeThreshold) {
41+
static ObjectWriter writer(long objectId, ObjectStorage objectStorage, int blockSizeThreshold,
42+
int partSizeThreshold) {
4243
return new DefaultObjectWriter(objectId, objectStorage, blockSizeThreshold, partSizeThreshold);
4344
}
4445

@@ -79,11 +80,14 @@ class DefaultObjectWriter implements ObjectWriter {
7980
private IndexBlock indexBlock;
8081
private long size;
8182

83+
private long lastStreamId = Constants.NOOP_STREAM_ID;
84+
private long lastEndOffset = Constants.NOOP_OFFSET;
85+
8286
/**
8387
* Create a new object writer.
8488
*
8589
* @param objectId object id
86-
* @param objectStorage S3 operator
90+
* @param objectStorage S3 operator
8791
* @param blockSizeThreshold the max size of a block
8892
* @param partSizeThreshold the max size of a part. If it is smaller than {@link Writer#MIN_PART_SIZE}, it will be set to {@link Writer#MIN_PART_SIZE}.
8993
*/
@@ -98,6 +102,7 @@ public DefaultObjectWriter(long objectId, ObjectStorage objectStorage, int block
98102
}
99103

100104
public synchronized void write(long streamId, List<StreamRecordBatch> records) {
105+
check(streamId, records);
101106
List<List<StreamRecordBatch>> blocks = groupByBlock(records);
102107
for (List<StreamRecordBatch> blockRecords : blocks) {
103108
DataBlock block = new DataBlock(streamId, blockRecords);
@@ -109,6 +114,32 @@ public synchronized void write(long streamId, List<StreamRecordBatch> records) {
109114
}
110115
}
111116

117+
private void check(long streamId, List<StreamRecordBatch> records) {
118+
if (records.isEmpty()) {
119+
return;
120+
}
121+
long recordsEndOffset = records.get(records.size() - 1).getLastOffset();
122+
if (lastStreamId == Constants.NOOP_STREAM_ID) {
123+
lastStreamId = streamId;
124+
lastEndOffset = recordsEndOffset;
125+
return;
126+
}
127+
if (lastStreamId > streamId) {
128+
throw new IllegalArgumentException(String.format("The incoming streamId=%s is less than last streamId=%s", streamId, lastStreamId));
129+
} else if (lastStreamId == streamId) {
130+
long recordsStartOffset = records.get(0).getBaseOffset();
131+
if (recordsStartOffset < lastEndOffset) {
132+
throw new IllegalArgumentException(String.format("The incoming streamId=%s startOffset=%s is less than lastEndOffset=%s",
133+
streamId, recordsStartOffset, lastEndOffset));
134+
} else {
135+
lastEndOffset = recordsEndOffset;
136+
}
137+
} else {
138+
lastStreamId = streamId;
139+
lastEndOffset = recordsEndOffset;
140+
}
141+
}
142+
112143
private List<List<StreamRecordBatch>> groupByBlock(List<StreamRecordBatch> records) {
113144
List<List<StreamRecordBatch>> blocks = new LinkedList<>();
114145
List<StreamRecordBatch> blockRecords = new ArrayList<>(records.size());

s3stream/src/main/java/com/automq/stream/utils/AsyncRateLimiter.java

+3
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ public AsyncRateLimiter(double bytesPerSec) {
3636
tickTask = SCHEDULER.scheduleAtFixedRate(this::tick, 1, 1, TimeUnit.MILLISECONDS);
3737
}
3838

39+
/**
40+
* Async acquire permit. The returned future will be orderly completed when the permit is acquired.
41+
*/
3942
public synchronized CompletableFuture<Void> acquire(int size) {
4043
if (acquireQueue.isEmpty() && rateLimiter.tryAcquire(size)) {
4144
return CompletableFuture.completedFuture(null);

s3stream/src/test/java/com/automq/stream/s3/ObjectWriterTest.java

+23
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Iterator;
2222
import java.util.List;
2323
import java.util.concurrent.ExecutionException;
24+
import org.junit.jupiter.api.Assertions;
2425
import org.junit.jupiter.api.Tag;
2526
import org.junit.jupiter.api.Test;
2627

@@ -103,6 +104,28 @@ public void testWrite() throws ExecutionException, InterruptedException {
103104
}
104105
}
105106

107+
@Test
108+
public void testWrite_check() {
109+
S3ObjectMetadata metadata = new S3ObjectMetadata(1, 0, S3ObjectType.STREAM_SET);
110+
111+
ObjectStorage objectStorage = new MemoryObjectStorage();
112+
ObjectWriter objectWriter = ObjectWriter.writer(1, objectStorage, 1024, 1024);
113+
objectWriter.write(233, List.of(
114+
newRecord(233, 10, 5, 512),
115+
newRecord(233, 15, 5, 512)
116+
));
117+
118+
// write smaller stream
119+
Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> objectWriter.write(231, List.of(newRecord(231, 10, 5, 512))));
120+
121+
objectWriter.write(233, List.of(newRecord(233, 20, 5, 512)));
122+
123+
// write smaller offset
124+
Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> objectWriter.write(233, List.of(newRecord(233, 10, 5, 512))));
125+
126+
objectWriter.write(234, List.of(newRecord(234, 0, 5, 512)));
127+
}
128+
106129
StreamRecordBatch newRecord(long streamId, long offset, int count, int payloadSize) {
107130
return new StreamRecordBatch(streamId, 0, offset, count, TestUtils.random(payloadSize));
108131
}

0 commit comments

Comments
 (0)