Skip to content

Commit

Permalink
Add fine-grained error handling; misc bug fixes (#598)
Browse files Browse the repository at this point in the history
  • Loading branch information
ohltyler authored Feb 6, 2025
1 parent 2a61016 commit c9dd036
Show file tree
Hide file tree
Showing 19 changed files with 504 additions and 82 deletions.
45 changes: 45 additions & 0 deletions common/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,10 @@ export type CachedFormikState = {
touched?: {};
};

export type IngestPipelineErrors = {
[idx: number]: { processorType: string; errorMsg: string };
};

/**
********** OPENSEARCH TYPES/INTERFACES ************
*/
Expand Down Expand Up @@ -596,6 +600,24 @@ export type SimulateIngestPipelineResponse = {
docs: SimulateIngestPipelineDocResponse[];
};

// verbose mode
// from https://opensearch.org/docs/latest/ingest-pipelines/simulate-ingest/#query-parameters
export type SimulateIngestPipelineDocResponseVerbose = SimulateIngestPipelineDocResponse & {
processor_type: string;
status: 'success' | 'error';
description?: string;
};

// verbose mode
// from https://opensearch.org/docs/latest/ingest-pipelines/simulate-ingest/#query-parameters
export type SimulateIngestPipelineResponseVerbose = {
docs: [
{
processor_results: SimulateIngestPipelineDocResponseVerbose[];
}
];
};

export type SearchHit = SimulateIngestPipelineDoc;

export type SearchResponse = {
Expand All @@ -619,6 +641,29 @@ export type SearchResponse = {
ext?: {};
};

export type SearchProcessorInputData = {
_index: string;
_id: string;
_score: number;
_source: {};
};
export type SearchProcessorOutputData = SearchProcessorInputData;

export type SearchProcessorResult = {
processor_name: string;
duration_millis: number;
status: 'success' | 'fail';
error?: string;
input_data: SearchProcessorInputData[] | null;
output_data: SearchProcessorOutputData[] | null;
};

export type SearchResponseVerbose = SearchResponse & {
processor_results: SearchProcessorResult[];
};

export type SearchPipelineErrors = IngestPipelineErrors;

export type IndexResponse = {
indexName: string;
indexDetails: IndexConfiguration;
Expand Down
8 changes: 8 additions & 0 deletions public/configs/ingest_processors/split_ingest_processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,13 @@ export class SplitIngestProcessor extends SplitProcessor {
constructor() {
super();
this.id = generateId('split_processor_ingest');
this.optionalFields = [
...(this.optionalFields || []),
{
id: 'ignore_missing',
type: 'boolean',
value: false,
},
];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ export class TextChunkingIngestProcessor extends Processor {
id: 'tag',
type: 'string',
},
{
id: 'ignore_missing',
type: 'boolean',
value: false,
},
];
}
}
5 changes: 5 additions & 0 deletions public/configs/sort_processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ export abstract class SortProcessor extends Processor {
id: 'tag',
type: 'string',
},
{
id: 'ignore_failure',
type: 'boolean',
value: false,
},
];
}
}
5 changes: 5 additions & 0 deletions public/configs/split_processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ export abstract class SplitProcessor extends Processor {
id: 'tag',
type: 'string',
},
{
id: 'ignore_failure',
type: 'boolean',
value: false,
},
];
}
}
61 changes: 47 additions & 14 deletions public/pages/workflow_detail/tools/query/query.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,23 @@ import {
FETCH_ALL_QUERY,
QueryParam,
SearchResponse,
SearchResponseVerbose,
WorkflowFormValues,
} from '../../../../../common';
import { AppState, searchIndex, useAppDispatch } from '../../../../store';
import {
AppState,
searchIndex,
setOpenSearchError,
setSearchPipelineErrors,
useAppDispatch,
} from '../../../../store';
import {
containsEmptyValues,
containsSameValues,
formatSearchPipelineErrors,
getDataSourceId,
getPlaceholdersFromQuery,
getSearchPipelineErrors,
injectParameters,
} from '../../../../utils';
import { QueryParamsList, Results } from '../../../../general_components';
Expand Down Expand Up @@ -166,9 +175,7 @@ export function Query(props: QueryProps) {
: [SEARCH_OPTIONS[1]]
}
selectedOptions={
props.hasSearchPipeline &&
includePipeline &&
props.selectedStep === CONFIG_STEP.SEARCH
includePipeline
? [SEARCH_OPTIONS[0]]
: [SEARCH_OPTIONS[1]]
}
Expand All @@ -194,23 +201,49 @@ export function Query(props: QueryProps) {
apiBody: {
index: indexToSearch,
body: injectParameters(queryParams, tempRequest),
searchPipeline:
props.hasSearchPipeline &&
includePipeline &&
props.selectedStep === CONFIG_STEP.SEARCH &&
!isEmpty(values?.search?.pipelineName)
? values?.search?.pipelineName
: '_none',
searchPipeline: includePipeline
? values?.search?.pipelineName
: '_none',
},
dataSourceId,
verbose: includePipeline,
})
)
.unwrap()
.then(async (resp: SearchResponse) => {
setQueryResponse(resp);
})
.then(
async (
resp: SearchResponse | SearchResponseVerbose
) => {
if (includePipeline) {
const searchPipelineErrors = getSearchPipelineErrors(
resp as SearchResponseVerbose
);
// The errors map may be empty; in which case, this dispatch will clear
// any older errors.
dispatch(
setSearchPipelineErrors({
errors: searchPipelineErrors,
})
);
if (!isEmpty(searchPipelineErrors)) {
dispatch(
setOpenSearchError({
error: `Error running search pipeline. ${formatSearchPipelineErrors(
searchPipelineErrors
)}`,
})
);
}
} else {
setSearchPipelineErrors({ errors: {} });
}

setQueryResponse(resp);
}
)
.catch((error: any) => {
setQueryResponse(undefined);
setSearchPipelineErrors({ errors: {} });
console.error('Error running query: ', error);
});
}}
Expand Down
47 changes: 43 additions & 4 deletions public/pages/workflow_detail/workflow_inputs/processors_list.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
*/

import React, { useEffect, useState } from 'react';
import { useSelector } from 'react-redux';
import semver from 'semver';
import { getEffectiveVersion } from '../../../pages/workflows/new_workflow/new_workflow';
import {
EuiSmallButtonEmpty,
EuiSmallButtonIcon,
Expand All @@ -29,7 +29,7 @@ import {
WorkflowFormValues,
} from '../../../../common';
import { formikToUiConfig, getDataSourceFromURL } from '../../../utils';

import { getEffectiveVersion } from '../../../pages/workflows/new_workflow/new_workflow';
import {
CollapseProcessor,
CopyIngestProcessor,
Expand All @@ -53,6 +53,12 @@ import {
MIN_SUPPORTED_VERSION,
MINIMUM_FULL_SUPPORTED_VERSION,
} from '../../../../common';
import {
AppState,
setIngestPipelineErrors,
setSearchPipelineErrors,
useAppDispatch,
} from '../../../store';

interface ProcessorsListProps {
uiConfig: WorkflowConfig;
Expand All @@ -67,13 +73,26 @@ const PANEL_ID = 0;
* General component for configuring pipeline processors (ingest / search request / search response)
*/
export function ProcessorsList(props: ProcessorsListProps) {
const dispatch = useAppDispatch();
const {
ingestPipeline: ingestPipelineErrors,
searchPipeline: searchPipelineErrors,
} = useSelector((state: AppState) => state.errors);
const { values, errors, touched } = useFormikContext<WorkflowFormValues>();
const [version, setVersion] = useState<string>('');
const location = useLocation();
const [processorAdded, setProcessorAdded] = useState<boolean>(false);
const [isPopoverOpen, setPopover] = useState(false);
const [processors, setProcessors] = useState<IProcessorConfig[]>([]);

function clearProcessorErrors(): void {
if (props.context === PROCESSOR_CONTEXT.INGEST) {
dispatch(setIngestPipelineErrors({ errors: {} }));
} else {
dispatch(setSearchPipelineErrors({ errors: {} }));
}
}

const closePopover = () => {
setPopover(false);
};
Expand Down Expand Up @@ -258,6 +277,7 @@ export function ProcessorsList(props: ProcessorsListProps) {
// the list of processors. Additionally, persist any current form state
// (touched, errors) so they are re-initialized when the form is reset.
function addProcessor(processor: IProcessorConfig): void {
clearProcessorErrors();
props.setCachedFormikState({
errors,
touched,
Expand Down Expand Up @@ -295,6 +315,7 @@ export function ProcessorsList(props: ProcessorsListProps) {
// (getting any updated/interim values along the way) delete
// the specified processor from the list of processors
function deleteProcessor(processorIdToDelete: string): void {
clearProcessorErrors();
const existingConfig = cloneDeep(props.uiConfig as WorkflowConfig);
let newConfig = formikToUiConfig(values, existingConfig);
switch (props.context) {
Expand Down Expand Up @@ -341,6 +362,23 @@ export function ProcessorsList(props: ProcessorsListProps) {
}
} catch (e) {}

const processorFormError =
hasErrors && allTouched
? 'Invalid or missing fields detected'
: undefined;
const processorRuntimeError =
props.context === PROCESSOR_CONTEXT.INGEST
? getIn(
ingestPipelineErrors,
`${processorIndex}.errorMsg`,
undefined
)
: getIn(
searchPipelineErrors,
`${processorIndex}.errorMsg`,
undefined
);

return (
<EuiFlexItem key={processorIndex}>
<EuiPanel paddingSize="s">
Expand All @@ -354,14 +392,15 @@ export function ProcessorsList(props: ProcessorsListProps) {
<EuiFlexItem grow={false}>
<EuiText>{`${processor.name || 'Processor'}`}</EuiText>
</EuiFlexItem>
{hasErrors && allTouched && (
{(processorFormError !== undefined ||
processorRuntimeError !== undefined) && (
<EuiFlexItem grow={false}>
<EuiIconTip
aria-label="Warning"
size="m"
type="alert"
color="danger"
content="Invalid or missing fields detected"
content={processorFormError || processorRuntimeError}
position="right"
/>
</EuiFlexItem>
Expand Down
Loading

0 comments on commit c9dd036

Please sign in to comment.