Skip to content

Commit 1a91a09

Browse files
committed
Memory optimizations when loading historic data for anomaly detection.
1 parent 158c052 commit 1a91a09

File tree

4 files changed

+115
-77
lines changed

4 files changed

+115
-77
lines changed

dqops/src/main/java/com/dqops/data/readouts/factory/SensorReadoutsColumnNames.java

+13
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ public class SensorReadoutsColumnNames {
227227

228228
COLUMN_NAME_COLUMN_NAME,
229229
DATA_GROUP_NAME_COLUMN_NAME,
230+
DATA_GROUP_HASH_COLUMN_NAME,
230231
TABLE_COMPARISON_NAME_COLUMN_NAME,
231232

232233
DURATION_MS_COLUMN_NAME,
@@ -259,4 +260,16 @@ public class SensorReadoutsColumnNames {
259260
DATA_GROUPING_LEVEL_COLUMN_NAME_PREFIX + "8",
260261
DATA_GROUPING_LEVEL_COLUMN_NAME_PREFIX + "9"
261262
};
263+
264+
/**
265+
* The minimum set of columns from sensor readouts that is required to feed time series sensors with historical data.
266+
*/
267+
public static final String[] SENSOR_READOUT_COLUMN_NAMES_HISTORIC_DATA = new String[] {
268+
TIME_PERIOD_COLUMN_NAME,
269+
TIME_PERIOD_UTC_COLUMN_NAME,
270+
CHECK_HASH_COLUMN_NAME,
271+
DATA_GROUP_HASH_COLUMN_NAME,
272+
ACTUAL_VALUE_COLUMN_NAME,
273+
EXPECTED_VALUE_COLUMN_NAME
274+
};
262275
}

dqops/src/main/java/com/dqops/data/readouts/snapshot/SensorReadoutsSnapshot.java

+3-29
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,7 @@
1919
import com.dqops.core.principal.UserDomainIdentity;
2020
import com.dqops.core.synchronization.contract.DqoRoot;
2121
import com.dqops.data.readouts.factory.SensorReadoutsColumnNames;
22-
import com.dqops.data.storage.FileStorageSettings;
23-
import com.dqops.data.storage.ParquetPartitionStorageService;
24-
import com.dqops.data.storage.TableDataSnapshot;
25-
import com.dqops.data.storage.TablePartitioningPattern;
22+
import com.dqops.data.storage.*;
2623
import com.dqops.metadata.sources.PhysicalTableName;
2724
import com.dqops.utils.reflection.ObjectMemorySizeUtility;
2825
import com.dqops.utils.tables.TableColumnUtility;
@@ -39,7 +36,6 @@
3936
*/
4037
public class SensorReadoutsSnapshot extends TableDataSnapshot {
4138
public static String PARQUET_FILE_NAME = "sensor_readout.0.parquet";
42-
public static boolean ENABLE_PRE_FILLING_TIME_SERIES_CACHE = false;
4339
private SensorReadoutsTimeSeriesMap timeSeriesMap;
4440

4541
/**
@@ -101,30 +97,8 @@ public SensorReadoutsTimeSeriesMap getHistoricReadoutsTimeSeries() {
10197
return this.timeSeriesMap;
10298
}
10399

104-
Table allLoadedData = this.getAllData();
105-
this.timeSeriesMap = new SensorReadoutsTimeSeriesMap(this.getFirstLoadedMonth(), this.getLastLoadedMonth(), allLoadedData);
106-
107-
if (allLoadedData != null && ENABLE_PRE_FILLING_TIME_SERIES_CACHE) {
108-
// THIS SECTION is disabled for the moment in favor of using an index and searching for time series on demand
109-
110-
TableSliceGroup tableSlices = allLoadedData.splitOn(SensorReadoutsColumnNames.CHECK_HASH_COLUMN_NAME,
111-
SensorReadoutsColumnNames.DATA_GROUP_HASH_COLUMN_NAME);
112-
113-
for (TableSlice tableSlice : tableSlices) {
114-
Table timeSeriesTable = tableSlice.asTable();
115-
LongColumn checkHashColumn = (LongColumn) timeSeriesTable.column(SensorReadoutsColumnNames.CHECK_HASH_COLUMN_NAME);
116-
LongColumn dataStreamHashColumn = (LongColumn) TableColumnUtility.findColumn(timeSeriesTable,
117-
SensorReadoutsColumnNames.DATA_GROUP_HASH_COLUMN_NAME);
118-
long checkHashId = checkHashColumn.get(0); // the first row has the value
119-
long dataStreamHash = dataStreamHashColumn.isMissing(0) ? 0L : dataStreamHashColumn.get(0);
120-
121-
SensorReadoutTimeSeriesKey timeSeriesKey = new SensorReadoutTimeSeriesKey(checkHashId, dataStreamHash);
122-
Table sortedTimeSeriesTable = timeSeriesTable.sortOn(SensorReadoutsColumnNames.TIME_PERIOD_COLUMN_NAME);
123-
SensorReadoutsTimeSeriesData timeSeriesData = new SensorReadoutsTimeSeriesData(timeSeriesKey, sortedTimeSeriesTable);
124-
this.timeSeriesMap.add(timeSeriesData);
125-
}
126-
}
127-
100+
this.timeSeriesMap = new SensorReadoutsTimeSeriesMap(this.getFirstLoadedMonth(), this.getLastLoadedMonth(),
101+
this.getLoadedMonthlyPartitions());
128102
return this.timeSeriesMap;
129103
}
130104
}

dqops/src/main/java/com/dqops/data/readouts/snapshot/SensorReadoutsTimeSeriesMap.java

+78-30
Original file line numberDiff line numberDiff line change
@@ -16,46 +16,58 @@
1616
package com.dqops.data.readouts.snapshot;
1717

1818
import com.dqops.data.readouts.factory.SensorReadoutsColumnNames;
19+
import com.dqops.data.storage.LoadedMonthlyPartition;
20+
import com.dqops.data.storage.ParquetPartitionId;
1921
import com.dqops.utils.tables.TableColumnUtility;
22+
import lombok.Data;
2023
import tech.tablesaw.api.LongColumn;
2124
import tech.tablesaw.api.Table;
2225
import tech.tablesaw.index.LongIndex;
2326
import tech.tablesaw.selection.Selection;
2427

28+
import java.lang.ref.WeakReference;
2529
import java.time.LocalDate;
26-
import java.util.HashMap;
2730
import java.util.LinkedHashMap;
2831
import java.util.Map;
32+
import java.util.TreeMap;
2933

3034
/**
3135
* Dictionary of identified time series in the historic sensor readout results.
3236
*/
3337
public class SensorReadoutsTimeSeriesMap {
34-
private final Map<SensorReadoutTimeSeriesKey, SensorReadoutsTimeSeriesData> entries = new LinkedHashMap<>();
38+
private final Map<SensorReadoutTimeSeriesKey, WeakReference<SensorReadoutsTimeSeriesData>> entries = new LinkedHashMap<>();
39+
private final Map<ParquetPartitionId, LoadedMonthlyPartition> partitionMap;
40+
private final Map<ParquetPartitionId, PartitionIndexes> partitionIndexes = new TreeMap<>();
3541
private LocalDate firstLoadedMonth;
3642
private LocalDate lastLoadedMonth;
37-
private Table allLoadedData;
38-
private LongColumn checkHashColumn;
39-
private LongColumn dataStreamHashColumn;
40-
private LongIndex checkHashIndex;
41-
private LongIndex dataStreamHashIndex;
4243

4344
/**
4445
* Create a time series map.
4546
* @param firstLoadedMonth The date of the first loaded month.
4647
* @param lastLoadedMonth The date of the last loaded month.
48+
* @param partitionMap Dictionary of loaded partitions.
4749
*/
48-
public SensorReadoutsTimeSeriesMap(LocalDate firstLoadedMonth, LocalDate lastLoadedMonth, Table allLoadedData) {
50+
public SensorReadoutsTimeSeriesMap(LocalDate firstLoadedMonth, LocalDate lastLoadedMonth,
51+
Map<ParquetPartitionId, LoadedMonthlyPartition> partitionMap) {
4952
this.firstLoadedMonth = firstLoadedMonth;
5053
this.lastLoadedMonth = lastLoadedMonth;
51-
this.allLoadedData = allLoadedData;
52-
53-
if (allLoadedData != null) {
54-
this.checkHashColumn = (LongColumn) allLoadedData.column(SensorReadoutsColumnNames.CHECK_HASH_COLUMN_NAME);
55-
this.dataStreamHashColumn = (LongColumn) TableColumnUtility.findColumn(allLoadedData,
56-
SensorReadoutsColumnNames.DATA_GROUP_HASH_COLUMN_NAME);
57-
this.checkHashIndex = new LongIndex(this.checkHashColumn);
58-
this.dataStreamHashIndex = new LongIndex(this.dataStreamHashColumn);
54+
this.partitionMap = partitionMap;
55+
if (partitionMap != null) {
56+
for (Map.Entry<ParquetPartitionId, LoadedMonthlyPartition> partitionKeyValue : partitionMap.entrySet()) {
57+
Table partitionData = partitionKeyValue.getValue().getData();
58+
if (partitionData == null) {
59+
return;
60+
}
61+
62+
LongColumn checkHashColumn = (LongColumn) partitionData.column(SensorReadoutsColumnNames.CHECK_HASH_COLUMN_NAME);
63+
LongColumn dataStreamHashColumn = (LongColumn) TableColumnUtility.findColumn(partitionData,
64+
SensorReadoutsColumnNames.DATA_GROUP_HASH_COLUMN_NAME);
65+
LongIndex checkHashIndex = new LongIndex(checkHashColumn);
66+
LongIndex dataStreamHashIndex = new LongIndex(dataStreamHashColumn);
67+
68+
PartitionIndexes partitionIndexesEntry = new PartitionIndexes(checkHashIndex, dataStreamHashIndex, partitionKeyValue.getValue());
69+
this.partitionIndexes.put(partitionKeyValue.getKey(), partitionIndexesEntry);
70+
}
5971
}
6072
}
6173

@@ -83,30 +95,66 @@ public LocalDate getLastLoadedMonth() {
8395
*/
8496
public SensorReadoutsTimeSeriesData findTimeSeriesData(long checkHashId, long dimensionId) {
8597
SensorReadoutTimeSeriesKey key = new SensorReadoutTimeSeriesKey(checkHashId, dimensionId);
86-
SensorReadoutsTimeSeriesData sensorReadoutsTimeSeriesData = this.entries.get(key);
98+
WeakReference<SensorReadoutsTimeSeriesData> sensorReadoutsTimeSeriesDataRef = this.entries.get(key);
99+
SensorReadoutsTimeSeriesData sensorReadoutsTimeSeriesData = sensorReadoutsTimeSeriesDataRef != null ?
100+
sensorReadoutsTimeSeriesDataRef.get() : null;
101+
87102
if (sensorReadoutsTimeSeriesData != null) {
88103
return sensorReadoutsTimeSeriesData;
89104
}
90105

91-
if (this.checkHashIndex == null) {
92-
return null;
93-
}
106+
Table allTimeSeriesData = null;
107+
108+
for (Map.Entry<ParquetPartitionId, PartitionIndexes> partitionIndexesKeyValue : this.partitionIndexes.entrySet()) {
109+
PartitionIndexes partitionIndexesEntry = partitionIndexesKeyValue.getValue();
110+
Selection checkHashRows = partitionIndexesEntry.checkHashIndex.get(checkHashId);
111+
Selection groupHashRows = partitionIndexesEntry.dataStreamHashIndex.get(dimensionId);
112+
113+
Table partitionDataTable = partitionIndexesEntry.partitionData.getData();
114+
if (partitionDataTable == null) {
115+
continue;
116+
}
94117

95-
Selection checkHashRows = this.checkHashIndex.get(checkHashId);
96-
Selection groupHashRows = this.dataStreamHashIndex.get(dimensionId);
118+
Table filteredPartitionRows = partitionDataTable.where(checkHashRows.and(groupHashRows));
119+
Table sortedTimeSeriesTable = filteredPartitionRows.sortOn(SensorReadoutsColumnNames.TIME_PERIOD_COLUMN_NAME);
97120

98-
Table filteredRows = this.allLoadedData.where(checkHashRows.and(groupHashRows));
99-
Table sortedTimeSeriesTable = filteredRows.sortOn(SensorReadoutsColumnNames.TIME_PERIOD_COLUMN_NAME);
121+
if (allTimeSeriesData == null) {
122+
allTimeSeriesData = sortedTimeSeriesTable;
123+
} else {
124+
allTimeSeriesData.append(sortedTimeSeriesTable);
125+
}
126+
}
127+
128+
SensorReadoutsTimeSeriesData timeSeriesDataSlice = new SensorReadoutsTimeSeriesData(key, allTimeSeriesData);
100129

101-
SensorReadoutsTimeSeriesData newSubset = new SensorReadoutsTimeSeriesData(key, sortedTimeSeriesTable);
102-
return newSubset;
130+
// TODO: we could store it in the cache, but not for the moment; maybe for a different use case
131+
return timeSeriesDataSlice;
103132
}
104133

105134
/**
106-
* Adds a time series object to the dictionary.
107-
* @param timeSeries Time series object.
135+
* Partition indexes container.
108136
*/
109-
public void add(SensorReadoutsTimeSeriesData timeSeries) {
110-
this.entries.put(timeSeries.getKey(), timeSeries);
137+
@Data
138+
public static class PartitionIndexes {
139+
/**
140+
* Check hash index.
141+
*/
142+
private final LongIndex checkHashIndex;
143+
144+
/**
145+
* Data stream (data group) hash index.
146+
*/
147+
private final LongIndex dataStreamHashIndex;
148+
149+
/**
150+
* The partition data.
151+
*/
152+
private final LoadedMonthlyPartition partitionData;
153+
154+
public PartitionIndexes(LongIndex checkHashIndex, LongIndex dataStreamHashIndex, LoadedMonthlyPartition monthlyPartition) {
155+
this.checkHashIndex = checkHashIndex;
156+
this.dataStreamHashIndex = dataStreamHashIndex;
157+
this.partitionData = monthlyPartition;
158+
}
111159
}
112160
}

0 commit comments

Comments
 (0)