Skip to content

Pipeline Module

Turning complexity into elegance with a workflow engine


Design Philosophy: Why Do We Need Pipeline?

As LLM applications become more深入, business logic is becoming increasingly complex. Take a typical RAG (Retrieval-Augmented Generation) scenario as an example:

  1. Get user input, determine if database lookup is needed based on intent.
  2. If needed, Embed text into vectors.
  3. Call vector database for retrieval.
  4. Merge retrieval results with user input through Template Render to form Prompt.
  5. Send to LLM for generation; if exceptions occur, retry or enter degraded logic.

If written with traditional imperative programming, this would inevitably result in a full screen of if err != nil, complex parameter passing, and lengthy functions that are difficult to reuse.

GoChat's Pipeline module is born to "decouple and orchestrate complexity" for you:

  • Strongly-typed Context Flow: Completely goodbye to using map[string]any for parameter passing that causes "type assertion crashes" and "key spelling errors". With Go 1.24+ generics, you can share global state with absolute type safety.
  • Programmable Business Nodes (Step): Encapsulate each single responsibility (such as "render template", "call API") as an independent Step, freely pluggable, recombinable, and reusable.
  • Declarative Control Flow: Use built-in NewIf and NewLoop to declare the program's direction like building with blocks, instead of manually writing obscure conditional judgments.

Core Mechanism and Component Flow

flowchart LR
    TypedContext((Strongly-typed Context\n e.g., *MyRAGContext))

    subgraph Pipeline Orchestration Executor
        Direction[Sequential Scheduling]

        Step1[Step 1\nIntent Understanding]
        IfBranch{IfStep\nNeed to Search?}
        Step2[Step 2\nVector Retrieval]
        Step3[Step 3\nLLM Final Generation]

        Direction --> Step1 --> IfBranch
        IfBranch -- Yes --> Step2 --> Step3
        IfBranch -- No --> Step3
    end

    Hook[[Hooks Monitoring System\nOnStart/OnError/OnComplete]]

    TypedContext --> Step1 --> TypedContext
    TypedContext --> Step2 --> TypedContext
    TypedContext --> Step3 --> TypedContext

    Pipeline -. lifecycle notification .-> Hook

Deep Dive into Generic Flow: Building Strongly-typed Context

With generics, you can define business-specific structs and smoothly flow them between all Steps, with IDE auto-completion and compile-time type error prevention.

package main

import (
    "context"
    "fmt"
    "strings"
    "github.com/DotNetAge/gochat/pkg/pipeline"
)

// 1. Define extremely explicit strongly-typed business context
type RAGContext struct {
    UserQuery    string
    NeedSearch   bool
    RetrievedDocs []string
    FinalAnswer  string
}

// 2. Write single-responsibility Steps
type IntentAnalyzeStep struct{}
func (s *IntentAnalyzeStep) Name() string { return "IntentAnalyze" }
func (s *IntentAnalyzeStep) Execute(ctx context.Context, state *RAGContext) error {
    // Goodbye to type assertion, direct strongly-typed operations!
    if strings.Contains(state.UserQuery, "latest") {
        state.NeedSearch = true
    }
    return nil
}

func main() {
    // 3. Assemble and execute Pipeline
    p := pipeline.New[*RAGContext]()
    p.AddStep(&IntentAnalyzeStep{})

    state := &RAGContext{UserQuery: "Go 1.24 latest features"}
    err := p.Execute(context.Background(), state)
    if err != nil {
        panic(err)
    }

    fmt.Printf("Intent analysis result: Need to search? %v\n", state.NeedSearch)
}

Declarative Control Flow: If and Loop

No need to hardcode jump logic inside Steps; Pipeline provides high-level control at the architectural level. You can assemble IfStep for conditional branching, and use LoopStep to control bounded infinite loop retries.

// Assume there's a step for executing vector search here
type ExecuteVectorSearchStep struct{}
func (s *ExecuteVectorSearchStep) Name() string { return "VectorSearch" }
func (s *ExecuteVectorSearchStep) Execute(ctx context.Context, state *RAGContext) error {
    state.RetrievedDocs = append(state.RetrievedDocs, "Generics make types safer")
    return nil
}

// Assemble IfStep for branching
conditionalSearch := pipeline.NewIf[*RAGContext](
    "ConditionalSearch",
    func(ctx context.Context, state *RAGContext) bool {
        return state.NeedSearch // Dynamically decide based on previous step's intent result
    },
    &ExecuteVectorSearchStep{}, // Then: enter this Step when returning true
    nil,                        // Else: skip with no operation (also supports mounting fallback processing Step)
)

// Build pipeline with branching decisions
p := pipeline.New[*RAGContext]()
p.AddStep(&IntentAnalyzeStep{}).
  AddStep(conditionalSearch) // Push branching as a normal step into the pipeline

// Test execution
state := &RAGContext{UserQuery: "Go 1.24 latest features"}
_ = p.Execute(context.Background(), state)

Extension Development Guide: How to Write Custom Steps and Hooks

GoChat's workflow is entirely extension-based. Both business operators and global lifecycle interceptors can be completed through simple interface implementation.

1. How to Extend a Step Operator?

Implement the pipeline.Step[T] interface. The interface constraints are extremely clean:

  • Name() string: Used to identify the current node in error reporting and logging systems.
  • Execute(ctx, state T) error: The computation process that reads parameters from state and writes back responses.

If you determine in your logic that the task is completely accomplished or some short-circuit event that doesn't need to be passed backward has occurred, you can directly return the early termination flag:

func (s *MyStep) Execute(ctx context.Context, state *MyContext) error {
    if state.AlreadyHasAnswer {
        // pipeline.ErrReturn causes the pipeline to exit normally and safely, without triggering Error hooks
        return pipeline.ErrReturn
    }
    return nil
}

2. How to Implement System-Level Integration Hooks (Aspect-Oriented Programming)?

The bottleneck of LLM applications is often a slow API call or an extremely long retrieval. Using Pipeline's Hook interface, you can inject unified time consumption monitoring,链路追踪 (Tracing), and logging without invading business code.

type MetricsHook struct{}

func (h *MetricsHook) OnStepStart(ctx context.Context, step pipeline.Step[*RAGContext], state *RAGContext) {
    fmt.Printf("[Tracing] Step [%s] started executing...\n", step.Name())
}

func (h *MetricsHook) OnStepError(ctx context.Context, step pipeline.Step[*RAGContext], state *RAGContext, err error) {
    fmt.Printf("[Alert] Step [%s] crashed: %v\n", step.Name(), err)
}

func (h *MetricsHook) OnStepComplete(ctx context.Context, step pipeline.Step[*RAGContext], state *RAGContext) {
    fmt.Printf("[Tracing] Step [%s] executed successfully!\n", step.Name())
}

// Easily mount to any existing Pipeline
func main() {
    p := pipeline.New[*RAGContext]()
    p.AddHook(&MetricsHook{}) // All Step actions passing through this pipeline will be intercepted and observed
}

Through this mechanism, complex chains like ReAct Agent's "loop think-call tool-reflect" can be easily built with highly scalable, controllable, and readable code skeletons using LoopStep, IfStep, and lifecycle Hook.