Automation Examples

This chapter provides automation examples built on OpenClaw, covering common business scenarios.

1. Web Data Scraping

Example: automatically collecting product information

python
from openclaw import Agent, BrowserTool, Config

# Configure the Agent
config = Config(
    name="product_scraper",
    role="电商数据采集助手",
    goal="从指定电商网站采集产品信息"
)

# Create the Agent
agent = Agent(config)

# Define the data-scraping tool
@agent.tool
async def scrape_products(url: str, category: str) -> dict:
    """
    抓取指定分类的产品列表信息
    
    Args:
        url: 产品列表页面 URL
        category: 产品分类名称
    
    Returns:
        包含产品信息的字典列表
    """
    browser = BrowserTool()
    
    await browser.goto(url)
    
    # Scroll to load more content
    for _ in range(5):
        await browser.scroll_down()
        await browser.wait(1)
    
    # Extract product information
    products = await browser.evaluate("""
        Array.from(document.querySelectorAll('.product-item'))
            .map(item => ({
                name: item.querySelector('.title')?.textContent?.trim(),
                price: item.querySelector('.price')?.textContent?.trim(),
                url: item.querySelector('a')?.href,
                rating: item.querySelector('.rating')?.textContent?.trim()
            }))
    """)
    
    return {
        "category": category,
        "count": len(products),
        "products": products
    }

# Run the task
async def main():
    result = await agent.execute(
        "从 https://example.com/electronics 抓取所有电子产品信息"
    )
    print(f"采集到 {result['count']} 个产品")
    return result

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

Notes on the example

  • Uses BrowserTool for page navigation and interaction
  • Extracts page data with a JavaScript expression
  • Supports scroll-based pagination and lazy-loaded content (a click-through pagination variant is sketched below)
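
If the listing uses classic pagination instead of infinite scroll, the same tools can drive a click-through loop. The sketch below is a minimal variant under that assumption; the .next-page selector and the 20-page cap are made up for illustration, and only BrowserTool calls already shown above (goto, evaluate, click, wait) are used.

python
# Hypothetical sketch: paginate by clicking a "next page" button instead of scrolling
from openclaw import BrowserTool

async def scrape_paginated(url: str, max_pages: int = 20) -> list:
    browser = BrowserTool()
    await browser.goto(url)

    all_products = []
    for _ in range(max_pages):
        # Same extraction expression as in the example above
        page_items = await browser.evaluate("""
            Array.from(document.querySelectorAll('.product-item'))
                .map(item => ({
                    name: item.querySelector('.title')?.textContent?.trim(),
                    price: item.querySelector('.price')?.textContent?.trim()
                }))
        """)
        all_products.extend(page_items)

        # Stop when there is no "next page" button left (selector is an assumption)
        if not await browser.evaluate("!!document.querySelector('.next-page')"):
            break
        await browser.click('.next-page')
        await browser.wait(1)

    return all_products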

2. Automated Form Filling

Example: submitting form data in batches

python
from openclaw import Agent, BrowserTool, Config, DataHandler

# An Agent instance is needed before @agent.tool can be used (as in example 1);
# the name/role/goal values below are placeholders
agent = Agent(Config(
    name="form_filler",
    role="Form automation assistant",
    goal="Fill in and submit forms in batches"
))

@agent.tool
async def batch_form_submit(
    form_url: str,
    data_file: str,
    field_mapping: dict
) -> dict:
    """
    批量填写和提交表单
    
    Args:
        form_url: 表单页面 URL
        data_file: CSV 数据文件路径
        field_mapping: 字段映射 {csv列名: 表单字段选择器}
    
    Returns:
        提交结果统计
    """
    browser = BrowserTool()
    data_handler = DataHandler()
    
    # Read the data
    records = data_handler.read_csv(data_file)
    
    results = []
    for record in records:
        await browser.goto(form_url)
        
        # Fill in the form
        for csv_field, selector in field_mapping.items():
            value = record.get(csv_field, "")
            await browser.fill(selector, value)
        
        # Submit the form
        await browser.click('#submit-button')
        
        # Wait for the confirmation message; mark the record as failed on timeout
        try:
            await browser.wait_for_selector('.success-message', timeout=10)
            status = "success"
        except Exception:
            status = "failed"

        results.append({
            "status": status,
            "record_id": record.get("id")
        })
    
    return {
        "total": len(results),
        "success": sum(1 for r in results if r["status"] == "success"),
        "failed": sum(1 for r in results if r["status"] == "failed"),
        "results": results
    }
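
A minimal invocation sketch for the tool above, assuming the decorated function remains directly awaitable; the form URL, CSV file name, and the CSS selectors in field_mapping are assumptions for illustration.

python
# Hypothetical invocation of batch_form_submit
import asyncio

async def demo_forms():
    summary = await batch_form_submit(
        form_url="https://example.com/contact",
        data_file="contacts.csv",
        field_mapping={
            "name": "#name",        # CSV column -> form field selector
            "email": "#email",
            "message": "#message"
        }
    )
    print(f"Submitted {summary['success']} of {summary['total']} records")

if __name__ == "__main__":
    asyncio.run(demo_forms())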

3. Scheduled Task Execution

Example: daily report generation

python
from openclaw import Agent, Config, Scheduler, ReportGenerator

class DailyReportAgent:
    """每日报告生成 Agent"""
    
    def __init__(self):
        self.agent = Agent(Config(
            name="daily_report",
            role="报告生成助手",
            goal="自动生成每日业务报告"
        ))
        self.scheduler = Scheduler()
    
    async def collect_data(self, data_sources: list) -> dict:
        """收集各数据源数据"""
        all_data = {}
        
        for source in data_sources:
            if source["type"] == "api":
                data = await self.call_api(source["endpoint"])
            elif source["type"] == "database":
                data = await self.query_database(source["query"])
            elif source["type"] == "file":
                data = self.read_file(source["path"])
            else:
                # Skip unknown source types instead of reusing stale data
                continue

            all_data[source["name"]] = data
        
        return all_data
    
    async def generate_report(self, data: dict) -> str:
        """生成格式化报告"""
        report = ReportGenerator(
            template="daily_report.html",
            data=data
        )
        return report.render()
    
    async def send_report(self, report: str, recipients: list):
        """发送报告邮件"""
        # 邮件发送逻辑
        pass
    
    async def run_daily_task(self):
        """执行每日任务"""
        # 1. 收集数据
        data = await self.collect_data([
            {"name": "sales", "type": "api", "endpoint": "/api/sales"},
            {"name": "users", "type": "database", "query": "SELECT * FROM daily_stats"},
            {"name": "logs", "type": "file", "path": "/data/logs/today.json"}
        ])
        
        # 2. Generate the report
        report = await self.generate_report(data)
        
        # 3. Send the report
        await self.send_report(report, ["manager@company.com"])
        
        return "报告生成完成"
    
    def start_scheduler(self):
        """启动定时调度"""
        self.scheduler.every().day.at("08:00").do(self.run_daily_task)
        self.scheduler.start()

if __name__ == "__main__":
    agent = DailyReportAgent()
    agent.start_scheduler()
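
Before wiring the job into the 08:00 schedule, run_daily_task can be exercised once by hand for testing. The sketch below assumes the stubbed helpers (call_api, query_database, read_file, send_report) have been implemented.

python
# Hypothetical one-off run for testing, bypassing the scheduler
import asyncio

async def smoke_test():
    report_agent = DailyReportAgent()
    message = await report_agent.run_daily_task()
    print(message)  # "Report generated"

asyncio.run(smoke_test())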

4. Multi-Step Workflows

Example: recruitment process automation

python
from openclaw import Agent, Config, Workflow, parallel_execution

class RecruitmentAutomation:
    """招聘流程自动化"""
    
    def __init__(self):
        self.agent = Agent(Config(
            name="recruitment",
            role="Recruitment assistant",
            goal="Automate the recruitment workflow"
        ))
        self.workflow = Workflow()
        # Register the methods below as tools here; a bare @agent.tool decorator
        # would fail at class-definition time because the agent instance does not
        # exist until __init__ runs
        self.agent.tool(self.screen_resumes)
        self.agent.tool(self.schedule_interview)
        self.agent.tool(self.send_offer)

    async def screen_resumes(self, resumes: list, criteria: dict) -> list:
        """
        简历筛选
        
        Args:
            resumes: 简历列表
            criteria: 筛选标准 {学历: "本科", 经验: "3年+"}
        
        Returns:
            符合要求的简历列表
        """
        qualified = []
        for resume in resumes:
            # match_criteria is a project-specific helper (not shown in this example)
            if self.match_criteria(resume, criteria):
                qualified.append(resume)
        return qualified
    
    async def schedule_interview(
        self,
        candidate: dict,
        interviewers: list,
        availability: list
    ) -> dict:
        """安排面试"""
        # 智能选择面试时间
        for time_slot in availability:
            if all(interviewer["available"] for interviewer in interviewers):
                return {
                    "candidate": candidate["name"],
                    "time": time_slot,
                    "interviewers": [i["name"] for i in interviewers],
                    "meeting_link": f"https://meet.example.com/{time_slot}"
                }
        return None
    
    async def send_offer(self, candidate: dict, offer_details: dict) -> bool:
        """发送录用通知"""
        # 邮件发送逻辑
        pass
    
    async def run_full_process(self, job_application_pool: list):
        """执行完整招聘流程"""
        
        # 第一步:简历筛选
        screened = await self.screen_resumes(
            job_application_pool,
            {"学历": "本科", "经验": "3年+", "技能": ["Python", "AI"]}
        )
        
        # Step 2: schedule interviews (in parallel)
        interview_results = await parallel_execution([
            self.schedule_interview(
                c,
                [{"name": "interviewer1@example.com", "available": ["2024-01-15 10:00"]}],
                ["2024-01-15 10:00"]
            )
            for c in screened[:5]  # top 5 candidates
        ])
        
        # Step 3: send offer letters
        for result in interview_results:
            if result:
                await self.send_offer(result["candidate"], {
                    "position": "Senior Engineer",
                    "salary": "200k/year"
                })
        
        return {
            "screened": len(screened),
            "interviews_scheduled": len(interview_results),
            "offers_sent": sum(1 for r in interview_results if r)
        }
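
A minimal end-to-end sketch follows; the resume dicts and their fields are made up for illustration, and the shape match_criteria expects depends on how that helper is implemented.

python
# Hypothetical end-to-end run with a tiny application pool
import asyncio

async def demo_recruitment():
    automation = RecruitmentAutomation()
    pool = [
        {"name": "Alice", "degree": "Bachelor", "experience": "4 years", "skills": ["Python", "AI"]},
        {"name": "Bob", "degree": "Master", "experience": "2 years", "skills": ["Java"]}
    ]
    summary = await automation.run_full_process(pool)
    print(summary)  # {"screened": ..., "interviews_scheduled": ..., "offers_sent": ...}

asyncio.run(demo_recruitment())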

5. Error Handling Patterns

Example: fault tolerance and retry mechanisms

python
from typing import Callable, Optional

from openclaw import Agent, Config, RetryPolicy, CircuitBreaker

class ResilientAutomation:
    """具备容错能力的自动化流程"""
    
    def __init__(self):
        self.agent = Agent(Config(
            name="resilient",
            role="Reliable automation assistant",
            goal="Ensure tasks complete reliably"
        ))

        # Configure the retry policy
        self.retry_policy = RetryPolicy(
            max_retries=3,
            delay=5,
            exponential_backoff=True,
            retry_on=["timeout", "network_error"]
        )

        # Configure the circuit breaker
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=5,
            recovery_time=60
        )

        # Register the methods below as tools; the decorator form
        # @agent.tool(retry=self.retry_policy, ...) cannot be used at
        # class-definition time because neither agent nor self exists yet
        self.reliable_api_call = self.agent.tool(
            retry=self.retry_policy,
            circuit_breaker=self.circuit_breaker
        )(self.reliable_api_call)
        self.safe_browser_operation = self.agent.tool(self.safe_browser_operation)

    async def reliable_api_call(self, endpoint: str, params: dict) -> dict:
        """可靠的 API 调用,带重试和熔断保护"""
        # 实现带有错误处理的 API 调用
        pass
    
    async def safe_browser_operation(
        self,
        operation: Callable,
        fallback: Optional[Callable] = None
    ) -> dict:
        """安全的浏览器操作,带回退机制"""
        try:
            return await operation()
        except Exception as e:
            # 记录错误
            print(f"操作失败: {e}")
            
            # 尝试恢复
            if fallback:
                return await fallback()
            
            # 返回错误信息
            return {"status": "error", "message": str(e)}

6. Data Processing Pipeline

Example: ETL data processing

python
from openclaw import Agent, Config, DataPipeline, Transformer

class DataETLPipeline:
    """数据 ETL 处理管道"""
    
    def __init__(self):
        self.pipeline = DataPipeline()
        self.agent = Agent(Config(
            name="data_etl",
            role="数据处理助手",
            goal="自动化数据清洗和转换"
        ))
    
    def build_pipeline(self):
        """构建数据处理管道"""
        
        # 定义数据源
        source = self.pipeline.add_source(
            name="csv_files",
            type="file",
            path="/data/input/*.csv"
        )
        
        # Define the transformation steps
        transformer = Transformer()
        
        step1 = self.pipeline.add_step(
            name="clean_data",
            transform=transformer.clean_nulls
        )
        
        step2 = self.pipeline.add_step(
            name="normalize_dates",
            transform=transformer.normalize_date_format,
            columns=["created_at", "updated_at"]
        )
        
        step3 = self.pipeline.add_step(
            name="calculate_metrics",
            transform=transformer.calculate_aggregations,
            metrics=["revenue", "quantity"]
        )
        
        # Define the target and keep a reference so it can be chained below
        target = self.pipeline.add_target(
            name="processed_data",
            type="database",
            table="daily_metrics"
        )

        # Declare the execution order (the original "source -> step1 -> ..."
        # arrow notation is not valid Python; a chaining helper is assumed here)
        self.pipeline.chain(source, step1, step2, step3, target)
        
        return self.pipeline
    
    async def run_etl(self):
        """执行 ETL 流程"""
        pipeline = self.build_pipeline()
        result = await pipeline.execute()
        return result
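
A minimal entry-point sketch; asyncio.run is standard Python, and the structure of the value returned by pipeline.execute() is not specified above, so it is simply printed.

python
# Hypothetical entry point for the ETL pipeline
import asyncio

if __name__ == "__main__":
    etl = DataETLPipeline()
    result = asyncio.run(etl.run_etl())
    print(result)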