內容

在 Dart 中建立 Stream

作者:Lasse Nielsen
2013 年 4 月 (2021 年 5 月更新)

dart:async 函式庫包含兩種對於許多 Dart API 很重要的類型:StreamFuture。Future 代表單次計算的結果,而 Stream 則是結果的序列。您監聽 Stream 以取得結果 (資料和錯誤) 和 Stream 關閉的通知。您也可以在監聽時暫停,或在 Stream 完成之前停止監聽。

但本文並非關於使用 Stream。而是關於建立您自己的 Stream。您可以使用幾種方式建立 Stream:

  • 轉換現有的 Stream。
  • 使用 async* 函式從頭開始建立 Stream。
  • 使用 StreamController 建立 Stream。

本文顯示每種方法的程式碼,並提供提示以協助您正確實作 Stream。

如需有關使用 Stream 的說明,請參閱非同步程式設計:Stream

轉換現有的 Stream

#

建立 Stream 的常見情況是您已經有一個 Stream,並且想要根據原始 Stream 的事件建立新的 Stream。例如,您可能有一個位元組 Stream,而您想要藉由 UTF-8 解碼輸入將其轉換為字串 Stream。最常見的方法是建立一個新的 Stream,該 Stream 等待原始 Stream 上的事件,然後輸出新的事件。範例

dart
/// Splits a stream of consecutive strings into lines.
///
/// The input string is provided in smaller chunks through
/// the `source` stream.
Stream<String> lines(Stream<String> source) async* {
  // Stores any partial line from the previous chunk.
  var partial = '';
  // Wait until a new chunk is available, then process it.
  await for (final chunk in source) {
    var lines = chunk.split('\n');
    lines[0] = partial + lines[0]; // Prepend partial line.
    partial = lines.removeLast(); // Remove new partial line.
    for (final line in lines) {
      yield line; // Add lines to output stream.
    }
  }
  // Add final partial line to output stream, if any.
  if (partial.isNotEmpty) yield partial;
}

對於許多常見的轉換,您可以使用 Stream 提供的轉換方法,例如 map()where()expand()take()

例如,假設您有一個 Stream,counterStream,它每秒發出一個遞增計數器。以下是如何實作它:

dart
var counterStream =
    Stream<int>.periodic(const Duration(seconds: 1), (x) => x).take(15);

若要快速查看事件,您可以使用類似以下的程式碼:

dart
counterStream.forEach(print); // Print an integer every second, 15 times.

若要轉換 Stream 事件,您可以在監聽 Stream 之前,在 Stream 上叫用轉換方法,例如 map()。該方法會傳回新的 Stream。

dart
// Double the integer in each event.
var doubleCounterStream = counterStream.map((int x) => x * 2);
doubleCounterStream.forEach(print);

您可以改為使用其他轉換方法,例如以下方法,而不是使用 map()

dart
.where((int x) => x.isEven) // Retain only even integer events.
.expand((var x) => [x, x]) // Duplicate each event.
.take(5) // Stop after the first five events.

通常,轉換方法已足夠您使用。但是,如果您需要對轉換有更多控制權,您可以使用 Streamtransform() 方法指定 StreamTransformer。平台函式庫為許多常見工作提供 Stream 轉換器。例如,以下程式碼使用 dart:convert 函式庫提供的 utf8.decoderLineSplitter 轉換器。

dart
Stream<List<int>> content = File('someFile.txt').openRead();
List<String> lines = await content
    .transform(utf8.decoder)
    .transform(const LineSplitter())
    .toList();

從頭開始建立 Stream

#

建立新 Stream 的其中一種方法是使用非同步產生器 (async*) 函式。在呼叫函式時會建立 Stream,並且在監聽 Stream 時開始執行函式的主體。當函式傳回時,Stream 會關閉。在函式傳回之前,可以使用 yieldyield* 陳述式在 Stream 上發出事件。

以下是一個簡單的範例,它會定期發出數字:

dart
Stream<int> timedCounter(Duration interval, [int? maxCount]) async* {
  int i = 0;
  while (true) {
    await Future.delayed(interval);
    yield i++;
    if (i == maxCount) break;
  }
}

此函式會傳回 Stream。當監聽該 Stream 時,會開始執行主體。它會重複延遲所要求的間隔,然後產生下一個數字。如果省略 maxCount 參數,則迴圈沒有停止條件,因此 Stream 會永遠輸出越來越大的數字,或直到監聽器取消其訂閱為止。

當監聽器取消時 (藉由在 listen() 方法傳回的 StreamSubscription 物件上叫用 cancel()),則下次主體到達 yield 陳述式時,yield 會改為作為 return 陳述式。任何包含的 finally 區塊都會執行,且函式會結束。如果函式在結束之前嘗試產生值,則會失敗並作為傳回。

當函式最終結束時,cancel() 方法傳回的 Future 會完成。如果函式以錯誤結束,則 Future 會以該錯誤完成;否則,它會以 null 完成。

另一個更有用的範例是將 Future 序列轉換為 Stream 的函式:

dart
Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) async* {
  for (final future in futures) {
    var result = await future;
    yield result;
  }
}

此函式會向 futures 可迭代物件要求新的 Future、等待該 Future、發出產生的值,然後進行迴圈。如果 Future 以錯誤完成,則 Stream 會以該錯誤完成。

使用 async* 函式從頭開始建置 Stream 的情況很少見。它需要從某處取得資料,而最常出現的情況是從另一個 Stream 取得。在某些情況下,例如上述的 Future 序列,資料來自其他非同步事件來源。但是,在許多情況下,async* 函式太過簡單,無法輕鬆處理多個資料來源。這就是 StreamController 類別的用途。

使用 StreamController

#

如果您的 Stream 事件來自程式的不同部分,而不只是來自可由 async 函式遍歷的 Stream 或 Future,則請使用 StreamController 來建立和填入 Stream。

StreamController 為您提供新的 Stream,以及隨時隨地將事件新增至 Stream 的方法。Stream 具有處理監聽器和暫停所需的所有邏輯。您會傳回 Stream 並保留控制器給自己。

以下範例 (來自 stream_controller_bad.dart) 顯示 StreamController 的基本但有缺陷的用法,以實作先前範例中的 timedCounter() 函式。此程式碼會建立要傳回的 Stream,然後根據計時器事件將資料饋入其中,而計時器事件既不是 Future 也不是 Stream 事件。

baddart
// NOTE: This implementation is FLAWED!
// It starts before it has subscribers, and it doesn't implement pause.
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
  var controller = StreamController<int>();
  int counter = 0;
  void tick(Timer timer) {
    counter++;
    controller.add(counter); // Ask stream to send counter values as event.
    if (maxCount != null && counter >= maxCount) {
      timer.cancel();
      controller.close(); // Ask stream to shut down and tell listeners.
    }
  }

  Timer.periodic(interval, tick); // BAD: Starts before it has subscribers.
  return controller.stream;
}

如同之前,你可以像這樣使用 timedCounter() 回傳的串流

dart
var counterStream = timedCounter(const Duration(seconds: 1), 15);
counterStream.listen(print); // Print an integer every second, 15 times.

這個 timedCounter() 的實作有一些問題

  • 它在有訂閱者之前就開始產生事件。
  • 即使訂閱者請求暫停,它仍會持續產生事件。

如下幾個章節所示,你可以透過在建立 StreamController 時指定回呼函式,例如 onListenonPause,來修正這兩個問題。

等待訂閱

#

一般來說,串流應該在有訂閱者之後才開始工作。async* 函式會自動執行此操作,但當你使用 StreamController 時,你可以完全掌控,甚至可以在不應該的時候新增事件。當串流沒有訂閱者時,其 StreamController 會緩衝事件,如果串流永遠沒有訂閱者,可能會導致記憶體洩漏。

嘗試將使用串流的程式碼變更為以下內容

dart
void listenAfterDelay() async {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  await Future.delayed(const Duration(seconds: 5));

  // After 5 seconds, add a listener.
  await for (final n in counterStream) {
    print(n); // Print an integer every second, 15 times.
  }
}

當此程式碼執行時,前 5 秒不會印出任何內容,儘管串流正在執行工作。然後,會加入監聽器,並且會一次印出前 5 個左右的事件,因為它們已被 StreamController 緩衝。

若要接收訂閱通知,請在建立 StreamController 時指定 onListen 引數。當串流獲得第一個訂閱者時,會呼叫 onListen 回呼。如果你指定 onCancel 回呼,則會在控制器失去最後一個訂閱者時呼叫它。在前面的範例中,Timer.periodic() 應該移至 onListen 處理常式,如下一節所示。

遵守暫停狀態

#

避免在監聽器請求暫停時產生事件。當串流訂閱暫停時,async* 函式會在 yield 陳述式自動暫停。另一方面,StreamController 會在暫停期間緩衝事件。如果提供事件的程式碼不遵守暫停,緩衝區的大小可能會無限增長。此外,如果監聽器在暫停後不久停止監聽,則用於建立緩衝區的工作將會浪費。

若要查看沒有暫停支援時會發生什麼情況,請嘗試將使用串流的程式碼變更為以下內容

dart
void listenWithPause() {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  late StreamSubscription<int> subscription;

  subscription = counterStream.listen((int counter) {
    print(counter); // Print an integer every second.
    if (counter == 5) {
      // After 5 ticks, pause for five seconds, then resume.
      subscription.pause(Future.delayed(const Duration(seconds: 5)));
    }
  });
}

當五秒暫停時間結束時,該期間觸發的事件會一次全部接收。發生這種情況的原因是串流的來源不遵守暫停,並持續向串流新增事件。因此,串流會緩衝事件,然後在串流解除暫停時清空其緩衝區。

以下版本的 timedCounter()(來自 stream_controller.dart)透過使用 StreamController 上的 onListenonPauseonResumeonCancel 回呼來實作暫停。

dart
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
  late StreamController<int> controller;
  Timer? timer;
  int counter = 0;

  void tick(_) {
    counter++;
    controller.add(counter); // Ask stream to send counter values as event.
    if (counter == maxCount) {
      timer?.cancel();
      controller.close(); // Ask stream to shut down and tell listeners.
    }
  }

  void startTimer() {
    timer = Timer.periodic(interval, tick);
  }

  void stopTimer() {
    timer?.cancel();
    timer = null;
  }

  controller = StreamController<int>(
      onListen: startTimer,
      onPause: stopTimer,
      onResume: startTimer,
      onCancel: stopTimer);

  return controller.stream;
}

使用上述的 listenWithPause() 函式執行此程式碼。你會看到它在暫停時停止計數,並且之後會順利恢復。

你必須使用所有監聽器—onListenonCancelonPauseonResume—才能收到暫停狀態變更的通知。原因是如果訂閱和暫停狀態同時變更,則只會呼叫 onListenonCancel 回呼。

最後提示

#

在不使用 async* 函式建立串流時,請記住以下訣竅

  • 使用同步控制器時請小心—例如,使用 StreamController(sync: true) 建立的控制器。當你在未暫停的同步控制器上傳送事件時(例如,使用 EventSink 定義的 add()addError()close() 方法),事件會立即傳送到串流上的所有監聽器。在加入監聽器的程式碼完全返回之前,絕對不能呼叫 Stream 監聽器,並且在錯誤的時間使用同步控制器可能會違反此承諾並導致良好的程式碼失敗。避免使用同步控制器。

  • 如果你使用 StreamController,則會在 listen 呼叫傳回 StreamSubscription 之前呼叫 onListen 回呼。不要讓 onListen 回呼依賴於已存在的訂閱。例如,在下列程式碼中,onListen 事件會在 subscription 變數具有有效值之前觸發(並呼叫 handler)。

    dart
    subscription = stream.listen(handler);
  • StreamController 定義的 onListenonPauseonResumeonCancel 回呼會在串流的監聽器狀態變更時由串流呼叫,但絕不會在觸發事件期間或在呼叫另一個狀態變更處理常式期間呼叫。在這些情況下,狀態變更回呼會延遲到先前的回呼完成為止。

  • 不要嘗試自己實作 Stream 介面。很容易在事件、回呼以及新增和移除監聽器之間的互動上產生細微的錯誤。始終使用現有的串流(可能來自 StreamController)來實作新串流的 listen 呼叫。

  • 儘管可以透過擴充 Stream 類別並在頂端實作 listen 方法和額外功能來建立擴充 Stream 並具有更多功能的類別,但通常不建議這樣做,因為這會引入使用者必須考慮的新類型。你可以建立一個「具有」 Stream(以及更多)的類別,而不是一個「是」 Stream(以及更多)的類別。