模板 1:数据采集工作流
场景描述
定期从多个数据源采集数据,进行清洗和汇总,生成分析报告。
工作流结构
┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│  数据源  │───>│   采集   │───>│   清洗   │───>│   转换   │
│   连接   │    │   引擎   │    │   处理   │    │  标准化  │
└──────────┘    └──────────┘    └──────────┘    └──────────┘
                                                      │
┌──────────┐    ┌──────────┐                          │
│   发送   │<───│   报告   │<─────────────────────────┘
│   通知   │    │   生成   │
└──────────┘    └──────────┘
模板代码
python
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Dict, List

from openclaw import (
    Agent, Workflow, Scheduler,
    BrowserTool, DataHandler, ReportGenerator
)
@dataclass
class DataSourceConfig:
    """Configuration of a single data source.

    ``name`` is used by the collection workflow to build the step name
    ``extract_<name>`` and the context key ``data_<name>``.
    """

    name: str
    type: str  # one of: api, database, file, browser
    connection: dict  # connection parameters; layout depends on ``type``
    schedule: str  # cron expression
    extract_query: str  # query/selector passed to the extractor
@dataclass
class WorkflowConfig:
    """Configuration of the whole data-collection workflow."""

    # Sources to extract from; one extraction step is created per entry.
    data_sources: List["DataSourceConfig"]
    # Applied in order to the merged data; each takes the data and returns
    # the transformed data.
    transformations: List[Callable]
    # Output kind handled by the workflow: "report", "api" or "database".
    output_format: str
    # Notification settings: "type" ("email" or "slack") plus "recipients"
    # (email) or "channel" (slack).
    notification: dict
class DataCollectionWorkflow:
    """Template workflow: extract from several sources, transform, output, notify.

    The class only wires steps onto a ``Workflow``.  The hooks it calls
    (``_extract_from_api``, ``_extract_from_database``, ``_extract_from_file``,
    ``_extract_from_browser``, ``_merge_datasets``, ``_generate_report``,
    ``_generate_api_response``, ``_save_to_database``, ``_send_email`` and
    ``_send_slack``) are not defined in this template and must be supplied
    by a subclass.

    Steps exchange data through the shared ``context`` mapping using the keys
    ``data_<source name>``, ``merged_data``, ``cleaned_data`` and ``output``.
    """

    def __init__(self, config: "WorkflowConfig"):
        """Store the configuration and create the agent and an empty workflow."""
        self.config = config
        # NOTE(review): ``Config`` is not imported in the visible part of this
        # file -- confirm where it comes from (presumably ``openclaw``).
        self.agent = Agent(Config(
            name="data_collection",
            role="数据采集助手",
            goal="自动化数据收集和报告生成"
        ))
        self.workflow = Workflow()
        self._built = False

    def build_workflow(self):
        """Register all steps on the workflow and return it.

        Safe to call more than once: steps are registered only the first
        time, so repeated ``run()`` calls do not duplicate them.
        """
        if getattr(self, "_built", False):
            return self.workflow

        # 1. One extraction step per configured data source.
        for source in self.config.data_sources:
            self._add_extraction_step(source)
        # 2. Merge + transformation steps.
        self._add_transformation_steps()
        # 3. Output generation.
        self._add_output_step()
        # 4. Notification.
        self._add_notification_step()

        self._built = True
        return self.workflow

    def _add_extraction_step(self, source: "DataSourceConfig"):
        """Register the ``extract_<name>`` step for one data source."""

        @self.workflow.step(name=f"extract_{source.name}")
        async def extract_step(context):
            if source.type == "api":
                data = await self._extract_from_api(source)
            elif source.type == "database":
                data = await self._extract_from_database(source)
            elif source.type == "file":
                # The file extractor is called synchronously, as in the
                # original template.
                data = self._extract_from_file(source)
            elif source.type == "browser":
                data = await self._extract_from_browser(source)
            else:
                # Previously fell through and crashed on an unbound ``data``.
                raise ValueError(
                    f"Unsupported data source type {source.type!r} "
                    f"for source {source.name!r}"
                )
            context[f"data_{source.name}"] = data
            return data

    def _add_transformation_steps(self):
        """Register the ``merge_data`` and ``clean_data`` steps."""

        @self.workflow.step(name="merge_data")
        async def merge_step(context):
            per_source = {
                key: value
                for key, value in context.items()
                if key.startswith("data_")
            }
            merged = self._merge_datasets(per_source)
            # Publish the result under the key ``clean_data`` reads.
            context["merged_data"] = merged
            return merged

        @self.workflow.step(name="clean_data")
        async def clean_step(context):
            data = context.get("merged_data", [])
            for transform in self.config.transformations:
                data = transform(data)
            # Publish the result under the key ``generate_output`` reads.
            context["cleaned_data"] = data
            return data

    def _add_output_step(self):
        """Register the ``generate_output`` step."""

        @self.workflow.step(name="generate_output")
        async def output_step(context):
            cleaned_data = context.get("cleaned_data", [])
            output_format = self.config.output_format
            if output_format == "report":
                result = self._generate_report(cleaned_data)
            elif output_format == "api":
                result = self._generate_api_response(cleaned_data)
            elif output_format == "database":
                result = self._save_to_database(cleaned_data)
            else:
                # Unknown formats produce no output, as before.
                result = None
            # Publish the result under the key ``send_notification`` reads.
            context["output"] = result
            return result

    def _add_notification_step(self):
        """Register the ``send_notification`` step."""

        @self.workflow.step(name="send_notification")
        async def notify_step(context):
            output = context.get("output") or {}
            try:
                summary = output["summary"]
            except (KeyError, TypeError):
                # No summary available: still notify, with an empty body.
                summary = ""

            notification = self.config.notification
            channel_type = notification.get("type")
            if channel_type == "email":
                await self._send_email(notification["recipients"], summary)
            elif channel_type == "slack":
                await self._send_slack(notification["channel"], summary)

    async def run(self):
        """Build the workflow (if needed) and execute it."""
        self.build_workflow()
        return await self.workflow.execute()
# Usage example
import asyncio  # needed for asyncio.run() below; was missing from the snippet

config = WorkflowConfig(
    data_sources=[
        DataSourceConfig(
            name="sales",
            type="api",
            connection={"url": "https://api.sales.com/v1"},
            schedule="0 8 * * *",
            extract_query="SELECT * FROM daily_sales"
        ),
        DataSourceConfig(
            name="inventory",
            type="database",
            connection={"db": "inventory.db"},
            schedule="0 8 * * *",
            extract_query="SELECT * FROM current_stock"
        )
    ],
    transformations=[
        # Keep only active records.
        lambda data: [d for d in data if d.get("status") == "active"],
        # Add an ISO-formatted date; assumes every record has a "date" value
        # with an ``isoformat()`` method.
        lambda data: [{**d, "normalized_date": d["date"].isoformat()} for d in data]
    ],
    output_format="report",
    notification={
        "type": "email",
        "recipients": ["team@company.com"]
    }
)
workflow = DataCollectionWorkflow(config)
result = asyncio.run(workflow.run())
模板 2:表单处理工作流
场景描述
自动处理来自多个渠道的表单提交,进行验证、审批和数据存储。
工作流结构
┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│   表单   │───>│   格式   │───>│   数据   │───>│   业务   │
│   接收   │    │   统一   │    │   验证   │    │   规则   │
└──────────┘    └──────────┘    └──────────┘    └──────────┘
                                                      │
                ┌─────────────────┬───────────────────┘
                │                 │
                ▼                 ▼
           ┌──────────┐      ┌──────────┐
           │   存储   │      │   异常   │
           │  数据库  │      │   人工   │
           └──────────┘      │   审批   │
                             └──────────┘
模板代码
python
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
class FormStatus(Enum):
    """Processing states a form can be reported in."""

    PENDING = "pending"
    VALIDATED = "validated"
    APPROVED = "approved"
    REJECTED = "rejected"
    COMPLETED = "completed"
@dataclass
class FormField:
    """Definition of one form field."""

    name: str
    type: str
    required: bool
    validation_rules: List[dict]  # rule dicts; their schema is not defined here
@dataclass
class FormConfig:
    """Configuration of one form type."""

    name: str
    fields: List["FormField"]
    storage_table: str
    approval_rules: dict  # schema is not defined in this template
class FormProcessingWorkflow:
    """Template workflow for validating, routing and storing form submissions.

    The hooks ``_normalize_format``, ``_validate_data``,
    ``_check_business_rules``, ``_store_data`` and ``_trigger_workflows`` are
    called but not defined in this template; a subclass must provide them.
    """

    def __init__(self, config: "FormConfig"):
        """Store the form configuration and create the agent."""
        self.config = config
        # NOTE(review): ``Agent`` and ``Config`` are not imported in this
        # snippet -- confirm where they come from (presumably ``openclaw``).
        self.agent = Agent(Config(
            name="form_processor",
            role="表单处理助手",
            goal="自动化表单验证和处理"
        ))

    async def process_form(self, form_data: dict, source: str) -> dict:
        """Process a single form.

        Args:
            form_data: Raw form data.
            source: Where the form came from (e.g. web, api, email).

        Returns:
            A dict with a ``"status"`` key: ``REJECTED`` with the validation
            errors, ``PENDING`` with ``"action": "manual_review"`` when the
            business rules did not pass, or ``APPROVED`` with the form id
            once the data has been stored.
        """
        # 1. Bring the payload into a common format.
        normalized_data = self._normalize_format(form_data, source)

        # 2. Validate; invalid forms are rejected outright.
        validation_result = await self._validate_data(normalized_data)
        if not validation_result["valid"]:
            return {
                "status": FormStatus.REJECTED,
                "reason": validation_result["errors"]
            }

        # 3. Business rules; failures are routed to manual review.
        business_result = await self._check_business_rules(normalized_data)
        if not business_result["passed"]:
            return {
                "status": FormStatus.PENDING,
                "action": "manual_review",
                "reason": business_result["warnings"]
            }

        # 4. Persist the data.
        await self._store_data(normalized_data)
        # 5. Kick off follow-up workflows.
        await self._trigger_workflows(normalized_data)

        return {
            "status": FormStatus.APPROVED,
            "form_id": normalized_data.get("id"),
            "message": "表单处理完成"
        }

    async def batch_process(self, forms: List[dict], source: str = "batch") -> dict:
        """Process several forms one after another and summarise the outcome.

        Args:
            forms: Raw form payloads.
            source: Source label passed to ``process_form`` for every form;
                defaults to ``"batch"``.

        Returns:
            A dict with the total number of forms, the number of approved,
            pending and rejected ones, and the per-form results in input
            order.
        """
        results = []
        tally = {
            FormStatus.APPROVED: 0,
            FormStatus.PENDING: 0,
            FormStatus.REJECTED: 0,
        }
        for form in forms:
            outcome = await self.process_form(form, source)
            results.append(outcome)
            status = outcome["status"]
            if status in tally:
                tally[status] += 1

        return {
            "total": len(results),
            "approved": tally[FormStatus.APPROVED],
            "pending": tally[FormStatus.PENDING],
            "rejected": tally[FormStatus.REJECTED],
            "results": results
        }
模板 3:审批流程工作流
场景描述
实现多级审批流程,自动路由审批请求,跟踪审批状态。
模板代码
python
from typing import List, Dict
from dataclasses import dataclass
from datetime import datetime
@dataclass
class ApprovalLevel:
    """Configuration of one approval level."""

    level: int
    approvers: List[str]  # identifiers allowed to approve at this level
    condition: Callable  # trigger condition
    timeout_hours: int  # hours before a pending request is escalated
    escalation: str  # escalation strategy applied on timeout
@dataclass
class ApprovalRequest:
    """An approval request and its audit trail."""

    id: str
    type: str
    data: dict
    requester: str
    current_level: int  # level the request is currently waiting at
    status: str  # the workflow sets "pending", "approved" or "rejected"
    history: List[dict]  # one entry per action, each with a timestamp
class ApprovalWorkflow:
    """Template for a multi-level approval process.

    The hooks ``_route_to_approvers``, ``_get_request``, ``_get_level_config``,
    ``_on_approved``, ``_on_rejected``, ``_get_pending_requests`` and
    ``_escalate`` are called but not defined in this template; a subclass
    must provide them.

    NOTE(review): several methods are annotated ``-> dict`` but return the
    request object itself; the annotations are left untouched here.
    """

    def __init__(self, name: str, levels: List["ApprovalLevel"]):
        """Store the level configuration and create the agent."""
        self.name = name
        self.levels = levels
        # NOTE(review): ``Agent`` and ``Config`` are not imported in this
        # snippet -- confirm where they come from (presumably ``openclaw``).
        self.agent = Agent(Config(
            name=f"approval_{name}",
            role="审批助手",
            goal="自动化审批流程管理"
        ))

    async def submit_request(self, request: "ApprovalRequest") -> dict:
        """Reset the request to level 1 / "pending" and route it to approvers.

        Returns the (mutated) request.
        """
        request.status = "pending"
        request.current_level = 1
        request.history = [{
            "action": "submit",
            "timestamp": datetime.now().isoformat(),
            "level": 0
        }]
        # Start the approval flow.
        await self._route_to_approvers(request)
        return request

    async def approve(self, request_id: str, approver: str, comment: str = "") -> dict:
        """Record an approval and advance to the next level or finish.

        Returns ``{"error": ...}`` when ``approver`` is not allowed to act on
        the current level, otherwise the updated request.
        """
        request = await self._get_request(request_id)

        # Verify the approver's permission for the current level.
        current_level_config = self._get_level_config(request.current_level)
        if approver not in current_level_config.approvers:
            return {"error": "无审批权限"}

        # Record the approval.
        request.history.append({
            "action": "approve",
            "approver": approver,
            "comment": comment,
            "timestamp": datetime.now().isoformat(),
            "level": request.current_level
        })

        # Move on to the next level if there is one.
        next_level = request.current_level + 1
        if next_level <= len(self.levels):
            request.current_level = next_level
            await self._route_to_approvers(request)
        else:
            request.status = "approved"
            await self._on_approved(request)
        return request

    async def reject(self, request_id: str, approver: str, reason: str) -> dict:
        """Record a rejection and close the request.

        Applies the same permission check as ``approve``: returns
        ``{"error": ...}`` when ``approver`` is not listed for the current
        level, otherwise the updated request.
        """
        request = await self._get_request(request_id)

        # Only approvers of the current level may reject.
        current_level_config = self._get_level_config(request.current_level)
        if approver not in current_level_config.approvers:
            return {"error": "无审批权限"}

        request.history.append({
            "action": "reject",
            "approver": approver,
            "reason": reason,
            "timestamp": datetime.now().isoformat(),
            "level": request.current_level
        })
        request.status = "rejected"
        await self._on_rejected(request)
        return request

    async def check_pending(self) -> List[dict]:
        """Escalate pending requests whose level timeout has elapsed.

        Returns the list of pending requests that was inspected.
        """
        pending = await self._get_pending_requests()

        for request in pending:
            level_config = self._get_level_config(request.current_level)
            submitted_at = self._submitted_at(request)
            if submitted_at is None:
                # No usable submission time: nothing to measure against.
                continue
            elapsed = datetime.now() - submitted_at
            if elapsed.total_seconds() > level_config.timeout_hours * 3600:
                await self._escalate(request, level_config)
        return pending

    @staticmethod
    def _submitted_at(request):
        """Return the request's submission time, or ``None`` if unknown.

        Prefers an explicit ``submitted_at`` attribute; otherwise falls back
        to the timestamp of the "submit" entry written by ``submit_request``.
        """
        explicit = getattr(request, "submitted_at", None)
        if explicit is not None:
            return explicit
        for entry in request.history or []:
            if entry.get("action") == "submit":
                try:
                    return datetime.fromisoformat(entry["timestamp"])
                except (KeyError, TypeError, ValueError):
                    return None
        return None
模板 4:监控告警工作流
场景描述
持续监控系统状态,检测异常,自动触发告警和响应动作。
模板代码
python
from typing import Dict, List, Callable
from dataclasses import dataclass
from datetime import datetime, timedelta
@dataclass
class MonitorConfig:
    """Configuration of one monitored metric."""

    name: str
    metric: str  # metric to monitor
    source: str  # data source the metric is read from
    check_interval: int  # check interval in seconds
    thresholds: dict  # threshold values, e.g. {"warning": 80, "critical": 90}
    alert_channels: List[str]  # channels that receive alerts for this monitor
@dataclass
class AlertRule:
    """An alert rule: when ``condition(alert)`` is truthy, run ``actions``."""

    name: str
    condition: Callable  # trigger condition; receives the alert dict
    severity: str  # info, warning, critical
    actions: List[Callable]  # each action receives the alert dict
class MonitoringWorkflow:
    """Template for threshold-based monitoring with alert rules.

    The hooks ``_collect_metric``, ``_record_status`` and ``_send_to_channel``
    are called but not defined in this template; a subclass must provide
    them.
    """

    def __init__(self):
        """Create empty monitor/rule registries and the agent."""
        self.monitors: Dict[str, "MonitorConfig"] = {}
        self.alert_rules: List["AlertRule"] = []
        # NOTE(review): ``Agent`` and ``Config`` are not imported in this
        # snippet -- confirm where they come from (presumably ``openclaw``).
        self.agent = Agent(Config(
            name="monitoring",
            role="监控助手",
            goal="自动化系统监控和告警"
        ))

    def add_monitor(self, config: "MonitorConfig"):
        """Register (or replace) a monitor under ``config.name``."""
        self.monitors[config.name] = config

    def add_alert_rule(self, rule: "AlertRule"):
        """Append an alert rule."""
        self.alert_rules.append(rule)

    def _due_monitors(self, now, last_checked):
        """Return the monitors whose ``check_interval`` has elapsed.

        ``last_checked`` maps monitor name to the time of its last check and
        is updated in place for every monitor returned.  A monitor that has
        never been checked is always due.
        """
        due = []
        for name, config in self.monitors.items():
            previous = last_checked.get(name)
            if previous is None or now - previous >= config.check_interval:
                last_checked[name] = now
                due.append(config)
        return due

    async def start_monitoring(self):
        """Run the monitoring loop forever.

        The loop wakes up once per second (the minimum check interval) and
        checks each monitor only when its own ``check_interval`` has elapsed.
        """
        import asyncio
        import time

        last_checked = {}
        while True:
            # ``_due_monitors`` returns a list, so monitors added while a
            # check is awaiting cannot break the iteration.
            for config in self._due_monitors(time.monotonic(), last_checked):
                await self._check_metric(config)
            await asyncio.sleep(1)  # minimum check interval

    async def _check_metric(self, config: "MonitorConfig"):
        """Collect one metric, classify it, record it and alert if needed."""
        # Collect the metric value.
        value = await self._collect_metric(config.metric, config.source)

        # Classify against the thresholds (defaults: critical 100, warning 80).
        status = "normal"
        if value >= config.thresholds.get("critical", 100):
            status = "critical"
        elif value >= config.thresholds.get("warning", 80):
            status = "warning"

        # Record the status.
        await self._record_status(config.name, value, status)

        # Trigger an alert for anything that is not normal.
        if status != "normal":
            await self._trigger_alert(config, value, status)

    async def _trigger_alert(self, config: "MonitorConfig", value: float, status: str):
        """Run matching alert rules, then notify the monitor's channels."""
        import inspect

        alert = {
            "monitor": config.name,
            "value": value,
            "status": status,
            "timestamp": datetime.now().isoformat()
        }

        # Run the actions of every rule whose condition matches.
        for rule in self.alert_rules:
            if not rule.condition(alert):
                continue
            for action in rule.actions:
                outcome = action(alert)
                # Actions may be plain functions or coroutine functions.
                if inspect.isawaitable(outcome):
                    await outcome

        # Send the alert to every configured channel.
        for channel in config.alert_channels:
            await self._send_to_channel(channel, alert)
模板 5:定时任务编排
场景描述
编排多个定时任务,处理任务依赖和执行顺序。
模板代码
python
from typing import Dict, List
from dataclasses import dataclass, field
from datetime import datetime
import croniter
@dataclass
class Task:
    """Definition of a scheduled task."""

    name: str
    schedule: str  # cron expression
    command: Callable  # callable executed when the task runs
    dependencies: List[str] = field(default_factory=list)  # names of prerequisite tasks
    timeout: int = 300  # timeout in seconds
    retry_count: int = 3  # number of retries
    priority: int = 0  # priority
class ScheduledTaskOrchestrator:
    """Orchestrates scheduled tasks and runs them in dependency order.

    The hooks ``_run_with_retry`` and ``_should_run`` are called but not
    defined in this template; a subclass must provide them.
    """

    def __init__(self, tasks: List["Task"]):
        """Index the tasks by name, build the dependency graph, create the agent."""
        self.tasks = {t.name: t for t in tasks}
        self.dependencies = self._build_dependency_graph()
        # NOTE(review): ``Agent`` and ``Config`` are not imported in this
        # snippet -- confirm where they come from (presumably ``openclaw``).
        self.agent = Agent(Config(
            name="task_orchestrator",
            role="任务编排助手",
            goal="自动化任务调度和编排"
        ))

    def _build_dependency_graph(self) -> Dict[str, List[str]]:
        """Map every task name to its direct dependencies.

        Dependencies keep their declared order (duplicates removed) so that
        the resulting execution order is deterministic.
        """
        return {
            task.name: list(dict.fromkeys(task.dependencies))
            for task in self.tasks.values()
        }

    def get_execution_order(self, task_name: str) -> List[str]:
        """Return ``task_name`` preceded by all its (transitive) dependencies.

        Every dependency appears before the task that needs it.

        Raises:
            ValueError: If the dependencies contain a cycle.
        """
        finished = set()
        in_progress = set()  # names on the current recursion path
        order = []

        def visit(name):
            if name in finished:
                return
            if name in in_progress:
                raise ValueError(f"Circular dependency detected at task {name!r}")
            in_progress.add(name)
            for dep in self.dependencies.get(name, []):
                visit(dep)
            in_progress.discard(name)
            finished.add(name)
            order.append(name)

        visit(task_name)
        return order

    async def execute_task(self, task_name: str) -> dict:
        """Run a task after all of its direct and transitive dependencies.

        Returns:
            A dict with the task name, the execution order used and the
            result of every task that was run.

        Raises:
            KeyError: If the task or one of its dependencies is unknown.
            ValueError: If the dependencies contain a cycle.
        """
        if task_name not in self.tasks:
            raise KeyError(task_name)

        execution_order = self.get_execution_order(task_name)
        # Fail before running anything if a dependency is not registered.
        unknown = [name for name in execution_order if name not in self.tasks]
        if unknown:
            raise KeyError(unknown[0])

        results = {}
        for name in execution_order:
            task = self.tasks[name]
            results[name] = await self._run_with_retry(
                task.command,
                task.retry_count
            )

        return {
            "task": task_name,
            "execution_order": execution_order,
            "results": results
        }

    async def run_scheduled_tasks(self):
        """Execute every task that ``_should_run`` reports as due."""
        now = datetime.now()
        for task in self.tasks.values():
            cron = croniter.croniter(task.schedule, now)
            # NOTE(review): presumably the next run time computed from
            # ``now``; whether the task runs is decided by ``_should_run``,
            # which is not shown here -- confirm.
            next_run = cron.get_next(datetime)
            if self._should_run(now, next_run):
                await self.execute_task(task.name)
模板使用指南
选择合适的模板
| 场景 | 推荐模板 |
|---|---|
| 数据采集和报告 | 模板 1:数据采集工作流 |
| 表单处理和验证 | 模板 2:表单处理工作流 |
| 多级审批流程 | 模板 3:审批流程工作流 |
| 系统监控告警 | 模板 4:监控告警工作流 |
| 定时任务调度 | 模板 5:定时任务编排 |
自定义扩展
所有模板都支持以下扩展方式:
- 添加自定义步骤: 在 `build_workflow()` 方法中添加新步骤
- 修改验证规则: 重写验证方法实现自定义逻辑
- 集成外部系统: 实现对应的适配器接口
- 调整触发条件: 修改条件判断逻辑