Fetch 实现 ChatGPT Stream 流的方式处理网络请求

前言

在传统的客户端与服务端的数据交互,常见的两种交互方式是:

  1. 客户端发起 Ajax 请求后,服务端将数据一次性返回;
  2. 客户端与服务端开启 Socket,建立即时消息通信。

随着 ChatGPT AI 的诞生,一种服务端 response EventStream 响应流数据 Stream 方式被广泛推广。

这种交互方式,会将你想要从网络接受的资源分成一个个小的分块,优势在于:一次获取数据量很大的请求,可以拆分成多个分块依次给客户端去做渲染,避免消耗太长时间处理完整的数据,从而优化数据响应给客户端的速度。

一、熟悉 Stream 的响应和接收

1)服务端响应 Stream

当服务端设置 response headers Content-Type"text/event-stream",表示数据将以「响应流」的方式,通过 HTTP 响应给客户端。

下面是一个 Node 服务端的 response stream 实例,将数据分块为 10 次响应给客户端:

// server.js
const http = require('http');

const server = http.createServer((req, res) => {
  // 设置响应头
  res.writeHead(200, {
    'Content-Type': 'text/event-stream', // 启用 Stream
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    // 跨域处理
    'Access-Control-Allow-Origin': '*', // 允许任何域访问
    'Access-Control-Allow-Headers': '*', // 允许携带任何请求头信息
    'Access-Control-Allow-Methods': 'GET,POST,PUT,DELETE',
    'Access-Control-Max-Age': 86400
  });


  let step = 0;
  // 模拟每秒钟发送一条消息
  let timer = setInterval(() => {
    const data = {
      message: 'Hello, world!',
      timestamp: Date.now(),
      step: ++ step,
    };
    if (step === 10) {
      clearInterval(timer);
      // 最后一项数据,使用 end 来发送
      res.end(`data: ${JSON.stringify(data)}\n\n`);
    } else {
      // 分块推流
      res.write(`data: ${JSON.stringify(data)}\n\n`);
    }

  }, 100);
});

server.listen(5000, () => {
  console.log('Server is running on http://localhost:5000');
});
2)浏览器接收 Stream

在浏览器客户端,Fetch 可以轻松实现「响应流」的数据接收。一个简单的 Fetch 处理 EventStream 的例子如下:

// client.js
const fetchStream = async (url, options) => {
  const response = await fetch(url, options);
  const reader = response.body.getReader();

  while (true) {
    const { value, done } = await reader.read();
    if (done) {
      console.log('done.');
      break; // 读取完毕
    } else {
      console.log('message: ', new TextDecoder().decode(value));
    }
  }
}


fetchStream('http://127.0.0.1:5000/stream', { method: 'get', headers: { 'Content-Type': 'application/json' } });

小结,

  • 1)服务端返回的 Stream,浏览器会识别为 ReadableStream 类型数据,执行 getReader() 方法创建一个读取流队列,可以读取 ReadableStream 上的每一个分块数据;
  • 2)通过循环调用 reader 的 read() 方法来读取每一个分块数据,它会返回一个 Promise 对象,在 Promise 中返回一个包含 value 参数和 done 参数的对象;
  • 3)done 负责表明这个流是否已经读取完毕,若值为 true 时表明流已经关闭,不会再有新的数据,此时 result.value 的值为 undefined
  • 4)value 是一个 Uint8Array 字节类型,可以通过 TextDecoder 转换为文本字符串进行使用。

现在,我们对 Stream 流式响应已经有了初步了解,接下来我们对浏览器 fetch 进行二次封装,以此来满足业务场景使用。

二、封装 Fetch – 处理请求

相比较于 XMLHttpRequest 来说,fetch() 的写法简单又直观,只要在发起请求时将整个配置项传入就可以了,并且基于 Promise 链式调用实现。

一个简单的 fetch 请求示例如下:

const response = await fetch(url, {
  method: 'POST', // *GET, POST, PUT, DELETE, etc.
  headers: {
    'Content-Type': 'application/json'
  },
  body: JSON.stringify(data) // body data type must match "Content-Type" header
});

其中第一个参数 url 是请求地址,第二个参数是一个 RequestInit 初始化配置对象,可以指定 methodheadersbody 等配置。

1)创建一个 fetch 请求

我们采用面向对象方式创建一个构造函数,实例化对象时传入 fetch 请求配置:url、requestInit,此外传入回调函数 onmessageondoneonerror 监听流式数据做逻辑处理。

构造函数的 constructor 实现如下:

interface IFetchStreamOptions {
  url: string;
  requestInit: RequestInit;
  onmessage: (data: string[], index: number) => void;
  ondone?: () => void;
  onerror?: (response: Response) => void;
}

class FetchStream {
  url: string;
  requestInit: RequestInit;
  onmessage: IFetchStreamOptions['onmessage'];
  ondone: IFetchStreamOptions['ondone'];
  onerror: IFetchStreamOptions['onerror'];

  constructor(options: IFetchStreamOptions) {
    this.url = options.url;
    this.requestInit = options.requestInit;
    this.onmessage = options.onmessage;
    this.ondone = options.ondone;
    this.onerror = options.onerror;
    this.createFetchRequest();
  }

  
  ...
}

接着调用 createFetchRequest 创建 fetch 请求,并读取每一块流内容通知给回调 onmessage 函数:

createFetchRequest() {

  fetch(this.url, { 
    method: 'POST', 
    ...this.requestInit 
  }).then(response => {
    if (response.status === 200) {
      return response.body!;
    } else { 
      // fetch() 返回的 Promise 不会被标记为 reject,即使响应的 HTTP 状态码是 404 或 500
      return Promise.reject(response);
    }
  }).then(async (readableStream) => {
    // 1. 创建 reader 读取流队列
    const reader = readableStream.getReader();
    // 2. 记录流队列中分块的索引
    let index: number = 0;
    while (true) {
      // 3. 读取分块数据,返回一个 Promise
      // (如果分块可用,Promise 返回 { value: theChunk, done: false } 形式)
      // (如果流已关闭,Promise 返回 { value: undefined, done: true } 形式)
      const { value, done } = await reader.read();
      if (done) { // 响应流处理完成
        // 5. 流已关闭,执行外部结束逻辑
        this.ondone?.();
        break;
      } else {
        // 4. 将分块数据转换为 string 交给外部处理函数使用
        const dataText = new TextDecoder().decode(value);
        const data = dataText.split('\n\n').filter(Boolean) as string[]; // response 响应的消息可能存在多个,以 \n\n 分割
        this.onmessage(data, index ++);
      }
    }

  }).catch(response => {
    // ... error 处理
    this.onerror?.(response);
  });
}

2)手动中断 fetch 请求

ChatGPT AI 问答中,可以对当前回答进行中断。

我们知道 XMLHttpRequest 对象上有一个 abort() 方法,调用这个方法即可中断一个请求。此外 XHR 还有 onabort 事件,可以监听请求的中断并做出响应。

fetch 自身并没有这些功能,该如何实现中断请求?

可以借助 AbortController API 来终止 Fetch 请求。

  1. 首先是创建一个新的 AbortController 对象实例

它会返回一个 AbortSignal 对象实例,它可以用来 with/abort 一个 Web(网络)请求。

class FetchStream {


  ...


+ controller: AbortController | null = null;


  createFetchRequest() {

    ...
+   this.controller = new AbortController();
  }
}
  1. 接着将 AbortSignal 作为选项传入到 Fetch 请求参数中,将 signal 和 controller 与 fetch 请求相关联
createFetchRequest() {

  this.controller = new AbortController();
  
  fetch(this.url, { 
    method: 'POST', 
+   signal: this.controller.signal, 
    ...this.requestInit 
  })
  ...
}
  1. 最后提供 abort() 方法 调用 AbortController.abort() 可以中止一个尚未完成的 Web(网络)请求。
class FetchStream {


  ...


  abort() {
    if (this.controller) this.controller.abort();
  }
}

3)超时中断 fetch 请求

XMLHttpRequest 对象上有一个 timeout 属性,为其赋值后若在指定时间请求还未完成,请求就会自动中断。此外 XHR 还有 ontimeout 事件,可以监听请求的超时中断并做出响应。

然而,Fetch 并没有像 XHR 具有 timeout 超时事件设定,不过我们可以借助 AbortController + setTimeout() 实现类似效果。

在创建请求时开启一个 setTimeout 计时器,若拿到请求结果清除计时器,否则计时器时间完成后,执行中断 fetch 请求。

class FetchStream {


  ...


+ timer: number = 0;


  createFetchRequest() {

+   this.timeout(); // 开启超时计时器
    ...
    
    fetch(this.url, { 
      method: 'POST', 
      signal: this.controller.signal, 
      ...this.requestInit 
    }).then(response => {
+     clearTimeout(this.timer); // 拿到结果,清除 timeout 计时器
      ...
  }
  
  timeout(time: number = 60000) {
    this.timer = window.setTimeout(() => {
      this.abort();
      this.ontimeout?.(); // 外部若传入了监听超时回调,类似 onmessage
    }, time);
  }

}

4)使用示例

FetchStream 已经封装完成,使用可以参考如下示例:

let content = ''; // 拼接每一个分块数据
const fetchStream = new FetchStream({
  url: 'xxx',
  requestInit: { // fetch 配置项
    body: JSON.stringify(body),
    headers,
  },
  onmessage: (dataList, index) => {
    dataList.forEach(item => {
      const dataStr = item.split("data:")[1];
      const result = JSON.parse(dataStr);
      console.log('--- cyl result: ', result, index, Date.now().toString().slice(9));
      // 将每个分块数据进行拼接
      content += result.data.content;
    });
  },
  ondone: () => {
    console.log('done!', content);
  },
  ontimeout: () => {
    console.log('timeout!');
  }
});

// 中断请求处理
// setTimeout(() => {
//   fetchStream.abort();
// }, 500);

参考资料

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MYeVupyX' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片