Pipeline Module¶
Turning complexity into elegance with a workflow engine
Design Philosophy: Why Do We Need Pipeline?¶
As LLM applications become more深入, business logic is becoming increasingly complex. Take a typical RAG (Retrieval-Augmented Generation) scenario as an example:
- Get user input, determine if database lookup is needed based on intent.
- If needed, Embed text into vectors.
- Call vector database for retrieval.
- Merge retrieval results with user input through Template Render to form Prompt.
- Send to LLM for generation; if exceptions occur, retry or enter degraded logic.
If written with traditional imperative programming, this would inevitably result in a full screen of if err != nil, complex parameter passing, and lengthy functions that are difficult to reuse.
GoChat's Pipeline module is born to "decouple and orchestrate complexity" for you:
- Strongly-typed Context Flow: Completely goodbye to using
map[string]anyfor parameter passing that causes "type assertion crashes" and "key spelling errors". With Go 1.24+ generics, you can share global state with absolute type safety. - Programmable Business Nodes (Step): Encapsulate each single responsibility (such as "render template", "call API") as an independent Step, freely pluggable, recombinable, and reusable.
- Declarative Control Flow: Use built-in
NewIfandNewLoopto declare the program's direction like building with blocks, instead of manually writing obscure conditional judgments.
Core Mechanism and Component Flow¶
flowchart LR
TypedContext((Strongly-typed Context\n e.g., *MyRAGContext))
subgraph Pipeline Orchestration Executor
Direction[Sequential Scheduling]
Step1[Step 1\nIntent Understanding]
IfBranch{IfStep\nNeed to Search?}
Step2[Step 2\nVector Retrieval]
Step3[Step 3\nLLM Final Generation]
Direction --> Step1 --> IfBranch
IfBranch -- Yes --> Step2 --> Step3
IfBranch -- No --> Step3
end
Hook[[Hooks Monitoring System\nOnStart/OnError/OnComplete]]
TypedContext --> Step1 --> TypedContext
TypedContext --> Step2 --> TypedContext
TypedContext --> Step3 --> TypedContext
Pipeline -. lifecycle notification .-> Hook
Deep Dive into Generic Flow: Building Strongly-typed Context¶
With generics, you can define business-specific structs and smoothly flow them between all Steps, with IDE auto-completion and compile-time type error prevention.
package main
import (
"context"
"fmt"
"strings"
"github.com/DotNetAge/gochat/pkg/pipeline"
)
// 1. Define extremely explicit strongly-typed business context
type RAGContext struct {
UserQuery string
NeedSearch bool
RetrievedDocs []string
FinalAnswer string
}
// 2. Write single-responsibility Steps
type IntentAnalyzeStep struct{}
func (s *IntentAnalyzeStep) Name() string { return "IntentAnalyze" }
func (s *IntentAnalyzeStep) Execute(ctx context.Context, state *RAGContext) error {
// Goodbye to type assertion, direct strongly-typed operations!
if strings.Contains(state.UserQuery, "latest") {
state.NeedSearch = true
}
return nil
}
func main() {
// 3. Assemble and execute Pipeline
p := pipeline.New[*RAGContext]()
p.AddStep(&IntentAnalyzeStep{})
state := &RAGContext{UserQuery: "Go 1.24 latest features"}
err := p.Execute(context.Background(), state)
if err != nil {
panic(err)
}
fmt.Printf("Intent analysis result: Need to search? %v\n", state.NeedSearch)
}
Declarative Control Flow: If and Loop¶
No need to hardcode jump logic inside Steps; Pipeline provides high-level control at the architectural level.
You can assemble IfStep for conditional branching, and use LoopStep to control bounded infinite loop retries.
// Assume there's a step for executing vector search here
type ExecuteVectorSearchStep struct{}
func (s *ExecuteVectorSearchStep) Name() string { return "VectorSearch" }
func (s *ExecuteVectorSearchStep) Execute(ctx context.Context, state *RAGContext) error {
state.RetrievedDocs = append(state.RetrievedDocs, "Generics make types safer")
return nil
}
// Assemble IfStep for branching
conditionalSearch := pipeline.NewIf[*RAGContext](
"ConditionalSearch",
func(ctx context.Context, state *RAGContext) bool {
return state.NeedSearch // Dynamically decide based on previous step's intent result
},
&ExecuteVectorSearchStep{}, // Then: enter this Step when returning true
nil, // Else: skip with no operation (also supports mounting fallback processing Step)
)
// Build pipeline with branching decisions
p := pipeline.New[*RAGContext]()
p.AddStep(&IntentAnalyzeStep{}).
AddStep(conditionalSearch) // Push branching as a normal step into the pipeline
// Test execution
state := &RAGContext{UserQuery: "Go 1.24 latest features"}
_ = p.Execute(context.Background(), state)
Extension Development Guide: How to Write Custom Steps and Hooks¶
GoChat's workflow is entirely extension-based. Both business operators and global lifecycle interceptors can be completed through simple interface implementation.
1. How to Extend a Step Operator?¶
Implement the pipeline.Step[T] interface. The interface constraints are extremely clean:
Name() string: Used to identify the current node in error reporting and logging systems.Execute(ctx, state T) error: The computation process that reads parameters from state and writes back responses.
If you determine in your logic that the task is completely accomplished or some short-circuit event that doesn't need to be passed backward has occurred, you can directly return the early termination flag:
func (s *MyStep) Execute(ctx context.Context, state *MyContext) error {
if state.AlreadyHasAnswer {
// pipeline.ErrReturn causes the pipeline to exit normally and safely, without triggering Error hooks
return pipeline.ErrReturn
}
return nil
}
2. How to Implement System-Level Integration Hooks (Aspect-Oriented Programming)?¶
The bottleneck of LLM applications is often a slow API call or an extremely long retrieval. Using Pipeline's Hook interface, you can inject unified time consumption monitoring,链路追踪 (Tracing), and logging without invading business code.
type MetricsHook struct{}
func (h *MetricsHook) OnStepStart(ctx context.Context, step pipeline.Step[*RAGContext], state *RAGContext) {
fmt.Printf("[Tracing] Step [%s] started executing...\n", step.Name())
}
func (h *MetricsHook) OnStepError(ctx context.Context, step pipeline.Step[*RAGContext], state *RAGContext, err error) {
fmt.Printf("[Alert] Step [%s] crashed: %v\n", step.Name(), err)
}
func (h *MetricsHook) OnStepComplete(ctx context.Context, step pipeline.Step[*RAGContext], state *RAGContext) {
fmt.Printf("[Tracing] Step [%s] executed successfully!\n", step.Name())
}
// Easily mount to any existing Pipeline
func main() {
p := pipeline.New[*RAGContext]()
p.AddHook(&MetricsHook{}) // All Step actions passing through this pipeline will be intercepted and observed
}
Through this mechanism, complex chains like ReAct Agent's "loop think-call tool-reflect" can be easily built with highly scalable, controllable, and readable code skeletons using LoopStep, IfStep, and lifecycle Hook.