- 先写一个fastapi 流式返回的接口
from fastapi import FastAPI from fastapi.responses import StreamingResponse import asyncio import time from typing import AsyncGenerator, Generator app = FastAPI(title="FastAPI 流式接口示例") # ------------------- 流式返回JSON数据(实战常用) ------------------- async def json_stream_generator(data_list: list, delay: float = 0.5): """异步生成器:逐条返回JSON格式数据""" for item in data_list: await asyncio.sleep(delay) # 每条数据返回JSON字符串(注意:流式JSON无需整体数组,逐行返回) json_str = f'{{"index": {item["index"]}, "content": "{item["content"]}"}}\n' yield json_str.encode("utf-8") @app.get("/stream/json")#==================这个接口进行测试. async def stream_json(): """流式返回JSON格式数据(模拟大模型分段响应)""" # 模拟大模型返回的分段数据 mock_data = [ {"index": 0, "content": "FastAPI"}, {"index": 1, "content": " 是一个高性能的"}, {"index": 2, "content": " Python Web框架"}, {"index": 3, "content": " 支持异步和流式输出"} ]*2 return StreamingResponse( json_stream_generator(mock_data), media_type="application/json" # 媒体类型指定为JSON ) if __name__ == "__main__": import uvicorn # 启动服务:默认端口8000,开启自动重载 uvicorn.run(app, host="0.0.0.0", port=8000)效果:
代码流式接受
import requests import json import requests url = "http://192.168.1.102:8000/stream/json" response = requests.get(url, stream=True) for line in response.iter_lines(): if line: print(line.decode("utf-8")) # 输出每行数据,确认是独立的JSON且末尾有\ndify如果不支持那么就dify写一个循环一直反复读取一个文件a.txt
然后我们的节点一直往这个文件里面写入.最后写一个结束符.
这样他一直读取,一直读到结束符.没读到结束符就再重新读整个文件.