news 2026/4/18 14:29:58

Python Async Crawlers in Practice: Efficiently Scraping Design-Asset Sites

Zhang Xiaoming

Front-end Engineer

Introduction: Challenges and Opportunities in Collecting Design Assets

In the age of digital design, high-quality assets are the foundation of a designer's work. Yet with so many design-asset sites, downloading by hand is slow and hard to do in bulk. Python crawlers offer an automated alternative. This article looks in depth at how to use modern Python async crawling techniques to collect assets from design sites efficiently and compliantly.

Technology Stack

  • Python 3.8+: core language

  • aiohttp: async HTTP client/server framework

  • asyncio: Python's built-in async I/O framework

  • BeautifulSoup4: HTML parsing library

  • aiofiles: async file I/O

  • Redis: distributed cache and task queue (optional)

  • Proxy IP pool: for coping with anti-scraping measures
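All of these libraries revolve around one pattern: scheduling many I/O-bound tasks on a single event loop while capping concurrency. A minimal, self-contained sketch of that pattern, using `asyncio.sleep` as a stand-in for a network call:

```python
import asyncio

async def fetch(sem: asyncio.Semaphore, i: int) -> int:
    # The semaphore caps how many "requests" run at once
    async with sem:
        await asyncio.sleep(0.01)  # stand-in for network I/O
        return i * 2

async def main() -> list:
    sem = asyncio.Semaphore(2)  # at most 2 concurrent tasks
    # gather() runs all coroutines concurrently and preserves order
    return await asyncio.gather(*(fetch(sem, i) for i in range(5)))

results = asyncio.run(main())
print(results)  # [0, 2, 4, 6, 8]
```

The full crawler below uses exactly this shape, with `aiohttp` requests in place of the sleep.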

Project Structure

```text
design-material-crawler/
├── src/
│   ├── crawler.py          # main crawler logic
│   ├── parser.py           # page parsers
│   ├── storage.py          # data storage
│   ├── proxy_manager.py    # proxy management
│   └── utils.py            # helper functions
├── config/
│   └── settings.py         # configuration
├── data/                   # collected data
├── logs/                   # log files
└── requirements.txt        # dependency list
```

Complete Implementation

1. Environment Setup and Dependencies

requirements.txt:

```txt
aiohttp==3.9.1
beautifulsoup4==4.12.2
aiofiles==23.2.1
redis==5.0.1
asyncio-throttle==1.0.2
fake-useragent==1.4.0
python-dotenv==1.0.0
pillow==10.1.0
opencv-python==4.9.0.80
```

2. Configuration

config/settings.py:

```python
import os
from dotenv import load_dotenv

load_dotenv()


class Config:
    # Crawler settings
    MAX_CONCURRENT_REQUESTS = 10
    REQUEST_TIMEOUT = 30
    RETRY_ATTEMPTS = 3
    DELAY_BETWEEN_REQUESTS = 1.0

    # Target site settings
    TARGET_SITES = {
        'unsplash': {
            'base_url': 'https://unsplash.com',
            'search_url': 'https://unsplash.com/napi/search/photos',
            'per_page': 20
        },
        'pexels': {
            'base_url': 'https://www.pexels.com',
            'search_url': 'https://www.pexels.com/api/v3/search',
            'api_key': os.getenv('PEXELS_API_KEY', '')
        }
    }

    # Storage settings
    STORAGE_PATH = './data'
    IMAGE_FORMATS = ['jpg', 'png', 'webp', 'svg']
    MAX_FILE_SIZE = 50 * 1024 * 1024  # 50MB

    # Proxy settings
    USE_PROXY = True
    PROXY_POOL_URL = os.getenv('PROXY_POOL_URL', '')

    # Redis settings
    REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379/0')

    # Request headers
    HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate',
        'DNT': '1',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1'
    }
```

3. The Main Crawler Class

src/crawler.py:

```python
import asyncio
import aiohttp
import aiofiles
import logging
import json
import re
from typing import List, Dict, Optional
from pathlib import Path
from urllib.parse import urlparse
from datetime import datetime

from config.settings import Config
from src.proxy_manager import ProxyManager
from src.parser import Parser
from src.storage import StorageManager
from src.utils import rate_limiter, retry_handler, generate_file_hash

logger = logging.getLogger(__name__)


class DesignMaterialCrawler:
    """Async crawler for design-asset sites."""

    def __init__(self, config: Config):
        self.config = config
        self.session: Optional[aiohttp.ClientSession] = None
        self.proxy_manager = ProxyManager(config) if config.USE_PROXY else None
        self.parser = Parser()
        self.storage = StorageManager(config)
        self.semaphore = asyncio.Semaphore(config.MAX_CONCURRENT_REQUESTS)
        self.visited_urls = set()

    async def __aenter__(self):
        """Async context manager entry."""
        timeout = aiohttp.ClientTimeout(total=self.config.REQUEST_TIMEOUT)
        connector = aiohttp.TCPConnector(limit=100, force_close=True)
        self.session = aiohttp.ClientSession(
            timeout=timeout,
            connector=connector,
            headers=self.config.HEADERS
        )
        await self.storage.initialize()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        if self.session:
            await self.session.close()
        await self.storage.close()

    @retry_handler(max_retries=3, delay=1.0)
    @rate_limiter(max_calls=10, period=1.0)
    async def fetch_page(self, url: str, params: Dict = None) -> str:
        """Fetch page content."""
        if url in self.visited_urls:
            return ""

        proxy = None
        if self.proxy_manager:
            proxy = await self.proxy_manager.get_proxy()

        try:
            async with self.semaphore:
                async with self.session.get(
                    url, params=params, proxy=proxy, ssl=False
                ) as response:
                    response.raise_for_status()
                    content = await response.text()
                    self.visited_urls.add(url)
                    logger.info(f"Fetched page: {url}, status: {response.status}")
                    return content
        except aiohttp.ClientError as e:
            logger.error(f"Request failed {url}: {str(e)}")
            # Mark the proxy as dead
            if proxy and self.proxy_manager:
                await self.proxy_manager.mark_proxy_failed(proxy)
            raise
        except asyncio.TimeoutError:
            logger.error(f"Request timed out: {url}")
            raise

    async def download_file(self, url: str, filepath: Path) -> bool:
        """Download a file to local storage."""
        if filepath.exists():
            logger.info(f"File already exists: {filepath}")
            return True

        try:
            async with self.session.get(url) as response:
                if response.status == 200:
                    # Validate content type and size first
                    content_type = response.headers.get('Content-Type', '')
                    content_length = int(response.headers.get('Content-Length', 0))
                    if not self._is_valid_file(content_type, content_length):
                        logger.warning(f"Invalid file type or size: {url}")
                        return False

                    # Write the file asynchronously, in chunks
                    async with aiofiles.open(filepath, 'wb') as f:
                        async for chunk in response.content.iter_chunked(8192):
                            await f.write(chunk)

                    logger.info(f"Downloaded: {filepath}")
                    return True
                else:
                    logger.error(f"Download failed {url}: status {response.status}")
                    return False
        except Exception as e:
            logger.error(f"Download error {url}: {str(e)}")
            return False

    def _is_valid_file(self, content_type: str, content_length: int) -> bool:
        """Validate file type and size."""
        if content_length > self.config.MAX_FILE_SIZE:
            return False
        valid_types = [f'image/{fmt}' for fmt in self.config.IMAGE_FORMATS]
        valid_types.extend(['application/zip', 'application/x-rar-compressed'])
        return any(valid_type in content_type for valid_type in valid_types)

    async def crawl_site(self, site_name: str, keywords: List[str], max_items: int = 100):
        """Crawl design assets from one site."""
        site_config = self.config.TARGET_SITES.get(site_name)
        if not site_config:
            logger.error(f"No configuration found for site: {site_name}")
            return

        logger.info(f"Crawling {site_name}, keywords: {keywords}")
        all_items = []

        for keyword in keywords:
            logger.info(f"Searching keyword: {keyword}")
            page = 1
            collected_items = 0

            while collected_items < max_items:
                try:
                    params = self._build_search_params(site_name, keyword, page)

                    if site_name == 'unsplash':
                        search_url = site_config['search_url']
                        content = await self.fetch_page(search_url, params)
                        if not content:
                            break
                        data = json.loads(content)
                        items = self.parser.parse_unsplash_results(data)
                    elif site_name == 'pexels':
                        if not site_config.get('api_key'):
                            logger.error("Pexels requires an API key")
                            break
                        search_url = site_config['search_url']
                        headers = {'Authorization': site_config['api_key']}
                        async with self.session.get(
                            search_url, params=params, headers=headers
                        ) as response:
                            data = await response.json()
                        items = self.parser.parse_pexels_results(data)
                    else:
                        logger.error(f"Unsupported site: {site_name}")
                        break

                    if not items:
                        break

                    # Process the items of this page concurrently
                    tasks = []
                    for item in items:
                        if collected_items >= max_items:
                            break
                        tasks.append(self._process_material_item(item, site_name, keyword))
                        collected_items += 1

                    results = await asyncio.gather(*tasks, return_exceptions=True)

                    # Keep only successful results
                    for result in results:
                        if isinstance(result, dict) and result:
                            all_items.append(result)

                    page += 1
                    await asyncio.sleep(1)  # politeness delay
                except Exception as e:
                    logger.error(f"Crawl error: {str(e)}")
                    break

        # Persist metadata
        if all_items:
            metadata_file = self.storage.save_metadata(all_items, site_name)
            logger.info(
                f"Done: collected {len(all_items)} items, "
                f"metadata saved to {metadata_file}"
            )

    def _build_search_params(self, site_name: str, keyword: str, page: int) -> Dict:
        """Build search parameters."""
        if site_name == 'unsplash':
            return {
                'query': keyword,
                'page': page,
                'per_page': min(20, self.config.TARGET_SITES['unsplash']['per_page'])
            }
        elif site_name == 'pexels':
            return {'query': keyword, 'page': page, 'per_page': 15}
        return {}

    async def _process_material_item(self, item: Dict, site_name: str, keyword: str) -> Dict:
        """Process a single asset item."""
        try:
            image_url = item.get('image_url')
            if not image_url:
                return {}

            filename = self._generate_filename(item, site_name)
            filepath = self.storage.get_filepath(site_name, 'images', filename)

            success = await self.download_file(image_url, filepath)
            if success:
                file_hash = await generate_file_hash(filepath)
                metadata = {
                    'id': item.get('id', ''),
                    'title': item.get('title', ''),
                    'description': item.get('description', ''),
                    'image_url': image_url,
                    'download_url': item.get('download_url', ''),
                    'author': item.get('author', {}),
                    'tags': item.get('tags', []),
                    'keywords': [keyword],
                    'site': site_name,
                    'file_path': str(filepath),
                    'file_hash': file_hash,
                    'file_size': filepath.stat().st_size,
                    'crawled_at': datetime.now().isoformat(),
                    'metadata': item
                }
                # Create a thumbnail for browsing
                await self.storage.create_thumbnail(filepath)
                return metadata
        except Exception as e:
            logger.error(f"Failed to process item: {str(e)}")
        return {}

    def _generate_filename(self, item: Dict, site_name: str) -> str:
        """Generate a safe local filename."""
        item_id = item.get('id', '')
        title = item.get('title', 'untitled').lower()
        # Sanitize the title
        title_clean = re.sub(r'[^\w\-_\. ]', '_', title)
        title_clean = re.sub(r'\s+', '_', title_clean)
        # Derive the extension from the image URL
        image_url = item.get('image_url', '')
        ext = Path(urlparse(image_url).path).suffix if image_url else '.jpg'
        return f"{site_name}_{item_id}_{title_clean}{ext}"

    async def crawl_batch(self, sites_keywords: Dict[str, List[str]], max_items_per_site: int = 50):
        """Crawl several sites concurrently."""
        tasks = [
            self.crawl_site(site_name, keywords, max_items_per_site)
            for site_name, keywords in sites_keywords.items()
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for site_name, result in zip(sites_keywords.keys(), results):
            if isinstance(result, Exception):
                logger.error(f"Crawling {site_name} failed: {str(result)}")
            else:
                logger.info(f"Crawling {site_name} finished")
```

4. The Page Parser

src/parser.py:

```python
import json
import re
from typing import List, Dict

from bs4 import BeautifulSoup


class Parser:
    """Page parser."""

    def parse_unsplash_results(self, data: Dict) -> List[Dict]:
        """Parse Unsplash API response data."""
        results = []
        if 'results' in data:
            for item in data['results']:
                result = {
                    'id': item.get('id', ''),
                    'title': item.get('description', 'Untitled'),
                    'description': item.get('alt_description', ''),
                    'image_url': item.get('urls', {}).get('regular', ''),
                    'download_url': item.get('links', {}).get('download', ''),
                    'author': {
                        'name': item.get('user', {}).get('name', ''),
                        'username': item.get('user', {}).get('username', ''),
                        'profile_url': item.get('user', {}).get('links', {}).get('html', '')
                    },
                    'tags': [tag.get('title', '') for tag in item.get('tags', [])],
                    'width': item.get('width', 0),
                    'height': item.get('height', 0),
                    'color': item.get('color', ''),
                    'likes': item.get('likes', 0)
                }
                results.append(result)
        return results

    def parse_pexels_results(self, data: Dict) -> List[Dict]:
        """Parse Pexels API response data."""
        results = []
        if 'photos' in data:
            for item in data['photos']:
                result = {
                    'id': str(item.get('id', '')),
                    'title': item.get('alt', 'Untitled'),
                    'description': '',
                    'image_url': item.get('src', {}).get('large', ''),
                    'download_url': item.get('url', ''),
                    'author': {
                        'name': item.get('photographer', ''),
                        'profile_url': item.get('photographer_url', '')
                    },
                    'tags': [],
                    'width': item.get('width', 0),
                    'height': item.get('height', 0),
                    'avg_color': item.get('avg_color', ''),
                    'liked': item.get('liked', False)
                }
                results.append(result)
        return results

    def parse_html_page(self, html: str, site_type: str) -> List[Dict]:
        """Parse a raw HTML page (fallback method)."""
        soup = BeautifulSoup(html, 'html.parser')
        results = []
        if site_type == 'unsplash':
            # Parse the Unsplash page structure
            image_elements = soup.select('figure img')
            for img in image_elements:
                src = img.get('src') or img.get('data-src')
                if src and 'images.unsplash.com' in src:
                    results.append({
                        'image_url': src,
                        'title': img.get('alt', ''),
                        'author': self._extract_author_unsplash(img)
                    })
        return results

    def _extract_author_unsplash(self, element) -> Dict:
        """Extract author info from an Unsplash element."""
        # Adjust the selectors to the actual page structure
        author_info = {'name': '', 'username': '', 'profile_url': ''}
        parent = element.find_parent('a', href=re.compile(r'/@'))
        if parent and parent.get('href'):
            author_info['profile_url'] = 'https://unsplash.com' + parent['href']
            author_info['username'] = parent['href'].split('/@')[-1]
        return author_info
```

5. The Storage Manager

src/storage.py:

```python
import json
import csv
import sqlite3
import asyncio
from pathlib import Path
from typing import List, Dict, Optional
from datetime import datetime

from PIL import Image


class StorageManager:
    """Storage manager."""

    def __init__(self, config):
        self.config = config
        self.base_path = Path(config.STORAGE_PATH)
        self.db_conn: Optional[sqlite3.Connection] = None

    async def initialize(self):
        """Create the storage directories and the database."""
        directories = ['images', 'thumbnails', 'metadata', 'logs']
        for dir_name in directories:
            (self.base_path / dir_name).mkdir(parents=True, exist_ok=True)
        self._init_database()

    def _init_database(self):
        """Initialize the SQLite database."""
        db_path = self.base_path / 'materials.db'
        self.db_conn = sqlite3.connect(str(db_path))
        cursor = self.db_conn.cursor()

        # Materials table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS materials (
                id TEXT PRIMARY KEY,
                title TEXT,
                description TEXT,
                image_url TEXT,
                download_url TEXT,
                author_name TEXT,
                author_url TEXT,
                tags TEXT,
                keywords TEXT,
                site TEXT,
                file_path TEXT,
                file_hash TEXT UNIQUE,
                file_size INTEGER,
                width INTEGER,
                height INTEGER,
                color TEXT,
                likes INTEGER,
                crawled_at TEXT,
                metadata TEXT
            )
        ''')

        # Download log table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS download_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                material_id TEXT,
                file_path TEXT,
                download_time TEXT,
                status TEXT,
                error_message TEXT,
                FOREIGN KEY (material_id) REFERENCES materials (id)
            )
        ''')
        self.db_conn.commit()

    def get_filepath(self, site_name: str, file_type: str, filename: str) -> Path:
        """Build a date-partitioned file path."""
        date_str = datetime.now().strftime('%Y/%m/%d')
        filepath = self.base_path / site_name / file_type / date_str / filename
        filepath.parent.mkdir(parents=True, exist_ok=True)
        return filepath

    def save_metadata(self, items: List[Dict], site_name: str) -> Path:
        """Persist metadata as JSON, to SQLite, and optionally as CSV."""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        metadata_file = self.base_path / 'metadata' / f'{site_name}_{timestamp}.json'

        with open(metadata_file, 'w', encoding='utf-8') as f:
            json.dump(items, f, ensure_ascii=False, indent=2)

        self._save_to_database(items)

        # CSV export (optional)
        csv_file = metadata_file.with_suffix('.csv')
        self._save_to_csv(items, csv_file)
        return metadata_file

    def _save_to_database(self, items: List[Dict]):
        """Write items to the database."""
        cursor = self.db_conn.cursor()
        for item in items:
            cursor.execute('''
                INSERT OR REPLACE INTO materials (
                    id, title, description, image_url, download_url,
                    author_name, author_url, tags, keywords, site,
                    file_path, file_hash, file_size, width, height,
                    color, likes, crawled_at, metadata
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                item.get('id'),
                item.get('title'),
                item.get('description'),
                item.get('image_url'),
                item.get('download_url'),
                item.get('author', {}).get('name'),
                item.get('author', {}).get('profile_url'),
                json.dumps(item.get('tags', [])),
                json.dumps(item.get('keywords', [])),
                item.get('site'),
                item.get('file_path'),
                item.get('file_hash'),
                item.get('file_size'),
                item.get('metadata', {}).get('width'),
                item.get('metadata', {}).get('height'),
                item.get('metadata', {}).get('color'),
                item.get('metadata', {}).get('likes'),
                item.get('crawled_at'),
                json.dumps(item.get('metadata', {}))
            ))
        self.db_conn.commit()

    def _save_to_csv(self, items: List[Dict], csv_file: Path):
        """Write items to CSV."""
        if not items:
            return
        fieldnames = [
            'id', 'title', 'description', 'site', 'author_name',
            'keywords', 'tags', 'file_path', 'file_size', 'crawled_at'
        ]
        with open(csv_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            for item in items:
                writer.writerow({
                    'id': item.get('id', ''),
                    'title': item.get('title', ''),
                    'description': item.get('description', ''),
                    'site': item.get('site', ''),
                    'author_name': item.get('author', {}).get('name', ''),
                    'keywords': ';'.join(item.get('keywords', [])),
                    'tags': ';'.join(item.get('tags', [])),
                    'file_path': item.get('file_path', ''),
                    'file_size': item.get('file_size', 0),
                    'crawled_at': item.get('crawled_at', '')
                })

    async def create_thumbnail(self, image_path: Path, size: tuple = (200, 200)):
        """Create a thumbnail."""
        try:
            thumb_path = self.base_path / 'thumbnails' / image_path.name
            # Run the CPU-bound image work in a worker thread
            await asyncio.to_thread(self._generate_thumbnail, image_path, thumb_path, size)
        except Exception as e:
            print(f"Thumbnail creation failed {image_path}: {str(e)}")

    def _generate_thumbnail(self, src_path: Path, dst_path: Path, size: tuple):
        """Generate a thumbnail (runs in a separate thread)."""
        with Image.open(src_path) as img:
            img.thumbnail(size, Image.Resampling.LANCZOS)
            dst_path.parent.mkdir(parents=True, exist_ok=True)
            img.save(dst_path, 'JPEG', quality=85)

    async def close(self):
        """Release storage resources."""
        if self.db_conn:
            self.db_conn.close()
```

6. Utility Functions

src/utils.py:

```python
import asyncio
import hashlib
import time
from functools import wraps
from pathlib import Path

import aiofiles


def rate_limiter(max_calls: int, period: float):
    """Rate-limiting decorator."""
    def decorator(func):
        last_reset = time.time()
        call_count = 0

        @wraps(func)
        async def wrapper(*args, **kwargs):
            nonlocal last_reset, call_count
            current_time = time.time()
            if current_time - last_reset > period:
                last_reset = current_time
                call_count = 0
            if call_count >= max_calls:
                wait_time = period - (current_time - last_reset)
                if wait_time > 0:
                    await asyncio.sleep(wait_time)
                last_reset = time.time()
                call_count = 0
            call_count += 1
            return await func(*args, **kwargs)
        return wrapper
    return decorator


def retry_handler(max_retries: int = 3, delay: float = 1.0):
    """Retry decorator with exponential backoff."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_retries:
                        wait_time = delay * (2 ** attempt)  # exponential backoff
                        await asyncio.sleep(wait_time)
            raise last_exception
        return wrapper
    return decorator


async def generate_file_hash(filepath: Path, algorithm: str = 'sha256') -> str:
    """Compute a file hash asynchronously."""
    hash_func = hashlib.new(algorithm)
    async with aiofiles.open(filepath, 'rb') as f:
        while chunk := await f.read(8192):
            hash_func.update(chunk)
    return hash_func.hexdigest()


def validate_url(url: str) -> bool:
    """Validate URL format."""
    from urllib.parse import urlparse
    try:
        result = urlparse(url)
        return all([result.scheme, result.netloc])
    except ValueError:
        return False


class ProgressTracker:
    """Progress tracker."""

    def __init__(self, total: int):
        self.total = total
        self.completed = 0
        self.start_time = time.time()

    def update(self, increment: int = 1):
        """Update progress."""
        self.completed += increment
        percentage = (self.completed / self.total) * 100
        # Estimate remaining time from elapsed time
        elapsed = time.time() - self.start_time
        if self.completed > 0:
            estimated_total = elapsed * (self.total / self.completed)
            remaining = estimated_total - elapsed
        else:
            remaining = 0
        print(f"Progress: {percentage:.1f}% | Done: {self.completed}/{self.total} | "
              f"ETA: {remaining:.0f}s")
```
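To see the retry decorator's behavior in isolation, here is a standalone demo. The decorator is re-stated below (same shape as the one in src/utils.py, with a shorter delay) so the snippet runs on its own; `flaky` is a hypothetical function that fails twice before succeeding:

```python
import asyncio
from functools import wraps

def retry_handler(max_retries: int = 3, delay: float = 0.01):
    # Same structure as the decorator in src/utils.py
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_retries:
                        await asyncio.sleep(delay * (2 ** attempt))
            raise last_exception
        return wrapper
    return decorator

attempts = 0

@retry_handler(max_retries=3)
async def flaky():
    # Simulates a request that fails twice, then succeeds
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionError("transient failure")
    return "ok"

print(asyncio.run(flaky()))  # ok
```

The third attempt succeeds, so the caller never sees the two transient errors.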

7. The Entry Point

main.py:

```python
import asyncio
import logging
import sys
from pathlib import Path
from datetime import datetime

from config.settings import Config
from src.crawler import DesignMaterialCrawler


def setup_logging():
    """Configure logging."""
    log_dir = Path('logs')
    log_dir.mkdir(exist_ok=True)
    # Use a timestamp for the filename (calling the event loop here would fail
    # before the loop is running)
    log_file = log_dir / f"crawler_{datetime.now():%Y%m%d_%H%M%S}.log"
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file, encoding='utf-8'),
            logging.StreamHandler(sys.stdout)
        ]
    )


async def main():
    """Entry point."""
    setup_logging()
    logger = logging.getLogger(__name__)
    config = Config()

    # Define the crawl tasks
    sites_keywords = {
        'unsplash': ['design', 'background', 'texture', 'pattern', 'minimal'],
        # 'pexels': ['design', 'creative', 'art', 'graphic']  # requires an API key
    }

    try:
        async with DesignMaterialCrawler(config) as crawler:
            # Batch crawl
            await crawler.crawl_batch(sites_keywords, max_items_per_site=20)
            # Or crawl a single site:
            # await crawler.crawl_site('unsplash', ['nature', 'technology'], max_items=50)
    except KeyboardInterrupt:
        logger.info("Crawl interrupted by user")
    except Exception as e:
        logger.error(f"Crawl failed: {str(e)}", exc_info=True)
    finally:
        logger.info("Crawler finished")


if __name__ == '__main__':
    # Event loop policy for Windows compatibility
    if sys.platform == 'win32':
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    asyncio.run(main())
```

Advanced Features and Optimizations

1. Distributed Crawling

```python
# src/distributed.py
import json
from typing import List, Dict

import redis.asyncio as redis


class DistributedCrawler:
    """Distributed crawl coordinator."""

    def __init__(self, config):
        self.config = config
        self.redis_client = None
        self.queue_name = 'crawler:tasks'
        self.result_name = 'crawler:results'

    async def initialize(self):
        """Create the Redis connection."""
        # redis.from_url() is synchronous; the client connects lazily
        self.redis_client = redis.from_url(
            self.config.REDIS_URL, decode_responses=True
        )

    async def push_tasks(self, tasks: List[Dict]):
        """Push tasks onto the queue."""
        for task in tasks:
            await self.redis_client.lpush(self.queue_name, json.dumps(task))

    async def get_results(self, count: int = 100) -> List[Dict]:
        """Pop processed results."""
        results = []
        for _ in range(count):
            result = await self.redis_client.rpop(self.result_name)
            if result:
                results.append(json.loads(result))
        return results
```

2. Image Content Analysis

```python
# src/image_analyzer.py
import asyncio
from pathlib import Path
from typing import Dict

import cv2
import numpy as np


class ImageAnalyzer:
    """Image content analyzer."""

    @staticmethod
    async def analyze_image(image_path: Path) -> Dict:
        """Analyze image features."""
        # Run the CPU-bound work in a worker thread
        return await asyncio.to_thread(ImageAnalyzer._analyze_image_sync, image_path)

    @staticmethod
    def _analyze_image_sync(image_path: Path) -> Dict:
        """Synchronous image analysis."""
        try:
            image = cv2.imread(str(image_path))
            if image is None:
                return {}

            # Color histogram
            hist = cv2.calcHist([image], [0, 1, 2], None, [8, 8, 8],
                                [0, 256, 0, 256, 0, 256])
            hist = cv2.normalize(hist, hist).flatten()

            # Average color
            avg_color = cv2.mean(image)[:3]

            # Edge density
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            edges = cv2.Canny(gray, 100, 200)
            edge_density = np.sum(edges > 0) / edges.size

            return {
                'dominant_colors': hist.tolist(),
                'average_color': avg_color,
                'edge_density': float(edge_density),
                'resolution': f"{image.shape[1]}x{image.shape[0]}"
            }
        except Exception as e:
            print(f"Image analysis failed: {str(e)}")
            return {}
```

Crawler Ethics and Compliance

1. Respecting robots.txt

```python
# src/robots_checker.py
import urllib.robotparser
from urllib.parse import urlparse

import aiohttp


class RobotsChecker:
    """robots.txt checker."""

    def __init__(self):
        self.parser = urllib.robotparser.RobotFileParser()

    async def can_fetch(self, url: str, user_agent: str = '*') -> bool:
        """Check whether crawling the URL is allowed."""
        base_url = self._extract_base_url(url)
        robots_url = f"{base_url}/robots.txt"
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(robots_url) as response:
                    if response.status == 200:
                        content = await response.text()
                        self.parser.parse(content.splitlines())
                        return self.parser.can_fetch(user_agent, url)
        except aiohttp.ClientError:
            pass
        return True  # if robots.txt is unreachable, default to allow

    def _extract_base_url(self, url: str) -> str:
        """Extract the base URL."""
        parsed = urlparse(url)
        return f"{parsed.scheme}://{parsed.netloc}"
```

2. Usage Guidelines

  1. Respect copyright: only download assets licensed for your intended use

  2. Follow the rules: read the target site's Terms of Service carefully

  3. Throttle requests: add appropriate delays to avoid putting pressure on servers

  4. Set a User-Agent: identify your crawler clearly

  5. Handle errors: deal gracefully with HTTP status codes such as 404 and 429

  6. Deduplicate: avoid downloading the same content twice
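Point 5 deserves a concrete shape. A common way to handle 429 (Too Many Requests) is to honour the server's Retry-After header and fall back to exponential backoff. The sketch below takes the HTTP call as a parameter (`get`) so the retry logic can be exercised without a real server; `FakeResponse` and `fake_get` are illustrative stand-ins, and a real response object only needs `.status` and `.headers`:

```python
import asyncio

async def fetch_with_backoff(get, url: str, max_retries: int = 3):
    """Retry on HTTP 429, honouring Retry-After when the server sends it."""
    for attempt in range(max_retries + 1):
        resp = await get(url)
        if resp.status != 429:
            return resp
        # Prefer the server's hint; otherwise back off exponentially
        wait = float(resp.headers.get("Retry-After", 2 ** attempt))
        await asyncio.sleep(wait)
    return resp

# Demo with a fake client: two 429 responses, then success
class FakeResponse:
    def __init__(self, status, headers=None):
        self.status = status
        self.headers = headers or {}

calls = []
async def fake_get(url):
    calls.append(url)
    if len(calls) < 3:
        return FakeResponse(429, {"Retry-After": "0"})
    return FakeResponse(200)

resp = asyncio.run(fetch_with_backoff(fake_get, "https://example.com"))
print(resp.status, len(calls))  # 200 3
```

The same wrapper could sit around `session.get` in the crawler's `fetch_page`, turning rate-limit responses into polite waits instead of failures.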

Performance Tips

  1. Reuse connections: pool HTTP connections to avoid repeated TCP handshake overhead

  2. Async file I/O: use aiofiles so file operations don't block the event loop

  3. Memory management: release large objects promptly; use generators for large datasets

  4. Error recovery: implement a checkpoint mechanism so interrupted crawls can resume

  5. Caching: cache pages and data that have already been parsed
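For point 4, a checkpoint can be as simple as a JSON file of already-processed URLs, consulted before each download and rewritten after each success. A minimal sketch (the `Checkpoint` class and file name are illustrative, not part of the project above):

```python
import json
import tempfile
from pathlib import Path

class Checkpoint:
    """Tracks finished URLs on disk so an interrupted crawl can resume."""

    def __init__(self, path: Path):
        self.path = Path(path)
        # Load previous progress, if any
        self.done = set(json.loads(self.path.read_text())) if self.path.exists() else set()

    def mark(self, url: str):
        self.done.add(url)
        # Rewriting the whole file is fine for small crawls;
        # at scale, prefer an append-only log or a database
        self.path.write_text(json.dumps(sorted(self.done)))

    def is_done(self, url: str) -> bool:
        return url in self.done

# Demo: progress survives a "restart"
tmp = Path(tempfile.mkdtemp())
cp = Checkpoint(tmp / "checkpoint.json")
cp.mark("https://example.com/a")
cp2 = Checkpoint(tmp / "checkpoint.json")  # simulates a new process
print(cp2.is_done("https://example.com/a"))  # True
```

The crawler's `visited_urls` set could be backed by such a checkpoint (or by Redis in the distributed setup) instead of living only in memory.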
