跳到主要內容

在 Dart 中建立 Stream

作者:Lasse Nielsen
2013 年 4 月 (2021 年 5 月更新)

dart:async 程式庫包含兩種對於許多 Dart API 很重要的類型:StreamFuture。Future 代表單一運算的結果,而 stream 則是一連串的結果。您監聽 stream 以接收結果(資料和錯誤)以及 stream 關閉的通知。您也可以在監聽時暫停,或在 stream 完成之前停止監聽。

但本文不是關於「使用」 stream。而是關於建立您自己的 stream。您可以使用幾種方式建立 stream:

  • 轉換現有的 stream。
  • 使用 async* 函式從頭建立 stream。
  • 使用 StreamController 建立 stream。

本文展示了每種方法的程式碼,並提供提示以協助您正確實作 stream。

如需使用 stream 的協助,請參閱非同步程式設計:Stream

轉換現有的 Stream

#

建立 stream 的常見情況是您已經有一個 stream,並且想要根據原始 stream 的事件建立一個新的 stream。例如,您可能有一個位元組 stream,想要透過 UTF-8 解碼輸入將其轉換為字串 stream。最通用的方法是建立一個新的 stream,等待原始 stream 上的事件,然後輸出新的事件。範例:

dart
/// Splits a stream of consecutive strings into lines.
///
/// The input string is provided in smaller chunks through
/// the `source` stream.
Stream<String> lines(Stream<String> source) async* {
  // Stores any partial line from the previous chunk.
  var partial = '';
  // Wait until a new chunk is available, then process it.
  await for (final chunk in source) {
    var lines = chunk.split('\n');
    lines[0] = partial + lines[0]; // Prepend partial line.
    partial = lines.removeLast(); // Remove new partial line.
    for (final line in lines) {
      yield line; // Add lines to output stream.
    }
  }
  // Add final partial line to output stream, if any.
  if (partial.isNotEmpty) yield partial;
}

對於許多常見的轉換,您可以使用 Stream 提供的轉換方法,例如 map()where()expand()take()

例如,假設您有一個 stream,counterStream,它每秒發出一個遞增的計數器。以下是如何實作它的方法:

dart
var counterStream = Stream<int>.periodic(
  const Duration(seconds: 1),
  (x) => x,
).take(15);

若要快速查看事件,您可以使用類似以下的程式碼:

dart
counterStream.forEach(print); // Print an integer every second, 15 times.

若要轉換 stream 事件,您可以在監聽 stream 之前,在 stream 上調用轉換方法,例如 map()。該方法會傳回一個新的 stream。

dart
// Double the integer in each event.
var doubleCounterStream = counterStream.map((int x) => x * 2);
doubleCounterStream.forEach(print);

除了 map() 之外,您還可以使用任何其他轉換方法,例如以下方法:

dart
.where((int x) => x.isEven) // Retain only even integer events.
.expand((var x) => [x, x]) // Duplicate each event.
.take(5) // Stop after the first five events.

通常,轉換方法是您所需要的全部。但是,如果您需要對轉換進行更多控制,則可以使用 StreamTransformerStreamtransform() 方法。平台程式庫為許多常見任務提供了 stream 轉換器。例如,下列程式碼使用了 dart:convert 程式庫提供的 utf8.decoderLineSplitter 轉換器。

dart
Stream<List<int>> content = File('someFile.txt').openRead();
List<String> lines =
    await content
        .transform(utf8.decoder)
        .transform(const LineSplitter())
        .toList();

從頭建立 Stream

#

建立新 stream 的一種方法是使用非同步產生器 (async*) 函式。stream 在函式被調用時建立,而函式的主體在 stream 被監聽時開始執行。當函式傳回時,stream 會關閉。在函式傳回之前,它可以使用 yieldyield* 陳述式在 stream 上發出事件。

以下是一個原始範例,它以固定的間隔發出數字:

dart
Stream<int> timedCounter(Duration interval, [int? maxCount]) async* {
  int i = 0;
  while (true) {
    await Future.delayed(interval);
    yield i++;
    if (i == maxCount) break;
  }
}

此函式會傳回 Stream。當監聽該 stream 時,主體開始執行。它會重複延遲請求的間隔,然後產生下一個數字。如果省略 maxCount 參數,則迴圈沒有停止條件,因此 stream 會永遠輸出越來越大的數字 - 或者直到監聽器取消其訂閱。

當監聽器取消時(透過在 listen() 方法傳回的 StreamSubscription 物件上調用 cancel()),則下次主體到達 yield 陳述式時,yield 會改為充當 return 陳述式。任何封閉的 finally 區塊都會執行,並且函式會結束。如果函式嘗試在結束之前產生值,則會失敗並充當傳回。

當函式最終結束時,cancel() 方法傳回的 future 會完成。如果函式以錯誤結束,則 future 會以該錯誤完成;否則,它會以 null 完成。

另一個更有用的範例是將 future 序列轉換為 stream 的函式:

dart
Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) async* {
  for (final future in futures) {
    var result = await future;
    yield result;
  }
}

此函式會向 futures 可迭代物件請求新的 future,等待該 future,發出結果值,然後迴圈。如果 future 以錯誤完成,則 stream 會以該錯誤完成。

很少有 async* 函式從無到有建立 stream。它需要從某個地方取得資料,而最常見的地方是另一個 stream。在某些情況下,例如上面的 future 序列,資料來自其他非同步事件來源。然而,在許多情況下,async* 函式太過於簡單,無法輕鬆處理多個資料來源。這就是 StreamController 類別的用武之地。

使用 StreamController

#

如果您的 stream 事件來自程式的不同部分,而不僅僅是來自可以由 async 函式遍歷的 stream 或 future,則請使用 StreamController 來建立和填充 stream。

StreamController 為您提供了一個新的 stream,以及一種在任何時間點和從任何地方將事件新增到 stream 的方法。stream 具有處理監聽器和暫停所需的所有邏輯。您傳回 stream 並將控制器保留給自己。

以下範例(來自 stream_controller_bad.dart)展示了 StreamController 的基本但有缺陷的用法,以實作先前範例中的 timedCounter() 函式。此程式碼建立一個要傳回的 stream,然後根據計時器事件將資料饋送到其中,這些計時器事件既不是 future 也不是 stream 事件。

baddart
// NOTE: This implementation is FLAWED!
// It starts before it has subscribers, and it doesn't implement pause.
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
  var controller = StreamController<int>();
  int counter = 0;
  void tick(Timer timer) {
    counter++;
    controller.add(counter); // Ask stream to send counter values as event.
    if (maxCount != null && counter >= maxCount) {
      timer.cancel();
      controller.close(); // Ask stream to shut down and tell listeners.
    }
  }

  Timer.periodic(interval, tick); // BAD: Starts before it has subscribers.
  return controller.stream;
}

與之前一樣,您可以使用 timedCounter() 傳回的 stream,如下所示:

dart
var counterStream = timedCounter(const Duration(seconds: 1), 15);
counterStream.listen(print); // Print an integer every second, 15 times.

timedCounter() 的此實作有幾個問題:

  • 它在有訂閱者之前就開始產生事件。
  • 即使訂閱者請求暫停,它也會繼續產生事件。

如下一節所示,您可以透過在建立 StreamController 時指定回呼(例如 onListenonPause)來修正這兩個問題。

等待訂閱

#

作為規則,stream 應在開始工作之前等待訂閱者。async* 函式會自動執行此操作,但在使用 StreamController 時,您可以完全控制,甚至可以在不應該新增事件時新增事件。當 stream 沒有訂閱者時,其 StreamController 會緩衝事件,如果 stream 永遠沒有訂閱者,則可能會導致記憶體洩漏。

嘗試將使用 stream 的程式碼變更為以下內容:

dart
void listenAfterDelay() async {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  await Future.delayed(const Duration(seconds: 5));

  // After 5 seconds, add a listener.
  await for (final n in counterStream) {
    print(n); // Print an integer every second, 15 times.
  }
}

當此程式碼執行時,前 5 秒鐘沒有任何內容列印出來,儘管 stream 正在執行工作。然後新增監聽器,並且一次列印前 5 個左右的事件,因為它們已由 StreamController 緩衝。

若要收到訂閱通知,請在建立 StreamController 時指定 onListen 引數。當 stream 取得其第一個訂閱者時,會調用 onListen 回呼。如果您指定 onCancel 回呼,則當控制器失去其最後一個訂閱者時,會調用它。在前面的範例中,Timer.periodic() 應移至 onListen 處理常式,如下一節所示。

遵守暫停狀態

#

避免在監聽器請求暫停時產生事件。當 stream 訂閱暫停時,async* 函式會在 yield 陳述式自動暫停。另一方面,StreamController 會在暫停期間緩衝事件。如果提供事件的程式碼不遵守暫停,則緩衝區的大小可能會無限期增長。此外,如果監聽器在暫停後不久停止監聽,則花費在建立緩衝區上的工作將會浪費。

若要查看沒有暫停支援的情況會發生什麼,請嘗試將使用 stream 的程式碼變更為以下內容:

dart
void listenWithPause() {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  late StreamSubscription<int> subscription;

  subscription = counterStream.listen((int counter) {
    print(counter); // Print an integer every second.
    if (counter == 5) {
      // After 5 ticks, pause for five seconds, then resume.
      subscription.pause(Future.delayed(const Duration(seconds: 5)));
    }
  });
}

當五秒的暫停結束時,在此期間觸發的事件會一次全部收到。發生這種情況是因為 stream 的來源不遵守暫停,並持續將事件新增到 stream。因此,stream 會緩衝事件,然後在 stream 變成未暫停時清空其緩衝區。

以下版本的 timedCounter()(來自 stream_controller.dart)透過使用 StreamController 上的 onListenonPauseonResumeonCancel 回呼來實作暫停。

dart
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
  late StreamController<int> controller;
  Timer? timer;
  int counter = 0;

  void tick(_) {
    counter++;
    controller.add(counter); // Ask stream to send counter values as event.
    if (counter == maxCount) {
      timer?.cancel();
      controller.close(); // Ask stream to shut down and tell listeners.
    }
  }

  void startTimer() {
    timer = Timer.periodic(interval, tick);
  }

  void stopTimer() {
    timer?.cancel();
    timer = null;
  }

  controller = StreamController<int>(
    onListen: startTimer,
    onPause: stopTimer,
    onResume: startTimer,
    onCancel: stopTimer,
  );

  return controller.stream;
}

使用上面的 listenWithPause() 函式執行此程式碼。您會看到它在暫停時停止計數,然後在之後順利恢復。

您必須使用所有監聽器 — onListenonCancelonPauseonResume — 才能收到暫停狀態變更的通知。原因是如果訂閱和暫停狀態同時變更,則只會調用 onListenonCancel 回呼。

最終提示

#

在不使用 async* 函式建立 stream 時,請記住這些提示:

  • 使用同步控制器時要小心 — 例如,使用 StreamController(sync: true) 建立的控制器。當您在未暫停的同步控制器上傳送事件時(例如,使用 EventSink 定義的 add()addError()close() 方法),事件會立即傳送到 stream 上的所有監聽器。在新增監聽器的程式碼完全傳回之前,永遠不應調用 Stream 監聽器,並且在錯誤的時間使用同步控制器可能會破壞此承諾並導致良好的程式碼失敗。避免使用同步控制器。

  • 如果您使用 StreamController,則會在 listen 呼叫傳回 StreamSubscription 之前調用 onListen 回呼。不要讓 onListen 回呼依賴於已經存在的訂閱。例如,在下列程式碼中,onListen 事件會在 subscription 變數具有有效值之前觸發(並調用 handler)。

    dart
    subscription = stream.listen(handler);
  • StreamController 定義的 onListenonPauseonResumeonCancel 回呼會在 stream 的監聽器狀態變更時由 stream 調用,但永遠不會在事件觸發期間或在另一個狀態變更處理常式呼叫期間調用。在這些情況下,狀態變更回呼會延遲到先前的回呼完成為止。

  • 不要嘗試自己實作 Stream 介面。事件、回呼以及新增和移除監聽器之間的互動很容易出現細微的錯誤。始終使用現有的 stream,可能是來自 StreamController 的 stream,來實作新 stream 的 listen 呼叫。

  • 雖然可以透過擴充 Stream 類別並在頂部實作 listen 方法和額外功能來建立擴充 Stream 且具有更多功能的類別,但通常不建議這樣做,因為它引入了使用者必須考慮的新類型。您可以建立一個「具有」 Stream(以及更多)的類別,而不是一個「是」 Stream(以及更多)的類別。