內容

在 Dart 中建立串流

作者:Lasse Nielsen
2013 年 4 月(2021 年 5 月更新)

dart:async 函式庫包含兩種對許多 Dart API 來說很重要的類型:串流未來值。未來值表示單一運算的結果,而串流則是結果的序列。您會在串流中監聽以接收結果(資料和錯誤)以及串流關閉的通知。您也可以在串流完成之前暫停監聽或停止監聽串流。

但本文並非在探討使用串流。而是探討如何建立您自己的串流。您可以透過幾種方式建立串流

  • 轉換現有的串流。
  • 使用 async* 函式從頭開始建立串流。
  • 使用 StreamController 建立串流。

本文會顯示每種方法的程式碼,並提供提示以協助您正確實作串流。

如需有關如何使用串流的說明,請參閱非同步程式設計:串流

轉換現有串流

#

建立串流的常見情況是您已經有一個串流,而您想要根據原始串流的事件建立一個新的串流。例如,您可能有一個位元組串流,而您想要透過 UTF-8 解碼輸入將其轉換成字串串流。最通用的方法是建立一個新的串流,等待原始串流上的事件,然後輸出新的事件。範例

dart
/// Splits a stream of consecutive strings into lines.
///
/// The input string is provided in smaller chunks through
/// the `source` stream.
Stream<String> lines(Stream<String> source) async* {
  // Stores any partial line from the previous chunk.
  var partial = '';
  // Wait until a new chunk is available, then process it.
  await for (final chunk in source) {
    var lines = chunk.split('\n');
    lines[0] = partial + lines[0]; // Prepend partial line.
    partial = lines.removeLast(); // Remove new partial line.
    for (final line in lines) {
      yield line; // Add lines to output stream.
    }
  }
  // Add final partial line to output stream, if any.
  if (partial.isNotEmpty) yield partial;
}

對於許多常見的轉換,您可以使用 Stream 提供的轉換方法,例如 map()where()expand()take()

例如,假設您有一個串流 counterStream,每秒發射一個遞增的計數器。以下是它的實作方式

dart
var counterStream =
    Stream<int>.periodic(const Duration(seconds: 1), (x) => x).take(15);

若要快速查看事件,您可以使用類似這樣的程式碼

dart
counterStream.forEach(print); // Print an integer every second, 15 times.

若要轉換串流事件,您可以在監聽串流之前呼叫串流上的轉換方法,例如 map()。此方法會傳回一個新的串流。

dart
// Double the integer in each event.
var doubleCounterStream = counterStream.map((int x) => x * 2);
doubleCounterStream.forEach(print);

除了 map() 之外,您還可以呼叫任何其他轉換方法,例如以下方法

dart
.where((int x) => x.isEven) // Retain only even integer events.
.expand((var x) => [x, x]) // Duplicate each event.
.take(5) // Stop after the first five events.

通常,轉換方法就足以滿足您的需求。不過,如果您需要對轉換有更進一步的控制,您可以使用 Streamtransform() 方法指定一個串流轉換器。平台函式庫提供許多常見任務的串流轉換器。例如,以下程式碼使用 dart:convert 函式庫提供的 utf8.decoderLineSplitter 轉換器。

dart
Stream<List<int>> content = File('someFile.txt').openRead();
List<String> lines = await content
    .transform(utf8.decoder)
    .transform(const LineSplitter())
    .toList();

從頭開始建立串流

#

建立新串流的一種方式是使用非同步產生器 (async*) 函式。當函式被呼叫時會建立串流,而當串流被監聽時,函式的本體會開始執行。當函式傳回時,串流會關閉。在函式傳回之前,它可以使用 yieldyield* 陳述式在串流上發出事件。

以下是定期發出數字的原始範例

dart
Stream<int> timedCounter(Duration interval, [int? maxCount]) async* {
  int i = 0;
  while (true) {
    await Future.delayed(interval);
    yield i++;
    if (i == maxCount) break;
  }
}

此函式傳回一個 Stream。當該串流被監聽時,本體會開始執行。它會重複延遲所要求的間隔,然後產生下一個數字。如果省略 maxCount 參數,則迴圈沒有停止條件,因此串流會永遠輸出越來越大的數字,或直到監聽器取消其訂閱。

當監聽器取消 (在 listen() 方法傳回的 StreamSubscription 物件上呼叫 cancel()) 時,下次本體到達 yield 陳述式時,yield 會作為 return 陳述式。任何封閉的 finally 區塊會被執行,函式會結束。如果函式在結束前嘗試產生一個值,則會失敗並作為傳回。

當函式最終結束時,cancel() 方法傳回的未來會完成。如果函式以錯誤結束,未來會以該錯誤完成;否則,它會以 null 完成。

另一個更有用的範例是將未來序列轉換為串流的函式

dart
Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) async* {
  for (final future in futures) {
    var result = await future;
    yield result;
  }
}

此函式向 futures 可迭代物件要求一個新的未來,等待該未來,發出結果值,然後迴圈。如果未來以錯誤完成,則串流會以該錯誤完成。

很少有 async* 函式從無中建立串流。它需要從某個地方取得資料,而大多數情況下,那個地方是另一個串流。在某些情況下,例如上述的未來序列,資料來自其他非同步事件來源。然而,在許多情況下,async* 函式過於簡化,無法輕鬆處理多個資料來源。這就是 StreamController 類別發揮作用的地方。

使用 StreamController

#

如果串流的事件來自程式不同部分,而不仅仅來自 async 函式可以遍歷的串流或未來,請使用 StreamController 建立並填入串流。

StreamController 提供一個新串流和一種方式,可以在任何時間點和從任何地方向串流新增事件。串流具有處理監聽器和暫停所需的所有邏輯。您傳回串流並保留控制器。

以下範例(取自 stream_controller_bad.dart)展示了使用 StreamController 來實作前一個範例中的 timedCounter() 函式的基本用法,儘管有缺陷。此程式碼會建立一個串流來傳回,然後根據計時器事件(既不是 future 也不是串流事件)將資料傳入串流中。

baddart
// NOTE: This implementation is FLAWED!
// It starts before it has subscribers, and it doesn't implement pause.
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
  var controller = StreamController<int>();
  int counter = 0;
  void tick(Timer timer) {
    counter++;
    controller.add(counter); // Ask stream to send counter values as event.
    if (maxCount != null && counter >= maxCount) {
      timer.cancel();
      controller.close(); // Ask stream to shut down and tell listeners.
    }
  }

  Timer.periodic(interval, tick); // BAD: Starts before it has subscribers.
  return controller.stream;
}

如同先前,您可以像這樣使用 timedCounter() 傳回的串流

dart
var counterStream = timedCounter(const Duration(seconds: 1), 15);
counterStream.listen(print); // Print an integer every second, 15 times.

timedCounter() 實作有幾個問題

  • 它在有訂閱者之前就開始產生事件。
  • 即使訂閱者要求暫停,它也會繼續產生事件。

如下一節所述,您可以在建立 StreamController 時指定 onListenonPause 等回呼,來修正這兩個問題。

等待訂閱

#

原則上,串流應在開始運作前等待訂閱者。async* 函式會自動執行此動作,但當使用 StreamController 時,您擁有完全的控制權,可以在不適當的時候新增事件。當串流沒有訂閱者時,其 StreamController 會緩衝事件,如果串流從未取得訂閱者,則可能會導致記憶體外洩。

嘗試將使用串流的程式碼變更為以下內容

dart
void listenAfterDelay() async {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  await Future.delayed(const Duration(seconds: 5));

  // After 5 seconds, add a listener.
  await for (final n in counterStream) {
    print(n); // Print an integer every second, 15 times.
  }
}

當此程式碼執行時,儘管串流正在運作,但在前 5 秒內不會印出任何內容。然後會新增監聽器,並一次印出前 5 個左右的事件,因為它們已由 StreamController 緩衝。

若要接收訂閱通知,請在建立 StreamController 時指定 onListen 參數。當串流取得第一個訂閱者時,會呼叫 onListen 回呼。如果您指定 onCancel 回呼,則會在控制器失去最後一個訂閱者時呼叫它。在前一個範例中,Timer.periodic() 應移至 onListen 處理常式,如下一節所示。

尊重暫停狀態

#

在監聽器要求暫停時,請避免產生事件。async* 函式會在串流訂閱暫停時自動在 yield 陳述式暫停。另一方面,StreamController 會在暫停期間緩衝事件。如果提供事件的程式碼不尊重暫停,則緩衝區的大小可能會無限增加。此外,如果監聽器在暫停後不久停止監聽,則建立緩衝區所花的功夫就白費了。

若要查看在沒有暫停支援的情況下會發生什麼情況,請嘗試將使用串流的程式碼變更為以下內容

dart
void listenWithPause() {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  late StreamSubscription<int> subscription;

  subscription = counterStream.listen((int counter) {
    print(counter); // Print an integer every second.
    if (counter == 5) {
      // After 5 ticks, pause for five seconds, then resume.
      subscription.pause(Future.delayed(const Duration(seconds: 5)));
    }
  });
}

當五秒的暫停結束時,在這段時間內觸發的事件會一次全部收到。這是因為串流的來源不尊重暫停,並持續將事件新增到串流中。因此,串流會緩衝事件,然後在串流取消暫停時清空緩衝區。

timedCounter() 的下列版本(來自 stream_controller.dart)使用 StreamController 上的 onListenonPauseonResumeonCancel 回呼來實作暫停。

dart
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
  late StreamController<int> controller;
  Timer? timer;
  int counter = 0;

  void tick(_) {
    counter++;
    controller.add(counter); // Ask stream to send counter values as event.
    if (counter == maxCount) {
      timer?.cancel();
      controller.close(); // Ask stream to shut down and tell listeners.
    }
  }

  void startTimer() {
    timer = Timer.periodic(interval, tick);
  }

  void stopTimer() {
    timer?.cancel();
    timer = null;
  }

  controller = StreamController<int>(
      onListen: startTimer,
      onPause: stopTimer,
      onResume: startTimer,
      onCancel: stopTimer);

  return controller.stream;
}

使用上述的 listenWithPause() 函式執行這段程式碼。您會看到它在暫停期間停止計數,然後在之後順利恢復。

您必須使用所有監聽器(onListenonCancelonPauseonResume)才能收到暫停狀態變更的通知。原因是如果訂閱和暫停狀態同時變更,則只會呼叫 onListenonCancel 回呼。

最後提示

#

在不使用非同步*函式建立串流時,請記住以下提示

  • 使用同步控制器時請小心,例如使用 StreamController(sync: true) 建立的控制器。當您在取消暫停的同步控制器上傳送事件(例如使用 EventSink 定義的 add()addError()close() 方法)時,事件會立即傳送給串流上的所有監聽器。Stream 監聽器絕不能在新增監聽器的程式碼完全傳回之前呼叫,而在錯誤的時間使用同步控制器可能會破壞此承諾,並導致良好的程式碼失敗。避免使用同步控制器。

  • 如果您使用 StreamController,則會在 listen 呼叫傳回 StreamSubscription 之前呼叫 onListen 回呼。不要讓 onListen 回呼依賴於已經存在的訂閱。例如,在以下程式碼中,onListen 事件會在 subscription 變數具有有效值之前觸發(並呼叫 handler)。

    dart
    subscription = stream.listen(handler);
  • 當串流的監聽器狀態變更時,StreamController 定義的 onListenonPauseonResumeonCancel 回呼會由串流呼叫,但絕不會在觸發事件期間或呼叫另一個狀態變更處理常式期間呼叫。在這些情況下,狀態變更回呼會延遲到前一個回呼完成為止。

  • 不要嘗試自己實作 Stream 介面。事件、回呼、以及新增和移除監聽器之間的互動很容易出錯。請務必使用現有的串流,可能來自 StreamController,來實作新串流的 listen 呼叫。

  • 雖然可以透過延伸 Stream 類別並實作 listen 方法和額外的功能,來建立延伸 Stream 並具有更多功能的類別,但通常不建議這樣做,因為這會引入使用者必須考量的全新類型。與其建立一個 Stream(以及更多)的類別,您通常可以建立一個 具有 Stream(以及更多)的類別。