Skip to content

Commit

Permalink
Merge pull request #699 from FgForrest/nv-values-error
Browse files Browse the repository at this point in the history
fix(#698): Appropriate version of the StoragePart is not resolved cor…
  • Loading branch information
novoj authored Oct 18, 2024
2 parents b9879c3 + 0e87c07 commit e518164
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -664,51 +664,51 @@ void shouldBuildCorrectHistory(Evita evita) {
replaceTimeStamps(
replaceLag(
"""
Transaction to version: 59, committed at REPLACED_OFFSET_DATE_TIME(processing lag 59ms)(3 mutations, 1 KB):
Transaction to version: 59, committed at REPLACED_OFFSET_DATE_TIME(processing REPLACED_LAG)(3 mutations, 1 KB):
- changes in `PRODUCT`: 3 upserted entities
Transaction to version: 58, committed at REPLACED_OFFSET_DATE_TIME(processing lag 73ms)(5 mutations, 2 KB):
Transaction to version: 58, committed at REPLACED_OFFSET_DATE_TIME(processing REPLACED_LAG)(5 mutations, 2 KB):
- changes in `PRODUCT`: 5 upserted entities
Transaction to version: 57, committed at REPLACED_OFFSET_DATE_TIME(processing lag 87ms)(2 mutations, 920 B):
Transaction to version: 57, committed at REPLACED_OFFSET_DATE_TIME(processing REPLACED_LAG)(2 mutations, 1 KB):
- changes in `PRODUCT`: 2 upserted entities
Transaction to version: 56, committed at REPLACED_OFFSET_DATE_TIME(processing lag 105ms)(4 mutations, 2 KB):
Transaction to version: 56, committed at REPLACED_OFFSET_DATE_TIME(processing REPLACED_LAG)(4 mutations, 1 KB):
- changes in `PRODUCT`: 4 upserted entities
Transaction to version: 55, committed at REPLACED_OFFSET_DATE_TIME(processing lag 119ms)(1 mutations, 561 B):
Transaction to version: 55, committed at REPLACED_OFFSET_DATE_TIME(processing REPLACED_LAG)(1 mutations, 590 B):
- changes in `PRODUCT`: 1 upserted entities
Transaction to version: 54, committed at REPLACED_OFFSET_DATE_TIME(processing lag 57ms)(4 mutations, 2 KB):
Transaction to version: 54, committed at REPLACED_OFFSET_DATE_TIME(processing REPLACED_LAG)(4 mutations, 1 KB):
- changes in `PRODUCT`: 4 upserted entities
Transaction to version: 53, committed at REPLACED_OFFSET_DATE_TIME(processing lag 73ms)(5 mutations, 2 KB):
Transaction to version: 53, committed at REPLACED_OFFSET_DATE_TIME(processing REPLACED_LAG)(5 mutations, 2 KB):
- changes in `PRODUCT`: 5 upserted entities
Transaction to version: 52, committed at REPLACED_OFFSET_DATE_TIME(processing lag 87ms)(3 mutations, 1 KB):
Transaction to version: 52, committed at REPLACED_OFFSET_DATE_TIME(processing REPLACED_LAG)(3 mutations, 1 KB):
- changes in `PRODUCT`: 3 upserted entities
Transaction to version: 51, committed at REPLACED_OFFSET_DATE_TIME(processing lag 101ms)(2 mutations, 1 KB):
Transaction to version: 51, committed at REPLACED_OFFSET_DATE_TIME(processing REPLACED_LAG)(2 mutations, 1 KB):
- changes in `PRODUCT`: 2 upserted entities
Transaction to version: 50, committed at REPLACED_OFFSET_DATE_TIME(processing lag 60ms)(3 mutations, 1 KB):
Transaction to version: 50, committed at REPLACED_OFFSET_DATE_TIME(processing REPLACED_LAG)(3 mutations, 1 KB):
- changes in `PRODUCT`: 3 upserted entities
Transaction to version: 49, committed at REPLACED_OFFSET_DATE_TIME(processing lag 74ms)(2 mutations, 1 KB):
Transaction to version: 49, committed at REPLACED_OFFSET_DATE_TIME(processing REPLACED_LAG)(2 mutations, 1 KB):
- changes in `PRODUCT`: 2 upserted entities
Transaction to version: 48, committed at REPLACED_OFFSET_DATE_TIME(processing lag 88ms)(5 mutations, 2 KB):
Transaction to version: 48, committed at REPLACED_OFFSET_DATE_TIME(processing REPLACED_LAG)(5 mutations, 2 KB):
- changes in `PRODUCT`: 5 upserted entities
Transaction to version: 47, committed at REPLACED_OFFSET_DATE_TIME(processing lag 102ms)(1 mutations, 613 B):
Transaction to version: 47, committed at REPLACED_OFFSET_DATE_TIME(processing REPLACED_LAG)(1 mutations, 533 B):
- changes in `PRODUCT`: 1 upserted entities
Transaction to version: 46, committed at REPLACED_OFFSET_DATE_TIME(processing lag 115ms)(4 mutations, 2 KB):
Transaction to version: 46, committed at REPLACED_OFFSET_DATE_TIME(processing REPLACED_LAG)(4 mutations, 2 KB):
- changes in `PRODUCT`: 4 upserted entities
Transaction to version: 45, committed at REPLACED_OFFSET_DATE_TIME(processing lag 67ms)(4 mutations, 2 KB):
Transaction to version: 45, committed at REPLACED_OFFSET_DATE_TIME(processing REPLACED_LAG)(4 mutations, 2 KB):
- changes in `PRODUCT`: 4 upserted entities
Transaction to version: 44, committed at REPLACED_OFFSET_DATE_TIME(processing lag 84ms)(3 mutations, 1 KB):
Transaction to version: 44, committed at REPLACED_OFFSET_DATE_TIME(processing REPLACED_LAG)(3 mutations, 1 KB):
- changes in `PRODUCT`: 3 upserted entities
Transaction to version: 43, committed at REPLACED_OFFSET_DATE_TIME(processing lag 101ms)(1 mutations, 696 B):
Transaction to version: 43, committed at REPLACED_OFFSET_DATE_TIME(processing REPLACED_LAG)(1 mutations, 566 B):
- changes in `PRODUCT`: 1 upserted entities
Transaction to version: 42, committed at REPLACED_OFFSET_DATE_TIME(processing lag 115ms)(5 mutations, 2 KB):
Transaction to version: 42, committed at REPLACED_OFFSET_DATE_TIME(processing REPLACED_LAG)(5 mutations, 2 KB):
- changes in `PRODUCT`: 5 upserted entities
Transaction to version: 41, committed at REPLACED_OFFSET_DATE_TIME(processing lag 66ms)(4 mutations, 2 KB):
Transaction to version: 41, committed at REPLACED_OFFSET_DATE_TIME(processing REPLACED_LAG)(4 mutations, 2 KB):
- changes in `PRODUCT`: 4 upserted entities
Transaction to version: 40, committed at REPLACED_OFFSET_DATE_TIME(processing lag 84ms)(5 mutations, 2 KB):
Transaction to version: 40, committed at REPLACED_OFFSET_DATE_TIME(processing REPLACED_LAG)(5 mutations, 2 KB):
- changes in `PRODUCT`: 5 upserted entities
Transaction to version: 39, committed at REPLACED_OFFSET_DATE_TIME(processing lag 103ms)(3 mutations, 1 KB):
Transaction to version: 39, committed at REPLACED_OFFSET_DATE_TIME(processing REPLACED_LAG)(3 mutations, 1 KB):
- changes in `PRODUCT`: 3 upserted entities
Transaction to version: 38, committed at REPLACED_OFFSET_DATE_TIME(processing lag 119ms)(2 mutations, 1 KB):
Transaction to version: 38, committed at REPLACED_OFFSET_DATE_TIME(processing REPLACED_LAG)(2 mutations, 1 KB):
- changes in `PRODUCT`: 2 upserted entities
Transaction to version: 37, committed at REPLACED_OFFSET_DATE_TIME(processing lag 136ms)(1 mutations, 747 B):
Transaction to version: 37, committed at REPLACED_OFFSET_DATE_TIME(processing REPLACED_LAG)(1 mutations, 606 B):
- changes in `PRODUCT`: 1 upserted entities"""
)
),
Expand All @@ -735,15 +735,15 @@ void shouldBuildCorrectHistory(Evita evita) {
assertEquals(
replaceTimeStamps(
"""
Catalog version: 36, processed at REPLACED_OFFSET_DATE_TIME with 4 transactions (14 mutations, 8 KB):
Catalog version: 36, processed at REPLACED_OFFSET_DATE_TIME with 4 transactions (14 mutations, 7 KB):
- changes in `PRODUCT`: 14 upserted entities
Catalog version: 32, processed at REPLACED_OFFSET_DATE_TIME with 5 transactions (15 mutations, 8 KB):
Catalog version: 32, processed at REPLACED_OFFSET_DATE_TIME with 5 transactions (15 mutations, 7 KB):
- changes in `PRODUCT`: 15 upserted entities
Catalog version: 27, processed at REPLACED_OFFSET_DATE_TIME with 4 transactions (13 mutations, 8 KB):
Catalog version: 27, processed at REPLACED_OFFSET_DATE_TIME with 4 transactions (13 mutations, 7 KB):
- changes in `PRODUCT`: 13 upserted entities
Catalog version: 23, processed at REPLACED_OFFSET_DATE_TIME with 5 transactions (15 mutations, 9 KB):
Catalog version: 23, processed at REPLACED_OFFSET_DATE_TIME with 5 transactions (15 mutations, 7 KB):
- changes in `PRODUCT`: 15 upserted entities
Catalog version: 18, processed at REPLACED_OFFSET_DATE_TIME with 4 transactions (10 mutations, 6 KB):
Catalog version: 18, processed at REPLACED_OFFSET_DATE_TIME with 4 transactions (10 mutations, 5 KB):
- changes in `PRODUCT`: 10 upserted entities"""
),
replaceTimeStamps(
Expand All @@ -763,11 +763,11 @@ void shouldBuildCorrectHistory(Evita evita) {
assertEquals(
replaceTimeStamps(
"""
Catalog version: 14, processed at REPLACED_OFFSET_DATE_TIME with 5 transactions (15 mutations, 9 KB):
Catalog version: 14, processed at REPLACED_OFFSET_DATE_TIME with 5 transactions (15 mutations, 7 KB):
- changes in `PRODUCT`: 15 upserted entities
Catalog version: 9, processed at REPLACED_OFFSET_DATE_TIME with 5 transactions (13 mutations, 8 KB):
Catalog version: 9, processed at REPLACED_OFFSET_DATE_TIME with 5 transactions (13 mutations, 6 KB):
- changes in `PRODUCT`: 13 upserted entities
Catalog version: 4, processed at REPLACED_OFFSET_DATE_TIME with 3 transactions (9 mutations, 6 KB):
Catalog version: 4, processed at REPLACED_OFFSET_DATE_TIME with 3 transactions (9 mutations, 4 KB):
- changes in `PRODUCT`: 9 upserted entities
Catalog version: 1, processed at REPLACED_OFFSET_DATE_TIME with 1 transactions (3 mutations, 1 KB):
- changes in `PRODUCT`: 3 upserted entities"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1196,8 +1196,9 @@ private void promoteNonFlushedValuesToSharedState(int valueCount, @Nonnull Colle
if (currentRecordLength > workingMaxRecordSize) {
workingMaxRecordSize = currentRecordLength;
}
final FileLocation previousValue = newKeyToLocations.put(recordKey, recordLocation);
Assert.isPremiseValid(
newKeyToLocations.put(recordKey, recordLocation) == null,
previousValue == null,
"Record was already present!"
);
count = 1;
Expand Down Expand Up @@ -1736,7 +1737,7 @@ public int countDifference(long catalogVersion, byte recordTypeId) {
}

/**
* Retrieves the non-flushed versioned value associated with the given catalog version and key.
* Retrieves the non-flushed versioned value associated with the given catalog version (or lesser) and key.
*
* @param catalogVersion the catalog version to check against
* @param key the record key
Expand All @@ -1748,13 +1749,19 @@ public Optional<VersionedValue> getNonFlushedValueIfVersionMatches(long catalogV
final long[] nv = this.nonFlushedVersions;
if (nv != null) {
int index = Arrays.binarySearch(nv, catalogVersion);
if (index != -1) {
final int startIndex = index >= 0 ? index - 1 : -index - 2;
for (int ix = nv.length - 1; ix >= startIndex && ix >= 0; ix--) {
final Optional<VersionedValue> versionedValue = ofNullable(nvSet.get(nv[ix]))
.map(it -> it.get(key));
if (versionedValue.isPresent()) {
return versionedValue;
final int startIndex = index >= 0 ? index : -index - 2;
if (startIndex >= 0) {
for (int ix = startIndex; ix >= 0; ix--) {
final NonFlushedValueSet nfvs = nvSet.get(nv[ix]);
if (nfvs != null) {
if (nfvs.removedKeys.contains(key)) {
return empty();
} else {
final Optional<VersionedValue> versionedValue = ofNullable(nfvs.get(key));
if (versionedValue.isPresent()) {
return versionedValue;
}
}
}
}
}
Expand Down Expand Up @@ -1843,7 +1850,7 @@ public Optional<VolatileValueInformation> getVolatileValueInformation(long catal
* @param create whether the record was created or not (affects the histogram)
*/
public void putValue(long catalogVersion, @Nonnull RecordKey key, @Nonnull VersionedValue nonFlushedValue, boolean create) {
getNonFlushedValues(catalogVersion).put(key, nonFlushedValue, create);
getNonFlushedValues(catalogVersion).put(key, nonFlushedValue, create && !contains(key));
}

/**
Expand Down Expand Up @@ -2031,6 +2038,24 @@ public Optional<OffsetDateTime> getOldestRecordKeptTimestamp() {
});
}

/**
* Returns true if the non-flushed values contain the non-removed specified key.
* @param key the record key
* @return true if the non-flushed values contain the non-removed specified key, false otherwise
*/
public boolean contains(@Nonnull RecordKey key) {
for (int i = nonFlushedVersions.length - 1; i >= 0; i--) {
long nonFlushedVersion = nonFlushedVersions[i];
final NonFlushedValueSet nfSet = nonFlushedValues.get(nonFlushedVersion);
if (nfSet.removedKeys.contains(key)) {
return false;
} else if (nfSet.addedKeys.contains(key)) {
return true;
}
}
return false;
}

/**
* Retrieves the NonFlushedValueSet associated with the given catalog version or creates new set.
*
Expand Down

0 comments on commit e518164

Please sign in to comment.