Huawei Cloud Flexus × DeepSeek Essay | When Large Models Meet Edge Computing: Flexus Powers Low-Latency AI Agents

🌟 Hi, I'm IRpickstars!
🌌 A single line of code can light up a thousand stars.
🔍 In the universe of technology, I choose to be an explorer who never stops.
✨ Measuring the world with code, decoding the future with algorithms. I am a star-picker, and a dream-maker. Every compile is a new journey; every bug is an unsolved riddle. Let's write the romance of developers together in the galaxy of 0s and 1s.

Table of Contents
1. Introduction
2. Technical Background and Architecture Design
2.1 Technology Stack
2.2 Overall Architecture
2.3 Core Advantages
3. Environment Setup and Configuration
3.1 Creating a Huawei Cloud Flexus Instance
Step 1: Log in to the Huawei Cloud console
Step 2: Configure the instance specification
Step 3: Configure the security group
3.2 Installing the Base Environment
3.3 Deploying the DeepSeek Model
Step 1: Create the model service directory structure
Step 2: Write the model service code
4. Docker-Based Containerized Deployment
4.1 Writing the Dockerfile
4.2 Writing requirements.txt
4.3 Docker Compose Configuration
5. Load Balancing and High Availability
5.1 Nginx Configuration
5.2 Monitoring and Logging
6. Performance Optimization Strategies
6.1 Model Quantization
6.2 Caching Strategy
7. Deployment and Testing
7.1 One-Click Deployment Script
7.2 Performance Test Script
8. Conclusion
9. References

1. Introduction
With the rapid progress of artificial intelligence, large language models (LLMs) have become the core driving force behind AI applications. However, the traditional cloud-only deployment model often runs into network latency and bandwidth constraints in latency-sensitive scenarios. The rise of edge computing offers a way out of this dilemma. This article explores in depth how to combine Huawei Cloud Flexus elastic cloud servers with the DeepSeek large model to build a high-performance, low-latency AI Agent system.
2. Technical Background and Architecture Design

2.1 Technology Stack

For the edge AI Agent system, we chose the following stack:

- Huawei Cloud Flexus: elastic, high-performance compute resources
- DeepSeek model: a domestically developed LLM with strong inference capability
- Docker: consistent environments and fast deployment
- Redis cache: faster responses
- Nginx load balancing: system stability

2.2 Overall Architecture

Figure 1: Edge AI Agent architecture on Huawei Cloud Flexus

2.3 Core Advantages

- Low-latency responses: edge deployment shortens the data path
- High availability: multi-node deployment with automatic failover
- Elastic scaling: resources adjust dynamically with load
- Cost optimization: pay-as-you-go maximizes resource utilization
3. Environment Setup and Configuration

3.1 Creating a Huawei Cloud Flexus Instance

Step 1: Log in to the Huawei Cloud console

Visit the Huawei Cloud website, log in to the console, open the Elastic Cloud Server (ECS) service, and select a Flexus cloud server.

Step 2: Configure the instance specification

```
# Recommended configuration
CPU: 8 cores
Memory: 16 GB
Storage: 100 GB SSD
Network: 5 Mbps bandwidth
OS: Ubuntu 20.04 LTS
```

Step 3: Configure the security group

```
# Open the required ports
HTTP: 80
HTTPS: 443
SSH: 22
Custom: 8080 (AI Agent service port)
Custom: 6379 (Redis port)
```
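Once the security-group rules are in place, it is worth confirming from a client machine that the ports are actually reachable. The sketch below is an illustrative helper (not part of the article's deployment scripts) that probes a TCP port using only Python's standard library; substitute your instance's public IP for the placeholder host.

```python
# Minimal TCP port probe for sanity-checking security-group rules.
import socket


def port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unresolvable.
        return False


# Example (hypothetical host): probe the ports opened above.
# for p in (80, 443, 22, 8080, 6379):
#     print(p, port_open("your-instance-ip", p))
```

A closed or filtered port simply returns `False`, so the helper is safe to run in a loop over all the ports listed above.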
3.2 Installing the Base Environment

```bash
#!/bin/bash
# System update and base software installation

# Update system packages
sudo apt update
sudo apt upgrade -y

# Install Docker
curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh

# Start and enable the Docker service
sudo systemctl start docker
sudo systemctl enable docker

# Install Docker Compose
sudo curl -L "https://github.com/docker/compose/releases/download/v2.20.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose

# Install the Python environment
sudo apt install python3 python3-pip python3-venv -y

# Create a virtual environment
python3 -m venv ai_agent_env
source ai_agent_env/bin/activate

echo "Base environment installed"
```
3.3 Deploying the DeepSeek Model

Step 1: Create the model service directory structure

```bash
mkdir -p ai_agent_project/{models,src,config,data,logs}
cd ai_agent_project
```

Step 2: Write the model service code
```python
# src/deepseek_service.py
import torch
import logging
import time
import json
import hashlib
from typing import Dict, Optional

import redis
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from transformers import AutoTokenizer, AutoModelForCausalLM

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ChatRequest(BaseModel):
    """Chat request model."""
    message: str
    session_id: Optional[str] = None
    max_tokens: int = 512
    temperature: float = 0.7


class ChatResponse(BaseModel):
    """Chat response model."""
    response: str
    session_id: str
    tokens_used: int
    response_time: float


class DeepSeekAgent:
    """Core DeepSeek AI Agent class."""

    def __init__(self, model_name: str = "deepseek-ai/deepseek-coder-6.7b-instruct"):
        self.model_name = model_name
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.tokenizer = None
        self.model = None
        self.redis_client = None
        # Performance statistics
        self.request_count = 0
        self.total_response_time = 0.0
        logger.info(f"Initializing DeepSeek Agent on device: {self.device}")

    async def initialize(self):
        """Asynchronously initialize the model and the Redis connection."""
        try:
            # Initialize the Redis client
            self.redis_client = redis.Redis(
                host="localhost", port=6379,
                decode_responses=True,
                socket_connect_timeout=5)
            # Load the tokenizer
            logger.info("Loading tokenizer...")
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.model_name, trust_remote_code=True)
            # Load the model
            logger.info("Loading model...")
            self.model = AutoModelForCausalLM.from_pretrained(
                self.model_name,
                torch_dtype=torch.float16 if self.device.type == "cuda" else torch.float32,
                device_map="auto",
                trust_remote_code=True)
            logger.info("Model initialized")
        except Exception as e:
            logger.error(f"Model initialization failed: {e}")
            raise

    def _generate_cache_key(self, message: str, temperature: float, max_tokens: int) -> str:
        """Build a cache key from the request parameters."""
        content = f"{message}_{temperature}_{max_tokens}"
        return hashlib.md5(content.encode()).hexdigest()

    async def _get_cached_response(self, cache_key: str) -> Optional[str]:
        """Read a cached response, if any."""
        try:
            cached = self.redis_client.get(cache_key)
            if cached:
                logger.info("Cache hit")
                return json.loads(cached)
            return None
        except Exception as e:
            logger.warning(f"Cache read failed: {e}")
            return None

    async def _set_cached_response(self, cache_key: str, response: str, ttl: int = 3600):
        """Write a response to the cache."""
        try:
            self.redis_client.setex(cache_key, ttl, json.dumps(response))
        except Exception as e:
            logger.warning(f"Cache write failed: {e}")

    async def generate_response(self, request: ChatRequest) -> ChatResponse:
        """Generate a response for a chat request, consulting the cache first."""
        start_time = time.time()
        try:
            # Try the cache first
            cache_key = self._generate_cache_key(
                request.message, request.temperature, request.max_tokens)
            cached_response = await self._get_cached_response(cache_key)
            if cached_response:
                response_time = time.time() - start_time
                return ChatResponse(
                    response=cached_response,
                    session_id=request.session_id or "default",
                    tokens_used=len(self.tokenizer.encode(cached_response)),
                    response_time=response_time)

            # Build and encode the input
            messages = [{"role": "user", "content": request.message}]
            inputs = self.tokenizer.apply_chat_template(
                messages, add_generation_prompt=True,
                return_tensors="pt").to(self.device)

            # Generate
            with torch.no_grad():
                outputs = self.model.generate(
                    inputs,
                    max_new_tokens=request.max_tokens,
                    temperature=request.temperature,
                    do_sample=True,
                    pad_token_id=self.tokenizer.eos_token_id,
                    eos_token_id=self.tokenizer.eos_token_id)

            # Decode only the newly generated tokens
            response_tokens = outputs[0][inputs.shape[-1]:]
            response_text = self.tokenizer.decode(response_tokens, skip_special_tokens=True)

            # Cache the response and update statistics
            await self._set_cached_response(cache_key, response_text)
            response_time = time.time() - start_time
            self.request_count += 1
            self.total_response_time += response_time

            return ChatResponse(
                response=response_text,
                session_id=request.session_id or "default",
                tokens_used=len(response_tokens),
                response_time=response_time)
        except Exception as e:
            logger.error(f"Failed to generate response: {e}")
            raise HTTPException(status_code=500, detail=str(e))

    def get_performance_stats(self) -> Dict:
        """Return performance statistics."""
        avg_response_time = (self.total_response_time / self.request_count
                             if self.request_count > 0 else 0)
        return {
            "total_requests": self.request_count,
            "average_response_time": avg_response_time,
            "model_device": str(self.device),
            "model_name": self.model_name,
        }


# FastAPI application and global agent instance
app = FastAPI(title="DeepSeek AI Agent", version="1.0.0")
agent = DeepSeekAgent()


@app.on_event("startup")
async def startup_event():
    """Initialize the agent on application startup."""
    await agent.initialize()


@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(request: ChatRequest):
    """Chat endpoint."""
    return await agent.generate_response(request)


@app.get("/health")
async def health_check():
    """Health-check endpoint."""
    return {"status": "healthy", "stats": agent.get_performance_stats()}


@app.get("/stats")
async def get_stats():
    """Performance statistics endpoint."""
    return agent.get_performance_stats()


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080, workers=1)
```

4. Docker-Based Containerized Deployment
4.1 Writing the Dockerfile

```dockerfile
# Dockerfile
FROM nvidia/cuda:11.8.0-devel-ubuntu20.04

# Environment variables
ENV DEBIAN_FRONTEND=noninteractive
ENV PYTHONUNBUFFERED=1

# Install system dependencies
RUN apt-get update && apt-get install -y \
    python3 \
    python3-pip \
    python3-dev \
    git \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Working directory
WORKDIR /app

# Install Python dependencies
COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt

# Copy the application code
COPY src/ ./src/
COPY config/ ./config/

# Expose the service port
EXPOSE 8080

# Startup command
CMD ["python3", "src/deepseek_service.py"]
```
4.2 Writing requirements.txt

```
# requirements.txt
torch>=2.0.0
transformers>=4.35.0
fastapi>=0.104.0
uvicorn>=0.24.0
redis>=5.0.0
pydantic>=2.0.0
numpy>=1.24.0
accelerate>=0.24.0
bitsandbytes>=0.41.0
```

4.3 Docker Compose Configuration
```yaml
# docker-compose.yml
version: "3.8"

services:
  redis:
    image: redis:7-alpine
    container_name: ai_agent_redis
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - ai_agent_network

  ai_agent:
    build: .
    container_name: ai_agent_service
    ports:
      - "8080:8080"
    depends_on:
      redis:
        condition: service_healthy
    volumes:
      - ./models:/app/models
      - ./logs:/app/logs
    environment:
      - CUDA_VISIBLE_DEVICES=0
      - REDIS_HOST=redis
      - REDIS_PORT=6379
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
    networks:
      - ai_agent_network
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 120s

  nginx:
    image: nginx:alpine
    container_name: ai_agent_nginx
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      ai_agent:
        condition: service_healthy
    networks:
      - ai_agent_network

volumes:
  redis_data:

networks:
  ai_agent_network:
    driver: bridge
```
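With the stack running (`docker-compose up -d`), the `/chat` endpoint can be exercised with a short client sketch. This is an illustrative client, not part of the article's code; the URL assumes the Nginx `/api/` route configured later in this article, and the payload fields mirror the `ChatRequest` model defined above.

```python
# Hypothetical client for the /chat endpoint (stdlib only).
import json
from urllib import request as urlrequest

API_URL = "http://localhost/api/chat"  # assumption: Nginx route from this article


def build_chat_payload(message, session_id=None, max_tokens=512, temperature=0.7):
    """Build a dict mirroring the ChatRequest fields expected by the service."""
    return {
        "message": message,
        "session_id": session_id,
        "max_tokens": max_tokens,
        "temperature": temperature,
    }


def post_chat(message, timeout=60):
    """POST a chat message; returns the decoded ChatResponse dict."""
    body = json.dumps(build_chat_payload(message)).encode("utf-8")
    req = urlrequest.Request(
        API_URL, data=body, headers={"Content-Type": "application/json"})
    with urlrequest.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


# Example (requires the stack to be running):
# print(post_chat("Write a hello world in Python"))
```

The same request can of course be made with `curl -X POST http://localhost/api/chat -H "Content-Type: application/json" -d '{"message": "hello"}'`.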
5. Load Balancing and High Availability

5.1 Nginx Configuration

```nginx
# nginx.conf
events {
    worker_connections 1024;
}

http {
    upstream ai_agent_backend {
        # Round-robin load balancing
        server ai_agent:8080 weight=1 max_fails=3 fail_timeout=30s;
        # Add more servers if you run multiple instances:
        # server ai_agent_2:8080 weight=1 max_fails=3 fail_timeout=30s;
    }

    # Rate limiting
    limit_req_zone $binary_remote_addr zone=api_limit:10m rate=10r/s;

    server {
        listen 80;
        server_name localhost;

        # API proxy
        location /api/ {
            limit_req zone=api_limit burst=20 nodelay;
            proxy_pass http://ai_agent_backend/;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;

            # Timeouts
            proxy_connect_timeout 30s;
            proxy_send_timeout 60s;
            proxy_read_timeout 60s;

            # Cache settings
            proxy_cache_bypass $http_upgrade;
        }

        # Health check
        location /health {
            proxy_pass http://ai_agent_backend/health;
            access_log off;
        }

        # Static files
        location /static/ {
            alias /var/www/static/;
            expires 1y;
            add_header Cache-Control "public, immutable";
        }

        # Access logs
        access_log /var/log/nginx/ai_agent_access.log;
        error_log /var/log/nginx/ai_agent_error.log;
    }
}
```
5.2 Monitoring and Logging

```python
# src/monitoring.py
import psutil
import logging
import time
import json
import threading
from datetime import datetime
from typing import Dict, Any


class SystemMonitor:
    """System monitoring."""

    def __init__(self, log_file: str = "/app/logs/monitor.log"):
        self.log_file = log_file
        # Configure logging
        logging.basicConfig(
            filename=log_file,
            level=logging.INFO,
            format="%(asctime)s - %(levelname)s - %(message)s")
        self.logger = logging.getLogger(__name__)

    def get_system_metrics(self) -> Dict[str, Any]:
        """Collect system metrics."""
        try:
            # CPU utilization
            cpu_percent = psutil.cpu_percent(interval=1)
            # Memory usage
            memory = psutil.virtual_memory()
            memory_available = memory.available / (1024 ** 3)  # GB
            # Disk usage
            disk = psutil.disk_usage("/")
            disk_free = disk.free / (1024 ** 3)  # GB
            # Network counters
            network = psutil.net_io_counters()
            # GPU usage (if available)
            gpu_info = self._get_gpu_info()
            return {
                "timestamp": datetime.now().isoformat(),
                "cpu_percent": cpu_percent,
                "memory_percent": memory.percent,
                "memory_available_gb": round(memory_available, 2),
                "disk_percent": disk.percent,
                "disk_free_gb": round(disk_free, 2),
                "network_bytes_sent": network.bytes_sent,
                "network_bytes_recv": network.bytes_recv,
                "gpu_info": gpu_info,
            }
        except Exception as e:
            self.logger.error(f"Failed to collect system metrics: {e}")
            return {}

    def _get_gpu_info(self) -> Dict[str, Any]:
        """Collect GPU info."""
        try:
            import torch
            if torch.cuda.is_available():
                devices = []
                for i in range(torch.cuda.device_count()):
                    devices.append({
                        "device_id": i,
                        "name": torch.cuda.get_device_name(i),
                        "memory_allocated_gb": round(
                            torch.cuda.memory_allocated(i) / (1024 ** 3), 2),
                        "memory_reserved_gb": round(
                            torch.cuda.memory_reserved(i) / (1024 ** 3), 2),
                    })
                return {"available": True, "devices": devices}
            return {"available": False, "devices": []}
        except Exception as e:
            self.logger.warning(f"Failed to collect GPU info: {e}")
            return {"available": False, "devices": []}

    def log_metrics(self):
        """Log one snapshot of system metrics."""
        metrics = self.get_system_metrics()
        if metrics:
            self.logger.info(f"System metrics: {json.dumps(metrics)}")

    def start_monitoring(self, interval: int = 60):
        """Run the monitoring loop."""
        self.logger.info("Monitoring started")
        while True:
            try:
                self.log_metrics()
                time.sleep(interval)
            except KeyboardInterrupt:
                self.logger.info("Monitoring stopped")
                break
            except Exception as e:
                self.logger.error(f"Monitoring error: {e}")
                time.sleep(interval)


# Integration with the main service
def start_background_monitoring():
    """Run the monitor in a background daemon thread."""
    monitor = SystemMonitor()
    monitor_thread = threading.Thread(
        target=monitor.start_monitoring,
        args=(30,),  # log every 30 seconds
        daemon=True)
    monitor_thread.start()
    return monitor
```

6. Performance Optimization Strategies
6.1 Model Quantization

```python
# src/model_optimization.py
import torch
from transformers import BitsAndBytesConfig


class ModelOptimizer:
    """Model loading optimizer."""

    @staticmethod
    def create_quantization_config(
            load_in_4bit: bool = True,
            bnb_4bit_quant_type: str = "nf4",
            bnb_4bit_use_double_quant: bool = True,
            bnb_4bit_compute_dtype: torch.dtype = torch.float16) -> BitsAndBytesConfig:
        """Create a 4-bit quantization config.

        Args:
            load_in_4bit: whether to load the model in 4-bit precision
            bnb_4bit_quant_type: quantization type
            bnb_4bit_use_double_quant: whether to use double quantization
            bnb_4bit_compute_dtype: compute dtype
        """
        return BitsAndBytesConfig(
            load_in_4bit=load_in_4bit,
            bnb_4bit_quant_type=bnb_4bit_quant_type,
            bnb_4bit_use_double_quant=bnb_4bit_use_double_quant,
            bnb_4bit_compute_dtype=bnb_4bit_compute_dtype)

    @staticmethod
    def optimize_model_loading(model_name: str, device: str = "auto") -> dict:
        """Pick model-loading kwargs based on available memory.

        Args:
            model_name: model name
            device: target device
        Returns:
            dict: optimized kwargs for from_pretrained
        """
        if torch.cuda.is_available():
            gpu_memory = torch.cuda.get_device_properties(0).total_memory
            gpu_memory_gb = gpu_memory / (1024 ** 3)
            # Choose a strategy based on GPU memory
            if gpu_memory_gb < 8:
                # Small VRAM: use 4-bit quantization
                return {
                    "torch_dtype": torch.float16,
                    "device_map": device,
                    "quantization_config": ModelOptimizer.create_quantization_config(),
                    "low_cpu_mem_usage": True,
                    "trust_remote_code": True,
                }
            elif gpu_memory_gb < 16:
                # Medium VRAM: half precision
                return {
                    "torch_dtype": torch.float16,
                    "device_map": device,
                    "low_cpu_mem_usage": True,
                    "trust_remote_code": True,
                }
            else:
                # Large VRAM: standard loading
                return {
                    "torch_dtype": torch.float16,
                    "device_map": device,
                    "trust_remote_code": True,
                }
        else:
            # CPU mode
            return {
                "torch_dtype": torch.float32,
                "device_map": "cpu",
                "low_cpu_mem_usage": True,
                "trust_remote_code": True,
            }
```

6.2 Caching Strategy
```python
# src/cache_manager.py
import json
import hashlib
from typing import Any, Optional, Dict
from datetime import datetime

import redis


class AdvancedCacheManager:
    """Tiered cache manager."""

    def __init__(self, redis_host: str = "localhost", redis_port: int = 6379):
        self.redis_client = redis.Redis(
            host=redis_host,
            port=redis_port,
            decode_responses=True,
            socket_connect_timeout=5,
            socket_timeout=5)
        # Cache tier configuration
        self.cache_levels = {
            "hot":  {"ttl": 300,  "prefix": "hot:"},   # 5-minute hot cache
            "warm": {"ttl": 1800, "prefix": "warm:"},  # 30-minute warm cache
            "cold": {"ttl": 7200, "prefix": "cold:"},  # 2-hour cold cache
        }
        # Statistics
        self.stats = {"hits": 0, "misses": 0, "total_requests": 0}

    def _generate_key(self, data: Dict[str, Any], level: str = "warm") -> str:
        """Build a cache key from the query data and tier prefix."""
        content = json.dumps(data, sort_keys=True)
        hash_key = hashlib.sha256(content.encode()).hexdigest()
        return f"{self.cache_levels[level]['prefix']}{hash_key}"

    def get(self, data: Dict[str, Any]) -> Optional[Any]:
        """Look up the tiers in priority order; promote warm/cold hits to hot."""
        self.stats["total_requests"] += 1
        for level in ["hot", "warm", "cold"]:
            cache_key = self._generate_key(data, level)
            try:
                cached_data = self.redis_client.get(cache_key)
                if cached_data:
                    self.stats["hits"] += 1
                    # Promote warm/cold hits to the hot tier
                    if level != "hot":
                        self._promote_to_hot(data, json.loads(cached_data))
                    return json.loads(cached_data)
            except Exception as e:
                print(f"Cache read error [{level}]: {e}")
                continue
        self.stats["misses"] += 1
        return None

    def set(self, data: Dict[str, Any], value: Any, level: str = "warm"):
        """Store a value in the given tier, with metadata and TTL."""
        cache_key = self._generate_key(data, level)
        ttl = self.cache_levels[level]["ttl"]
        try:
            cache_value = {
                "data": value,
                "timestamp": datetime.now().isoformat(),
                "level": level,
            }
            self.redis_client.setex(cache_key, ttl, json.dumps(cache_value))
        except Exception as e:
            print(f"Cache write error: {e}")

    def _promote_to_hot(self, data: Dict[str, Any], cached_data: Dict[str, Any]):
        """Promote an entry to the hot tier."""
        try:
            self.set(data, cached_data["data"], "hot")
        except Exception as e:
            print(f"Cache promotion error: {e}")

    def get_stats(self) -> Dict[str, Any]:
        """Return cache statistics."""
        hit_rate = (self.stats["hits"] / self.stats["total_requests"]
                    if self.stats["total_requests"] > 0 else 0)
        return {
            **self.stats,
            "hit_rate": round(hit_rate * 100, 2),
            "redis_info": self._get_redis_info(),
        }

    def _get_redis_info(self) -> Dict[str, Any]:
        """Return basic Redis server information."""
        try:
            info = self.redis_client.info()
            return {
                "used_memory_mb": round(info.get("used_memory", 0) / (1024 ** 2), 2),
                "connected_clients": info.get("connected_clients", 0),
                "total_commands_processed": info.get("total_commands_processed", 0),
            }
        except Exception as e:
            return {"error": str(e)}

    def clear_cache(self, pattern: str = "*"):
        """Delete keys matching a pattern; returns the number removed."""
        try:
            keys = self.redis_client.keys(pattern)
            if keys:
                self.redis_client.delete(*keys)
                return len(keys)
            return 0
        except Exception as e:
            print(f"Cache clear error: {e}")
            return 0
```

7. Deployment and Testing
7.1 One-Click Deployment Script

```bash
#!/bin/bash
# deploy.sh - one-click deployment script

set -e

echo "=== Huawei Cloud Flexus AI Agent deployment ==="

# Check required tools
check_requirements() {
    echo "Checking deployment requirements..."
    if ! command -v docker > /dev/null; then
        echo "Error: Docker is not installed"
        exit 1
    fi
    if ! command -v docker-compose > /dev/null; then
        echo "Error: Docker Compose is not installed"
        exit 1
    fi
    echo "✓ Environment check passed"
}

# Create the required directories
create_directories() {
    echo "Creating project directories..."
    mkdir -p {models,logs,ssl,data}
    echo "✓ Directories created"
}

# Download the model (if needed)
download_models() {
    echo "Checking model files..."
    if [ ! -d "models/deepseek-coder-6.7b-instruct" ]; then
        echo "Downloading the DeepSeek model (this may take a while)..."
        # Model download logic can go here
        echo "Note: please download the model files into the models/ directory manually"
    fi
    echo "✓ Model check complete"
}

# Build and start the services
deploy_services() {
    echo "Building Docker images..."
    docker-compose build
    echo "Starting services..."
    docker-compose up -d
    echo "Waiting for services to start..."
    sleep 30

    # Health check
    echo "Running health check..."
    max_attempts=10
    attempt=1
    while [ $attempt -le $max_attempts ]; do
        if curl -f http://localhost/api/health > /dev/null 2>&1; then
            echo "✓ Services started successfully"
            return 0
        fi
        echo "Waiting for services... ($attempt/$max_attempts)"
        sleep 10
        ((attempt++))
    done
    echo "❌ Services failed to start"
    docker-compose logs
    exit 1
}

# Show deployment information
show_deployment_info() {
    echo ""
    echo "=== Deployment complete ==="
    echo "Service URL:   http://$(curl -s ifconfig.me)/api"
    echo "Health check:  http://$(curl -s ifconfig.me)/api/health"
    echo "Statistics:    http://$(curl -s ifconfig.me)/api/stats"
    echo ""
    echo "View logs:     docker-compose logs -f"
    echo "Stop services: docker-compose down"
    echo ""
}

# Main
main() {
    check_requirements
    create_directories
    download_models
    deploy_services
    show_deployment_info
}

# Run the deployment
main "$@"
```
7.2 Performance Test Script

```python
# tests/performance_test.py
import asyncio
import aiohttp
import time
import statistics
import json
from typing import List, Dict, Any

import matplotlib.pyplot as plt


class PerformanceTester:
    """Load/performance test harness for the AI Agent API."""

    def __init__(self, base_url: str = "http://localhost/api"):
        self.base_url = base_url
        self.results = []

    async def single_request(self, session: aiohttp.ClientSession,
                             message: str) -> Dict[str, Any]:
        """Run a single /chat request and record timing."""
        start_time = time.time()
        try:
            async with session.post(
                    f"{self.base_url}/chat",
                    json={"message": message, "max_tokens": 100, "temperature": 0.7},
                    timeout=aiohttp.ClientTimeout(total=60)) as response:
                if response.status == 200:
                    data = await response.json()
                    end_time = time.time()
                    return {
                        "success": True,
                        "response_time": end_time - start_time,
                        "tokens_used": data.get("tokens_used", 0),
                        "server_response_time": data.get("response_time", 0),
                        "message_length": len(message),
                    }
                return {
                    "success": False,
                    "error": f"HTTP {response.status}",
                    "response_time": time.time() - start_time,
                }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "response_time": time.time() - start_time,
            }

    async def concurrent_test(self, messages: List[str],
                              concurrent_users: int = 10,
                              iterations: int = 5) -> Dict[str, Any]:
        """Fire concurrent_users simultaneous requests for several iterations."""
        print(f"Starting concurrency test: {concurrent_users} users, {iterations} iterations")
        async with aiohttp.ClientSession() as session:
            all_results = []
            for iteration in range(iterations):
                print(f"Running round {iteration + 1}/{iterations}...")
                # Build the concurrent tasks
                tasks = []
                for i in range(concurrent_users):
                    message = messages[i % len(messages)]
                    tasks.append(self.single_request(session, message))
                # Run them concurrently
                iteration_start = time.time()
                results = await asyncio.gather(*tasks)
                iteration_end = time.time()
                # Collect results
                for result in results:
                    result["iteration"] = iteration + 1
                    result["total_iteration_time"] = iteration_end - iteration_start
                    all_results.append(result)
            self.results = all_results
            return self.analyze_results()

    def analyze_results(self) -> Dict[str, Any]:
        """Summarize success rate, latency percentiles, and throughput."""
        successful_requests = [r for r in self.results if r["success"]]
        failed_requests = [r for r in self.results if not r["success"]]
        if not successful_requests:
            return {
                "error": "no successful requests",
                "total_requests": len(self.results),
                "failed_requests": len(failed_requests),
            }

        response_times = [r["response_time"] for r in successful_requests]
        server_response_times = [r["server_response_time"] for r in successful_requests]
        total_time = max(r.get("total_iteration_time", 0) for r in self.results)
        throughput = len(successful_requests) / total_time if total_time > 0 else 0

        analysis = {
            "overall": {
                "total_requests": len(self.results),
                "successful_requests": len(successful_requests),
                "failed_requests": len(failed_requests),
                "success_rate_percent": round(
                    len(successful_requests) / len(self.results) * 100, 2),
                "throughput_req_per_s": round(throughput, 2),
            },
            "response_time": {
                "mean": round(statistics.mean(response_times), 3),
                "median": round(statistics.median(response_times), 3),
                "min": round(min(response_times), 3),
                "max": round(max(response_times), 3),
                "p95": round(self._percentile(response_times, 95), 3),
                "p99": round(self._percentile(response_times, 99), 3),
            },
            "server_processing_time": {
                "mean": round(statistics.mean(server_response_times), 3),
                "median": round(statistics.median(server_response_times), 3),
                "min": round(min(server_response_times), 3),
                "max": round(max(server_response_times), 3),
            },
        }

        # Error breakdown
        if failed_requests:
            error_types = {}
            for req in failed_requests:
                error = req.get("error", "Unknown")
                error_types[error] = error_types.get(error, 0) + 1
            analysis["errors"] = error_types
        return analysis

    def _percentile(self, data: List[float], percentile: int) -> float:
        """Nearest-rank percentile."""
        sorted_data = sorted(data)
        index = int((percentile / 100) * len(sorted_data))
        return sorted_data[index - 1] if index > 0 else sorted_data[0]

    def generate_report(self, output_file: str = "performance_report.json"):
        """Write the analysis and raw results to a JSON report."""
        analysis = self.analyze_results()
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump({
                "test_timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                "analysis": analysis,
                "raw_results": self.results,
            }, f, indent=2, ensure_ascii=False)
        print(f"Report saved to: {output_file}")
        return analysis

    def visualize_results(self):
        """Plot the response-time distribution and trend."""
        if not self.results:
            print("No results to visualize")
            return
        successful_requests = [r for r in self.results if r["success"]]
        if not successful_requests:
            print("No successful requests to visualize")
            return
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        response_times = [r["response_time"] for r in successful_requests]
        # Response-time distribution
        axes[0, 0].hist(response_times, bins=20, alpha=0.7)
        axes[0, 0].set_title("Response time distribution")
        axes[0, 0].set_xlabel("Response time (s)")
        axes[0, 0].set_ylabel("Count")
        # Response-time trend
        axes[0, 1].plot(response_times)
        axes[0, 1].set_title("Response time trend")
        axes[0, 1].set_xlabel("Request #")
        axes[0, 1].set_ylabel("Response time (s)")
        plt.tight_layout()
        plt.savefig("performance_test.png")
        plt.close(fig)
```
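The tester's nearest-rank percentile calculation is worth sanity-checking on its own. The sketch below is a standalone copy of that logic with an arbitrary example latency list; the full test itself would be driven with something like `asyncio.run(PerformanceTester().concurrent_test(["hello"], concurrent_users=10, iterations=5))` against a running deployment.

```python
# Standalone version of the tester's nearest-rank percentile, for sanity-checking.
from typing import List


def percentile(data: List[float], pct: int) -> float:
    """Nearest-rank percentile, matching PerformanceTester._percentile."""
    sorted_data = sorted(data)
    index = int((pct / 100) * len(sorted_data))
    return sorted_data[index - 1] if index > 0 else sorted_data[0]


# Example latencies (seconds); arbitrary illustrative values.
latencies = [0.8, 1.2, 0.9, 2.5, 1.1, 0.7, 3.0, 1.0, 0.95, 1.3]
print(percentile(latencies, 95))  # 2.5 (second-largest of ten samples)
print(percentile(latencies, 50))  # 1.0
```

Note that with ten samples the nearest-rank p95 lands on the 9th sorted value, so a single outlier still shows up in p99 but not necessarily in p95.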
8. Conclusion

When large models such as DeepSeek meet edge computing, AI Agents gain unprecedented opportunities to become both smarter and more real-time. Huawei Cloud Flexus, with second-level elasticity, pay-as-you-go billing, and heterogeneous compute support, plays a pivotal role in this convergence. It can serve as a powerful "near-edge" compute engine, delivering low-latency, high-performance LLM inference to resource-constrained edge devices, while elastic scaling absorbs the dynamic load of AI Agent workloads and strikes the best balance between performance and cost.

By building a device-edge-Flexus (near-edge)-cloud collaborative architecture, developers can combine DeepSeek's intelligence with Flexus's elasticity to build AI Agents that respond faster, feel smoother, and do more, with applications across smart retail, smart cities, industrial IoT, autonomous driving, and many other fields, accelerating the intelligent transformation of every industry. Challenges remain, but the combination of Flexus, edge computing, and large models clearly opens broad prospects for the future of AI Agents.

9. References
- Huawei Cloud Flexus product page and documentation
  - Product overview: Huawei Cloud Flexus cloud service (Flexus - Huawei Cloud)
  - Help documentation: Flexus cloud service map (Huawei Cloud)
- Huawei Cloud Auto Scaling documentation: https://support.huaweicloud.com/productdocs/AS/index.html
- Huawei Cloud SoftWare Repository for Container (SWR) documentation (SWR service map - Huawei Cloud)
- Huawei Cloud API Gateway (APIG) documentation (APIG service map - Huawei Cloud)
- DeepSeek AI (see the official website, GitHub projects, and research papers for the latest information)
- Edge computing concepts and standards: ETSI MEC (Multi-access Edge Computing) - Standards for MEC
- Flask web framework documentation: Welcome to Flask — Flask Documentation (3.1.x)

Hi, I'm IRpickstars! If this technical deep dive gave you something useful, a like, a follow, or a comment with your own take or questions is always appreciated. As a blogger who works in the trenches every day, I look forward to in-depth technical conversations: every question is a new angle, and every discussion can spark something new. Visit IRpickstars' home page for more architecture analysis and hands-on material.