Skip to content

Commit

Permalink
Dataflow Tuning (#672)
Browse files Browse the repository at this point in the history
* Support new parameters for Dataflow in low-downtime migrations

* Added changes for Dataflow tuning, including UI, API, and backend. The CLI is handled automatically.

* Fixed generated link

* Added blue color to UI button and JSON validation for user labels

* Resolved comments
  • Loading branch information
Deep1998 authored Nov 20, 2023
1 parent 752ae41 commit 6d4700d
Show file tree
Hide file tree
Showing 13 changed files with 295 additions and 103 deletions.
1 change: 1 addition & 0 deletions internal/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ type streamingStats struct {
SampleBadWrites []string // Records that faced errors while writing to Cloud Spanner.
DataStreamName string
DataflowJobId string
DataflowLocation string
DataflowGcloudCmd string
ShardToDataStreamNameMap map[string]string
ShardToDataflowInfoMap map[string]ShardedDataflowJobResources
Expand Down
20 changes: 13 additions & 7 deletions profiles/source_profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,13 +446,19 @@ type DatastreamConnProfile struct {
}

type DataflowConfig struct {
Location string `json:"location"`
Network string `json:"network"`
Subnetwork string `json:"subnetwork"`
HostProjectId string `json:"hostProjectId"`
MaxWorkers string `json:"maxWorkers"`
NumWorkers string `json:"numWorkers"`
ServiceAccountEmail string `json:"serviceAccountEmail"`
ProjectId string `json:"projectId"`
Location string `json:"location"`
Network string `json:"network"`
Subnetwork string `json:"subnetwork"`
VpcHostProjectId string `json:"hostProjectId"`
MaxWorkers string `json:"maxWorkers"`
NumWorkers string `json:"numWorkers"`
ServiceAccountEmail string `json:"serviceAccountEmail"`
JobName string `json:"jobName"`
MachineType string `json:"machineType"`
AdditionalUserLabels string `json:"additionalUserLabels"`
KmsKeyName string `json:"kmsKeyName"`
GcsTemplatePath string `json:"gcsTemplatePath"`
}

type DataShard struct {
Expand Down
102 changes: 68 additions & 34 deletions streaming/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ var (
MAX_WORKER_LIMIT int32 = 1000
// Min allowed value for maxWorkers and numWorkers.
MIN_WORKER_LIMIT int32 = 1
// Default gcs path of the Dataflow template.
DEFAULT_TEMPLATE_PATH string = "gs://dataflow-templates-southamerica-west1/2023-09-12-00_RC00/flex/Cloud_Datastream_to_Spanner"
)

type SrcConnCfg struct {
Expand All @@ -73,15 +75,20 @@ type DatastreamCfg struct {
}

type DataflowCfg struct {
JobName string `json:"jobName"`
Location string `json:"location"`
HostProjectId string `json:"hostProjectId"`
Network string `json:"network"`
Subnetwork string `json:"subnetwork"`
MaxWorkers string `json:"maxWorkers"`
NumWorkers string `json:"numWorkers"`
ServiceAccountEmail string `json:"serviceAccountEmail"`
DbNameToShardIdMap map[string]string `json:"dbNameToShardIdMap"`
ProjectId string `json:"projectId"`
JobName string `json:"jobName"`
Location string `json:"location"`
VpcHostProjectId string `json:"vpcHostProjectId"`
Network string `json:"network"`
Subnetwork string `json:"subnetwork"`
MaxWorkers string `json:"maxWorkers"`
NumWorkers string `json:"numWorkers"`
ServiceAccountEmail string `json:"serviceAccountEmail"`
MachineType string `json:"machineType"`
AdditionalUserLabels string `json:"additionalUserLabels"`
KmsKeyName string `json:"kmsKeyName"`
GcsTemplatePath string `json:"gcsTemplatePath"`
DbNameToShardIdMap map[string]string `json:"dbNameToShardIdMap"`
}

type StreamingCfg struct {
Expand Down Expand Up @@ -662,24 +669,40 @@ func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile
inputFilePattern = inputFilePattern + "/"
}
fmt.Println("Reading files from datastream destination ", inputFilePattern)
var dataflowHostProjectId string
if dataflowCfg.HostProjectId == "" {
dataflowHostProjectId, _ = utils.GetProject()
} else {
dataflowHostProjectId = dataflowCfg.HostProjectId
}

dataflowSubnetwork := ""

// If custom network is not selected, use public IP. Typical for internal testing flow.
workerIpAddressConfig := dataflowpb.WorkerIPAddressConfiguration_WORKER_IP_PUBLIC

if dataflowCfg.Network != "" {
// Initiate runtime environment flags and overrides.
var (
dataflowProjectId = project
dataflowVpcHostProjectId = project
gcsTemplatePath = DEFAULT_TEMPLATE_PATH
dataflowSubnetwork = ""
workerIpAddressConfig = dataflowpb.WorkerIPAddressConfiguration_WORKER_IP_PUBLIC
dataflowUserLabels = make(map[string]string)
)
// If project override present, use that otherwise default to Spanner project. Useful when customers want to run Dataflow in separate project.
if dataflowCfg.ProjectId != "" {
dataflowProjectId = dataflowCfg.ProjectId
}
// If VPC Host project override present, use that otherwise default to Spanner project.
if dataflowCfg.VpcHostProjectId != "" {
dataflowVpcHostProjectId = dataflowCfg.VpcHostProjectId
}
if dataflowCfg.GcsTemplatePath != "" {
gcsTemplatePath = dataflowCfg.GcsTemplatePath
}

// If either network or subnetwork is specified, set IpConfig to private.
if dataflowCfg.Network != "" || dataflowCfg.Subnetwork != "" {
workerIpAddressConfig = dataflowpb.WorkerIPAddressConfiguration_WORKER_IP_PRIVATE
if dataflowCfg.Subnetwork == "" {
return internal.DataflowOutput{}, fmt.Errorf("if network is specified, subnetwork cannot be empty")
} else {
dataflowSubnetwork = fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/subnetworks/%s", dataflowHostProjectId, dataflowCfg.Location, dataflowCfg.Subnetwork)
if dataflowCfg.Subnetwork != "" {
dataflowSubnetwork = fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/subnetworks/%s", dataflowVpcHostProjectId, dataflowCfg.Location, dataflowCfg.Subnetwork)
}
}

if dataflowCfg.AdditionalUserLabels != "" {
err = json.Unmarshal([]byte(dataflowCfg.AdditionalUserLabels), &dataflowUserLabels)
if err != nil {
return internal.DataflowOutput{}, fmt.Errorf("could not unmarshal AdditionalUserLabels json %s : error = %v", dataflowCfg.AdditionalUserLabels, err)
}
}

Expand All @@ -703,9 +726,10 @@ func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile
return internal.DataflowOutput{}, fmt.Errorf("numWorkers should lie in the range [%d, %d]", MIN_WORKER_LIMIT, MAX_WORKER_LIMIT)
}
}

launchParameters := &dataflowpb.LaunchFlexTemplateParameter{
JobName: dataflowCfg.JobName,
Template: &dataflowpb.LaunchFlexTemplateParameter_ContainerSpecGcsPath{ContainerSpecGcsPath: "gs://dataflow-templates-southamerica-west1/2023-09-12-00_RC00/flex/Cloud_Datastream_to_Spanner"},
Template: &dataflowpb.LaunchFlexTemplateParameter_ContainerSpecGcsPath{ContainerSpecGcsPath: gcsTemplatePath},
Parameters: map[string]string{
"inputFilePattern": concatDirectoryPath(inputFilePattern, "data"),
"streamName": fmt.Sprintf("projects/%s/locations/%s/streams/%s", project, datastreamCfg.StreamLocation, datastreamCfg.StreamId),
Expand All @@ -725,10 +749,13 @@ func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile
Network: dataflowCfg.Network,
Subnetwork: dataflowSubnetwork,
IpConfiguration: workerIpAddressConfig,
MachineType: dataflowCfg.MachineType,
AdditionalUserLabels: dataflowUserLabels,
KmsKeyName: dataflowCfg.KmsKeyName,
},
}
req := &dataflowpb.LaunchFlexTemplateRequest{
ProjectId: project,
ProjectId: dataflowProjectId,
LaunchParameter: launchParameters,
Location: dataflowCfg.Location,
}
Expand All @@ -749,6 +776,7 @@ func StoreGeneratedResources(conv *internal.Conv, streamingCfg StreamingCfg, dfJ
dataflowCfg := streamingCfg.DataflowCfg
conv.Audit.StreamingStats.DataStreamName = datastreamCfg.StreamId
conv.Audit.StreamingStats.DataflowJobId = dfJobId
conv.Audit.StreamingStats.DataflowLocation = streamingCfg.DataflowCfg.Location
conv.Audit.StreamingStats.DataflowGcloudCmd = gcloudDataflowCmd
conv.Audit.StreamingStats.PubsubCfg = streamingCfg.PubsubCfg
conv.Audit.StreamingStats.MonitoringDashboard = dashboardName
Expand Down Expand Up @@ -777,13 +805,19 @@ func StoreGeneratedResources(conv *internal.Conv, streamingCfg StreamingCfg, dfJ
func CreateStreamingConfig(pl profiles.DataShard) StreamingCfg {
//create dataflowcfg from pl receiver object
inputDataflowConfig := pl.DataflowConfig
dataflowCfg := DataflowCfg{Location: inputDataflowConfig.Location,
Network: inputDataflowConfig.Network,
HostProjectId: inputDataflowConfig.HostProjectId,
Subnetwork: inputDataflowConfig.Subnetwork,
MaxWorkers: inputDataflowConfig.MaxWorkers,
NumWorkers: inputDataflowConfig.NumWorkers,
ServiceAccountEmail: inputDataflowConfig.ServiceAccountEmail,
dataflowCfg := DataflowCfg{
ProjectId: inputDataflowConfig.ProjectId,
Location: inputDataflowConfig.Location,
Network: inputDataflowConfig.Network,
VpcHostProjectId: inputDataflowConfig.VpcHostProjectId,
Subnetwork: inputDataflowConfig.Subnetwork,
MaxWorkers: inputDataflowConfig.MaxWorkers,
NumWorkers: inputDataflowConfig.NumWorkers,
ServiceAccountEmail: inputDataflowConfig.ServiceAccountEmail,
MachineType: inputDataflowConfig.MachineType,
AdditionalUserLabels: inputDataflowConfig.AdditionalUserLabels,
KmsKeyName: inputDataflowConfig.KmsKeyName,
GcsTemplatePath: inputDataflowConfig.GcsTemplatePath,
}
//create src and dst datastream from pl receiver object
datastreamCfg := DatastreamCfg{StreamLocation: pl.StreamLocation}
Expand Down
8 changes: 7 additions & 1 deletion ui/src/app/app.constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,16 @@ export const DialectList = [
export const Dataflow = {
Network: 'network',
Subnetwork: 'subnetwork',
HostProjectId: 'hostProjectId',
VpcHostProjectId: 'vpcHostProjectId',
MaxWorkers: 'maxWorkers',
NumWorkers: 'numWorkers',
ServiceAccountEmail: 'serviceAccountEmail',
MachineType: 'machineType',
AdditionalUserLabels: 'additionalUserLabels',
KmsKeyName: 'kmsKeyName',
ProjectId: 'dataflowProjectId',
Location: 'dataflowLocation',
GcsTemplatePath: 'gcsTemplatePath',
IsDataflowConfigSet: 'isDataflowConfigSet',
}

Expand Down
121 changes: 96 additions & 25 deletions ui/src/app/components/dataflow-form/dataflow-form.component.html
Original file line number Diff line number Diff line change
@@ -1,43 +1,114 @@
<div mat-dialog-content>
<form [formGroup]="dataflowForm" class="dataflow-form">
<h2>Dataflow Details</h2>
<mat-form-field class="full-width" appearance="outline" matTooltip="Edit to run Dataflow in a shared VPC"
[matTooltipPosition]="'right'" matTooltipHideDelay="100000">
<mat-label>Host ProjectID</mat-label>
<input matInput placeholder="Host ProjectID" type="text" formControlName="hostProjectId"/>
<form [formGroup]="tunableFlagsForm" class="dataflow-form">
<h2>Tune Dataflow (Optional)</h2>
<h5>This form is optional and should only be edited to tune runtime environment for Dataflow.</h5>

<mat-expansion-panel>
<mat-expansion-panel-header class="mat-row" matTooltip="Edit to run Dataflow in a VPC">
<span>Networking</span>
</mat-expansion-panel-header>
<div>
<mat-form-field class="full-width" appearance="outline" matTooltip="Specify the host project Id of the VPC network. For shared VPC, this needs to be edited to the correct host project id."
[matTooltipPosition]="'right'">
<mat-label>VPC Host ProjectID</mat-label>
<input matInput placeholder="VPC Host ProjectID" type="text" formControlName="vpcHostProjectId"/>
</mat-form-field>
<br>
<h5>
- Provide <b>ONLY</b> the VPC subnetwork if unsure about what VPC network to use. Dataflow chooses the network for you.
<br>
- If you are using an <a href="https://cloud.google.com/vpc/docs/create-modify-vpc-networks#create-auto-network" target="_blank">auto mode network</a>, provide ONLY the network name and skip the VPC subnetwork.
</h5>
<mat-form-field class="full-width" appearance="outline" matTooltip="Specify the network name for the VPC"
[matTooltipPosition]="'right'">
<mat-label>VPC Network</mat-label>
<input matInput placeholder="VPC Network Name" type="text" formControlName="network"/>
</mat-form-field>
<br>
<mat-form-field class="full-width" appearance="outline" matTooltip="Specify the subnetwork name for the VPC. Provide only the subnetwork name and NOT the full URL for subnetwork."
[matTooltipPosition]="'right'">
<mat-label>VPC Subnetwork</mat-label>
<input matInput placeholder="VPC Subnetwork Name" type="text" formControlName="subnetwork"/>
</mat-form-field>
</div>
</mat-expansion-panel>
<br>
<mat-expansion-panel>
<mat-expansion-panel-header class="mat-row" matTooltip="Set performance parameters of the Dataflow job(s)">
<span>Performance</span>
</mat-expansion-panel-header>
<div>
<mat-form-field class="full-width" appearance="outline" matTooltip="Set maximum workers for the dataflow job(s). Default value: 50"
[matTooltipPosition]="'right'">
<mat-label>Max Workers</mat-label>
<input matInput placeholder="50" type="text" formControlName="maxWorkers"/>
</mat-form-field>
<br>
<mat-form-field class="full-width" appearance="outline" matTooltip="Set initial number of workers for the dataflow job(s). Default value: 1"
[matTooltipPosition]="'right'">
<mat-label>Number of Workers</mat-label>
<input matInput placeholder="1" type="text" formControlName="numWorkers"/>
</mat-form-field>
<br>
<mat-form-field class="full-width" appearance="outline" matTooltip="The machine type to use for the job, eg: n1-standard-2. Use default machine type if not specified."
[matTooltipPosition]="'right'">
<mat-label>Machine Type</mat-label>
<input matInput placeholder="Machine Type" type="text" formControlName="machineType"/>
</mat-form-field>
<h5>Find the list of all machine types <a href="https://cloud.google.com/compute/docs/machine-resource" target="_blank">here</a>.</h5>
</div>
</mat-expansion-panel>
<br>
<mat-form-field class="full-width" appearance="outline" matTooltip="Set the service account to run the dataflow job(s) as"
[matTooltipPosition]="'right'">
<mat-label>Service Account Email</mat-label>
<input matInput placeholder="Service Account Email" type="text" formControlName="serviceAccountEmail"/>
</mat-form-field>
<br>
<mat-form-field class="full-width" appearance="outline">
<mat-label>VPC Network</mat-label>
<input matInput placeholder="VPC Network Name" type="text" formControlName="network"/>
<mat-form-field class="full-width" appearance="outline" matTooltip="Additional user labels to be specified for the job. Enter a json of &quot;key&quot;: &quot;value&quot; pairs. Example: {&quot;name&quot;: &quot;wrench&quot;, &quot;mass&quot;: &quot;1kg&quot;, &quot;count&quot;: &quot;3&quot; }."
[matTooltipPosition]="'right'">
<mat-label>Additional User Labels</mat-label>
<input matInput placeholder="Additional User Labels" type="text" formControlName="additionalUserLabels"/>
</mat-form-field>
<br>
<mat-form-field class="full-width" appearance="outline">
<mat-label>VPC Subnetwork</mat-label>
<input matInput placeholder="VPC Subnetwork Name" type="text" formControlName="subnetwork"/>
<mat-form-field class="full-width" appearance="outline" matTooltip="Name for the Cloud KMS key for the job. Key format is: projects/<project>/locations/<location>/keyRings /<keyring>/cryptoKeys/<key>. Omit this field to use Google Managed Encryption Keys."
[matTooltipPosition]="'right'">
<mat-label>KMS Key Name</mat-label>
<input matInput placeholder="KMS Key Name" type="text" formControlName="kmsKeyName"/>
</mat-form-field>
</form>
<hr>
<form [formGroup]="presetFlagsForm" class="dataflow-form">
<h2>
Preset Flags
<span><button mat-button class="edit-button" (click)="enablePresetFlags()" [disabled]="!disablePresetFlags">
<mat-icon>edit</mat-icon>
EDIT
</button></span>
</h2>
<h5>These flags are set by SMT by default and <b>SHOULD NOT BE</b> modified unless running Dataflow in a non-standard configuration. To edit these parameters, click the edit button above.</h5>
<br>
<mat-form-field class="full-width" appearance="outline" matTooltip="Set maximum workers for the dataflow job(s). Default value: 50"
[matTooltipPosition]="'right'" matTooltipHideDelay="100000">
<mat-label>Max Workers</mat-label>
<input matInput placeholder="50" type="text" formControlName="maxWorkers"/>
<mat-form-field class="full-width" appearance="outline" matTooltip="Specify the project to run the dataflow job in."
[matTooltipPosition]="'right'">
<mat-label>Dataflow Project Id</mat-label>
<input matInput placeholder="Dataflow Project Id" type="text" formControlName="dataflowProjectId"/>
</mat-form-field>
<br>
<mat-form-field class="full-width" appearance="outline" matTooltip="Set initial number of workers for the dataflow job(s). Default value: 1"
[matTooltipPosition]="'right'" matTooltipHideDelay="100000">
<mat-label>Number of Workers</mat-label>
<input matInput placeholder="1" type="text" formControlName="numWorkers"/>
<mat-form-field class="full-width" appearance="outline" matTooltip="Specify the region to run the dataflow job in. It is recommended to keep the region same as Spanner region for performance. Example: us-central1"
[matTooltipPosition]="'right'">
<mat-label>Dataflow Location</mat-label>
<input matInput placeholder="Dataflow Location" type="text" formControlName="dataflowLocation"/>
</mat-form-field>
<br>
<mat-form-field class="full-width" appearance="outline" matTooltip="Set the service account to run the dataflow job(s) as"
[matTooltipPosition]="'right'" matTooltipHideDelay="100000">
<mat-label>Service Account Email</mat-label>
<input matInput placeholder="Service Account Email" type="text" formControlName="serviceAccountEmail"/>
<mat-form-field class="full-width" appearance="outline" matTooltip="Cloud Storage path to the template spec. Use this to run launch dataflow with custom templates. Example: gs://my-bucket/path/to/template"
[matTooltipPosition]="'right'">
<mat-label>GCS Template Path</mat-label>
<input matInput placeholder="GCS Template Path" type="text" formControlName="gcsTemplatePath"/>
</mat-form-field>
</form>
<div mat-dialog-actions class="buttons-container">
<button mat-button color="primary" mat-dialog-close>Cancel</button>
<button mat-button type="submit" color="primary" [disabled]="!(dataflowForm.valid)"
<button mat-button type="submit" color="primary" [disabled]="!(tunableFlagsForm.valid && presetFlagsForm.valid)"
(click)="updateDataflowDetails()">
Save
</button>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.edit-button{
color: #3367d6;
}

Loading

0 comments on commit 6d4700d

Please sign in to comment.