前言
在传统的客户端与服务端的数据交互,常见的两种交互方式是:
- 客户端发起
Ajax
请求后,服务端将数据一次性返回; - 客户端与服务端开启
Socket
,建立即时消息通信。
随着 ChatGPT AI 的诞生,一种服务端 response EventStream 响应流数据 Stream
方式被广泛推广。
这种交互方式,会将你想要从网络接受的资源分成一个个小的分块,优势在于:一次获取数据量很大的请求,可以拆分成多个分块依次给客户端去做渲染,避免消耗太长时间处理完整的数据,从而优化数据响应给客户端的速度。
一、熟悉 Stream 的响应和接收
1)服务端响应 Stream
当服务端设置 response headers Content-Type
为 "text/event-stream"
,表示数据将以「响应流」的方式,通过 HTTP 响应给客户端。
下面是一个 Node
服务端的 response stream 实例,将数据分块为 10 次响应给客户端:
// server.js
const http = require('http');
const server = http.createServer((req, res) => {
// 设置响应头
res.writeHead(200, {
'Content-Type': 'text/event-stream', // 启用 Stream
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
// 跨域处理
'Access-Control-Allow-Origin': '*', // 允许任何域访问
'Access-Control-Allow-Headers': '*', // 允许携带任何请求头信息
'Access-Control-Allow-Methods': 'GET,POST,PUT,DELETE',
'Access-Control-Max-Age': 86400
});
let step = 0;
// 模拟每秒钟发送一条消息
let timer = setInterval(() => {
const data = {
message: 'Hello, world!',
timestamp: Date.now(),
step: ++ step,
};
if (step === 10) {
clearInterval(timer);
// 最后一项数据,使用 end 来发送
res.end(`data: ${JSON.stringify(data)}\n\n`);
} else {
// 分块推流
res.write(`data: ${JSON.stringify(data)}\n\n`);
}
}, 100);
});
server.listen(5000, () => {
console.log('Server is running on http://localhost:5000');
});
2)浏览器接收 Stream
在浏览器客户端,Fetch 可以轻松实现「响应流」的数据接收。一个简单的 Fetch 处理 EventStream 的例子如下:
// client.js
const fetchStream = async (url, options) => {
const response = await fetch(url, options);
const reader = response.body.getReader();
while (true) {
const { value, done } = await reader.read();
if (done) {
console.log('done.');
break; // 读取完毕
} else {
console.log('message: ', new TextDecoder().decode(value));
}
}
}
fetchStream('http://127.0.0.1:5000/stream', { method: 'get', headers: { 'Content-Type': 'application/json' } });
小结,
- 1)服务端返回的 Stream,浏览器会识别为
ReadableStream
类型数据,执行getReader()
方法创建一个读取流队列,可以读取ReadableStream
上的每一个分块数据; - 2)通过循环调用 reader 的
read()
方法来读取每一个分块数据,它会返回一个Promise
对象,在Promise
中返回一个包含value
参数和done
参数的对象; - 3)
done
负责表明这个流是否已经读取完毕,若值为true
时表明流已经关闭,不会再有新的数据,此时result.value
的值为undefined
; - 4)
value
是一个Uint8Array
字节类型,可以通过TextDecoder
转换为文本字符串进行使用。
现在,我们对 Stream 流式响应已经有了初步了解,接下来我们对浏览器 fetch 进行二次封装,以此来满足业务场景使用。
二、封装 Fetch – 处理请求
相比较于 XMLHttpRequest
来说,fetch()
的写法简单又直观,只要在发起请求时将整个配置项传入就可以了,并且基于 Promise 链式调用实现。
一个简单的 fetch 请求示例如下:
const response = await fetch(url, {
method: 'POST', // *GET, POST, PUT, DELETE, etc.
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(data) // body data type must match "Content-Type" header
});
其中第一个参数 url
是请求地址,第二个参数是一个 RequestInit
初始化配置对象,可以指定 method
、headers
、body
等配置。
1)创建一个 fetch 请求
我们采用面向对象方式创建一个构造函数,实例化对象时传入 fetch
请求配置:url、requestInit
,此外传入回调函数 onmessage
、ondone
、onerror
监听流式数据做逻辑处理。
构造函数的 constructor
实现如下:
interface IFetchStreamOptions {
url: string;
requestInit: RequestInit;
onmessage: (data: string[], index: number) => void;
ondone?: () => void;
onerror?: (response: Response) => void;
}
class FetchStream {
url: string;
requestInit: RequestInit;
onmessage: IFetchStreamOptions['onmessage'];
ondone: IFetchStreamOptions['ondone'];
onerror: IFetchStreamOptions['onerror'];
constructor(options: IFetchStreamOptions) {
this.url = options.url;
this.requestInit = options.requestInit;
this.onmessage = options.onmessage;
this.ondone = options.ondone;
this.onerror = options.onerror;
this.createFetchRequest();
}
...
}
接着调用 createFetchRequest
创建 fetch
请求,并读取每一块流内容通知给回调 onmessage
函数:
createFetchRequest() {
fetch(this.url, {
method: 'POST',
...this.requestInit
}).then(response => {
if (response.status === 200) {
return response.body!;
} else {
// fetch() 返回的 Promise 不会被标记为 reject,即使响应的 HTTP 状态码是 404 或 500
return Promise.reject(response);
}
}).then(async (readableStream) => {
// 1. 创建 reader 读取流队列
const reader = readableStream.getReader();
// 2. 记录流队列中分块的索引
let index: number = 0;
while (true) {
// 3. 读取分块数据,返回一个 Promise
// (如果分块可用,Promise 返回 { value: theChunk, done: false } 形式)
// (如果流已关闭,Promise 返回 { value: undefined, done: true } 形式)
const { value, done } = await reader.read();
if (done) { // 响应流处理完成
// 5. 流已关闭,执行外部结束逻辑
this.ondone?.();
break;
} else {
// 4. 将分块数据转换为 string 交给外部处理函数使用
const dataText = new TextDecoder().decode(value);
const data = dataText.split('\n\n').filter(Boolean) as string[]; // response 响应的消息可能存在多个,以 \n\n 分割
this.onmessage(data, index ++);
}
}
}).catch(response => {
// ... error 处理
this.onerror?.(response);
});
}
2)手动中断 fetch 请求
在 ChatGPT
AI 问答中,可以对当前回答进行中断。
我们知道 XMLHttpRequest
对象上有一个 abort()
方法,调用这个方法即可中断一个请求。此外 XHR 还有 onabort
事件,可以监听请求的中断并做出响应。
fetch
自身并没有这些功能,该如何实现中断请求?
可以借助 AbortController
API 来终止 Fetch 请求。
- 首先是创建一个新的 AbortController 对象实例
它会返回一个 AbortSignal 对象实例,它可以用来 with/abort 一个 Web(网络)请求。
class FetchStream {
...
+ controller: AbortController | null = null;
createFetchRequest() {
...
+ this.controller = new AbortController();
}
}
- 接着将 AbortSignal 作为选项传入到 Fetch 请求参数中,将 signal 和 controller 与 fetch 请求相关联
createFetchRequest() {
this.controller = new AbortController();
fetch(this.url, {
method: 'POST',
+ signal: this.controller.signal,
...this.requestInit
})
...
}
- 最后提供
abort()
方法 调用AbortController.abort()
可以中止一个尚未完成的 Web(网络)请求。
class FetchStream {
...
abort() {
if (this.controller) this.controller.abort();
}
}
3)超时中断 fetch 请求
XMLHttpRequest
对象上有一个 timeout
属性,为其赋值后若在指定时间请求还未完成,请求就会自动中断。此外 XHR 还有 ontimeout
事件,可以监听请求的超时中断并做出响应。
然而,Fetch
并没有像 XHR 具有 timeout 超时事件设定,不过我们可以借助 AbortController
+ setTimeout()
实现类似效果。
在创建请求时开启一个 setTimeout
计时器,若拿到请求结果清除计时器,否则计时器时间完成后,执行中断 fetch 请求。
class FetchStream {
...
+ timer: number = 0;
createFetchRequest() {
+ this.timeout(); // 开启超时计时器
...
fetch(this.url, {
method: 'POST',
signal: this.controller.signal,
...this.requestInit
}).then(response => {
+ clearTimeout(this.timer); // 拿到结果,清除 timeout 计时器
...
}
timeout(time: number = 60000) {
this.timer = window.setTimeout(() => {
this.abort();
this.ontimeout?.(); // 外部若传入了监听超时回调,类似 onmessage
}, time);
}
}
4)使用示例
FetchStream
已经封装完成,使用可以参考如下示例:
let content = ''; // 拼接每一个分块数据
const fetchStream = new FetchStream({
url: 'xxx',
requestInit: { // fetch 配置项
body: JSON.stringify(body),
headers,
},
onmessage: (dataList, index) => {
dataList.forEach(item => {
const dataStr = item.split("data:")[1];
const result = JSON.parse(dataStr);
console.log('--- cyl result: ', result, index, Date.now().toString().slice(9));
// 将每个分块数据进行拼接
content += result.data.content;
});
},
ondone: () => {
console.log('done!', content);
},
ontimeout: () => {
console.log('timeout!');
}
});
// 中断请求处理
// setTimeout(() => {
// fetchStream.abort();
// }, 500);