模拟ChatGPT流式响应
模拟ChatGPT流式响应

模拟ChatGPT流式响应

看看你的

参考百度AI对话和siderAI:

百度的:

接口名称:https://chat-ws.baidu.com/aichat/api/conversation,POST请求

image-20230919162904032

image-20230919162647789

image-20230919162850495

SiderAI的:https://sider.ai/api/v1/completion/text,也是POST请求

image-20230919162925544

image-20230919162757351

可见格式各不相同,关键是前后台如何交互数据流。

流式返回简介

流式返回是一种将数据以流的形式传输到客户端的机制,与传统的一次性请求-响应模式不同。在ChatGPT中,流式返回使我们能够在模型生成文本的同时逐步将结果发送给客户端,实现实时的交互体验。

ChatGPT的流式返回基于服务器发送事件(Server-Sent EventsSSE)技术。SSE利用HTTP协议,在客户端与服务器之间建立持久性的单向连接,服务端与客户端建立了 长连接,服务器可以通过该连接向客户端发送任意数量的数据。服务端就相当于河流的上游,客户端就相当于河流的下游,水往低处流,这就是 SSE 的流式传输。

以下是 SSE 的基本工作原理:

  1. 客户端通过发送一个 HTTP 请求来建立 SSE 连接。
  2. 服务器在建立连接后保持该连接打开,并发送事件数据给客户端。
  3. 服务器使用 “Content-Type: text/event-stream” 头部来标识 SSE 连接,并使用特定格式的数据来发送事件给客户端。
  4. 客户端接收到事件后,可以使用 JavaScriptEventSource 接口来处理事件数据。

eventSource和websocket的区别

1)协议不同:WebSocket 使用的是一种双向通信协议,而 eventSource 使用的是一种单向通信协议。WebSocket 协议可以在客户端和服务器之间建立一个长连接,双方可以同时发送和接收消息,而 eventSource 只能由服务器向客户端发送消息。

2)数据格式不同:WebSocket 可以发送任何格式的数据,例如文本、二进制数据或 JSON,而 eventSource 只能发送文本格式的数据。

3)支持程度不同:WebSocket 是一种相对较新的技术,在一些旧的浏览器或网络环境下可能不被支持,而 eventSource 已经被广泛支持,可以在大多数现代浏览器中使用。

4)应用场景不同:WebSocket 更适合那些需要实时双向通信的应用,例如在线游戏或视频会议,而 eventSource 更适合那些需要从服务器获取实时信息的应用,例如股票行情或新闻推送

FastAPI简单模拟

Python使用FastAPI模拟流式响应的后端接口代码如下:

import asyncio
import uvicorn
from fastapi import FastAPI, Response
from fastapi.responses import StreamingResponse

app = FastAPI()

data = "我是一条样例数据"

@app.post("/data")
async def stream_data():
    async def generate():
        for char in data:
            yield f"data: {char}\n\n"
            await asyncio.sleep(0.5)  # 可选的延迟,控制流式输出的速度

    return StreamingResponse(generate(), media_type="text/event-stream")

if __name__ == '__main__':
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

请求一下:

image-20230919165912847

大概是符合要求的。我们对代码做更详尽的封装:

import asyncio
import uvicorn
from fastapi import FastAPI, Response
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import Optional
app = FastAPI()

class Info(BaseModel):
    prompt: Optional[str]
    stream: Optional[bool]
    app_name: Optional[str]
    app_version: Optional[str]
    tz_name: Optional[str]
    cid: Optional[str]
    model: Optional[str]
    from_: Optional[str] = Field(..., alias="from")

data = "我是一条样例数据"
res = 'data:{"code":0,"msg":"MessageResponse","data":{"text":"","cid":"C05ZS77O0R3","req_message_id":"CM0B4U24Z8BM6","message_id":"CM0A4UWNKAD6M","total":30,"remain":17,"remain_period":"daily","extra_quota":0,"extra_total":0,"chat_model":"gpt3.5"}}'

@app.post("/data")
async def stream_data(info: Info):
    async def generate():
        yield res.replace('MessageResponse', '')
        for char in data:
            yield res.replace('MessageResponse', char)
            await asyncio.sleep(0.5)  # 可选的延迟,控制流式输出的速度
        yield f"data:[DONE]"

    return StreamingResponse(generate(), media_type="text/event-stream")

if __name__ == '__main__':
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True, log_level='debug')

前端代码

<body>
<button type="button" onclick="output()">输出文章</button>
<div id="message"></div>
<script>
    function output() {
        let source = new EventSource(
            'http://localhost:7869/data');
        let innerHTML = '';
        source.onmessage = function (e) {
            if (e.data == '[done]') {
                source.close();
            } else {
                innerHTML += e.data;
                document.getElementById("message").innerHTML = innerHTML;
            }
        };
    }
</script>
</body>

SSE(Server Sent Event),直译为服务器发送事件,顾名思义,也就是客户端可以获取到服务器发送的事件。我们常见的 http 交互方式是客户端发起请求,服务端响应,然后一次请求完毕;但是在 sse 的场景下,客户端发起请求,连接一直保持,服务端有数据就可以返回数据给客户端,这个返回可以是多次间隔的方式。

但是问题是原生的EventSource 不能使用post方法,只能使用get方法,而且还不能自定义请求header,所以我们可以使用npm包:@microsoft/fetch-event-source

microsoft/fetch-event-source

代码示例:

import {fetchEventSource} from '@microsoft/fetch-event-source';

const controller = new AbortController()
const signal = controller.signal

fetchEventSource('请求的url', {
    method: 'POST',
    signal: signal,
    headers: {
        /* 请求头配置 */
    },
    body: JSON.stringify({
        /* 发送的内容 */
    }),
    onmessage() {
        let result= JSON.parse(msg.data);  // 得到的数据
    },
    onerror(err){
        // 必须抛出错误才会停止
        throw err
    }
})

不太好用,有大佬封装的:

Fetch模拟sse

<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <meta http-equiv="X-UA-Compatible" content="IE=edge">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>fetchSSE Demo</title>
</head>
<body>
  <h1>fetchSSE Demo</h1>
  <input type="text" id="queryInput" placeholder="输入查询参数">
  <button onclick="connectFetch()">建立 fetchSSE 连接</button>
  <button onclick="closeSSE()">断开 fetchSSE 连接</button>
  <br />
  <br />
  <div id="message"></div>

  <script>
    const messageElement = document.getElementById('message')
    const addMessage = (text) => {
      const messageNode = document.createTextNode(text);
      messageElement.appendChild(messageNode);
      messageElement.appendChild(document.createElement('br')); // 添加换行
    }
    // 建立 FETCH-SSE 连接
    const connectFetch = () => {
      const queryInput = document.getElementById('queryInput').value;
      console.log(queryInput)
      let controller = new AbortController()
      fetchEventSource('/data', {
        method: 'POST',
        body: JSON.stringify({
          query: queryInput,
          history: [],
          data:[]
        }),
         headers: new Headers({
            'Content-Type': 'application/json',
            'Access-Control-Allow-Origin': '*'
         }),
        signal: controller.signal,
        // mode: 'cors', // 设置跨域模式
        onopen: () => {
          messageElement.innerHTML += `FETCH 连接成功<br />`
        },
        onclose: () => {
          messageElement.innerHTML += `FETCH 连接关闭<br />`
        },
        onmessage: (event) => {
          const data = JSON.parse(event)
          console.log(data)
          addMessage(`${data.id} --- ${data.time} --- body参数:${JSON.stringify(data.body)}`);
          // messageElement.innerHTML += `${data.id} --- ${data.time} --- body参数:${JSON.stringify(data.body)}` + '<br />'
        },
        onerror: (e) => {
          console.log(e)
        }
      })
    }

    // 断开 FETCH-SSE 连接
    const closeSSE = () => {
      if (controller) {
        controller.abort()
        controller = undefined
        messageElement.innerHTML += `FETCH 连接关闭<br />`
      }
    }

    const fetchEventSource = (url, options) => {
      fetch(url, options)
        .then(response => {
          if (response.status === 200) {
            options.onopen && options.onopen()
            return response.body
          }
        })
        .then(rb => {
          const reader = rb.getReader()
            const push = () => {
              // done 为数据流是否接收完成,boolean
              // value 为返回数据,Uint8Array
              return reader.read().then(({done, value}) => {
                if (done) {
                  options.onclose && options.onclose()
                  return
                }
                options.onmessage && options.onmessage(new TextDecoder().decode(value))
                // 持续读取流信息
                return push()
              })
            }
            // 开始读取流信息
            return push()
        })
        .catch((e) => {
          options.error && options.error(e)
        })
    }
</script>

</body>
</html>

FastAPI代码

# coding:utf-8
import copy
import datetime
import json
from fastapi import FastAPI
from pydantic import BaseModel
import uvicorn
import asyncio
from apps.getReport import get_report
from utils.log_util import log_config
from fastapi.responses import StreamingResponse
from starlette.middleware.cors import CORSMiddleware
from generate_report import ReportGenerator
from fastapi.staticfiles import StaticFiles
app = FastAPI()
report_generator = ReportGenerator(model_path="chatglm/chatglm2-6b")

class Message(BaseModel):
    query: str
    history: list
    data: list

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# 挂载静态文件目录
app.mount("/static", StaticFiles(directory="./static"), name="static")

# @app.get("/data")
@app.post("/data")
async def stream_data(message: Message):
    output_data = {
        'id': 1,
        'time': str(datetime.datetime.now())[:19],
        'body': {
            "content": ""
        }
    }
    query = message.query if message.query else "不知天上宫阙,今夕是何年"
    history = message.history if message.history else []
    response_gen = report_generator.get_any_response(query=query, history=history)
    async def generate():
        for i in response_gen:
            result, history = i
            output = copy.deepcopy(output_data)
            output["body"]["content"] = result
            print(output)
            yield json.dumps(output, ensure_ascii=False)
            await asyncio.sleep(0.2)  # 可选的延迟,控制流式输出的速度

    return StreamingResponse(generate(), media_type="text/event-stream")

if __name__ == '__main__':
    import logging
    logger = logging.getLogger()
    uvicorn.run("report_fastapi_main:app", host="0.0.0.0", port=7869,
                log_config=log_config, log_level='debug')

参考链接

发表回复

您的电子邮箱地址不会被公开。