Skip to content

Commit

Permalink
feat: Adding support for an Aggregated monitoring dashboard in sharde…
Browse files Browse the repository at this point in the history
…d migration (#686)

* aggregated dashboard

* adding comments

* changes due to comment

* license
  • Loading branch information
asthamohta authored Nov 20, 2023
1 parent 049f8ea commit 118b375
Show file tree
Hide file tree
Showing 12 changed files with 619 additions and 247 deletions.
376 changes: 376 additions & 0 deletions common/metrics/dashboard_components.go

Large diffs are not rendered by default.

243 changes: 41 additions & 202 deletions common/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,51 +1,33 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package utils contains common helper functions used across multiple other packages.
// Utils should not import any Spanner migration tool packages.
package metrics

import (
"context"
"fmt"
"math"
"strings"
"sync"

dashboard "cloud.google.com/go/monitoring/dashboard/apiv1"
"cloud.google.com/go/monitoring/dashboard/apiv1/dashboardpb"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
"github.com/GoogleCloudPlatform/spanner-migration-tool/internal/reports"
"github.com/GoogleCloudPlatform/spanner-migration-tool/proto/migration"
)

// Defines dimensions for Monitoring Dashboard Metrics
const (
// Default height of a tile in the monitoring dashboard
defaultMonitoringMetricHeight int32 = 16
// Default width of a tile in the monitoring dashboard
defaultMonitoringMetricWidth int32 = 16
// Default columns in the monitoring dashboard
defaultColumns int32 = 3
defaultMosaicColumns int32 = 48
)

var once sync.Once
var dashboardClient *dashboard.DashboardsClient

// MonitoringMetricsResources contains information required to create the monitoring dashboard
type MonitoringMetricsResources struct {
ProjectId string
DataflowJobId string
DatastreamId string
GcsBucketId string
PubsubSubscriptionId string
SpannerInstanceId string
SpannerDatabaseId string
ShardId string
}

type TileInfo struct {
Title string
TimeSeriesQueries map[string]string // Map of legend template and their corresponding queries
}

// GetMigrationData returns migration data comprising source schema details,
// request id, target dialect, connection mechanism etc based on
// the conv object, source driver and target db
Expand Down Expand Up @@ -140,192 +122,49 @@ func getMigrationDataSourceDetails(driver string, migrationData *migration.Migra
}
}

func getDashboardClient(ctx context.Context) *dashboard.DashboardsClient {
if dashboardClient == nil {
once.Do(func() {
dashboardClient, _ = dashboard.NewDashboardsClient(ctx)
})
return dashboardClient
}
return dashboardClient
}

// CreateDataflowShardMonitoringDashboard returns a monitoring dashboard for a single shard
func (resourceIds MonitoringMetricsResources) CreateDataflowShardMonitoringDashboard(ctx context.Context) (*dashboardpb.Dashboard, error) {
var mosaicLayoutTiles []*dashboardpb.MosaicLayout_Tile
var heightOffset int32 = 0

// create independent metrics tiles
independentMetricsTiles := createShardIndependentMetrics(resourceIds)
heightOffset += setWidgetPositions(independentMetricsTiles, heightOffset)
mosaicLayoutTiles = append(mosaicLayoutTiles, independentMetricsTiles...)

var mosaicGroups = []struct {
groupTitle string
groupCreateTileFunction func(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile
}{
var mosaicGroups = []MosaicGroup{
{groupTitle: fmt.Sprintf("Dataflow Job: %s", resourceIds.DataflowJobId), groupCreateTileFunction: createShardDataflowMetrics},
{groupTitle: fmt.Sprintf("Datastream: %s", resourceIds.DatastreamId), groupCreateTileFunction: createShardDatastreamMetrics},
{groupTitle: fmt.Sprintf("GCS Bucket: %s", strings.Split(resourceIds.GcsBucketId, "/")[2]), groupCreateTileFunction: createShardGcsMetrics},
{groupTitle: fmt.Sprintf("GCS Bucket: %s", resourceIds.GcsBucketId), groupCreateTileFunction: createShardGcsMetrics},
{groupTitle: fmt.Sprintf("Pubsub: %s", resourceIds.PubsubSubscriptionId), groupCreateTileFunction: createShardPubsubMetrics},
{groupTitle: fmt.Sprintf("Spanner: instances/%s/databases/%s", resourceIds.SpannerInstanceId, resourceIds.SpannerDatabaseId), groupCreateTileFunction: createShardSpannerMetrics},
{groupTitle: fmt.Sprintf("Spanner: instances/%s/databases/%s", resourceIds.SpannerInstanceId, resourceIds.SpannerDatabaseId), groupCreateTileFunction: createSpannerMetrics},
}

for _, mosaicGroup := range mosaicGroups {
metricTiles := mosaicGroup.groupCreateTileFunction(resourceIds)
var groupTile *dashboardpb.MosaicLayout_Tile
groupTile, heightOffset = createCollapsibleGroupTile(TileInfo{Title: mosaicGroup.groupTitle}, metricTiles, heightOffset)
mosaicLayoutTiles = append(append(mosaicLayoutTiles, metricTiles...), groupTile)
}

mosaicLayout := dashboardpb.MosaicLayout{
Columns: defaultMosaicColumns,
Tiles: mosaicLayoutTiles,
}
layout := dashboardpb.Dashboard_MosaicLayout{
MosaicLayout: &mosaicLayout,
}

dashboardDisplayName := "Migration Dashboard"
if resourceIds.ShardId != "" {
dashboardDisplayName = fmt.Sprintf("Shard Migration Dashboard %s", resourceIds.ShardId)
}
db := dashboardpb.Dashboard{
DisplayName: dashboardDisplayName,
Layout: &layout,
}
req := &dashboardpb.CreateDashboardRequest{
Parent: "projects/" + resourceIds.ProjectId,
Dashboard: &db,
}
createDashboardReq := getCreateMonitoringDashboardRequest(resourceIds, createShardIndependentMetrics, mosaicGroups, dashboardDisplayName)
client := getDashboardClient(ctx)
resp, err := client.CreateDashboard(ctx, req)
if client == nil {
return nil, fmt.Errorf("dashboard client could not be created")
}
resp, err := client.CreateDashboard(ctx, createDashboardReq)
if err != nil {
return nil, err
}
return resp, err
}

func createShardDataflowMetrics(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile {
dataflowTiles := []*dashboardpb.MosaicLayout_Tile{
createXYChartTile(TileInfo{"Dataflow Workers CPU Utilization", map[string]string{"": fmt.Sprintf(dataflowCpuUtilQuery, resourceIds.DataflowJobId, resourceIds.ProjectId)}}),
createXYChartTile(TileInfo{"Dataflow Workers Memory Utilization", map[string]string{"": fmt.Sprintf(dataflowMemoryUtilQuery, resourceIds.DataflowJobId, resourceIds.ProjectId)}}),
createXYChartTile(TileInfo{"Dataflow Workers Backlog Time Seconds", map[string]string{"": fmt.Sprintf(dataflowBacklogTimeQuery, resourceIds.DataflowJobId, resourceIds.ProjectId)}}),
}
return dataflowTiles
}

func createShardDatastreamMetrics(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile {
datastreamTiles := []*dashboardpb.MosaicLayout_Tile{
createXYChartTile(TileInfo{
"Datastream Total Latency",
map[string]string{"p50 " + resourceIds.DatastreamId: fmt.Sprintf(datastreamTotalLatencyQuery, resourceIds.DatastreamId, resourceIds.ProjectId, "50"), "p90 " + resourceIds.DatastreamId: fmt.Sprintf(datastreamTotalLatencyQuery, resourceIds.DatastreamId, resourceIds.ProjectId, "90")}}),
createXYChartTile(TileInfo{"Datastream Throughput", map[string]string{resourceIds.DatastreamId: fmt.Sprintf(datastreamThroughputQuery, resourceIds.DatastreamId, resourceIds.ProjectId)}}),
createXYChartTile(TileInfo{"Datastream Unsupported Events", map[string]string{resourceIds.DatastreamId: fmt.Sprintf(datastreamUnsupportedEventsQuery, resourceIds.DatastreamId, resourceIds.ProjectId)}}),
}
return datastreamTiles
}

func createShardGcsMetrics(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile {
gcsBucketTiles := []*dashboardpb.MosaicLayout_Tile{
createXYChartTile(TileInfo{"GCS Bucket Total Bytes", map[string]string{resourceIds.GcsBucketId: fmt.Sprintf(gcsTotalBytesQuery, strings.Split(resourceIds.GcsBucketId, "/")[2], resourceIds.ProjectId)}}),
// CreateDataflowAggMonitoringDashboard returns a monitoring dashboard for a sharded migration, aggregated across all shards
func (resourceIds MonitoringMetricsResources) CreateDataflowAggMonitoringDashboard(ctx context.Context) (*dashboardpb.Dashboard, error) {
var mosaicGroups = []MosaicGroup{
{groupTitle: "Summary of Dataflow Jobs", groupCreateTileFunction: createAggDataflowMetrics},
{groupTitle: "Summary of Datastreams", groupCreateTileFunction: createAggDatastreamMetrics},
{groupTitle: "Summary of GCS Buckets", groupCreateTileFunction: createAggGcsMetrics},
{groupTitle: "Summary of Pubsubs", groupCreateTileFunction: createAggPubsubMetrics},
{groupTitle: fmt.Sprintf("Spanner: instances/%s/databases/%s", resourceIds.SpannerInstanceId, resourceIds.SpannerDatabaseId), groupCreateTileFunction: createSpannerMetrics},
}
return gcsBucketTiles
}

func createShardSpannerMetrics(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile {
spannerTiles := []*dashboardpb.MosaicLayout_Tile{
createXYChartTile(
TileInfo{"Spanner CPU Utilisation",
map[string]string{"Database CPU Utilisation": fmt.Sprintf(spannerCpuUtilDbQuery, resourceIds.SpannerInstanceId, resourceIds.SpannerDatabaseId, resourceIds.ProjectId), "Instance CPU Utilisation": fmt.Sprintf(spannerCpuUtilInstanceQuery, resourceIds.SpannerInstanceId, resourceIds.ProjectId)}}),
createXYChartTile(
TileInfo{"Spanner Storage",
map[string]string{"Database Storage": fmt.Sprintf(spannerStorageUtilDbQuery, resourceIds.SpannerInstanceId, resourceIds.SpannerDatabaseId, resourceIds.ProjectId), "Instance Storage": fmt.Sprintf(spannerStorageUtilInstanceQuery, resourceIds.SpannerInstanceId, resourceIds.ProjectId)}}),
}
return spannerTiles
}

func createShardPubsubMetrics(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile {
pubsubTiles := []*dashboardpb.MosaicLayout_Tile{
createXYChartTile(TileInfo{"Pubsub Subscription Sent Message Count", map[string]string{resourceIds.PubsubSubscriptionId: fmt.Sprintf(pubsubSubscriptionSentMessageCountQuery, resourceIds.PubsubSubscriptionId, resourceIds.ProjectId)}}),
createXYChartTile(TileInfo{"Pubsub Age of Oldest Unacknowledged Message", map[string]string{resourceIds.PubsubSubscriptionId: fmt.Sprintf(pubsubOldestUnackedMessageAgeQuery, resourceIds.PubsubSubscriptionId, resourceIds.ProjectId)}}),
}
return pubsubTiles
}

func createShardIndependentMetrics(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile {
independentMetricsTiles := []*dashboardpb.MosaicLayout_Tile{
createXYChartTile(TileInfo{"Dataflow Workers CPU Utilization", map[string]string{"": fmt.Sprintf(dataflowCpuUtilQuery, resourceIds.DataflowJobId, resourceIds.ProjectId)}}),
createXYChartTile(TileInfo{"Datastream Throughput", map[string]string{resourceIds.DatastreamId: fmt.Sprintf(datastreamThroughputQuery, resourceIds.DatastreamId, resourceIds.ProjectId)}}),
createXYChartTile(TileInfo{"Datastream Unsupported Events", map[string]string{resourceIds.DatastreamId: fmt.Sprintf(datastreamUnsupportedEventsQuery, resourceIds.DatastreamId, resourceIds.ProjectId)}}),
createXYChartTile(TileInfo{"Pubsub Age of Oldest Unacknowledged Message", map[string]string{resourceIds.PubsubSubscriptionId: fmt.Sprintf(pubsubOldestUnackedMessageAgeQuery, resourceIds.PubsubSubscriptionId, resourceIds.ProjectId)}}), createXYChartTile(
TileInfo{"Spanner CPU Utilisation",
map[string]string{"Database CPU Utilisation": fmt.Sprintf(spannerCpuUtilDbQuery, resourceIds.SpannerInstanceId, resourceIds.SpannerDatabaseId, resourceIds.ProjectId), "Instance CPU Utilisation": fmt.Sprintf(spannerCpuUtilInstanceQuery, resourceIds.SpannerInstanceId, resourceIds.ProjectId)}}),
}
return independentMetricsTiles
}

// createXYChartTile returns a single tile in a mosaic layout dashboard
func createXYChartTile(tileInfo TileInfo) *dashboardpb.MosaicLayout_Tile {
var dataSets []*dashboardpb.XyChart_DataSet
for legendTemplate, query := range tileInfo.TimeSeriesQueries {
ds := &dashboardpb.XyChart_DataSet{
PlotType: dashboardpb.XyChart_DataSet_LINE,
TargetAxis: dashboardpb.XyChart_DataSet_Y1,
TimeSeriesQuery: &dashboardpb.TimeSeriesQuery{
Source: &dashboardpb.TimeSeriesQuery_TimeSeriesQueryLanguage{
TimeSeriesQueryLanguage: query,
},
},
}
if legendTemplate != "" {
ds.LegendTemplate = legendTemplate
}
dataSets = append(dataSets, ds)
}
tile := dashboardpb.MosaicLayout_Tile{
Widget: &dashboardpb.Widget{
Title: tileInfo.Title,
Content: &dashboardpb.Widget_XyChart{
XyChart: &dashboardpb.XyChart{
ChartOptions: &dashboardpb.ChartOptions{
Mode: dashboardpb.ChartOptions_COLOR,
},
DataSets: dataSets,
},
},
},
}
return &tile
}

// createCollapsibleGroupTile returns a collapsible group tile in a mosaic layout dashboard
func createCollapsibleGroupTile(tileInfo TileInfo, tiles []*dashboardpb.MosaicLayout_Tile, heightOffset int32) (*dashboardpb.MosaicLayout_Tile, int32) {
groupTileHeight := setWidgetPositions(tiles, heightOffset)
groupTile := dashboardpb.MosaicLayout_Tile{
XPos: 0,
YPos: heightOffset,
Width: defaultMonitoringMetricWidth * defaultColumns,
Height: groupTileHeight,
Widget: &dashboardpb.Widget{
Title: tileInfo.Title,
Content: &dashboardpb.Widget_CollapsibleGroup{
CollapsibleGroup: &dashboardpb.CollapsibleGroup{
Collapsed: true,
},
},
},
noOfShard := len(resourceIds.ShardToDataflowInfoMap)
createDashboardReq := getCreateMonitoringDashboardRequest(resourceIds, createAggIndependentMetrics, mosaicGroups, fmt.Sprintf("Aggregated Migration Dashboard of %v shards", noOfShard))
client := getDashboardClient(ctx)
if client == nil {
return nil, fmt.Errorf("dashboard client could not be created")
}
return &groupTile, heightOffset + groupTileHeight
}

// setWidgetPositions positions the tiles in the monitoring dashboard
func setWidgetPositions(tiles []*dashboardpb.MosaicLayout_Tile, heightOffset int32) int32 {
for tilePosition, tile := range tiles {
tile.XPos = (int32(tilePosition) % defaultColumns) * defaultMonitoringMetricWidth
tile.YPos = heightOffset + (int32(tilePosition)/defaultColumns)*defaultMonitoringMetricHeight
tile.Width = defaultMonitoringMetricWidth
tile.Height = defaultMonitoringMetricHeight
resp, err := client.CreateDashboard(ctx, createDashboardReq)
if err != nil {
return nil, err
}
return ((int32(len(tiles)-1) / defaultColumns) + 1) * defaultMonitoringMetricHeight
return resp, err
}
Loading

0 comments on commit 118b375

Please sign in to comment.