Skip to content

Commit

Permalink
Initial implementation of relative_timestamp UDF.
Browse files Browse the repository at this point in the history
Signed-off-by: currantw <taylor.curran@improving.com>
  • Loading branch information
currantw committed Jan 7, 2025
1 parent 93446bb commit c78c0a5
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ MONTHNAME: 'MONTHNAME';
NOW: 'NOW';
PERIOD_ADD: 'PERIOD_ADD';
PERIOD_DIFF: 'PERIOD_DIFF';
RELATIVE_TIMESTAMP: 'RELATIVE_TIMESTAMP';
SEC_TO_TIME: 'SEC_TO_TIME';
STR_TO_DATE: 'STR_TO_DATE';
SUBDATE: 'SUBDATE';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,7 @@ dateTimeFunctionName
| NOW
| PERIOD_ADD
| PERIOD_DIFF
| RELATIVE_TIMESTAMP
| QUARTER
| SECOND
| SECOND_OF_MINUTE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ public enum BuiltinFunctionName {
LOCALTIMESTAMP(FunctionName.of("localtimestamp")),
SYSDATE(FunctionName.of("sysdate")),

// Relative timestamp functions
RELATIVE_TIMESTAMP(FunctionName.of("relative_timestamp")),

/** Text Functions. */
TOSTRING(FunctionName.of("tostring")),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,23 @@
import org.apache.spark.sql.types.DataTypes;
import scala.Function1;
import scala.Function2;
import scala.Function3;
import scala.Option;
import scala.Serializable;
import scala.runtime.AbstractFunction1;
import scala.runtime.AbstractFunction2;
import scala.runtime.AbstractFunction3;
import scala.collection.JavaConverters;
import scala.collection.mutable.WrappedArray;

import java.lang.Boolean;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand All @@ -44,6 +50,10 @@ abstract class SerializableAbstractFunction2<T1, T2, R> extends AbstractFunction
implements Serializable {
}

abstract class SerializableAbstractFunction3<T1, T2, T3, R> extends AbstractFunction3<T1, T2, T3, R>
implements Serializable {
}

/**
* Remove specified keys from a JSON string.
*
Expand Down Expand Up @@ -113,7 +123,7 @@ public String apply(String jsonStr, WrappedArray<String> elements) {
}
}
};

Function2<String, String, Boolean> cidrFunction = new SerializableAbstractFunction2<>() {

IPAddressStringParameters valOptions = new IPAddressStringParameters.Builder()
Expand Down Expand Up @@ -201,13 +211,18 @@ public BigInteger apply(String ipAddress) {
};
}

Function1<String, String> relativeDateTimeFunction = new SerializableAbstractFunction1<String, String>() {
/**
* Returns the {@link Instant} corresponding to the given relative string, current instant, and time zone identifier.
* Throws {@link RuntimeException} if the relative timestamp string is not supported.
*/
Function3<String, Instant, String, Instant> relativeTimestampFunction = new SerializableAbstractFunction3<String, Instant, String, Instant>() {
@Override
public String apply(String relativeDateTimeString) {
public Instant apply(String relativeDateTimeString, Instant currentInstant, String zoneIdString) {
ZoneId zoneId = ZoneId.of(zoneIdString);
LocalDateTime currentLocalDateTime = LocalDateTime.ofInstant(currentInstant, zoneId);
LocalDateTime relativeLocalDateTime = TimeUtils.getRelativeLocalDateTime(relativeDateTimeString, currentLocalDateTime);

// TODO #991 - Get current datetime from Spark
LocalDateTime currentDateTime = LocalDateTime.now();
return TimeUtils.getRelativeDateTime(relativeDateTimeString, currentDateTime).toString();
return relativeLocalDateTime.atZone(zoneId).toInstant();
}
};

Expand Down Expand Up @@ -264,13 +279,13 @@ static ScalaUDF visit(String funcName, List<Expression> expressions) {
Option.apply("ip_to_int"),
false,
true);
case "relative_datetime":
return new ScalaUDF(relativeDateTimeFunction,
DataTypes.StringType,
case "relative_timestamp":
return new ScalaUDF(relativeTimestampFunction,
DataTypes.TimestampType,
seq(expressions),
seq(),
Option.empty(),
Option.apply("relative_datetime"),
Option.apply("relative_timestamp"),
false,
true);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ public class TimeUtils {
private static final String NOW = "now";
private static final String NEGATIVE_SIGN = "-";

// Pattern for relative date time string.
// Pattern for relative string.
private static final String OFFSET_PATTERN_STRING = "(?<offsetSign>[+-])(?<offsetValue>\\d+)?(?<offsetUnit>\\w+)";
private static final String SNAP_PATTERN_STRING = "[@](?<snapUnit>\\w+)";

private static final Pattern RELATIVE_DATE_TIME_PATTERN = Pattern.compile(String.format(
private static final Pattern RELATIVE_PATTERN = Pattern.compile(String.format(
"(?<offset>%s)?(?<snap>%s)?", OFFSET_PATTERN_STRING, SNAP_PATTERN_STRING),
Pattern.CASE_INSENSITIVE);

Expand Down Expand Up @@ -115,46 +115,46 @@ public class TimeUtils {
static final int MONTHS_PER_QUARTER = 3;

/**
* Returns the {@link LocalDateTime} corresponding to the given relative date time string and date time.
* Throws {@link RuntimeException} if the relative date time string is not supported.
* Returns the {@link LocalDateTime} corresponding to the given relative string and local date time.
* Throws {@link RuntimeException} if the relative string is not supported.
*/
public static LocalDateTime getRelativeDateTime(String relativeDateTimeString, LocalDateTime dateTime) {
public static LocalDateTime getRelativeLocalDateTime(String relativeString, LocalDateTime localDateTime) {

LocalDateTime relativeDateTime = dateTime;
LocalDateTime relativeLocalDateTime = localDateTime;

if (relativeDateTimeString.equalsIgnoreCase(NOW)) {
return dateTime;
if (relativeString.equalsIgnoreCase(NOW)) {
return localDateTime;
}

Matcher matcher = RELATIVE_DATE_TIME_PATTERN.matcher(relativeDateTimeString);
Matcher matcher = RELATIVE_PATTERN.matcher(relativeString);
if (!matcher.matches()) {
String message = String.format("The relative date time '%s' is not supported.", relativeDateTimeString);
String message = String.format("The relative date time '%s' is not supported.", relativeString);
throw new RuntimeException(message);
}


if (matcher.group("offset") != null) {
relativeDateTime = applyOffset(
relativeDateTime,
relativeLocalDateTime = applyOffset(
relativeLocalDateTime,
matcher.group("offsetSign"),
matcher.group("offsetValue"),
matcher.group("offsetUnit"));
}

if (matcher.group("snap") != null) {
relativeDateTime = applySnap(
relativeDateTime,
relativeLocalDateTime = applySnap(
relativeLocalDateTime,
matcher.group("snapUnit"));
}

return relativeDateTime;
return relativeLocalDateTime;
}

/**
* Applies the offset specified by the offset sign, value,
* and unit to the given date time, and returns the result.
* and unit to the given local date time, and returns the result.
*/
private LocalDateTime applyOffset(LocalDateTime dateTime, String offsetSign, String offsetValue, String offsetUnit) {
private LocalDateTime applyOffset(LocalDateTime localDateTime, String offsetSign, String offsetValue, String offsetUnit) {

int offsetValueInt = Optional.ofNullable(offsetValue).map(Integer::parseInt).orElse(1);
if (offsetSign.equals(NEGATIVE_SIGN)) {
Expand All @@ -170,46 +170,46 @@ private LocalDateTime applyOffset(LocalDateTime dateTime, String offsetSign, Str

if (DURATION_FOR_TIME_UNIT_MAP.containsKey(offsetUnitLowerCase)) {
Duration offsetDuration = DURATION_FOR_TIME_UNIT_MAP.get(offsetUnitLowerCase).multipliedBy(offsetValueInt);
return dateTime.plus(offsetDuration);
return localDateTime.plus(offsetDuration);
}

if (PERIOD_FOR_TIME_UNIT_MAP.containsKey(offsetUnitLowerCase)) {
Period offsetPeriod = PERIOD_FOR_TIME_UNIT_MAP.get(offsetUnitLowerCase).multipliedBy(offsetValueInt);
return dateTime.plus(offsetPeriod);
return localDateTime.plus(offsetPeriod);
}

String message = String.format("The relative date time unit '%s' is not supported.", offsetUnit);
throw new RuntimeException(message);
}

/**
* Snaps the given date time to the start of the previous time
* Snaps the given local date time to the start of the previous time
* period specified by the given snap unit, and returns the result.
*/
private LocalDateTime applySnap(LocalDateTime dateTime, String snapUnit) {
private LocalDateTime applySnap(LocalDateTime localDateTime, String snapUnit) {

// Convert to lower case to make case-insensitive.
String snapUnitLowerCase = snapUnit.toLowerCase();

if (SECOND_UNITS_SET.contains(snapUnitLowerCase)) {
return dateTime.truncatedTo(ChronoUnit.SECONDS);
return localDateTime.truncatedTo(ChronoUnit.SECONDS);
} else if (MINUTE_UNITS_SET.contains(snapUnitLowerCase)) {
return dateTime.truncatedTo(ChronoUnit.MINUTES);
return localDateTime.truncatedTo(ChronoUnit.MINUTES);
} else if (HOUR_UNITS_SET.contains(snapUnitLowerCase)) {
return dateTime.truncatedTo(ChronoUnit.HOURS);
return localDateTime.truncatedTo(ChronoUnit.HOURS);
} else if (DAY_UNITS_SET.contains(snapUnitLowerCase)) {
return dateTime.truncatedTo(ChronoUnit.DAYS);
return localDateTime.truncatedTo(ChronoUnit.DAYS);
} else if (WEEK_UNITS_SET.contains(snapUnitLowerCase)) {
return applySnapToDayOfWeek(dateTime, DayOfWeek.SUNDAY);
return applySnapToDayOfWeek(localDateTime, DayOfWeek.SUNDAY);
} else if (MONTH_UNITS_SET.contains(snapUnitLowerCase)) {
return dateTime.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1);
return localDateTime.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1);
} else if (QUARTER_UNITS_SET.contains(snapUnitLowerCase)) {
int monthsToSnap = (dateTime.getMonthValue() - 1) % MONTHS_PER_QUARTER;
return dateTime.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1).minusMonths(monthsToSnap);
int monthsToSnap = (localDateTime.getMonthValue() - 1) % MONTHS_PER_QUARTER;
return localDateTime.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1).minusMonths(monthsToSnap);
} else if (YEAR_UNITS_SET.contains(snapUnitLowerCase)) {
return dateTime.truncatedTo(ChronoUnit.DAYS).withDayOfYear(1);
return localDateTime.truncatedTo(ChronoUnit.DAYS).withDayOfYear(1);
} else if (DAY_OF_THE_WEEK_FOR_SNAP_UNIT_MAP.containsKey(snapUnitLowerCase)) {
return applySnapToDayOfWeek(dateTime, DAY_OF_THE_WEEK_FOR_SNAP_UNIT_MAP.get(snapUnit));
return applySnapToDayOfWeek(localDateTime, DAY_OF_THE_WEEK_FOR_SNAP_UNIT_MAP.get(snapUnit));
}

String message = String.format("The relative date time unit '%s' is not supported.", snapUnit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,10 @@
import com.google.common.collect.ImmutableMap;
import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction;
import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction$;
import org.apache.spark.sql.catalyst.expressions.CurrentTimeZone$;
import org.apache.spark.sql.catalyst.expressions.CurrentTimestamp$;
import org.apache.spark.sql.catalyst.expressions.DateAddInterval$;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.Literal$;
import org.apache.spark.sql.catalyst.expressions.ScalaUDF;
import org.apache.spark.sql.catalyst.expressions.TimestampAdd$;
import org.apache.spark.sql.catalyst.expressions.TimestampDiff$;
import org.apache.spark.sql.catalyst.expressions.ToUTCTimestamp$;
import org.apache.spark.sql.catalyst.expressions.UnaryMinus$;
import org.apache.spark.sql.catalyst.expressions.*;
import org.opensearch.sql.ast.expression.IntervalUnit;
import org.opensearch.sql.expression.function.BuiltinFunctionName;
import org.opensearch.sql.expression.function.SerializableUdf;
import org.opensearch.sql.ppl.CatalystPlanContext;
import scala.Option;

import java.util.Arrays;
Expand Down Expand Up @@ -58,6 +48,7 @@
import static org.opensearch.sql.expression.function.BuiltinFunctionName.LOCALTIME;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MINUTE_OF_HOUR;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MONTH_OF_YEAR;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.RELATIVE_TIMESTAMP;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.SECOND_OF_MINUTE;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.SUBDATE;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.SYSDATE;
Expand Down Expand Up @@ -174,6 +165,11 @@ public interface BuiltinFunctionTransformer {
args -> {
return ToUTCTimestamp$.MODULE$.apply(CurrentTimestamp$.MODULE$.apply(), CurrentTimeZone$.MODULE$.apply());
})
.put(
RELATIVE_TIMESTAMP,
args -> {
return SerializableUdf.visit("relative_timestamp", List.of(args.get(0), CurrentTimestamp$.MODULE$.apply(), CurrentTimeZone$.MODULE$.apply()));
})
.build();

static Expression builtinFunction(org.opensearch.sql.ast.expression.Function function, List<Expression> args) {
Expand All @@ -182,7 +178,7 @@ static Expression builtinFunction(org.opensearch.sql.ast.expression.Function fun
if(udf == null) {
throw new UnsupportedOperationException(function.getFuncName() + " is not a builtin function of PPL");
}
return udf;
return udf;
} else {
BuiltinFunctionName builtin = BuiltinFunctionName.of(function.getFuncName()).get();
String name = SPARK_BUILTIN_FUNCTION_NAME_MAPPING.get(builtin);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,48 +5,36 @@

package org.opensearch.sql.expression.function;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mockStatic;
import static org.opensearch.sql.expression.function.SerializableUdf.*;
import static org.opensearch.sql.expression.function.SerializableUdf.relativeTimestampFunction;

public class SerializableTimeUdfTest {

private MockedStatic<LocalDateTime> mockedDateTime;

@Before
public void setup() {
final LocalDateTime now = LocalDateTime.parse("2000-01-03T01:01:01.100");
mockedDateTime = mockStatic(LocalDateTime.class, Mockito.CALLS_REAL_METHODS);
mockedDateTime.when(LocalDateTime::now).thenReturn(now);
}

@After
public void teardown() {
mockedDateTime.close();
}
// Monday, Jan 03, 2000 @ 01:01:01.100 UTC
private final LocalDateTime MOCK_LOCAL_DATE_TIME = LocalDateTime.parse("2000-01-03T01:01:01.100");
private final ZoneOffset MOCK_ZONE_OFFSET = ZoneOffset.UTC;
private final Instant MOCK_INSTANT = MOCK_LOCAL_DATE_TIME.toInstant(MOCK_ZONE_OFFSET);

@Test
public void relativeDateTimeTest() {
public void relativeTimestampTest() {

/* These are only basic tests of the relative date time functionality.
For more comprehensive tests, see {@link TimeUtilsTest}.
*/

testValid("now", "2000-01-03T01:01:01.100");
testValid("-60m", "2000-01-03T00:01:01.100");
testValid("-h", "2000-01-03T00:01:01.100");
testValid("+2wk", "2000-01-17T01:01:01.100");
testValid("-1h@h", "2000-01-03T00:00");
testValid("@d", "2000-01-03T00:00");
testValid("now", "2000-01-03T01:01:01.100Z");
testValid("-60m", "2000-01-03T00:01:01.100Z");
testValid("-h", "2000-01-03T00:01:01.100Z");
testValid("+2wk", "2000-01-17T01:01:01.100Z");
testValid("-1h@h", "2000-01-03T00:00:00Z");
testValid("@d", "2000-01-03T00:00:00Z");

testInvalid("invalid", "The relative date time 'invalid' is not supported.");
testInvalid("INVALID", "The relative date time 'INVALID' is not supported.");
Expand All @@ -59,15 +47,16 @@ public void relativeDateTimeTest() {
testInvalid("@w8", "The relative date time unit 'w8' is not supported.");
}

private void testValid(String relativeDateTimeString, String expectedDateTimeString) {
String testMessage = String.format("\"%s\"", relativeDateTimeString);
assertEquals(testMessage, expectedDateTimeString, relativeDateTimeFunction.apply(relativeDateTimeString));
private void testValid(String relativeString, String expectedTimestampString) {
String testMessage = String.format("\"%s\"", relativeString);
String actualTimestampString = relativeTimestampFunction.apply(relativeString, MOCK_INSTANT, MOCK_ZONE_OFFSET.toString()).toString();
assertEquals(testMessage, expectedTimestampString, actualTimestampString);
}

private void testInvalid(String relativeDateTimeString, String expectedExceptionMessage) {
String testMessage = String.format("\"%s\"", relativeDateTimeString);
String actualExceptionMessage = assertThrows(testMessage, RuntimeException.class,
() -> relativeDateTimeFunction.apply(relativeDateTimeString)).getMessage();
() -> relativeTimestampFunction.apply(relativeDateTimeString, MOCK_INSTANT, MOCK_ZONE_OFFSET.toString())).getMessage();
assertEquals(expectedExceptionMessage, actualExceptionMessage);
}
}
Loading

0 comments on commit c78c0a5

Please sign in to comment.