Skip to content

Commit

Permalink
Integrate with synchronous provisioning feature (#591) (#594)
Browse files Browse the repository at this point in the history
* Integrate with synchronous provisioning feature



* Remove remaining sleeps



---------


(cherry picked from commit bdaa464)

Signed-off-by: Tyler Ohlsen <ohltyler@amazon.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 8b95d4a commit e787a4a
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 20 deletions.
2 changes: 1 addition & 1 deletion common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,7 @@ export const MAX_BYTES_FORMATTED = '1,048,576';
export const MAX_WORKFLOW_NAME_TO_DISPLAY = 40;
export const WORKFLOW_NAME_REGEXP = RegExp('^[a-zA-Z0-9_-]*$');
export const INDEX_NAME_REGEXP = WORKFLOW_NAME_REGEXP;
export const PROVISION_TIMEOUT = '10s'; // the timeout config for synchronous provisioning. https://github.com/opensearch-project/flow-framework/pull/990
export const EMPTY_MAP_ENTRY = { key: '', value: '' } as MapEntry;
export const EMPTY_INPUT_MAP_ENTRY = {
key: '',
Expand All @@ -662,7 +663,6 @@ export const EMPTY_INPUT_MAP_ENTRY = {
value: '',
},
} as InputMapEntry;

export const EMPTY_OUTPUT_MAP_ENTRY = {
...EMPTY_INPUT_MAP_ENTRY,
value: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ import {
hasProvisionedIngestResources,
hasProvisionedSearchResources,
generateId,
sleep,
getResourcesToBeForceDeleted,
getDataSourceId,
} from '../../../utils';
Expand Down Expand Up @@ -341,7 +340,6 @@ export function WorkflowInputs(props: WorkflowInputsProps) {
)
.unwrap()
.then(async (result) => {
await sleep(1000);
props.setUnsavedIngestProcessors(false);
props.setUnsavedSearchProcessors(false);
success = true;
Expand Down Expand Up @@ -386,22 +384,16 @@ export function WorkflowInputs(props: WorkflowInputsProps) {
)
.unwrap()
.then(async (result) => {
await sleep(1000);
props.setUnsavedIngestProcessors(false);
props.setUnsavedSearchProcessors(false);
await dispatch(
// TODO: update to be synchronous provisioning, to prevent
// having to wait/sleep before performing next actions.
// https://github.com/opensearch-project/flow-framework/pull/1009
provisionWorkflow({
workflowId: updatedWorkflow.id as string,
dataSourceId,
})
)
.unwrap()
.then(async (result) => {
await sleep(1000);

await dispatch(
getWorkflow({
workflowId: updatedWorkflow.id as string,
Expand Down
2 changes: 1 addition & 1 deletion public/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ export function getEmbeddingModelDimensions(
// so we check for that first.
if (connector?.parameters?.dimensions !== undefined) {
return connector.parameters?.dimensions;
} else if (connector.parameters?.model !== undefined) {
} else if (connector?.parameters?.model !== undefined) {
return (
// @ts-ignore
COHERE_CONFIGS[connector.parameters?.model]?.dimension ||
Expand Down
21 changes: 18 additions & 3 deletions server/cluster/flow_framework_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
FLOW_FRAMEWORK_SEARCH_WORKFLOWS_ROUTE,
FLOW_FRAMEWORK_SEARCH_WORKFLOW_STATE_ROUTE,
FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX,
PROVISION_TIMEOUT,
} from '../../common';

/**
Expand Down Expand Up @@ -75,7 +76,7 @@ export function flowFrameworkPlugin(Client: any, config: any, components: any) {

flowFramework.updateWorkflow = ca({
url: {
fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>?update_fields=<%=update_fields%>&reprovision=<%=reprovision%>`,
fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>?update_fields=<%=update_fields%>`,
req: {
workflow_id: {
type: 'string',
Expand All @@ -85,7 +86,21 @@ export function flowFrameworkPlugin(Client: any, config: any, components: any) {
type: 'boolean',
required: true,
},
reprovision: {
},
},
needBody: true,
method: 'PUT',
});

flowFramework.updateAndReprovisionWorkflow = ca({
url: {
fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>?update_fields=<%=update_fields%>&reprovision=true&wait_for_completion_timeout=${PROVISION_TIMEOUT}`,
req: {
workflow_id: {
type: 'string',
required: true,
},
update_fields: {
type: 'boolean',
required: true,
},
Expand All @@ -97,7 +112,7 @@ export function flowFrameworkPlugin(Client: any, config: any, components: any) {

flowFramework.provisionWorkflow = ca({
url: {
fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>/_provision`,
fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>/_provision?wait_for_completion_timeout=${PROVISION_TIMEOUT}`,
req: {
workflow_id: {
type: 'string',
Expand Down
22 changes: 15 additions & 7 deletions server/routes/flow_framework_routes_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -475,13 +475,21 @@ export class FlowFrameworkRoutesService {
data_source_id,
this.client
);
await callWithRequest('flowFramework.updateWorkflow', {
workflow_id,
// default update_fields to false if not explicitly set otherwise
update_fields: update_fields,
reprovision: reprovision,
body: workflowTemplate,
});
if (reprovision) {
await callWithRequest('flowFramework.updateAndReprovisionWorkflow', {
workflow_id,
// default update_fields to false if not explicitly set otherwise
update_fields,
body: workflowTemplate,
});
} else {
await callWithRequest('flowFramework.updateWorkflow', {
workflow_id,
// default update_fields to false if not explicitly set otherwise
update_fields,
body: workflowTemplate,
});
}

return res.ok({ body: { workflowId: workflow_id, workflowTemplate } });
} catch (err: any) {
Expand Down

0 comments on commit e787a4a

Please sign in to comment.