本文介绍ChatGPT使用的流式输出技术SSE(Server-Sent Events)的技术原理,通过代码示例,展示如何在Web应用程序中有效地使用SSE实现实时数据推送和流式输出。
一、背景介绍
当使用ChatGPT时,模型的回复不是一次性生成整个回答的,而是逐字逐句地生成。这是因为语言模型需要在每个时间步骤预测下一个最合适的单词或字符。如果等待整个回复生成后再输出到网页,会导致用户长时间等待,极大降低用户体验。
相反,逐字蹦出回复可以实现更快的交互响应。ChatGPT可以在输入消息后迅速开始生成回答的开头,并根据上下文逐渐细化回答。这种渐进式的呈现方式可以提供更流畅的对话体验,同时让用户知道模型正在工作,避免感觉像卡住了或没有响应。此外,逐字蹦出的回复还有助于用户跟踪模型的思考过程,看到它逐步构建回答的方式。这种可见的生成过程有助于用户理解模型是如何形成回答的,提高对话的透明度和可解释性。
那么,ChatGPT是用什么技术来实现流式输出的呢?
二、ChatGPT 流式输出原理
我们看一下ChatGPT的completion API的官方文档
其中有一个stream参数,其介绍如下:
可以看到,当stream设置为true时,将会使用SSE(Server-Sent Events)技术流式输出结果。我们curl调用一下。
curl -i -X POST -H 'Content-Type: application/json' -H 'Authorization: Bearer sk-************************************************' https://api.openai.com/v1/chat/completions -d '{"model":"gpt-3.5-turbo","messages":[{"role": "user", "content": "3+5=?"}],"temperature":0.8,"stream":true}'
结果如下:
HTTP/2 200
date: Fri, 08 Sep 2023 03:39:50 GMT
content-type: text/event-stream
access-control-allow-origin: *
cache-control: no-cache, must-revalidate
openai-organization: metaverse-cloud-pte-ltd-orfbgw
openai-processing-ms: 5
openai-version: 2020-10-01
strict-transport-security: max-age=15724800; includeSubDomains
x-ratelimit-limit-requests: 3500
x-ratelimit-limit-tokens: 90000
x-ratelimit-remaining-requests: 3499
x-ratelimit-remaining-tokens: 89980
x-ratelimit-reset-requests: 17ms
x-ratelimit-reset-tokens: 12ms
x-request-id: 96ff4efafed25a52fbedb6e5c7a3ab09
cf-cache-status: DYNAMIC
server: cloudflare
cf-ray: 80342aa96ae00974-HKG
alt-svc: h3=":443"; ma=86400
data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]}
data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":"3"},"finish_reason":null}]}
data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":" +"},"finish_reason":null}]}
data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":" "},"finish_reason":null}]}
data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":"5"},"finish_reason":null}]}
data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":" ="},"finish_reason":null}]}
data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":" "},"finish_reason":null}]}
data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":"8"},"finish_reason":null}]}
data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}
data: [DONE]
可以看到,响应头的类型是content-type: text/event-stream,表示这个响应是文本流输出,然后响应体每次都以data: 开头,后面携带的是一个json数据,最后以data: [DONE]作为结束标志。
ChatGPT流式输出的耗时比非流式输出耗时长,如果不需要打字机效果,建议使用非流式
三、SSE技术介绍
SSE (Server-Sent Events) 技术是一种用于实现服务器主动推送数据给客户端的通信协议。相比传统的请求-响应模式,SSE 提供了一种持久连接,允许服务器随时向客户端发送事件和数据,实现了实时性的消息传递。
SSE 的工作原理非常简单直观。客户端通过与服务器建立一条持久化的 HTTP 连接,然后服务器使用该连接将数据以事件流(event stream)的形式发送给客户端。这些事件流由多个事件(event)组成,每个事件包含一个标识符、类型和数据字段。客户端通过监听事件流来获取最新的数据,并在接收到事件后进行处理。
与 WebSocket 技术相比,SSE 使用的是基于 HTTP 的长轮询机制,而不需要建立全双工的网络连接。这使得 SSE 更容易在现有的基础设施上部署,无需特殊的代理或中间件支持。另外,SSE 能够与现有的 Web 技术(如 AJAX 和 RESTful API)很好地集成,同时也更适合传输较少频繁更新的数据。
SSE 的优点包括:
● 实时性:SSE 允许服务器主动将数据推送给客户端,实现实时更新和通知。
● 简单易用:SSE 基于标准的 HTTP 协议,无需额外的库或协议转换。
● 可靠性:SSE 使用 HTTP 连接,兼容性好,并能通过处理连接断开和错误情况来确保数据传输的可靠性。
● 轻量级:与 WebSocket 相比,SSE 不需要建立全双工连接,减少了通信的开销和服务器负载。
然而,SSE 也有一些限制。由于 SSE 基于 HTTP 长轮询机制,每个请求都需要建立和维护一个持久化连接,这可能导致较高的资源消耗。此外,SSE 适用于单向通信,即服务器向客户端发送数据,而客户端无法向服务器发送消息。
综上所述,SSE 技术提供了一种简单、实时的服务器推送数据给客户端的方法,适用于需要实现实时更新和通知的应用场景。它在 Web 开发中具有广泛的应用,可用于构建聊天应用、实时监控系统等,并为开发人员带来便利和灵活性。
四、SSE前端实践
在本章中,我们将探讨如何使用JavaScript的EventSource来实现流式输出。EventSource是一种支持服务器发送事件(Server-Sent Events)的API,它提供了一种简单而有效的方式来接收服务器端的实时数据。
首先,我们需要在前端创建一个EventSource对象,并指定要连接的服务器端URL。下面是一个示例代码:
const eventSource = new EventSource('/stream'); // '/stream' 是服务器端提供SSE的端点
eventSource.onmessage = function(event) {
const data = JSON.parse(event.data); // 解析接收到的数据
// 在这里处理数据,例如更新页面内容或执行其他操作
};
eventSource.onerror = function(event) {
if (event.readyState === EventSource.CLOSED) {
// 处理连接关闭的情况,例如显示错误信息或重新连接
console.log('Connection closed.');
} else {
// 处理其他错误,例如网络问题
console.error('EventSource error:', event);
}
};
上述代码中,我们通过指定'/stream'作为EventSource的URL来连接到服务器端的SSE端点。当服务器端有新数据发送时,onmessage回调函数将被触发,我们可以在这里处理接收到的数据。常见的处理方式包括更新页面内容、添加新元素或执行其他操作。
另外,onerror回调函数用于处理连接错误的情况。当连接关闭时,我们可以相应地处理,例如显示错误信息或尝试重新连接。对于其他类型的错误,我们可以在控制台打印错误消息以便调试。
在流式输出中,如何判断数据流何时结束是一个重要问题。在SSE中,当服务器端关闭连接时会触发onerror回调函数,我们可以在这里处理连接关闭的情况。此外,我们还可以在服务器端发送特定的标识来表示数据流的结束,然后在前端进行相应的处理。
如果在连接过程中出现网络问题或其他错误,onerror回调函数也会被触发。在这种情况下,我们可以根据具体的错误处理机制来决定如何处理,例如重新连接或显示错误信息。
通过使用EventSource库,我们可以轻松地在前端实现与服务器端的流式通信。无论是实时聊天、实时数据更新还是其他需要实时性的应用场景,SSE都提供了一种简单且可靠的解决方案。在实践中,我们可以根据具体的需求和业务逻辑,灵活地使用SSE来实现各种流式输出的功能。
使用EventSource需要注意以下问题:
1. 结束标识
服务器端应发送特定的标识来表示数据流的结束,然后前端调用close关闭EventSource。如果不这么做的话,当服务端发送完数据关闭连接后,EventSource默认会自动重新连接。
2. 只支持GET
url可以携带一些简单的查询参数,如果要传输复杂的请求体,可以考虑两次请求的方案。先通过普通的HTTP POST/PUT请求,将请求体传送到服务端。服务端将请求体缓存起来,并返回一个能唯一标识的票据,前端最后使用EventSource在url中带上票据,服务端根据票据从缓存里取出请求体。
3. 不支持自定义Header
接口如果需要鉴权,无法在Header里定义Authorization请求头,那么建议使用Cookie来标识用户,EventSource请求会携带Cookie。
五、SSE Java 实践
在Java领域,使用Spring WebFlux可以方便地实现服务器发送事件(Server-Sent Events,SSE)的流式输出。下面是一个代码示例,展示了如何使用Java Spring WebFlux库来实现SSE流式输出:
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
@RestController
public class SSEController {
@GetMapping("/stream")
public Flux<ServerSentEvent<String>> streamData() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> ServerSentEvent.<String> builder()
.id(String.valueOf(sequence))
.event("data")
.data("Sample data " + sequence)
.build());
}
}
在上述代码中,我们创建了一个REST控制器(@RestController),其中/stream路径对应着我们的SSE流。streamData()方法返回一个Flux<ServerSentEvent<String>>对象,该对象代表了一个包含了多个Server-Sent Event的无限流。通过使用Flux.interval(Duration)方法,我们可以实现每秒向客户端发送一个事件。
在map操作中,我们将序列号转换为ServerSentEvent对象,该对象包含了需要发送给客户端的数据和相关的元信息。你可以根据自己的需求修改data字段中的数据。
在此实践中,我们通过使用WebFlux框架,充分利用了响应式编程模型。这种模型可以处理大量并发请求而无需阻塞线程,并且与Java 8的CompletableFuture等异步编程概念集成得很好。
为了避免连接中断,可以通过添加适当的错误处理机制来提高可靠性。例如,你可以使用doOnError方法来捕获异常并采取相应的措施,比如重新建立连接或记录错误日志。
通过以上的Java Spring WebFlux实践,你可以轻松地构建具有流式输出功能的SSE服务。这种方式适用于需要向客户端实时推送数据的应用场景,如实时聊天、股票行情推送等。
1. 如何保持连接
在实际的生产环境中,前端请求服务端可能会经过Nginx或其他网关来转发,如下图所示
由于Nginx和upstream服务的连接是池化的,连接有超时时间,超时后会关闭,如果服务端SSE长时间未下发消息,Nginx可能会关闭掉连接。这种情况可以通过修改Nginx upstream超时解决(影响所有请求,连接资源释放不及时),也可以通过心跳包来解决,每隔一段时间下发前后端约定好的无实质数据的心跳包。如下代码所示
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import java.time.Duration;
@RestController
public class SSEController {
@GetMapping("/stream")
public Flux<ServerSentEvent<String>> streamData() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> ServerSentEvent.<String> builder()
.id(String.valueOf(sequence))
.event("data")
.data("Sample data " + sequence)
.build());
}
@GetMapping("/stream-with-heart-beat")
public Flux<ServerSentEvent<String>> streamWithHeartBeat() {
return streamData().mergeWith(Flux.interval(Duration.ofSeconds(5))
.map(sequence -> ServerSentEvent.<String> builder()
.event("data")
.data("Continue")
.build()));
}
}
六、SSE Python 实践
在Python中实现流式输出可以通过使用SSE(Server-Sent Events)技术来实现。SSE是一种基于HTTP的服务器推送技术,允许服务器主动向客户端发送数据。下面将介绍如何在Python中实践SSE。
首先,我们需要使用一个轻量级的Web框架,比如Flask,来构建我们的Python应用程序。Flask提供了简单易用的工具和库,适合用于快速开发Web应用。另外,为了支持SSE,我们还需要使用一个相关的Python库,比如flask-sse。
首先,安装所需的库。可以使用pip命令执行以下操作:
pip install flask flask-sse
接下来,创建一个Flask应用,并配置SSE相关的路由。以下是一个简单的示例代码:
from flask import Flask, render_template
from flask_sse import sse
app = Flask(__name__)
app.register_blueprint(sse, url_prefix="/stream")
@app.route("/")
def index():
return render_template("index.html")
@app.route("/send_message")
def send_message():
sse.publish({"message": "Hello, SSE!"}, type="message")
return "Message sent!"
if __name__ == "__main__":
app.run(debug=True)
在上面的代码中,我们创建了一个Flask应用,并使用flask_sse.sse注册了一个SSE蓝图,将其URL前缀设置为/stream。我们还定义了两个路由,一个用于渲染主页(/),另一个用于发送SSE消息(/send_message)。
在send_message路由中,我们使用sse.publish方法发送一条SSE消息。这里我们发送了一个包含"Hello, SSE!"信息的消息,并指定了类型为"message"。
通过以上的实践,你可以在Python中实现SSE流式输出。这种方法可以用于实时推送数据、更新通知、日志记录等应用场景。记得在实际应用中处理连接中断、错误处理等异常情况,以确保流式输出的稳定性和可靠性。
七、实时消息推送实践
本章节描述如何使用Redis pub/sub频道结合SSE技术,实现将消息推送给指定用户,具体流程如下图所示
1. Redis pub/sub频道
1. 发布消息:客户端可以通过指定频道名称将消息发布到Redis服务器。任何订阅了该频道的客户端都将接收到这条消息。消息可以是任何字符串,不限于特定格式或内容。
2. 订阅频道:客户端可以订阅一个或多个频道来接收发布到这些频道的消息。可以动态地订阅和取消订阅频道。当有新消息发布到订阅的频道时,客户端将立即接收到该消息。
3. 多播消息:Redis支持将消息同时发布到多个频道,这称为多播消息。客户端可以选择发布消息到多个频道,从而实现在不同的频道间广播消息。
4. 消息持久化:Redis pub/sub频道本身不保存消息的状态。如果某个客户端在订阅之前发布了消息,订阅后无法获取此前的消息。因此,如果需要消息的持久化存储,可以考虑使用Redis的其他特性,如列表数据类型(List)或Stream类型。
5. 无阻塞通信:Redis pub/sub频道采用异步非阻塞的通信方式。发布者向频道发布消息后,不会阻塞并等待订阅者接收消息。这种模式下,发布和订阅是异步的,各自独立进行。
Redis pub/sub频道非常适合构建实时消息系统、发布/订阅模型以及事件驱动架构。通过使用该功能,可以实现高效的消息传递和实时数据更新。
2. 优点 - 高性能高吞吐量
Redis是一个高性能的内存数据库,它能够处理大量的并发请求。使用Redis pub/sub频道可以实现快速的消息发布和订阅,从而支持高吞吐量的实时消息传递。
3. 缺点 - 低可用性
Redis pub/sub频道不提供持久化消息的功能,在异常情况下可能会漏消息,需要结合历史消息查询、消息唯一ID去重、离线消息存储等技术来保证可用性。
极光笔记|EngageLab AppPush激活亚太地区推送通知的潜力
鸿蒙平台消息推送的那些事
极光笔记丨为什么你的App通知关闭率那么高?几个防止过度推送的有效方式
极光笔记丨提升 App Push 通知开启率必须要做的几件事
AIGC|Agentbot的构建实践