news 2026/4/17 19:47:08

Python Job-Posting Aggregation Crawler in Practice: Building an Efficient Data Collection System with Playwright and Async Techniques


张小明

Front-end Development Engineer


Introduction

In today's internet era, timely access to and integration of job-posting information matters to job seekers, recruiters, and HR researchers alike. A single job board no longer satisfies these diverse information needs, which makes a job-posting aggregation crawler worth building. This article walks through how to use a modern Python stack to build an efficient, stable, and extensible job-posting aggregation crawler system.

Technology Choices and Architecture

Core Technology Stack

  1. Playwright: Microsoft's browser-automation framework; it drives multiple browsers and is generally faster and more stable than Selenium

  2. asyncio: Python's built-in asynchronous I/O framework, used here for highly concurrent data collection

  3. FastAPI: a modern, fast web framework for the API layer

  4. MongoDB: a NoSQL database, a good fit for loosely structured job data

  5. Redis: caching and de-duplication

  6. Docker: containerized deployment

System Architecture

```text
Data Collection  →  Data Processing  →  Data Storage  →  API Service
      ↑                   ↑                  ↑               ↑
  Playwright           PySpark           MongoDB          FastAPI
  asyncio              Pandas            Redis            Uvicorn
```

Project Implementation

1. Environment Setup and Dependencies

```text
# requirements.txt
# Note: the original list pinned asyncio==3.4.3, but asyncio ships with
# Python itself; the PyPI package of that name is an obsolete backport and
# breaks on modern interpreters, so it is removed here. motor (imported by
# the crawler code) was missing from the list and has been added.
playwright==1.40.0
aiohttp==3.9.1
fastapi==0.104.1
uvicorn[standard]==0.24.0
pymongo==4.5.0
motor==3.3.2
redis==5.0.1
pandas==2.1.3
pydantic==2.5.0
beautifulsoup4==4.12.2
lxml==4.9.3
celery==5.3.4
docker==6.1.3
```

2. Core Crawler Class Implementation

```python
import asyncio
import hashlib
import logging
import uuid
from contextlib import asynccontextmanager
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional
from urllib.parse import urljoin

import redis.asyncio as redis
from motor.motor_asyncio import AsyncIOMotorClient
from playwright.async_api import async_playwright
from pydantic import BaseModel, Field

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class JobPosition(BaseModel):
    """Job position data model."""
    # The original default was hashlib.md5().hexdigest(), which hashes the
    # empty string and therefore gives every record the same id; uuid4 fixes that.
    id: str = Field(default_factory=lambda: uuid.uuid4().hex)
    title: str
    company: str
    location: str
    salary: Optional[str] = None
    experience: Optional[str] = None
    education: Optional[str] = None
    job_type: Optional[str] = None
    description: str
    requirements: List[str] = Field(default_factory=list)
    benefits: List[str] = Field(default_factory=list)
    source: str  # source site
    source_url: str
    published_date: datetime
    crawl_time: datetime = Field(default_factory=datetime.now)
    tags: List[str] = Field(default_factory=list)

    class Config:
        json_encoders = {datetime: lambda dt: dt.isoformat()}


@dataclass
class CrawlerConfig:
    """Crawler configuration."""
    user_agent: str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    timeout: int = 30000
    headless: bool = True
    max_concurrent: int = 5
    retry_count: int = 3
    proxy: Optional[str] = None


class AsyncJobCrawler:
    """Asynchronous job-posting crawler."""

    def __init__(self, config: CrawlerConfig = None):
        self.config = config or CrawlerConfig()
        self.session = None
        self.browser = None
        self.context = None
        self.redis_client = None
        self.mongo_client = None
        self.db = None

    async def init_resources(self):
        """Initialize external resources."""
        # Redis connection (note: redis.Redis() is a constructor, not a
        # coroutine; the original mistakenly awaited it)
        self.redis_client = redis.Redis(
            host='localhost', port=6379, db=0, decode_responses=True
        )
        # MongoDB connection
        self.mongo_client = AsyncIOMotorClient('mongodb://localhost:27017')
        self.db = self.mongo_client.job_aggregator
        # Playwright
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(
            headless=self.config.headless,
            args=['--disable-blink-features=AutomationControlled']
        )
        # Browser context
        self.context = await self.browser.new_context(
            user_agent=self.config.user_agent,
            viewport={'width': 1920, 'height': 1080}
        )

    async def close_resources(self):
        """Release resources."""
        if self.browser:
            await self.browser.close()
        if self.playwright:
            await self.playwright.stop()
        if self.redis_client:
            await self.redis_client.close()

    @asynccontextmanager
    async def get_page(self):
        """Yield a fresh page and close it afterwards."""
        page = await self.context.new_page()
        try:
            yield page
        finally:
            await page.close()

    async def crawl_boss_zhipin(self, keyword: str, city: str = "北京") -> List[JobPosition]:
        """Crawl BOSS直聘."""
        positions = []
        base_url = "https://www.zhipin.com/web/geek/job"
        async with self.get_page() as page:
            # Pass requests through untouched (hook point for interception)
            await page.route("**/*", lambda route: route.continue_())
            params = {"query": keyword, "city": city, "page": 1}
            try:
                await page.goto(f"{base_url}?{self._dict_to_query(params)}",
                                timeout=self.config.timeout)
                # Wait for the listing to render
                await page.wait_for_selector(".job-list-box", timeout=10000)
                # Scroll to trigger lazy loading
                for _ in range(3):
                    await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                    await asyncio.sleep(1)
                job_items = await page.query_selector_all(".job-card-wrapper")
                for item in job_items[:10]:  # cap the batch to avoid getting blocked
                    try:
                        title_elem = await item.query_selector(".job-title")
                        company_elem = await item.query_selector(".company-name")
                        salary_elem = await item.query_selector(".salary")
                        if all([title_elem, company_elem]):
                            title = await title_elem.text_content()
                            company = await company_elem.text_content()
                            salary = await salary_elem.text_content() if salary_elem else None
                            detail_link = await item.get_attribute("href")
                            if detail_link:
                                detail_url = urljoin("https://www.zhipin.com", detail_link)
                                position_detail = await self._crawl_detail_page(detail_url)
                                position = JobPosition(
                                    title=title.strip(),
                                    company=company.strip(),
                                    salary=salary.strip() if salary else None,
                                    location=city,
                                    source="BOSS直聘",
                                    source_url=detail_url,
                                    published_date=datetime.now(),
                                    **position_detail
                                )
                                if await self._is_duplicate(position):
                                    continue
                                positions.append(position)
                                await self.save_position(position)
                    except Exception as e:
                        logger.error(f"Failed to parse job item: {e}")
                        continue
            except Exception as e:
                logger.error(f"Failed to crawl BOSS直聘: {e}")
        return positions

    async def _crawl_detail_page(self, url: str) -> Dict[str, Any]:
        """Crawl a job detail page."""
        detail_info = {"description": "", "requirements": [], "benefits": []}
        try:
            async with self.get_page() as page:
                await page.goto(url, timeout=self.config.timeout)
                await page.wait_for_selector(".job-detail", timeout=5000)
                # Job description
                desc_elem = await page.query_selector(".job-sec-text")
                if desc_elem:
                    detail_info["description"] = await desc_elem.text_content()
                # Requirements
                req_elems = await page.query_selector_all(".job-requirement li")
                detail_info["requirements"] = [
                    await elem.text_content() for elem in req_elems
                ]
                # Benefits
                benefit_elems = await page.query_selector_all(".job-benefits span")
                detail_info["benefits"] = [
                    await elem.text_content() for elem in benefit_elems
                ]
        except Exception as e:
            logger.error(f"Failed to crawl detail page {url}: {e}")
        return detail_info

    async def crawl_lagou(self, keyword: str, city: str = "北京") -> List[JobPosition]:
        """Crawl 拉勾网."""
        positions = []
        base_url = "https://www.lagou.com/jobs/list_"
        async with self.get_page() as page:
            # Seed a cookie to soften the anti-bot checks
            await page.context.add_cookies([{
                "name": "user_trace_token",
                "value": "test_token",
                "domain": ".lagou.com",
                "path": "/"
            }])
            url = f"{base_url}{keyword}?city={city}"
            try:
                await page.goto(url, timeout=self.config.timeout)
                # Dismiss the popup if present
                try:
                    close_btn = await page.wait_for_selector(".popup-close", timeout=3000)
                    if close_btn:
                        await close_btn.click()
                except Exception:
                    pass
                await page.wait_for_selector(".item__10RTO", timeout=10000)
                job_items = await page.query_selector_all(".item__10RTO")
                for item in job_items[:10]:
                    try:
                        title_elem = await item.query_selector(".p-top__1F7CL a")
                        company_elem = await item.query_selector(".company-name__2-SjF")
                        salary_elem = await item.query_selector(".money__3Lkgq")
                        if all([title_elem, company_elem]):
                            title = await title_elem.text_content()
                            company = await company_elem.text_content()
                            salary = await salary_elem.text_content() if salary_elem else None
                            detail_link = await title_elem.get_attribute("href")
                            position = JobPosition(
                                title=title.strip(),
                                company=company.strip(),
                                salary=salary.strip() if salary else None,
                                location=city,
                                source="拉勾网",
                                source_url=detail_link or url,
                                published_date=datetime.now(),
                                description=""
                            )
                            if await self._is_duplicate(position):
                                continue
                            positions.append(position)
                            await self.save_position(position)
                    except Exception as e:
                        logger.error(f"Failed to parse 拉勾 job item: {e}")
            except Exception as e:
                logger.error(f"Failed to crawl 拉勾网: {e}")
        return positions

    async def _is_duplicate(self, position: JobPosition) -> bool:
        """Check whether a position was seen recently."""
        # MD5 of title + company + source as the identity key
        position_hash = hashlib.md5(
            f"{position.title}_{position.company}_{position.source}".encode()
        ).hexdigest()
        exists = await self.redis_client.exists(f"job:{position_hash}")
        if exists:
            return True
        # Remember it for 24 hours
        await self.redis_client.setex(f"job:{position_hash}", 86400, "1")
        return False

    async def save_position(self, position: JobPosition):
        """Upsert a position into MongoDB."""
        try:
            await self.db.positions.update_one(
                {"id": position.id},
                {"$set": position.dict()},
                upsert=True
            )
            logger.info(f"Saved position: {position.title}")
        except Exception as e:
            logger.error(f"Failed to save position: {e}")

    def _dict_to_query(self, params: Dict) -> str:
        """Serialize a dict into a query string."""
        return "&".join([f"{k}={v}" for k, v in params.items()])

    async def crawl_multiple_sources(self, keyword: str,
                                     sources: List[str] = None) -> List[JobPosition]:
        """Crawl several sources concurrently."""
        if sources is None:
            sources = ["boss", "lagou"]
        tasks = []
        if "boss" in sources:
            tasks.append(self.crawl_boss_zhipin(keyword))
        if "lagou" in sources:
            tasks.append(self.crawl_lagou(keyword))
        # More platforms can be added here
        results = await asyncio.gather(*tasks, return_exceptions=True)
        all_positions = []
        for result in results:
            if isinstance(result, Exception):
                logger.error(f"Crawl task failed: {result}")
            elif isinstance(result, list):
                all_positions.extend(result)
        return all_positions
```
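The `_is_duplicate` check keys Redis on an MD5 of title, company, and source with a 24-hour TTL. A minimal, dependency-free sketch of that key scheme (the `job:` prefix mirrors the crawler code; the in-memory set is a stand-in for Redis, not the real store):

```python
import hashlib

def job_dedup_key(title: str, company: str, source: str) -> str:
    """Build the de-duplication key the same way _is_duplicate does."""
    digest = hashlib.md5(f"{title}_{company}_{source}".encode()).hexdigest()
    return f"job:{digest}"

class InMemoryDedup:
    """Stand-in for the Redis EXISTS/SETEX pair (no TTL, for illustration)."""
    def __init__(self):
        self._seen = set()

    def is_duplicate(self, title: str, company: str, source: str) -> bool:
        key = job_dedup_key(title, company, source)
        if key in self._seen:
            return True
        self._seen.add(key)
        return False

dedup = InMemoryDedup()
first = dedup.is_duplicate("Python开发", "某科技公司", "BOSS直聘")   # first sighting
second = dedup.is_duplicate("Python开发", "某科技公司", "BOSS直聘")  # already recorded
```

Because the key ignores `source_url`, the same posting reached via different URLs still de-duplicates, which is usually what you want across list pages.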

3. Distributed Task Queue Implementation

```python
import asyncio
from typing import List

from celery import Celery
from pydantic import BaseModel

# Celery configuration
celery_app = Celery(
    'job_crawler',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)
celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Asia/Shanghai',
    enable_utc=True,
)


class CrawlTask(BaseModel):
    """Crawl task model."""
    keywords: List[str]
    sources: List[str]
    cities: List[str]
    max_results: int = 50


@celery_app.task(bind=True, max_retries=3)
def start_crawl_task(self, task_data: dict):
    """Run a crawl job inside a Celery worker."""
    task = CrawlTask(**task_data)
    # Celery tasks are synchronous, so drive the async crawler
    # on a dedicated event loop
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        crawler = AsyncJobCrawler()
        loop.run_until_complete(crawler.init_resources())
        all_results = []
        for keyword in task.keywords:
            for city in task.cities:
                positions = loop.run_until_complete(
                    crawler.crawl_multiple_sources(keyword, task.sources)
                )
                all_results.extend(positions[:task.max_results])
        loop.run_until_complete(crawler.close_resources())
        return {
            "status": "success",
            "count": len(all_results),
            "data": [pos.dict() for pos in all_results]
        }
    except Exception as e:
        # Retry up to max_retries with a 60-second backoff
        self.retry(exc=e, countdown=60)
    finally:
        loop.close()
```
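The docker-compose file later in the article starts a `celery beat` container, but the article never defines a periodic schedule for it. A hedged sketch of what one could look like; the 6-hour interval, the keyword list, and the assumption that the task lives in `main.py` are all illustrative, not from the original:

```python
# beat_schedule accepts a plain dict; a numeric schedule value means
# "run every N seconds" (6 hours here, an assumed interval).
CRAWL_INTERVAL_SECONDS = 6 * 60 * 60

beat_schedule = {
    "periodic-job-crawl": {
        # Celery names tasks by dotted path; "main.start_crawl_task"
        # assumes the task above is defined in main.py.
        "task": "main.start_crawl_task",
        "schedule": CRAWL_INTERVAL_SECONDS,
        "args": ({"keywords": ["Python"],
                  "sources": ["boss", "lagou"],
                  "cities": ["北京"],
                  "max_results": 50},),
    }
}

# In the real app this would be assigned with:
# celery_app.conf.beat_schedule = beat_schedule
```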

4. FastAPI Web Service

```python
from datetime import datetime, timedelta
from typing import List, Optional

from fastapi import BackgroundTasks, FastAPI, Query
from fastapi.middleware.cors import CORSMiddleware
from motor.motor_asyncio import AsyncIOMotorClient

app = FastAPI(
    title="Job Aggregation API",
    description="Multi-source job-posting aggregation crawler system",
    version="1.0.0"
)

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/")
async def root():
    return {"message": "Job aggregation crawler API service"}


@app.post("/api/v1/crawl")
async def start_crawl(
    keywords: List[str] = Query(..., description="Search keywords"),
    sources: List[str] = Query(["boss", "lagou"], description="Data sources"),
    cities: List[str] = Query(["北京"], description="Cities"),
    max_results: int = Query(50, description="Max results per keyword"),
    background_tasks: BackgroundTasks = None
):
    """Kick off a crawl task."""
    task_data = {
        "keywords": keywords,
        "sources": sources,
        "cities": cities,
        "max_results": max_results
    }
    # Dispatch the crawl to Celery asynchronously
    task = start_crawl_task.delay(task_data)
    return {
        "task_id": task.id,
        "status": "started",
        "message": "Crawl task started"
    }


@app.get("/api/v1/positions")
async def get_positions(
    keyword: Optional[str] = None,
    city: Optional[str] = None,
    source: Optional[str] = None,
    page: int = 1,
    limit: int = 20,
    days: int = 7
):
    """Query stored positions."""
    client = AsyncIOMotorClient('mongodb://localhost:27017')
    db = client.job_aggregator
    # Build the filter
    query = {}
    if keyword:
        query["$or"] = [
            {"title": {"$regex": keyword, "$options": "i"}},
            {"company": {"$regex": keyword, "$options": "i"}},
            {"description": {"$regex": keyword, "$options": "i"}}
        ]
    if city:
        query["location"] = {"$regex": city, "$options": "i"}
    if source:
        query["source"] = source
    # Time filter
    time_threshold = datetime.now() - timedelta(days=days)
    query["crawl_time"] = {"$gte": time_threshold}
    # Run the query with pagination
    cursor = db.positions.find(query).sort("published_date", -1)
    cursor.skip((page - 1) * limit).limit(limit)
    positions = await cursor.to_list(length=limit)
    total = await db.positions.count_documents(query)
    # ObjectId is not JSON-serializable
    for pos in positions:
        pos["_id"] = str(pos["_id"])
    return {
        "data": positions,
        "total": total,
        "page": page,
        "limit": limit,
        "total_pages": (total + limit - 1) // limit
    }


@app.get("/api/v1/statistics")
async def get_statistics(days: int = 30):
    """Aggregate statistics."""
    client = AsyncIOMotorClient('mongodb://localhost:27017')
    db = client.job_aggregator
    time_threshold = datetime.now() - timedelta(days=days)
    # Positions per source
    pipeline = [
        {"$match": {"crawl_time": {"$gte": time_threshold}}},
        {"$group": {"_id": "$source", "count": {"$sum": 1}}},
        {"$sort": {"count": -1}}
    ]
    source_stats = await db.positions.aggregate(pipeline).to_list(None)
    # Hottest job titles
    title_pipeline = [
        {"$match": {"crawl_time": {"$gte": time_threshold}}},
        {"$group": {"_id": "$title", "count": {"$sum": 1}}},
        {"$sort": {"count": -1}},
        {"$limit": 10}
    ]
    hot_positions = await db.positions.aggregate(title_pipeline).to_list(None)
    # City distribution
    city_pipeline = [
        {"$match": {"crawl_time": {"$gte": time_threshold}}},
        {"$group": {"_id": "$location", "count": {"$sum": 1}}},
        {"$sort": {"count": -1}},
        {"$limit": 10}
    ]
    city_distribution = await db.positions.aggregate(city_pipeline).to_list(None)
    return {
        "source_distribution": source_stats,
        "hot_positions": hot_positions,
        "city_distribution": city_distribution,
        "period_days": days
    }
```
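The filter logic in `/api/v1/positions` can be factored into a pure function, which makes it unit-testable without a database. The function name `build_position_query` is my own; the regex fields and time filter mirror the endpoint above:

```python
from datetime import datetime, timedelta
from typing import Optional

def build_position_query(keyword: Optional[str] = None,
                         city: Optional[str] = None,
                         source: Optional[str] = None,
                         days: int = 7,
                         now: Optional[datetime] = None) -> dict:
    """Build the MongoDB filter used by the /api/v1/positions endpoint.

    `now` is injectable so tests can pin the time window.
    """
    query: dict = {}
    if keyword:
        # Case-insensitive match across title, company, and description
        query["$or"] = [
            {"title": {"$regex": keyword, "$options": "i"}},
            {"company": {"$regex": keyword, "$options": "i"}},
            {"description": {"$regex": keyword, "$options": "i"}},
        ]
    if city:
        query["location"] = {"$regex": city, "$options": "i"}
    if source:
        query["source"] = source
    now = now or datetime.now()
    query["crawl_time"] = {"$gte": now - timedelta(days=days)}
    return query

q = build_position_query(keyword="python", source="BOSS直聘", days=3,
                         now=datetime(2026, 4, 17))
```

The endpoint would then call `db.positions.find(build_position_query(...))`, keeping the Mongo-specific syntax in one place.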

5. Docker Deployment Configuration

```dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# System dependencies
RUN apt-get update && apt-get install -y \
    wget \
    gnupg \
    unzip \
    && rm -rf /var/lib/apt/lists/*

# Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Playwright browser plus its system libraries. (The original also
# installed Google Chrome via apt, but Playwright drives its own bundled
# Chromium; --with-deps makes the Chrome install unnecessary.)
RUN playwright install --with-deps chromium

# Application code
COPY . .

# Non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Start command (--reload is a development-only flag and is omitted here)
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
```

```yaml
# docker-compose.yml
version: '3.8'

services:
  mongodb:
    image: mongo:latest
    container_name: job_mongodb
    restart: always
    ports:
      - "27017:27017"
    volumes:
      - mongodb_data:/data/db
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: password

  redis:
    image: redis:alpine
    container_name: job_redis
    restart: always
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  celery_worker:
    build: .
    container_name: celery_worker
    restart: always
    command: celery -A main.celery_app worker --loglevel=info
    depends_on:
      - redis
      - mongodb
    environment:
      - REDIS_HOST=redis
      - MONGO_HOST=mongodb

  celery_beat:
    build: .
    container_name: celery_beat
    restart: always
    command: celery -A main.celery_app beat --loglevel=info
    depends_on:
      - redis
      - mongodb
    environment:
      - REDIS_HOST=redis
      - MONGO_HOST=mongodb

  web:
    build: .
    container_name: job_crawler_web
    restart: always
    ports:
      - "8000:8000"
    depends_on:
      - mongodb
      - redis
      - celery_worker
    environment:
      - REDIS_HOST=redis
      - MONGO_HOST=mongodb
      - MONGO_USERNAME=admin
      - MONGO_PASSWORD=password

volumes:
  mongodb_data:
  redis_data:
```

6. Advanced Feature Extensions

```python
import re
from collections import Counter
from typing import Dict, List

import pandas as pd


class AdvancedJobAnalyzer:
    """Advanced job-posting analyzer."""

    def __init__(self):
        import nltk
        from sklearn.feature_extraction.text import TfidfVectorizer
        # NLP tooling
        nltk.download('stopwords')
        self.stopwords = set(nltk.corpus.stopwords.words('chinese'))
        self.vectorizer = TfidfVectorizer(max_features=100)

    async def analyze_salary_trend(self, positions: List[JobPosition]):
        """Analyze salary trends."""
        df = pd.DataFrame([pos.dict() for pos in positions])
        # Numeric salary column
        df['salary_numeric'] = df['salary'].apply(self._extract_salary)
        # Group by job title
        salary_by_title = df.groupby('title')['salary_numeric'].agg(['mean', 'count'])
        return salary_by_title.to_dict()

    def _extract_salary(self, salary_str: str) -> float:
        """Extract a numeric value from a salary string such as '15-30K'."""
        if not salary_str:
            return 0
        # Match range patterns like "15-30K"
        pattern = r'(\d+\.?\d*)K?-\d+\.?\d*K'
        match = re.search(pattern, salary_str)
        if match:
            numbers = re.findall(r'\d+\.?\d*', match.group())
            if numbers:
                return float(numbers[0])
        return 0

    async def extract_skills(self, positions: List[JobPosition]) -> Dict[str, List[str]]:
        """Extract skill keywords from job descriptions."""
        skills_dict = {}
        # Predefined skill vocabulary
        tech_skills = {
            'Python', 'Java', 'JavaScript', 'C++', 'Go', 'Rust',
            'Django', 'Flask', 'FastAPI', 'Spring', 'React', 'Vue',
            'MySQL', 'PostgreSQL', 'MongoDB', 'Redis', 'Elasticsearch',
            'Docker', 'Kubernetes', 'AWS', 'Azure', 'GCP',
            'TensorFlow', 'PyTorch', '机器学习', '深度学习'
        }
        for position in positions:
            text = f"{position.title} {position.description}"
            found_skills = [skill for skill in tech_skills if skill in text]
            if found_skills:
                skills_dict[position.title] = found_skills
        return skills_dict

    async def generate_industry_report(self, positions: List[JobPosition]) -> Dict:
        """Generate an industry breakdown report."""
        companies = [pos.company for pos in positions]
        # Naive keyword-based industry classification
        industry_keywords = {
            '互联网': ['科技', '网络', '互联网', '信息', '软件'],
            '金融': ['银行', '证券', '保险', '金融', '投资'],
            '教育': ['教育', '培训', '学校', '学院'],
            '医疗': ['医疗', '医院', '健康', '医药'],
            '制造': ['制造', '工厂', '工业', '生产']
        }
        industry_counts = Counter()
        for company in companies:
            for industry, keywords in industry_keywords.items():
                if any(keyword in company for keyword in keywords):
                    industry_counts[industry] += 1
                    break
            else:
                industry_counts['其他'] += 1
        return dict(industry_counts)
```
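The `_extract_salary` regex above only keeps the lower bound of ranges like `15-30K`. A slightly more robust standalone sketch that returns the midpoint in thousands of yuan; the midpoint choice and the function name are my own, not from the original:

```python
import re

def parse_salary_range(salary_str: str) -> float:
    """Return the midpoint of a '15-30K'-style salary range, in thousands.

    Tolerates suffixes such as '·14薪'; returns 0.0 when no range is found.
    """
    if not salary_str:
        return 0.0
    # Capture both bounds of the range; the trailing K may be upper or lower case
    match = re.search(r"(\d+(?:\.\d+)?)\s*-\s*(\d+(?:\.\d+)?)K",
                      salary_str, re.IGNORECASE)
    if not match:
        return 0.0
    low, high = float(match.group(1)), float(match.group(2))
    return (low + high) / 2

mid = parse_salary_range("15-30K·14薪")  # midpoint of 15 and 30 → 22.5
```

Using the midpoint rather than the lower bound avoids systematically understating average salaries in the trend analysis.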

Anti-Scraping Strategies and Countermeasures

1. Dynamic User-Agent Rotation

```python
import random


class UserAgentManager:
    """User-Agent rotation manager."""

    def __init__(self):
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15"
        ]

    def get_random_agent(self):
        return random.choice(self.user_agents)
```

2. IP Proxy Pool

```python
import random

import aiohttp


class ProxyManager:
    """Proxy IP pool manager."""

    def __init__(self):
        self.proxy_list = []

    async def refresh_proxies(self):
        """Refresh the proxy pool from a provider API."""
        async with aiohttp.ClientSession() as session:
            async with session.get('https://api.proxy-provider.com/proxies') as resp:
                data = await resp.json()
                self.proxy_list = data.get('proxies', [])

    def get_proxy(self):
        return random.choice(self.proxy_list) if self.proxy_list else None
```

3. Request Rate Control

```python
import asyncio
from datetime import datetime, timedelta


class RateLimiter:
    """Sliding-window request rate limiter."""

    def __init__(self, max_requests: int = 10, period: int = 60):
        self.max_requests = max_requests
        self.period = period
        self.requests = []

    async def wait_if_needed(self):
        now = datetime.now()
        cutoff = now - timedelta(seconds=self.period)
        # Drop records that have left the window
        self.requests = [req for req in self.requests if req > cutoff]
        if len(self.requests) >= self.max_requests:
            # Sleep until the oldest recorded request expires
            wait_time = (self.requests[0] - cutoff).total_seconds()
            await asyncio.sleep(wait_time)
        self.requests.append(now)
```
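The same sliding-window bookkeeping as `RateLimiter` can be tested deterministically by injecting timestamps instead of calling `datetime.now()`. This synchronous variant (my own, for illustration) mirrors the pruning logic but returns a verdict rather than sleeping:

```python
from datetime import datetime, timedelta
from typing import List

class SlidingWindowCounter:
    """Deterministic sliding-window request counter (no sleeping)."""

    def __init__(self, max_requests: int = 10, period: int = 60):
        self.max_requests = max_requests
        self.period = period
        self.requests: List[datetime] = []

    def allow(self, now: datetime) -> bool:
        cutoff = now - timedelta(seconds=self.period)
        # Drop records that have fallen out of the window
        self.requests = [t for t in self.requests if t > cutoff]
        if len(self.requests) >= self.max_requests:
            return False
        self.requests.append(now)
        return True

limiter = SlidingWindowCounter(max_requests=2, period=60)
t0 = datetime(2026, 4, 17, 12, 0, 0)
ok1 = limiter.allow(t0)                          # first request fits
ok2 = limiter.allow(t0 + timedelta(seconds=1))   # second request fits
ok3 = limiter.allow(t0 + timedelta(seconds=2))   # window full, rejected
ok4 = limiter.allow(t0 + timedelta(seconds=61))  # old requests expired
```

Injecting `now` as a parameter is a small design change that makes the windowing logic testable without real sleeps or wall-clock flakiness.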

Performance Optimization Tips

  1. Connection pool management: reuse connections via aiohttp's connection pool

  2. Asynchronous database operations: use an async MongoDB driver (motor)

  3. Data caching: cache hot data in Redis

  4. Incremental crawling: record the last crawl time and fetch only new postings

  5. Distributed crawling: schedule tasks across workers with Celery
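Point 4, incremental crawling, can be as simple as keeping a per-source high-water mark and filtering on it. A minimal sketch with a dict standing in for Redis; the key convention `last_crawl:<source>` and the function name are my own:

```python
from datetime import datetime
from typing import Dict, List

def filter_new_positions(positions: List[dict], source: str,
                         store: Dict[str, datetime]) -> List[dict]:
    """Keep only positions published after the stored high-water mark,
    then advance the mark. `store` stands in for Redis."""
    key = f"last_crawl:{source}"
    last = store.get(key, datetime.min)
    fresh = [p for p in positions if p["published_date"] > last]
    if fresh:
        # Advance the mark to the newest posting we just accepted
        store[key] = max(p["published_date"] for p in fresh)
    return fresh

store: Dict[str, datetime] = {}
batch = [{"title": "A", "published_date": datetime(2026, 4, 16)},
         {"title": "B", "published_date": datetime(2026, 4, 17)}]
first_run = filter_new_positions(batch, "boss", store)   # both are new
second_run = filter_new_positions(batch, "boss", store)  # nothing new
```

In the real system the mark would live in Redis next to the de-duplication keys, so restarts do not trigger a full re-crawl.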

Monitoring and Logging

```python
import time

import prometheus_client
import psutil


class MonitoringSystem:
    """Crawler monitoring system."""

    def __init__(self):
        self.request_counter = prometheus_client.Counter(
            'crawler_requests_total', 'Total number of requests'
        )
        self.error_counter = prometheus_client.Counter(
            'crawler_errors_total', 'Total number of errors'
        )

    def record_request(self, url: str, success: bool):
        self.request_counter.inc()
        if not success:
            self.error_counter.inc()

    def generate_report(self):
        """Produce a snapshot monitoring report."""
        return {
            'timestamp': time.time(),
            'cpu_percent': psutil.cpu_percent(),
            'memory_percent': psutil.virtual_memory().percent,
            'disk_usage': psutil.disk_usage('/').percent,
            # _value is a private attribute of prometheus Counter; acceptable
            # for an internal report, but not a public API
            'requests_total': self.request_counter._value.get(),
            'errors_total': self.error_counter._value.get()
        }
```

Conclusion

This article walked through the design and implementation of a complete job-posting aggregation crawler. Built on Playwright, asyncio, and FastAPI, the system is efficient, extensible, and distributed, covering not only data collection but also storage, an API layer, task scheduling, and monitoring.

In real-world use, keep the following in mind:

  1. Respect robots.txt: honor each site's crawling policy

  2. Data privacy: handle any collected personal information with care

  3. Legal compliance: make sure crawling stays within applicable laws and regulations

  4. Resource usage: throttle request rates to avoid putting pressure on target sites

  5. Data quality: build cleaning and validation mechanisms into the pipeline
