工作流模板

本文档提供可复用的数字员工工作流模板,用于加速自动化项目开发。

模板 1:数据采集工作流

场景描述

定期从多个数据源采集数据,进行清洗和汇总,生成分析报告。

工作流结构

┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│  数据源  │───>│  采集    │───>│  清洗    │───>│  转换    │
│  连接    │    │  引擎    │    │  处理    │    │  标准化  │
└──────────┘    └──────────┘    └──────────┘    └──────────┘
                                                  │
┌──────────┐    ┌──────────┐                     │
│  发送    │<───│  报告    │<─────────────────────┘
│  通知    │    │  生成    │
└──────────┘    └──────────┘

模板代码

python
from openclaw import (
    Agent, Workflow, Scheduler,
    BrowserTool, DataHandler, ReportGenerator
)
from typing import List, Dict
from dataclasses import dataclass
from datetime import datetime

@dataclass
class DataSourceConfig:
    """Configuration of one data source to collect from."""
    name: str  # used in the step name "extract_<name>" and the context key "data_<name>"
    type: str  # one of: api, database, file, browser
    connection: dict  # connection settings; the usage example passes {"url": ...} or {"db": ...}
    schedule: str  # cron expression
    extract_query: str  # the usage example passes a SQL SELECT statement here

@dataclass
class WorkflowConfig:
    """Configuration of a complete data collection workflow."""
    data_sources: List[DataSourceConfig]  # one extraction step is registered per entry
    transformations: List[callable]  # applied in order; each takes the dataset and returns the new dataset
    output_format: str  # "report", "api" or "database"
    notification: dict  # "type" is "email" (needs "recipients") or "slack" (needs "channel")

class DataCollectionWorkflow:
    """Template workflow: extract from each source, merge, clean, output, notify.

    Steps are registered on ``self.workflow`` through its ``step`` decorator
    and hand their results to later steps through the shared ``context``
    mapping, using these keys:

    * ``data_<source name>`` - raw data extracted from one source
    * ``merged_data``        - result of ``_merge_datasets``
    * ``cleaned_data``       - data after all configured transformations
    * ``output``             - result of the output step

    The ``_extract_from_*``, ``_merge_datasets``, ``_generate_*``,
    ``_save_to_database`` and ``_send_*`` helpers are not defined in this
    class; a concrete workflow has to provide them.
    """

    def __init__(self, config: WorkflowConfig):
        """Store the configuration and create the agent and the empty workflow."""
        self.config = config
        # NOTE(review): `Config` is not among the names imported by the visible
        # import lines of this snippet - confirm where it should come from.
        self.agent = Agent(Config(
            name="data_collection",
            role="数据采集助手",
            goal="自动化数据收集和报告生成"
        ))
        self.workflow = Workflow()
        # Set once the steps have been registered, so they are never added twice.
        self._built = False

    def build_workflow(self):
        """Register all steps on the workflow and return the workflow.

        Calling this more than once is safe: the steps are registered only on
        the first call, so repeated ``run()`` calls do not duplicate them.
        """
        if getattr(self, "_built", False):
            return self.workflow

        # 1. One extraction step per configured data source.
        for source in self.config.data_sources:
            self._add_extraction_step(source)

        # 2. Merge and transformation steps.
        self._add_transformation_steps()

        # 3. Output generation step.
        self._add_output_step()

        # 4. Notification step.
        self._add_notification_step()

        self._built = True
        return self.workflow

    def _add_extraction_step(self, source: DataSourceConfig):
        """Register the step that extracts data from ``source``.

        The step raises ``ValueError`` when ``source.type`` is not one of
        "api", "database", "file" or "browser".
        """

        @self.workflow.step(name=f"extract_{source.name}")
        async def extract_step(context):
            if source.type == "api":
                data = await self._extract_from_api(source)
            elif source.type == "database":
                data = await self._extract_from_database(source)
            elif source.type == "file":
                # The file extractor is called synchronously (not awaited).
                data = self._extract_from_file(source)
            elif source.type == "browser":
                data = await self._extract_from_browser(source)
            else:
                # Without this branch `data` would be unbound below.
                raise ValueError(
                    f"Unsupported data source type: {source.type!r}"
                )

            context[f"data_{source.name}"] = data
            return data

    def _add_transformation_steps(self):
        """Register the merge step and the cleaning step."""

        @self.workflow.step(name="merge_data")
        async def merge_step(context):
            # Collect every per-source dataset written by the extraction steps.
            all_data = {
                key: value
                for key, value in context.items()
                if key.startswith("data_")
            }
            merged = self._merge_datasets(all_data)
            # Publish the result under the key the cleaning step reads.
            context["merged_data"] = merged
            return merged

        @self.workflow.step(name="clean_data")
        async def clean_step(context):
            data = context.get("merged_data", [])
            for transform in self.config.transformations:
                data = transform(data)
            # Publish the result under the key the output step reads.
            context["cleaned_data"] = data
            return data

    def _add_output_step(self):
        """Register the step that turns the cleaned data into the final output.

        The step raises ``ValueError`` when ``config.output_format`` is not one
        of "report", "api" or "database".
        """

        @self.workflow.step(name="generate_output")
        async def output_step(context):
            cleaned_data = context.get("cleaned_data", [])
            output_format = self.config.output_format

            if output_format == "report":
                output = self._generate_report(cleaned_data)
            elif output_format == "api":
                output = self._generate_api_response(cleaned_data)
            elif output_format == "database":
                output = self._save_to_database(cleaned_data)
            else:
                # Fail loudly instead of silently producing no output.
                raise ValueError(
                    f"Unsupported output format: {output_format!r}"
                )

            # Publish the result under the key the notification step reads.
            context["output"] = output
            return output

    def _add_notification_step(self):
        """Register the step that sends the output summary by email or Slack.

        Any other notification type results in no notification being sent.
        """

        @self.workflow.step(name="send_notification")
        async def notify_step(context):
            output = context.get("output") or {}
            # A missing summary is sent as an empty string instead of raising.
            summary = output.get("summary", "")
            notification = self.config.notification
            channel_type = notification.get("type")

            if channel_type == "email":
                await self._send_email(notification["recipients"], summary)
            elif channel_type == "slack":
                await self._send_slack(notification["channel"], summary)

    async def run(self):
        """Build the workflow (once) and return the result of executing it."""
        self.build_workflow()
        return await self.workflow.execute()

# Usage example
import asyncio  # required by asyncio.run() below; the snippet's imports do not include it

config = WorkflowConfig(
    data_sources=[
        DataSourceConfig(
            name="sales",
            type="api",
            connection={"url": "https://api.sales.com/v1"},
            schedule="0 8 * * *",
            extract_query="SELECT * FROM daily_sales"
        ),
        DataSourceConfig(
            name="inventory",
            type="database",
            connection={"db": "inventory.db"},
            schedule="0 8 * * *",
            extract_query="SELECT * FROM current_stock"
        )
    ],
    transformations=[
        # Keep only records whose "status" is "active".
        lambda data: [d for d in data if d.get("status") == "active"],
        # Add an ISO-formatted copy of each record's "date"
        # (requires every record to carry a "date" with an isoformat() method).
        lambda data: [{**d, "normalized_date": d["date"].isoformat()} for d in data]
    ],
    output_format="report",
    notification={
        "type": "email",
        "recipients": ["team@company.com"]
    }
)

workflow = DataCollectionWorkflow(config)
result = asyncio.run(workflow.run())

模板 2:表单处理工作流

场景描述

自动处理来自多个渠道的表单提交,进行验证、审批和数据存储。

工作流结构

┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│  表单    │───>│  格式    │───>│  数据    │───>│  业务    │
│  接收    │    │  统一    │    │  验证    │    │  规则    │
└──────────┘    └──────────┘    └──────────┘    └──────────┘
                                              │
       ┌─────────────────┬───────────────────┘
       │                 │
       ▼                 ▼
┌──────────┐      ┌──────────┐
│  存储    │      │  异常    │
│  数据库  │      │  人工    │
└──────────┘      │  审批    │
                  └──────────┘

模板代码

python
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum

class FormStatus(Enum):
    """Processing states of a submitted form."""
    PENDING = "pending"  # returned by process_form when business rules require manual review
    VALIDATED = "validated"  # not referenced by the visible workflow code
    APPROVED = "approved"  # returned by process_form after the data was stored
    REJECTED = "rejected"  # returned by process_form when validation fails
    COMPLETED = "completed"  # not referenced by the visible workflow code

@dataclass
class FormField:
    """Definition of a single form field."""
    name: str
    type: str
    required: bool
    validation_rules: List[dict]  # rule schema is not shown in this template - TODO confirm

@dataclass
class FormConfig:
    """Configuration of one form type handled by the form processing workflow."""
    name: str
    fields: List[FormField]
    storage_table: str  # presumably the table validated forms are stored in - verify against _store_data
    approval_rules: dict  # rule schema is not shown in this template - TODO confirm

class FormProcessingWorkflow:
    """Template workflow that normalizes, validates, checks and stores forms.

    The ``_normalize_format``, ``_validate_data``, ``_check_business_rules``,
    ``_store_data`` and ``_trigger_workflows`` helpers are not defined in this
    class; a concrete workflow has to provide them.
    """

    def __init__(self, config: FormConfig):
        """Keep the form configuration and create the processing agent."""
        self.config = config
        agent_settings = Config(
            name="form_processor",
            role="表单处理助手",
            goal="自动化表单验证和处理"
        )
        self.agent = Agent(agent_settings)

    async def process_form(self, form_data: dict, source: str) -> dict:
        """Run one form through the pipeline and report the outcome.

        Args:
            form_data: The submitted form data.
            source: Where the form came from (web, api, email; the batch
                entry point passes "batch").

        Returns:
            A dict whose "status" is a ``FormStatus``: REJECTED with the
            validation errors as "reason", PENDING with "action" set to
            "manual_review" and the business warnings as "reason", or
            APPROVED with the "form_id" and a completion "message".
        """
        # Step 1: bring the submission into the common format.
        record = self._normalize_format(form_data, source)

        # Step 2: validation failures reject the form immediately.
        checked = await self._validate_data(record)
        if not checked["valid"]:
            rejection = {"status": FormStatus.REJECTED}
            rejection["reason"] = checked["errors"]
            return rejection

        # Step 3: business rule failures park the form for manual review.
        rules = await self._check_business_rules(record)
        if not rules["passed"]:
            review = {"status": FormStatus.PENDING, "action": "manual_review"}
            review["reason"] = rules["warnings"]
            return review

        # Step 4: persist the record, then step 5: start follow-up workflows.
        await self._store_data(record)
        await self._trigger_workflows(record)

        approval = {"status": FormStatus.APPROVED}
        approval["form_id"] = record.get("id")
        approval["message"] = "表单处理完成"
        return approval

    async def batch_process(self, forms: List[dict]) -> dict:
        """Process the forms one after another and summarize the outcomes.

        Returns:
            A dict with the "total" number of forms, the "approved",
            "pending" and "rejected" counts, and the per-form "results" in
            input order.
        """
        outcomes = [await self.process_form(entry, "batch") for entry in forms]

        def count(status):
            # Number of outcomes that ended in the given status.
            return len([item for item in outcomes if item["status"] == status])

        summary = {"total": len(forms)}
        summary["approved"] = count(FormStatus.APPROVED)
        summary["pending"] = count(FormStatus.PENDING)
        summary["rejected"] = count(FormStatus.REJECTED)
        summary["results"] = outcomes
        return summary

模板 3:审批流程工作流

场景描述

实现多级审批流程,自动路由审批请求,跟踪审批状态。

模板代码

python
from typing import List, Dict
from dataclasses import dataclass
from datetime import datetime

@dataclass
class ApprovalLevel:
    """Configuration of one approval level."""
    level: int  # requests start at level 1 and advance by one per approval
    approvers: List[str]  # only these approvers may approve at this level
    condition: callable  # trigger condition (not evaluated by the visible workflow code)
    timeout_hours: int  # check_pending escalates once this many hours have elapsed
    escalation: str  # escalation strategy applied on timeout

@dataclass
class ApprovalRequest:
    """An approval request travelling through the approval levels."""
    id: str
    type: str
    data: dict
    requester: str
    current_level: int  # set to 1 on submission, incremented on each approval
    status: str  # the workflow sets "pending", "approved" or "rejected"
    history: List[dict]  # one entry per action, each with "action", "timestamp" and "level"

class ApprovalWorkflow:
    """Template for a multi-level approval flow.

    A request starts at level 1 and moves up one level per approval until the
    last configured level has approved it. Every action is appended to the
    request's ``history``.

    The ``_route_to_approvers``, ``_get_request``, ``_get_level_config``,
    ``_on_approved``, ``_on_rejected``, ``_get_pending_requests`` and
    ``_escalate`` helpers are not defined in this class; a concrete workflow
    has to provide them.
    """

    def __init__(self, name: str, levels: List[ApprovalLevel]):
        """Store the flow name and its levels and create the approval agent."""
        self.name = name
        self.levels = levels
        self.agent = Agent(Config(
            name=f"approval_{name}",
            role="审批助手",
            goal="自动化审批流程管理"
        ))

    async def submit_request(self, request: ApprovalRequest) -> ApprovalRequest:
        """Reset the request to pending at level 1, route it, and return it."""
        request.status = "pending"
        request.current_level = 1
        request.history = [{
            "action": "submit",
            "timestamp": datetime.now().isoformat(),
            "level": 0
        }]

        # Start the approval flow.
        await self._route_to_approvers(request)

        return request

    def _decision_error(self, request, approver: str):
        """Return an error dict if ``approver`` may not decide ``request``, else None."""
        # A request that already reached a final state must not be decided again.
        if request.status in ("approved", "rejected"):
            return {"error": "请求已处理"}

        level_config = self._get_level_config(request.current_level)
        if approver not in level_config.approvers:
            return {"error": "无审批权限"}
        return None

    async def approve(
        self, request_id: str, approver: str, comment: str = ""
    ) -> "ApprovalRequest | dict":
        """Approve the request at its current level.

        Returns:
            The updated request, or a dict with an "error" message when the
            request is already approved/rejected or ``approver`` is not an
            approver of the current level.
        """
        request = await self._get_request(request_id)

        error = self._decision_error(request, approver)
        if error is not None:
            return error

        # Record the approval.
        request.history.append({
            "action": "approve",
            "approver": approver,
            "comment": comment,
            "timestamp": datetime.now().isoformat(),
            "level": request.current_level
        })

        # Move on to the next level, or finish when this was the last one.
        next_level = request.current_level + 1
        if next_level <= len(self.levels):
            request.current_level = next_level
            await self._route_to_approvers(request)
        else:
            request.status = "approved"
            await self._on_approved(request)

        return request

    async def reject(
        self, request_id: str, approver: str, reason: str
    ) -> "ApprovalRequest | dict":
        """Reject the request at its current level.

        Applies the same checks as ``approve``.

        Returns:
            The updated request, or a dict with an "error" message when the
            request is already approved/rejected or ``approver`` is not an
            approver of the current level.
        """
        request = await self._get_request(request_id)

        error = self._decision_error(request, approver)
        if error is not None:
            return error

        request.history.append({
            "action": "reject",
            "approver": approver,
            "reason": reason,
            "timestamp": datetime.now().isoformat(),
            "level": request.current_level
        })

        request.status = "rejected"
        await self._on_rejected(request)

        return request

    @staticmethod
    def _level_entered_at(request):
        """Return when the request entered its current level, or None if unknown.

        The newest history entry written by this class (submit or approve)
        marks the start of the current level. NOTE(review): this assumes
        ``_escalate`` and other helpers do not append unrelated entries -
        confirm. A ``submitted_at`` attribute is used as a fallback when the
        request carries one.
        """
        history = getattr(request, "history", None)
        if history:
            stamp = history[-1].get("timestamp")
            if stamp is not None:
                return datetime.fromisoformat(stamp)
        return getattr(request, "submitted_at", None)

    async def check_pending(self) -> List[dict]:
        """Escalate pending requests whose current level timed out.

        Returns:
            The pending requests as returned by ``_get_pending_requests``.
        """
        pending = await self._get_pending_requests()

        for request in pending:
            level_config = self._get_level_config(request.current_level)
            entered_at = self._level_entered_at(request)
            if entered_at is None:
                # No usable timestamp: the timeout cannot be evaluated.
                continue

            elapsed = datetime.now() - entered_at
            if elapsed.total_seconds() > level_config.timeout_hours * 3600:
                await self._escalate(request, level_config)

        return pending

模板 4:监控告警工作流

场景描述

持续监控系统状态,检测异常,自动触发告警和响应动作。

模板代码

python
from typing import Dict, List, Callable
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class MonitorConfig:
    """Configuration of one monitored metric."""
    name: str  # key under which the monitor is registered
    metric: str  # metric to monitor
    source: str  # data source the metric is collected from
    check_interval: int  # check interval in seconds
    thresholds: dict  # threshold settings, e.g. {warning: 80, critical: 90}
    alert_channels: List[str]  # every alert is sent to each of these channels

@dataclass
class AlertRule:
    """Rule that runs actions for matching alerts."""
    name: str
    condition: callable  # trigger condition; called with the alert dict
    severity: str  # info, warning, critical
    actions: List[Callable]  # each action is called with the alert dict when the condition holds

class MonitoringWorkflow:
    """Template workflow that polls metrics and raises alerts.

    Each registered monitor is checked at its own ``check_interval``. A value
    at or above the "critical" threshold (default 100) is critical, at or
    above the "warning" threshold (default 80) is a warning; both trigger
    the alert rules and the monitor's alert channels.

    The ``_collect_metric``, ``_record_status`` and ``_send_to_channel``
    helpers are not defined in this class; a concrete workflow has to
    provide them.
    """

    def __init__(self):
        """Create an empty monitor registry, rule list and the monitoring agent."""
        self.monitors: Dict[str, MonitorConfig] = {}
        self.alert_rules: List[AlertRule] = []
        # Monitor name -> monotonic time at which its next check is due.
        self._next_due: Dict[str, float] = {}
        self.agent = Agent(Config(
            name="monitoring",
            role="监控助手",
            goal="自动化系统监控和告警"
        ))

    def add_monitor(self, config: MonitorConfig):
        """Register a monitor under its name, replacing one with the same name."""
        self.monitors[config.name] = config

    def add_alert_rule(self, rule: AlertRule):
        """Append an alert rule; rules are evaluated in insertion order."""
        self.alert_rules.append(rule)

    def _due_monitors(self, now: float) -> List[MonitorConfig]:
        """Return the monitors due at ``now`` and schedule their next check.

        A monitor that was never checked is due immediately; afterwards it is
        due again ``check_interval`` seconds after the check was scheduled.
        """
        due = []
        # Iterate over a snapshot so monitors can be added while checks run.
        for name, config in list(self.monitors.items()):
            if now >= self._next_due.get(name, now):
                self._next_due[name] = now + config.check_interval
                due.append(config)
        return due

    async def start_monitoring(self):
        """Poll forever, checking every monitor whenever its interval elapsed."""
        import asyncio
        import time

        while True:
            for config in self._due_monitors(time.monotonic()):
                await self._check_metric(config)

            await asyncio.sleep(1)  # scheduling resolution / minimum check interval

    async def _check_metric(self, config: MonitorConfig):
        """Collect one metric, record its status and alert on warning/critical."""
        # Collect the metric value.
        value = await self._collect_metric(config.metric, config.source)

        # Classify the value against the configured thresholds.
        status = "normal"
        if value >= config.thresholds.get("critical", 100):
            status = "critical"
        elif value >= config.thresholds.get("warning", 80):
            status = "warning"

        # Record the status.
        await self._record_status(config.name, value, status)

        # Trigger an alert for anything that is not normal.
        if status in ("warning", "critical"):
            await self._trigger_alert(config, value, status)

    async def _trigger_alert(self, config: MonitorConfig, value: float, status: str):
        """Run matching alert rules, then notify every channel of the monitor.

        Rule actions may be plain callables or coroutine functions; results
        that are awaitable are awaited.
        """
        import inspect

        alert = {
            "monitor": config.name,
            "value": value,
            "status": status,
            "timestamp": datetime.now().isoformat()
        }

        # Execute the actions of every rule whose condition matches.
        for rule in self.alert_rules:
            if rule.condition(alert):
                for action in rule.actions:
                    outcome = action(alert)
                    if inspect.isawaitable(outcome):
                        await outcome

        # Send the alert notification.
        for channel in config.alert_channels:
            await self._send_to_channel(channel, alert)

模板 5:定时任务编排

场景描述

编排多个定时任务,处理任务依赖和执行顺序。

模板代码

python
from typing import Dict, List
from dataclasses import dataclass, field
from datetime import datetime
import croniter

@dataclass
class Task:
    """Definition of one scheduled task."""
    name: str  # key of the task in the orchestrator and in other tasks' dependencies
    schedule: str  # cron expression
    command: callable  # handed to _run_with_retry together with retry_count
    dependencies: List[str] = field(default_factory=list)  # names of tasks that must run first
    timeout: int = 300  # timeout in seconds
    retry_count: int = 3  # number of retries
    priority: int = 0  # priority

class ScheduledTaskOrchestrator:
    """Orchestrates scheduled tasks and runs them in dependency order.

    The ``_run_with_retry`` and ``_should_run`` helpers are not defined in
    this class; a concrete orchestrator has to provide them.
    """

    def __init__(self, tasks: List[Task]):
        """Index the tasks by name, build the dependency graph, create the agent."""
        self.tasks = {t.name: t for t in tasks}
        self.dependencies = self._build_dependency_graph()
        self.agent = Agent(Config(
            name="task_orchestrator",
            role="任务编排助手",
            goal="自动化任务调度和编排"
        ))

    def _build_dependency_graph(self) -> Dict[str, List[str]]:
        """Map every task name to its dependencies, in declaration order.

        Lists (not sets) are used so the execution order is deterministic.
        """
        return {
            task.name: list(task.dependencies)
            for task in self.tasks.values()
        }

    def get_execution_order(self, task_name: str) -> List[str]:
        """Return ``task_name`` and all its transitive dependencies, dependencies first.

        Raises:
            ValueError: If the dependencies contain a cycle.
        """
        order: List[str] = []
        finished = set()
        path: List[str] = []  # tasks on the current depth-first path

        def visit(name):
            if name in finished:
                return
            if name in path:
                cycle = path[path.index(name):] + [name]
                raise ValueError(
                    "Circular task dependency: " + " -> ".join(cycle)
                )

            path.append(name)
            for dep in self.dependencies.get(name, []):
                visit(dep)
            path.pop()

            finished.add(name)
            order.append(name)

        visit(task_name)
        return order

    async def execute_task(self, task_name: str) -> dict:
        """Run a task after all of its direct and transitive dependencies.

        Returns:
            A dict with the "task" name, the "execution_order" and the
            per-task "results" keyed by task name.

        Raises:
            KeyError: If the task or one of its dependencies is not registered.
            ValueError: If the dependencies contain a cycle.
        """
        execution_order = self.get_execution_order(task_name)

        # Fail before running anything if part of the chain is missing.
        unknown = [name for name in execution_order if name not in self.tasks]
        if unknown:
            raise KeyError(f"Unknown task(s): {', '.join(unknown)}")

        results = {}
        for name in execution_order:
            # Dependencies come first in the order; the target task is last.
            task = self.tasks[name]
            results[name] = await self._run_with_retry(
                task.command,
                task.retry_count
            )

        return {
            "task": task_name,
            "execution_order": execution_order,
            "results": results
        }

    async def run_scheduled_tasks(self):
        """Execute every task that ``_should_run`` reports as due."""
        now = datetime.now()

        # Iterate over a snapshot so tasks can be registered while others run.
        for task in list(self.tasks.values()):
            cron = croniter.croniter(task.schedule, now)
            # NOTE(review): get_next() yields the first run time after `now`;
            # confirm that _should_run is meant to compare against a future time.
            next_run = cron.get_next(datetime)

            if self._should_run(now, next_run):
                await self.execute_task(task.name)

模板使用指南

选择合适的模板

| 场景 | 推荐模板 |
|------|----------|
| 数据采集和报告 | 模板 1:数据采集工作流 |
| 表单处理和验证 | 模板 2:表单处理工作流 |
| 多级审批流程 | 模板 3:审批流程工作流 |
| 系统监控告警 | 模板 4:监控告警工作流 |
| 定时任务调度 | 模板 5:定时任务编排 |

自定义扩展

所有模板都支持以下扩展方式:

  1. 添加自定义步骤: 在 `build_workflow()` 方法中添加新步骤(仅模板 1 提供该方法;其他模板可在对应的处理方法中扩展)
  2. 修改验证规则: 重写验证方法实现自定义逻辑
  3. 集成外部系统: 实现对应的适配器接口
  4. 调整触发条件: 修改条件判断逻辑