[Backport 0.5-nexus] Write metadata cache data to mappings _meta with refresh time update (#805) #840

Merged
docs/index.md: 1 addition & 0 deletions
@@ -549,6 +549,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
- `spark.flint.monitor.initialDelaySeconds`: Initial delay in seconds before starting the monitoring task. Default value is 15.
- `spark.flint.monitor.intervalSeconds`: Interval in seconds for scheduling the monitoring task. Default value is 60.
- `spark.flint.monitor.maxErrorCount`: Maximum number of consecutive errors allowed before stopping the monitoring task. Default value is 5.
- `spark.flint.metadataCacheWrite.enabled`: Enable writing metadata to index mappings `_meta` as a read cache for frontend users. Default value is false. Do not use in production; this setting will be removed in a later version.
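
The flag is read like any other Flint Spark configuration. A minimal sketch, assuming a running `SparkSession` named `spark`; whether a runtime set takes effect depends on when Flint reads its conf, so passing `--conf spark.flint.metadataCacheWrite.enabled=true` at submit time is the safer route:

// Illustrative only: enable the experimental metadata cache write for a session.
spark.conf.set("spark.flint.metadataCacheWrite.enabled", "true")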

#### Data Type Mapping

@@ -18,6 +18,10 @@ import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState
* log entry id
* @param state
* Flint index state
* @param lastRefreshStartTime
* timestamp when last refresh started for manual or external scheduler refresh
* @param lastRefreshCompleteTime
* timestamp when last refresh completed for manual or external scheduler refresh
* @param entryVersion
* entry version fields for consistency control
* @param error
@@ -28,10 +32,12 @@ import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState
case class FlintMetadataLogEntry(
id: String,
/**
* This is currently used as streaming job start time. In future, this should represent the
* create timestamp of the log entry
* This is currently used as streaming job start time for internal scheduler. In future, this
* should represent the create timestamp of the log entry
*/
createTime: Long,
lastRefreshStartTime: Long,
lastRefreshCompleteTime: Long,
state: IndexState,
entryVersion: Map[String, Any],
error: String,
@@ -40,26 +46,48 @@ case class FlintMetadataLogEntry(
def this(
id: String,
createTime: Long,
lastRefreshStartTime: Long,
lastRefreshCompleteTime: Long,
state: IndexState,
entryVersion: JMap[String, Any],
error: String,
properties: JMap[String, Any]) = {
this(id, createTime, state, entryVersion.asScala.toMap, error, properties.asScala.toMap)
this(
id,
createTime,
lastRefreshStartTime,
lastRefreshCompleteTime,
state,
entryVersion.asScala.toMap,
error,
properties.asScala.toMap)
}

def this(
id: String,
createTime: Long,
lastRefreshStartTime: Long,
lastRefreshCompleteTime: Long,
state: IndexState,
entryVersion: JMap[String, Any],
error: String,
properties: Map[String, Any]) = {
this(id, createTime, state, entryVersion.asScala.toMap, error, properties)
this(
id,
createTime,
lastRefreshStartTime,
lastRefreshCompleteTime,
state,
entryVersion.asScala.toMap,
error,
properties)
}
}

object FlintMetadataLogEntry {

val EMPTY_TIMESTAMP = 0L

/**
* Flint index state enum.
*/
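Putting the new fields together, a minimal sketch of a fully populated log entry; the id, timestamps, and version numbers below are made up for illustration, not taken from this PR:

// Illustrative sketch: a log entry whose last refresh both started and completed.
// EMPTY_TIMESTAMP (0L) is what consumers should expect before any refresh has run.
val entry = FlintMetadataLogEntry(
  id = "example-latest-id", // hypothetical id
  createTime = 1234567890123L,
  lastRefreshStartTime = 1234567890123L,
  lastRefreshCompleteTime = 1234567890456L,
  state = FlintMetadataLogEntry.IndexState.ACTIVE,
  entryVersion = Map("seqNo" -> 0L, "primaryTerm" -> 1L),
  error = "",
  properties = Map("dataSourceName" -> "testDataSource"))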
@@ -86,6 +86,8 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
initialLog = initialLog.copy(
initialLog.id(),
initialLog.createTime(),
initialLog.lastRefreshStartTime(),
initialLog.lastRefreshCompleteTime(),
initialLog.state(),
latest.entryVersion(),
initialLog.error(),
@@ -101,14 +101,16 @@ public static String toJson(FlintMetadataLogEntry logEntry) throws JsonProcessingException
ObjectMapper mapper = new ObjectMapper();
ObjectNode json = mapper.createObjectNode();

json.put("version", "1.0");
json.put("version", "1.1");
json.put("latestId", logEntry.id());
json.put("type", "flintindexstate");
json.put("state", logEntry.state().toString());
json.put("applicationId", applicationId);
json.put("jobId", jobId);
json.put("dataSourceName", logEntry.properties().get("dataSourceName").get().toString());
json.put("jobStartTime", logEntry.createTime());
json.put("lastRefreshStartTime", logEntry.lastRefreshStartTime());
json.put("lastRefreshCompleteTime", logEntry.lastRefreshCompleteTime());
json.put("lastUpdateTime", lastUpdateTime);
json.put("error", logEntry.error());

@@ -138,6 +140,8 @@ public static FlintMetadataLogEntry constructLogEntry(
id,
/* sourceMap may use Integer or Long even though it's always long in index mapping */
((Number) sourceMap.get("jobStartTime")).longValue(),
((Number) sourceMap.get("lastRefreshStartTime")).longValue(),
((Number) sourceMap.get("lastRefreshCompleteTime")).longValue(),
FlintMetadataLogEntry.IndexState$.MODULE$.from((String) sourceMap.get("state")),
Map.of("seqNo", seqNo, "primaryTerm", primaryTerm),
(String) sourceMap.get("error"),
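The `(Number)` cast above matters because the deserialized source map may surface the stored epoch millis as either `Integer` or `Long`. A small illustrative Scala check of the same widening pattern (values here are arbitrary):

// Illustrative sketch: widening works for both Integer- and Long-typed values,
// mirroring the ((Number) sourceMap.get(...)).longValue() pattern above.
val fromInt: Number = Integer.valueOf(1234567890)
val fromLong: Number = java.lang.Long.valueOf(1234567890123L)
assert(fromInt.longValue() == 1234567890L)
assert(fromLong.longValue() == 1234567890123L)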
@@ -132,7 +132,9 @@ public void purge() {
public FlintMetadataLogEntry emptyLogEntry() {
return new FlintMetadataLogEntry(
"",
0L,
FlintMetadataLogEntry.EMPTY_TIMESTAMP(),
FlintMetadataLogEntry.EMPTY_TIMESTAMP(),
FlintMetadataLogEntry.EMPTY_TIMESTAMP(),
FlintMetadataLogEntry.IndexState$.MODULE$.EMPTY(),
Map.of("seqNo", UNASSIGNED_SEQ_NO, "primaryTerm", UNASSIGNED_PRIMARY_TERM),
"",
@@ -146,6 +148,8 @@ private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) {
logEntry.copy(
latestId,
logEntry.createTime(),
logEntry.lastRefreshStartTime(),
logEntry.lastRefreshCompleteTime(),
logEntry.state(),
logEntry.entryVersion(),
logEntry.error(),
@@ -184,6 +188,8 @@ private FlintMetadataLogEntry writeLogEntry(
logEntry = new FlintMetadataLogEntry(
logEntry.id(),
logEntry.createTime(),
logEntry.lastRefreshStartTime(),
logEntry.lastRefreshCompleteTime(),
logEntry.state(),
Map.of("seqNo", response.getSeqNo(), "primaryTerm", response.getPrimaryTerm()),
logEntry.error(),
@@ -25,6 +25,10 @@ class FlintMetadataLogEntryOpenSearchConverterTest
val sourceMap = JMap.of(
"jobStartTime",
1234567890123L.asInstanceOf[Object],
"lastRefreshStartTime",
1234567890123L.asInstanceOf[Object],
"lastRefreshCompleteTime",
1234567890123L.asInstanceOf[Object],
"state",
"active".asInstanceOf[Object],
"dataSourceName",
@@ -36,6 +40,8 @@ class FlintMetadataLogEntryOpenSearchConverterTest
when(mockLogEntry.id).thenReturn("id")
when(mockLogEntry.state).thenReturn(FlintMetadataLogEntry.IndexState.ACTIVE)
when(mockLogEntry.createTime).thenReturn(1234567890123L)
when(mockLogEntry.lastRefreshStartTime).thenReturn(1234567890123L)
when(mockLogEntry.lastRefreshCompleteTime).thenReturn(1234567890123L)
when(mockLogEntry.error).thenReturn("")
when(mockLogEntry.properties).thenReturn(Map("dataSourceName" -> "testDataSource"))
}
@@ -45,14 +51,16 @@ class FlintMetadataLogEntryOpenSearchConverterTest
val expectedJsonWithoutLastUpdateTime =
s"""
|{
| "version": "1.0",
| "version": "1.1",
| "latestId": "id",
| "type": "flintindexstate",
| "state": "active",
| "applicationId": "unknown",
| "jobId": "unknown",
| "dataSourceName": "testDataSource",
| "jobStartTime": 1234567890123,
| "lastRefreshStartTime": 1234567890123,
| "lastRefreshCompleteTime": 1234567890123,
| "error": ""
|}
|""".stripMargin
@@ -67,15 +75,22 @@ class FlintMetadataLogEntryOpenSearchConverterTest
logEntry shouldBe a[FlintMetadataLogEntry]
logEntry.id shouldBe "id"
logEntry.createTime shouldBe 1234567890123L
logEntry.lastRefreshStartTime shouldBe 1234567890123L
logEntry.lastRefreshCompleteTime shouldBe 1234567890123L
logEntry.state shouldBe FlintMetadataLogEntry.IndexState.ACTIVE
logEntry.error shouldBe ""
logEntry.properties.get("dataSourceName").get shouldBe "testDataSource"
}

it should "construct log entry with integer jobStartTime value" in {
it should "construct log entry with integer timestamp value" in {
// Use Integer instead of Long for timestamps
val testSourceMap = JMap.of(
"jobStartTime",
1234567890.asInstanceOf[Object], // Integer instead of Long
1234567890.asInstanceOf[Object],
"lastRefreshStartTime",
1234567890.asInstanceOf[Object],
"lastRefreshCompleteTime",
1234567890.asInstanceOf[Object],
"state",
"active".asInstanceOf[Object],
"dataSourceName",
@@ -87,6 +102,8 @@ class FlintMetadataLogEntryOpenSearchConverterTest
logEntry shouldBe a[FlintMetadataLogEntry]
logEntry.id shouldBe "id"
logEntry.createTime shouldBe 1234567890
logEntry.lastRefreshStartTime shouldBe 1234567890
logEntry.lastRefreshCompleteTime shouldBe 1234567890
logEntry.state shouldBe FlintMetadataLogEntry.IndexState.ACTIVE
logEntry.error shouldBe ""
logEntry.properties.get("dataSourceName").get shouldBe "testDataSource"
@@ -253,6 +253,10 @@ object FlintSparkConf {
FlintConfig("spark.metadata.accessAWSCredentialsProvider")
.doc("AWS credentials provider for metadata access permission")
.createOptional()
val METADATA_CACHE_WRITE = FlintConfig("spark.flint.metadataCacheWrite.enabled")
.doc("Enable Flint metadata cache write to Flint index mappings")
.createWithDefault("false")

val CUSTOM_SESSION_MANAGER =
FlintConfig("spark.flint.job.customSessionManager")
.createOptional()
@@ -309,6 +313,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable

def monitorMaxErrorCount(): Int = MONITOR_MAX_ERROR_COUNT.readFrom(reader).toInt

def isMetadataCacheWriteEnabled: Boolean = METADATA_CACHE_WRITE.readFrom(reader).toBoolean

/**
* spark.sql.session.timeZone
*/
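Downstream code can then consult the flag through the accessor added above. A hedged sketch, assuming an existing `FlintSparkConf` instance named `flintConf`; the guard and helper below are illustrative, not code from this PR:

// Hedged sketch: flintConf is assumed to be an existing FlintSparkConf instance,
// and writeMetadataCache is a hypothetical helper standing in for the real writer.
if (flintConf.isMetadataCacheWriteEnabled) {
  writeMetadataCache() // only cache refresh times in _meta when the flag is on
}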