From b0667ce69536ccb30afffd92d11a8cc497529d06 Mon Sep 17 00:00:00 2001 From: Bryan Massoth Date: Mon, 3 Feb 2025 15:12:09 -0800 Subject: [PATCH] Open source TPU-specific input pipeline analysis. PiperOrigin-RevId: 722815891 --- tensorflow/core/profiler/convert/BUILD | 27 +- .../op_stats_to_input_pipeline_analysis.cc | 828 +++++++++++++++++- .../op_stats_to_input_pipeline_analysis.h | 34 + ...p_stats_to_input_pipeline_analysis_test.cc | 204 +++++ .../profiler/protobuf/input_pipeline.proto | 3 +- 5 files changed, 1075 insertions(+), 21 deletions(-) create mode 100644 tensorflow/core/profiler/convert/op_stats_to_input_pipeline_analysis_test.cc diff --git a/tensorflow/core/profiler/convert/BUILD b/tensorflow/core/profiler/convert/BUILD index 01c98502eb0654..564d4a4195947b 100644 --- a/tensorflow/core/profiler/convert/BUILD +++ b/tensorflow/core/profiler/convert/BUILD @@ -275,7 +275,9 @@ cc_library( copts = tf_profiler_copts(), deps = [ ":op_metrics_to_record", + ":profile_time_breakdown", ":step_events_to_steps_db", + ":tpu_input_pipeline_analysis_constants", "//tensorflow/core:lib", "//tensorflow/core:lib_internal", "//tensorflow/core/platform:logging", @@ -284,20 +286,43 @@ cc_library( "//tensorflow/core/profiler/protobuf:op_metrics_proto_cc", "//tensorflow/core/profiler/protobuf:op_stats_proto_cc", "//tensorflow/core/profiler/protobuf:steps_db_proto_cc", + "//tensorflow/core/profiler/protobuf:tpu_input_pipeline_proto_cc", "//tensorflow/core/profiler/utils:diagnostics", "//tensorflow/core/profiler/utils:event_span", - "//tensorflow/core/profiler/utils:hardware_type_utils", "//tensorflow/core/profiler/utils:html_utils", "//tensorflow/core/profiler/utils:math_utils", "//tensorflow/core/profiler/utils:op_metrics_db_utils", + "//tensorflow/core/profiler/utils:tpu_step_breakdown_utils", + "//tensorflow/core/profiler/utils:tpu_step_details_utils", "@com_google_absl//absl/container:flat_hash_map", + "@com_google_absl//absl/log", + "@com_google_absl//absl/log:check", 
"@com_google_absl//absl/strings", + "@com_google_absl//absl/strings:str_format", + "@local_xla//xla/hlo/ir:hlo", + "@local_xla//xla/tsl/profiler/convert:xla_op_utils", "@local_xla//xla/tsl/profiler/utils:format_utils", + "@local_xla//xla/tsl/profiler/utils:math_utils", "@local_xla//xla/tsl/profiler/utils:tf_op_utils", "@local_xla//xla/tsl/util:stats_calculator_portable", ], ) +tf_cc_test( + name = "op_stats_to_input_pipeline_analysis_test", + srcs = ["op_stats_to_input_pipeline_analysis_test.cc"], + deps = [ + ":op_stats_to_input_pipeline_analysis", + "//tensorflow/core:test", + "//tensorflow/core:test_main", + "//tensorflow/core/profiler/protobuf:steps_db_proto_cc", + "//tensorflow/core/profiler/utils:event_span", + "//tensorflow/core/profiler/utils:op_metrics_db_utils", + "@local_xla//xla/hlo/ir:hlo", + "@local_xla//xla/tsl/profiler/utils:timespan", + ], +) + cc_library( name = "op_stats_to_tf_stats", srcs = ["op_stats_to_tf_stats.cc"], diff --git a/tensorflow/core/profiler/convert/op_stats_to_input_pipeline_analysis.cc b/tensorflow/core/profiler/convert/op_stats_to_input_pipeline_analysis.cc index bd21fae928c3de..119ccc6f1518b7 100644 --- a/tensorflow/core/profiler/convert/op_stats_to_input_pipeline_analysis.cc +++ b/tensorflow/core/profiler/convert/op_stats_to_input_pipeline_analysis.cc @@ -18,6 +18,7 @@ limitations under the License. #include #include +#include #include #include #include @@ -25,17 +26,25 @@ limitations under the License. 
#include "google/protobuf/any.pb.h" #include "absl/container/flat_hash_map.h" +#include "absl/log/check.h" +#include "absl/log/log.h" #include "absl/strings/match.h" #include "absl/strings/str_cat.h" +#include "absl/strings/str_format.h" #include "absl/strings/string_view.h" +#include "xla/hlo/ir/hlo_opcode.h" +#include "xla/tsl/profiler/convert/xla_op_utils.h" #include "xla/tsl/profiler/utils/format_utils.h" +#include "xla/tsl/profiler/utils/math_utils.h" #include "xla/tsl/profiler/utils/tf_op_utils.h" #include "xla/tsl/util/stats_calculator.h" #include "tensorflow/core/lib/gtl/map_util.h" #include "tensorflow/core/platform/logging.h" #include "tensorflow/core/platform/types.h" #include "tensorflow/core/profiler/convert/op_metrics_to_record.h" +#include "tensorflow/core/profiler/convert/profile_time_breakdown.h" #include "tensorflow/core/profiler/convert/step_events_to_steps_db.h" +#include "tensorflow/core/profiler/convert/tpu_input_pipeline_analysis_constants.h" #include "tensorflow/core/profiler/protobuf/hardware_types.pb.h" #include "tensorflow/core/profiler/protobuf/input_pipeline.pb.h" #include "tensorflow/core/profiler/protobuf/op_metrics.pb.h" @@ -43,10 +52,11 @@ limitations under the License. 
#include "tensorflow/core/profiler/protobuf/steps_db.pb.h" #include "tensorflow/core/profiler/utils/diagnostics.h" #include "tensorflow/core/profiler/utils/event_span.h" -#include "tensorflow/core/profiler/utils/hardware_type_utils.h" #include "tensorflow/core/profiler/utils/html_utils.h" #include "tensorflow/core/profiler/utils/math_utils.h" #include "tensorflow/core/profiler/utils/op_metrics_db_utils.h" +#include "tensorflow/core/profiler/utils/tpu_step_breakdown_utils.h" +#include "tensorflow/core/profiler/utils/tpu_step_details_utils.h" namespace tensorflow { namespace profiler { @@ -55,6 +65,22 @@ namespace { using tsl::profiler::OneDigit; +// If the percentage of step time that spends on SparseCoreV0 is more than +// kModeratelySparseCoreV0BoundThresholdInPercent, it is considered highly +// SparseCoreV0 bound. +constexpr double kModeratelySparseCoreV0BoundThresholdInPercent = 10; +// If the percentage of step time that spends on all-reduce is more than +// kAllReduceBoundThresholdInPercent, it is considered all-reduce bound. +constexpr double kAllReduceBoundThresholdInPercent = 6; +// If the percentage of step time that is idle due to host overhead (but not +// input-related) is >= kTcIdleThresholdInPercent, it will be highlighted in the +// recommendation section of the Overview Page. +constexpr double kTcIdleThresholdInPercent = 3; +// Public doc on how to run multiple steps in a tf-function. 
+constexpr absl::string_view kMultipleStepsInTffunctionDoc = + "https://www.tensorflow.org/guide/" + "tpu#improving_performance_by_multiple_steps_within_tffunction"; + const double kNumPsPerMs = 1000000000.0; // If the percentage of step time that is due to infeed is less than @@ -461,6 +487,560 @@ std::string DatasetIntroDoc() { return "https://www.tensorflow.org/guide/data"; } +struct WaitForScV0Breakdown { + uint64_t DurationPs() const { + return scv0_infeed_duration_ps + scv0_compute_duration_ps; + } + + uint64_t scv0_infeed_duration_ps = 0; + uint64_t scv0_compute_duration_ps = 0; +}; + +struct TcInfeed { + std::optional core_id; + uint64_t duration_ps = 0; +}; + +void ConvertGenericStepBreakdownToTpuStepBreakdown( + const tensorflow::profiler::GenericStepBreakdown& generic_step_breakdown, + uint64_t step_time_ps, TpuStepBreakdown& tpu_step_breakdown) { + auto& category_ps = generic_step_breakdown.category_ps(); + tensorflow::profiler::ProfileTimeBreakdown time_breakdown; + for (const auto& [category, time_ps] : category_ps) { + // Don't add idle time to time_breakdown as the idle time is inferred. 
+    if (category == "IDLE") continue;
+    time_breakdown.IncrementCategoryTimePs(category, time_ps);
+  }
+  time_breakdown.SetProfileTimePs(step_time_ps);
+  time_breakdown.BreakdownSparseCoreV0Infeed();
+
+  tpu_step_breakdown.set_infeed_duration_ps(time_breakdown.InfeedTimePs());
+  tpu_step_breakdown.set_host_outfeed_ps(time_breakdown.OutfeedTimePs());
+  tpu_step_breakdown.set_wait_for_scv0_duration_ps(
+      time_breakdown.SparseCoreV0InfeedWaitTimePs());
+  tpu_step_breakdown.set_scv0_infeed_transform_ps(
+      time_breakdown.SparseCoreV0InfeedTransformTimePs());
+  tpu_step_breakdown.set_scv0_outfeed_ps(
+      time_breakdown.SparseCoreV0OutfeedTimePs());
+  tpu_step_breakdown.set_crs_duration_ps(
+      time_breakdown.AllReduceOrAllToAllTimePs());
+  tpu_step_breakdown.set_send_duration_ps(time_breakdown.SendTimePs());
+  tpu_step_breakdown.set_recv_duration_ps(time_breakdown.RecvTimePs());
+  tpu_step_breakdown.set_host_send_duration_ps(time_breakdown.HostSendTimePs());
+  tpu_step_breakdown.set_host_recv_duration_ps(time_breakdown.HostRecvTimePs());
+  tpu_step_breakdown.set_wait_for_megacore_fusion_peer_duration_ps(
+      time_breakdown.MegacoreFusionTimePs());
+  tpu_step_breakdown.set_high_flops_compute_ps(
+      time_breakdown.HighFlopsComputeTimePs());
+  tpu_step_breakdown.set_tc_idle_ps(time_breakdown.IdleTimePs());
+  tpu_step_breakdown.set_tc_busy_ps(time_breakdown.TensorCoreBusyTimePs());
+}
+// Computes the fields in PerStepData by considering the different StepInfos
+// of the same step across cores.
+//
+// For every entry of coreid_stepinfo_map.step_info_per_core() the packed
+// step_breakdown is tried, in order, as a TpuStepBreakdown, a
+// GenericStepBreakdown and a SparseCoreStepBreakdown:
+//   * TpuStepBreakdown is used as-is.
+//   * GenericStepBreakdown on a core id >= kSparseCoreIndexStart only feeds
+//     the sparse-core step-time and idle statistics; on any other core id it
+//     is converted with ConvertGenericStepBreakdownToTpuStepBreakdown().
+//   * SparseCoreStepBreakdown only feeds the sparse-core statistics.
+// If none of the three unpacks, an error is logged, DCHECK(false) fires and
+// the partially filled result is returned.
+//
+// core_details_map is only consulted to name the core with the largest
+// infeed duration as "<hostname>:<device_ordinal>".
+PerTpuStepDetails ComputeTpuPerStepDataAcrossCores(
+    const PerCoreStepInfo& coreid_stepinfo_map,
+    const protobuf::Map&
+        core_details_map) {
+  PerTpuStepDetails per_step_data;
+
+  PerCoreAllReduceBreakdown all_reduce_breakdown =
+      ComputePerStepAllReduceBreakdownAcrossCores(coreid_stepinfo_map);
+
+  tsl::Stat infeed_percent_stats;
+  tsl::Stat step_stats_in_ps;
+  tsl::Stat optimal_step_time_ps;
+  // Take the average TC outfeed time in result.
+  tsl::Stat tc_outfeed_time_in_ps;
+  tsl::Stat sc_optimal_step_time_ps;
+  tsl::Stat sc_step_stats_in_ps;
+  tsl::Stat sc_outfeed_time_in_ps;
+  tsl::Stat sc_infeed_time_in_ps;
+  tsl::Stat sc_idle_time_in_ps;
+
+  tsl::Stat host_send_recv_time_ps;
+
+  // For the core with the max wait-for-scv0 duration, breakdown to compute and
+  // infeed time.
+  WaitForScV0Breakdown max_wait_for_scv0;
+
+  TcInfeed max_infeed;
+
+  // For the core with the max all reduce duration, breakdown to compute and
+  // synchronization time.
+  AllReduceBreakdown max_all_reduce;
+
+  // A negative step number means "not set yet".
+  per_step_data.set_step_number(-1);
+  // Sets the step number of the current step from the first core that reports
+  // one, and afterwards checks that it is the same across cores. In case of
+  // multi-host tracing, we may have some inconsistent steps as tracing is not
+  // exactly guaranteed to be synchronized across all hosts, so a mismatch is
+  // only logged.
+  auto observe_step_number =
+      [&](const tensorflow::profiler::StepInfoResult& step_info) {
+        if (per_step_data.step_number() < 0) {
+          per_step_data.set_step_number(step_info.step_num());
+        } else if (per_step_data.step_number() != step_info.step_num()) {
+          VLOG(1) << "Inconsistent step numbers across cores ("
+                  << per_step_data.step_number() << " vs. "
+                  << step_info.step_num() << ").";
+        }
+      };
+  // Folds one SparseCoreStepBreakdown into the sparse-core statistics.
+  auto process_step_for_sc =
+      [&](const tensorflow::profiler::StepInfoResult& step_info,
+          const SparseCoreStepBreakdown& sc_step) {
+        observe_step_number(step_info);
+        sc_step_stats_in_ps.UpdateStat(step_info.duration_ps());
+        sc_outfeed_time_in_ps.UpdateStat(sc_step.sc_outfeed_ps());
+        sc_infeed_time_in_ps.UpdateStat(sc_step.sc_infeed_ps());
+        sc_optimal_step_time_ps.UpdateStat(step_info.duration_ps() -
+                                           sc_step.sc_infeed_ps() -
+                                           sc_step.sc_outfeed_ps());
+        sc_idle_time_in_ps.UpdateStat(sc_step.sc_idle_ps());
+      };
+  for (const auto& [core_id, step_info] :
+       coreid_stepinfo_map.step_info_per_core()) {
+    // iterates over each core.
+    TpuStepBreakdown tpu;
+    if (!step_info.step_breakdown().UnpackTo(&tpu)) {
+      VLOG(1) << "Unable to unpack step_breakdown from tpu, try unpacking from "
+                 "generic";
+      tensorflow::profiler::GenericStepBreakdown generic_step_breakdown;
+      if (!step_info.step_breakdown().UnpackTo(&generic_step_breakdown)) {
+        SparseCoreStepBreakdown sc_step;
+        if (step_info.step_breakdown().UnpackTo(&sc_step)) {
+          process_step_for_sc(step_info, sc_step);
+          continue;
+        } else {
+          LOG(ERROR) << "Unable to unpack step_breakdown from "
+                        "GenericStepBreakdown or SparseCoreStepBreakdown";
+          // TODO(b/302086111): Switch back to DFATAL once absl is updated.
+          DCHECK(false);
+          return per_step_data;
+        }
+      }
+      if (core_id >= kSparseCoreIndexStart) {
+        // Sparse core step breakdown from xspace.
+        uint64_t total_time_ps = step_info.duration_ps();
+        // The "IDLE" category is not guaranteed to be present; dereferencing
+        // the result of find() unconditionally would read end(). Treat a
+        // missing entry as zero idle time.
+        const auto& category_ps = generic_step_breakdown.category_ps();
+        const auto idle_it = category_ps.find("IDLE");
+        uint64_t idle_time_ps =
+            (idle_it != category_ps.end()) ? idle_it->second : 0;
+        sc_step_stats_in_ps.UpdateStat(total_time_ps);
+        sc_idle_time_in_ps.UpdateStat(idle_time_ps);
+        continue;
+      } else {
+        // Tensor core step breakdown from xspace.
+        ConvertGenericStepBreakdownToTpuStepBreakdown(
+            generic_step_breakdown, step_info.duration_ps(), tpu);
+      }
+    }
+    step_stats_in_ps.UpdateStat(step_info.duration_ps());
+    if (tpu.wait_for_scv0_duration_ps() > max_wait_for_scv0.DurationPs()) {
+      max_wait_for_scv0.scv0_infeed_duration_ps = ScV0InfeedDurationPs(tpu);
+      max_wait_for_scv0.scv0_compute_duration_ps = ScV0ComputeDurationPs(tpu);
+    }
+
+    tc_outfeed_time_in_ps.UpdateStat(tpu.host_outfeed_ps());
+
+    const AllReduceBreakdown& breakdown = all_reduce_breakdown[core_id];
+    if (breakdown.DurationPs() > max_all_reduce.DurationPs()) {
+      max_all_reduce = breakdown;
+    }
+
+    // SafeDivide keeps a zero-duration step from injecting inf/NaN into the
+    // infeed-percent statistics.
+    infeed_percent_stats.UpdateStat(tsl::profiler::SafeDivide(
+        100.0 * TcPlusScV0InfeedDurationPs(tpu), step_info.duration_ps()));
+    // The optimal step time is the actual step time minus the time tensor
+    // core spends waiting for host or sparsecorev0 (but not other tensor
+    // cores).
+    optimal_step_time_ps.UpdateStat(step_info.duration_ps() -
+                                    WaitForHostOrScV0DurationPs(tpu));
+    host_send_recv_time_ps.UpdateStat(HostSendRecvDurationPs(tpu));
+
+    observe_step_number(step_info);
+    if (tpu.infeed_duration_ps() > max_infeed.duration_ps) {
+      max_infeed.core_id = core_id;
+      max_infeed.duration_ps = tpu.infeed_duration_ps();
+    }
+  }
+
+  per_step_data.set_tc_outfeed_time_ms(
+      tsl::profiler::PicoToMilli(tc_outfeed_time_in_ps.avg()));
+  // The TC compute time is the minimum of the optimal step time across cores.
+  per_step_data.set_tc_compute_time_ms(
+      tsl::profiler::PicoToMilli(optimal_step_time_ps.min()));
+  per_step_data.set_host_transfer_ms(
+      tsl::profiler::PicoToMilli(host_send_recv_time_ps.max()));
+  // TODO(b/153730997): Use the maximum step time.
+  // The infeed time is the step time across cores minus all other times.
+  // Previously, we used the maximum step time but changed to use the minimum
+  // step time to work around b/153730997.
+  // Uses the max TC infeed duration across cores as the step's TC infeed
+  // duration.
+  per_step_data.set_tc_infeed_time_ms(
+      tsl::profiler::PicoToMilli(max_infeed.duration_ps));
+  if (max_infeed.core_id.has_value()) {
+    per_step_data.set_coreid_max_infeed_time(max_infeed.core_id.value());
+    if (core_details_map.contains(max_infeed.core_id.value())) {
+      const CoreDetails& core_details =
+          core_details_map.at(max_infeed.core_id.value());
+      per_step_data.set_max_infeed_time_core_name(absl::StrCat(
+          core_details.hostname(), ":", core_details.device_ordinal()));
+    }
+  }
+
+  per_step_data.set_scv0_compute_time_ms(
+      tsl::profiler::PicoToMilli(max_wait_for_scv0.scv0_compute_duration_ps));
+  per_step_data.set_scv0_infeed_time_ms(
+      tsl::profiler::PicoToMilli(max_wait_for_scv0.scv0_infeed_duration_ps));
+
+  // The TC idle time is the time TC spends waiting for the host but not
+  // waiting for input.
+  per_step_data.set_tc_idle_time_ms(
+      tsl::profiler::PicoToMilli(step_stats_in_ps.min()) -
+      NonIdleTimeMs(per_step_data));
+  if (per_step_data.tc_idle_time_ms() < 0) {
+    per_step_data.set_tc_idle_time_ms(0);
+  }
+
+  per_step_data.set_all_reduce_compute_time_ms(
+      tsl::profiler::PicoToMilli(max_all_reduce.compute_duration_ps));
+  per_step_data.set_all_reduce_sync_time_ms(
+      tsl::profiler::PicoToMilli(max_all_reduce.sync_duration_ps));
+
+  per_step_data.set_infeed_percent_average(infeed_percent_stats.avg());
+  per_step_data.set_infeed_percent_minimum(infeed_percent_stats.min());
+  per_step_data.set_infeed_percent_maximum(infeed_percent_stats.max());
+
+  per_step_data.set_sc_infeed_time_ms(
+      tsl::profiler::PicoToMilli(sc_infeed_time_in_ps.avg()));
+  per_step_data.set_sc_outfeed_time_ms(
+      tsl::profiler::PicoToMilli(sc_outfeed_time_in_ps.avg()));
+  per_step_data.set_sc_compute_time_ms(
+      tsl::profiler::PicoToMilli(sc_optimal_step_time_ps.min()));
+  per_step_data.set_sc_idle_time_ms(
+      tsl::profiler::PicoToMilli(sc_idle_time_in_ps.avg()));
+  per_step_data.set_sc_step_time_ms(
+      tsl::profiler::PicoToMilli(sc_step_stats_in_ps.avg()));
+  if (per_step_data.sc_idle_time_ms() < 0) {
+    per_step_data.set_sc_idle_time_ms(0);
+  }
+  return per_step_data;
+}
+
+TpuStepTimeBreakdown ComputeTpuStepTimeBreakdownInMs(
+    const InputPipelineAnalysisResult& analysis, bool has_sparse_core) {
+  tsl::Stat tc_compute_ms;
+  tsl::Stat tc_infeed_ms;
+  tsl::Stat tc_outfeed_ms;
+  tsl::Stat tc_idle_ms;
+  tsl::Stat scv0_compute_ms;
+  tsl::Stat scv0_infeed_ms;
+  tsl::Stat host_transfer_ms;
+  tsl::Stat sc_compute_ms;
+  tsl::Stat sc_infeed_ms;
+  tsl::Stat sc_outfeed_ms;
+  tsl::Stat sc_idle_ms;
+  tsl::Stat sc_step_time_ms;
+  TpuStepTimeBreakdown result;
+
+  for (const google::protobuf::Any& step_details : analysis.step_details()) {
+    PerTpuStepDetails details;
+    if (!step_details.UnpackTo(&details)) {
+      LOG(ERROR) << "Unable to unpack step_details. Expected: tpu";
+      // TODO(b/302086111): Switch back to DFATAL once absl is updated.
+      DCHECK(false);
+      return result;
+    }
+    tc_compute_ms.UpdateStat(details.tc_compute_time_ms());
+    tc_idle_ms.UpdateStat(details.tc_idle_time_ms());
+    tc_infeed_ms.UpdateStat(details.tc_infeed_time_ms());
+    tc_outfeed_ms.UpdateStat(details.tc_outfeed_time_ms());
+    scv0_compute_ms.UpdateStat(details.scv0_compute_time_ms());
+    scv0_infeed_ms.UpdateStat(details.scv0_infeed_time_ms());
+    host_transfer_ms.UpdateStat(details.host_transfer_ms());
+    sc_compute_ms.UpdateStat(details.sc_compute_time_ms());
+    sc_idle_ms.UpdateStat(details.sc_idle_time_ms());
+    sc_infeed_ms.UpdateStat(details.sc_infeed_time_ms());
+    sc_outfeed_ms.UpdateStat(details.sc_outfeed_time_ms());
+    sc_step_time_ms.UpdateStat(details.sc_step_time_ms());
+  }
+  *result.mutable_tc_compute_ms_summary() =
+      GetStepSummaryForSampleStats(tc_compute_ms);
+  *result.mutable_scv0_compute_ms_summary() =
+      GetStepSummaryForSampleStats(scv0_compute_ms);
+  *result.mutable_tc_infeed_ms_summary() =
+      GetStepSummaryForSampleStats(tc_infeed_ms);
+  *result.mutable_tc_outfeed_ms_summary() =
+      GetStepSummaryForSampleStats(tc_outfeed_ms);
+ 
*result.mutable_scv0_infeed_ms_summary() = + GetStepSummaryForSampleStats(scv0_infeed_ms); + *result.mutable_tc_idle_ms_summary() = + GetStepSummaryForSampleStats(tc_idle_ms); + *result.mutable_host_transfer_ms_summary() = + GetStepSummaryForSampleStats(host_transfer_ms); + if (has_sparse_core) { + auto* sparse_core_step_summary = result.mutable_sparse_core_step_summary(); + *sparse_core_step_summary->mutable_sc_compute_ms_summary() = + GetStepSummaryForSampleStats(sc_compute_ms); + *sparse_core_step_summary->mutable_sc_infeed_ms_summary() = + GetStepSummaryForSampleStats(sc_infeed_ms); + *sparse_core_step_summary->mutable_sc_outfeed_ms_summary() = + GetStepSummaryForSampleStats(sc_outfeed_ms); + *sparse_core_step_summary->mutable_sc_idle_ms_summary() = + GetStepSummaryForSampleStats(sc_idle_ms); + *sparse_core_step_summary->mutable_sc_step_time_ms_summary() = + GetStepSummaryForSampleStats(sc_step_time_ms); + } + return result; +} + +// Given the step sequence on each core, computes the result proto of the +// input-pipeline analysis tool (the InputPipelineAnalysisResult defined in +// input_pipeline.proto). +// Note on grouped_by_step: There is one element for each step executed (on +// multiple cores). Each element is a map from the core_id to the information +// of the step that runs on that core. Elements are in the same order that the +// steps are executed over time. +InputPipelineAnalysisResult ComputeTpuInputPipelineAnalysisResult( + const protobuf::RepeatedPtrField& grouped_by_step, + const protobuf::Map& + core_details_map) { + InputPipelineAnalysisResult result; + bool has_sparse_core = false; + + // Computes the summary of step time in ms. + *result.mutable_step_time_summary() = + ComputeStepTimeSummaryInMs(grouped_by_step); + + // Summary of the statistics of infeed time as percentage of the step + // time. + tsl::Stat infeed_summary_stats_in_percent; + for (const auto& coreid_stepinfo_map : grouped_by_step) { + // Compute each TPU step stats. 
+ const PerTpuStepDetails& per_step_data = + ComputeTpuPerStepDataAcrossCores(coreid_stepinfo_map, core_details_map); + result.add_step_details()->PackFrom(per_step_data); + + // The infeed summary is based on the maximum infeed time across cores at + // each step. + infeed_summary_stats_in_percent.UpdateStat( + per_step_data.infeed_percent_maximum()); + // Since core_details_map only contains tensor core data, we can use it to + // see if more cores have steps (aka sparse cores are present in the chip). + has_sparse_core |= (core_details_map.size() < + coreid_stepinfo_map.step_info_per_core().size()); + } + + // Computes the summary of infeed time as percentage of step time. + *result.mutable_input_percent_summary() = + GetStepSummaryForSampleStats(infeed_summary_stats_in_percent); + + // Computes the breakdown of step time + TpuStepTimeBreakdown tpu_step_time_breakdown = + ComputeTpuStepTimeBreakdownInMs(result, has_sparse_core); + result.mutable_step_time_breakdown()->PackFrom(tpu_step_time_breakdown); + result.set_tag(true); + + return result; +} + +// Returns true if device_op_metrics_db contains an infeed op. +bool HasTpuInfeedOp(const OpMetricsDb& device_op_metrics_db) { + for (const OpMetrics& metrics : device_op_metrics_db.metrics_db()) { + if (tsl::profiler::IsHostOrSparseCoreV0Infeed(metrics.category())) { + return true; + } + } + return false; +} + +// Returns the time spent waiting for input for generic hardware. +uint64_t TotalInputPs(const StepDetails& step_details) { + uint64_t total_input_ps = 0; + for (const auto& event : step_details.Events()) { + if (event.type == HOST_WAIT_INPUT || event.type == HOST_TO_DEVICE) { + // Includes both the time where the host was waiting input and the time + // where the host was sending data to the device. 
+ total_input_ps += event.span.duration_ps(); + } + } + return total_input_ps; +} + +void TensorCoreIdleAnalysis(bool all_cores_profiled, double tc_idle_percent, + std::string* input_classification, + std::string* input_statement, + std::string* tc_idle_classification, + std::string* tc_idle_statement) { + // In MayFixTpuStepAnalysis(), we have already separated the idle time from + // the input time. So, we don't need to subtract the input time from the + // idle time here. + if (tc_idle_percent < kTcIdleThresholdInPercent) { + *tc_idle_classification = "no"; + *tc_idle_statement = ""; + return; + } + std::string idle_percent_str = absl::StrFormat("%.1lf", tc_idle_percent); + if (all_cores_profiled) { + // Significant idle time with all cores profiled. + *tc_idle_classification = "yes"; + *tc_idle_statement = + absl::StrCat(idle_percent_str, + " % of the total step time sampled is due to host " + "overhead that is not input-related. For TF 2.x, you may " + "want to use a ", + AnchorElement(kMultipleStepsInTffunctionDoc, + "host-training loop (i.e. running multiple " + "steps within a tf.function).")); + return; + } + + // Significant idle time without all cores profiled. + if (*input_classification == "host") { + // We've already identified that it is input bound. So, no need to issue + // more warnings. + *tc_idle_classification = "no"; + *tc_idle_statement = ""; + return; + } + + *input_classification = "host"; // focuses on "host" first. + *input_statement = absl::StrCat( + "Your program COULD be input-bound because ", idle_percent_str, + "% of the total step time is idle. This may be a manifestation of an " + "input issue on a worker " + "machine that was not profiled. 
To be certain, please profile ALL " + "worker machines in your job by following ", + AnchorElement(kProfileAllHostsDoc, "this instruction.")); + *tc_idle_classification = "no"; + *tc_idle_statement = ""; +} + +void AllReduceAnalysis(bool all_cores_profiled, + double all_reduce_compute_percent, + double all_reduce_sync_percent, double input_percent, + std::string* input_classification, + std::string* input_statement, + std::string* all_reduce_classification, + std::string* all_reduce_statement) { + double all_reduce_percent = + all_reduce_compute_percent + all_reduce_sync_percent; + // Since all-reduce time is overlapped with the input time, we consider the + // all-reduce time that is not input related. + double all_reduce_not_input_related_percent = + all_reduce_percent - input_percent; + + if (all_reduce_not_input_related_percent < + kAllReduceBoundThresholdInPercent) { + // Insignificant time spent on all-reduce. + *all_reduce_classification = "no"; + *all_reduce_statement = ""; + return; + } + + if (all_cores_profiled) { + // Significant time spent on all-reduce with all cores profiled. + std::string all_reduce_compute_percent_str = + absl::StrFormat("%.1lf", all_reduce_compute_percent); + std::string all_reduce_sync_percent_str = + absl::StrFormat("%.1lf", all_reduce_sync_percent); + *all_reduce_classification = "yes"; + *all_reduce_statement = absl::StrCat( + "Also, ", all_reduce_sync_percent_str, + " % of the total step time sampled is spent on synchronization with " + "other TPU cores, and ", + all_reduce_compute_percent_str, + " % of the total step time sampled is spent on actual AllReduce."); + return; + } + + // Significant time spent on all-reduce and not all cores were profiled. + std::string all_reduce_percent_str = + absl::StrFormat("%.1lf", all_reduce_percent); + + if (*input_classification != "device") { + // InputAnalysis() already indicates some potential input issue. So, we + // can focus on all-reduce performance. 
+ *all_reduce_classification = "yes"; + *all_reduce_statement = absl::StrCat( + "Also, ", all_reduce_percent_str, + " % of the total step time sampled is spent on synchronization " + "with " + "other TPU cores and AllReduce. Not all worker machines are " + "profiled, " + "therefore " + "we " + "cannot disambiguate the actual time for AllReduce from the " + "synchronization. To be certain, please profile ALL " + "worker machines in your job by following ", + AnchorElement(kProfileAllHostsDoc, "this instruction.")); + return; + } + + // InputAnalysis() indicates that it is NOT input-bound. However, it may + // be because the input delay is manifested as all-reduce time. So, + // attribute it to a possible input issue. + *input_classification = "host"; // focuses on "host" first. + *input_statement = absl::StrCat( + "Your program COULD be input-bound because ", all_reduce_percent_str, + "% of the total step time is spent on synchronization with other " + "TPU cores. This may be a manifestation of an input issue on a " + "worker " + "machine that was not profiled. To be certain, please profile ALL " + "worker machines in your job by following ", + AnchorElement(kProfileAllHostsDoc, "this instruction.")); + *all_reduce_classification = "no"; + *all_reduce_statement = ""; +} + +void ScV0Analysis(double scv0_percent, std::string* scv0_classification, + std::string* scv0_statement) { + if (scv0_percent == 0) { + *scv0_classification = "no"; + *scv0_statement = ""; + return; + } + std::string scv0_percent_str = absl::StrFormat("%.1lf", scv0_percent); + if (scv0_percent < kModeratelySparseCoreV0BoundThresholdInPercent) { + *scv0_classification = "moderate"; + *scv0_statement = absl::StrCat( + "Also, ", scv0_percent_str, + " % of the total step time sampled is spent on the ", kSparseCoreV0Name, + " compute. 
You may also want to reduce the ", kSparseCoreV0Name,
+        " compute time.");
+    return;
+  }
+  *scv0_classification = "high";
+  *scv0_statement = absl::StrCat(
+      "Also, ", scv0_percent_str,
+      " % of the total step time sampled is spent on the ", kSparseCoreV0Name,
+      " compute. You should focus on reducing the ", kSparseCoreV0Name,
+      " compute time as well.");
+}
+
+// A map keeps track of the minimum value associated with an id.
+class MinMap {
+ public:
+  void Observe(uint64_t id, uint64_t value) {
+    auto [iter, inserted] = min_map_.try_emplace(id, value);
+    if (!inserted && iter->second > value) {
+      iter->second = value;
+    }
+  }
+
+  uint64_t Min(uint64_t id) const {
+    auto iter = min_map_.find(id);
+    return (iter != min_map_.end()) ? iter->second : 0;
+  }
+
+ private:
+  absl::flat_hash_map min_map_;
+};
+
 }  // namespace
 
 StepSummary GetStepSummaryForSampleStats(
@@ -484,6 +1064,198 @@ StepSummary GetStepSummaryForSampleStats(
   return step_time_summary;
 }
 
+// Splits, per core, the time spent in all-reduce ops of one step into
+// "compute" and "sync" parts. For each all-reduce id the shortest duration
+// observed on any core is counted as compute time; whatever a core spends
+// beyond that minimum on the same id is counted as sync time.
+PerCoreAllReduceBreakdown ComputePerStepAllReduceBreakdownAcrossCores(
+    const PerCoreStepInfo& coreid_stepinfo_map) {
+  // Duration of one all-reduce record. An inverted interval (end before
+  // start) is clamped to 0 rather than wrapping around to a huge unsigned
+  // value. Both passes below use this helper, so `duration - min` can never
+  // underflow.
+  auto duration_ps_of = [](const auto& all_reduce) -> uint64_t {
+    const uint64_t start_ps = all_reduce.start_time_ps();
+    const uint64_t end_ps = all_reduce.end_time_ps();
+    return (end_ps > start_ps) ? end_ps - start_ps : 0;
+  };
+
+  PerCoreAllReduceBreakdown result;
+  MinMap min_duration_map;
+  // Pass 1: find the minimum duration of every all-reduce id across cores.
+  for (const auto& [core_id, all_reduce_db] :
+       coreid_stepinfo_map.all_reduce_db_per_core()) {
+    for (const auto& all_reduce : all_reduce_db.all_reduce_info()) {
+      min_duration_map.Observe(all_reduce.id(), duration_ps_of(all_reduce));
+    }
+  }
+  // Pass 2: attribute each core's all-reduce time to compute vs. sync.
+  for (const auto& [core_id, all_reduce_db] :
+       coreid_stepinfo_map.all_reduce_db_per_core()) {
+    AllReduceBreakdown& breakdown = result[core_id];
+    for (const auto& all_reduce : all_reduce_db.all_reduce_info()) {
+      const uint64_t duration_ps = duration_ps_of(all_reduce);
+      const uint64_t min_duration_ps = min_duration_map.Min(all_reduce.id());
+      breakdown.compute_duration_ps += min_duration_ps;
+      breakdown.sync_duration_ps += duration_ps - min_duration_ps;
+    }
+  }
+  return result;
+}
+
+// Re-attributes part of the tensor-core idle time of each step in step_db to
+// infeed, based on the host-side input events of the same step number.
+//
+// Does nothing when device_op_metrics_db contains a host or SparseCoreV0
+// infeed op. Otherwise, for every step that also appears in host_step_events
+// and has a non-zero TotalInputPs(), the fraction
+//   min(host input time / TC idle time of the step, 1.0)
+// of each non-sparse core's idle time is moved into its infeed time, and the
+// modified breakdown is packed back into step_db.
+void MayFixTpuStepAnalysis(
+    const StepEvents& host_step_events, const OpMetricsDb& device_op_metrics_db,
+    StepDatabaseResult& step_db,
+    const protobuf::Map&
+        core_details_map) {
+  // This code is only applicable when input is received by the tensor core
+  // from the host without the use of infeed. If the tensor core receives
+  // input via host infeed or via sparsecorev0 infeed, there's nothing to do.
+  if (HasTpuInfeedOp(device_op_metrics_db)) return;
+
+  for (PerCoreStepInfo& per_core_step_info :
+       *(step_db.mutable_step_sequence())) {
+    uint32_t step_num = per_core_step_info.step_num();
+    // TODO(ckluk): step_num is obtained from tf_op_stats, which is based on the
+    // step-tracking mechanism with the on-device training loop. However, this
+    // step_num is different from the group_id. So, what we are doing here is
+    // only an approximation, assuming that all steps exhibit similar
+    // breakdown. Once grouping works on TPU device, we need to replace step_num
+    // by the group_id from TPU device.
+    const StepDetails* step_details =
+        gtl::FindOrNull(host_step_events, step_num);
+    if (step_details == nullptr) {
+      continue;  // step_num not in host_step_events, we don't know how to fix.
+    }
+    uint64_t total_input_ps = TotalInputPs(*step_details);
+    if (total_input_ps == 0) {
+      continue;  // no host input events.
+    }
+    PerTpuStepDetails tpu_step_data =
+        ComputeTpuPerStepDataAcrossCores(per_core_step_info, core_details_map);
+    double tc_idle_ms = tpu_step_data.tc_idle_time_ms();
+    double adjusted_input_ratio =
+        std::min(tsl::profiler::SafeDivide(
+                     tsl::profiler::PicoToMilli(total_input_ps), tc_idle_ms),
+                 1.0);
+    for (auto& [core_id, step_info] :
+         *per_core_step_info.mutable_step_info_per_core()) {
+      // skip sparse cores for this.
+      if (core_id >= kSparseCoreIndexStart) continue;
+      // The breakdown object is scoped to the branch that unpacks it; no
+      // function-level TpuStepBreakdown is needed.
+      if (TpuStepBreakdown tpu; step_info.step_breakdown().UnpackTo(&tpu)) {
+        DCHECK_EQ(tpu.infeed_duration_ps(), 0);
+        if (tpu.tc_idle_ps() > 0) {
+          // Extract the infeed fraction of idle time.
+          tpu.set_infeed_duration_ps(tpu.tc_idle_ps() * adjusted_input_ratio);
+          tpu.set_tc_idle_ps(tpu.tc_idle_ps() - tpu.infeed_duration_ps());
+          step_info.mutable_step_breakdown()->PackFrom(tpu);
+        }
+      } else if (tensorflow::profiler::GenericStepBreakdown generic;
+                 step_info.step_breakdown().UnpackTo(&generic)) {
+        uint64_t& infeed_time_ps =
+            (*generic.mutable_category_ps())[xla::HloOpcodeString(
+                xla::HloOpcode::kInfeed)];
+        uint64_t& idle_time_ps =
+            (*generic.mutable_category_ps())[tensorflow::profiler::kIdle];
+        DCHECK_EQ(infeed_time_ps, 0);
+        if (idle_time_ps > 0) {
+          infeed_time_ps = idle_time_ps * adjusted_input_ratio;
+          idle_time_ps -= infeed_time_ps;
+          step_info.mutable_step_breakdown()->PackFrom(generic);
+        }
+      } else {
+        // Likely encountered an ScStepBreakdown instance which can be skipped
+        // as we only care about attributing TC idle time to host.
+        LOG(INFO) << "Unable to unpack step_breakdown.";
+      }
+    }
+  }
+}
+
+TpuBottleneckAnalysis ComputeTpuBottleneckAnalysis(
+    bool all_cores_profiled, const InputPipelineAnalysisResult& result) {
+  double total_step_time_ms = 0;
+  double total_infeed_time_ms = 0;
+  double total_tc_outfeed_time_ms = 0;
+  double total_scv0_compute_time_ms = 0;
+  double total_all_reduce_compute_time_ms = 0;
+  double total_all_reduce_sync_time_ms = 0;
+  double total_tc_idle_time_ms = 0;
+
+  TpuBottleneckAnalysis analysis;
+  for (const google::protobuf::Any& step_details : result.step_details()) {
+    PerTpuStepDetails details;
+    if (!step_details.UnpackTo(&details)) {
+      LOG(ERROR) << "Unable to unpack step_details. Expected: tpu";
+      // TODO(b/302086111): Switch back to DFATAL once absl is updated.
+ DCHECK(false); + return analysis; + } + total_step_time_ms += StepTimeMs(details); + total_infeed_time_ms += InfeedTimeMs(details); + total_tc_outfeed_time_ms += details.tc_outfeed_time_ms(); + total_scv0_compute_time_ms += details.scv0_compute_time_ms(); + total_all_reduce_compute_time_ms += details.all_reduce_compute_time_ms(); + total_all_reduce_sync_time_ms += details.all_reduce_sync_time_ms(); + total_tc_idle_time_ms += details.tc_idle_time_ms(); + } + if (total_step_time_ms == 0) { + analysis.set_input_classification("unknown"); + analysis.set_input_statement( + "No step time measured. Therefore we cannot tell where the performance " + "bottleneck is."); + analysis.set_tc_idle_classification("no"), + analysis.set_tc_idle_statement(""); + analysis.set_scv0_classification("no"); + analysis.set_scv0_statement(""); + analysis.set_all_reduce_classification("no"); + analysis.set_all_reduce_statement(""); + return analysis; + } + + double infeed_percent = 100.0 * total_infeed_time_ms / total_step_time_ms; + std::string input_classification; + std::string input_statement; + InputAnalysis(infeed_percent, /*all_other_percent=*/0, &input_classification, + &input_statement); + + double tc_outfeed_percent = + 100.0 * total_tc_outfeed_time_ms / total_step_time_ms; + std::string output_classification; + std::string output_statement; + OutputAnalysis(tc_outfeed_percent, &output_classification, &output_statement); + + double tc_idle_percent = 100.0 * total_tc_idle_time_ms / total_step_time_ms; + std::string tc_idle_classification; + std::string tc_idle_statement; + TensorCoreIdleAnalysis(all_cores_profiled, tc_idle_percent, + &input_classification, &input_statement, + &tc_idle_classification, &tc_idle_statement); + + double all_reduce_compute_percent = + 100.0 * total_all_reduce_compute_time_ms / total_step_time_ms; + double all_reduce_sync_percent = + 100.0 * total_all_reduce_sync_time_ms / total_step_time_ms; + std::string all_reduce_classification; + std::string 
all_reduce_statement; + AllReduceAnalysis(all_cores_profiled, all_reduce_compute_percent, + all_reduce_sync_percent, infeed_percent, + &input_classification, &input_statement, + &all_reduce_classification, &all_reduce_statement); + + double scv0_percent = 100.0 * total_scv0_compute_time_ms / total_step_time_ms; + std::string scv0_classification; + std::string scv0_statement; + ScV0Analysis(scv0_percent, &scv0_classification, &scv0_statement); + + // compute_percent includes both TC and ScV0 compute. + double compute_percent = std::max( + 0.0, 100.0 - infeed_percent - tc_outfeed_percent - tc_idle_percent); + + analysis.set_compute_percent(compute_percent); + analysis.set_input_percent(infeed_percent); + analysis.set_output_percent(tc_outfeed_percent); + analysis.set_tc_idle_percent(tc_idle_percent); + analysis.set_input_classification(input_classification); + analysis.set_input_statement(input_statement); + analysis.set_output_statement(output_statement); + analysis.set_tc_idle_classification(tc_idle_classification), + analysis.set_tc_idle_statement(tc_idle_statement); + analysis.set_scv0_classification(scv0_classification); + analysis.set_scv0_statement(scv0_statement); + analysis.set_all_reduce_classification(all_reduce_classification); + analysis.set_all_reduce_statement(all_reduce_statement); + return analysis; +} + void GenerateHostResult(const OpMetricsDb& host_tf_metrics_db, InputPipelineAnalysisResult* result) { InputOpMetrics input_op_metrics = SelectInputOpMetrics(host_tf_metrics_db); @@ -609,26 +1381,46 @@ StepSummary ComputeStepTimeSummaryInMs( InputPipelineAnalysisResult ConvertOpStatsToInputPipelineAnalysis( const OpStats& op_stats) { - InputPipelineAnalysisResult result = - ComputeGenericInputPipelineAnalysisResult( - op_stats.step_db().step_sequence()); + const HardwareType hardware_type = op_stats.run_environment().hardware_type(); + + InputPipelineAnalysisResult result; + if (hardware_type == tensorflow::profiler::TPU) { + result = 
ComputeTpuInputPipelineAnalysisResult( + op_stats.step_db().step_sequence(), op_stats.core_id_to_details()); + } else { + result = ComputeGenericInputPipelineAnalysisResult( + op_stats.step_db().step_sequence()); + } + result.set_hardware_type(HardwareType_Name(hardware_type)); + PopulateStepDiagnostics(op_stats, result.mutable_diagnostics()); - result.set_hardware_type(HardwareType_Name( - ParseHardwareType(op_stats.run_environment().device_type()))); GenerateHostResult(op_stats.host_op_metrics_db(), &result); InputPipelineAnalysisRecommendation recommendation = GenerateRecommendation(); - BottleneckAnalysis bottleneck_analysis = ComputeBottleneckAnalysis( - result.input_time_breakdown(), result.step_details()); - result.set_input_percent(bottleneck_analysis.input_percent()); - result.set_output_percent(bottleneck_analysis.output_percent()); - result.set_idle_percent(bottleneck_analysis.idle_percent()); - result.set_compute_percent(bottleneck_analysis.compute_percent()); - - recommendation.mutable_bottleneck_analysis()->PackFrom(bottleneck_analysis); - *recommendation.mutable_summary_next_step() = - GetSummaryNextStep(bottleneck_analysis.input_classification(), - result.input_time_breakdown()); + if (hardware_type == tensorflow::profiler::TPU) { + TpuBottleneckAnalysis bottleneck_analysis = ComputeTpuBottleneckAnalysis( + /*all_cores_profiled=*/true, result); + result.set_input_percent(bottleneck_analysis.input_percent()); + result.set_output_percent(bottleneck_analysis.output_percent()); + result.set_idle_percent(bottleneck_analysis.tc_idle_percent()); + result.set_compute_percent(bottleneck_analysis.compute_percent()); + + recommendation.mutable_bottleneck_analysis()->PackFrom(bottleneck_analysis); + *recommendation.mutable_summary_next_step() = + GetSummaryNextStep(bottleneck_analysis.input_classification(), + result.input_time_breakdown()); + } else { + BottleneckAnalysis bottleneck_analysis = ComputeBottleneckAnalysis( + result.input_time_breakdown(), 
result.step_details()); + result.set_input_percent(bottleneck_analysis.input_percent()); + result.set_output_percent(bottleneck_analysis.output_percent()); + result.set_idle_percent(bottleneck_analysis.idle_percent()); + result.set_compute_percent(bottleneck_analysis.compute_percent()); + recommendation.mutable_bottleneck_analysis()->PackFrom(bottleneck_analysis); + *recommendation.mutable_summary_next_step() = + GetSummaryNextStep(bottleneck_analysis.input_classification(), + result.input_time_breakdown()); + } *result.mutable_recommendation() = recommendation; return result; @@ -702,7 +1494,7 @@ void OutputAnalysis(double output_percent, std::string* output_classification, BottleneckAnalysis ComputeBottleneckAnalysis( const InputTimeBreakdown& input_time_breakdown, - const ::tensorflow::protobuf::RepeatedPtrField<::google::protobuf::Any>& + const protobuf::RepeatedPtrField<::google::protobuf::Any>& any_step_details) { double total_step_time_ms = 0; double total_input_ms = 0; diff --git a/tensorflow/core/profiler/convert/op_stats_to_input_pipeline_analysis.h b/tensorflow/core/profiler/convert/op_stats_to_input_pipeline_analysis.h index c9de162eb8c058..1c2165a8fe0fc7 100644 --- a/tensorflow/core/profiler/convert/op_stats_to_input_pipeline_analysis.h +++ b/tensorflow/core/profiler/convert/op_stats_to_input_pipeline_analysis.h @@ -16,9 +16,11 @@ limitations under the License. #ifndef TENSORFLOW_CORE_PROFILER_CONVERT_OP_STATS_TO_INPUT_PIPELINE_ANALYSIS_H_ #define TENSORFLOW_CORE_PROFILER_CONVERT_OP_STATS_TO_INPUT_PIPELINE_ANALYSIS_H_ +#include #include #include "google/protobuf/any.pb.h" +#include "absl/container/flat_hash_map.h" #include "absl/strings/string_view.h" #include "xla/tsl/util/stats_calculator.h" #include "tensorflow/core/platform/protobuf.h" @@ -28,10 +30,29 @@ limitations under the License. 
#include "tensorflow/core/profiler/protobuf/op_metrics.pb.h" #include "tensorflow/core/profiler/protobuf/op_stats.pb.h" #include "tensorflow/core/profiler/protobuf/steps_db.pb.h" +#include "tensorflow/core/profiler/protobuf/tpu_input_pipeline.pb.h" +#include "tensorflow/core/profiler/utils/event_span.h" namespace tensorflow { namespace profiler { +struct AllReduceBreakdown { + uint64_t compute_duration_ps = 0; + uint64_t sync_duration_ps = 0; + + uint64_t DurationPs() const { return compute_duration_ps + sync_duration_ps; } +}; + +// Used to store AllReduceBreakdown per core id. Just an alias for user +// convenience. +using PerCoreAllReduceBreakdown = + absl::flat_hash_map; + +// Breakdown AllReduce time into synchronization time and actual compute time +// for each core and step. +PerCoreAllReduceBreakdown ComputePerStepAllReduceBreakdownAcrossCores( + const PerCoreStepInfo& coreid_stepinfo_map); + StepSummary GetStepSummaryForSampleStats(const tsl::Stat& sample_stats); // If the percent of input-time spent on host-to-device transfer is greater than @@ -55,6 +76,19 @@ void GenerateHostResult(const OpMetricsDb& host_tf_metrics_db, InputPipelineAnalysisRecommendation GenerateRecommendation(); +// For TPU, we may have mis-regarded some host overhead as idle time. +// This function checks if this is the case using host_step_events. If this is, +// it will do the correction in op_stats. +void MayFixTpuStepAnalysis( + const StepEvents& host_step_events, const OpMetricsDb& device_op_metrics_db, + StepDatabaseResult& step_db, + const protobuf::Map& core_details_map); + +// Returns a struct that describes the performance bottleneck of the +// program executed on TPU. +TpuBottleneckAnalysis ComputeTpuBottleneckAnalysis( + bool all_cores_profiled, const InputPipelineAnalysisResult& result); + // Returns the performance bottleneck of the program executed. 
BottleneckAnalysis ComputeBottleneckAnalysis( const InputTimeBreakdown& input_time_breakdown, diff --git a/tensorflow/core/profiler/convert/op_stats_to_input_pipeline_analysis_test.cc b/tensorflow/core/profiler/convert/op_stats_to_input_pipeline_analysis_test.cc new file mode 100644 index 00000000000000..c8b675b75ecc84 --- /dev/null +++ b/tensorflow/core/profiler/convert/op_stats_to_input_pipeline_analysis_test.cc @@ -0,0 +1,204 @@ +/* Copyright 2024 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+==============================================================================*/ +#include "tensorflow/core/profiler/convert/op_stats_to_input_pipeline_analysis.h" + +#include +#include + +#include "google/protobuf/any.pb.h" +#include "xla/hlo/ir/hlo_opcode.h" +#include "xla/tsl/profiler/utils/timespan.h" +#include "tensorflow/core/platform/test.h" +#include "tensorflow/core/profiler/protobuf/steps_db.pb.h" +#include "tensorflow/core/profiler/utils/event_span.h" +#include "tensorflow/core/profiler/utils/op_metrics_db_utils.h" + +namespace tensorflow { +namespace profiler { +namespace { + +using ::tensorflow::profiler::CoreDetails; +using ::tensorflow::profiler::OpMetricsDb; +using ::tensorflow::profiler::StepDatabaseResult; +using ::tensorflow::profiler::StepEvents; + +TEST(TfOpStatsToInputPipelineAnalysisTest, + AttributeHostInputTimeToTCWhenInfeedMissing) { + uint64_t step_num = 1; + tensorflow::profiler::StepDetails step_details; + step_details.AddEvent(tensorflow::profiler::EventTypeSpan( + tensorflow::profiler::EventType::HOST_WAIT_INPUT, + tsl::profiler::Timespan::FromEndPoints(50, 100))); + step_details.AddEvent(tensorflow::profiler::EventTypeSpan( + tensorflow::profiler::EventType::HOST_TO_DEVICE, + tsl::profiler::Timespan::FromEndPoints(110, 200))); + step_details.AddEvent(tensorflow::profiler::EventTypeSpan( + tensorflow::profiler::EventType::HOST_TO_DEVICE, + tsl::profiler::Timespan::FromEndPoints(430, 500))); + StepEvents host_step_events = {{step_num, step_details}}; + StepDatabaseResult step_db; + tensorflow::profiler::PerCoreStepInfo* pcsi = step_db.add_step_sequence(); + pcsi->set_step_num(step_num); + auto& sipc_map = *pcsi->mutable_step_info_per_core(); + tensorflow::profiler::StepInfoResult& sir = sipc_map[/* core_id= */ 2]; + sir.set_step_num(step_num); + sir.set_begin_ps(40); + sir.set_duration_ps(1000); + tensorflow::profiler::GenericStepBreakdown step_breakdown; + protobuf::Map& category_ps = + *step_breakdown.mutable_category_ps(); + 
category_ps[tensorflow::profiler::kIdle] = 300; + category_ps[xla::HloOpcodeString(xla::HloOpcode::kMultiply)] = 300; + category_ps[xla::HloOpcodeString(xla::HloOpcode::kAllGather)] = 300; + category_ps[xla::HloOpcodeString(xla::HloOpcode::kAsyncStart)] = 50; + category_ps[xla::HloOpcodeString(xla::HloOpcode::kAsyncDone)] = 50; + sir.mutable_step_breakdown()->PackFrom(step_breakdown); + protobuf::Map core_details_map; + MayFixTpuStepAnalysis(host_step_events, OpMetricsDb(), step_db, + core_details_map); + tensorflow::profiler::GenericStepBreakdown updated_step_breakdown; + sir.step_breakdown().UnpackTo(&updated_step_breakdown); + const protobuf::Map& updated_category_ps = + updated_step_breakdown.category_ps(); + EXPECT_EQ(updated_category_ps.at(tensorflow::profiler::kIdle), 90); + ASSERT_TRUE(updated_category_ps.contains( + xla::HloOpcodeString(xla::HloOpcode::kInfeed))); + EXPECT_EQ( + updated_category_ps.at(xla::HloOpcodeString(xla::HloOpcode::kInfeed)), + 210); +} + +TEST(TfOpStatsToInputPipelineAnalysisTest, + AttributeHostInputTimeToTCWhenInfeedMissingMultiCore) { + uint64_t step_num = 1; + tensorflow::profiler::StepDetails step_details; + step_details.AddEvent(tensorflow::profiler::EventTypeSpan( + tensorflow::profiler::EventType::HOST_WAIT_INPUT, + tsl::profiler::Timespan::FromEndPoints(50, 100))); + step_details.AddEvent(tensorflow::profiler::EventTypeSpan( + tensorflow::profiler::EventType::HOST_TO_DEVICE, + tsl::profiler::Timespan::FromEndPoints(110, 200))); + step_details.AddEvent(tensorflow::profiler::EventTypeSpan( + tensorflow::profiler::EventType::HOST_TO_DEVICE, + tsl::profiler::Timespan::FromEndPoints(430, 500))); + StepEvents host_step_events = {{step_num, step_details}}; + StepDatabaseResult step_db; + tensorflow::profiler::PerCoreStepInfo* pcsi = step_db.add_step_sequence(); + pcsi->set_step_num(step_num); + protobuf::Map& sipc_map = + *pcsi->mutable_step_info_per_core(); + tensorflow::profiler::StepInfoResult& sir = sipc_map[/* core_id= */ 2]; 
+ sir.set_step_num(step_num); + sir.set_begin_ps(40); + sir.set_duration_ps(1000); + tensorflow::profiler::GenericStepBreakdown step_breakdown; + protobuf::Map& category_ps = + *step_breakdown.mutable_category_ps(); + category_ps[tensorflow::profiler::kIdle] = 300; + category_ps[xla::HloOpcodeString(xla::HloOpcode::kMultiply)] = 300; + category_ps[xla::HloOpcodeString(xla::HloOpcode::kAllGather)] = 300; + category_ps[xla::HloOpcodeString(xla::HloOpcode::kAsyncStart)] = 50; + category_ps[xla::HloOpcodeString(xla::HloOpcode::kAsyncDone)] = 50; + sir.mutable_step_breakdown()->PackFrom(step_breakdown); + tensorflow::profiler::StepInfoResult& sir2 = sipc_map[/* core_id= */ 1]; + sir2.set_step_num(step_num); + sir2.set_begin_ps(45); + sir2.set_duration_ps(900); + tensorflow::profiler::GenericStepBreakdown step_breakdown2; + protobuf::Map& category_ps2 = + *step_breakdown2.mutable_category_ps(); + category_ps2[tensorflow::profiler::kIdle] = 250; + category_ps2[xla::HloOpcodeString(xla::HloOpcode::kMultiply)] = 300; + category_ps2[xla::HloOpcodeString(xla::HloOpcode::kAllGather)] = 250; + category_ps2[xla::HloOpcodeString(xla::HloOpcode::kAsyncStart)] = 50; + category_ps2[xla::HloOpcodeString(xla::HloOpcode::kAsyncDone)] = 50; + sir2.mutable_step_breakdown()->PackFrom(step_breakdown2); + protobuf::Map core_details_map; + OpMetricsDb device_op_metrics_db; + MayFixTpuStepAnalysis(host_step_events, device_op_metrics_db, step_db, + core_details_map); + tensorflow::profiler::GenericStepBreakdown updated_step_breakdown; + sir.step_breakdown().UnpackTo(&updated_step_breakdown); + const protobuf::Map& updated_category_ps = + updated_step_breakdown.category_ps(); + EXPECT_EQ(updated_category_ps.at(tensorflow::profiler::kIdle), 48); + ASSERT_TRUE(updated_category_ps.contains( + xla::HloOpcodeString(xla::HloOpcode::kInfeed))); + EXPECT_EQ( + updated_category_ps.at(xla::HloOpcodeString(xla::HloOpcode::kInfeed)), + 252); + tensorflow::profiler::GenericStepBreakdown 
updated_step_breakdown2; + sir2.step_breakdown().UnpackTo(&updated_step_breakdown2); + const protobuf::Map& updated_category_ps2 = + updated_step_breakdown2.category_ps(); + EXPECT_EQ(updated_category_ps2.at(tensorflow::profiler::kIdle), 40); + ASSERT_TRUE(updated_category_ps2.contains( + xla::HloOpcodeString(xla::HloOpcode::kInfeed))); + EXPECT_EQ( + updated_category_ps2.at(xla::HloOpcodeString(xla::HloOpcode::kInfeed)), + 210); +} + +TEST(TfOpStatsToInputPipelineAnalysisTest, + SkipMayFixTpuStepAnalysisWhenInfeedExists) { + uint64_t step_num = 1; + tensorflow::profiler::StepDetails step_details; + step_details.AddEvent(tensorflow::profiler::EventTypeSpan( + tensorflow::profiler::EventType::HOST_WAIT_INPUT, + tsl::profiler::Timespan::FromEndPoints(50, 100))); + step_details.AddEvent(tensorflow::profiler::EventTypeSpan( + tensorflow::profiler::EventType::HOST_TO_DEVICE, + tsl::profiler::Timespan::FromEndPoints(110, 200))); + step_details.AddEvent(tensorflow::profiler::EventTypeSpan( + tensorflow::profiler::EventType::HOST_TO_DEVICE, + tsl::profiler::Timespan::FromEndPoints(430, 500))); + StepEvents host_step_events = {{step_num, step_details}}; + StepDatabaseResult step_db; + tensorflow::profiler::PerCoreStepInfo* pcsi = step_db.add_step_sequence(); + pcsi->set_step_num(step_num); + protobuf::Map& sipc_map = + *pcsi->mutable_step_info_per_core(); + tensorflow::profiler::StepInfoResult& sir = sipc_map[/* core_id= */ 2]; + sir.set_step_num(step_num); + sir.set_begin_ps(40); + sir.set_duration_ps(1000); + tensorflow::profiler::GenericStepBreakdown step_breakdown; + protobuf::Map& category_ps = + *step_breakdown.mutable_category_ps(); + category_ps[tensorflow::profiler::kIdle] = 300; + category_ps[xla::HloOpcodeString(xla::HloOpcode::kMultiply)] = 300; + category_ps[xla::HloOpcodeString(xla::HloOpcode::kAllGather)] = 300; + category_ps[xla::HloOpcodeString(xla::HloOpcode::kAsyncStart)] = 50; + category_ps[xla::HloOpcodeString(xla::HloOpcode::kInfeed)] = 50; + 
sir.mutable_step_breakdown()->PackFrom(step_breakdown); + protobuf::Map core_details_map; + OpMetricsDb device_op_metrics_db; + device_op_metrics_db.add_metrics_db()->set_category( + std::string(xla::HloOpcodeString(xla::HloOpcode::kInfeed))); + MayFixTpuStepAnalysis(host_step_events, device_op_metrics_db, step_db, + core_details_map); + tensorflow::profiler::GenericStepBreakdown updated_step_breakdown; + sir.step_breakdown().UnpackTo(&updated_step_breakdown); + const protobuf::Map& updated_category_ps = + updated_step_breakdown.category_ps(); + EXPECT_EQ(updated_category_ps.at(tensorflow::profiler::kIdle), 300); + EXPECT_EQ( + updated_category_ps.at(xla::HloOpcodeString(xla::HloOpcode::kInfeed)), + 50); +} + +} // namespace +} // namespace profiler +} // namespace tensorflow diff --git a/tensorflow/core/profiler/protobuf/input_pipeline.proto b/tensorflow/core/profiler/protobuf/input_pipeline.proto index 13542606c8a95f..08ef575b7100ff 100644 --- a/tensorflow/core/profiler/protobuf/input_pipeline.proto +++ b/tensorflow/core/profiler/protobuf/input_pipeline.proto @@ -152,8 +152,7 @@ message GenericStepTimeBreakdown { message InputPipelineAnalysisResult { // tag : indicate the format of step_details and step_time_breakdown. - // true if google confidential format, otherwise it is in the format of - // PerGenericStepDetails and GenericStepTimeBreakdown respectively. + // true for TPU-specific data models. bool tag = 16; // Hardware type. string hardware_type = 9;