Skip to content

Commit

Permalink
[FLINK-32137][flamegraph] Added filtering of lambdas when building fl…
Browse files Browse the repository at this point in the history
…ame graphs
  • Loading branch information
netvl authored and 1996fanrui committed Aug 23, 2023
1 parent 1ae8e40 commit 9805724
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Factory class for creating Flame Graph representations. */
public class VertexFlameGraphFactory {
Expand Down Expand Up @@ -79,7 +81,7 @@ private static VertexFlameGraph createFlameGraphFromSample(
sample.getSamplesBySubtask().values()) {
for (ThreadInfoSample threadInfo : threadInfoSubSamples) {
if (threadStates.contains(threadInfo.getThreadState())) {
StackTraceElement[] traces = threadInfo.getStackTrace();
StackTraceElement[] traces = cleanLambdaNames(threadInfo.getStackTrace());
root.incrementHitCount();
NodeBuilder parent = root;
for (int i = traces.length - 1; i >= 0; i--) {
Expand All @@ -97,6 +99,46 @@ private static VertexFlameGraph createFlameGraphFromSample(
return new VertexFlameGraph(sample.getEndTime(), root.toNode());
}

// Matches class names like
// org.apache.flink.streaming.runtime.io.RecordProcessorUtils$$Lambda$773/0x00000001007f84a0
// org.apache.flink.runtime.taskexecutor.IdleTestTask$$Lambda$351/605293351
private static final Pattern LAMBDA_CLASS_NAME =
Pattern.compile("(\\$Lambda\\$)\\d+/(0x)?\\p{XDigit}+$");

// Drops stack trace elements with class names matching the above regular expression.
// These elements are useless, because they don't provide any additional information
// except the fact that a lambda is used (they don't have source information, for example),
// and also the lambda "class names" can be different across different JVMs, which pollutes
// flame graphs.
// Note that Thread.getStackTrace() performs a similar logic - the stack trace returned
// by this method will not contain lambda references with it. But ThreadMXBean does collect
// lambdas, so we have to clean them up explicitly.
private static StackTraceElement[] cleanLambdaNames(StackTraceElement[] stackTrace) {
StackTraceElement[] result = new StackTraceElement[stackTrace.length];
for (int i = 0; i < stackTrace.length; i++) {
StackTraceElement element = stackTrace[i];
Matcher matcher = LAMBDA_CLASS_NAME.matcher(element.getClassName());
if (matcher.find()) {
// org.apache.flink.streaming.runtime.io.RecordProcessorUtils$$Lambda$773/0x00000001007f84a0
// -->
// org.apache.flink.streaming.runtime.io.RecordProcessorUtils$$Lambda$0/0x0
// This ensures that the name is stable across JVMs, but at the same time
// keeps the stack frame in the call since it has the method name, which
// may be useful for analysis.
String newClassName = matcher.replaceFirst("$10/$20");
result[i] =
new StackTraceElement(
newClassName,
element.getMethodName(),
element.getFileName(),
element.getLineNumber());
} else {
result[i] = element;
}
}
return result;
}

private static class NodeBuilder {

private final Map<String, NodeBuilder> children = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,7 @@ void testSampleTaskThreadInfo() throws Exception {
tasks.add(new IdleTestTask());
Thread.sleep(2000);

Map<Long, ExecutionAttemptID> threads =
tasks.stream()
.collect(
Collectors.toMap(
task -> task.getExecutingThread().getId(),
IdleTestTask::getExecutionId));
Map<Long, ExecutionAttemptID> threads = collectExecutionAttempts(tasks);
final Map<ExecutionAttemptID, Collection<ThreadInfoSample>> threadInfoSamples =
threadInfoSampleService
.requestThreadInfoSamples(threads, requestParams)
Expand All @@ -109,12 +104,8 @@ void testTruncateStackTraceIfLimitIsSpecified() throws Exception {
executeWithTerminationGuarantee(
() -> {
tasks.add(new IdleTestTask());
Map<Long, ExecutionAttemptID> threads =
tasks.stream()
.collect(
Collectors.toMap(
task -> task.getExecutingThread().getId(),
IdleTestTask::getExecutionId));
Map<Long, ExecutionAttemptID> threads = collectExecutionAttempts(tasks);

final Map<ExecutionAttemptID, Collection<ThreadInfoSample>> threadInfoSamples1 =
threadInfoSampleService
.requestThreadInfoSamples(threads, requestParams)
Expand Down Expand Up @@ -158,14 +149,7 @@ void testThrowExceptionIfNumSamplesIsNegative() {
tasks.add(new IdleTestTask());

Map<Long, ExecutionAttemptID> threads =
tasks.stream()
.collect(
Collectors.toMap(
task ->
task.getExecutingThread()
.getId(),
IdleTestTask
::getExecutionId));
collectExecutionAttempts(tasks);
threadInfoSampleService.requestThreadInfoSamples(
threads,
new ThreadInfoSamplesRequest(
Expand All @@ -186,12 +170,7 @@ void testShouldThrowExceptionIfTaskIsNotRunningBeforeSampling()
Set<SampleableTask> tasks = new HashSet<>();
tasks.add(new NotRunningTask());

Map<Long, ExecutionAttemptID> threads =
tasks.stream()
.collect(
Collectors.toMap(
task -> task.getExecutingThread().getId(),
SampleableTask::getExecutionId));
Map<Long, ExecutionAttemptID> threads = collectExecutionAttempts(tasks);
final CompletableFuture<Map<ExecutionAttemptID, Collection<ThreadInfoSample>>>
sampleFuture =
threadInfoSampleService.requestThreadInfoSamples(threads, requestParams);
Expand All @@ -201,6 +180,15 @@ void testShouldThrowExceptionIfTaskIsNotRunningBeforeSampling()
.isInstanceOf(IllegalStateException.class);
}

private static Map<Long, ExecutionAttemptID> collectExecutionAttempts(
Set<? extends SampleableTask> tasks) {
return tasks.stream()
.collect(
Collectors.toMap(
task -> task.getExecutingThread().getId(),
SampleableTask::getExecutionId));
}

private static class NotRunningTask implements SampleableTask {

private final ExecutionAttemptID executionId = ExecutionAttemptID.randomId();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor.threadinfo;

import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.ThreadInfoSample;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.util.TestLogger;

import org.junit.jupiter.api.Test;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

/** Tests for {@link VertexFlameGraphFactory}. */
public class VertexFlameGraphFactoryTest extends TestLogger {
/** Tests that lambda class names are cleaned up inside the stack traces. */
@Test
public void testLambdaClassNamesCleanUp() {
Map<ExecutionAttemptID, Collection<ThreadInfoSample>> samplesBySubtask = generateSamples();

VertexThreadInfoStats sample = new VertexThreadInfoStats(0, 0, 0, samplesBySubtask);

VertexFlameGraph graph = VertexFlameGraphFactory.createFullFlameGraphFrom(sample);
int encounteredLambdas = verifyRecursively(graph.getRoot());
if (encounteredLambdas == 0) {
fail("No lambdas encountered in the test, cleanup functionality was not tested");
}
}

private int verifyRecursively(VertexFlameGraph.Node node) {
String location = node.getStackTraceLocation();
int lambdas = 0;
if (location.contains("$Lambda$")) {
lambdas++;
// com.example.ClassName.method:123
// -> com.example.ClassName.method
// -> com.example.ClassName
String locationWithoutLineNumber = location.substring(0, location.lastIndexOf(":"));
String className =
locationWithoutLineNumber.substring(
0, locationWithoutLineNumber.lastIndexOf("."));
assertThat(className).endsWith("$Lambda$0/0");
}
return lambdas + node.getChildren().stream().mapToInt(this::verifyRecursively).sum();
}

private Map<ExecutionAttemptID, Collection<ThreadInfoSample>> generateSamples() {
ThreadInfoSample sample1 = ThreadInfoSample.from(getStackTraceWithLambda()).get();

List<ThreadInfoSample> samples = new ArrayList<>();
samples.add(sample1);

ExecutionAttemptID executionAttemptID =
new ExecutionAttemptID(
new ExecutionGraphID(), new ExecutionVertexID(new JobVertexID(), 0), 0);

Map<ExecutionAttemptID, Collection<ThreadInfoSample>> result = new HashMap<>();
result.put(executionAttemptID, samples);

return result;
}

private ThreadInfo getStackTraceWithLambda() {
Supplier<ThreadInfo> r1 =
() ->
ManagementFactory.getThreadMXBean()
.getThreadInfo(Thread.currentThread().getId(), Integer.MAX_VALUE);
Supplier<ThreadInfo> r2 = () -> r1.get();
return r2.get();
}
}

0 comments on commit 9805724

Please sign in to comment.