From 93342957986c589e1cb28e13e20d1fa9a9ba9269 Mon Sep 17 00:00:00 2001 From: Tyler Ohlsen Date: Mon, 3 Feb 2025 09:33:20 -0800 Subject: [PATCH 1/8] Add verbose simulate ingest pipeline Signed-off-by: Tyler Ohlsen --- common/interfaces.ts | 22 +++ .../split_ingest_processor.ts | 8 ++ .../text_chunking_ingest_processor.ts | 5 + public/configs/sort_processor.ts | 5 + public/configs/split_processor.ts | 5 + .../workflow_inputs/workflow_inputs.tsx | 131 ++++++++++++------ .../new_workflow/quick_configure_modal.tsx | 3 +- public/route_service.ts | 18 ++- public/store/reducers/opensearch_reducer.ts | 21 ++- public/store/reducers/workflows_reducer.ts | 2 +- public/utils/utils.ts | 30 ++++ server/routes/opensearch_routes_service.ts | 70 +++++++++- 12 files changed, 264 insertions(+), 56 deletions(-) diff --git a/common/interfaces.ts b/common/interfaces.ts index 56b2a5ee..9029cef6 100644 --- a/common/interfaces.ts +++ b/common/interfaces.ts @@ -568,6 +568,10 @@ export type CachedFormikState = { touched?: {}; }; +export type IngestPipelineErrors = { + [idx: number]: { processorType: string; errorMsg: string }; +}; + /** ********** OPENSEARCH TYPES/INTERFACES ************ */ @@ -596,6 +600,24 @@ export type SimulateIngestPipelineResponse = { docs: SimulateIngestPipelineDocResponse[]; }; +// verbose mode +// from https://opensearch.org/docs/latest/ingest-pipelines/simulate-ingest/#query-parameters +export type SimulateIngestPipelineDocResponseVerbose = SimulateIngestPipelineDocResponse & { + processor_type: string; + status: 'success' | 'error'; + description?: string; +}; + +// verbose mode +// from https://opensearch.org/docs/latest/ingest-pipelines/simulate-ingest/#query-parameters +export type SimulateIngestPipelineResponseVerbose = { + docs: [ + { + processor_results: SimulateIngestPipelineDocResponseVerbose[]; + } + ]; +}; + export type SearchHit = SimulateIngestPipelineDoc; export type SearchResponse = { diff --git 
a/public/configs/ingest_processors/split_ingest_processor.ts b/public/configs/ingest_processors/split_ingest_processor.ts index 91c38062..ff79dedb 100644 --- a/public/configs/ingest_processors/split_ingest_processor.ts +++ b/public/configs/ingest_processors/split_ingest_processor.ts @@ -13,5 +13,13 @@ export class SplitIngestProcessor extends SplitProcessor { constructor() { super(); this.id = generateId('split_processor_ingest'); + this.optionalFields = [ + ...(this.optionalFields || []), + { + id: 'ignore_missing', + type: 'boolean', + value: false, + }, + ]; } } diff --git a/public/configs/ingest_processors/text_chunking_ingest_processor.ts b/public/configs/ingest_processors/text_chunking_ingest_processor.ts index 8caa71a5..cfcf8e24 100644 --- a/public/configs/ingest_processors/text_chunking_ingest_processor.ts +++ b/public/configs/ingest_processors/text_chunking_ingest_processor.ts @@ -70,6 +70,11 @@ export class TextChunkingIngestProcessor extends Processor { id: 'tag', type: 'string', }, + { + id: 'ignore_missing', + type: 'boolean', + value: false, + }, ]; } } diff --git a/public/configs/sort_processor.ts b/public/configs/sort_processor.ts index bc30d410..715d3549 100644 --- a/public/configs/sort_processor.ts +++ b/public/configs/sort_processor.ts @@ -40,6 +40,11 @@ export abstract class SortProcessor extends Processor { id: 'tag', type: 'string', }, + { + id: 'ignore_failure', + type: 'boolean', + value: false, + }, ]; } } diff --git a/public/configs/split_processor.ts b/public/configs/split_processor.ts index 3e0349f7..20f9a162 100644 --- a/public/configs/split_processor.ts +++ b/public/configs/split_processor.ts @@ -45,6 +45,11 @@ export abstract class SplitProcessor extends Processor { id: 'tag', type: 'string', }, + { + id: 'ignore_failure', + type: 'boolean', + value: false, + }, ]; } } diff --git a/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx b/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx index 
71e10484..439ecdf4 100644 --- a/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx +++ b/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx @@ -23,6 +23,7 @@ import { import { CONFIG_STEP, CachedFormikState, + SimulateIngestPipelineResponseVerbose, TemplateNode, WORKFLOW_STEP_TYPE, Workflow, @@ -38,6 +39,8 @@ import { deprovisionWorkflow, getWorkflow, provisionWorkflow, + setOpenSearchError, + simulatePipeline, updateWorkflow, useAppDispatch, } from '../../../store'; @@ -51,6 +54,9 @@ import { generateId, getResourcesToBeForceDeleted, getDataSourceId, + prepareDocsForSimulate, + getIngestPipelineErrors, + formatIngestPipelineErrors, } from '../../../utils'; import { BooleanField } from './input_fields'; import '../workspace/workspace-styles.scss'; @@ -273,6 +279,49 @@ export function WorkflowInputs(props: WorkflowInputsProps) { } } + // Utility fn to perform bulk ingest + function bulkIngest(ingestDocsObjs: {}[]) { + const bulkBody = prepareBulkBody(values.ingest.index.name, ingestDocsObjs); + dispatch(bulk({ apiBody: { body: bulkBody }, dataSourceId })) + .unwrap() + .then(async (resp: any) => { + props.setIngestResponse(customStringify(resp)); + props.setIsRunningIngest(false); + setLastIngested(Date.now()); + getCore().notifications.toasts.add({ + iconType: 'check', + color: 'success', + title: 'Ingest flow updated', + // @ts-ignore + text: ( + + + + Validate your ingest flow using Test flow + + + + + + props.displaySearchPanel()} + > + Test flow + + + + + + ), + }); + }) + .catch((error: any) => { + props.setIngestResponse(''); + throw error; + }); + } + // Utility fn to update the workflow UI config only, based on the current form values. // A get workflow API call is subsequently run to fetch the updated state. 
async function updateWorkflowUiConfig() { @@ -524,48 +573,48 @@ export function WorkflowInputs(props: WorkflowInputsProps) { if (ingestDocsObjs.length > 0 && !isEmpty(ingestDocsObjs[0])) { success = await validateAndUpdateWorkflow(false, true, false); if (success) { - const bulkBody = prepareBulkBody( - values.ingest.index.name, - ingestDocsObjs - ); - dispatch(bulk({ apiBody: { body: bulkBody }, dataSourceId })) - .unwrap() - .then(async (resp: any) => { - props.setIngestResponse(customStringify(resp)); - props.setIsRunningIngest(false); - setLastIngested(Date.now()); - getCore().notifications.toasts.add({ - iconType: 'check', - color: 'success', - title: 'Ingest flow updated', - // @ts-ignore - text: ( - - - - Validate your ingest flow using Test flow - - - - - - props.displaySearchPanel()} - > - Test flow - - - - - - ), + if ( + !isEmpty(values?.ingest?.enrich) && + values?.ingest?.pipelineName !== undefined && + values?.ingest?.pipelineName !== '' + ) { + const curDocs = prepareDocsForSimulate( + values?.ingest?.docs, + values?.ingest?.index?.name + ); + await dispatch( + simulatePipeline({ + apiBody: { + docs: curDocs, + }, + pipelineId: values.ingest.pipelineName, + dataSourceId, + verbose: true, + }) + ) + .unwrap() + .then((resp: SimulateIngestPipelineResponseVerbose) => { + const ingestPipelineErrors = getIngestPipelineErrors(resp); + if (isEmpty(ingestPipelineErrors)) { + bulkIngest(ingestDocsObjs); + } else { + dispatch( + setOpenSearchError({ + error: `Data not ingested. 
${formatIngestPipelineErrors( + ingestPipelineErrors + )}`, + }) + ); + } + }) + .catch((error: any) => { + getCore().notifications.toasts.addDanger( + `Failed to simulate ingest pipeline: ${error}` + ); }); - }) - .catch((error: any) => { - props.setIngestResponse(''); - throw error; - }); + } else { + bulkIngest(ingestDocsObjs); + } } } else { getCore().notifications.toasts.addDanger( diff --git a/public/pages/workflows/new_workflow/quick_configure_modal.tsx b/public/pages/workflows/new_workflow/quick_configure_modal.tsx index 603eee3e..060d159c 100644 --- a/public/pages/workflows/new_workflow/quick_configure_modal.tsx +++ b/public/pages/workflows/new_workflow/quick_configure_modal.tsx @@ -72,7 +72,8 @@ export function QuickConfigureModal(props: QuickConfigureModalProps) { const dataSourceId = getDataSourceId(); const history = useHistory(); const { models } = useSelector((state: AppState) => state.ml); - const { workflows } = useSelector((state: AppState) => state.workflows); + //const { workflows } = useSelector((state: AppState) => state.workflows); + const workflows = undefined; // model interface states const [embeddingModelInterface, setEmbeddingModelInterface] = useState< diff --git a/public/route_service.ts b/public/route_service.ts index c093fc91..c186a0a0 100644 --- a/public/route_service.ts +++ b/public/route_service.ts @@ -127,10 +127,12 @@ export interface RouteService { ) => Promise; simulatePipeline: ( body: { - pipeline: IngestPipelineConfig; + pipeline?: IngestPipelineConfig; docs: SimulateIngestPipelineDoc[]; }, - dataSourceId?: string + dataSourceId?: string, + pipelineId?: string, + verbose?: boolean ) => Promise; getIngestPipeline: ( pipelineId: string, @@ -408,17 +410,23 @@ export function configureRoutes(core: CoreStart): RouteService { }, simulatePipeline: async ( body: { - pipeline: IngestPipelineConfig; + pipeline?: IngestPipelineConfig; docs: SimulateIngestPipelineDoc[]; }, - dataSourceId?: string + dataSourceId?: string, + pipelineId?: 
string, + verbose?: boolean ) => { try { - const url = dataSourceId + let url = dataSourceId ? `${BASE_NODE_API_PATH}/${dataSourceId}/opensearch/simulatePipeline` : SIMULATE_PIPELINE_NODE_API_PATH; + url = pipelineId ? `${url}/${pipelineId}` : url; const response = await core.http.post<{ respString: string }>(url, { body: JSON.stringify(body), + query: { + verbose: verbose ?? false, + }, }); return response; } catch (e: any) { diff --git a/public/store/reducers/opensearch_reducer.ts b/public/store/reducers/opensearch_reducer.ts index 6c05c332..f8e94d31 100644 --- a/public/store/reducers/opensearch_reducer.ts +++ b/public/store/reducers/opensearch_reducer.ts @@ -32,6 +32,7 @@ export const INITIAL_OPENSEARCH_STATE = { }; const OPENSEARCH_PREFIX = 'opensearch'; +const SET_OPENSEARCH_ERROR = `${OPENSEARCH_PREFIX}/setError`; const CAT_INDICES_ACTION = `${OPENSEARCH_PREFIX}/catIndices`; const GET_MAPPINGS_ACTION = `${OPENSEARCH_PREFIX}/mappings`; const SEARCH_INDEX_ACTION = `${OPENSEARCH_PREFIX}/search`; @@ -42,6 +43,13 @@ const GET_INGEST_PIPELINE_ACTION = `${OPENSEARCH_PREFIX}/getIngestPipeline`; const GET_SEARCH_PIPELINE_ACTION = `${OPENSEARCH_PREFIX}/getSearchPipeline`; const GET_INDEX_ACTION = `${OPENSEARCH_PREFIX}/getIndex`; +export const setOpenSearchError = createAsyncThunk( + SET_OPENSEARCH_ERROR, + async ({ error }: { error: string }, { rejectWithValue }) => { + return error; + } +); + export const catIndices = createAsyncThunk( CAT_INDICES_ACTION, async ( @@ -197,12 +205,16 @@ export const simulatePipeline = createAsyncThunk( { apiBody, dataSourceId, + pipelineId, + verbose, }: { apiBody: { - pipeline: IngestPipelineConfig; + pipeline?: IngestPipelineConfig; docs: SimulateIngestPipelineDoc[]; }; dataSourceId?: string; + pipelineId?: string; + verbose?: boolean; }, { rejectWithValue } ) => { @@ -210,7 +222,9 @@ export const simulatePipeline = createAsyncThunk( | any | HttpFetchError = await getRouteService().simulatePipeline( apiBody, - dataSourceId + 
dataSourceId, + pipelineId, + verbose ); if (response instanceof HttpFetchError) { return rejectWithValue( @@ -312,6 +326,9 @@ const opensearchSlice = createSlice({ state.loading = true; state.errorMessage = ''; }) + .addCase(setOpenSearchError.fulfilled, (state, action) => { + state.errorMessage = action.payload; + }) .addCase(catIndices.fulfilled, (state, action) => { const indicesMap = new Map(); action.payload.forEach((index: Index) => { diff --git a/public/store/reducers/workflows_reducer.ts b/public/store/reducers/workflows_reducer.ts index c34fd999..06f23120 100644 --- a/public/store/reducers/workflows_reducer.ts +++ b/public/store/reducers/workflows_reducer.ts @@ -273,7 +273,7 @@ const workflowsSlice = createSlice({ }) .addCase(searchWorkflows.fulfilled, (state, action) => { const { workflows } = action.payload as { workflows: WorkflowDict }; - state.workflows = workflows; + state.workflows = workflows || {}; state.loading = false; state.errorMessage = ''; }) diff --git a/public/utils/utils.ts b/public/utils/utils.ts index 6b75db94..1a3586d0 100644 --- a/public/utils/utils.ts +++ b/public/utils/utils.ts @@ -43,6 +43,7 @@ import { import { getCore, getDataSourceEnabled } from '../services'; import { Connector, + IngestPipelineErrors, InputMapEntry, MDSQueryParams, ModelInputMap, @@ -50,6 +51,7 @@ import { OutputMapEntry, OutputMapFormValue, QueryParam, + SimulateIngestPipelineResponseVerbose, } from '../../common/interfaces'; import * as pluginManifest from '../../opensearch_dashboards.json'; import { DataSourceAttributes } from '../../../../src/plugins/data_source/common/data_sources'; @@ -199,6 +201,34 @@ export function unwrapTransformedDocs( return transformedDocsSources; } +// Extract any processor-level errors from a verbose simulate ingest pipeline API call +export function getIngestPipelineErrors( + simulatePipelineResponse: SimulateIngestPipelineResponseVerbose +): IngestPipelineErrors { + let ingestPipelineErrors = {} as IngestPipelineErrors; + 
simulatePipelineResponse.docs?.forEach((docResult) => { + docResult.processor_results.forEach((processorResult, idx) => { + if (processorResult.error?.reason !== undefined) { + ingestPipelineErrors[idx] = { + processorType: processorResult.processor_type, + errorMsg: processorResult.error.reason, + }; + } + }); + }); + return ingestPipelineErrors; +} + +export function formatIngestPipelineErrors( + errors: IngestPipelineErrors +): string { + let msg = 'Errors found with the following ingest processor(s):\n\n'; + Object.values(errors || {}).forEach((processorError, idx) => { + msg += `Processor type: ${processorError.processorType}. Error: ${processorError.errorMsg}\n\n`; + }); + return msg; +} + // ML inference processors will use standard dot notation or JSONPath depending on the input. // We follow the same logic here to generate consistent results. export function generateTransform( diff --git a/server/routes/opensearch_routes_service.ts b/server/routes/opensearch_routes_service.ts index 2ba07300..1adbaab2 100644 --- a/server/routes/opensearch_routes_service.ts +++ b/server/routes/opensearch_routes_service.ts @@ -4,6 +4,7 @@ */ import { schema } from '@osd/config-schema'; +import { isEmpty } from 'lodash'; import { IRouter, IOpenSearchDashboardsResponse, @@ -242,6 +243,26 @@ export function registerOpenSearchRoutes( pipeline: schema.any(), docs: schema.any(), }), + query: schema.object({ + verbose: schema.boolean(), + }), + }, + }, + opensearchRoutesService.simulatePipeline + ); + router.post( + { + path: `${SIMULATE_PIPELINE_NODE_API_PATH}/{pipeline_id}`, + validate: { + body: schema.object({ + docs: schema.any(), + }), + params: schema.object({ + pipeline_id: schema.string(), + }), + query: schema.object({ + verbose: schema.boolean(), + }), }, }, opensearchRoutesService.simulatePipeline @@ -257,6 +278,27 @@ export function registerOpenSearchRoutes( pipeline: schema.any(), docs: schema.any(), }), + query: schema.object({ + verbose: schema.boolean(), + }), + }, 
+ }, + opensearchRoutesService.simulatePipeline + ); + router.post( + { + path: `${BASE_NODE_API_PATH}/{data_source_id}/opensearch/simulatePipeline/{pipeline_id}`, + validate: { + params: schema.object({ + data_source_id: schema.string(), + pipeline_id: schema.string(), + }), + body: schema.object({ + docs: schema.any(), + }), + query: schema.object({ + verbose: schema.boolean(), + }), }, }, opensearchRoutesService.simulatePipeline @@ -511,11 +553,17 @@ export class OpenSearchRoutesService { req: OpenSearchDashboardsRequest, res: OpenSearchDashboardsResponseFactory ): Promise> => { - const { data_source_id = '' } = req.params as { data_source_id?: string }; + const { data_source_id = '', pipeline_id = '' } = req.params as { + data_source_id?: string; + pipeline_id?: string; + }; const { pipeline, docs } = req.body as { - pipeline: IngestPipelineConfig; + pipeline?: IngestPipelineConfig; docs: SimulateIngestPipelineDoc[]; }; + const { verbose = false } = req.query as { + verbose?: boolean; + }; try { const callWithRequest = getClientBasedOnDataSource( context, @@ -525,10 +573,20 @@ export class OpenSearchRoutesService { this.client ); - const response = await callWithRequest('ingest.simulate', { - body: { pipeline, docs }, - }); - + let response = undefined as any; + + if (!isEmpty(pipeline_id)) { + response = await callWithRequest('ingest.simulate', { + body: { docs }, + id: pipeline_id, + verbose, + }); + } else { + response = await callWithRequest('ingest.simulate', { + body: { docs, pipeline }, + verbose, + }); + } return res.ok({ body: { docs: response.docs } as SimulateIngestPipelineResponse, }); From a92a3943baac79c6de0461b00ff9007b835c84b5 Mon Sep 17 00:00:00 2001 From: Tyler Ohlsen Date: Tue, 4 Feb 2025 09:37:11 -0800 Subject: [PATCH 2/8] Add verbose to search pipeline; fix bug of search dropdown on ingest side Signed-off-by: Tyler Ohlsen --- .../pages/workflow_detail/tools/query/query.tsx | 15 +++++---------- public/route_service.ts | 7 +++++++ 
public/store/reducers/opensearch_reducer.ts | 3 +++ server/routes/opensearch_routes_service.ts | 16 ++++++++++++++++ 4 files changed, 31 insertions(+), 10 deletions(-) diff --git a/public/pages/workflow_detail/tools/query/query.tsx b/public/pages/workflow_detail/tools/query/query.tsx index c539319e..a3003027 100644 --- a/public/pages/workflow_detail/tools/query/query.tsx +++ b/public/pages/workflow_detail/tools/query/query.tsx @@ -166,9 +166,7 @@ export function Query(props: QueryProps) { : [SEARCH_OPTIONS[1]] } selectedOptions={ - props.hasSearchPipeline && - includePipeline && - props.selectedStep === CONFIG_STEP.SEARCH + includePipeline ? [SEARCH_OPTIONS[0]] : [SEARCH_OPTIONS[1]] } @@ -194,15 +192,12 @@ export function Query(props: QueryProps) { apiBody: { index: indexToSearch, body: injectParameters(queryParams, tempRequest), - searchPipeline: - props.hasSearchPipeline && - includePipeline && - props.selectedStep === CONFIG_STEP.SEARCH && - !isEmpty(values?.search?.pipelineName) - ? values?.search?.pipelineName - : '_none', + searchPipeline: includePipeline + ? 
values?.search?.pipelineName + : '_none', }, dataSourceId, + verbose: includePipeline, }) ) .unwrap() diff --git a/public/route_service.ts b/public/route_service.ts index c186a0a0..180b1f19 100644 --- a/public/route_service.ts +++ b/public/route_service.ts @@ -97,11 +97,13 @@ export interface RouteService { body, dataSourceId, searchPipeline, + verbose, }: { index: string; body: {}; dataSourceId?: string; searchPipeline?: string; + verbose?: boolean; }) => Promise; ingest: ( index: string, @@ -322,11 +324,13 @@ export function configureRoutes(core: CoreStart): RouteService { body, dataSourceId, searchPipeline, + verbose, }: { index: string; body: {}; dataSourceId?: string; searchPipeline?: string; + verbose?: boolean; }) => { try { const url = dataSourceId @@ -338,6 +342,9 @@ export function configureRoutes(core: CoreStart): RouteService { : basePath; const response = await core.http.post<{ respString: string }>(path, { body: JSON.stringify(body), + query: { + verbose: verbose ?? false, + }, }); return response; } catch (e: any) { diff --git a/public/store/reducers/opensearch_reducer.ts b/public/store/reducers/opensearch_reducer.ts index f8e94d31..952a5e27 100644 --- a/public/store/reducers/opensearch_reducer.ts +++ b/public/store/reducers/opensearch_reducer.ts @@ -118,9 +118,11 @@ export const searchIndex = createAsyncThunk( { apiBody, dataSourceId, + verbose, }: { apiBody: { index: string; body: {}; searchPipeline?: string }; dataSourceId?: string; + verbose?: boolean; }, { rejectWithValue } ) => { @@ -130,6 +132,7 @@ export const searchIndex = createAsyncThunk( body, dataSourceId, searchPipeline, + verbose, }); if (response instanceof HttpFetchError) { return rejectWithValue('Error searching index: ' + response.body.message); diff --git a/server/routes/opensearch_routes_service.ts b/server/routes/opensearch_routes_service.ts index 1adbaab2..f983d930 100644 --- a/server/routes/opensearch_routes_service.ts +++ b/server/routes/opensearch_routes_service.ts @@ -120,6 
+120,9 @@ export function registerOpenSearchRoutes( index: schema.string(), }), body: schema.any(), + query: schema.object({ + verbose: schema.boolean(), + }), }, }, opensearchRoutesService.searchIndex @@ -133,6 +136,9 @@ export function registerOpenSearchRoutes( data_source_id: schema.string(), }), body: schema.any(), + query: schema.object({ + verbose: schema.boolean(), + }), }, }, opensearchRoutesService.searchIndex @@ -146,6 +152,9 @@ export function registerOpenSearchRoutes( search_pipeline: schema.string(), }), body: schema.any(), + query: schema.object({ + verbose: schema.boolean(), + }), }, }, opensearchRoutesService.searchIndex @@ -160,6 +169,9 @@ export function registerOpenSearchRoutes( data_source_id: schema.string(), }), body: schema.any(), + query: schema.object({ + verbose: schema.boolean(), + }), }, }, opensearchRoutesService.searchIndex @@ -467,6 +479,9 @@ export class OpenSearchRoutesService { search_pipeline: string | undefined; }; const { data_source_id = '' } = req.params as { data_source_id?: string }; + const { verbose = false } = req.query as { + verbose?: boolean; + }; const body = req.body; try { const callWithRequest = getClientBasedOnDataSource( @@ -481,6 +496,7 @@ export class OpenSearchRoutesService { index, body, search_pipeline, + verbose_pipeline: verbose, }); return res.ok({ body: response }); From 7db5edc37d4b971f1a335d4c8b214f1d8dd9a13f Mon Sep 17 00:00:00 2001 From: Tyler Ohlsen Date: Tue, 4 Feb 2025 10:08:18 -0800 Subject: [PATCH 3/8] Add similar helper fns for search pipeline responses Signed-off-by: Tyler Ohlsen --- common/interfaces.ts | 23 ++++++++++++ .../workflow_detail/tools/query/query.tsx | 35 ++++++++++++++++--- public/utils/utils.ts | 27 ++++++++++++++ 3 files changed, 81 insertions(+), 4 deletions(-) diff --git a/common/interfaces.ts b/common/interfaces.ts index 9029cef6..f55ae405 100644 --- a/common/interfaces.ts +++ b/common/interfaces.ts @@ -641,6 +641,29 @@ export type SearchResponse = { ext?: {}; }; +export 
type SearchProcessorInputData = { + _index: string; + _id: string; + _score: number; + _source: {}; +}; +export type SearchProcessorOutputData = SearchProcessorInputData; + +export type SearchProcessorResult = { + processor_name: string; + duration_millis: number; + status: 'success' | 'fail'; + error?: string; + input_data: SearchProcessorInputData[] | null; + output_data: SearchProcessorOutputData[] | null; +}; + +export type SearchResponseVerbose = SearchResponse & { + processor_results: SearchProcessorResult[]; +}; + +export type SearchPipelineErrors = IngestPipelineErrors; + export type IndexResponse = { indexName: string; indexDetails: IndexConfiguration; diff --git a/public/pages/workflow_detail/tools/query/query.tsx b/public/pages/workflow_detail/tools/query/query.tsx index a3003027..acedbaaa 100644 --- a/public/pages/workflow_detail/tools/query/query.tsx +++ b/public/pages/workflow_detail/tools/query/query.tsx @@ -23,14 +23,22 @@ import { FETCH_ALL_QUERY, QueryParam, SearchResponse, + SearchResponseVerbose, WorkflowFormValues, } from '../../../../../common'; -import { AppState, searchIndex, useAppDispatch } from '../../../../store'; +import { + AppState, + searchIndex, + setOpenSearchError, + useAppDispatch, +} from '../../../../store'; import { containsEmptyValues, containsSameValues, + formatSearchPipelineErrors, getDataSourceId, getPlaceholdersFromQuery, + getSearchPipelineErrors, injectParameters, } from '../../../../utils'; import { QueryParamsList, Results } from '../../../../general_components'; @@ -201,9 +209,28 @@ export function Query(props: QueryProps) { }) ) .unwrap() - .then(async (resp: SearchResponse) => { - setQueryResponse(resp); - }) + .then( + async ( + resp: SearchResponse | SearchResponseVerbose + ) => { + if (includePipeline) { + const searchPipelineErrors = getSearchPipelineErrors( + resp as SearchResponseVerbose + ); + if (!isEmpty(searchPipelineErrors)) { + dispatch( + setOpenSearchError({ + error: `Error running search pipeline. 
${formatSearchPipelineErrors( + searchPipelineErrors + )}`, + }) + ); + } + } + + setQueryResponse(resp); + } + ) .catch((error: any) => { setQueryResponse(undefined); console.error('Error running query: ', error); diff --git a/public/utils/utils.ts b/public/utils/utils.ts index 1a3586d0..7025b0a0 100644 --- a/public/utils/utils.ts +++ b/public/utils/utils.ts @@ -51,6 +51,8 @@ import { OutputMapEntry, OutputMapFormValue, QueryParam, + SearchPipelineErrors, + SearchResponseVerbose, SimulateIngestPipelineResponseVerbose, } from '../../common/interfaces'; import * as pluginManifest from '../../opensearch_dashboards.json'; @@ -229,6 +231,31 @@ export function formatIngestPipelineErrors( return msg; } +export function getSearchPipelineErrors( + searchResponseVerbose: SearchResponseVerbose +): SearchPipelineErrors { + let searchPipelineErrors = {} as SearchPipelineErrors; + searchResponseVerbose.processor_results?.forEach((processorResult, idx) => { + if (processorResult?.error !== undefined) { + searchPipelineErrors[idx] = { + processorType: processorResult.processor_name, + errorMsg: processorResult.error, + }; + } + }); + return searchPipelineErrors; +} + +export function formatSearchPipelineErrors( + errors: IngestPipelineErrors +): string { + let msg = 'Errors found with the following search processor(s):\n\n'; + Object.values(errors || {}).forEach((processorError, idx) => { + msg += `Processor type: ${processorError.processorType}. Error: ${processorError.errorMsg}\n\n`; + }); + return msg; +} + // ML inference processors will use standard dot notation or JSONPath depending on the input. // We follow the same logic here to generate consistent results. 
export function generateTransform( From a8af16313a82f08ae83823844d3bb2ec6b23d23f Mon Sep 17 00:00:00 2001 From: Tyler Ohlsen Date: Tue, 4 Feb 2025 11:03:07 -0800 Subject: [PATCH 4/8] Revert undefined workflows comment Signed-off-by: Tyler Ohlsen --- public/pages/workflows/new_workflow/quick_configure_modal.tsx | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/public/pages/workflows/new_workflow/quick_configure_modal.tsx b/public/pages/workflows/new_workflow/quick_configure_modal.tsx index 060d159c..603eee3e 100644 --- a/public/pages/workflows/new_workflow/quick_configure_modal.tsx +++ b/public/pages/workflows/new_workflow/quick_configure_modal.tsx @@ -72,8 +72,7 @@ export function QuickConfigureModal(props: QuickConfigureModalProps) { const dataSourceId = getDataSourceId(); const history = useHistory(); const { models } = useSelector((state: AppState) => state.ml); - //const { workflows } = useSelector((state: AppState) => state.workflows); - const workflows = undefined; + const { workflows } = useSelector((state: AppState) => state.workflows); // model interface states const [embeddingModelInterface, setEmbeddingModelInterface] = useState< From bf1054b8a85feeddbb9a5a1f09d4e89c77ac55ff Mon Sep 17 00:00:00 2001 From: Tyler Ohlsen Date: Tue, 4 Feb 2025 12:50:52 -0800 Subject: [PATCH 5/8] Aside: display invalid ui_metadata types in table Signed-off-by: Tyler Ohlsen --- .../workflows/workflow_list/workflow_list.tsx | 14 +++++++++----- public/utils/utils.ts | 4 +++- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/public/pages/workflows/workflow_list/workflow_list.tsx b/public/pages/workflows/workflow_list/workflow_list.tsx index a019ab06..7abf6a92 100644 --- a/public/pages/workflows/workflow_list/workflow_list.tsx +++ b/public/pages/workflows/workflow_list/workflow_list.tsx @@ -5,7 +5,7 @@ import React, { useState, useEffect } from 'react'; import { useSelector } from 'react-redux'; -import { debounce, isEmpty } from 'lodash'; +import { 
debounce } from 'lodash'; import { EuiInMemoryTable, Direction, @@ -34,6 +34,7 @@ import { MultiSelectFilter } from '../../../general_components'; import { WORKFLOWS_TAB } from '../workflows'; import { DeleteWorkflowModal } from './delete_workflow_modal'; import { ResourceList } from './resource_list'; +import { isValidUiWorkflow } from '../../../utils'; interface WorkflowListProps { setSelectedTabId: (tabId: WORKFLOWS_TAB) => void; @@ -142,15 +143,14 @@ export function WorkflowList(props: WorkflowListProps) { > -

{`Resources configured for'${getCharacterLimitedString( +

{`Resources configured for '${getCharacterLimitedString( selectedWorkflow.name, MAX_WORKFLOW_NAME_TO_DISPLAY )}'`}

- {selectedWorkflow?.ui_metadata === undefined || - isEmpty(selectedWorkflow?.ui_metadata) || + {!isValidUiWorkflow(selectedWorkflow) || selectedWorkflow?.ui_metadata?.type === WORKFLOW_TYPE.UNKNOWN ? ( Invalid workflow type} @@ -230,7 +230,11 @@ function fetchFilteredWorkflows( description: workflow.description || EMPTY_FIELD_STRING, ui_metadata: { ...workflow.ui_metadata, - type: workflow.ui_metadata?.type || WORKFLOW_TYPE.UNKNOWN, + type: + workflow.ui_metadata?.type !== undefined && + Object.values(WORKFLOW_TYPE).includes(workflow.ui_metadata?.type) + ? workflow.ui_metadata?.type + : WORKFLOW_TYPE.UNKNOWN, } as UIState, }) ); diff --git a/public/utils/utils.ts b/public/utils/utils.ts index 7025b0a0..0469f55c 100644 --- a/public/utils/utils.ts +++ b/public/utils/utils.ts @@ -39,6 +39,7 @@ import { IMAGE_FIELD_PATTERN, LABEL_FIELD_PATTERN, MODEL_ID_PATTERN, + WORKFLOW_TYPE, } from '../../common'; import { getCore, getDataSourceEnabled } from '../services'; import { @@ -151,7 +152,8 @@ export function isValidUiWorkflow(workflowObj: any): boolean { return ( isValidWorkflow(workflowObj) && workflowObj?.ui_metadata?.config !== undefined && - workflowObj?.ui_metadata?.type !== undefined + workflowObj?.ui_metadata?.type !== undefined && + Object.values(WORKFLOW_TYPE).includes(workflowObj?.ui_metadata?.type) ); } From 40ef7377c8e1d739bd5e0863018910bf0a13a5c0 Mon Sep 17 00:00:00 2001 From: Tyler Ohlsen Date: Tue, 4 Feb 2025 17:18:34 -0800 Subject: [PATCH 6/8] Make processor errors visible on UI Signed-off-by: Tyler Ohlsen --- .../workflow_inputs/processors_list.tsx | 23 +++++++++--- .../workflow_inputs/workflow_inputs.tsx | 7 ++++ public/store/reducers/errors_reducer.ts | 35 +++++++++++++++++++ public/store/reducers/index.ts | 1 + public/store/store.ts | 2 ++ test/utils.ts | 2 ++ 6 files changed, 66 insertions(+), 4 deletions(-) create mode 100644 public/store/reducers/errors_reducer.ts diff --git a/public/pages/workflow_detail/workflow_inputs/processors_list.tsx 
b/public/pages/workflow_detail/workflow_inputs/processors_list.tsx index fbcf1814..bc078cbe 100644 --- a/public/pages/workflow_detail/workflow_inputs/processors_list.tsx +++ b/public/pages/workflow_detail/workflow_inputs/processors_list.tsx @@ -4,8 +4,8 @@ */ import React, { useEffect, useState } from 'react'; +import { useSelector } from 'react-redux'; import semver from 'semver'; -import { getEffectiveVersion } from '../../../pages/workflows/new_workflow/new_workflow'; import { EuiSmallButtonEmpty, EuiSmallButtonIcon, @@ -29,7 +29,7 @@ import { WorkflowFormValues, } from '../../../../common'; import { formikToUiConfig, getDataSourceFromURL } from '../../../utils'; - +import { getEffectiveVersion } from '../../../pages/workflows/new_workflow/new_workflow'; import { CollapseProcessor, CopyIngestProcessor, @@ -53,6 +53,7 @@ import { MIN_SUPPORTED_VERSION, MINIMUM_FULL_SUPPORTED_VERSION, } from '../../../../common'; +import { AppState } from '../../../store'; interface ProcessorsListProps { uiConfig: WorkflowConfig; @@ -67,6 +68,9 @@ const PANEL_ID = 0; * General component for configuring pipeline processors (ingest / search request / search response) */ export function ProcessorsList(props: ProcessorsListProps) { + const { ingestPipeline: ingestPipelineErrors } = useSelector( + (state: AppState) => state.errors + ); const { values, errors, touched } = useFormikContext(); const [version, setVersion] = useState(''); const location = useLocation(); @@ -341,6 +345,16 @@ export function ProcessorsList(props: ProcessorsListProps) { } } catch (e) {} + const processorFormError = + hasErrors && allTouched + ? 
'Invalid or missing fields detected' + : undefined; + const processorRuntimeError = getIn( + ingestPipelineErrors, + `${processorIndex}.errorMsg`, + undefined + ) as string | undefined; + return ( @@ -354,14 +368,15 @@ export function ProcessorsList(props: ProcessorsListProps) { {`${processor.name || 'Processor'}`} - {hasErrors && allTouched && ( + {(processorFormError !== undefined || + processorRuntimeError !== undefined) && ( diff --git a/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx b/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx index 439ecdf4..474ec8c6 100644 --- a/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx +++ b/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx @@ -39,6 +39,7 @@ import { deprovisionWorkflow, getWorkflow, provisionWorkflow, + setIngestPipelineErrors, setOpenSearchError, simulatePipeline, updateWorkflow, @@ -595,6 +596,11 @@ export function WorkflowInputs(props: WorkflowInputsProps) { .unwrap() .then((resp: SimulateIngestPipelineResponseVerbose) => { const ingestPipelineErrors = getIngestPipelineErrors(resp); + // The errors map may be empty; in which case, this dispatch will clear + // any older errors. 
+ dispatch( + setIngestPipelineErrors({ errors: ingestPipelineErrors }) + ); if (isEmpty(ingestPipelineErrors)) { bulkIngest(ingestDocsObjs); } else { @@ -614,6 +620,7 @@ export function WorkflowInputs(props: WorkflowInputsProps) { }); } else { bulkIngest(ingestDocsObjs); + dispatch(setIngestPipelineErrors({ errors: {} })); } } } else { diff --git a/public/store/reducers/errors_reducer.ts b/public/store/reducers/errors_reducer.ts new file mode 100644 index 00000000..55e6db2e --- /dev/null +++ b/public/store/reducers/errors_reducer.ts @@ -0,0 +1,35 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { createAsyncThunk, createSlice } from '@reduxjs/toolkit'; +import { IngestPipelineErrors, SearchPipelineErrors } from '../../../common'; + +export const INITIAL_ERRORS_STATE = { + ingestPipeline: {} as IngestPipelineErrors, + searchPipeline: {} as SearchPipelineErrors, +}; + +const ERRORS_PREFIX = 'errors'; +const SET_INGEST_PIPELINE_ERRORS = `${ERRORS_PREFIX}/setIngestPipelineErrors`; + +export const setIngestPipelineErrors = createAsyncThunk( + SET_INGEST_PIPELINE_ERRORS, + async ({ errors }: { errors: IngestPipelineErrors }, { rejectWithValue }) => { + return errors; + } +); + +const errorsSlice = createSlice({ + name: ERRORS_PREFIX, + initialState: INITIAL_ERRORS_STATE, + reducers: {}, + extraReducers: (builder) => { + builder.addCase(setIngestPipelineErrors.fulfilled, (state, action) => { + state.ingestPipeline = action.payload; + }); + }, +}); + +export const errorsReducer = errorsSlice.reducer; diff --git a/public/store/reducers/index.ts b/public/store/reducers/index.ts index d28b3193..b83c36a8 100644 --- a/public/store/reducers/index.ts +++ b/public/store/reducers/index.ts @@ -7,3 +7,4 @@ export * from './opensearch_reducer'; export * from './workflows_reducer'; export * from './presets_reducer'; export * from './ml_reducer'; +export * from './errors_reducer'; diff --git a/public/store/store.ts 
b/public/store/store.ts index c3a61818..24e1a746 100644 --- a/public/store/store.ts +++ b/public/store/store.ts @@ -11,6 +11,7 @@ import { workflowsReducer, presetsReducer, mlReducer, + errorsReducer, } from './reducers'; const rootReducer = combineReducers({ @@ -18,6 +19,7 @@ const rootReducer = combineReducers({ presets: presetsReducer, ml: mlReducer, opensearch: opensearchReducer, + errors: errorsReducer, }); export const store = configureStore({ diff --git a/test/utils.ts b/test/utils.ts index 1626cb4d..6268a687 100644 --- a/test/utils.ts +++ b/test/utils.ts @@ -4,6 +4,7 @@ */ import { + INITIAL_ERRORS_STATE, INITIAL_ML_STATE, INITIAL_OPENSEARCH_STATE, INITIAL_PRESETS_STATE, @@ -38,6 +39,7 @@ export function mockStore(...workflowSets: WorkflowInput[]) { workflows: workflowDict, }, presets: INITIAL_PRESETS_STATE, + errors: INITIAL_ERRORS_STATE, }), dispatch: jest.fn(), subscribe: jest.fn(), From f776ba092ed671d8977fda93f67104e998e8b1a3 Mon Sep 17 00:00:00 2001 From: Tyler Ohlsen Date: Wed, 5 Feb 2025 09:14:30 -0800 Subject: [PATCH 7/8] Integrate with search pipeline Signed-off-by: Tyler Ohlsen --- .../workflow_detail/tools/query/query.tsx | 11 +++++ .../workflow_inputs/processors_list.tsx | 42 +++++++++++++++---- public/store/reducers/errors_reducer.ts | 18 ++++++-- 3 files changed, 59 insertions(+), 12 deletions(-) diff --git a/public/pages/workflow_detail/tools/query/query.tsx b/public/pages/workflow_detail/tools/query/query.tsx index acedbaaa..e3f03d61 100644 --- a/public/pages/workflow_detail/tools/query/query.tsx +++ b/public/pages/workflow_detail/tools/query/query.tsx @@ -30,6 +30,7 @@ import { AppState, searchIndex, setOpenSearchError, + setSearchPipelineErrors, useAppDispatch, } from '../../../../store'; import { @@ -217,6 +218,13 @@ export function Query(props: QueryProps) { const searchPipelineErrors = getSearchPipelineErrors( resp as SearchResponseVerbose ); + // The errors map may be empty; in which case, this dispatch will clear + // any older 
errors. + dispatch( + setSearchPipelineErrors({ + errors: searchPipelineErrors, + }) + ); if (!isEmpty(searchPipelineErrors)) { dispatch( setOpenSearchError({ @@ -226,6 +234,8 @@ export function Query(props: QueryProps) { }) ); } + } else { + dispatch(setSearchPipelineErrors({ errors: {} })); } setQueryResponse(resp); @@ -233,6 +243,7 @@ export function Query(props: QueryProps) { ) .catch((error: any) => { setQueryResponse(undefined); + dispatch(setSearchPipelineErrors({ errors: {} })); console.error('Error running query: ', error); }); }} diff --git a/public/pages/workflow_detail/workflow_inputs/processors_list.tsx b/public/pages/workflow_detail/workflow_inputs/processors_list.tsx index bc078cbe..d3d9e70c 100644 --- a/public/pages/workflow_detail/workflow_inputs/processors_list.tsx +++ b/public/pages/workflow_detail/workflow_inputs/processors_list.tsx @@ -53,7 +53,12 @@ import { MIN_SUPPORTED_VERSION, MINIMUM_FULL_SUPPORTED_VERSION, } from '../../../../common'; -import { AppState } from '../../../store'; +import { + AppState, + setIngestPipelineErrors, + setSearchPipelineErrors, + useAppDispatch, +} from '../../../store'; interface ProcessorsListProps { uiConfig: WorkflowConfig; @@ -68,9 +73,11 @@ const PANEL_ID = 0; * General component for configuring pipeline processors (ingest / search request / search response) */ export function ProcessorsList(props: ProcessorsListProps) { - const { ingestPipeline: ingestPipelineErrors } = useSelector( - (state: AppState) => state.errors - ); + const dispatch = useAppDispatch(); + const { + ingestPipeline: ingestPipelineErrors, + searchPipeline: searchPipelineErrors, + } = useSelector((state: AppState) => state.errors); const { values, errors, touched } = useFormikContext(); const [version, setVersion] = useState(''); const location = useLocation(); @@ -78,6 +85,14 @@ export function ProcessorsList(props: ProcessorsListProps) { const [isPopoverOpen, setPopover] = useState(false); const [processors, setProcessors] = useState([]); + function
clearProcessorErrors(): void { + if (props.context === PROCESSOR_CONTEXT.INGEST) { + dispatch(setIngestPipelineErrors({ errors: {} })); + } else { + dispatch(setSearchPipelineErrors({ errors: {} })); + } + } + const closePopover = () => { setPopover(false); }; @@ -262,6 +277,7 @@ export function ProcessorsList(props: ProcessorsListProps) { // the list of processors. Additionally, persist any current form state // (touched, errors) so they are re-initialized when the form is reset. function addProcessor(processor: IProcessorConfig): void { + clearProcessorErrors(); props.setCachedFormikState({ errors, touched, @@ -299,6 +315,7 @@ export function ProcessorsList(props: ProcessorsListProps) { // (getting any updated/interim values along the way) delete // the specified processor from the list of processors function deleteProcessor(processorIdToDelete: string): void { + clearProcessorErrors(); const existingConfig = cloneDeep(props.uiConfig as WorkflowConfig); let newConfig = formikToUiConfig(values, existingConfig); switch (props.context) { @@ -349,11 +366,18 @@ export function ProcessorsList(props: ProcessorsListProps) { hasErrors && allTouched ? 'Invalid or missing fields detected' : undefined; - const processorRuntimeError = getIn( - ingestPipelineErrors, - `${processorIndex}.errorMsg`, - undefined - ) as string | undefined; + const processorRuntimeError = + props.context === PROCESSOR_CONTEXT.INGEST + ? 
getIn( + ingestPipelineErrors, + `${processorIndex}.errorMsg`, + undefined + ) + : getIn( + searchPipelineErrors, + `${processorIndex}.errorMsg`, + undefined + ); return ( diff --git a/public/store/reducers/errors_reducer.ts b/public/store/reducers/errors_reducer.ts index 55e6db2e..02660450 100644 --- a/public/store/reducers/errors_reducer.ts +++ b/public/store/reducers/errors_reducer.ts @@ -13,6 +13,7 @@ export const INITIAL_ERRORS_STATE = { const ERRORS_PREFIX = 'errors'; const SET_INGEST_PIPELINE_ERRORS = `${ERRORS_PREFIX}/setIngestPipelineErrors`; +const SET_SEARCH_PIPELINE_ERRORS = `${ERRORS_PREFIX}/setSearchPipelineErrors`; export const setIngestPipelineErrors = createAsyncThunk( SET_INGEST_PIPELINE_ERRORS, @@ -21,14 +22,25 @@ export const setIngestPipelineErrors = createAsyncThunk( } ); +export const setSearchPipelineErrors = createAsyncThunk( + SET_SEARCH_PIPELINE_ERRORS, + async ({ errors }: { errors: SearchPipelineErrors }, { rejectWithValue }) => { + return errors; + } +); + const errorsSlice = createSlice({ name: ERRORS_PREFIX, initialState: INITIAL_ERRORS_STATE, reducers: {}, extraReducers: (builder) => { - builder.addCase(setIngestPipelineErrors.fulfilled, (state, action) => { - state.ingestPipeline = action.payload; - }); + builder + .addCase(setIngestPipelineErrors.fulfilled, (state, action) => { + state.ingestPipeline = action.payload; + }) + .addCase(setSearchPipelineErrors.fulfilled, (state, action) => { + state.searchPipeline = action.payload; + }); }, }); From b12278f993a1b9807ea67642446b628165052dee Mon Sep 17 00:00:00 2001 From: Tyler Ohlsen Date: Wed, 5 Feb 2025 09:21:50 -0800 Subject: [PATCH 8/8] Fix bug of dup index name on existing config Signed-off-by: Tyler Ohlsen --- public/utils/config_to_schema_utils.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/public/utils/config_to_schema_utils.ts b/public/utils/config_to_schema_utils.ts index 96ca938c..3465066f 100644 --- a/public/utils/config_to_schema_utils.ts +++ 
b/public/utils/config_to_schema_utils.ts @@ -77,9 +77,11 @@ function indexConfigToSchema( 'name', 'This index name is already in use. Use a different name', (name) => { - return !Object.values(indices || {}) - .map((index) => index.name) - .includes(name || ''); + return !( + Object.values(indices || {}) + .map((index) => index.name) + .includes(name || '') && name !== indexConfig?.name?.value + ); } ) .required('Required') as yup.Schema;