1. 网页数据抓取
示例:自动采集产品信息
python
from openclaw import Agent, BrowserTool, Config

# Configure the agent: name, role, and goal drive its behavior.
config = Config(
    name="product_scraper",
    role="电商数据采集助手",
    goal="从指定电商网站采集产品信息"
)

# Create the agent from the configuration.
agent = Agent(config)
# Data-scraping tool registered on the agent.
@agent.tool
async def scrape_products(url: str, category: str) -> dict:
    """Scrape the product listing for a given category.

    Args:
        url: URL of the product listing page.
        category: Name of the product category.

    Returns:
        A dict with the category name, the number of products found,
        and the list of extracted product records.
        (Fixed: the original docstring claimed a "list of dicts",
        but the function returns a single summary dict.)
    """
    browser = BrowserTool()
    await browser.goto(url)

    # Scroll a few times so lazily-loaded items render before
    # extraction (5 screens, pausing 1 s for content to load).
    for _ in range(5):
        await browser.scroll_down()
        await browser.wait(1)

    # Extract product fields in the page context; JS optional chaining
    # keeps items with missing sub-elements from throwing.
    products = await browser.evaluate("""
        Array.from(document.querySelectorAll('.product-item'))
            .map(item => ({
                name: item.querySelector('.title')?.textContent?.trim(),
                price: item.querySelector('.price')?.textContent?.trim(),
                url: item.querySelector('a')?.href,
                rating: item.querySelector('.rating')?.textContent?.trim()
            }))
    """)
    return {
        "category": category,
        "count": len(products),
        "products": products
    }
# Task entry point.
async def main():
    """Run the scraping task and report how many products were collected."""
    result = await agent.execute(
        "从 https://example.com/electronics 抓取所有电子产品信息"
    )
    print(f"采集到 {result['count']} 个产品")
    return result


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
示例说明
- 使用 BrowserTool 进行页面导航和交互
- 通过 JavaScript 表达式提取页面数据
- 支持分页滚动和数据加载
2. 表单自动填写
示例:批量提交表单数据
python
from openclaw import Agent, BrowserTool, DataHandler
@agent.tool
async def batch_form_submit(
    form_url: str,
    data_file: str,
    field_mapping: dict
) -> dict:
    """Fill in and submit a form once per record in a CSV file.

    Args:
        form_url: URL of the form page.
        data_file: Path to the CSV data file.
        field_mapping: Mapping of {csv column name: form field selector}.

    Returns:
        Submission statistics: total, success, failed, and per-record results.
    """
    browser = BrowserTool()
    data_handler = DataHandler()

    # Load all records up front.
    records = data_handler.read_csv(data_file)

    results = []
    for record in records:
        # BUG FIX: the original had no error handling, so the "failed"
        # tally below could never be nonzero and any single error aborted
        # the whole batch. One bad record must not stop the rest.
        try:
            await browser.goto(form_url)

            # Fill each mapped field; missing CSV columns become "".
            for csv_field, selector in field_mapping.items():
                value = record.get(csv_field, "")
                await browser.fill(selector, value)

            await browser.click('#submit-button')

            # Treat the success message appearing as confirmation.
            await browser.wait_for_selector('.success-message', timeout=10)
            results.append({
                "status": "success",
                "record_id": record.get("id")
            })
        except Exception as e:
            results.append({
                "status": "failed",
                "record_id": record.get("id"),
                "error": str(e)
            })

    return {
        "total": len(results),
        "success": sum(1 for r in results if r["status"] == "success"),
        "failed": sum(1 for r in results if r["status"] == "failed"),
        "results": results
    }
3. 定时任务执行
示例:每日报告生成
python
from openclaw import Agent, Scheduler, ReportGenerator
from datetime import datetime, timedelta
class DailyReportAgent:
    """Agent that collects data, renders a daily report, and emails it."""

    def __init__(self):
        self.agent = Agent(Config(
            name="daily_report",
            role="报告生成助手",
            goal="自动生成每日业务报告"
        ))
        self.scheduler = Scheduler()

    async def collect_data(self, data_sources: list) -> dict:
        """Collect data from each configured source.

        Args:
            data_sources: Source specs; each dict has "name", a "type"
                ("api" | "database" | "file"), and the type-specific key
                ("endpoint" | "query" | "path").

        Returns:
            Mapping of source name to the data fetched from it.

        Raises:
            ValueError: If a source has an unrecognized "type".
        """
        # NOTE(review): relies on self.call_api / self.query_database /
        # self.read_file, which are not defined in this example — implement
        # them before use.
        all_data = {}
        for source in data_sources:
            if source["type"] == "api":
                data = await self.call_api(source["endpoint"])
            elif source["type"] == "database":
                data = await self.query_database(source["query"])
            elif source["type"] == "file":
                data = self.read_file(source["path"])
            else:
                # BUG FIX: previously an unknown type left `data` unbound
                # (NameError) or silently reused the previous source's data.
                raise ValueError(f"Unknown data source type: {source['type']}")
            all_data[source["name"]] = data
        return all_data

    async def generate_report(self, data: dict) -> str:
        """Render the collected data into a formatted report."""
        report = ReportGenerator(
            template="daily_report.html",
            data=data
        )
        return report.render()

    async def send_report(self, report: str, recipients: list):
        """Email the rendered report to the given recipients."""
        # Email delivery logic (left unimplemented in this example).
        pass

    async def run_daily_task(self):
        """Collect data, render the report, and email it out."""
        # 1. Collect data from the configured sources.
        data = await self.collect_data([
            {"name": "sales", "type": "api", "endpoint": "/api/sales"},
            {"name": "users", "type": "database", "query": "SELECT * FROM daily_stats"},
            {"name": "logs", "type": "file", "path": "/data/logs/today.json"}
        ])
        # 2. Render the report.
        report = await self.generate_report(data)
        # 3. Send it.
        await self.send_report(report, ["manager@company.com"])
        return "报告生成完成"

    def start_scheduler(self):
        """Register the daily 08:00 job and start the scheduler loop."""
        self.scheduler.every().day.at("08:00").do(self.run_daily_task)
        self.scheduler.start()
if __name__ == "__main__":
    # Start the blocking scheduler loop for the daily-report agent.
    agent = DailyReportAgent()
    agent.start_scheduler()
4. 多步骤工作流
示例:招聘流程自动化
python
from openclaw import Agent, Workflow, parallel_execution
class RecruitmentAutomation:
    """End-to-end recruitment workflow automation."""

    def __init__(self):
        self.agent = Agent(Config(
            name="recruitment",
            role="招聘助手",
            goal="自动化处理招聘流程"
        ))
        self.workflow = Workflow()
        # BUG FIX: the original decorated methods with `@agent.tool` at class
        # scope, where no module-level `agent` exists -> NameError at import.
        # Register the bound methods on this instance's agent instead.
        self.agent.tool(self.screen_resumes)
        self.agent.tool(self.schedule_interview)
        self.agent.tool(self.send_offer)

    async def screen_resumes(self, resumes: list, criteria: dict) -> list:
        """Filter resumes against the screening criteria.

        Args:
            resumes: List of resume records.
            criteria: Screening criteria, e.g. {"学历": "本科", "经验": "3年+"}.

        Returns:
            The resumes that satisfy the criteria.
        """
        # NOTE(review): relies on self.match_criteria, which is not defined
        # in this example — implement it before use.
        return [resume for resume in resumes
                if self.match_criteria(resume, criteria)]

    async def schedule_interview(
        self,
        candidate: dict,
        interviewers: list,
        availability: list
    ) -> dict:
        """Pick the first time slot at which every interviewer is free.

        Args:
            candidate: Candidate record; must contain "name".
            interviewers: Interviewer dicts with "name" and "available" keys.
            availability: Candidate time slots, in preference order.

        Returns:
            Booking details (candidate, time, interviewers, meeting link),
            or None when no slot works.
        """
        for time_slot in availability:
            # NOTE(review): the availability check ignores time_slot — the
            # "available" flag is per-interviewer, not per-slot; confirm
            # against the real calendar data before relying on this.
            if all(interviewer["available"] for interviewer in interviewers):
                return {
                    "candidate": candidate["name"],
                    "time": time_slot,
                    "interviewers": [i["name"] for i in interviewers],
                    "meeting_link": f"https://meet.example.com/{time_slot}"
                }
        return None

    async def send_offer(self, candidate: dict, offer_details: dict) -> bool:
        """Send an offer letter to the candidate."""
        # Email delivery logic (left unimplemented in this example).
        pass

    async def run_full_process(self, job_application_pool: list):
        """Run screening, interview scheduling, and offers end to end."""
        # Step 1: screen resumes.
        screened = await self.screen_resumes(
            job_application_pool,
            {"学历": "本科", "经验": "3年+", "技能": ["Python", "AI"]}
        )
        # Step 2: schedule interviews in parallel for the top 5 candidates.
        # BUG FIX: the original passed bare email strings as interviewers,
        # but schedule_interview indexes interviewer["available"]/["name"].
        interviewers = [
            {"name": "interviewer1@example.com", "available": True}
        ]
        interview_results = await parallel_execution([
            self.schedule_interview(c, interviewers, ["2024-01-15 10:00"])
            for c in screened[:5]
        ])
        # Step 3: send offers for every successfully scheduled interview.
        for result in interview_results:
            if result:
                # BUG FIX: send_offer expects a candidate record (dict); the
                # original passed result["candidate"], which schedule_interview
                # produces as a bare name string.
                await self.send_offer({"name": result["candidate"]}, {
                    "position": "Senior Engineer",
                    "salary": "200k/year"
                })
        return {
            "screened": len(screened),
            "interviews_scheduled": len(interview_results),
            "offers_sent": sum(1 for r in interview_results if r)
        }
5. 异常处理模式
示例:容错和重试机制
python
from openclaw import Agent, RetryPolicy, CircuitBreaker
class ResilientAutomation:
    """Automation workflow with retry and circuit-breaker protection."""

    def __init__(self):
        self.agent = Agent(Config(
            name="resilient",
            role="可靠自动化助手",
            goal="确保任务稳定完成"
        ))
        # Retry policy: up to 3 attempts with exponential backoff,
        # only for transient failure classes.
        self.retry_policy = RetryPolicy(
            max_retries=3,
            delay=5,
            exponential_backoff=True,
            retry_on=["timeout", "network_error"]
        )
        # Circuit breaker: open after 5 consecutive failures, probe
        # again after 60 seconds.
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=5,
            recovery_time=60
        )
        # BUG FIX: the original used `@agent.tool(retry=self.retry_policy, ...)`
        # at class scope, where neither `agent` nor `self` exists (NameError
        # at import). Register the bound methods here, where both are in scope.
        self.agent.tool(
            retry=self.retry_policy,
            circuit_breaker=self.circuit_breaker
        )(self.reliable_api_call)
        self.agent.tool(self.safe_browser_operation)

    async def reliable_api_call(self, endpoint: str, params: dict) -> dict:
        """API call protected by the retry policy and circuit breaker."""
        # API call with error handling (left unimplemented in this example).
        pass

    async def safe_browser_operation(
        self,
        operation: callable,
        fallback: callable = None
    ) -> dict:
        """Run a browser operation with a fallback on failure.

        Args:
            operation: Async callable performing the primary operation.
            fallback: Optional async callable tried when the primary fails.

        Returns:
            The operation's (or fallback's) result, or an error dict
            of the form {"status": "error", "message": ...}.
        """
        try:
            return await operation()
        except Exception as e:
            # Log the failure before attempting recovery.
            print(f"操作失败: {e}")
            if fallback:
                return await fallback()
            return {"status": "error", "message": str(e)}
6. 数据处理管道
示例:ETL 数据处理
python
from openclaw import Agent, DataPipeline, Transformer
class DataETLPipeline:
    """ETL pipeline: read CSV files, clean/transform, load into a database."""

    def __init__(self):
        self.pipeline = DataPipeline()
        self.agent = Agent(Config(
            name="data_etl",
            role="数据处理助手",
            goal="自动化数据清洗和转换"
        ))

    def build_pipeline(self):
        """Assemble the source, transform steps, and target.

        Returns:
            The configured DataPipeline, ready to execute.
        """
        # Source: every CSV file under /data/input.
        source = self.pipeline.add_source(
            name="csv_files",
            type="file",
            path="/data/input/*.csv"
        )
        transformer = Transformer()
        # Step 1: clean null values.
        step1 = self.pipeline.add_step(
            name="clean_data",
            transform=transformer.clean_nulls
        )
        # Step 2: normalize the date columns to a single format.
        step2 = self.pipeline.add_step(
            name="normalize_dates",
            transform=transformer.normalize_date_format,
            columns=["created_at", "updated_at"]
        )
        # Step 3: compute aggregate metrics.
        step3 = self.pipeline.add_step(
            name="calculate_metrics",
            transform=transformer.calculate_aggregations,
            metrics=["revenue", "quantity"]
        )
        # Target: write the processed rows to the daily_metrics table.
        # BUG FIX: the original discarded add_target's return value and then
        # referenced an undefined name `target` below.
        target = self.pipeline.add_target(
            name="processed_data",
            type="database",
            table="daily_metrics"
        )
        # BUG FIX: the original expressed the stage ordering as pseudo-code
        # (`source -> step1 -> step2 -> step3 -> target`), which is a Python
        # syntax error. NOTE(review): assumes DataPipeline exposes connect()
        # to chain stages in order — verify against the actual pipeline API.
        self.pipeline.connect(source, step1, step2, step3, target)
        return self.pipeline

    async def run_etl(self):
        """Build and execute the ETL pipeline."""
        pipeline = self.build_pipeline()
        result = await pipeline.execute()
        return result