~~~ L O A D I N G ~~~~~ L O A D I N G ~~~~~ L O A D I N G ~~~

Fetch & Streams API

Mar 20, 2024 Max Lee

本篇將嘗試使用 Fetch Stream 實踐 XMLHttpRequest 的 aborttimeout,以及將大型資料以流(Stream)的方式獲取。


XMLHttpRequest

XMLHttpRequest 是在 AJAX(Asynchronous JavaScript And XML)開始盛行的當時,網頁工程師最主要用來獲取伺服器資料最主要的手段,直到 Fetch 釋出之後(2015 瀏覽器開始實裝),才開始被大家拋棄並遺忘。

其實在 Fetch 出現之前,可能也沒什麼人在裸寫 XMLHttpRequest,因為它的寫法稍嫌繁瑣、分散,不易管理,所以大部份人可能會使用 jQuery 的 $.ajax 或 Axios 這類將 XMLHttpRequest 封裝的比較方便的工具。

雖然話是這麼說,其實 XMLHttpRequest 也是有比 Fetch 方便的地方,例如 XMLHttpRequest 對於「取消請求」或「請求逾時」都有對應的 API 可以使用。


取消請求
const xhr = new XMLHttpRequest(),
  method = "GET",
  url = "https://example.come.tw/";

xhr.open(method, url);
xhr.onabort = () => {
  console.log("Request aborted");
};

xhr.send();
xhr.abort();
請求逾時
const xhr = new XMLHttpRequest(),
  method = "GET",
  url = "https://example.come.tw/";

xhr.timeout = 2000; // ms
xhr.open(method, url);
xhr.ontimeout = () => {
  console.log("Request timeout");
};

xhr.send();

甚至是「請求的傳輸進度」這種功能,XMLHttpRequest 也有對應的事件可以使用。

請求逾時
const xhr = new XMLHttpRequest(),
  method = "GET",
  url = "https://example.come.tw/";

xhr.open(method, url);
xhr.onprogress = (event) => {
  const { lengthComputable, loaded, total } = event;
  if (lengthComputable) {
    console.log(`Downloaded ${loaded} of ${total}`)
  } else {
    console.log(`Downloaded ${loaded}`);
  }
};

xhr.send();

我先前有寫過另一篇文章來說明 Axios 是如何封裝 XMLHttpRequest 的 abort,有興趣的朋友可以看看 這裡


AbortController

雖然反觀 XMLHttpRequest,Fetch 並沒有 aborttimeout 相關的 API,不過好在 Fetch 推出三年後也釋出了 signal 這個設定,讓我們可以使用 AbortController 來取消請求。若要實現請求逾時的話,也只要搭配 setTimeout 來使用即可。


取消請求
const controller = new AbortController();
const { signal } = controller;

fetch("https://example.come.tw/", { signal });

controller.abort();
請求逾時
const controller = new AbortController();
const { signal } = controller;

fetch("https://example.come.tw/", { signal });

setTimeout(() => controller.abort(), 2000);

但是 Fetch API 並沒有提供類似 onprogress 這樣的事件,所以我們必須要找其他方法,而這個方法也就是本次要介紹的重點 - Fetch Stream。


Streams API

Streams API 可以讓我們以流(Stream)的方式獲取資料,若資料足夠龐大便會以數個片段的形式獲取資料,這樣的好處是可以節省記憶體的使用,並且能在資料還沒完全下載完之前就開始處理資料。而 Fetch API 中也是在 Response 中實裝了 Streams API。

除此之外,以流(Stream)形式獲取資料也可以透過管道(Pipe)的概念來處理資料,這樣的好處是可以將資料的處理分成多個步驟,讓程式碼更加模組化。


# ReadableStream.getReader

呼叫 fetch 時所回傳的 Promise,會 resolve 出一個 Response 實體,也就是平時常在 then 裡面簡稱的 res。一般常用的 res.json()res.text() 其實都是來自於 Response,而我們要用的 Fetch Stream 也藏在其中。

Response 中還有一個 body 屬性,是一個 ReadableStream 的實體,而其中的 getReader 方法可以讓我們取得一個 ReadableStreamDefaultReader,只要呼叫它的 read 方法,就可以取得「流」的資料。

fetch("http://localhost:3000/")
  .then((res) => {
    /** @type { ReadableStreamDefaultReader } */
    const reader = res.body.getReader(); 

    let result = "";
    return new Promise((resolve) => {
      const pull = () => {
        reader.read().then(({ done, value }) => {
          if (done) return resolve(JSON.parse(result));
          result += new TextDecoder("utf-8").decode(value);
          pull();
        });
      };
      pull();
    });
  })
  .then((data) => {
    console.log(data);
  });

reader 類似於一個迭代器,每次使用 read() 都會回傳一個 Promise,你可以從中取得包含 donevalue 的物件:

  • done: 一個布林值,代表所有資料是否已經取得完畢。
  • value: 一個 Uint8Array,代表這次取得的資料。

這邊我們用了一個遞迴的方式來呼叫迭代器,並且由於從 read 所得到的資料是 Uint8Array 的形式,所以我們需要用 TextDecoder 來將資料轉成字串。


請求的傳輸進度

像這樣一段一段的取得資料,可以說是完美的符合了「請求的傳輸進度」的需求,我們可以在每次取得資料的時候,去計算已經取得的資料量,並且印出來。

fetch("http://localhost:3000/")
  .then((res) => {
    const reader = res.body.getReader();
    const contentLength = res.headers.get("Content-Length");
    let loaded = 0;

    const pull = () => {
      reader.read().then(({ done, value }) => {
        if (done) return console.log("Download complete");;
        loaded += value.byteLength;
        console.log(`Downloaded ${loaded} of ${contentLength}`);
        pull();
      });
    };
    pull();
  });

這次我們從 Response Header 中拿到 Content-Length,並透過每次在 reader 中取得的資料來計算已經下載的資料量,這樣就可以實現「請求的傳輸進度」了。


# Response.clone

不過上面的範例中,只顧著把進度印出來卻沒有把資料存取下來,所以我們還得結合前兩段程式碼才可以,但其實也有一個更方便的辦法,那就是使用 Response.clone,這個方法可以複製一個 Response 物件,這樣就可以直接把資料回傳出去了。

fetch("http://localhost:3000/")
  .then((res) => {
    const returnRes = res.clone();
    const reader = res.body.getReader();
    const contentLength = +res.headers.get("Content-Length");
    let loaded = 0;

    const pull = () => {
      reader.read().then(({ done, value }) => {
        if (done) return console.log("Download complete");
        loaded += value.byteLength;
        console.log(`Downloaded ${loaded} of ${contentLength}`);
        pull();
      });
    };
    pull();

    return returnRes.json();
  })
  .then((data) => {
    console.log(data);
  });

# ReadableStream.tee

除了 Response.clone 之外 Response.body 還有 tee() 這個方法,可以將 ReadableStream 複製成兩個,你可以想像它是把一個資料流分流成兩條了,而這兩條分流拿到的資料會是一樣的。

不過因為複製的是 ReadableStream,所以我們還是需要透過 Response 的建構子來建構一個新的 Response 實體,這樣才能夠使用 jsontext 這類的方法。

fetch("http://localhost:3000/")
  .then((res) => {
    const [progressStream, returnStream] = res.body.tee();
    const reader = progressStream.getReader();
    const contentLength = +res.headers.get("Content-Length");
    let loaded = 0;

    const pull = () => {
      reader.read().then(({ done, value }) => {
        if (done) return console.log("Download complete");
        loaded += value.byteLength;
        console.log(`Downloaded ${loaded} of ${contentLength}`);
        pull();
      });
    };
    pull();

    return new Response(returnStream, { header: res.headers }).json();
  })
  .then((data) => {
    console.log(data);
  });

到此為止,我們已經成功解決請求進度以及獲取資料的問題了,不過前面也有提到 AbortController 出現的時間比較晚,那在此之前 Fetch API 又是如何實現取消請求的呢?其實也是可以使用 Fetch Stream 來完成的。


取消請求

# ReadableStream.cancel / ReadableStreamDefaultReader.cancel

Response.body 中還有一個 cancel 方法,它可以用來關閉一個流,而當一個流被關閉了,那對應的 Response 及其請求也就會被取消。所以我們嘗試這樣處理:

let aborter = null;

fetch("http://localhost:3000/")
  .then((res) => {
    aborter = () => {
      res.body.cancel();
    };
    return res;
  })
  .then(res => res.json())
  .then((data) => { 
    console.log(data);
  });

aborter();

但卻沒有得到我們期望的結果,反而出現了以下錯誤:

TypeError: Failed to execute 'cancel' on 'ReadableStream': Cannot cancel a locked stream

不過沒關係,因為 ReadableStreamDefaultReader 也有 cancel 方法,它可以用來取消與此 reader 相關的流。所以我們嘗試這樣處理:

let aborter = null;

fetch("http://localhost:3000/")
  .then((res) => {
    aborter = () => {
      const reader = res.body.getReader();
      reader.cancel();
    };
    return res;
  })
  .then(res => res.json())
  .then((data) => { 
    console.log(data);
  });

aborter();

結果我們再次得到了錯誤,依然是告訴我們不能對一個已經被鎖定的流進行操作。

TypeError: Failed to execute 'getReader' on 'ReadableStream': ReadableStreamDefaultReader constructor can only accept readable streams that are not yet locked to a reader

不過我們依然不死心,既然 tee() 可以複製一個流,那我們就試試看能不能複製一個流,然後對這個複製出來的流進行取消。

let aborter = null;

fetch("http://localhost:3000/")
  .then((res) => {
    const [progressStream, returnStream] = res.body.tee();
    aborter = () => {
      const reader = progressStream.getReader();
      reader.cancel();
    };
    return new Response(returnStream, { header: res.headers }).json();
  })
  .then((data) => { 
    console.log(data);
   });

aborter();

這次終於沒有收到任何錯誤了,不過會發現我們的請求並沒有被取消, 因為我們取消的是 progressStream,而 returnStream 並沒有被取消,這個流依然在接收資料。

以上這些嘗試關閉流的方式都失敗了,原因在於一個流只能同時有一個處於活動狀態的 reader,當一個流被一個 reader 使用時,這個流就會被鎖定,當我們試圖對這個流進行操作時,就會出現錯誤。

但在第一個使用 Response.body.cancel 的範例中我們明明沒有使用任何 reader,怎麼流還是被鎖定了呢? 這是因為當我們使用 Response.jsonResponse.text 這類的方法時,這個方法會自動幫我們建立一個 reader 並且讀取資料,所以這個流就被鎖定了。


# new ReadableStream

所以我們無法使用 Fetch Stream 來實現取消請求嗎?其實不然,因為我們還有一個方法可以使用,那就是使用 ReadableStream 的建構子來建立一個新的流,用來代替原本的流「被鎖定」,並且還可以對這個新的流有額外的控制。

當我們要創建一個新的 ReadableStream 實體時,需要傳入一個設定對應生命週期 Callback 的物件,這個物件中有一個 start 方法,這個方法會在流開始時被呼叫,並且在被呼叫時會被傳入 controller 參數,這個參數是一個 ReadableStreamDefaultController 的實體,我們可以透過它來對流做一些內部操作。

const stream = new ReadableStream({
  // 在實例建立時呼叫,並傳入一個流的內部控制器
  start(controller) {
    controller.close(); // 關閉流
    controller.enqueue(chunk); // 將資料片段傳入流的佇列
    controller.error(reason); // 從流的內部發出錯誤
  },
  // 在流的內部佇列未滿時,會重複呼叫這個方法
  pull(controller) { ... },
  // 在流將被取消時呼叫
  cancel(reason) { ... }
});

下面我們就用這個方式創建一個「自訂流」,並且從 res.body 這個「原始流」拿到 reader,此時「原始流」會被這個 reader 鎖定,但這次 reader 是由我們全權掌控,從中取得的資料也會被做為「自訂流」的資料源片段。

接著暴露出一個 aborter 函式,當中透過 reader.cancel() 來釋放「原始流」的鎖定,再者是利用 controller.error 在「自訂流」的內部發出錯誤,這樣我們就成功取消了這次的請求。

最後再用新的 Response 包裹這個「自訂流」,這樣後續的 res.json() 鎖定的流就會是這個「自訂流」,而不是原本的「原始流」,如此就沒有鎖定衝突的問題了。

let aborter = null;

fetch("http://localhost:3000/")
  .then((res) => {
    const stream = new ReadableStream({ // 1
      start(controller) {
        let aborted = false;

        const reader = res.body.getReader(); // 2

        aborter = () => { // 3
          reader.cancel();
          controller.error(new Error("Fetch aborted"));
          aborted = true;
        };

        const push = () => {
          reader.read().then(({ value, done }) => {
            if (done) {
              if (!aborted) controller.close();
              return;
            }
            controller.enqueue(value); 
            push();
          });
        };
        push();
      }
    });
    return new Response(stream, { headers: res.headers }); // 4
  })
  .then(res => res.json())
  .then((data) => {
    console.log(data);
  })
  .catch((err) => {
    console.log(err);
  });

aborter();

這邊我們已經成功實作出「取消請求」的功能,若要實作「請求逾時」的功能,只要搭配 setTimeout 來使用即可。

另外為了更好理解有沒有使用 new ReadableStream 之間的差別,我也額外用圖像來表示一下:


管道串聯

最後要介紹的是前面有提到的,流(Stream)可以透過管道(Pipe)相互串接,讓資料處理可以步驟化,同時也不需要將所有資料都讀取完才能開始處理。

那首先要知道的是,每個管道鏈(Pipe Chain)的起點一定是一個 ReadableStream,而終點一定是一個 WritableStream,而中間的部分可以是 TransformStream。沒錯,除了 ReadableStream 之外,還有 TransformStreamWritableStream 這兩種流的類別。



# new WritableStream

可寫入的流 WritableStream,這個流的建構子也是一樣,需要傳入一個設定對應生命週期 Callback 的物件,其中比較重要的是 write 屬性,這個屬性會在新的資料片段準備處理時呼叫,所以最主要的資料處理邏輯會寫在這裡。

const stream = new WritableStream({
  // 在實例建立時呼叫,並傳入一個流的內部控制器
  start(controller) {
    controller.error(reason); // 從流的內部發出錯誤
  },
  // 在新的資料片段準備處理時呼叫
  write(chunk, controller) { ... }, 
  // 在所有資料處理完畢,流即將關閉時呼叫
  close(controller) { ... },
  // 在流被強制關閉時呼叫
  abort(controller) { ... } 
});

# WritableStream.getWriter / WritableStreamDefaultWriter.write

ReadableStream 一樣,WritableStream 也有一個對應的 getWriter 方法,可以取得一個 WritableStreamDefaultWriter 實體,其中的 write 方法可以用來將資料傳進流中開始進行處理。

const data = [0, 1, 0, 1, 0, 1];

let result = ""
const stream = new WritableStream({
  write(chunk) {
    const reverse = chunk === 0 ? 1 : 0;
    result += reverse;
  }
});

const writer = stream.getWriter();
data.forEach((value) => {
  writer.ready.then(() => {
    writer.write(value);
  });
});

console.log(result); // 101010

這邊有一個非常簡單的範例,透過迴圈的方式將陣列中的數字用 writer.write 逐一傳入流中,並且在流的 write 方法中將數字進行處理。

不過需要注意的是,我們在呼叫 writer.write 時,可能會因為流的狀態而無法立即寫入,所以我們需要透過 writer.ready 來確保流已經準備好可以寫入。


# new TransformStream

作為連接起點的 ReadableStream 和終點的 WritableStream 之間的 TransformStream,它是一個既可以讀又可以寫的流,這個流的主要功能是將讀取的資料進行轉換,然後再寫入到流中。

一樣在建立實體時需要傳入一個設定對應生命週期 Callback 的物件,其中比較重要的是 transform 屬性,這個屬性會在新的資料片段可以進行從可寫入端到可讀取端的轉換時呼叫,所以最主要的資料處理邏輯會寫在這裡。

const stream = new TransformStream({
  // 在實例建立時呼叫,並傳入一個流的內部控制器
  start(controller) {
    controller.enqueue(chunk); // 將資料片段傳入可讀取端的佇列
    controller.error(reason); // 從流的內部發出錯誤
    controller.terminate(); // 終止流
  },
  // 在新的資料片段可以進行從可寫端到可讀取端的轉換時呼叫
  transform(chunk, controller) { ... }, 
  // 在所有資料處理完畢,可寫入端即將關閉時呼叫
  flush(controller) { ... } 
});

注意!TransformStream 是由可寫入端與 ReadableStream 進行介接,由可讀取端與 WritableStream 進行介接,所以在其內部,資料是由可寫入端流進來,經過 transform 處理後再由可讀取端流出去。


# TransformStream.readable / TransformStream.writable

TransformStream 沒有任何方法,它暴露出來的只有他的可讀取端和可寫入端,這兩個端點可以讓我們取得一個 ReadableStreamWritableStream 實體,這樣我們就可以將 TransformStream 串接到其他流上。

const string = "Hello World";
const stream = new TransformStream({
  transform(chunk, controller) {
    console.log("Get Chunk:", chunk);
    controller.enqueue(chunk); // 將資料片段傳入可讀取端的佇列
  }
});

const transformWriter = stream.writable.getWriter();
for (const char of string) {
  transformWriter.ready.then(() => {
    transformWriter.write(char);
  });
}

const transformReader = stream.readable.getReader();
transformReader.read().then(function dequeue ({ done, value }) {
  if (done) return;
  return transformReader.read().then(dequeue);
});

// Get Chunk: H
// Get Chunk: e
// ...
// Get Chunk: d

在上面這個範例中,我用利用 TransformStream.writable 拿取寫入端的 writer 將字串中的每個字元逐一寫入,然後在 transform 方法中將這些字元逐一傳入可讀取端的佇列,並且透過 TransformStream.readable 來拿取可讀取端的 reader 來讀取資料。這樣在 TransformStream 內部就會呼叫 transform 方法來處理資料。


# ReadableStream.pipeThrough / ReadableStreamDefaultReader.pipeTo

認識完所有的流之後,我們就可以開始串接流了,pipeThroughpipeTo 就是用來將一個流接到另一個流的方法,其中 pipeThrough 只能接受 TransformStream 作為參數,pipeTo 則只能接受 WritableStream 作為參數。

const readableStream = new ReadableStream();
const transformStream1 = new TransformStream()
const transformStream2 = new TransformStream()
const writableStream = new WritableStream()

readableStream
  .pipeThrough(transformStream1())
  .pipeThrough(transformStream2())
  .pipeTo(writableStream())

而可以用鏈式寫法的原因是因為 pipeThrough 會將它拿到的 TransformStream 的可讀取端回傳出來,讓下一個 TransformStreamWritableStream 可以繼續接上,而只要透過 Pipe 接上的流便會被鎖定住,至於為何會鎖住,我們可以自己嘗試實作 pipeThroughpipeTo 就可以知道了。

function myPipeThrough(readableStream, transformStream) {
  const reader = readableStream.getReader();
  const writer = transformStream.writable.getWriter();

  reader.read().then(function through({ done, value }) {
    writer.ready.then(() => {
      if (done) {
        writer.close();
        return;
      }
      writer.write(value);
    })
    reader.read().then(through);
  });

  return transformStream.readable;
};
function myPipeTo(readableStream, writableStream) {
  const reader = readableStream.getReader();
  const writer = writableStream.getWriter();

  reader.read().then(function to({ done, value }) {
    writer.ready.then(() => {
      if (done) {
        writer.close();
        return;
      }
      writer.write(value);
    })
    reader.read().then(to);
  });
};

可以看到我們在串接兩個流的時候,是個別取得對應的 readerwriter,然後透過 reader.readwriter.write 來進行資料的傳遞,這樣就可以了解為何串接流會被鎖定住了。這邊一樣放一張我畫的圖來讓大家更好理解:

這時候回想前面要實現取消請求的時候,為何 Response.body 會被 json() 鎖定住了,我們可以合理懷疑這個方法內部應該是串接了一個 TransformStreamWritableStream,所以我們無法對 Response.body 進行操作。


結尾

最後我們用前面所學,寫一個終極範例作為文章的結尾,希望今天的分享對大家有幫助,並且對 Fetch Stream 有更深一層的了解。

// 印出訊息用的 DOM 和函式
const logs = document.querySelector("pre");
const writeMsg = (msg) => {
  logs.textContent += `${msg}\n`;
};

// 取消請求用的 flag 和函式
let aborted = false;
let streamAbort = () => {
  console.log("not yet");
};

// 創建印出請求進度的流的工廠函式
const progressStreamFactory = (progressResponse) => {
  let loaded = 0;
  let cancel;
  const total = progressResponse.headers.get("content-length");
  const progressReader = progressResponse.body.getReader();
  const stream = new ReadableStream({
    start(controller) {
      cancel = (msg) => {
        progressReader.cancel(msg);
        controller.error(msg);
      };

      progressReader.read().then(function log({ value, done }) {
        if (done) {
          if (!aborted) {
            controller.close(value);
            writeMsg("Download completed");
          }
          return;
        }

        loaded += value.length;
        if (total === null) {
          writeMsg(`Downloaded ${loaded}`);
        } else {
          const percentage = ((loaded / total) * 100).toFixed(2);
          writeMsg(`Downloaded ${loaded} of ${total} (${percentage}%)`);
        }
        controller.enqueue(value);
        return progressReader.read().then(log);
      });
    },
  });

  return [stream, cancel];
};

// 創建寫入流的工廠函式,他會將資料中的 "John" 文字替換成 "Mary",並在流關閉時印出 JSON
const writableStreamFactory = () => {
  let result = "";
  return new WritableStream({
    write(chunk) {
      const decoded = chunk.replaceAll("John", "Mary");
      result += decoded;
    },
    close() {
      console.log(JSON.parse(result));
    },
    abort(reason) {
      writeMsg("Writer: " + reason);
    },
  });
};

// 創建用來返回資料的流的工廠函式
const returnStreamFactory = (returnResponse) => {
  const returnReader = returnResponse.body.getReader();
  let cancel;
  const stream = new ReadableStream({
    start(controller) {
      cancel = (msg) => {
        returnReader.cancel(msg);
        controller.error(msg);
      };

      returnReader.read().then(function push({ value, done }) {
        if (done) {
          if (!aborted) controller.close(value);
          return;
        }
        controller.enqueue(value);
        return returnReader.read().then(push);
      });
    },
  });

  return [stream, cancel];
};

// 取得資料的函式
const getData = () => {
  // reset
  logs.textContent = "";
  aborted = false;
  streamAbort = () => {
    console.log("not yet");
  };

  fetch("http://localhost:3000/")
    .then((res) => {
      return [res, res.clone()];
    })
    .then(([progressResponse, returnResponse]) => {
      const [progressStream, progressStreamCancel] = progressStreamFactory(progressResponse);
      const [returnStream, returnStreamCancel] = returnStreamFactory(returnResponse);
      
      // 返回資料的流接了 TextDecoderStream 這個原生的 TransformStream,用來將 Uint8Array 轉成字串
      // 然後再接上寫入流,將資料中的 "John" 文字替換成 "Mary",並在流關閉時印出 JSON
      returnStream.pipeThrough(new TextDecoderStream()).pipeTo(writableStreamFactory());

      streamAbort = () => {
        if (aborted) return;
        aborted = true;
        const msg = "Download aborted";

        progressStreamCancel(msg);
        returnStreamCancel(msg);
        writeMsg(msg);
      };
    });
};

參考資料
Prev
Next