內容

非同步程式設計:串流

Dart 中的非同步程式設計的特徵在於 FutureStream 類別。

Future 代表不會立即完成的運算。一般函式會傳回結果,非同步函式會傳回 Future,最終會包含結果。Future 會在結果準備好時告知您。

串流是非同步事件序列。它就像非同步 Iterable,不同的是,您不會在要求時取得下一個事件,而是串流會在準備好時告知您有事件。

接收串流事件

#

串流可以透過許多方式建立,這是另一篇文章的主題,但它們都可以用相同的方式使用:非同步 for 迴圈(通常稱為 await for)會像 for 迴圈 遍歷 Iterable 一樣遍歷串流的事件。例如

dart
Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (final value in stream) {
    sum += value;
  }
  return sum;
}

此程式碼只接收整數事件串流的每個事件,將它們加總,並傳回 (未來) 總和。當迴圈主體結束時,函式會暫停,直到下一個事件到達或串流完成。

函式標示為 async 關鍵字,這是使用 await for 迴圈時所需要的。

以下範例透過使用 async* 函式產生簡單的整數串流來測試前一個程式碼

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (final value in stream) {
    sum += value;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
  }
}

void main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // 55
}

錯誤事件

#

當串流中沒有更多事件時,串流即完成,而接收事件的程式碼會收到通知,就像收到新事件到達的通知一樣。使用await for 迴圈讀取事件時,串流完成時迴圈會停止。

在某些情況下,串流完成前會發生錯誤;例如,從遠端伺服器擷取檔案時網路發生故障,或者產生事件的程式碼有錯誤,但需要有人知道這件事。

串流也可以傳遞錯誤事件,就像傳遞資料事件一樣。大多數串流會在第一個錯誤後停止,但有些串流可能會傳遞多個錯誤,而有些串流則會在錯誤事件後傳遞更多資料。本文檔只討論最多傳遞一個錯誤的串流。

使用await for 讀取串流時,迴圈陳述式會擲回錯誤。這也會結束迴圈。您可以使用try-catch 捕捉錯誤。以下範例會在迴圈反覆運算器等於 4 時擲回錯誤

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  try {
    await for (final value in stream) {
      sum += value;
    }
  } catch (e) {
    return -1;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    if (i == 4) {
      throw Exception('Intentional exception');
    } else {
      yield i;
    }
  }
}

void main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // -1
}

使用串流

#

Stream 類別包含許多輔助方法,可以對串流執行常見作業,類似於 Iterable 上的方法。例如,您可以使用 Stream API 中的 lastWhere() 找出串流中的最後一個正整數。

dart
Future<int> lastPositive(Stream<int> stream) =>
    stream.lastWhere((x) => x >= 0);

兩種串流

#

有兩種串流。

單一訂閱串流

#

最常見的串流類型包含一系列事件,這些事件是較大整體的一部分。事件必須按正確順序傳遞,且不能遺漏任何事件。當您讀取檔案或接收網路要求時,就會取得這種串流。

這種串流只能監聽一次。稍後再監聽可能會遺漏初始事件,然後串流的其餘部分就沒有意義了。當您開始監聽時,資料會被擷取並分批提供。

廣播串流

#

另一種串流類型是針對一次可以處理一個的個別訊息。例如,這種串流可以用於瀏覽器的滑鼠事件。

您可以隨時開始監聽這種串流,並取得在監聽時引發的事件。多個監聽器可以同時監聽,而且您可以在取消先前的訂閱後稍後再監聽。

處理串流的方法

#

下列 Stream<T> 方法會處理串流並傳回結果

dart
Future<T> get first;
Future<bool> get isEmpty;
Future<T> get last;
Future<int> get length;
Future<T> get single;
Future<bool> any(bool Function(T element) test);
Future<bool> contains(Object? needle);
Future<E> drain<E>([E? futureValue]);
Future<T> elementAt(int index);
Future<bool> every(bool Function(T element) test);
Future<T> firstWhere(bool Function(T element) test, {T Function()? orElse});
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);
Future forEach(void Function(T element) action);
Future<String> join([String separator = '']);
Future<T> lastWhere(bool Function(T element) test, {T Function()? orElse});
Future pipe(StreamConsumer<T> streamConsumer);
Future<T> reduce(T Function(T previous, T element) combine);
Future<T> singleWhere(bool Function(T element) test, {T Function()? orElse});
Future<List<T>> toList();
Future<Set<T>> toSet();

除了 drain()pipe() 之外,所有這些函式都對應到 Iterable 上的類似函式。每個函式都可以輕鬆使用具有await for 迴圈的 async 函式來撰寫(或僅使用其他方法之一)。例如,某些實作可能是

dart
Future<bool> contains(Object? needle) async {
  await for (final event in this) {
    if (event == needle) return true;
  }
  return false;
}

Future forEach(void Function(T element) action) async {
  await for (final event in this) {
    action(event);
  }
}

Future<List<T>> toList() async {
  final result = <T>[];
  await forEach(result.add);
  return result;
}

Future<String> join([String separator = '']) async =>
    (await toList()).join(separator);

(實際實作稍微複雜一些,但主要是基於歷史原因。)

修改串流的方法

#

下列 Stream 方法會根據原始串流傳回新的串流。每個方法都會等到有人在新的串流上監聽後,才會在原始串流上監聽。

dart
Stream<R> cast<R>();
Stream<S> expand<S>(Iterable<S> Function(T element) convert);
Stream<S> map<S>(S Function(T event) convert);
Stream<T> skip(int count);
Stream<T> skipWhile(bool Function(T element) test);
Stream<T> take(int count);
Stream<T> takeWhile(bool Function(T element) test);
Stream<T> where(bool Function(T event) test);

前述方法對應於 Iterable 上的類似方法,這些方法會將可迭代項目轉換成另一個可迭代項目。所有這些方法都可以使用具有 **await for** 迴圈的非同步函式輕鬆寫出。

dart
Stream<E> asyncExpand<E>(Stream<E>? Function(T event) convert);
Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert);
Stream<T> distinct([bool Function(T previous, T next)? equals]);

asyncExpand()asyncMap() 函式類似於 expand()map(),但允許其函式參數為非同步函式。distinct() 函式不存在於 Iterable 上,但它可以存在。

dart
Stream<T> handleError(Function onError, {bool Function(dynamic error)? test});
Stream<T> timeout(Duration timeLimit,
    {void Function(EventSink<T> sink)? onTimeout});
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer);

最後三個函式比較特殊。它們涉及錯誤處理,**await for** 迴圈無法處理錯誤處理,到達迴圈的第一個錯誤將結束迴圈及其對串流的訂閱。無法從中復原。以下程式碼顯示如何使用 handleError() 在 **await for** 迴圈中使用串流之前移除錯誤。

dart
Stream<S> mapLogErrors<S, T>(
  Stream<T> stream,
  S Function(T event) convert,
) async* {
  var streamWithoutErrors = stream.handleError((e) => log(e));
  await for (final event in streamWithoutErrors) {
    yield convert(event);
  }
}

transform() 函式

#

transform() 函式不僅用於錯誤處理;它是串流的更通用的「map」。一般 map 需要每個輸入事件一個值。然而,特別是對於 I/O 串流,產生一個輸出事件可能需要多個輸入事件。StreamTransformer 可以處理這種情況。例如,Utf8Decoder 等解碼器就是轉換器。轉換器只需要一個函式,bind(),它可以輕鬆地由非同步函式實作。

讀取和解碼檔案

#

以下程式碼讀取檔案並對串流執行兩個轉換。它首先將資料從 UTF8 轉換,然後透過 LineSplitter 執行。會印出所有行,但開頭為井號 (#) 的行除外。

dart
import 'dart:convert';
import 'dart:io';

void main(List<String> args) async {
  var file = File(args[0]);
  var lines = utf8.decoder
      .bind(file.openRead())
      .transform(const LineSplitter());
  await for (final line in lines) {
    if (!line.startsWith('#')) print(line);
  }
}

listen() 方法

#

串流上的最後一個方法是 listen()。這是一個「低階」方法,所有其他串流函式都是根據 listen() 定義的。

dart
StreamSubscription<T> listen(void Function(T event)? onData,
    {Function? onError, void Function()? onDone, bool? cancelOnError});

若要建立新的 Stream 類型,您只要延伸 Stream 類別並實作 listen() 方法即可,Stream 上的所有其他方法都會呼叫 listen() 才能運作。

listen() 方法允許您開始監聽串流。在您開始監聽之前,串流只是一個惰性物件,描述您想要看到的事件。當您監聽時,會傳回一個 StreamSubscription 物件,代表產生事件的活動串流。這類似於 Iterable 只是物件的集合,但執行器才是實際執行反覆運算的那個。

串流訂閱允許您暫停訂閱、在暫停後繼續訂閱,以及完全取消訂閱。您可以設定回呼,以便在每個資料事件或錯誤事件,以及串流關閉時呼叫。

其他資源

#

閱讀下列文件,深入了解在 Dart 中使用串流和非同步程式設計的詳細資訊。