Skip to content

Commit

Permalink
fix(#3425): Properly store stream transformation rules in adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikriemer committed Jan 23, 2025
1 parent eccff93 commit 509a036
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 26 deletions.
67 changes: 67 additions & 0 deletions ui/cypress/tests/connect/editAdapterTransformationRulesAreKept.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

import { AdapterBuilder } from '../../support/builder/AdapterBuilder';
import { ConnectUtils } from '../../support/utils/connect/ConnectUtils';
import { ConnectBtns } from '../../support/utils/connect/ConnectBtns';

describe('Test Adapter Transformation Rules are properly stored', () => {
beforeEach('Setup Test', () => {
cy.initStreamPipesTest();
});

it('Test configuration of adapter fields ', () => {
// Set up new adapter
const builder = AdapterBuilder.create('Machine_Data_Simulator')
.setName('Machine_Data_Simulator')
.addInput('input', 'wait-time-ms', '1000');
const configuration = builder.build();
ConnectUtils.goToConnect();
ConnectUtils.goToNewAdapterPage();
ConnectUtils.selectAdapter(configuration.adapterType);
cy.contains('Next').click();

cy.dataCy('sp-event-schema-next-button').click();
cy.dataCy('sp-adapter-name').type('Test Adapter');
cy.dataCy('connect-remove-duplicates-box').click();
cy.dataCy('connect-remove-duplicates-input').type('10000');
cy.dataCy('connect-reduce-event-rate-box').click();
cy.dataCy('connect-reduce-event-input').type('20000');
cy.dataCy('adapter-settings-start-adapter-btn').click();
ConnectUtils.closeAdapterPreview();

// Edit adapter and check if given values and added property still provided
ConnectBtns.editAdapter().should('not.be.disabled');
ConnectBtns.editAdapter().click();
cy.contains('Next').click();
cy.dataCy('sp-event-schema-next-button').click();

cy.dataCy('connect-remove-duplicates-box')
.find('input')
.should('be.checked');
cy.dataCy('connect-remove-duplicates-input').should(
'have.value',
'10000',
);

cy.dataCy('connect-reduce-event-rate-box')
.find('input')
.should('be.checked');
cy.dataCy('connect-reduce-event-input').should('have.value', '20000');
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ export class AdapterConfigurationComponent implements OnInit {
this.adapter.dataStream.eventSchema = targetSchema;

this.adapter.rules =
this.transformationRuleService.getTransformationRuleDescriptions(
this.transformationRuleService.makeTransformationRuleDescriptions(
originalSchema,
targetSchema,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ export class EventSchemaComponent implements OnChanges {
public updatePreview(): void {
this.isPreviewEnabled = false;
const ruleDescriptions =
this.transformationRuleService.getTransformationRuleDescriptions(
this.transformationRuleService.makeTransformationRuleDescriptions(
this.originalSchema,
this.targetSchema,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,14 @@
optionDescription="Avoid duplicated events within a certain time interval"
optionIcon="cleaning_services"
dataCy="connect-remove-duplicates-box"
[isChecked]="removeDuplicates"
(optionSelectedEmitter)="removeDuplicates = $event"
>
<mat-form-field *ngIf="removeDuplicates" color="accent">
<mat-form-field
*ngIf="removeDuplicates"
color="accent"
class="mt-10"
>
<input
matInput
id="input-removeDuplicatesTime"
Expand All @@ -83,9 +88,14 @@
optionDescription="Send maximum one event in the specified time window"
optionIcon="speed"
dataCy="connect-reduce-event-rate-box"
[isChecked]="eventRateReduction"
(optionSelectedEmitter)="eventRateReduction = $event"
>
<mat-form-field *ngIf="eventRateReduction" color="accent">
<mat-form-field
*ngIf="eventRateReduction"
color="accent"
class="mt-10"
>
<input
type="number"
matInput
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,27 @@ import { AdapterStartedDialog } from '../../../dialog/adapter-started/adapter-st
import { DialogService, PanelType } from '@streampipes/shared-ui';
import { ShepherdService } from '../../../../services/tour/shepherd.service';
import { TimestampPipe } from '../../../filter/timestamp.pipe';
import { TransformationRuleService } from '../../../services/transformation-rule.service';

@Component({
selector: 'sp-start-adapter-configuration',
templateUrl: './start-adapter-configuration.component.html',
styleUrls: ['./start-adapter-configuration.component.scss'],
})
export class StartAdapterConfigurationComponent implements OnInit {
static EventRateTransformationRuleId =
'org.apache.streampipes.model.connect.rules.stream.EventRateTransformationRuleDescription' as const;
static RemoveDuplicatesTransformationRuleId =
'org.apache.streampipes.model.connect.rules.stream.RemoveDuplicatesTransformationRuleDescription' as const;

/**
* Adapter description the selected format is added to
*/
@Input() adapterDescription: AdapterDescription;

@Input() eventSchema: EventSchema;

@Input() isEditMode;
@Input() isEditMode: boolean;

/**
* Cancels the adapter configuration process
Expand Down Expand Up @@ -96,6 +102,7 @@ export class StartAdapterConfigurationComponent implements OnInit {
private shepherdService: ShepherdService,
private _formBuilder: UntypedFormBuilder,
private timestampPipe: TimestampPipe,
private transformationRuleService: TransformationRuleService,
) {}

ngOnInit(): void {
Expand All @@ -115,6 +122,34 @@ export class StartAdapterConfigurationComponent implements OnInit {
this.startAdapterSettingsFormValid = this.startAdapterForm.valid;
});
this.startAdapterSettingsFormValid = this.startAdapterForm.valid;

this.applySelectedEventRateReduction();
this.applySelectedRemoveDuplicates();
}

applySelectedEventRateReduction(): void {
const eventRateRule =
this.transformationRuleService.getExistingTransformationRule<EventRateTransformationRuleDescription>(
this.adapterDescription,
StartAdapterConfigurationComponent.EventRateTransformationRuleId,
);
if (eventRateRule !== undefined) {
this.eventRateReduction = true;
this.eventRateTime = eventRateRule.aggregationTimeWindow;
this.eventRateMode = eventRateRule.aggregationType;
}
}

applySelectedRemoveDuplicates(): void {
const removeDuplicatesRule =
this.transformationRuleService.getExistingTransformationRule<RemoveDuplicatesTransformationRuleDescription>(
this.adapterDescription,
StartAdapterConfigurationComponent.RemoveDuplicatesTransformationRuleId,
);
if (removeDuplicatesRule !== undefined) {
this.removeDuplicates = true;
this.removeDuplicatesTime = +removeDuplicatesRule.filterTimeWindow;
}
}

findDefaultTimestamp(selected: boolean) {
Expand All @@ -131,6 +166,7 @@ export class StartAdapterConfigurationComponent implements OnInit {
}

public editAdapter() {
this.checkAndApplyStreamRules();
const dialogRef = this.dialogService.open(AdapterStartedDialog, {
panelType: PanelType.STANDARD_PANEL,
title: 'Adapter edit',
Expand All @@ -147,24 +183,7 @@ export class StartAdapterConfigurationComponent implements OnInit {
}

public startAdapter() {
if (this.removeDuplicates) {
const removeDuplicates: RemoveDuplicatesTransformationRuleDescription =
new RemoveDuplicatesTransformationRuleDescription();
removeDuplicates['@class'] =
'org.apache.streampipes.model.connect.rules.stream.RemoveDuplicatesTransformationRuleDescription';
removeDuplicates.filterTimeWindow = this
.removeDuplicatesTime as any;
this.adapterDescription.rules.push(removeDuplicates);
}
if (this.eventRateReduction) {
const eventRate: EventRateTransformationRuleDescription =
new EventRateTransformationRuleDescription();
eventRate['@class'] =
'org.apache.streampipes.model.connect.rules.stream.EventRateTransformationRuleDescription';
eventRate.aggregationTimeWindow = this.eventRateTime;
eventRate.aggregationType = this.eventRateMode;
this.adapterDescription.rules.push(eventRate);
}
this.checkAndApplyStreamRules();

const dialogRef = this.dialogService.open(AdapterStartedDialog, {
panelType: PanelType.STANDARD_PANEL,
Expand All @@ -185,6 +204,27 @@ export class StartAdapterConfigurationComponent implements OnInit {
});
}

private checkAndApplyStreamRules(): void {
if (this.removeDuplicates) {
const removeDuplicates: RemoveDuplicatesTransformationRuleDescription =
new RemoveDuplicatesTransformationRuleDescription();
removeDuplicates['@class'] =
StartAdapterConfigurationComponent.RemoveDuplicatesTransformationRuleId;
removeDuplicates.filterTimeWindow = this
.removeDuplicatesTime as any;
this.adapterDescription.rules.push(removeDuplicates);
}
if (this.eventRateReduction) {
const eventRate: EventRateTransformationRuleDescription =
new EventRateTransformationRuleDescription();
eventRate['@class'] =
StartAdapterConfigurationComponent.EventRateTransformationRuleId;
eventRate.aggregationTimeWindow = this.eventRateTime;
eventRate.aggregationType = this.eventRateMode;
this.adapterDescription.rules.push(eventRate);
}
}

public removeSelection() {
this.removeSelectionEmitter.emit();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import { EventPropertyPrimitive } from '@streampipes/platform-services';
templateUrl: './edit-unit-transformation.component.html',
styleUrls: ['./edit-unit-transformation.component.scss'],
})
export class EditUnitTransformationComponent implements OnInit {
export class EditUnitTransformationComponent {
@Input() cachedProperty: EventPropertyPrimitive;
@Input() originalProperty: EventPropertyPrimitive;

Expand Down Expand Up @@ -64,6 +64,7 @@ export class EditUnitTransformationComponent implements OnInit {
: this.allUnits.slice(),
),
);
this.applySelectedUnits();
});

this.currentUnitStateCtrl.valueChanges.subscribe(val => {
Expand All @@ -79,7 +80,7 @@ export class EditUnitTransformationComponent implements OnInit {

protected open = false;

ngOnInit() {
applySelectedUnits(): void {
if (this.cachedProperty.measurementUnit) {
const sourceUnit = this.cachedProperty.additionalMetadata
.toMeasurementUnit
Expand Down
12 changes: 11 additions & 1 deletion ui/src/app/connect/services/transformation-rule.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import { Injectable } from '@angular/core';
import {
AdapterDescription,
AddTimestampRuleDescription,
AddValueTransformationRuleDescription,
ChangeDatatypeTransformationRuleDescription,
Expand Down Expand Up @@ -49,7 +50,7 @@ export class TransformationRuleService {

private delimiter = '<-=>';

public getTransformationRuleDescriptions(
public makeTransformationRuleDescriptions(
originalSchema: EventSchema,
targetSchema: EventSchema,
): TransformationRuleDescriptionUnion[] {
Expand Down Expand Up @@ -734,4 +735,13 @@ export class TransformationRuleService {
rule.replaceAll = eventProperty.additionalMetadata.replaceAll ?? false;
return rule;
}

public getExistingTransformationRule<T>(
adapterDescription: AdapterDescription,
transformationRuleType: string,
): T {
return adapterDescription.rules.find(
r => r['@class'] === transformationRuleType,
) as T;
}
}

0 comments on commit 509a036

Please sign in to comment.