diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactory.java index 35f06663274eb..266f379c6995a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactory.java @@ -28,6 +28,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** Factory class for creating Flame Graph representations. */ public class VertexFlameGraphFactory { @@ -79,7 +81,7 @@ private static VertexFlameGraph createFlameGraphFromSample( sample.getSamplesBySubtask().values()) { for (ThreadInfoSample threadInfo : threadInfoSubSamples) { if (threadStates.contains(threadInfo.getThreadState())) { - StackTraceElement[] traces = threadInfo.getStackTrace(); + StackTraceElement[] traces = cleanLambdaNames(threadInfo.getStackTrace()); root.incrementHitCount(); NodeBuilder parent = root; for (int i = traces.length - 1; i >= 0; i--) { @@ -97,6 +99,46 @@ private static VertexFlameGraph createFlameGraphFromSample( return new VertexFlameGraph(sample.getEndTime(), root.toNode()); } + // Matches class names like + // org.apache.flink.streaming.runtime.io.RecordProcessorUtils$$Lambda$773/0x00000001007f84a0 + // org.apache.flink.runtime.taskexecutor.IdleTestTask$$Lambda$351/605293351 + private static final Pattern LAMBDA_CLASS_NAME = + Pattern.compile("(\\$Lambda\\$)\\d+/(0x)?\\p{XDigit}+$"); + + // Drops stack trace elements with class names matching the above regular expression. + // These elements are useless, because they don't provide any additional information + // except the fact that a lambda is used (they don't have source information, for example), + // and also the lambda "class names" can be different across different JVMs, which pollutes + // flame graphs. + // Note that Thread.getStackTrace() performs a similar logic - the stack trace returned + // by this method will not contain lambda references with it. But ThreadMXBean does collect + // lambdas, so we have to clean them up explicitly. + private static StackTraceElement[] cleanLambdaNames(StackTraceElement[] stackTrace) { + StackTraceElement[] result = new StackTraceElement[stackTrace.length]; + for (int i = 0; i < stackTrace.length; i++) { + StackTraceElement element = stackTrace[i]; + Matcher matcher = LAMBDA_CLASS_NAME.matcher(element.getClassName()); + if (matcher.find()) { + // org.apache.flink.streaming.runtime.io.RecordProcessorUtils$$Lambda$773/0x00000001007f84a0 + // --> + // org.apache.flink.streaming.runtime.io.RecordProcessorUtils$$Lambda$0/0x0 + // This ensures that the name is stable across JVMs, but at the same time + // keeps the stack frame in the call since it has the method name, which + // may be useful for analysis. + String newClassName = matcher.replaceFirst("$10/$20"); + result[i] = + new StackTraceElement( + newClassName, + element.getMethodName(), + element.getFileName(), + element.getLineNumber()); + } else { + result[i] = element; + } + } + return result; + } + private static class NodeBuilder { private final Map children = new HashMap<>(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/ThreadInfoSampleServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/ThreadInfoSampleServiceTest.java index 4e14bafd6d6bf..ea41d22f9f24a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/ThreadInfoSampleServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/ThreadInfoSampleServiceTest.java @@ -78,12 +78,7 @@ void testSampleTaskThreadInfo() throws Exception { tasks.add(new IdleTestTask()); Thread.sleep(2000); - Map threads = - tasks.stream() - .collect( - Collectors.toMap( - task -> task.getExecutingThread().getId(), - IdleTestTask::getExecutionId)); + Map threads = collectExecutionAttempts(tasks); final Map> threadInfoSamples = threadInfoSampleService .requestThreadInfoSamples(threads, requestParams) @@ -109,12 +104,8 @@ void testTruncateStackTraceIfLimitIsSpecified() throws Exception { executeWithTerminationGuarantee( () -> { tasks.add(new IdleTestTask()); - Map threads = - tasks.stream() - .collect( - Collectors.toMap( - task -> task.getExecutingThread().getId(), - IdleTestTask::getExecutionId)); + Map threads = collectExecutionAttempts(tasks); + final Map> threadInfoSamples1 = threadInfoSampleService .requestThreadInfoSamples(threads, requestParams) @@ -158,14 +149,7 @@ void testThrowExceptionIfNumSamplesIsNegative() { tasks.add(new IdleTestTask()); Map threads = - tasks.stream() - .collect( - Collectors.toMap( - task -> - task.getExecutingThread() - .getId(), - IdleTestTask - ::getExecutionId)); + collectExecutionAttempts(tasks); threadInfoSampleService.requestThreadInfoSamples( threads, new ThreadInfoSamplesRequest( @@ -186,12 +170,7 @@ void testShouldThrowExceptionIfTaskIsNotRunningBeforeSampling() Set tasks = new HashSet<>(); tasks.add(new NotRunningTask()); - Map threads = - tasks.stream() - .collect( - Collectors.toMap( - task -> task.getExecutingThread().getId(), - SampleableTask::getExecutionId)); + Map threads = collectExecutionAttempts(tasks); final CompletableFuture>> sampleFuture = threadInfoSampleService.requestThreadInfoSamples(threads, requestParams); @@ -201,6 +180,15 @@ void testShouldThrowExceptionIfTaskIsNotRunningBeforeSampling() .isInstanceOf(IllegalStateException.class); } + private static Map collectExecutionAttempts( + Set tasks) { + return tasks.stream() + .collect( + Collectors.toMap( + task -> task.getExecutingThread().getId(), + SampleableTask::getExecutionId)); + } + private static class NotRunningTask implements SampleableTask { private final ExecutionAttemptID executionId = ExecutionAttemptID.randomId(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactoryTest.java new file mode 100644 index 0000000000000..583f8ae83f0a9 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactoryTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.threadinfo; + +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionGraphID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.messages.ThreadInfoSample; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.TestLogger; + +import org.junit.jupiter.api.Test; + +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +/** Tests for {@link VertexFlameGraphFactory}. */ +public class VertexFlameGraphFactoryTest extends TestLogger { + /** Tests that lambda class names are cleaned up inside the stack traces. */ + @Test + public void testLambdaClassNamesCleanUp() { + Map> samplesBySubtask = generateSamples(); + + VertexThreadInfoStats sample = new VertexThreadInfoStats(0, 0, 0, samplesBySubtask); + + VertexFlameGraph graph = VertexFlameGraphFactory.createFullFlameGraphFrom(sample); + int encounteredLambdas = verifyRecursively(graph.getRoot()); + if (encounteredLambdas == 0) { + fail("No lambdas encountered in the test, cleanup functionality was not tested"); + } + } + + private int verifyRecursively(VertexFlameGraph.Node node) { + String location = node.getStackTraceLocation(); + int lambdas = 0; + if (location.contains("$Lambda$")) { + lambdas++; + // com.example.ClassName.method:123 + // -> com.example.ClassName.method + // -> com.example.ClassName + String locationWithoutLineNumber = location.substring(0, location.lastIndexOf(":")); + String className = + locationWithoutLineNumber.substring( + 0, locationWithoutLineNumber.lastIndexOf(".")); + assertThat(className).endsWith("$Lambda$0/0"); + } + return lambdas + node.getChildren().stream().mapToInt(this::verifyRecursively).sum(); + } + + private Map> generateSamples() { + ThreadInfoSample sample1 = ThreadInfoSample.from(getStackTraceWithLambda()).get(); + + List samples = new ArrayList<>(); + samples.add(sample1); + + ExecutionAttemptID executionAttemptID = + new ExecutionAttemptID( + new ExecutionGraphID(), new ExecutionVertexID(new JobVertexID(), 0), 0); + + Map> result = new HashMap<>(); + result.put(executionAttemptID, samples); + + return result; + } + + private ThreadInfo getStackTraceWithLambda() { + Supplier r1 = + () -> + ManagementFactory.getThreadMXBean() + .getThreadInfo(Thread.currentThread().getId(), Integer.MAX_VALUE); + Supplier r2 = () -> r1.get(); + return r2.get(); + } +}