diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 index a6ab4f1de..0e8568a97 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 @@ -334,6 +334,7 @@ MONTHNAME: 'MONTHNAME'; NOW: 'NOW'; PERIOD_ADD: 'PERIOD_ADD'; PERIOD_DIFF: 'PERIOD_DIFF'; +RELATIVE_TIMESTAMP: 'RELATIVE_TIMESTAMP'; SEC_TO_TIME: 'SEC_TO_TIME'; STR_TO_DATE: 'STR_TO_DATE'; SUBDATE: 'SUBDATE'; diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 index 0a2cdf1a0..1f09cdc24 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 @@ -747,6 +747,7 @@ dateTimeFunctionName | NOW | PERIOD_ADD | PERIOD_DIFF + | RELATIVE_TIMESTAMP | QUARTER | SECOND | SECOND_OF_MINUTE diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java index 86970cefb..411a9c5ea 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java @@ -133,6 +133,9 @@ public enum BuiltinFunctionName { LOCALTIMESTAMP(FunctionName.of("localtimestamp")), SYSDATE(FunctionName.of("sysdate")), + // Relative timestamp functions + RELATIVE_TIMESTAMP(FunctionName.of("relative_timestamp")), + /** Text Functions. */ TOSTRING(FunctionName.of("tostring")), diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java index 2f457c172..c4f841fd6 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java @@ -13,17 +13,23 @@ import org.apache.spark.sql.types.DataTypes; import scala.Function1; import scala.Function2; +import scala.Function3; import scala.Option; import scala.Serializable; import scala.runtime.AbstractFunction1; import scala.runtime.AbstractFunction2; +import scala.runtime.AbstractFunction3; import scala.collection.JavaConverters; import scala.collection.mutable.WrappedArray; +import java.lang.Boolean; import java.math.BigInteger; import java.net.InetAddress; import java.net.UnknownHostException; +import java.time.Instant; import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; import java.util.Collection; import java.util.List; import java.util.Map; @@ -44,6 +50,10 @@ abstract class SerializableAbstractFunction2 extends AbstractFunction implements Serializable { } + abstract class SerializableAbstractFunction3 extends AbstractFunction3 + implements Serializable { + } + /** * Remove specified keys from a JSON string. * @@ -113,7 +123,7 @@ public String apply(String jsonStr, WrappedArray elements) { } } }; - + Function2 cidrFunction = new SerializableAbstractFunction2<>() { IPAddressStringParameters valOptions = new IPAddressStringParameters.Builder() @@ -201,13 +211,18 @@ public BigInteger apply(String ipAddress) { }; } - Function1 relativeDateTimeFunction = new SerializableAbstractFunction1() { + /** + * Returns the {@link Instant} corresponding to the given relative string, current instant, and time zone identifier. + * Throws {@link RuntimeException} if the relative timestamp string is not supported. + */ + Function3 relativeTimestampFunction = new SerializableAbstractFunction3() { @Override - public String apply(String relativeDateTimeString) { + public Instant apply(String relativeDateTimeString, Instant currentInstant, String zoneIdString) { + ZoneId zoneId = ZoneId.of(zoneIdString); + LocalDateTime currentLocalDateTime = LocalDateTime.ofInstant(currentInstant, zoneId); + LocalDateTime relativeLocalDateTime = TimeUtils.getRelativeLocalDateTime(relativeDateTimeString, currentLocalDateTime); - // TODO #991 - Get current datetime from Spark - LocalDateTime currentDateTime = LocalDateTime.now(); - return TimeUtils.getRelativeDateTime(relativeDateTimeString, currentDateTime).toString(); + return relativeLocalDateTime.atZone(zoneId).toInstant(); } }; @@ -264,13 +279,13 @@ static ScalaUDF visit(String funcName, List expressions) { Option.apply("ip_to_int"), false, true); - case "relative_datetime": - return new ScalaUDF(relativeDateTimeFunction, - DataTypes.StringType, + case "relative_timestamp": + return new ScalaUDF(relativeTimestampFunction, + DataTypes.TimestampType, seq(expressions), seq(), Option.empty(), - Option.apply("relative_datetime"), + Option.apply("relative_timestamp"), false, true); default: diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/TimeUtils.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/TimeUtils.java index 75262fd32..d0f1a830f 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/TimeUtils.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/TimeUtils.java @@ -24,11 +24,11 @@ public class TimeUtils { private static final String NOW = "now"; private static final String NEGATIVE_SIGN = "-"; - // Pattern for relative date time string. + // Pattern for relative string. private static final String OFFSET_PATTERN_STRING = "(?[+-])(?\\d+)?(?\\w+)"; private static final String SNAP_PATTERN_STRING = "[@](?\\w+)"; - private static final Pattern RELATIVE_DATE_TIME_PATTERN = Pattern.compile(String.format( + private static final Pattern RELATIVE_PATTERN = Pattern.compile(String.format( "(?%s)?(?%s)?", OFFSET_PATTERN_STRING, SNAP_PATTERN_STRING), Pattern.CASE_INSENSITIVE); @@ -115,46 +115,46 @@ public class TimeUtils { static final int MONTHS_PER_QUARTER = 3; /** - * Returns the {@link LocalDateTime} corresponding to the given relative date time string and date time. - * Throws {@link RuntimeException} if the relative date time string is not supported. + * Returns the {@link LocalDateTime} corresponding to the given relative string and local date time. + * Throws {@link RuntimeException} if the relative string is not supported. */ - public static LocalDateTime getRelativeDateTime(String relativeDateTimeString, LocalDateTime dateTime) { + public static LocalDateTime getRelativeLocalDateTime(String relativeString, LocalDateTime localDateTime) { - LocalDateTime relativeDateTime = dateTime; + LocalDateTime relativeLocalDateTime = localDateTime; - if (relativeDateTimeString.equalsIgnoreCase(NOW)) { - return dateTime; + if (relativeString.equalsIgnoreCase(NOW)) { + return localDateTime; } - Matcher matcher = RELATIVE_DATE_TIME_PATTERN.matcher(relativeDateTimeString); + Matcher matcher = RELATIVE_PATTERN.matcher(relativeString); if (!matcher.matches()) { - String message = String.format("The relative date time '%s' is not supported.", relativeDateTimeString); + String message = String.format("The relative date time '%s' is not supported.", relativeString); throw new RuntimeException(message); } if (matcher.group("offset") != null) { - relativeDateTime = applyOffset( - relativeDateTime, + relativeLocalDateTime = applyOffset( + relativeLocalDateTime, matcher.group("offsetSign"), matcher.group("offsetValue"), matcher.group("offsetUnit")); } if (matcher.group("snap") != null) { - relativeDateTime = applySnap( - relativeDateTime, + relativeLocalDateTime = applySnap( + relativeLocalDateTime, matcher.group("snapUnit")); } - return relativeDateTime; + return relativeLocalDateTime; } /** * Applies the offset specified by the offset sign, value, - * and unit to the given date time, and returns the result. + * and unit to the given local date time, and returns the result. */ - private LocalDateTime applyOffset(LocalDateTime dateTime, String offsetSign, String offsetValue, String offsetUnit) { + private LocalDateTime applyOffset(LocalDateTime localDateTime, String offsetSign, String offsetValue, String offsetUnit) { int offsetValueInt = Optional.ofNullable(offsetValue).map(Integer::parseInt).orElse(1); if (offsetSign.equals(NEGATIVE_SIGN)) { @@ -170,12 +170,12 @@ private LocalDateTime applyOffset(LocalDateTime dateTime, String offsetSign, Str if (DURATION_FOR_TIME_UNIT_MAP.containsKey(offsetUnitLowerCase)) { Duration offsetDuration = DURATION_FOR_TIME_UNIT_MAP.get(offsetUnitLowerCase).multipliedBy(offsetValueInt); - return dateTime.plus(offsetDuration); + return localDateTime.plus(offsetDuration); } if (PERIOD_FOR_TIME_UNIT_MAP.containsKey(offsetUnitLowerCase)) { Period offsetPeriod = PERIOD_FOR_TIME_UNIT_MAP.get(offsetUnitLowerCase).multipliedBy(offsetValueInt); - return dateTime.plus(offsetPeriod); + return localDateTime.plus(offsetPeriod); } String message = String.format("The relative date time unit '%s' is not supported.", offsetUnit); @@ -183,33 +183,33 @@ private LocalDateTime applyOffset(LocalDateTime dateTime, String offsetSign, Str } /** - * Snaps the given date time to the start of the previous time + * Snaps the given local date time to the start of the previous time * period specified by the given snap unit, and returns the result. */ - private LocalDateTime applySnap(LocalDateTime dateTime, String snapUnit) { + private LocalDateTime applySnap(LocalDateTime localDateTime, String snapUnit) { // Convert to lower case to make case-insensitive. String snapUnitLowerCase = snapUnit.toLowerCase(); if (SECOND_UNITS_SET.contains(snapUnitLowerCase)) { - return dateTime.truncatedTo(ChronoUnit.SECONDS); + return localDateTime.truncatedTo(ChronoUnit.SECONDS); } else if (MINUTE_UNITS_SET.contains(snapUnitLowerCase)) { - return dateTime.truncatedTo(ChronoUnit.MINUTES); + return localDateTime.truncatedTo(ChronoUnit.MINUTES); } else if (HOUR_UNITS_SET.contains(snapUnitLowerCase)) { - return dateTime.truncatedTo(ChronoUnit.HOURS); + return localDateTime.truncatedTo(ChronoUnit.HOURS); } else if (DAY_UNITS_SET.contains(snapUnitLowerCase)) { - return dateTime.truncatedTo(ChronoUnit.DAYS); + return localDateTime.truncatedTo(ChronoUnit.DAYS); } else if (WEEK_UNITS_SET.contains(snapUnitLowerCase)) { - return applySnapToDayOfWeek(dateTime, DayOfWeek.SUNDAY); + return applySnapToDayOfWeek(localDateTime, DayOfWeek.SUNDAY); } else if (MONTH_UNITS_SET.contains(snapUnitLowerCase)) { - return dateTime.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1); + return localDateTime.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1); } else if (QUARTER_UNITS_SET.contains(snapUnitLowerCase)) { - int monthsToSnap = (dateTime.getMonthValue() - 1) % MONTHS_PER_QUARTER; - return dateTime.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1).minusMonths(monthsToSnap); + int monthsToSnap = (localDateTime.getMonthValue() - 1) % MONTHS_PER_QUARTER; + return localDateTime.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1).minusMonths(monthsToSnap); } else if (YEAR_UNITS_SET.contains(snapUnitLowerCase)) { - return dateTime.truncatedTo(ChronoUnit.DAYS).withDayOfYear(1); + return localDateTime.truncatedTo(ChronoUnit.DAYS).withDayOfYear(1); } else if (DAY_OF_THE_WEEK_FOR_SNAP_UNIT_MAP.containsKey(snapUnitLowerCase)) { - return applySnapToDayOfWeek(dateTime, DAY_OF_THE_WEEK_FOR_SNAP_UNIT_MAP.get(snapUnit)); + return applySnapToDayOfWeek(localDateTime, DAY_OF_THE_WEEK_FOR_SNAP_UNIT_MAP.get(snapUnit)); } String message = String.format("The relative date time unit '%s' is not supported.", snapUnit); diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/BuiltinFunctionTransformer.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/BuiltinFunctionTransformer.java index f73a1c491..b4435aba5 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/BuiltinFunctionTransformer.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/BuiltinFunctionTransformer.java @@ -8,20 +8,10 @@ import com.google.common.collect.ImmutableMap; import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction; import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction$; -import org.apache.spark.sql.catalyst.expressions.CurrentTimeZone$; -import org.apache.spark.sql.catalyst.expressions.CurrentTimestamp$; -import org.apache.spark.sql.catalyst.expressions.DateAddInterval$; -import org.apache.spark.sql.catalyst.expressions.Expression; -import org.apache.spark.sql.catalyst.expressions.Literal$; -import org.apache.spark.sql.catalyst.expressions.ScalaUDF; -import org.apache.spark.sql.catalyst.expressions.TimestampAdd$; -import org.apache.spark.sql.catalyst.expressions.TimestampDiff$; -import org.apache.spark.sql.catalyst.expressions.ToUTCTimestamp$; -import org.apache.spark.sql.catalyst.expressions.UnaryMinus$; +import org.apache.spark.sql.catalyst.expressions.*; import org.opensearch.sql.ast.expression.IntervalUnit; import org.opensearch.sql.expression.function.BuiltinFunctionName; import org.opensearch.sql.expression.function.SerializableUdf; -import org.opensearch.sql.ppl.CatalystPlanContext; import scala.Option; import java.util.Arrays; @@ -58,6 +48,7 @@ import static org.opensearch.sql.expression.function.BuiltinFunctionName.LOCALTIME; import static org.opensearch.sql.expression.function.BuiltinFunctionName.MINUTE_OF_HOUR; import static org.opensearch.sql.expression.function.BuiltinFunctionName.MONTH_OF_YEAR; +import static org.opensearch.sql.expression.function.BuiltinFunctionName.RELATIVE_TIMESTAMP; import static org.opensearch.sql.expression.function.BuiltinFunctionName.SECOND_OF_MINUTE; import static org.opensearch.sql.expression.function.BuiltinFunctionName.SUBDATE; import static org.opensearch.sql.expression.function.BuiltinFunctionName.SYSDATE; @@ -174,6 +165,11 @@ public interface BuiltinFunctionTransformer { args -> { return ToUTCTimestamp$.MODULE$.apply(CurrentTimestamp$.MODULE$.apply(), CurrentTimeZone$.MODULE$.apply()); }) + .put( + RELATIVE_TIMESTAMP, + args -> { + return SerializableUdf.visit("relative_timestamp", List.of(args.get(0), CurrentTimestamp$.MODULE$.apply(), CurrentTimeZone$.MODULE$.apply())); + }) .build(); static Expression builtinFunction(org.opensearch.sql.ast.expression.Function function, List args) { @@ -182,7 +178,7 @@ static Expression builtinFunction(org.opensearch.sql.ast.expression.Function fun if(udf == null) { throw new UnsupportedOperationException(function.getFuncName() + " is not a builtin function of PPL"); } - return udf; + return udf; } else { BuiltinFunctionName builtin = BuiltinFunctionName.of(function.getFuncName()).get(); String name = SPARK_BUILTIN_FUNCTION_NAME_MAPPING.get(builtin); diff --git a/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableTimeUdfTest.java b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableTimeUdfTest.java index 95baded40..723b32ec4 100644 --- a/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableTimeUdfTest.java +++ b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableTimeUdfTest.java @@ -5,48 +5,36 @@ package org.opensearch.sql.expression.function; -import org.junit.After; -import org.junit.Before; import org.junit.Test; -import org.mockito.MockedStatic; -import org.mockito.Mockito; +import java.time.Instant; import java.time.LocalDateTime; +import java.time.ZoneOffset; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; -import static org.mockito.Mockito.mockStatic; -import static org.opensearch.sql.expression.function.SerializableUdf.*; +import static org.opensearch.sql.expression.function.SerializableUdf.relativeTimestampFunction; public class SerializableTimeUdfTest { - private MockedStatic mockedDateTime; - - @Before - public void setup() { - final LocalDateTime now = LocalDateTime.parse("2000-01-03T01:01:01.100"); - mockedDateTime = mockStatic(LocalDateTime.class, Mockito.CALLS_REAL_METHODS); - mockedDateTime.when(LocalDateTime::now).thenReturn(now); - } - - @After - public void teardown() { - mockedDateTime.close(); - } + // Monday, Jan 03, 2000 @ 01:01:01.100 UTC + private final LocalDateTime MOCK_LOCAL_DATE_TIME = LocalDateTime.parse("2000-01-03T01:01:01.100"); + private final ZoneOffset MOCK_ZONE_OFFSET = ZoneOffset.UTC; + private final Instant MOCK_INSTANT = MOCK_LOCAL_DATE_TIME.toInstant(MOCK_ZONE_OFFSET); @Test - public void relativeDateTimeTest() { + public void relativeTimestampTest() { /* These are only basic tests of the relative date time functionality. For more comprehensive tests, see {@link TimeUtilsTest}. */ - testValid("now", "2000-01-03T01:01:01.100"); - testValid("-60m", "2000-01-03T00:01:01.100"); - testValid("-h", "2000-01-03T00:01:01.100"); - testValid("+2wk", "2000-01-17T01:01:01.100"); - testValid("-1h@h", "2000-01-03T00:00"); - testValid("@d", "2000-01-03T00:00"); + testValid("now", "2000-01-03T01:01:01.100Z"); + testValid("-60m", "2000-01-03T00:01:01.100Z"); + testValid("-h", "2000-01-03T00:01:01.100Z"); + testValid("+2wk", "2000-01-17T01:01:01.100Z"); + testValid("-1h@h", "2000-01-03T00:00:00Z"); + testValid("@d", "2000-01-03T00:00:00Z"); testInvalid("invalid", "The relative date time 'invalid' is not supported."); testInvalid("INVALID", "The relative date time 'INVALID' is not supported."); @@ -59,15 +47,16 @@ public void relativeDateTimeTest() { testInvalid("@w8", "The relative date time unit 'w8' is not supported."); } - private void testValid(String relativeDateTimeString, String expectedDateTimeString) { - String testMessage = String.format("\"%s\"", relativeDateTimeString); - assertEquals(testMessage, expectedDateTimeString, relativeDateTimeFunction.apply(relativeDateTimeString)); + private void testValid(String relativeString, String expectedTimestampString) { + String testMessage = String.format("\"%s\"", relativeString); + String actualTimestampString = relativeTimestampFunction.apply(relativeString, MOCK_INSTANT, MOCK_ZONE_OFFSET.toString()).toString(); + assertEquals(testMessage, expectedTimestampString, actualTimestampString); } private void testInvalid(String relativeDateTimeString, String expectedExceptionMessage) { String testMessage = String.format("\"%s\"", relativeDateTimeString); String actualExceptionMessage = assertThrows(testMessage, RuntimeException.class, - () -> relativeDateTimeFunction.apply(relativeDateTimeString)).getMessage(); + () -> relativeTimestampFunction.apply(relativeDateTimeString, MOCK_INSTANT, MOCK_ZONE_OFFSET.toString())).getMessage(); assertEquals(expectedExceptionMessage, actualExceptionMessage); } } diff --git a/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/TimeUtilsTest.java b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/TimeUtilsTest.java index 59e92a444..5950ad084 100644 --- a/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/TimeUtilsTest.java +++ b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/TimeUtilsTest.java @@ -9,7 +9,6 @@ import static org.junit.Assert.assertThrows; import java.time.LocalDateTime; -import java.time.temporal.ChronoUnit; import org.junit.Test; @@ -173,14 +172,14 @@ public void testRelativeSnap() { private void testValid(String relativeDateTimeString, String expectedDateTimeString) { String testMessage = String.format("\"%s\"", relativeDateTimeString); LocalDateTime expectedDateTime = LocalDateTime.parse(expectedDateTimeString); - LocalDateTime actualDateTime = TimeUtils.getRelativeDateTime(relativeDateTimeString, MOCK_DATETIME); + LocalDateTime actualDateTime = TimeUtils.getRelativeLocalDateTime(relativeDateTimeString, MOCK_DATETIME); assertEquals(testMessage, expectedDateTime, actualDateTime); } private void testInvalid(String relativeDateTimeString, String expectedExceptionMessage) { String testMessage = String.format("\"%s\"", relativeDateTimeString); String actualExceptionMessage = assertThrows(testMessage, RuntimeException.class, - () -> TimeUtils.getRelativeDateTime(relativeDateTimeString, MOCK_DATETIME)).getMessage(); + () -> TimeUtils.getRelativeLocalDateTime(relativeDateTimeString, MOCK_DATETIME)).getMessage(); assertEquals(expectedExceptionMessage, actualExceptionMessage); } }