Qwen3-Reranker-4B API Development Guide: Quickly Building a RESTful Service
If you work on search, recommendation, or question-answering systems, you have almost certainly hit this problem: among the results retrieved from a large document collection, the ones ranked first are not necessarily the most relevant. Traditional vector retrieval finds similar items, but judging how good each one actually is still falls to humans or to more sophisticated models.
Qwen3-Reranker-4B is built for exactly this. It is a reranking model: it re-scores retrieved results so that the truly relevant ones rise to the top. This article skips the theory and walks you, step by step, through wrapping the model in a standard RESTful API service, so you can call it like any other HTTP endpoint.
1. Why Do You Need an API Service?
You may already have run the Qwen3-Reranker-4B sample code locally; calling it directly with transformers or vLLM does work. In a real project, though, you quickly run into practical questions:
- Concurrency: what happens when multiple users send requests at the same time?
- Resource management: the model lives in memory; how do you avoid loading it repeatedly?
- A standardized interface: how do frontends, mobile apps, and other services call it conveniently?
- Monitoring and logging: when something breaks, how do you find the cause quickly?
Implementing all of this yourself, every time, is a waste of effort. The better approach is a dedicated API service that encapsulates the model behind a uniform interface. Web applications, mobile apps, and other microservices can then all call it with a plain HTTP request.
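Concretely, the goal is an HTTP interface shaped roughly like the sketch below. The field names match the API we build in section 3; the scores shown are illustrative, not real model output:

```
POST /rerank
{
  "query": "What is machine learning?",
  "documents": ["doc A", "doc B"],
  "instruction": "optional task description",
  "top_k": 2
}

Response:
{
  "scores": [0.91, 0.07],
  "ranked_documents": ["doc A", "doc B"],
  "ranked_scores": [0.91, 0.07]
}
```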
2. Environment Setup and Quick Deployment
2.1 Basic Requirements
First, make sure your machine meets these requirements:
- Python 3.9+: Python 3.10 or 3.11 recommended
- CUDA 11.8+: if you have an NVIDIA GPU
- At least 8 GB of VRAM: the baseline for Qwen3-Reranker-4B itself; depending on concurrency you may need more
- 16 GB+ of RAM: long inputs need memory headroom
A CPU-only machine works too, just more slowly. For production, a GPU is strongly recommended.
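To see where the 8 GB figure comes from, here is a quick back-of-the-envelope calculation. It counts the weights only; activations, KV cache, and CUDA context overhead all come on top:

```python
# Rough VRAM needed just to hold the weights in half precision.
params = 4e9          # ~4 billion parameters
bytes_per_param = 2   # fp16/bf16 uses 2 bytes per parameter

weights_gb = params * bytes_per_param / 1024**3
print(f"fp16 weights alone: ~{weights_gb:.1f} GB")  # prints: fp16 weights alone: ~7.5 GB
```

That leaves only a few hundred megabytes of slack on an 8 GB card, which is why real workloads with long inputs or high concurrency often need more.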
2.2 Installing Dependencies
Create a fresh Python virtual environment, then install the required packages:
```bash
# Create a virtual environment
python -m venv reranker_env
source reranker_env/bin/activate   # Linux/Mac
# or: reranker_env\Scripts\activate   # Windows

# Install core dependencies
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118   # adjust for your CUDA version
pip install "transformers>=4.51.0"
pip install fastapi uvicorn pydantic
pip install python-multipart   # file upload handling
pip install httpx              # async HTTP client
```

If you plan to use vLLM to accelerate inference (recommended for production), also install:
```bash
pip install "vllm>=0.8.5"
```

2.3 Quickly Verifying the Model
Before writing any API code, confirm the model works. Create a simple test script:
```python
# test_model.py
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

def test_basic_inference():
    """Test basic inference."""
    print("Loading model...")

    # Load tokenizer and model
    tokenizer = AutoTokenizer.from_pretrained(
        "Qwen/Qwen3-Reranker-4B",
        padding_side='left'
    )
    model = AutoModelForCausalLM.from_pretrained(
        "Qwen/Qwen3-Reranker-4B",
        torch_dtype=torch.float16,  # half precision to reduce VRAM usage
        device_map="auto"           # place on available devices automatically
    ).eval()
    print("Model loaded!")

    # Prepare test data
    task = 'Given a web search query, retrieve relevant passages that answer the query'
    queries = ["What is the capital of France?"]
    documents = ["Paris is the capital of France.", "Berlin is the capital of Germany."]

    # Format inputs: pair the query with every document
    # (zip(queries, documents) would only produce one pair here)
    pairs = []
    for query in queries:
        for doc in documents:
            pairs.append(f"<Instruct>: {task}\n<Query>: {query}\n<Document>: {doc}")

    # Prepare model inputs
    max_length = 8192
    prefix = "<|im_start|>system\nJudge whether the Document meets the requirements based on the Query and the Instruct provided. Note that the answer can only be \"yes\" or \"no\".<|im_end|>\n<|im_start|>user\n"
    suffix = "<|im_end|>\n<|im_start|>assistant\n<think>\n\n</think>\n\n"
    prefix_tokens = tokenizer.encode(prefix, add_special_tokens=False)
    suffix_tokens = tokenizer.encode(suffix, add_special_tokens=False)

    inputs = tokenizer(
        pairs,
        padding=False,
        truncation='longest_first',
        return_attention_mask=False,
        max_length=max_length - len(prefix_tokens) - len(suffix_tokens)
    )
    for i, ele in enumerate(inputs['input_ids']):
        inputs['input_ids'][i] = prefix_tokens + ele + suffix_tokens
    inputs = tokenizer.pad(inputs, padding=True, return_tensors="pt", max_length=max_length)
    inputs = {k: v.to(model.device) for k, v in inputs.items()}

    # Inference
    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits[:, -1, :]

    # Token ids for "yes" and "no"
    token_false_id = tokenizer.convert_tokens_to_ids("no")
    token_true_id = tokenizer.convert_tokens_to_ids("yes")
    true_scores = logits[:, token_true_id]
    false_scores = logits[:, token_false_id]

    # Convert the two logits into a probability of "yes"
    batch_scores = torch.stack([false_scores, true_scores], dim=1)
    batch_scores = torch.nn.functional.log_softmax(batch_scores, dim=1)
    scores = batch_scores[:, 1].exp().tolist()

    print(f"Query: {queries[0]}")
    for doc, score in zip(documents, scores):
        print(f"  Document: {doc}")
        print(f"  Relevance score: {score:.4f}")
        print(f"  Relevant: {'yes' if score > 0.5 else 'no'}")
        print()

if __name__ == "__main__":
    test_basic_inference()
```

Run the script. If everything is working, you should see output similar to:
```
Loading model...
Model loaded!
Query: What is the capital of France?
  Document: Paris is the capital of France.
  Relevance score: 0.9821
  Relevant: yes
  Document: Berlin is the capital of Germany.
  Relevance score: 0.0234
  Relevant: no
```

3. Building the FastAPI Service
Now that the model runs, let's wrap it in an API. FastAPI is a modern, fast web framework that is particularly well suited to building APIs.
3.1 Basic API Structure
Start with the simplest possible service:
```python
# app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import logging
from contextlib import asynccontextmanager

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Request and response models
class RerankRequest(BaseModel):
    """Rerank request."""
    query: str
    documents: List[str]
    instruction: Optional[str] = None
    top_k: Optional[int] = None

class RerankResponse(BaseModel):
    """Rerank response."""
    scores: List[float]
    ranked_documents: List[str]
    ranked_scores: List[float]

class BatchRerankRequest(BaseModel):
    """Batch rerank request."""
    pairs: List[dict]  # each element contains a query and its documents

class HealthResponse(BaseModel):
    """Health check response."""
    status: str
    model_loaded: bool
    device: str

# Module-level model instances
_model = None
_tokenizer = None
_device = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifecycle management: load the model on startup, clean up on shutdown."""
    global _model, _tokenizer, _device
    logger.info("Loading Qwen3-Reranker-4B...")
    try:
        _tokenizer = AutoTokenizer.from_pretrained(
            "Qwen/Qwen3-Reranker-4B",
            padding_side='left'
        )
        # Pick a device depending on GPU availability
        if torch.cuda.is_available():
            _device = "cuda"
            logger.info(f"GPU detected: {torch.cuda.get_device_name(0)}")
            _model = AutoModelForCausalLM.from_pretrained(
                "Qwen/Qwen3-Reranker-4B",
                torch_dtype=torch.float16,
                device_map="auto"
            ).eval()
        else:
            _device = "cpu"
            logger.warning("No GPU detected; falling back to CPU, inference will be slow")
            _model = AutoModelForCausalLM.from_pretrained(
                "Qwen/Qwen3-Reranker-4B"
            ).eval().to("cpu")
        logger.info(f"Model loaded, running on: {_device}")
        yield
    except Exception as e:
        logger.error(f"Model loading failed: {str(e)}")
        raise
    finally:
        # Release resources
        if _model is not None:
            del _model
            _model = None
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        logger.info("Model resources released")

# Create the FastAPI app
app = FastAPI(
    title="Qwen3-Reranker-4B API",
    description="Rerank API service backed by Qwen3-Reranker-4B",
    version="1.0.0",
    lifespan=lifespan
)

@app.get("/")
async def root():
    """Root path: service info."""
    return {
        "service": "Qwen3-Reranker-4B API",
        "version": "1.0.0",
        "status": "running"
    }

@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Health check endpoint."""
    return HealthResponse(
        status="healthy",
        model_loaded=_model is not None,
        device=_device or "unknown"
    )

def format_instruction(instruction: Optional[str], query: str, doc: str) -> str:
    """Combine instruction, query, and document into the model's input format."""
    if instruction is None:
        instruction = 'Given a web search query, retrieve relevant passages that answer the query'
    return f"<Instruct>: {instruction}\n<Query>: {query}\n<Document>: {doc}"

def compute_scores(pairs: List[str]) -> List[float]:
    """Compute relevance scores for formatted query/document pairs."""
    if _model is None or _tokenizer is None:
        raise HTTPException(status_code=503, detail="Model not loaded")

    # Prepare model inputs
    max_length = 8192
    prefix = "<|im_start|>system\nJudge whether the Document meets the requirements based on the Query and the Instruct provided. Note that the answer can only be \"yes\" or \"no\".<|im_end|>\n<|im_start|>user\n"
    suffix = "<|im_end|>\n<|im_start|>assistant\n<think>\n\n</think>\n\n"
    prefix_tokens = _tokenizer.encode(prefix, add_special_tokens=False)
    suffix_tokens = _tokenizer.encode(suffix, add_special_tokens=False)

    # Tokenize
    inputs = _tokenizer(
        pairs,
        padding=False,
        truncation='longest_first',
        return_attention_mask=False,
        max_length=max_length - len(prefix_tokens) - len(suffix_tokens)
    )
    # Add prefix and suffix tokens
    for i, ele in enumerate(inputs['input_ids']):
        inputs['input_ids'][i] = prefix_tokens + ele + suffix_tokens
    # Pad and convert to tensors
    inputs = _tokenizer.pad(inputs, padding=True, return_tensors="pt", max_length=max_length)
    inputs = {k: v.to(_model.device) for k, v in inputs.items()}

    # Inference
    with torch.no_grad():
        outputs = _model(**inputs)
        logits = outputs.logits[:, -1, :]

    # Token ids for "yes" and "no"
    token_false_id = _tokenizer.convert_tokens_to_ids("no")
    token_true_id = _tokenizer.convert_tokens_to_ids("yes")
    true_scores = logits[:, token_true_id]
    false_scores = logits[:, token_false_id]

    # Probability of "yes"
    batch_scores = torch.stack([false_scores, true_scores], dim=1)
    batch_scores = torch.nn.functional.log_softmax(batch_scores, dim=1)
    return batch_scores[:, 1].exp().tolist()

@app.post("/rerank", response_model=RerankResponse)
async def rerank_documents(request: RerankRequest):
    """Rerank multiple documents for a single query."""
    try:
        logger.info(f"Rerank request, query: {request.query[:50]}..., documents: {len(request.documents)}")

        # Format the input pairs
        pairs = [
            format_instruction(request.instruction, request.query, doc)
            for doc in request.documents
        ]
        scores = compute_scores(pairs)

        # Sort by score, descending
        scored_docs = list(zip(request.documents, scores))
        scored_docs.sort(key=lambda x: x[1], reverse=True)

        # If top_k is given, return only the first k
        if request.top_k and request.top_k > 0:
            scored_docs = scored_docs[:request.top_k]

        return RerankResponse(
            scores=scores,
            ranked_documents=[doc for doc, _ in scored_docs],
            ranked_scores=[score for _, score in scored_docs]
        )
    except HTTPException:
        # Don't turn a deliberate 503 into a generic 500
        raise
    except Exception as e:
        logger.error(f"Rerank failed: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Internal error: {str(e)}")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
```

3.2 Running and Testing the API
Save the code above as app.py, then run it:

```bash
python app.py
```

Once the service is up, open http://localhost:8000/docs in your browser. You will see the auto-generated API documentation page, where you can exercise the endpoints:
- Click the "Try it out" button on the /rerank endpoint
- Enter test data:

```json
{
  "query": "What is machine learning?",
  "documents": [
    "Machine learning is a subset of artificial intelligence.",
    "Python is a popular programming language.",
    "Deep learning is a type of machine learning.",
    "The weather today is sunny."
  ],
  "instruction": "Given a web search query, retrieve relevant passages that answer the query",
  "top_k": 3
}
```

- Click "Execute" to send the request
You should see the response with documents sorted by relevance, highest first.
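The scores in the response are, under the hood, a softmax over just two numbers: the model's logits for the tokens "yes" and "no" at the final position. As a sanity check, here is that conversion in plain Python, using made-up logit values rather than real model output:

```python
import math

def yes_probability(yes_logit: float, no_logit: float) -> float:
    """Softmax over the two logits, returning P("yes").
    Mirrors the stack + log_softmax + exp step in compute_scores."""
    m = max(yes_logit, no_logit)  # subtract the max for numerical stability
    e_yes = math.exp(yes_logit - m)
    e_no = math.exp(no_logit - m)
    return e_yes / (e_yes + e_no)

# Hypothetical logits, for illustration only:
print(yes_probability(4.0, 0.0))   # close to 1: model leans "yes"
print(yes_probability(-2.0, 2.0))  # close to 0: model leans "no"
print(yes_probability(1.0, 1.0))   # exactly 0.5: undecided
```

This is why the scores behave like probabilities in (0, 1) and why 0.5 is a natural relevance threshold.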
4. Advanced Features
The basic API works, but real projects need more. Let's add features step by step.
4.1 Batch Endpoint
Scoring many documents for one query is the common case, but sometimes you need to process several queries at once. Add a batch endpoint:
```python
# add to app.py
@app.post("/batch_rerank")
async def batch_rerank_documents(request: BatchRerankRequest):
    """Batch reranking: one request, multiple queries."""
    try:
        logger.info(f"Batch rerank request, tasks: {len(request.pairs)}")
        all_results = []
        for pair in request.pairs:
            query = pair.get("query", "")
            documents = pair.get("documents", [])
            instruction = pair.get("instruction")
            if not query or not documents:
                all_results.append({
                    "error": "Missing query or documents",
                    "query": query,
                    "documents": documents
                })
                continue

            # Format the input pairs
            pairs_formatted = [
                format_instruction(instruction, query, doc)
                for doc in documents
            ]
            scores = compute_scores(pairs_formatted)

            # Sort by score, descending
            scored_docs = list(zip(documents, scores))
            scored_docs.sort(key=lambda x: x[1], reverse=True)
            all_results.append({
                "query": query,
                "scores": scores,
                "ranked_documents": [doc for doc, _ in scored_docs],
                "ranked_scores": [score for _, score in scored_docs]
            })
        return {"results": all_results}
    except Exception as e:
        logger.error(f"Batch rerank failed: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Internal error: {str(e)}")
```

4.2 Accelerating Inference with vLLM
For production workloads, vLLM can speed up inference significantly. Replace the model-loading logic with a vLLM-backed implementation:
```python
# vllm_integration.py
import logging
import math
from typing import List, Tuple

import torch
from transformers import AutoTokenizer
from vllm import LLM, SamplingParams
from vllm.inputs.data import TokensPrompt

logger = logging.getLogger(__name__)

class VLLMReranker:
    """Reranker accelerated with vLLM."""

    def __init__(self, model_path: str = "Qwen/Qwen3-Reranker-4B"):
        self.model_path = model_path
        self.model = None
        self.tokenizer = None
        self.sampling_params = None

    def initialize(self):
        """Initialize the vLLM engine."""
        if torch.cuda.is_available():
            num_gpus = torch.cuda.device_count()
            logger.info(f"Using vLLM, detected {num_gpus} GPU(s)")
            self.model = LLM(
                model=self.model_path,
                tensor_parallel_size=num_gpus,
                max_model_len=10000,
                enable_prefix_caching=True,
                gpu_memory_utilization=0.8,
                trust_remote_code=True
            )
        else:
            logger.warning("No GPU detected; vLLM will run in CPU mode")
            self.model = LLM(
                model=self.model_path,
                max_model_len=10000,
                trust_remote_code=True
            )

        self.tokenizer = AutoTokenizer.from_pretrained(self.model_path)
        self.tokenizer.padding_side = "left"
        self.tokenizer.pad_token = self.tokenizer.eos_token

        # Sampling parameters: emit one token, constrained to "yes"/"no"
        true_token = self.tokenizer("yes", add_special_tokens=False).input_ids[0]
        false_token = self.tokenizer("no", add_special_tokens=False).input_ids[0]
        self.sampling_params = SamplingParams(
            temperature=0,
            max_tokens=1,
            logprobs=20,
            allowed_token_ids=[true_token, false_token]
        )
        logger.info("vLLM model initialized")

    def format_instruction_vllm(self, instruction: str, query: str, doc: str) -> List[dict]:
        """Build a chat message list for vLLM."""
        if instruction is None:
            instruction = 'Given a web search query, retrieve relevant passages that answer the query'
        return [
            {
                "role": "system",
                "content": "Judge whether the Document meets the requirements based on the Query and the Instruct provided. Note that the answer can only be \"yes\" or \"no\"."
            },
            {
                "role": "user",
                "content": f"<Instruct>: {instruction}\n\n<Query>: {query}\n\n<Document>: {doc}"
            }
        ]

    def compute_scores_vllm(self, pairs: List[Tuple[str, str, str]]) -> List[float]:
        """Compute scores with vLLM."""
        if self.model is None or self.tokenizer is None:
            raise ValueError("Model not initialized")

        # Prepare messages
        messages = [self.format_instruction_vllm(*p) for p in pairs]

        # Apply the chat template; with tokenize=True this returns token id lists
        prompts = self.tokenizer.apply_chat_template(
            messages,
            tokenize=True,
            add_generation_prompt=False,
            enable_thinking=False
        )

        # Append the assistant suffix (with an empty think block); wrap the token
        # id lists in TokensPrompt so vLLM accepts pre-tokenized input
        suffix = "<|im_end|>\n<|im_start|>assistant\n<think>\n\n</think>\n\n"
        suffix_tokens = self.tokenizer.encode(suffix, add_special_tokens=False)
        max_length = 8192
        prompts = [
            TokensPrompt(prompt_token_ids=prompt[:max_length - len(suffix_tokens)] + suffix_tokens)
            for prompt in prompts
        ]

        # Inference
        outputs = self.model.generate(prompts, self.sampling_params, use_tqdm=False)

        # Turn the "yes"/"no" logprobs into a probability
        true_token = self.tokenizer("yes", add_special_tokens=False).input_ids[0]
        false_token = self.tokenizer("no", add_special_tokens=False).input_ids[0]
        scores = []
        for output in outputs:
            final_logits = output.outputs[0].logprobs[-1]
            # .get() may return None when a token wasn't sampled;
            # fall back to a very low logprob in that case
            true_entry = final_logits.get(true_token)
            false_entry = final_logits.get(false_token)
            true_logit = true_entry.logprob if true_entry is not None else -10
            false_logit = false_entry.logprob if false_entry is not None else -10
            true_score = math.exp(true_logit)
            false_score = math.exp(false_logit)
            total = true_score + false_score
            scores.append(true_score / total if total > 0 else 0)
        return scores

    def cleanup(self):
        """Release resources."""
        if self.model is not None:
            del self.model
            self.model = None
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
```

4.3 Adding Caching and Rate Limiting
To improve performance and guard against abuse, add a response cache and a rate limiter:
```python
# middleware.py -- assumes it is merged into app.py
# (uses the existing app, logger, and RerankRequest objects)
from fastapi import Request, HTTPException
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from functools import wraps
import time
import hashlib
import json

# Rate limiter keyed by client IP; slowapi requires registering
# the limiter and its exception handler on the app
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# Simple in-memory cache
class SimpleCache:
    def __init__(self, max_size=1000, ttl=300):
        self.cache = {}
        self.max_size = max_size
        self.ttl = ttl  # time-to-live in seconds

    def get_key(self, data: dict) -> str:
        """Build a cache key from request data."""
        data_str = json.dumps(data, sort_keys=True)
        return hashlib.md5(data_str.encode()).hexdigest()

    def get(self, key: str):
        """Look up a cached value, evicting it if expired."""
        if key in self.cache:
            value, timestamp = self.cache[key]
            if time.time() - timestamp < self.ttl:
                return value
            del self.cache[key]  # expired
        return None

    def set(self, key: str, value):
        """Store a value, evicting the oldest entry when full."""
        if len(self.cache) >= self.max_size:
            oldest_key = next(iter(self.cache))
            del self.cache[oldest_key]
        self.cache[key] = (value, time.time())

    def clear(self):
        self.cache.clear()

cache = SimpleCache(max_size=500, ttl=600)  # up to 500 entries, 10-minute TTL

def cached_rerank(func):
    """Cache rerank results keyed on the request payload."""
    @wraps(func)
    async def wrapper(request: Request, payload: RerankRequest):
        cache_key = cache.get_key({
            "query": payload.query,
            "documents": payload.documents,
            "instruction": payload.instruction
        })
        cached_result = cache.get(cache_key)
        if cached_result is not None:
            logger.info(f"Cache hit: {cache_key[:10]}...")
            return cached_result
        result = await func(request, payload)
        cache.set(cache_key, result)
        logger.info(f"Cache set: {cache_key[:10]}...")
        return result
    return wrapper

# Apply rate limiting and caching to the endpoint.
# Note: slowapi requires a parameter annotated fastapi.Request and named
# `request`, so the JSON body moves to a separate `payload` parameter.
@app.post("/rerank")
@limiter.limit("10/minute")  # 10 requests per minute per client
@cached_rerank
async def rerank_documents(request: Request, payload: RerankRequest):
    """Rerank endpoint with caching and rate limiting."""
    # same logic as before, reading fields from `payload` ...
    ...
```

5. Production Deployment
5.1 Containerizing with Docker
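The Dockerfile below copies a requirements.txt that the article never lists explicitly. A version consistent with the packages installed in section 2.2, plus the optional extras used in later sections, might look like this (pin exact versions for reproducible builds):

```
torch
transformers>=4.51.0
fastapi
uvicorn
pydantic
python-multipart
httpx
slowapi            # rate limiting (section 4.3)
prometheus-client  # monitoring (section 5.3)
```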
Create a Dockerfile:
```dockerfile
# Dockerfile
FROM python:3.10-slim

WORKDIR /app

# System dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Copy the dependency list first to leverage the Docker build cache
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Run as a non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

EXPOSE 8000

CMD ["python", "app.py"]
```

Then create docker-compose.yml:
```yaml
# docker-compose.yml
version: '3.8'

services:
  reranker-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - PYTHONUNBUFFERED=1
      - MODEL_PATH=Qwen/Qwen3-Reranker-4B
      - MAX_WORKERS=4
      - LOG_LEVEL=INFO
    volumes:
      - ./models:/app/models  # mount local model files if needed
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
    restart: unless-stopped
```

5.2 Nginx Reverse Proxy
Create the Nginx configuration:
```nginx
# nginx.conf
upstream reranker_backend {
    server localhost:8000;
    keepalive 32;
}

server {
    listen 80;
    server_name api.yourdomain.com;
    client_max_body_size 10M;

    location / {
        proxy_pass http://reranker_backend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection 'upgrade';
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_cache_bypass $http_upgrade;

        # Timeouts
        proxy_connect_timeout 60s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;
    }

    # Health checks
    location /health {
        proxy_pass http://reranker_backend/health;
        access_log off;
    }
}
```

5.3 Monitoring and Logging
Add Prometheus metrics:
```python
# monitoring.py -- assumes it is merged into app.py (uses the existing app)
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
from fastapi import Request, Response
import time

# Metric definitions
REQUEST_COUNT = Counter(
    'reranker_requests_total',
    'Total number of requests',
    ['method', 'endpoint', 'status']
)
REQUEST_LATENCY = Histogram(
    'reranker_request_duration_seconds',
    'Request latency in seconds',
    ['method', 'endpoint']
)
REQUEST_INPUT_TOKENS = Histogram(
    'reranker_input_tokens',
    'Number of input tokens per request',
    buckets=[10, 50, 100, 500, 1000, 5000]
)

# Monitoring middleware
@app.middleware("http")
async def monitor_requests(request: Request, call_next):
    start_time = time.time()
    try:
        response = await call_next(request)

        # Count the request
        REQUEST_COUNT.labels(
            method=request.method,
            endpoint=request.url.path,
            status=response.status_code
        ).inc()

        # Record the latency
        REQUEST_LATENCY.labels(
            method=request.method,
            endpoint=request.url.path
        ).observe(time.time() - start_time)

        return response
    except Exception:
        REQUEST_COUNT.labels(
            method=request.method,
            endpoint=request.url.path,
            status=500
        ).inc()
        raise

@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint."""
    return Response(
        content=generate_latest(),
        media_type=CONTENT_TYPE_LATEST
    )
```

6. Client Examples
6.1 Python Client
```python
# client.py
import requests
from typing import List, Optional

class RerankerClient:
    """Client for the rerank API."""

    def __init__(self, base_url: str = "http://localhost:8000", api_key: Optional[str] = None):
        self.base_url = base_url.rstrip("/")
        self.api_key = api_key
        self.session = requests.Session()
        if api_key:
            self.session.headers.update({"Authorization": f"Bearer {api_key}"})

    def rerank(self, query: str, documents: List[str],
               instruction: Optional[str] = None, top_k: Optional[int] = None):
        """Rerank documents for a single query."""
        payload = {"query": query, "documents": documents}
        if instruction:
            payload["instruction"] = instruction
        if top_k:
            payload["top_k"] = top_k
        response = self.session.post(
            f"{self.base_url}/rerank",
            json=payload,
            timeout=30
        )
        if response.status_code == 200:
            return response.json()
        raise Exception(f"API request failed: {response.status_code} - {response.text}")

    def batch_rerank(self, pairs: List[dict]):
        """Batch reranking."""
        response = self.session.post(
            f"{self.base_url}/batch_rerank",
            json={"pairs": pairs},
            timeout=60
        )
        if response.status_code == 200:
            return response.json()
        raise Exception(f"API request failed: {response.status_code} - {response.text}")

    def health_check(self):
        """Health check."""
        response = self.session.get(f"{self.base_url}/health", timeout=5)
        return response.json()

# Usage example
if __name__ == "__main__":
    client = RerankerClient("http://localhost:8000")

    # Check the service status
    health = client.health_check()
    print(f"Service status: {health}")

    # Single rerank call
    result = client.rerank(
        query="What is artificial intelligence?",
        documents=[
            "AI is the simulation of human intelligence by machines.",
            "Machine learning is a key part of AI.",
            "The sky is blue.",
            "Python is great for AI development."
        ],
        instruction="Given a web search query, retrieve relevant passages that answer the query",
        top_k=2
    )
    print("Rerank results:")
    for doc, score in zip(result["ranked_documents"], result["ranked_scores"]):
        print(f"  Score: {score:.4f} - {doc[:50]}...")
```

6.2 JavaScript/TypeScript Client
```typescript
// reranker-client.ts
export interface RerankRequest {
  query: string;
  documents: string[];
  instruction?: string;
  top_k?: number;
}

export interface RerankResponse {
  scores: number[];
  ranked_documents: string[];
  ranked_scores: number[];
}

export class RerankerClient {
  private baseUrl: string;
  private apiKey?: string;

  constructor(baseUrl: string = 'http://localhost:8000', apiKey?: string) {
    this.baseUrl = baseUrl.replace(/\/$/, '');
    this.apiKey = apiKey;
  }

  async rerank(request: RerankRequest): Promise<RerankResponse> {
    const headers: Record<string, string> = {
      'Content-Type': 'application/json',
    };
    if (this.apiKey) {
      headers['Authorization'] = `Bearer ${this.apiKey}`;
    }
    const response = await fetch(`${this.baseUrl}/rerank`, {
      method: 'POST',
      headers,
      body: JSON.stringify(request),
    });
    if (!response.ok) {
      throw new Error(`API request failed: ${response.status} - ${await response.text()}`);
    }
    return response.json();
  }

  async healthCheck(): Promise<any> {
    const response = await fetch(`${this.baseUrl}/health`);
    return response.json();
  }
}

// Usage example
async function example() {
  const client = new RerankerClient('http://localhost:8000');
  try {
    // Health check
    const health = await client.healthCheck();
    console.log('Service health:', health);

    // Rerank
    const result = await client.rerank({
      query: 'What is deep learning?',
      documents: [
        'Deep learning uses neural networks with many layers.',
        'Machine learning is a broader field.',
        'The weather is nice today.',
        'Neural networks are inspired by the human brain.'
      ],
      instruction: 'Given a web search query, retrieve relevant passages that answer the query',
      top_k: 3
    });

    console.log('Reranked results:');
    result.ranked_documents.forEach((doc, index) => {
      console.log(`  ${index + 1}. Score: ${result.ranked_scores[index].toFixed(4)} - ${doc.substring(0, 50)}...`);
    });
  } catch (error) {
    console.error('Error:', error);
  }
}
```

7. Summary
In this article we built a complete Qwen3-Reranker-4B API service from scratch, covering environment setup and model verification, a full FastAPI implementation, and production deployment and tuning.
In practice this setup has several clear strengths. Deployment is straightforward: following the steps above is enough to get it running. Performance is solid, especially with vLLM acceleration and caching, and meets the needs of most workloads. It is also easy to extend, whether you are adding new endpoints or integrating into an existing system.
Real projects will of course raise further questions, such as tailoring the instruction to your business domain, handling very long documents, or running A/B tests. All of these can be layered on top of the framework built here.
If you are new to this area, start by standing up the basic service and getting the full pipeline working end to end. Once you are comfortable, optimize for your actual needs: if latency is critical, look into further inference optimization; if you expect heavy concurrency, investigate load balancing and autoscaling.
More AI Images
Want to explore more AI images and use cases? Visit the CSDN星图镜像广场 (Star-Map image gallery), which offers a rich catalog of prebuilt container images covering large-model inference, image generation, video generation, model fine-tuning, and more, with one-click deployment.