news 2026/4/23 6:33:22

DeOldify批量任务队列:Celery异步处理+Redis消息队列集成教程

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
DeOldify批量任务队列:Celery异步处理+Redis消息队列集成教程

DeOldify批量任务队列:Celery异步处理+Redis消息队列集成教程

1. 项目概述与需求分析

1.1 为什么需要批量处理

在实际的图像上色应用场景中,我们经常需要处理大量图片:

  • 老照片数字化修复项目,一次处理数百张家庭老照片
  • 影视制作公司需要批量处理历史影像资料
  • 档案馆、博物馆的文献数字化工作
  • 个人用户想要一次性处理整个相册

如果使用同步处理方式,用户需要等待每张图片处理完成才能上传下一张,体验极差。通过Celery和Redis实现异步任务队列,可以让用户一次性提交所有任务,系统在后台自动处理。

1.2 技术方案优势

Celery + Redis 组合的优势:

  • 异步处理:请求立即返回,任务后台执行
  • 任务队列:支持大量任务排队处理
  • 并发控制:可以控制同时处理的任务数量
  • 状态追踪:实时查看任务处理进度
  • 失败重试:自动重试失败的任务
  • 分布式扩展:可以轻松扩展多个工作节点

2. 环境准备与依赖安装

2.1 安装必要的Python包

首先确保已经安装了基础的DeOldify服务,然后安装异步处理所需的依赖:

pip install celery redis flower

2.2 安装并配置Redis

使用Docker安装Redis(推荐):

docker run -d --name redis-server -p 6379:6379 redis:alpine

或者使用系统包管理器安装:

# Ubuntu/Debian sudo apt-get install redis-server # CentOS/RHEL sudo yum install redis # 启动Redis服务 sudo systemctl start redis sudo systemctl enable redis

2.3 验证Redis连接

import redis # 测试Redis连接 r = redis.Redis(host='localhost', port=6379, db=0) try: r.ping() print("Redis连接成功!") except redis.ConnectionError: print("无法连接到Redis服务器")

3. Celery异步任务系统配置

3.1 创建Celery应用

创建celery_app.py文件:

from celery import Celery import os import sys import base64 from PIL import Image from io import BytesIO # 添加当前目录到Python路径 sys.path.append(os.path.dirname(os.path.abspath(__file__))) # 创建Celery应用 celery_app = Celery( 'deoldify_worker', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0', include=['deoldify_tasks'] ) # 配置Celery celery_app.conf.update( task_serializer='json', accept_content=['json'], result_serializer='json', timezone='Asia/Shanghai', enable_utc=True, task_track_started=True, task_time_limit=300, # 5分钟超时 task_soft_time_limit=280, worker_concurrency=2, # 并发工作进程数,根据GPU内存调整 worker_prefetch_multiplier=1, task_acks_late=True, task_reject_on_worker_lost=True, )

3.2 定义异步任务

创建deoldify_tasks.py文件:

from celery_app import celery_app import requests import base64 import time from io import BytesIO from PIL import Image import os # DeOldify服务地址 DEOLDIFY_SERVICE = "http://localhost:7860" @celery_app.task(bind=True, name='tasks.colorize_image') def colorize_image_task(self, image_data, filename, task_id=None): """异步图像上色任务""" # 更新任务状态为开始 self.update_state( state='PROGRESS', meta={ 'status': '处理中', 'progress': 0, 'filename': filename, 'task_id': task_id or self.request.id } ) try: # 调用DeOldify服务 files = {'image': (filename, image_data)} response = requests.post(f"{DEOLDIFY_SERVICE}/colorize", files=files) # 更新进度 self.update_state( state='PROGRESS', meta={ 'status': '上色处理中', 'progress': 50, 'filename': filename, 'task_id': task_id or self.request.id } ) if response.status_code == 200: result = response.json() if result['success']: # 解码图片数据 img_data = base64.b64decode(result['output_img_base64']) # 更新进度为完成 self.update_state( state='PROGRESS', meta={ 'status': '完成', 'progress': 100, 'filename': filename, 'task_id': task_id or self.request.id } ) return { 'success': True, 'image_data': result['output_img_base64'], 'format': result['format'], 'filename': filename } else: raise Exception(f"上色失败: {result}") else: raise Exception(f"API调用失败: {response.status_code}") except Exception as e: # 更新状态为失败 self.update_state( state='FAILURE', meta={ 'status': f'失败: {str(e)}', 'progress': 0, 'filename': filename, 'task_id': task_id or self.request.id } ) raise

3.3 启动Celery Worker

创建启动脚本start_worker.sh

#!/bin/bash # 设置Python路径 export PYTHONPATH=/root/cv_unet_image-colorization:$PYTHONPATH # 启动Celery Worker celery -A celery_app worker \ --loglevel=info \ --concurrency=2 \ --hostname=deoldify_worker@%h \ --pool=solo \ --without-mingle \ --without-gossip

给脚本添加执行权限:

chmod +x start_worker.sh

4. 批量任务队列API实现

4.1 扩展Flask应用支持批量处理

修改或创建新的Flask应用来支持批量任务:

from flask import Flask, request, jsonify from celery.result import AsyncResult from deoldify_tasks import colorize_image_task import base64 import uuid import os from werkzeug.utils import secure_filename app = Flask(__name__) # 存储任务信息的临时字典(生产环境应使用Redis或数据库) task_store = {} @app.route('/batch/colorize', methods=['POST']) def batch_colorize(): """批量图像上色接口""" if 'images' not in request.files: return jsonify({'error': '没有上传图片文件'}), 400 files = request.files.getlist('images') if not files: return jsonify({'error': '没有上传图片文件'}), 400 # 创建批量任务ID batch_id = str(uuid.uuid4()) task_results = [] for file in files: if file.filename == '': continue # 读取文件内容 image_data = file.read() filename = secure_filename(file.filename) # 创建单个任务 task = colorize_image_task.apply_async( args=[image_data, filename], task_id=str(uuid.uuid4()) ) task_results.append({ 'task_id': task.id, 'filename': filename, 'status': '排队中' }) # 存储任务信息 task_store[batch_id] = { 'tasks': [tr['task_id'] for tr in task_results], 'filenames': {tr['task_id']: tr['filename'] for tr in task_results}, 'created_at': time.time() } return jsonify({ 'batch_id': batch_id, 'total_tasks': len(task_results), 'tasks': task_results }) @app.route('/batch/status/<batch_id>', methods=['GET']) def batch_status(batch_id): """获取批量任务状态""" if batch_id not in task_store: return jsonify({'error': '批量任务不存在'}), 404 batch_info = task_store[batch_id] tasks_status = [] for task_id in batch_info['tasks']: task_result = AsyncResult(task_id) task_status = { 'task_id': task_id, 'filename': batch_info['filenames'].get(task_id, '未知文件'), 'status': task_result.status, 'progress': 0 } if task_result.info and isinstance(task_result.info, dict): task_status['progress'] = task_result.info.get('progress', 0) task_status['status'] = task_result.info.get('status', task_result.status) tasks_status.append(task_status) # 计算总体进度 completed = sum(1 for ts in tasks_status if ts['status'] in ['SUCCESS', 'FAILURE']) total = len(tasks_status) overall_progress = (completed / total) * 100 if total > 0 else 0 return jsonify({ 'batch_id': batch_id, 'overall_progress': overall_progress, 'completed': completed, 'total': total, 'tasks': tasks_status }) @app.route('/task/result/<task_id>', methods=['GET']) def task_result(task_id): """获取单个任务结果""" task_result = AsyncResult(task_id) if task_result.successful(): result = task_result.result return jsonify({ 'success': True, 'status': 'SUCCESS', 'result': { 'filename': result['filename'], 'format': result['format'], 'image_data': result['image_data'] } }) elif task_result.failed(): return jsonify({ 'success': False, 'status': 'FAILURE', 'error': str(task_result.info) }), 500 else: return jsonify({ 'success': False, 'status': task_result.status, 'message': '任务尚未完成' }), 202

4.2 批量处理管理脚本

创建批量处理管理工具batch_manager.py

import requests import os import json from pathlib import Path import time class DeOldifyBatchClient: def __init__(self, base_url="http://localhost:8080"): self.base_url = base_url def upload_batch(self, folder_path): """上传整个文件夹的图片进行批量处理""" image_files = [] for ext in ['*.jpg', '*.jpeg', '*.png', '*.bmp', '*.tiff', '*.webp']: image_files.extend(Path(folder_path).glob(ext)) if not image_files: print("文件夹中没有找到支持的图片文件") return None files = [] for img_path in image_files: files.append(('images', (img_path.name, open(img_path, 'rb'), 'image/jpeg'))) response = requests.post(f"{self.base_url}/batch/colorize", files=files) # 关闭所有文件 for _, (_, file_obj, _) in files: file_obj.close() if response.status_code == 200: return response.json() else: print(f"上传失败: {response.status_code} - {response.text}") return None def monitor_batch(self, batch_id, interval=5): """监控批量任务进度""" while True: response = requests.get(f"{self.base_url}/batch/status/{batch_id}") if response.status_code == 200: status = response.json() print(f"\r进度: {status['overall_progress']:.1f}% " f"({status['completed']}/{status['total']})", end='') if status['overall_progress'] >= 100: print("\n批量处理完成!") break else: print(f"\n监控失败: {response.status_code}") break time.sleep(interval) def download_results(self, batch_id, output_folder): """下载批量处理结果""" # 先获取批量任务状态 status_response = requests.get(f"{self.base_url}/batch/status/{batch_id}") if status_response.status_code != 200: print("获取任务状态失败") return False status = status_response.json() # 创建输出文件夹 os.makedirs(output_folder, exist_ok=True) success_count = 0 for task in status['tasks']: if task['status'] == 'SUCCESS': # 获取任务结果 result_response = requests.get(f"{self.base_url}/task/result/{task['task_id']}") if result_response.status_code == 200: result = result_response.json() if result['success']: # 解码并保存图片 image_data = base64.b64decode(result['result']['image_data']) output_path = os.path.join(output_folder, f"colored_{task['filename']}") with open(output_path, 'wb') as f: f.write(image_data) success_count += 1 print(f"已保存: {output_path}") print(f"成功下载 {success_count} 张图片") return True # 使用示例 if __name__ == "__main__": client = DeOldifyBatchClient() # 上传批量处理 result = client.upload_batch("./black_white_photos") if result: batch_id = result['batch_id'] print(f"批量任务已创建: {batch_id}") # 监控进度 client.monitor_batch(batch_id) # 下载结果 client.download_results(batch_id, "./colored_results")

5. 系统部署与监控

5.1 使用Supervisor管理Celery Worker

创建Supervisor配置文件/etc/supervisor/conf.d/celery_worker.conf

[program:celery_worker] directory=/root/cv_unet_image-colorization command=/usr/bin/celery -A celery_app worker --loglevel=info --concurrency=2 --hostname=worker1@%%h user=root numprocs=1 stdout_logfile=/root/cv_unet_image-colorization/logs/celery_worker.log stderr_logfile=/root/cv_unet_image-colorization/logs/celery_worker_error.log autostart=true autorestart=true startsecs=10 stopwaitsecs=600 killasgroup=true priority=999 environment= PYTHONPATH="/root/cv_unet_image-colorization", CELERY_BROKER_URL="redis://localhost:6379/0", CELERY_RESULT_BACKEND="redis://localhost:6379/0"

5.2 使用Flower监控Celery任务

安装并启动Flower监控面板:

pip install flower celery -A celery_app flower --port=5555

或者创建Supervisor配置来管理Flower:

[program:celery_flower] directory=/root/cv_unet_image-colorization command=/usr/bin/celery -A celery_app flower --port=5555 user=root autostart=true autorestart=true startsecs=10 stdout_logfile=/root/cv_unet_image-colorization/logs/flower.log stderr_logfile=/root/cv_unet_image-colorization/logs/flower_error.log

5.3 批量处理服务部署脚本

创建完整的部署脚本deploy_batch_service.sh

#!/bin/bash # 批量处理服务部署脚本 echo "开始部署DeOldify批量处理服务..." # 1. 安装依赖 echo "安装Python依赖..." pip install celery redis flower # 2. 启动Redis echo "启动Redis服务..." if ! docker ps | grep -q "redis-server"; then docker run -d --name redis-server -p 6379:6379 redis:alpine echo "Redis容器已启动" else echo "Redis容器已在运行中" fi # 3. 创建必要的目录 echo "创建日志目录..." mkdir -p /root/cv_unet_image-colorization/logs # 4. 复制配置文件 echo "配置Supervisor..." cp celery_worker.conf /etc/supervisor/conf.d/ cp celery_flower.conf /etc/supervisor/conf.d/ # 5. 重新加载Supervisor配置 echo "重新加载Supervisor..." supervisorctl reread supervisorctl update # 6. 启动服务 echo "启动Celery Worker和Flower..." supervisorctl start celery_worker supervisorctl start celery_flower echo "部署完成!" echo "Celery Worker状态: supervisorctl status celery_worker" echo "Flower监控面板: http://localhost:5555"

6. 使用示例与测试

6.1 批量处理测试脚本

创建测试脚本test_batch_processing.py

import requests import time import os from pathlib import Path def test_batch_processing(): """测试批量处理功能""" # 准备测试图片 test_images = [] for i in range(3): # 测试3张图片 # 创建简单的测试图片 from PIL import Image, ImageDraw img = Image.new('RGB', (100, 100), color=(i*50, i*50, i*50)) draw = ImageDraw.Draw(img) draw.text((10, 10), f"Test {i+1}", fill=(255, 255, 255)) filename = f"test_{i+1}.jpg" img.save(filename) test_images.append(filename) # 上传批量处理 files = [] for img_file in test_images: files.append(('images', (img_file, open(img_file, 'rb'), 'image/jpeg'))) print("上传图片进行批量处理...") response = requests.post('http://localhost:8080/batch/colorize', files=files) # 关闭文件 for _, (_, file_obj, _) in files: file_obj.close() if response.status_code == 200: batch_info = response.json() batch_id = batch_info['batch_id'] print(f"批量任务创建成功,ID: {batch_id}") # 监控进度 while True: status_response = requests.get(f'http://localhost:8080/batch/status/{batch_id}') if status_response.status_code == 200: status = status_response.json() print(f"\r处理进度: {status['overall_progress']:.1f}%", end='') if status['overall_progress'] >= 100: print("\n处理完成!") break else: print("获取进度失败") break time.sleep(2) # 清理测试文件 for img_file in test_images: os.remove(img_file) return True else: print(f"上传失败: {response.status_code} - {response.text}") return False if __name__ == "__main__": test_batch_processing()

6.2 性能优化建议

根据实际测试情况调整配置:

# 在celery_app.py中根据硬件调整这些参数 celery_app.conf.update( worker_concurrency=2, # 根据GPU内存调整,通常1-4 worker_prefetch_multiplier=1, # 避免内存溢出 task_time_limit=300, # 单任务超时时间 task_serializer='json', result_serializer='json', )

7. 总结与扩展建议

通过本教程,我们成功实现了:

  1. 异步任务处理:使用Celery将耗时的图像上色任务转为后台异步执行
  2. 消息队列:使用Redis作为消息代理,支持任务排队和状态跟踪
  3. 批量处理API:提供了完整的批量上传、状态查询、结果下载接口
  4. 监控管理:集成Flower监控面板,方便查看任务状态
  5. 生产级部署:使用Supervisor管理进程,确保服务稳定性

7.1 进一步优化建议

  1. 数据库集成:使用数据库存储任务状态,避免内存丢失
  2. 分布式扩展:部署多个Celery Worker节点提高处理能力
  3. 限流控制:添加API限流防止滥用
  4. 结果缓存:对处理结果进行缓存,避免重复处理
  5. 进度通知:集成WebSocket实时推送处理进度

7.2 生产环境注意事项

  1. Redis持久化:配置Redis数据持久化,防止任务丢失
  2. 监控告警:设置任务失败告警机制
  3. 日志管理:完善的日志记录和分析
  4. 资源隔离:为Celery Worker设置资源限制
  5. 备份策略:定期备份任务数据和配置

现在你已经拥有了一个完整的DeOldify批量处理系统,可以高效处理大量图像上色任务,为用户提供更好的体验!


获取更多AI镜像

想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/23 6:25:50

教育领域的变革:个性化 AI 导师 Agent

教育领域的变革:个性化 AI 导师 Agent 1. 引入与连接(唤起兴趣与建立关联) 1.1 引人入胜的开场:一场跨越时空的课堂对话 1925年,苏联教育心理学家维果茨基(Lev Vygotsky)坐在莫斯科国立大学的教室里,对着一群未来的教育者写下了《教育心理学》手稿中的那句核心论断—…

作者头像 李华
网站建设 2026/4/23 6:23:28

别再傻傻分不清了!MATLAB矩阵运算的点乘(.*)和矩阵乘(*)到底啥区别?

MATLAB矩阵运算深度解析&#xff1a;元素级操作与矩阵级操作的本质差异 引言&#xff1a;为什么我们需要区分这两种运算&#xff1f; 在MATLAB的世界里&#xff0c;矩阵运算就像是一把瑞士军刀&#xff0c;功能强大但需要正确使用。许多初学者在使用MATLAB进行科学计算或工程仿…

作者头像 李华
网站建设 2026/4/23 6:16:32

Docker技术入门与实战【2.3】

第13章 编程语言本章主要介绍如何使用Docker快速部署主流编程语言的开发环境及其常用框架&#xff0c;包括C、C、Java、PHP、Python、Perl、Ruby、JavaScript、Ruby等。其中&#xff0c;笔者将重点介绍常用Web编程语言PHP的Docker使用。13.1 PHP13.1.1 PHP技术栈PHP是一种广泛使…

作者头像 李华
网站建设 2026/4/23 6:13:32

DeepLabv3+图像分割实战:从环境配置到生产部署

1. 深度学习图像分割与DeepLab概述在计算机视觉领域&#xff0c;图像分割一直是最具挑战性的任务之一。与简单的物体检测不同&#xff0c;分割需要精确到像素级别的分类&#xff0c;这对算法的精度和效率都提出了更高要求。DeepLab作为Google团队开发的系列模型&#xff0c;通过…

作者头像 李华