diff --git a/web/app/components/workflow/hooks/use-workflow-run-event/index.ts b/web/app/components/workflow/hooks/use-workflow-run-event/index.ts new file mode 100644 index 0000000000..6c83e24994 --- /dev/null +++ b/web/app/components/workflow/hooks/use-workflow-run-event/index.ts @@ -0,0 +1,11 @@ +export * from './use-workflow-started' +export * from './use-workflow-finished' +export * from './use-workflow-failed' +export * from './use-workflow-node-started' +export * from './use-workflow-node-finished' +export * from './use-workflow-node-iteration-started' +export * from './use-workflow-node-iteration-next' +export * from './use-workflow-node-iteration-finished' +export * from './use-workflow-node-retry' +export * from './use-workflow-text-chunk' +export * from './use-workflow-text-replace' \ No newline at end of file diff --git a/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-failed.ts b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-failed.ts new file mode 100644 index 0000000000..14ee69447a --- /dev/null +++ b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-failed.ts @@ -0,0 +1,26 @@ +import { useCallback } from 'react' +import produce from 'immer' +import { useWorkflowStore } from '@/app/components/workflow/store' +import { WorkflowRunningStatus } from '@/app/components/workflow/types' + +export const useWorkflowFailed = () => { + const workflowStore = useWorkflowStore() + + const handleWorkflowFailed = useCallback(() => { + const { + workflowRunningData, + setWorkflowRunningData, + } = workflowStore.getState() + + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + draft.result = { + ...draft.result, + status: WorkflowRunningStatus.Failed, + } + })) + }, []) + + return { + handleWorkflowFailed, + } +} \ No newline at end of file diff --git a/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-finished.ts b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-finished.ts new file mode 100644 index 0000000000..25364a041f --- /dev/null +++ b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-finished.ts @@ -0,0 +1,35 @@ +import { useCallback } from 'react' +import produce from 'immer' +import type { WorkflowFinishedResponse } from '@/types/workflow' +import { useWorkflowStore } from '@/app/components/workflow/store' +import { getFilesInLogs } from '@/app/components/base/file-uploader/utils' + +export const useWorkflowFinished = () => { + const workflowStore = useWorkflowStore() + + const handleWorkflowFinished = useCallback((params: WorkflowFinishedResponse) => { + const { data } = params + const { + workflowRunningData, + setWorkflowRunningData, + } = workflowStore.getState() + + const isStringOutput = data.outputs && Object.keys(data.outputs).length === 1 && typeof data.outputs[Object.keys(data.outputs)[0]] === 'string' + + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + draft.result = { + ...draft.result, + ...data, + files: getFilesInLogs(data.outputs), + } as any + if (isStringOutput) { + draft.resultTabActive = true + draft.resultText = data.outputs[Object.keys(data.outputs)[0]] + } + })) + }, []) + + return { + handleWorkflowFinished, + } +} \ No newline at end of file diff --git a/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-finished.ts b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-finished.ts new file mode 100644 index 0000000000..82da4af499 --- /dev/null +++ b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-finished.ts @@ -0,0 +1,153 @@ +import { useCallback } from 'react' +import { useStoreApi } from 'reactflow' +import produce from 'immer' +import type { NodeFinishedResponse } from '@/types/workflow' +import { + BlockEnum, + NodeRunningStatus, +} from '@/app/components/workflow/types' +import { useWorkflowStore } from '@/app/components/workflow/store' +import { ErrorHandleTypeEnum } from '@/app/components/workflow/nodes/_base/components/error-handle/types' + +export const useWorkflowNodeFinished = () => { + const store = useStoreApi() + const workflowStore = useWorkflowStore() + + const handleWorkflowNodeFinished = useCallback((params: NodeFinishedResponse) => { + const { data } = params + const { + workflowRunningData, + setWorkflowRunningData, + iterParallelLogMap, + setIterParallelLogMap, + } = workflowStore.getState() + const { + getNodes, + setNodes, + edges, + setEdges, + } = store.getState() + const nodes = getNodes() + const nodeParentId = nodes.find(node => node.id === data.node_id)!.parentId + if (nodeParentId) { + if (!data.execution_metadata.parallel_mode_run_id) { + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + const tracing = draft.tracing! + const iterations = tracing.find(trace => trace.node_id === nodeParentId) // the iteration node + + if (iterations && iterations.details) { + const iterationIndex = data.execution_metadata?.iteration_index || 0 + if (!iterations.details[iterationIndex]) + iterations.details[iterationIndex] = [] + + const currIteration = iterations.details[iterationIndex] + const nodeIndex = currIteration.findIndex(node => + node.node_id === data.node_id && ( + node.execution_metadata?.parallel_id === data.execution_metadata?.parallel_id || node.parallel_id === data.execution_metadata?.parallel_id), + ) + if (nodeIndex !== -1) { + currIteration[nodeIndex] = { + ...currIteration[nodeIndex], + ...(currIteration[nodeIndex].retryDetail + ? { retryDetail: currIteration[nodeIndex].retryDetail } + : {}), + ...data, + } as any + } + else { + currIteration.push({ + ...data, + } as any) + } + } + })) + } + else { + // open parallel mode + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + const tracing = draft.tracing! + const iterations = tracing.find(trace => trace.node_id === nodeParentId) // the iteration node + + if (iterations && iterations.details) { + const iterRunID = data.execution_metadata?.parallel_mode_run_id + + const currIteration = iterParallelLogMap.get(iterations.node_id)?.get(iterRunID) + const nodeIndex = currIteration?.findIndex(node => + node.node_id === data.node_id && ( + node?.parallel_run_id === data.execution_metadata?.parallel_mode_run_id), + ) + if (currIteration) { + if (nodeIndex !== undefined && nodeIndex !== -1) { + currIteration[nodeIndex] = { + ...currIteration[nodeIndex], + ...data, + } as any + } + else { + currIteration.push({ + ...data, + } as any) + } + } + setIterParallelLogMap(iterParallelLogMap) + const iterLogMap = iterParallelLogMap.get(iterations.node_id) + if (iterLogMap) + iterations.details = Array.from(iterLogMap.values()) + } + })) + } + } + else { + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + const currentIndex = draft.tracing!.findIndex((trace) => { + if (!trace.execution_metadata?.parallel_id) + return trace.node_id === data.node_id + return trace.node_id === data.node_id && trace.execution_metadata?.parallel_id === data.execution_metadata?.parallel_id + }) + if (currentIndex > -1 && draft.tracing) { + draft.tracing[currentIndex] = { + ...data, + ...(draft.tracing[currentIndex].extras + ? { extras: draft.tracing[currentIndex].extras } + : {}), + ...(draft.tracing[currentIndex].retryDetail + ? { retryDetail: draft.tracing[currentIndex].retryDetail } + : {}), + } as any + } + })) + const newNodes = produce(nodes, (draft) => { + const currentNode = draft.find(node => node.id === data.node_id)! + currentNode.data._runningStatus = data.status as any + if (data.status === NodeRunningStatus.Exception) { + if (data.execution_metadata.error_strategy === ErrorHandleTypeEnum.failBranch) + currentNode.data._runningBranchId = ErrorHandleTypeEnum.failBranch + } + else { + if (data.node_type === BlockEnum.IfElse) + currentNode.data._runningBranchId = data?.outputs?.selected_case_id + + if (data.node_type === BlockEnum.QuestionClassifier) + currentNode.data._runningBranchId = data?.outputs?.class_id + } + }) + setNodes(newNodes) + const newEdges = produce(edges, (draft) => { + const incomeEdges = draft.filter((edge) => { + return edge.target === data.node_id + }) + incomeEdges.forEach((edge) => { + edge.data = { + ...edge.data, + _targetRunningStatus: data.status as any, + } + }) + }) + setEdges(newEdges) + } + }, []) + + return { + handleWorkflowNodeFinished, + } +} \ No newline at end of file diff --git a/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-iteration-finished.ts b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-iteration-finished.ts new file mode 100644 index 0000000000..d969662de3 --- /dev/null +++ b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-iteration-finished.ts @@ -0,0 +1,48 @@ +import { useCallback } from 'react' +import { useStoreApi } from 'reactflow' +import produce from 'immer' +import type { IterationFinishedResponse } from '@/types/workflow' +import { useWorkflowStore } from '@/app/components/workflow/store' +import { NodeRunningStatus } from '@/app/components/workflow/types' +import { DEFAULT_ITER_TIMES } from '@/app/components/workflow/constants' + +export const useWorkflowNodeIterationFinished = () => { + const store = useStoreApi() + const workflowStore = useWorkflowStore() + + const handleWorkflowNodeIterationFinished = useCallback((params: IterationFinishedResponse) => { + const { data } = params + + const { + workflowRunningData, + setWorkflowRunningData, + setIterTimes, + } = workflowStore.getState() + const { + getNodes, + setNodes, + } = store.getState() + const nodes = getNodes() + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + const tracing = draft.tracing! + const currIterationNode = tracing.find(trace => trace.node_id === data.node_id) + if (currIterationNode) { + Object.assign(currIterationNode, { + ...data, + status: NodeRunningStatus.Succeeded, + }) + } + })) + setIterTimes(DEFAULT_ITER_TIMES) + const newNodes = produce(nodes, (draft) => { + const currentNode = draft.find(node => node.id === data.node_id)! + + currentNode.data._runningStatus = data.status + }) + setNodes(newNodes) + }, []) + + return { + handleWorkflowNodeIterationFinished, + } +} \ No newline at end of file diff --git a/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-iteration-next.ts b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-iteration-next.ts new file mode 100644 index 0000000000..305b376417 --- /dev/null +++ b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-iteration-next.ts @@ -0,0 +1,48 @@ +import { useCallback } from 'react' +import { useStoreApi } from 'reactflow' +import produce from 'immer' +import type { IterationNextResponse } from '@/types/workflow' +import { useWorkflowStore } from '@/app/components/workflow/store' + +export const useWorkflowNodeIterationNext = () => { + const store = useStoreApi() + const workflowStore = useWorkflowStore() + + const handleWorkflowNodeIterationNext = useCallback((params: IterationNextResponse) => { + const { + workflowRunningData, + setWorkflowRunningData, + iterTimes, + setIterTimes, + } = workflowStore.getState() + + const { data } = params + const { + getNodes, + setNodes, + } = store.getState() + + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + const iteration = draft.tracing!.find(trace => trace.node_id === data.node_id) + if (iteration) { + if (iteration.iterDurationMap && data.duration) + iteration.iterDurationMap[data.parallel_mode_run_id ?? `${data.index - 1}`] = data.duration + if (iteration.details!.length >= iteration.metadata.iterator_length!) + return + } + if (!data.parallel_mode_run_id) + iteration?.details!.push([]) + })) + const nodes = getNodes() + const newNodes = produce(nodes, (draft) => { + const currentNode = draft.find(node => node.id === data.node_id)! + currentNode.data._iterationIndex = iterTimes + setIterTimes(iterTimes + 1) + }) + setNodes(newNodes) + }, []) + + return { + handleWorkflowNodeIterationNext, + } +} \ No newline at end of file diff --git a/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-iteration-started.ts b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-iteration-started.ts new file mode 100644 index 0000000000..cf381ba0bf --- /dev/null +++ b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-iteration-started.ts @@ -0,0 +1,87 @@ +import { useCallback } from 'react' +import { + useReactFlow, + useStoreApi, +} from 'reactflow' +import produce from 'immer' +import { useWorkflowStore } from '@/app/components/workflow/store' +import type { IterationStartedResponse } from '@/types/workflow' +import { NodeRunningStatus } from '@/app/components/workflow/types' +import { DEFAULT_ITER_TIMES } from '@/app/components/workflow/constants' + +export const useWorkflowNodeIterationStarted = () => { + const store = useStoreApi() + const reactflow = useReactFlow() + const workflowStore = useWorkflowStore() + + const handleWorkflowNodeIterationStarted = useCallback(( + params: IterationStartedResponse, + containerParams: { + clientWidth: number, + clientHeight: number, + }, + ) => { + const { data } = params + const { + workflowRunningData, + setWorkflowRunningData, + setIterTimes, + } = workflowStore.getState() + const { + getNodes, + setNodes, + edges, + setEdges, + transform, + } = store.getState() + const nodes = getNodes() + setIterTimes(DEFAULT_ITER_TIMES) + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + draft.tracing!.push({ + ...data, + status: NodeRunningStatus.Running, + details: [], + iterDurationMap: {}, + } as any) + })) + + const { + setViewport, + } = reactflow + const currentNodeIndex = nodes.findIndex(node => node.id === data.node_id) + const currentNode = nodes[currentNodeIndex] + const position = currentNode.position + const zoom = transform[2] + + if (!currentNode.parentId) { + setViewport({ + x: (containerParams.clientWidth - 400 - currentNode.width! * zoom) / 2 - position.x * zoom, + y: (containerParams.clientHeight - currentNode.height! * zoom) / 2 - position.y * zoom, + zoom: transform[2], + }) + } + const newNodes = produce(nodes, (draft) => { + draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running + draft[currentNodeIndex].data._iterationLength = data.metadata.iterator_length + draft[currentNodeIndex].data._waitingRun = false + }) + setNodes(newNodes) + const newEdges = produce(edges, (draft) => { + const incomeEdges = draft.filter(edge => edge.target === data.node_id) + + incomeEdges.forEach((edge) => { + edge.data = { + ...edge.data, + _sourceRunningStatus: nodes.find(node => node.id === edge.source)!.data._runningStatus, + _targetRunningStatus: NodeRunningStatus.Running, + _waitingRun: false, + } + }) + }) + setEdges(newEdges) + }, []) + + return { + handleWorkflowNodeIterationStarted, + } +} \ No newline at end of file diff --git a/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-retry.ts b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-retry.ts new file mode 100644 index 0000000000..e391a1e47c --- /dev/null +++ b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-retry.ts @@ -0,0 +1,98 @@ +import { useCallback } from 'react' +import { useStoreApi } from 'reactflow' +import produce from 'immer' +import type { + NodeFinishedResponse, + NodeTracing, +} from '@/types/workflow' +import { useWorkflowStore } from '@/app/components/workflow/store' + +export const useWorkflowNodeRetry = () => { + const store = useStoreApi() + const workflowStore = useWorkflowStore() + + const handleWorkflowNodeRetry = useCallback((params: NodeFinishedResponse) => { + const { data } = params + const { + workflowRunningData, + setWorkflowRunningData, + iterParallelLogMap, + setIterParallelLogMap, + } = workflowStore.getState() + const { + getNodes, + setNodes, + } = store.getState() + + const nodes = getNodes() + const currentNode = nodes.find(node => node.id === data.node_id)! + const nodeParent = nodes.find(node => node.id === currentNode.parentId) + if (nodeParent) { + if (!data.execution_metadata.parallel_mode_run_id) { + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + const tracing = draft.tracing! + const iteration = tracing.find(trace => trace.node_id === nodeParent.id) + + if (iteration && iteration.details?.length) { + const currentNodeRetry = iteration.details[nodeParent.data._iterationIndex - 1]?.find(item => item.node_id === data.node_id) + + if (currentNodeRetry) { + if (currentNodeRetry?.retryDetail) + currentNodeRetry?.retryDetail.push(data as NodeTracing) + else + currentNodeRetry.retryDetail = [data as NodeTracing] + } + } + })) + } + else { + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + const tracing = draft.tracing! + const iteration = tracing.find(trace => trace.node_id === nodeParent.id) + + if (iteration && iteration.details?.length) { + const iterRunID = data.execution_metadata?.parallel_mode_run_id + + const currIteration = iterParallelLogMap.get(iteration.node_id)?.get(iterRunID) + const currentNodeRetry = currIteration?.find(item => item.node_id === data.node_id) + + if (currentNodeRetry) { + if (currentNodeRetry?.retryDetail) + currentNodeRetry?.retryDetail.push(data as NodeTracing) + else + currentNodeRetry.retryDetail = [data as NodeTracing] + } + setIterParallelLogMap(iterParallelLogMap) + const iterLogMap = iterParallelLogMap.get(iteration.node_id) + if (iterLogMap) + iteration.details = Array.from(iterLogMap.values()) + } + })) + } + } + else { + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + const tracing = draft.tracing! + const currentRetryNodeIndex = tracing.findIndex(trace => trace.node_id === data.node_id) + + if (currentRetryNodeIndex > -1) { + const currentRetryNode = tracing[currentRetryNodeIndex] + if (currentRetryNode.retryDetail) + draft.tracing![currentRetryNodeIndex].retryDetail!.push(data as NodeTracing) + else + draft.tracing![currentRetryNodeIndex].retryDetail = [data as NodeTracing] + } + })) + } + const newNodes = produce(nodes, (draft) => { + const currentNode = draft.find(node => node.id === data.node_id)! + + currentNode.data._retryIndex = data.retry_index + }) + setNodes(newNodes) + }, []) + + return { + handleWorkflowNodeRetry, + } +} \ No newline at end of file diff --git a/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-started.ts b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-started.ts new file mode 100644 index 0000000000..833425a0be --- /dev/null +++ b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-started.ts @@ -0,0 +1,121 @@ +import { useCallback } from 'react' +import { + useReactFlow, + useStoreApi, +} from 'reactflow' +import produce from 'immer' +import type { NodeStartedResponse } from '@/types/workflow' +import { NodeRunningStatus } from '@/app/components/workflow/types' +import { useWorkflowStore } from '@/app/components/workflow/store' + +export const useWorkflowNodeStarted = () => { + const store = useStoreApi() + const workflowStore = useWorkflowStore() + const reactflow = useReactFlow() + + const handleWorkflowNodeStarted = useCallback(( + params: NodeStartedResponse, + containerParams: { + clientWidth: number, + clientHeight: number, + }, + ) => { + const { data } = params + const { + workflowRunningData, + setWorkflowRunningData, + iterParallelLogMap, + setIterParallelLogMap, + } = workflowStore.getState() + const { + getNodes, + setNodes, + edges, + setEdges, + transform, + } = store.getState() + const nodes = getNodes() + const node = nodes.find(node => node.id === data.node_id) + if (node?.parentId) { + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + const tracing = draft.tracing! + const iterations = tracing.find(trace => trace.node_id === node?.parentId) + const currIteration = iterations?.details![node.data.iteration_index] || iterations?.details![iterations.details!.length - 1] + if (!data.parallel_run_id) { + currIteration?.push({ + ...data, + status: NodeRunningStatus.Running, + } as any) + } + else { + const nodeId = iterations?.node_id as string + if (!iterParallelLogMap.has(nodeId as string)) + iterParallelLogMap.set(iterations?.node_id as string, new Map()) + + const currentIterLogMap = iterParallelLogMap.get(nodeId)! + if (!currentIterLogMap.has(data.parallel_run_id)) + currentIterLogMap.set(data.parallel_run_id, [{ ...data, status: NodeRunningStatus.Running } as any]) + else + currentIterLogMap.get(data.parallel_run_id)!.push({ ...data, status: NodeRunningStatus.Running } as any) + setIterParallelLogMap(iterParallelLogMap) + if (iterations) + iterations.details = Array.from(currentIterLogMap.values()) + } + })) + } + else { + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + draft.tracing!.push({ + ...data, + status: NodeRunningStatus.Running, + } as any) + })) + + const { + setViewport, + } = reactflow + const currentNodeIndex = nodes.findIndex(node => node.id === data.node_id) + const currentNode = nodes[currentNodeIndex] + const position = currentNode.position + const zoom = transform[2] + + if (!currentNode.parentId) { + setViewport({ + x: (containerParams.clientWidth - 400 - currentNode.width! * zoom) / 2 - position.x * zoom, + y: (containerParams.clientHeight - currentNode.height! * zoom) / 2 - position.y * zoom, + zoom: transform[2], + }) + } + const newNodes = produce(nodes, (draft) => { + draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running + draft[currentNodeIndex].data._waitingRun = false + }) + setNodes(newNodes) + const newEdges = produce(edges, (draft) => { + const incomeEdges = draft.filter((edge) => { + return edge.target === data.node_id + }) + + incomeEdges.forEach((edge) => { + const incomeNode = nodes.find(node => node.id === edge.source)! + if ( + (!incomeNode.data._runningBranchId && edge.sourceHandle === 'source') + || (incomeNode.data._runningBranchId && edge.sourceHandle === incomeNode.data._runningBranchId) + ) { + edge.data = { + ...edge.data, + _sourceRunningStatus: incomeNode.data._runningStatus, + _targetRunningStatus: NodeRunningStatus.Running, + _waitingRun: false, + } + } + }) + }) + setEdges(newEdges) + } + }, []) + + return { + handleWorkflowNodeStarted, + } +} \ No newline at end of file diff --git a/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-run-event.ts b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-run-event.ts new file mode 100644 index 0000000000..cc9433ea0e --- /dev/null +++ b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-run-event.ts @@ -0,0 +1,41 @@ +import { + useWorkflowStarted, + useWorkflowFinished, + useWorkflowFailed, + useWorkflowNodeStarted, + useWorkflowNodeFinished, + useWorkflowNodeIterationStarted, + useWorkflowNodeIterationNext, + useWorkflowNodeIterationFinished, + useWorkflowNodeRetry, + useWorkflowTextChunk, + useWorkflowTextReplace, +} from '.' + +export const useWorkflowRunEvent = () => { + const { handleWorkflowStarted } = useWorkflowStarted() + const { handleWorkflowFinished } = useWorkflowFinished() + const { handleWorkflowFailed } = useWorkflowFailed() + const { handleWorkflowNodeStarted } = useWorkflowNodeStarted() + const { handleWorkflowNodeFinished } = useWorkflowNodeFinished() + const { handleWorkflowNodeIterationStarted } = useWorkflowNodeIterationStarted() + const { handleWorkflowNodeIterationNext } = useWorkflowNodeIterationNext() + const { handleWorkflowNodeIterationFinished } = useWorkflowNodeIterationFinished() + const { handleWorkflowNodeRetry } = useWorkflowNodeRetry() + const { handleWorkflowTextChunk } = useWorkflowTextChunk() + const { handleWorkflowTextReplace } = useWorkflowTextReplace() + + return { + handleWorkflowStarted, + handleWorkflowFinished, + handleWorkflowFailed, + handleWorkflowNodeStarted, + handleWorkflowNodeFinished, + handleWorkflowNodeIterationStarted, + handleWorkflowNodeIterationNext, + handleWorkflowNodeIterationFinished, + handleWorkflowNodeRetry, + handleWorkflowTextChunk, + handleWorkflowTextReplace, + } +} \ No newline at end of file diff --git a/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-started.ts b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-started.ts new file mode 100644 index 0000000000..d080a89e76 --- /dev/null +++ b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-started.ts @@ -0,0 +1,58 @@ +import { useCallback } from 'react' +import { useStoreApi } from 'reactflow' +import produce from 'immer' +import type { WorkflowStartedResponse } from '@/types/workflow' +import { WorkflowRunningStatus } from '@/app/components/workflow/types' +import { useWorkflowStore } from '@/app/components/workflow/store' + +export const useWorkflowStarted = () => { + const store = useStoreApi() + const workflowStore = useWorkflowStore() + + const handleWorkflowStarted = useCallback((params: WorkflowStartedResponse) => { + const { task_id, data } = params + const { + workflowRunningData, + setWorkflowRunningData, + setIterParallelLogMap, + } = workflowStore.getState() + const { + getNodes, + setNodes, + edges, + setEdges, + } = store.getState() + setIterParallelLogMap(new Map()) + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + draft.task_id = task_id + draft.result = { + ...draft?.result, + ...data, + status: WorkflowRunningStatus.Running, + } + })) + const nodes = getNodes() + const newNodes = produce(nodes, (draft) => { + draft.forEach((node) => { + node.data._waitingRun = true + node.data._runningBranchId = undefined + }) + }) + setNodes(newNodes) + const newEdges = produce(edges, (draft) => { + draft.forEach((edge) => { + edge.data = { + ...edge.data, + _sourceRunningStatus: undefined, + _targetRunningStatus: undefined, + _waitingRun: true, + } + }) + }) + setEdges(newEdges) + }, []) + + return { + handleWorkflowStarted, + } +} \ No newline at end of file diff --git a/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-text-chunk.ts b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-text-chunk.ts new file mode 100644 index 0000000000..2d9c8ce93d --- /dev/null +++ b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-text-chunk.ts @@ -0,0 +1,25 @@ +import { useCallback } from 'react' +import produce from 'immer' +import type { TextChunkResponse } from '@/types/workflow' +import { useWorkflowStore } from '@/app/components/workflow/store' + +export const useWorkflowTextChunk = () => { + const workflowStore = useWorkflowStore() + + const handleWorkflowTextChunk = useCallback((params: TextChunkResponse) => { + const { data: { text } } = params + const { + workflowRunningData, + setWorkflowRunningData, + } = workflowStore.getState() + + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + draft.resultTabActive = true + draft.resultText += text + })) + }, []) + + return { + handleWorkflowTextChunk, + } +} \ No newline at end of file diff --git a/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-text-replace.ts b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-text-replace.ts new file mode 100644 index 0000000000..2fab8cc5ee --- /dev/null +++ b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-text-replace.ts @@ -0,0 +1,23 @@ +import { useCallback } from 'react' +import produce from 'immer' +import type { TextReplaceResponse } from '@/types/workflow' +import { useWorkflowStore } from '@/app/components/workflow/store' + +export const useWorkflowTextReplace = () => { + const workflowStore = useWorkflowStore() + + const handleWorkflowTextReplace = useCallback((params: TextReplaceResponse) => { + const { data: { text } } = params + const { + workflowRunningData, + setWorkflowRunningData, + } = workflowStore.getState() + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + draft.resultText = text + })) + }, []) + + return { + handleWorkflowTextReplace, + } +} \ No newline at end of file diff --git a/web/app/components/workflow/hooks/use-workflow-run.ts b/web/app/components/workflow/hooks/use-workflow-run.ts index cc1b0724a9..270f7ada72 100644 --- a/web/app/components/workflow/hooks/use-workflow-run.ts +++ b/web/app/components/workflow/hooks/use-workflow-run.ts @@ -8,13 +8,9 @@ import { v4 as uuidV4 } from 'uuid' import { usePathname } from 'next/navigation' import { useWorkflowStore } from '../store' import { useNodesSyncDraft } from '../hooks' -import { - BlockEnum, - NodeRunningStatus, - WorkflowRunningStatus, -} from '../types' -import { DEFAULT_ITER_TIMES } from '../constants' +import { WorkflowRunningStatus } from '../types' import { useWorkflowUpdate } from './use-workflow-interactions' +import { useWorkflowRunEvent } from './use-workflow-run-event/use-workflow-run-event' import { useStore as useAppStore } from '@/app/components/app/store' import type { IOtherOptions } from '@/service/base' import { ssePost } from '@/service/base' @@ -24,11 +20,6 @@ import { } from '@/service/workflow' import { useFeaturesStore } from '@/app/components/base/features/hooks' import { AudioPlayerManager } from '@/app/components/base/audio-btn/audio.player.manager' -import { - getFilesInLogs, -} from '@/app/components/base/file-uploader/utils' -import { ErrorHandleTypeEnum } from '@/app/components/workflow/nodes/_base/components/error-handle/types' -import type { NodeTracing } from '@/types/workflow' export const useWorkflowRun = () => { const store = useStoreApi() @@ -38,6 +29,19 @@ export const useWorkflowRun = () => { const { doSyncWorkflowDraft } = useNodesSyncDraft() const { handleUpdateWorkflowCanvas } = useWorkflowUpdate() const pathname = usePathname() + const { + handleWorkflowStarted, + handleWorkflowFinished, + handleWorkflowFailed, + handleWorkflowNodeStarted, + handleWorkflowNodeFinished, + handleWorkflowNodeIterationStarted, + handleWorkflowNodeIterationNext, + handleWorkflowNodeIterationFinished, + handleWorkflowNodeRetry, + handleWorkflowTextChunk, + handleWorkflowTextReplace, + } = useWorkflowRunEvent() const handleBackupDraft = useCallback(() => { const { @@ -135,8 +139,6 @@ export const useWorkflowRun = () => { if (appDetail?.mode === 'workflow') url = `/apps/${appDetail.id}/workflows/draft/run` - let prevNodeId = '' - const { setWorkflowRunningData, } = workflowStore.getState() @@ -169,570 +171,76 @@ export const useWorkflowRun = () => { }, { onWorkflowStarted: (params) => { - const { task_id, data } = params - const { - workflowRunningData, - setWorkflowRunningData, - setIterParallelLogMap, - } = workflowStore.getState() - const { - getNodes, - setNodes, - edges, - setEdges, - } = store.getState() - setIterParallelLogMap(new Map()) - setWorkflowRunningData(produce(workflowRunningData!, (draft) => { - draft.task_id = task_id - draft.result = { - ...draft?.result, - ...data, - status: WorkflowRunningStatus.Running, - } - })) - const nodes = getNodes() - const newNodes = produce(nodes, (draft) => { - draft.forEach((node) => { - node.data._waitingRun = true - node.data._runningBranchId = undefined - }) - }) - setNodes(newNodes) - const newEdges = produce(edges, (draft) => { - draft.forEach((edge) => { - edge.data = { - ...edge.data, - _sourceRunningStatus: undefined, - _targetRunningStatus: undefined, - _waitingRun: true, - } - }) - }) - setEdges(newEdges) + handleWorkflowStarted(params) if (onWorkflowStarted) onWorkflowStarted(params) }, onWorkflowFinished: (params) => { - const { data } = params - const { - workflowRunningData, - setWorkflowRunningData, - } = workflowStore.getState() - - const isStringOutput = data.outputs && Object.keys(data.outputs).length === 1 && typeof data.outputs[Object.keys(data.outputs)[0]] === 'string' - - setWorkflowRunningData(produce(workflowRunningData!, (draft) => { - draft.result = { - ...draft.result, - ...data, - files: getFilesInLogs(data.outputs), - } as any - if (isStringOutput) { - draft.resultTabActive = true - draft.resultText = data.outputs[Object.keys(data.outputs)[0]] - } - })) - - prevNodeId = '' + handleWorkflowFinished(params) if (onWorkflowFinished) onWorkflowFinished(params) }, onError: (params) => { - const { - workflowRunningData, - setWorkflowRunningData, - } = workflowStore.getState() - - setWorkflowRunningData(produce(workflowRunningData!, (draft) => { - draft.result = { - ...draft.result, - status: WorkflowRunningStatus.Failed, - } - })) + handleWorkflowFailed() if (onError) onError(params) }, onNodeStarted: (params) => { - const { data } = params - const { - workflowRunningData, - setWorkflowRunningData, - iterParallelLogMap, - setIterParallelLogMap, - } = workflowStore.getState() - const { - getNodes, - setNodes, - edges, - setEdges, - transform, - } = store.getState() - const nodes = getNodes() - const node = nodes.find(node => node.id === data.node_id) - if (node?.parentId) { - setWorkflowRunningData(produce(workflowRunningData!, (draft) => { - const tracing = draft.tracing! - const iterations = tracing.find(trace => trace.node_id === node?.parentId) - const currIteration = iterations?.details![node.data.iteration_index] || iterations?.details![iterations.details!.length - 1] - if (!data.parallel_run_id) { - currIteration?.push({ - ...data, - status: NodeRunningStatus.Running, - } as any) - } - else { - const nodeId = iterations?.node_id as string - if (!iterParallelLogMap.has(nodeId as string)) - iterParallelLogMap.set(iterations?.node_id as string, new Map()) + handleWorkflowNodeStarted( + params, + { + clientWidth, + clientHeight, + }, + ) - const currentIterLogMap = iterParallelLogMap.get(nodeId)! - if (!currentIterLogMap.has(data.parallel_run_id)) - currentIterLogMap.set(data.parallel_run_id, [{ ...data, status: NodeRunningStatus.Running } as any]) - else - currentIterLogMap.get(data.parallel_run_id)!.push({ ...data, status: NodeRunningStatus.Running } as any) - setIterParallelLogMap(iterParallelLogMap) - if (iterations) - iterations.details = Array.from(currentIterLogMap.values()) - } - })) - } - else { - setWorkflowRunningData(produce(workflowRunningData!, (draft) => { - draft.tracing!.push({ - ...data, - status: NodeRunningStatus.Running, - } as any) - })) - - const { - setViewport, - } = reactflow - const currentNodeIndex = nodes.findIndex(node => node.id === data.node_id) - const currentNode = nodes[currentNodeIndex] - const position = currentNode.position - const zoom = transform[2] - - if (!currentNode.parentId) { - setViewport({ - x: (clientWidth - 400 - currentNode.width! * zoom) / 2 - position.x * zoom, - y: (clientHeight - currentNode.height! * zoom) / 2 - position.y * zoom, - zoom: transform[2], - }) - } - const newNodes = produce(nodes, (draft) => { - draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running - draft[currentNodeIndex].data._waitingRun = false - }) - setNodes(newNodes) - const newEdges = produce(edges, (draft) => { - const incomeEdges = draft.filter((edge) => { - return edge.target === data.node_id - }) - - incomeEdges.forEach((edge) => { - const incomeNode = nodes.find(node => node.id === edge.source)! - if ( - (!incomeNode.data._runningBranchId && edge.sourceHandle === 'source') - || (incomeNode.data._runningBranchId && edge.sourceHandle === incomeNode.data._runningBranchId) - ) { - edge.data = { - ...edge.data, - _sourceRunningStatus: incomeNode.data._runningStatus, - _targetRunningStatus: NodeRunningStatus.Running, - _waitingRun: false, - } - } - }) - }) - setEdges(newEdges) - } if (onNodeStarted) onNodeStarted(params) }, onNodeFinished: (params) => { - const { data } = params - const { - workflowRunningData, - setWorkflowRunningData, - iterParallelLogMap, - setIterParallelLogMap, - } = workflowStore.getState() - const { - getNodes, - setNodes, - edges, - setEdges, - } = store.getState() - const nodes = getNodes() - const nodeParentId = nodes.find(node => node.id === data.node_id)!.parentId - if (nodeParentId) { - if (!data.execution_metadata.parallel_mode_run_id) { - setWorkflowRunningData(produce(workflowRunningData!, (draft) => { - const tracing = draft.tracing! - const iterations = tracing.find(trace => trace.node_id === nodeParentId) // the iteration node - - if (iterations && iterations.details) { - const iterationIndex = data.execution_metadata?.iteration_index || 0 - if (!iterations.details[iterationIndex]) - iterations.details[iterationIndex] = [] - - const currIteration = iterations.details[iterationIndex] - const nodeIndex = currIteration.findIndex(node => - node.node_id === data.node_id && ( - node.execution_metadata?.parallel_id === data.execution_metadata?.parallel_id || node.parallel_id === data.execution_metadata?.parallel_id), - ) - if (nodeIndex !== -1) { - currIteration[nodeIndex] = { - ...currIteration[nodeIndex], - ...(currIteration[nodeIndex].retryDetail - ? { retryDetail: currIteration[nodeIndex].retryDetail } - : {}), - ...data, - } as any - } - else { - currIteration.push({ - ...data, - } as any) - } - } - })) - } - else { - // open parallel mode - setWorkflowRunningData(produce(workflowRunningData!, (draft) => { - const tracing = draft.tracing! - const iterations = tracing.find(trace => trace.node_id === nodeParentId) // the iteration node - - if (iterations && iterations.details) { - const iterRunID = data.execution_metadata?.parallel_mode_run_id - - const currIteration = iterParallelLogMap.get(iterations.node_id)?.get(iterRunID) - const nodeIndex = currIteration?.findIndex(node => - node.node_id === data.node_id && ( - node?.parallel_run_id === data.execution_metadata?.parallel_mode_run_id), - ) - if (currIteration) { - if (nodeIndex !== undefined && nodeIndex !== -1) { - currIteration[nodeIndex] = { - ...currIteration[nodeIndex], - ...data, - } as any - } - else { - currIteration.push({ - ...data, - } as any) - } - } - setIterParallelLogMap(iterParallelLogMap) - const iterLogMap = iterParallelLogMap.get(iterations.node_id) - if (iterLogMap) - iterations.details = Array.from(iterLogMap.values()) - } - })) - } - } - else { - setWorkflowRunningData(produce(workflowRunningData!, (draft) => { - const currentIndex = draft.tracing!.findIndex((trace) => { - if (!trace.execution_metadata?.parallel_id) - return trace.node_id === data.node_id - return trace.node_id === data.node_id && trace.execution_metadata?.parallel_id === data.execution_metadata?.parallel_id - }) - if (currentIndex > -1 && draft.tracing) { - draft.tracing[currentIndex] = { - ...data, - ...(draft.tracing[currentIndex].extras - ? { extras: draft.tracing[currentIndex].extras } - : {}), - ...(draft.tracing[currentIndex].retryDetail - ? { retryDetail: draft.tracing[currentIndex].retryDetail } - : {}), - } as any - } - })) - const newNodes = produce(nodes, (draft) => { - const currentNode = draft.find(node => node.id === data.node_id)! - currentNode.data._runningStatus = data.status as any - if (data.status === NodeRunningStatus.Exception) { - if (data.execution_metadata.error_strategy === ErrorHandleTypeEnum.failBranch) - currentNode.data._runningBranchId = ErrorHandleTypeEnum.failBranch - } - else { - if (data.node_type === BlockEnum.IfElse) - currentNode.data._runningBranchId = data?.outputs?.selected_case_id - - if (data.node_type === BlockEnum.QuestionClassifier) - currentNode.data._runningBranchId = data?.outputs?.class_id - } - }) - setNodes(newNodes) - const newEdges = produce(edges, (draft) => { - const incomeEdges = draft.filter((edge) => { - return edge.target === data.node_id - }) - incomeEdges.forEach((edge) => { - edge.data = { - ...edge.data, - _targetRunningStatus: data.status as any, - } - }) - }) - setEdges(newEdges) - prevNodeId = data.node_id - } + handleWorkflowNodeFinished(params) if (onNodeFinished) onNodeFinished(params) }, onIterationStart: (params) => { - const { data } = params - const { - workflowRunningData, - setWorkflowRunningData, - setIterTimes, - } = workflowStore.getState() - const { - getNodes, - setNodes, - edges, - setEdges, - transform, - } = store.getState() - const nodes = getNodes() - setIterTimes(DEFAULT_ITER_TIMES) - setWorkflowRunningData(produce(workflowRunningData!, (draft) => { - draft.tracing!.push({ - ...data, - status: NodeRunningStatus.Running, - details: [], - iterDurationMap: {}, - } as any) - })) - - const { - setViewport, - } = reactflow - const currentNodeIndex = nodes.findIndex(node => node.id === data.node_id) - const currentNode = nodes[currentNodeIndex] - const position = currentNode.position - const zoom = transform[2] - - if (!currentNode.parentId) { - setViewport({ - x: (clientWidth - 400 - currentNode.width! * zoom) / 2 - position.x * zoom, - y: (clientHeight - currentNode.height! * zoom) / 2 - position.y * zoom, - zoom: transform[2], - }) - } - const newNodes = produce(nodes, (draft) => { - draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running - draft[currentNodeIndex].data._iterationLength = data.metadata.iterator_length - draft[currentNodeIndex].data._waitingRun = false - }) - setNodes(newNodes) - const newEdges = produce(edges, (draft) => { - const incomeEdges = draft.filter(edge => edge.target === data.node_id) - - incomeEdges.forEach((edge) => { - edge.data = { - ...edge.data, - _sourceRunningStatus: nodes.find(node => node.id === edge.source)!.data._runningStatus, - _targetRunningStatus: NodeRunningStatus.Running, - _waitingRun: false, - } - }) - }) - setEdges(newEdges) + handleWorkflowNodeIterationStarted( + params, + { + clientWidth, + clientHeight, + }, + ) if (onIterationStart) onIterationStart(params) }, onIterationNext: (params) => { - const { - workflowRunningData, - setWorkflowRunningData, - iterTimes, - setIterTimes, - } = workflowStore.getState() - - const { data } = params - const { - getNodes, - setNodes, - } = store.getState() - - setWorkflowRunningData(produce(workflowRunningData!, (draft) => { - const iteration = draft.tracing!.find(trace => trace.node_id === data.node_id) - if (iteration) { - if (iteration.iterDurationMap && data.duration) - iteration.iterDurationMap[data.parallel_mode_run_id ?? `${data.index - 1}`] = data.duration - if (iteration.details!.length >= iteration.metadata.iterator_length!) - return - } - if (!data.parallel_mode_run_id) - iteration?.details!.push([]) - })) - const nodes = getNodes() - const newNodes = produce(nodes, (draft) => { - const currentNode = draft.find(node => node.id === data.node_id)! - currentNode.data._iterationIndex = iterTimes - setIterTimes(iterTimes + 1) - }) - setNodes(newNodes) + handleWorkflowNodeIterationNext(params) if (onIterationNext) onIterationNext(params) }, onIterationFinish: (params) => { - const { data } = params - - const { - workflowRunningData, - setWorkflowRunningData, - setIterTimes, - } = workflowStore.getState() - const { - getNodes, - setNodes, - } = store.getState() - const nodes = getNodes() - setWorkflowRunningData(produce(workflowRunningData!, (draft) => { - const tracing = draft.tracing! - const currIterationNode = tracing.find(trace => trace.node_id === data.node_id) - if (currIterationNode) { - Object.assign(currIterationNode, { - ...data, - status: NodeRunningStatus.Succeeded, - }) - } - })) - setIterTimes(DEFAULT_ITER_TIMES) - const newNodes = produce(nodes, (draft) => { - const currentNode = draft.find(node => node.id === data.node_id)! - - currentNode.data._runningStatus = data.status - }) - setNodes(newNodes) - - prevNodeId = data.node_id + handleWorkflowNodeIterationFinished(params) if (onIterationFinish) onIterationFinish(params) }, onNodeRetry: (params) => { - const { data } = params - const { - workflowRunningData, - setWorkflowRunningData, - iterParallelLogMap, - setIterParallelLogMap, - } = workflowStore.getState() - const { - getNodes, - setNodes, - } = store.getState() - - const nodes = getNodes() - const currentNode = nodes.find(node => node.id === data.node_id)! - const nodeParent = nodes.find(node => node.id === currentNode.parentId) - if (nodeParent) { - if (!data.execution_metadata.parallel_mode_run_id) { - setWorkflowRunningData(produce(workflowRunningData!, (draft) => { - const tracing = draft.tracing! - const iteration = tracing.find(trace => trace.node_id === nodeParent.id) - - if (iteration && iteration.details?.length) { - const currentNodeRetry = iteration.details[nodeParent.data._iterationIndex - 1]?.find(item => item.node_id === data.node_id) - - if (currentNodeRetry) { - if (currentNodeRetry?.retryDetail) - currentNodeRetry?.retryDetail.push(data as NodeTracing) - else - currentNodeRetry.retryDetail = [data as NodeTracing] - } - } - })) - } - else { - setWorkflowRunningData(produce(workflowRunningData!, (draft) => { - const tracing = draft.tracing! - const iteration = tracing.find(trace => trace.node_id === nodeParent.id) - - if (iteration && iteration.details?.length) { - const iterRunID = data.execution_metadata?.parallel_mode_run_id - - const currIteration = iterParallelLogMap.get(iteration.node_id)?.get(iterRunID) - const currentNodeRetry = currIteration?.find(item => item.node_id === data.node_id) - - if (currentNodeRetry) { - if (currentNodeRetry?.retryDetail) - currentNodeRetry?.retryDetail.push(data as NodeTracing) - else - currentNodeRetry.retryDetail = [data as NodeTracing] - } - setIterParallelLogMap(iterParallelLogMap) - const iterLogMap = iterParallelLogMap.get(iteration.node_id) - if (iterLogMap) - iteration.details = Array.from(iterLogMap.values()) - } - })) - } - } - else { - setWorkflowRunningData(produce(workflowRunningData!, (draft) => { - const tracing = draft.tracing! - const currentRetryNodeIndex = tracing.findIndex(trace => trace.node_id === data.node_id) - - if (currentRetryNodeIndex > -1) { - const currentRetryNode = tracing[currentRetryNodeIndex] - if (currentRetryNode.retryDetail) - draft.tracing![currentRetryNodeIndex].retryDetail!.push(data as NodeTracing) - else - draft.tracing![currentRetryNodeIndex].retryDetail = [data as NodeTracing] - } - })) - } - const newNodes = produce(nodes, (draft) => { - const currentNode = draft.find(node => node.id === data.node_id)! - - currentNode.data._retryIndex = data.retry_index - }) - setNodes(newNodes) + handleWorkflowNodeRetry(params) if (onNodeRetry) onNodeRetry(params) }, - onParallelBranchStarted: (params) => { - // console.log(params, 'parallel start') - }, - onParallelBranchFinished: (params) => { - // console.log(params, 'finished') - }, onTextChunk: (params) => { - const { data: { text } } = params - const { - workflowRunningData, - setWorkflowRunningData, - } = workflowStore.getState() - setWorkflowRunningData(produce(workflowRunningData!, (draft) => { - draft.resultTabActive = true - draft.resultText += text - })) + handleWorkflowTextChunk(params) }, onTextReplace: (params) => { - const { data: { text } } = params - const { - workflowRunningData, - setWorkflowRunningData, - } = workflowStore.getState() - setWorkflowRunningData(produce(workflowRunningData!, (draft) => { - draft.resultText = text - })) + handleWorkflowTextReplace(params) }, onTTSChunk: (messageId: string, audio: string, audioType?: string) => { if (!audio || audio === '')