跳转至

Pipeline 模块

把复杂变成优雅的工作流引擎


设计理念:为什么我们需要 Pipeline?

随着 LLM 应用的深入,业务逻辑正在变得越发庞杂。以一个典型的 RAG(检索增强生成)场景为例: 1. 获取用户输入,判断意图是否需要查库。 2. 若需要,则对文本进行 Embedding 向量化。 3. 调用向量数据库进行检索。 4. 将检索结果与用户输入通过模板渲染(Template Render)合并为 Prompt。 5. 发送给大模型进行生成,一旦发生异常还需要重试或进入降级逻辑。

如果用传统的命令式编程编写,这势必产生满屏的 if err != nil、复杂的参数传递和难以复用的冗长函数。

GoChat 的 Pipeline 模块正是为您“解耦并编排复杂性”而生: - 强类型上下文流转:彻底告别使用 map[string]any 传递参数导致的“类型断言崩溃”与“Key 拼写错误”。借助 Go 1.24+ 泛型,您能以绝对类型安全的方式共享全局状态。 - 可编程的业务节点 (Step):将每一项单一职责(如“渲染模板”、“调用 API”)封装为一个独立的 Step,可以自由拔插、重组和复用。 - 声明式的控制流:像搭积木一样使用内置的 NewIfNewLoop 来声明程序的走向,而非手动编写晦涩的条件判断。


核心机制与组件流转

flowchart LR
    TypedContext((强类型上下文\n例如 *MyRAGContext))

    subgraph Pipeline 编排执行器
        Direction[顺序调度]

        Step1[Step 1\n理解意图]
        IfBranch{IfStep\n需要查资料?}
        Step2[Step 2\n向量检索]
        Step3[Step 3\nLLM 最终生成]

        Direction --> Step1 --> IfBranch
        IfBranch -- Yes --> Step2 --> Step3
        IfBranch -- No --> Step3
    end

    Hook[[Hooks 监控系统\nOnStart/OnError/OnComplete]]

    TypedContext --> Step1 --> TypedContext
    TypedContext --> Step2 --> TypedContext
    TypedContext --> Step3 --> TypedContext

    Pipeline -. lifecycle notification .-> Hook

深入泛型流转:构建强类型的 Context

利用泛型,您可以定义属于您业务特有结构体,并在所有 Step 间丝滑流转,IDE 自动补全,编译期杜绝类型错误。

package main

import (
    "context"
    "fmt"
    "strings"
    "github.com/DotNetAge/gochat/pkg/pipeline"
)

// 1. 定义极其明确的强类型业务上下文
type RAGContext struct {
    UserQuery    string
    NeedSearch   bool
    RetrievedDocs []string
    FinalAnswer  string
}

// 2. 编写职责单一的 Step
type IntentAnalyzeStep struct{}
func (s *IntentAnalyzeStep) Name() string { return "IntentAnalyze" }
func (s *IntentAnalyzeStep) Execute(ctx context.Context, state *RAGContext) error {
    // 告别类型断言,直接强类型操作!
    if strings.Contains(state.UserQuery, "最新") {
        state.NeedSearch = true
    }
    return nil
}

func main() {
    // 3. 组装并执行 Pipeline
    p := pipeline.New[*RAGContext]()
    p.AddStep(&IntentAnalyzeStep{})

    state := &RAGContext{UserQuery: "Go 1.24 最新特性"}
    err := p.Execute(context.Background(), state)
    if err != nil {
        panic(err)
    }

    fmt.Printf("意图分析结果: 是否需要检索? %v\n", state.NeedSearch)
}

声明式控制流:If 与 Loop

不需要在 Step 内部硬编码跳转逻辑,Pipeline 在架构层面提供了高维度的控制手段。 您可以组装 IfStep 控制条件分支,以及通过 LoopStep 控制带有上限次数的死循环重试。

// 假设这里有一个执行向量搜索的步骤
type ExecuteVectorSearchStep struct{}
func (s *ExecuteVectorSearchStep) Name() string { return "VectorSearch" }
func (s *ExecuteVectorSearchStep) Execute(ctx context.Context, state *RAGContext) error {
    state.RetrievedDocs = append(state.RetrievedDocs, "泛型使得类型更安全")
    return nil
}

// 组装 IfStep 控制分支
conditionalSearch := pipeline.NewIf[*RAGContext](
    "ConditionalSearch",
    func(ctx context.Context, state *RAGContext) bool {
        return state.NeedSearch // 根据上一步的意图结果动态决策
    },
    &ExecuteVectorSearchStep{}, // Then: 当返回 true 时进入该 Step
    nil,                        // Else: 无操作直接跳过 (也支持挂载降级处理的 Step)
)

// 构建拥有分支决策的管线
p := pipeline.New[*RAGContext]()
p.AddStep(&IntentAnalyzeStep{}).
  AddStep(conditionalSearch) // 将分支作为一个普通步骤推入管线

// 测试执行
state := &RAGContext{UserQuery: "Go 1.24 最新特性"}
_ = p.Execute(context.Background(), state)

扩展开发指南:如何编写自定义 Step 与 Hook

GoChat 的工作流完全是基于扩展设计的。无论是业务算子还是全局生命周期拦截器,都可以通过简单的接口实现来完成。

1. 如何扩展一个 Step 算子?

实现 pipeline.Step[T] 接口即可。接口约束极为干净: - Name() string: 用于在报错和日志系统中识别当前节点。 - Execute(ctx, state T) error: 从状态读取参数并写回响应的计算过程。

如果在您的逻辑中认定任务已经彻底达成或者发生了某种无需向后传递的短路事件,可以直接返回提前终止标志:

func (s *MyStep) Execute(ctx context.Context, state *MyContext) error {
    if state.AlreadyHasAnswer {
        // pipeline.ErrReturn 会使管线正常并安全地退出,不会触发 Error 钩子
        return pipeline.ErrReturn 
    }
    return nil
}

2. 如何实现系统级集成 Hook(面向切面编程)?

大模型应用的瓶颈往往在于某一次慢速的 API 调用或超长检索。利用 Pipeline 的 Hook 接口,您无需侵入业务代码,即可在外部注入统一的耗时监控、链路追踪(Tracing)和日志记录。

type MetricsHook struct{}

func (h *MetricsHook) OnStepStart(ctx context.Context, step pipeline.Step[*RAGContext], state *RAGContext) {
    fmt.Printf("[追踪] 步骤 [%s] 开始执行...\n", step.Name())
}

func (h *MetricsHook) OnStepError(ctx context.Context, step pipeline.Step[*RAGContext], state *RAGContext, err error) {
    fmt.Printf("[警报] 步骤 [%s] 发生崩溃: %v\n", step.Name(), err)
}

func (h *MetricsHook) OnStepComplete(ctx context.Context, step pipeline.Step[*RAGContext], state *RAGContext) {
    fmt.Printf("[追踪] 步骤 [%s] 执行成功!\n", step.Name())
}

// 轻松挂载到任何已有 Pipeline
func main() {
    p := pipeline.New[*RAGContext]()
    p.AddHook(&MetricsHook{}) // 所有经过此管线的 Step 动作都会被它劫持观察
}

通过这套机制,复杂如 ReAct Agent 的“循环思考-调用工具-反思”链条,都能通过 LoopStepIfStep 和生命周期 Hook 轻松搭建出极具扩展性、控制性和可读性的代码骨架。