Skip to content

Commit

Permalink
Enhance funnel functions to accept a new option for stepMaxDuration (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangfu0 authored Nov 15, 2024
1 parent 63c4c44 commit ec9d078
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public abstract class FunnelBaseAggregationFunction<F extends Comparable>
protected final List<ExpressionContext> _stepExpressions;
protected final FunnelModes _modes = new FunnelModes();
protected final int _numSteps;
protected long _maxStepDuration = 0L;

public FunnelBaseAggregationFunction(List<ExpressionContext> arguments) {
int numArguments = arguments.size();
Expand All @@ -58,12 +59,34 @@ public FunnelBaseAggregationFunction(List<ExpressionContext> arguments) {
Preconditions.checkArgument(numArguments >= 3 + _numSteps,
"FUNNEL_AGG_FUNC expects >= " + (3 + _numSteps) + " arguments, got: %s. The function can be used as "
+ getType().getName() + "(timestampExpression, windowSize, numberSteps, stepExpression, "
+ "[stepExpression, ..], [mode, [mode, ... ]])",
+ "[stepExpression, ..], [extraArgument/mode, [extraArgument/mode, ... ]])",
numArguments);
_stepExpressions = arguments.subList(3, 3 + _numSteps);
if (numArguments > 3 + _numSteps) {
arguments.subList(3 + _numSteps, numArguments)
.forEach(arg -> _modes.add(Mode.valueOf(arg.getLiteral().getStringValue().toUpperCase())));
for (int i = 3 + _numSteps; i < numArguments; i++) {
String extraArgument = arguments.get(i).getLiteral().getStringValue().toUpperCase();
String[] parsedExtraArguments = extraArgument.split("=");
if (parsedExtraArguments.length == 2) {
String key = parsedExtraArguments[0].toUpperCase();
switch (key) {
case FunnelConfigs.MAX_STEP_DURATION:
_maxStepDuration = Long.parseLong(parsedExtraArguments[1]);
Preconditions.checkArgument(_maxStepDuration > 0, "MaxStepDuration must be > 0");
break;
case FunnelConfigs.MODE:
for (String modeStr : parsedExtraArguments[1].split(",")) {
_modes.add(Mode.valueOf(modeStr.trim()));
}
break;
default:
throw new IllegalArgumentException("Unrecognized arguments: " + extraArgument);
}
continue;
}
try {
_modes.add(Mode.valueOf(extraArgument));
} catch (Exception e) {
throw new RuntimeException("Unrecognized extra argument for funnel function: " + extraArgument, e);
}
}
}

Expand Down Expand Up @@ -241,6 +264,13 @@ protected void fillWindow(PriorityQueue<FunnelStepEvent> stepEvents, ArrayDeque<
long windowStart = slidingWindow.peek().getTimestamp();
long windowEnd = windowStart + _windowSize;
while (!stepEvents.isEmpty() && (stepEvents.peek().getTimestamp() < windowEnd)) {
if (_maxStepDuration > 0) {
// When maxStepDuration > 0, we need to check if the event_to_add has a timestamp within the max duration
// from the last event in the sliding window. If not, we break the loop.
if (stepEvents.peek().getTimestamp() - slidingWindow.getLast().getTimestamp() > _maxStepDuration) {
break;
}
}
slidingWindow.addLast(stepEvents.poll());
}
}
Expand Down Expand Up @@ -301,4 +331,9 @@ public boolean hasKeepAll() {
return contains(Mode.KEEP_ALL);
}
}

protected static class FunnelConfigs {
public static final String MODE = "MODE";
static final String MAX_STEP_DURATION = "MAXSTEPDURATION";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,79 @@ public void testFunnelMaxStepGroupByQueriesWithModeKeepAll(boolean useMultiStage
}
}

@Test(dataProvider = "useBothQueryEngines")
public void testFunnelMaxStepGroupByQueriesWithMaxStepDuration(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
String query =
String.format("SELECT "
+ "userId, funnelMaxStep(timestampCol, '1000', 3, "
+ "url = '/product/search', "
+ "url = '/checkout/start', "
+ "url = '/checkout/confirmation', "
+ "'mode=strict_order, keep_all', "
+ "'maxStepDuration=10' ) "
+ "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult());
JsonNode jsonNode = postQuery(query);
JsonNode rows = jsonNode.get("resultTable").get("rows");
assertEquals(rows.size(), 40);
for (int i = 0; i < 40; i++) {
JsonNode row = rows.get(i);
assertEquals(row.size(), 2);
assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10));
switch (i / 10) {
case 0:
assertEquals(row.get(1).intValue(), 1);
break;
case 1:
assertEquals(row.get(1).intValue(), 1);
break;
case 2:
assertEquals(row.get(1).intValue(), 1);
break;
case 3:
assertEquals(row.get(1).intValue(), 1);
break;
default:
throw new IllegalStateException();
}
}

query =
String.format("SELECT "
+ "userId, funnelMaxStep(timestampCol, '1000', 3, "
+ "url = '/product/search', "
+ "url = '/checkout/start', "
+ "url = '/checkout/confirmation', "
+ "'mode=strict_order', "
+ "'maxStepDuration=10' ) "
+ "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult());
jsonNode = postQuery(query);
rows = jsonNode.get("resultTable").get("rows");
assertEquals(rows.size(), 40);
for (int i = 0; i < 40; i++) {
JsonNode row = rows.get(i);
assertEquals(row.size(), 2);
assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10));
switch (i / 10) {
case 0:
assertEquals(row.get(1).intValue(), 1);
break;
case 1:
assertEquals(row.get(1).intValue(), 2);
break;
case 2:
assertEquals(row.get(1).intValue(), 1);
break;
case 3:
assertEquals(row.get(1).intValue(), 1);
break;
default:
throw new IllegalStateException();
}
}
}

@Test(dataProvider = "useBothQueryEngines")
public void testFunnelMatchStepGroupByQueriesWithMode(boolean useMultiStageQueryEngine)
throws Exception {
Expand Down

0 comments on commit ec9d078

Please sign in to comment.