最佳实践研究

本文档总结数字员工开发与运营的最佳实践,涵盖性能优化、安全性、可靠性、可观测性、可维护性等关键方面。

1. 性能优化最佳实践

1.1 并发处理

原则:合理使用并发提升吞吐量

python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Callable
import asyncio

class PerformanceOptimizer:
    """Throughput patterns: thread fan-out, concurrent async I/O and batching."""

    # 1. Task-level concurrency: run several independent tasks at once
    async def process_tasks_concurrently(
        self,
        tasks: List[Callable],
        max_workers: int = 5
    ) -> List[dict]:
        """Run independent blocking callables on a thread pool.

        Args:
            tasks: Zero-argument callables; each one runs in a worker thread.
            max_workers: Upper bound on the number of worker threads.

        Returns:
            The callables' return values in completion order, which is not
            necessarily the submission order.

        Raises:
            Any exception raised by a task, re-raised when its result is
            awaited.
        """
        loop = asyncio.get_running_loop()

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [loop.run_in_executor(executor, task) for task in tasks]

            # run_in_executor returns asyncio futures, so they have to be
            # drained with asyncio.as_completed; concurrent.futures.as_completed
            # only understands concurrent.futures.Future objects.
            return [await finished for finished in asyncio.as_completed(futures)]

    # 2. Asynchronous I/O: overlap the waits instead of blocking on each one
    async def fetch_all_data(self, urls: List[str]) -> List[dict]:
        """Fetch every URL concurrently through ``self._fetch_url``.

        Args:
            urls: Data source addresses, each passed to ``self._fetch_url``.

        Returns:
            One result per URL, in the same order as ``urls``.
        """
        # gather already schedules the coroutines concurrently; no task group
        # is needed around it.
        return await asyncio.gather(*(self._fetch_url(url) for url in urls))

    # 3. Batching: fewer, larger database operations
    async def batch_insert(self, records: List[dict], batch_size: int = 100):
        """Insert ``records`` in slices of ``batch_size`` via ``self._insert_batch``.

        Args:
            records: Rows to insert.
            batch_size: Maximum number of rows per insert call; must be >= 1.

        Returns:
            The concatenation of everything ``self._insert_batch`` returned.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1 (a non-positive
                step would either fail inside ``range`` or silently insert
                nothing).
        """
        if batch_size < 1:
            raise ValueError("batch_size must be >= 1")

        inserted = []
        for start in range(0, len(records), batch_size):
            inserted.extend(await self._insert_batch(records[start:start + batch_size]))
        return inserted

性能指标

| 场景 | 单线程 | 并发优化 | 提升 |
| --- | --- | --- | --- |
| API 调用(100 次) | 100 秒 | 5 秒 | 20x |
| 数据抓取 | 30 分钟 | 5 分钟 | 6x |
| 批量写入(10000 条) | 500 秒 | 50 秒 | 10x |

1.2 资源管理

原则:合理控制资源使用,避免资源泄漏

python
import resource
from contextlib import asynccontextmanager
from typing import AsyncGenerator

class ResourceManager:
    """Resource-handling patterns: pooled browsers, batched processing, managed files."""

    # 1. Connection-pool management
    # The Browser annotations are quoted so that defining this class does not
    # require the Browser type to be importable at that point.
    async def get_browser(self) -> "Browser":
        """Acquire a browser instance from ``self.browser_pool``."""
        return await self.browser_pool.acquire()

    async def release_browser(self, browser: "Browser"):
        """Hand a browser instance back to ``self.browser_pool``."""
        await self.browser_pool.release(browser)

    # 2. Memory management
    async def process_large_dataset(self, data: list):
        """Process ``data`` in slices of 1000 items via ``self._process_batch``.

        Args:
            data: The full input list.

        Returns:
            A list with the processed items of every slice, in input order.
        """
        import gc

        batch_size = 1000
        results = []

        for start in range(0, len(data), batch_size):
            # The slice and the processed batch are temporaries, so they are
            # released as soon as this statement finishes.
            results.extend(self._process_batch(data[start:start + batch_size]))

            # Collect every 10 batches; skipped at start == 0 because nothing
            # has been released yet at that point.
            if start and start % 10000 == 0:
                gc.collect()

        return results

    # 3. File-handle management
    @asynccontextmanager
    async def file_handler(self, path: str, mode: str = 'r') -> AsyncGenerator:
        """Open ``path`` via ``self._async_open`` and always close it on exit.

        Args:
            path: File to open.
            mode: Open mode forwarded to ``self._async_open``.

        Yields:
            The object returned by ``self._async_open``.
        """
        handle = await self._async_open(path, mode)
        try:
            yield handle
        finally:
            # Runs on normal exit and when the with-body raises.
            await handle.close()

1.3 缓存策略

原则:合理使用缓存减少重复计算

python
from cachetools import TTLCache, LRUCache
from typing import TypeVar, Callable, Optional
import hashlib
import json

T = TypeVar('T')

class CacheManager:
    """Caching patterns: TTL/LRU caches, a memoising decorator and async caching."""

    def __init__(self):
        # TTL cache: entries expire automatically after 300 seconds
        self.ttl_cache = TTLCache(maxsize=1000, ttl=300)

        # LRU cache: evicts the least recently used entry when full
        self.lru_cache = LRUCache(maxsize=500)

    def cached(self, ttl: int = 300, maxsize: int = 1000):
        """Build a decorator that memoises a synchronous function in a TTL cache.

        Args:
            ttl: Time-to-live passed to ``TTLCache``.
            maxsize: Maximum number of cached entries.

        Returns:
            A decorator; the decorated function's results are cached per
            ``(args, kwargs)`` combination.
        """
        import functools

        cache = TTLCache(maxsize=maxsize, ttl=ttl)

        def decorator(func: Callable) -> Callable:
            @functools.wraps(func)  # keep the wrapped function's name and docstring
            def wrapper(*args, **kwargs):
                key = self._generate_cache_key(args, kwargs)

                # A single lookup instead of "in" followed by indexing: a TTL
                # entry can expire between the two steps.
                try:
                    return cache[key]
                except KeyError:
                    pass

                result = func(*args, **kwargs)
                cache[key] = result
                return result

            return wrapper
        return decorator

    async def cached_async(
        self,
        func: Callable,
        cache: dict,
        key_func: Optional[Callable] = None
    ) -> T:
        """Return the cached result of an async callable, computing it on a miss.

        Args:
            func: Zero-argument callable returning an awaitable.
            cache: Mapping used as the cache store.
            key_func: Optional zero-argument callable producing the cache key.
                When omitted, ``func`` itself is used as the key, so ``func``
                must then be hashable.

        Returns:
            The cached value, or the freshly awaited result of ``func``.
        """
        # Without key_func every callable used to share one constant key, so
        # two different callables on the same cache returned each other's
        # results. Keying on the callable object keeps them apart, and the
        # reference held by the cache prevents identity reuse.
        key = key_func() if key_func is not None else func

        try:
            return cache[key]
        except KeyError:
            pass

        result = await func()
        cache[key] = result
        return result

    def _generate_cache_key(self, args: tuple = (), kwargs: Optional[dict] = None) -> str:
        """Derive a hex digest from the string forms of ``args`` and ``kwargs``.

        MD5 is used only to get a compact key, not for security.
        """
        key_data = {
            "args": str(args),
            "kwargs": str(kwargs) if kwargs else ""
        }
        return hashlib.md5(
            json.dumps(key_data).encode()
        ).hexdigest()

2. 安全性最佳实践

2.1 认证与授权

python
from typing import Optional
from dataclasses import dataclass
from enum import Enum

class Permission(Enum):
    """Permission kinds referenced by ``User.permissions`` and permission checks."""
    # Each member's value is its lowercase name.
    READ = "read"
    WRITE = "write"
    EXECUTE = "execute"
    ADMIN = "admin"

@dataclass
class User:
    """A user's identifier together with the roles and permissions attached to it."""
    user_id: str  # identifier of the user
    roles: List[str]  # role names
    permissions: List[Permission]  # permissions held directly by the user

class SecurityManager:
    """Security patterns: permission checks, sensitive-data masking, audit logging."""

    # Keys whose values are replaced by MASK in mask_sensitive_data.
    SENSITIVE_FIELDS = ("password", "credit_card", "ssn", "api_key")
    MASK = "****"

    # 1. Principle of least privilege
    # Annotations are quoted so the class does not depend on User/Permission
    # being defined before it.
    def check_permission(
        self,
        user: "User",
        required: "Permission",
        resource: str
    ) -> bool:
        """Tell whether ``user`` holds ``required`` directly or through a role.

        Args:
            user: The user being checked.
            required: The permission that is needed.
            resource: Accepted for interface compatibility; not consulted by
                this implementation.

        Returns:
            True if the permission is in ``user.permissions`` or in the
            permissions returned by ``self._get_role_permissions(user.roles)``.
        """
        if required in user.permissions:
            return True

        # Fall back to the permissions granted through roles
        return required in self._get_role_permissions(user.roles)

    # 2. Sensitive-data protection
    def mask_sensitive_data(self, data: dict) -> dict:
        """Return a copy of ``data`` with sensitive values replaced by ``MASK``.

        Dictionaries nested inside dictionaries or lists are masked as well,
        so a secret one level down is not leaked. The input is not modified.

        Args:
            data: Mapping that may contain keys listed in ``SENSITIVE_FIELDS``.

        Returns:
            A masked copy of ``data``.
        """
        masked = data.copy()
        for key, value in masked.items():
            if key in self.SENSITIVE_FIELDS:
                masked[key] = self.MASK
            else:
                masked[key] = self._mask_nested(value)
        return masked

    def _mask_nested(self, value):
        """Mask dictionaries found inside ``value``; other values pass through."""
        if isinstance(value, dict):
            return self.mask_sensitive_data(value)
        if isinstance(value, list):
            return [self._mask_nested(item) for item in value]
        return value

    # 3. Audit logging
    def audit_log(
        self,
        user_id: str,
        action: str,
        resource: str,
        result: str
    ):
        """Schedule an audit entry to be written by ``self._write_audit_log``.

        Must be called while an event loop is running, because the write is
        scheduled with ``asyncio.create_task``.

        Args:
            user_id: Who performed the action.
            action: What was done.
            resource: What it was done to.
            result: Outcome of the action.
        """
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "user_id": user_id,
            "action": action,
            "resource": resource,
            "result": result,
            "ip_address": self._get_client_ip()
        }

        # Write asynchronously. The event loop keeps only weak references to
        # tasks, so a strong reference is held until the write has finished.
        task = asyncio.create_task(self._write_audit_log(log_entry))
        pending = self.__dict__.setdefault("_audit_tasks", set())
        pending.add(task)
        task.add_done_callback(pending.discard)

2.2 数据加密

python
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os

class EncryptionManager:
    """Encryption patterns: symmetric encryption with Fernet, PBKDF2 password hashing."""

    def __init__(self):
        # The key comes from self._load_encryption_key()
        self.encryption_key = self._load_encryption_key()
        self.fernet = Fernet(self.encryption_key)

    def encrypt_data(self, data: str) -> str:
        """Encrypt ``data`` with Fernet and return the token as text."""
        return self.fernet.encrypt(data.encode()).decode()

    def decrypt_data(self, encrypted_data: str) -> str:
        """Decrypt a token produced by ``encrypt_data`` back to text."""
        return self.fernet.decrypt(encrypted_data.encode()).decode()

    def hash_password(self, password: str, salt: bytes = None) -> tuple:
        """Derive a storable key from ``password`` with PBKDF2-HMAC-SHA256.

        Args:
            password: The plain-text password.
            salt: Salt bytes; a random 16-byte salt is generated when this is
                missing or empty.

        Returns:
            ``(key, salt)``, both base64-encoded strings.
        """
        if not salt:
            salt = os.urandom(16)

        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=480000,
        )

        key = kdf.derive(password.encode())

        return (
            base64.b64encode(key).decode(),
            base64.b64encode(salt).decode()
        )

    def verify_password(
        self,
        password: str,
        stored_key: str,
        salt: str
    ) -> bool:
        """Check ``password`` against a key/salt pair from ``hash_password``.

        Args:
            password: The candidate plain-text password.
            stored_key: Base64 key previously returned by ``hash_password``.
            salt: Base64 salt previously returned by ``hash_password``.

        Returns:
            True if the password re-derives to ``stored_key``.
        """
        import hmac

        salt_bytes = base64.b64decode(salt.encode())
        hashed_key, _ = self.hash_password(password, salt_bytes)

        # Constant-time comparison: "==" stops at the first differing
        # character, which leaks how much of the digest matched.
        return hmac.compare_digest(hashed_key.encode(), stored_key.encode())
2.3 输入验证

python
import re
from pydantic import BaseModel, validator
from typing import Optional

class InputValidator:
    """Input-validation patterns: format checks, input cleaning, parameterized queries."""

    # 1. Pattern validation
    EMAIL_PATTERN = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
    PHONE_PATTERN = re.compile(r'^\+?1?\d{9,15}$')

    # Substrings stripped by sanitize_input.
    _DANGEROUS_TOKENS = ("'", '"', ";", "--", "/*", "*/", "xp_")

    def validate_email(self, email: str) -> bool:
        """Tell whether ``email`` matches ``EMAIL_PATTERN`` in its entirety."""
        # fullmatch, not match: "$" also matches just before a trailing
        # newline, so match() would accept "user@example.com\n".
        return self.EMAIL_PATTERN.fullmatch(email) is not None

    def validate_phone(self, phone: str) -> bool:
        """Tell whether ``phone`` matches ``PHONE_PATTERN`` in its entirety."""
        return self.PHONE_PATTERN.fullmatch(phone) is not None

    # 2. SQL-injection protection
    def sanitize_input(self, user_input: str) -> str:
        """Strip every dangerous token from ``user_input``.

        Removal is repeated until nothing changes, because deleting one token
        can join its neighbours into another ("xxp_p_" becomes "xp_" after a
        single pass). This is an extra layer only; queries should still be
        parameterized (see ``safe_query``).

        Args:
            user_input: Raw text supplied by the user.

        Returns:
            The text with all dangerous tokens removed.
        """
        sanitized = user_input
        while True:
            cleaned = sanitized
            for token in self._DANGEROUS_TOKENS:
                cleaned = cleaned.replace(token, "")
            # Each changing pass shortens the string, so this terminates.
            if cleaned == sanitized:
                return cleaned
            sanitized = cleaned

    # 3. Parameterized queries
    async def safe_query(self, query: str, params: tuple):
        """Execute ``query`` with ``params`` bound separately through ``self.db``.

        Args:
            query: SQL text containing placeholders.
            params: Values for the placeholders; never interpolated into the
                SQL text here.

        Returns:
            Whatever ``self.db.execute`` returns.
        """
        return await self.db.execute(query, params)

2.4 访问控制

python
class AccessControlManager:
    """Access-control patterns: rate limiting, permission checks and access logging."""

    def __init__(self):
        # 100 requests per window of 60 (presumably seconds; confirm against RateLimiter)
        limiter = RateLimiter(max_requests=100, time_window=60)
        self.rate_limiter = limiter

    async def check_rate_limit(self, user_id: str) -> bool:
        """Ask the rate limiter whether ``user_id`` may make another request."""
        allowed = self.rate_limiter.allow_request(user_id)
        return allowed

    async def enforce_access_control(
        self,
        user: User,
        resource: str,
        action: str
    ) -> bool:
        """Run the rate-limit check, the permission check, then log the access.

        Args:
            user: The requesting user.
            resource: The resource being accessed.
            action: The action attempted on the resource.

        Returns:
            True when both checks pass and the access has been logged.

        Raises:
            RateLimitExceeded: If the rate-limit check fails.
            AccessDenied: If ``self.has_permission`` returns a falsy value.
        """
        # Step 1: rate limit
        within_limit = await self.check_rate_limit(user.user_id)
        if not within_limit:
            raise RateLimitExceeded()

        # Step 2: permission
        permitted = self.has_permission(user, resource, action)
        if not permitted:
            raise AccessDenied()

        # Step 3: record the access (only reached when both checks passed)
        await self.log_access(user, resource, action)

        return True

3. 可靠性最佳实践

3.1 错误处理

python
from typing import Optional, Type
import traceback
import logging

logger = logging.getLogger(__name__)

class ErrorHandler:
    """Error-handling patterns: a small exception hierarchy and a safe executor."""

    # 1. Explicit error types
    class DigitalEmployeeError(Exception):
        """Base exception carrying a message and an error code."""
        def __init__(self, message: str, code: str):
            self.message = message
            self.code = code
            super().__init__(message)

    class ConfigurationError(DigitalEmployeeError):
        """Configuration error."""

    class ExecutionError(DigitalEmployeeError):
        """Execution error."""

    class TimeoutError(DigitalEmployeeError):
        """Timeout error (distinct from the builtin ``TimeoutError``)."""

    # 2. Unified error handling
    async def safe_execute(
        self,
        operation: Callable,
        error_handlers: dict = None
    ) -> dict:
        """Await ``operation`` and turn any exception into a result dictionary.

        Args:
            operation: Zero-argument callable returning an awaitable.
            error_handlers: Optional mapping of exception class *name* to a
                handler taking the exception. A handler registered for a base
                class also covers its subclasses; the most specific match
                wins. Handlers may be synchronous or asynchronous.

        Returns:
            ``{"status": "success", "result": ...}`` on success, the matching
            handler's return value on a handled failure, otherwise
            ``{"status": "error", "error_type": ..., "message": ...}``.
        """
        import inspect

        try:
            result = await operation()
        except Exception as e:
            # Record the failure together with its traceback
            logger.error(f"操作失败: {e}\n{traceback.format_exc()}")

            handler = self._find_handler(e, error_handlers)
            if handler is not None:
                outcome = handler(e)
                # Accept plain functions as well as coroutine functions
                if inspect.isawaitable(outcome):
                    outcome = await outcome
                return outcome

            # Default handling
            return {
                "status": "error",
                "error_type": type(e).__name__,
                "message": str(e)
            }

        return {"status": "success", "result": result}

    @staticmethod
    def _find_handler(error: Exception, error_handlers: dict = None):
        """Return the handler for the most specific class of ``error``, or None."""
        if not error_handlers:
            return None
        # Walk the MRO so a handler keyed by a base-class name also applies.
        for cls in type(error).__mro__:
            handler = error_handlers.get(cls.__name__)
            if handler is not None:
                return handler
        return None

3.2 重试机制

python
import asyncio
from functools import wraps
from typing import Callable, List

class RetryPolicy:
    """Retry policy with exponential backoff for async callables."""

    def __init__(
        self,
        max_retries: int = 3,
        delay: float = 1.0,
        backoff: float = 2.0,
        retryable_exceptions: List[Type] = None
    ):
        """Configure the policy.

        Args:
            max_retries: Number of retries after the first attempt.
            delay: Wait in seconds before the first retry.
            backoff: Factor applied to the wait after each failed attempt.
            retryable_exceptions: Exception classes that trigger a retry.
                Defaults to ``ConnectionError``, ``TimeoutError`` and
                ``TransientError``.
        """
        self.max_retries = max_retries
        self.delay = delay
        self.backoff = backoff
        self.retryable_exceptions = retryable_exceptions or [
            ConnectionError,
            TimeoutError,
            TransientError
        ]

    def with_retry(self, func: Callable) -> Callable:
        """Wrap an async callable so retryable failures are retried.

        Args:
            func: Coroutine function to protect.

        Returns:
            A coroutine function with the same signature. It waits
            ``delay * backoff ** attempt`` seconds between attempts and
            re-raises the last exception once the retries are used up.
            Exceptions outside ``retryable_exceptions`` propagate immediately.
        """
        import logging

        log = logging.getLogger(__name__)

        @wraps(func)
        async def wrapper(*args, **kwargs):
            # "except" needs a tuple of classes; the attribute is a list, and
            # catching a list raises TypeError instead of retrying.
            retryable = tuple(self.retryable_exceptions)
            # Always attempt at least once, even if max_retries is negative.
            attempts = max(self.max_retries, 0) + 1

            for attempt in range(attempts):
                try:
                    return await func(*args, **kwargs)
                except retryable as e:
                    if attempt == attempts - 1:
                        log.error(f"重试次数耗尽: {e}")
                        raise

                    delay = self.delay * (self.backoff ** attempt)
                    log.warning(
                        f"第 {attempt + 1} 次尝试失败,"
                        f"{delay} 秒后重试: {e}"
                    )
                    await asyncio.sleep(delay)

        return wrapper

3.3 熔断器模式

python
from enum import Enum
from dataclasses import dataclass
from datetime import datetime, timedelta

class CircuitState(Enum):
    """States of a circuit breaker."""
    CLOSED = "closed"      # normal operation
    OPEN = "open"          # tripped; requests are rejected
    HALF_OPEN = "half_open"  # probing whether the dependency has recovered

@dataclass
class CircuitBreaker:
    """Circuit breaker that opens after repeated failures and probes for recovery."""

    failure_threshold: int = 5      # failures needed to open the circuit
    success_threshold: int = 2      # successes in HALF_OPEN needed to close it
    timeout: int = 60               # seconds to stay OPEN before probing

    def __init__(self, **kwargs):
        """Apply keyword overrides for the settings and reset the runtime state.

        Args:
            **kwargs: Attribute overrides, e.g. ``failure_threshold=3``.
                Each pair is set on the instance as given.
        """
        for key, value in kwargs.items():
            setattr(self, key, value)

        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None

    def record_success(self):
        """Register a successful call."""
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1

            # Enough successful probes: close the circuit and reset counters
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                self.success_count = 0

        elif self.state == CircuitState.CLOSED:
            # A success offsets one earlier failure
            self.failure_count = max(0, self.failure_count - 1)

    def record_failure(self):
        """Register a failed call and open the circuit when warranted."""
        self.last_failure_time = datetime.now()
        self.failure_count += 1

        if self.state == CircuitState.CLOSED:
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN

        elif self.state == CircuitState.HALF_OPEN:
            # A failed probe re-opens the circuit immediately
            self.state = CircuitState.OPEN
            self.success_count = 0

    def allow_request(self) -> bool:
        """Tell whether a request may go through right now.

        Returns:
            True in CLOSED and HALF_OPEN. In OPEN, True only once ``timeout``
            seconds have passed since the last failure, in which case the
            breaker moves to HALF_OPEN.
        """
        if self.state == CircuitState.CLOSED:
            return True

        if self.state == CircuitState.OPEN:
            if self._timeout_elapsed():
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
                return True
            return False

        return True  # HALF_OPEN lets requests through

    def _timeout_elapsed(self) -> bool:
        """Tell whether ``timeout`` seconds have passed since the last failure."""
        if self.last_failure_time is None:
            # No failure recorded, so there is nothing to wait for.
            return True
        waited = datetime.now() - self.last_failure_time
        return waited >= timedelta(seconds=self.timeout)

3.4 幂等性设计

python
from uuid import uuid4
from typing import Optional
from dataclasses import dataclass

@dataclass
class IdempotencyKey:
    """Stored record for one idempotent request."""
    key: str               # the request id
    created_at: datetime   # when the record was created
    expires_at: datetime   # when the record stops being valid
    result: dict           # status dictionary for the request

class IdempotencyManager:
    """Keeps per-request records so a repeated request is not executed twice."""

    def __init__(self, ttl: int = 3600):  # records expire after one hour
        self.cache = {}
        self.ttl = ttl

    def get_idempotency_key(self, request_id: str) -> Optional[IdempotencyKey]:
        """Return the live record for ``request_id``, or None.

        Expired records are removed on access, so ``ttl`` is actually
        enforced and an old request id can be executed again.
        """
        record = self.cache.get(request_id)
        if record is None:
            return None

        if datetime.now() >= record.expires_at:
            self.cache.pop(request_id, None)
            return None

        return record

    def create_idempotency_record(
        self,
        request_id: str,
        operation: str
    ) -> IdempotencyKey:
        """Store a new record in the "processing" state.

        Args:
            request_id: Key under which the record is stored.
            operation: Name of the operation; accepted but not stored.

        Returns:
            The newly created record.
        """
        now = datetime.now()
        record = IdempotencyKey(
            key=request_id,
            created_at=now,
            expires_at=now + timedelta(seconds=self.ttl),
            result={"status": "processing"}
        )

        self.cache[request_id] = record
        return record

    def complete_request(
        self,
        request_id: str,
        result: dict
    ):
        """Replace the stored result of ``request_id`` if a record exists."""
        if request_id in self.cache:
            self.cache[request_id].result = result

    async def execute_idempotent(
        self,
        request_id: str,
        operation: Callable
    ) -> dict:
        """Run ``operation`` once per ``request_id`` within the TTL.

        Args:
            request_id: Identifier of the request.
            operation: Zero-argument callable returning an awaitable.

        Returns:
            On first execution, the value produced by ``operation``. On a
            repeat, the stored status dictionary (``{"status": ...}``).

        Raises:
            Any exception from ``operation``, after the record has been
            marked as failed.
        """
        # NOTE(review): the first call returns the raw result while repeats
        # return the wrapped status dictionary; confirm which shape callers
        # expect before unifying them.
        existing = self.get_idempotency_key(request_id)
        if existing:
            return existing.result

        # functools.partial objects and other callables may lack __name__
        operation_name = getattr(operation, "__name__", repr(operation))
        self.create_idempotency_record(request_id, operation_name)

        try:
            result = await operation()
        except Exception as e:
            self.complete_request(request_id, {
                "status": "error",
                "error": str(e)
            })
            raise

        self.complete_request(request_id, {
            "status": "success",
            "data": result
        })
        return result

4. 可观测性最佳实践

4.1 日志记录

python
import logging
import json
from datetime import datetime
from contextvars import ContextVar

logger = logging.getLogger(__name__)

# Context variables whose current values are added to every log entry
request_id_var = ContextVar('request_id', default=None)
user_id_var = ContextVar('user_id', default=None)

class StructuredLogger:
    """Emits log entries as single-line JSON objects."""

    @staticmethod
    def log(
        level: str,
        message: str,
        extra: dict = None
    ):
        """Serialize one entry to JSON and emit it at ``level``.

        Args:
            level: Name of a logger method in any case, e.g. "INFO".
            message: Human-readable message.
            extra: Optional additional fields, stored under "extra". Values
                that JSON cannot represent are converted with ``str``.
        """
        log_data = {
            "timestamp": datetime.now().isoformat(),
            "level": level,
            "message": message,
            "request_id": request_id_var.get(),
            "user_id": user_id_var.get()
        }

        if extra:
            log_data["extra"] = extra

        emit = getattr(logger, level.lower())
        # ensure_ascii=False keeps non-ASCII messages readable in the output;
        # default=str keeps a datetime or exception in "extra" from raising
        # inside the logging call.
        emit(json.dumps(log_data, ensure_ascii=False, default=str))

    @staticmethod
    def info(message: str, extra: dict = None):
        """Log ``message`` at INFO level."""
        StructuredLogger.log("INFO", message, extra)

    @staticmethod
    def error(message: str, extra: dict = None):
        """Log ``message`` at ERROR level."""
        StructuredLogger.log("ERROR", message, extra)

    @staticmethod
    def warning(message: str, extra: dict = None):
        """Log ``message`` at WARNING level."""
        StructuredLogger.log("WARNING", message, extra)

4.2 指标监控

python
from prometheus_client import Counter, Histogram, Gauge
from functools import wraps

# Metric definitions (module level, used by MetricsCollector)

# Counter labelled by agent, operation and status.
REQUEST_COUNT = Counter(
    'digital_employee_requests_total',
    '总请求数',
    ['agent', 'operation', 'status']
)

# Histogram labelled by agent and operation; the metric name states the unit
# as seconds, and the bucket bounds below are in that unit.
REQUEST_LATENCY = Histogram(
    'digital_employee_request_duration_seconds',
    '请求延迟',
    ['agent', 'operation'],
    buckets=[0.1, 0.5, 1, 2, 5, 10]
)

# Gauge labelled by agent type.
ACTIVE_AGENTS = Gauge(
    'digital_employee_active_agents',
    '活跃的 Agent 数量',
    ['agent_type']
)

class MetricsCollector:
    """Helpers that record values on the module-level metrics."""

    @staticmethod
    def track_request(agent: str, operation: str, status: str, duration: float):
        """Count one request and observe its duration.

        Args:
            agent: Value for the "agent" label.
            operation: Value for the "operation" label.
            status: Value for the "status" label (counter only).
            duration: Value observed on the latency histogram.
        """
        request_counter = REQUEST_COUNT.labels(
            agent=agent,
            operation=operation,
            status=status
        )
        request_counter.inc()

        latency_histogram = REQUEST_LATENCY.labels(
            agent=agent,
            operation=operation
        )
        latency_histogram.observe(duration)

    @staticmethod
    def track_agent_activity(agent_type: str, active: bool):
        """Set the gauge for ``agent_type`` to 1 when active, otherwise 0."""
        activity_gauge = ACTIVE_AGENTS.labels(agent_type=agent_type)
        if active:
            activity_gauge.set(1)
        else:
            activity_gauge.set(0)

4.3 分布式追踪

python
from contextlib import contextmanager

from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

class DistributedTracer:
    """Distributed tracing set-up and span helpers built on OpenTelemetry."""

    def __init__(self, service_name: str):
        """Install a tracer provider for ``service_name`` and create a tracer.

        Args:
            service_name: Reported as the "service.name" resource attribute
                and used as the tracer name.
        """
        # Configure tracing
        resource = Resource.create({"service.name": service_name})
        provider = TracerProvider(resource=resource)

        # Attach the exporter through a batching processor
        provider.add_span_processor(
            BatchSpanProcessor(self._create_exporter())
        )

        trace.set_tracer_provider(provider)
        self.tracer = trace.get_tracer(service_name)

    def _create_exporter(self):
        """Create the span exporter; override to target a real backend.

        The default writes spans to the console, so the processor always
        receives a usable exporter rather than None. Subclasses are expected
        to return a Jaeger / Zipkin / OTLP exporter instead.
        """
        from opentelemetry.sdk.trace.export import ConsoleSpanExporter

        return ConsoleSpanExporter()

    @contextmanager
    def create_span(self, name: str, attributes: dict = None):
        """Open a span as the current span for the duration of a with-block.

        Args:
            name: Span name.
            attributes: Optional attributes set on the span at start.

        Yields:
            The started span.
        """
        with self.tracer.start_as_current_span(name) as span:
            if attributes:
                for key, value in attributes.items():
                    span.set_attribute(key, value)

            yield span

    def add_event(self, name: str, attributes: dict = None):
        """Add an event to the span that is current at call time.

        Args:
            name: Event name.
            attributes: Optional event attributes; an empty dict is used
                when omitted.
        """
        span = trace.get_current_span()
        span.add_event(name, attributes=attributes or {})

5. 可维护性最佳实践

5.1 代码组织

digital-employee/
├── src/
│   ├── agents/          # Agent 定义
│   │   ├── base_agent.py
│   │   ├── browser_agent.py
│   │   └── data_agent.py
│   ├── tools/           # 工具实现
│   │   ├── browser_tools.py
│   │   ├── database_tools.py
│   │   └── api_tools.py
│   ├── workflows/       # 工作流模板
│   │   ├── base_workflow.py
│   │   └── templates/
│   ├── core/           # 核心功能
│   │   ├── memory.py
│   │   ├── planner.py
│   │   └── executor.py
│   └── utils/          # 工具函数
│       ├── logger.py
│       ├── metrics.py
│       └── config.py
├── tests/              # 测试
├── config/             # 配置文件
└── docs/               # 文档

5.2 配置管理

python
from dataclasses import dataclass, field
from typing import Dict, Optional
import yaml

@dataclass
class AgentConfig:
    """Settings of one agent."""
    name: str
    model: str
    max_retries: int = 3
    timeout: int = 300
    tools: list = field(default_factory=list)
    memory_config: Dict = field(default_factory=dict)

class ConfigManager:
    """Loads per-environment agent settings from a YAML file and caches them."""

    def __init__(self, config_path: str):
        self.config_path = config_path
        self._config_cache = {}

    def load_config(self, env: str = "development") -> AgentConfig:
        """Return the configuration for ``env``, reading the file on first use.

        Args:
            env: Name of the environment section in the YAML file.

        Returns:
            The parsed configuration; the same object is returned on later
            calls for the same ``env``.
        """
        if env in self._config_cache:
            return self._config_cache[env]

        # Explicit encoding so the result does not depend on the platform
        # default; an empty file loads as None, which is treated as "{}".
        with open(self.config_path, encoding="utf-8") as f:
            raw_config = yaml.safe_load(f) or {}

        config = self._parse_config(raw_config, env)
        self._config_cache[env] = config

        return config

    def _parse_config(self, raw: dict, env: str) -> AgentConfig:
        """Build an ``AgentConfig`` from ``raw``, layering ``env`` over "default".

        Args:
            raw: Parsed YAML document (may be None or empty).
            env: Environment section whose values take precedence.

        Returns:
            The merged configuration. A key present in the ``env`` section
            wins, then the "default" section, then the built-in fallback.
        """
        raw = raw or {}
        # A section written as "production:" with no body loads as None
        env_config = raw.get(env) or {}
        base_config = raw.get("default") or {}

        def pick(key, fallback=None):
            return env_config.get(key, base_config.get(key, fallback))

        return AgentConfig(
            name=pick("name"),
            model=self._resolve_model(env_config, base_config),
            max_retries=pick("max_retries", 3),
            timeout=pick("timeout", 300),
            tools=pick("tools", []),
            memory_config=pick("memory", {})
        )

    def _resolve_model(self, env_config: dict, base_config: dict):
        """Pick the model name: the environment value, else the default one.

        NOTE(review): no resolver was defined in this class, so the call in
        ``_parse_config`` failed. This applies the same layering as the other
        fields; confirm whether extra resolution (aliases etc.) is expected.
        """
        return env_config.get("model", base_config.get("model"))

5.3 测试策略

python
import pytest
from unittest.mock import AsyncMock, MagicMock

class TestDigitalEmployee:
    """Tests for the digital-employee agent."""
    
    @pytest.fixture
    def agent(self):
        """Create the agent under test from ``test_config``."""
        return DigitalEmployeeAgent(config=test_config)
    
    @pytest.mark.asyncio
    async def test_execute_task_success(self, agent):
        """A task runs to completion and returns the collected data."""
        # Replace the browser dependency with mocks
        agent.browser = AsyncMock()
        agent.browser.goto = AsyncMock()
        agent.browser.evaluate = AsyncMock(return_value=[{"name": "test"}])
        
        # Act
        result = await agent.execute("采集产品数据")
        
        # Assert
        assert result["status"] == "success"
        assert len(result["data"]) == 1
    
    @pytest.mark.asyncio
    async def test_handle_error_gracefully(self, agent):
        """A navigation failure is reported as an error result."""
        agent.browser = AsyncMock()
        # goto raises ConnectionError when called
        agent.browser.goto = AsyncMock(side_effect=ConnectionError("Network error"))
        
        result = await agent.execute("访问网页")
        
        assert result["status"] == "error"
        assert "error" in result
    
    @pytest.mark.asyncio
    async def test_retry_on_failure(self, agent):
        """An operation that fails twice succeeds on the third attempt."""
        call_count = 0
        
        async def failing_operation():
            # Raises on calls 1 and 2, returns on call 3
            nonlocal call_count
            call_count += 1
            if call_count < 3:
                raise ConnectionError("Temporary failure")
            return "success"
        
        result = await agent.execute_with_retry(
            failing_operation,
            max_retries=3,
            delay=0.01
        )
        
        assert result == "success"
        assert call_count == 3

6. 运营最佳实践

6.1 部署策略

python
class DeploymentManager:
    """Deployment patterns: blue-green switch-over and canary release."""

    async def blue_green_deploy(self, new_version: str):
        """Deploy ``new_version`` next to the live environment, then switch.

        The environment that received traffic last is remembered on the
        instance as ``_active_env``; after a successful switch the previous
        one, if any, is scheduled for clean-up 24 hours later. A failed
        health check rolls the new environment back and leaves traffic
        untouched.

        Args:
            new_version: Version to deploy.
        """
        # Remember what is live now so it can be cleaned up after the switch
        previous_env = getattr(self, "_active_env", None)

        # 1. Deploy into a new environment
        new_env = await self.create_environment(version=new_version)

        # 2. Health check
        if not await self.health_check(new_env):
            await self.rollback(new_env)
            return

        # 3. Switch traffic
        await self.switch_traffic(new_env)
        self._active_env = new_env

        # 4. Keep the old environment around for a while
        if previous_env is not None:
            await self.schedule_cleanup(previous_env, delay_hours=24)

    async def canary_deploy(
        self,
        new_version: str,
        traffic_percent: int = 10,
        step_wait_seconds: float = 600
    ):
        """Release ``new_version`` gradually, rolling back on instability.

        Args:
            new_version: Version to deploy.
            traffic_percent: Share of traffic sent to the canary at first.
            step_wait_seconds: Pause before each increase (25%, 50%, 100%).
        """
        # 1. Deploy the new version
        new_env = await self.create_environment(version=new_version)

        # 2. Route a small share of traffic to it
        await self.configure_traffic_split(new_env, traffic_percent)

        # 3. Watch the metrics
        metrics = await self.monitor_metrics(new_env, duration_minutes=30)

        if not self.is_stable(metrics):
            # An unstable canary must not keep serving its share of traffic
            await self.rollback(new_env)
            return

        # 4. Increase the share step by step
        for percent in (25, 50, 100):
            await asyncio.sleep(step_wait_seconds)
            await self.configure_traffic_split(new_env, percent)
            metrics = await self.monitor_metrics(new_env, duration_minutes=5)

            if not self.is_stable(metrics):
                await self.rollback(new_env)
                return

        # Fully promoted: this is now the live environment
        self._active_env = new_env

6.2 监控告警

python
class MonitoringManager:
    """Monitoring and alerting: threshold checks and multi-channel notification."""

    # Key metrics and their alert thresholds
    CRITICAL_METRICS = {
        "error_rate": {"threshold": 0.05, "duration": 300},
        "response_time_p99": {"threshold": 5.0, "duration": 300},
        "cpu_usage": {"threshold": 0.9, "duration": 600},
        "memory_usage": {"threshold": 0.9, "duration": 600}
    }

    async def monitor_system(self, interval: float = 60):
        """Poll metrics forever and alert on every threshold breach.

        A failing cycle is logged and the loop carries on, so one bad
        collection does not stop monitoring. Cancelling the task still ends
        the loop.

        Args:
            interval: Seconds to wait between checks.
        """
        import logging

        log = logging.getLogger(__name__)

        while True:
            try:
                metrics = await self.collect_metrics()
                for breach in self._find_breaches(metrics):
                    await self.trigger_alert(**breach)
            except Exception:
                log.exception("monitoring cycle failed")

            await asyncio.sleep(interval)

    def _find_breaches(self, metrics: dict) -> list:
        """List alert arguments for each critical metric above its threshold."""
        breaches = []
        for metric_name, metric_value in metrics.items():
            config = self.CRITICAL_METRICS.get(metric_name)
            if config is not None and metric_value > config["threshold"]:
                breaches.append({
                    "metric": metric_name,
                    "value": metric_value,
                    "threshold": config["threshold"],
                    "severity": "critical"
                })
        return breaches

    async def trigger_alert(
        self,
        metric: str,
        value: float,
        threshold: float,
        severity: str
    ):
        """Send one alert through every notification channel.

        The channels are notified concurrently and independently: a failure
        in one is logged and does not keep the alert from reaching the
        others.

        Args:
            metric: Name of the metric that breached.
            value: Observed value.
            threshold: Configured threshold.
            severity: Severity label attached to the alert.
        """
        alert = {
            "metric": metric,
            "value": value,
            "threshold": threshold,
            "severity": severity,
            "timestamp": datetime.now().isoformat()
        }

        channels = (
            ("pagerduty", self._send_pagerduty),
            ("slack", self._send_slack),
            ("email", self._send_email),
        )
        await asyncio.gather(
            *(self._deliver(name, send, alert) for name, send in channels)
        )

    async def _deliver(self, channel: str, send, alert: dict) -> bool:
        """Send ``alert`` through one channel; log and report a failure."""
        import logging

        try:
            await send(alert)
        except Exception:
            logging.getLogger(__name__).exception(
                "failed to send alert via %s", channel
            )
            return False
        return True

6.3 故障恢复

python
class RecoveryManager:
    """Failure recovery: maps failure types to automatic recovery strategies."""

    # Seconds to wait between stopping and starting a service
    RESTART_WAIT_SECONDS = 10

    # Failure type -> name of the method that handles it. Methods are looked
    # up by name at call time, so a strategy that is not implemented falls
    # back to manual intervention instead of failing every recovery.
    _STRATEGY_METHODS = {
        "service_crash": "_restart_service",
        "memory_leak": "_restart_and_clear_cache",
        "database_disconnect": "_reconnect_database",
        "network_issue": "_reset_network"
    }

    async def auto_recovery(self, failure: dict):
        """Run the recovery strategy registered for ``failure["type"]``.

        Args:
            failure: Failure description; "type" selects the strategy and
                the whole dictionary is passed on to it.

        Returns:
            The strategy's result, or
            ``{"action": "manual_intervention_required"}`` when the type is
            missing, unknown, or has no implemented strategy.
        """
        method_name = self._STRATEGY_METHODS.get(failure.get("type"))
        strategy = getattr(self, method_name, None) if method_name else None

        if strategy is None:
            return {"action": "manual_intervention_required"}

        result = await strategy(failure)

        # Record the recovery attempt
        await self.log_recovery_attempt(failure, result)

        return result

    async def _restart_service(self, failure: dict) -> dict:
        """Stop, wait, start and health-check the service in ``failure``.

        Args:
            failure: Must contain "service", the name of the service.

        Returns:
            ``{"action": "restarted", "service": ..., "healthy": ...}``.
        """
        service_name = failure["service"]

        # Stop the service
        await self.service_manager.stop(service_name)

        # Wait before starting it again
        await asyncio.sleep(self.RESTART_WAIT_SECONDS)

        # Start it
        await self.service_manager.start(service_name)

        # Health check
        healthy = await self.service_manager.health_check(service_name)

        return {
            "action": "restarted",
            "service": service_name,
            "healthy": healthy
        }

总结

最佳实践清单

| 领域 | 关键实践 | 优先级 |
| --- | --- | --- |
| 性能 | 并发处理、资源管理、缓存 | |
| 安全 | 认证授权、数据加密、输入验证 | 极高 |
| 可靠 | 错误处理、重试机制、熔断器 | |
| 可观测 | 结构化日志、指标监控、分布式追踪 | |
| 可维护 | 代码组织、配置管理、测试覆盖 | |
| 运营 | 渐进部署、智能监控、自动恢复 | |

持续改进

  1. 定期审查: 每月审查性能和可靠性指标
  2. 演练: 每季度进行故障演练
  3. 更新: 跟随框架和库的最新最佳实践
  4. 反馈: 建立用户反馈机制