非同步程式設計:串流
Dart 中的非同步程式設計以 Future 和 Stream 類別為特徵。
Future 代表一個不會立即完成的運算。一般的函式會傳回結果,而非同步函式會傳回 Future,最終會包含結果。Future 會在結果準備就緒時通知您。
串流是非同步事件的序列。它類似於非同步的 Iterable—不同於在您要求時取得下一個事件,串流會在事件準備就緒時通知您。
接收串流事件
#串流可以用多種方式建立,這是另一篇文章的主題,但它們都可以用相同的方式使用:非同步 for 迴圈(通常簡稱為 await for)迭代串流的事件,就像 for 迴圈迭代 Iterable 一樣。例如
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
await for (final value in stream) {
sum += value;
}
return sum;
}
此程式碼只是接收整數事件串流的每個事件,將它們加總,並傳回(Future 形式的)總和。當迴圈主體結束時,函式會暫停,直到下一個事件到達或串流完成。
該函式標記了 async
關鍵字,這是使用 await for 迴圈時的必要條件。
以下範例透過使用 async*
函式產生簡單的整數串流來測試先前的程式碼
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
await for (final value in stream) {
sum += value;
}
return sum;
}
Stream<int> countStream(int to) async* {
for (int i = 1; i <= to; i++) {
yield i;
}
}
void main() async {
var stream = countStream(10);
var sum = await sumStream(stream);
print(sum); // 55
}
錯誤事件
#當串流中沒有更多事件時,串流就完成了,接收事件的程式碼會收到通知,就像收到新事件到達的通知一樣。當使用 await for 迴圈讀取事件時,迴圈會在串流完成時停止。
在某些情況下,錯誤會在串流完成之前發生;可能是從遠端伺服器擷取檔案時網路失敗,或者可能是產生事件的程式碼有錯誤,但需要有人知道。
串流也可以傳遞錯誤事件,就像傳遞資料事件一樣。大多數串流會在第一個錯誤後停止,但有可能會有傳遞多個錯誤的串流,以及在錯誤事件後傳遞更多資料的串流。在本文中,我們僅討論最多傳遞一個錯誤的串流。
當使用 await for 讀取串流時,錯誤會由迴圈陳述式拋出。這也會結束迴圈。您可以使用 try-catch 捕捉錯誤。以下範例在迴圈迭代器等於 4 時拋出錯誤
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
try {
await for (final value in stream) {
sum += value;
}
} catch (e) {
return -1;
}
return sum;
}
Stream<int> countStream(int to) async* {
for (int i = 1; i <= to; i++) {
if (i == 4) {
throw Exception('Intentional exception');
} else {
yield i;
}
}
}
void main() async {
var stream = countStream(10);
var sum = await sumStream(stream);
print(sum); // -1
}
使用串流
#Stream 類別包含許多輔助方法,可以為您執行串流上的常見操作,類似於 Iterable 上的方法。例如,您可以使用 Stream API 中的 lastWhere()
找到串流中最後一個正整數。
Future<int> lastPositive(Stream<int> stream) =>
stream.lastWhere((x) => x >= 0);
兩種串流
#串流有兩種。
單一訂閱串流
#最常見的串流類型包含一系列事件,這些事件是較大整體的一部分。事件需要以正確的順序傳遞,且不能遺漏任何事件。這是您在讀取檔案或接收 Web 請求時取得的串流類型。
這種串流只能監聽一次。稍後再次監聽可能意味著錯過初始事件,然後串流的其餘部分就沒有意義了。當您開始監聽時,資料將會被擷取並以區塊形式提供。
廣播串流
#另一種串流類型適用於可以一次處理一則訊息的個別訊息。例如,這種串流可以用於瀏覽器中的滑鼠事件。
您可以隨時開始監聽這種串流,並且您會收到在您監聽時觸發的事件。多個監聽器可以同時監聽,並且您可以在取消先前的訂閱後稍後再次監聽。
處理串流的方法
#Stream<T> 上的下列方法處理串流並傳回結果
Future<T> get first;
Future<bool> get isEmpty;
Future<T> get last;
Future<int> get length;
Future<T> get single;
Future<bool> any(bool Function(T element) test);
Future<bool> contains(Object? needle);
Future<E> drain<E>([E? futureValue]);
Future<T> elementAt(int index);
Future<bool> every(bool Function(T element) test);
Future<T> firstWhere(bool Function(T element) test, {T Function()? orElse});
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);
Future forEach(void Function(T element) action);
Future<String> join([String separator = '']);
Future<T> lastWhere(bool Function(T element) test, {T Function()? orElse});
Future pipe(StreamConsumer<T> streamConsumer);
Future<T> reduce(T Function(T previous, T element) combine);
Future<T> singleWhere(bool Function(T element) test, {T Function()? orElse});
Future<List<T>> toList();
Future<Set<T>> toSet();
除了 drain()
和 pipe()
之外,所有這些函式都對應於 Iterable 上的類似函式。每一個都可以透過使用具有 await for 迴圈的 async
函式(或僅使用其他方法之一)輕鬆編寫。例如,一些實作可能是
Future<bool> contains(Object? needle) async {
await for (final event in this) {
if (event == needle) return true;
}
return false;
}
Future forEach(void Function(T element) action) async {
await for (final event in this) {
action(event);
}
}
Future<List<T>> toList() async {
final result = <T>[];
await forEach(result.add);
return result;
}
Future<String> join([String separator = '']) async =>
(await toList()).join(separator);
(實際實作稍微複雜一些,但主要是因為歷史原因。)
修改串流的方法
#Stream 上的下列方法會根據原始串流傳回新的串流。每一個方法都會等到有人監聽新的串流,然後才監聽原始串流。
Stream<R> cast<R>();
Stream<S> expand<S>(Iterable<S> Function(T element) convert);
Stream<S> map<S>(S Function(T event) convert);
Stream<T> skip(int count);
Stream<T> skipWhile(bool Function(T element) test);
Stream<T> take(int count);
Stream<T> takeWhile(bool Function(T element) test);
Stream<T> where(bool Function(T event) test);
前面的方法對應於 Iterable 上的類似方法,這些方法將可迭代物件轉換為另一個可迭代物件。所有這些都可以使用具有 await for 迴圈的 async
函式輕鬆編寫。
Stream<E> asyncExpand<E>(Stream<E>? Function(T event) convert);
Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert);
Stream<T> distinct([bool Function(T previous, T next)? equals]);
asyncExpand()
和 asyncMap()
函式類似於 expand()
和 map()
,但允許它們的函式引數是非同步函式。distinct()
函式在 Iterable
上不存在,但它本來可以存在。
Stream<T> handleError(Function onError, {bool Function(dynamic error)? test});
Stream<T> timeout(
Duration timeLimit, {
void Function(EventSink<T> sink)? onTimeout,
});
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer);
最後三個函式更特殊。它們涉及錯誤處理,而 await for 迴圈無法做到這一點—到達迴圈的第一個錯誤將結束迴圈及其在串流上的訂閱。沒有從中恢復的方法。以下程式碼示範如何使用 handleError()
從串流中移除錯誤,然後在 await for 迴圈中使用它。
Stream<S> mapLogErrors<S, T>(
Stream<T> stream,
S Function(T event) convert,
) async* {
var streamWithoutErrors = stream.handleError((e) => log(e));
await for (final event in streamWithoutErrors) {
yield convert(event);
}
}
transform() 函式
#transform()
函式不僅用於錯誤處理;它是一個更通用的串流「map」。正常的 map 需要每個傳入事件一個值。然而,特別是對於 I/O 串流,可能需要多個傳入事件才能產生一個輸出事件。StreamTransformer 可以處理這種情況。例如,解碼器(如 Utf8Decoder)是轉換器。轉換器只需要一個函式 bind(),它可以透過 async
函式輕鬆實作。
讀取和解碼檔案
#以下程式碼讀取檔案並在串流上執行兩個轉換。它首先將資料從 UTF8 轉換,然後透過 LineSplitter 執行。所有行都會被印出,除了任何以井字號 #
開頭的行。
import 'dart:convert';
import 'dart:io';
void main(List<String> args) async {
var file = File(args[0]);
var lines = utf8.decoder
.bind(file.openRead())
.transform(const LineSplitter());
await for (final line in lines) {
if (!line.startsWith('#')) print(line);
}
}
listen() 方法
#Stream 上的最後一個方法是 listen()
。這是一個「低階」方法—所有其他串流函式都是根據 listen()
定義的。
StreamSubscription<T> listen(
void Function(T event)? onData, {
Function? onError,
void Function()? onDone,
bool? cancelOnError,
});
若要建立新的 Stream
類型,您可以直接擴充 Stream
類別並實作 listen()
方法—Stream
上的所有其他方法都會呼叫 listen()
才能運作。
listen()
方法允許您開始監聽串流。在您執行此操作之前,串流是一個惰性物件,描述您想要看到的事件。當您監聽時,會傳回一個 StreamSubscription 物件,它代表產生事件的活動串流。這類似於 Iterable
如何只是物件的集合,但迭代器是執行實際迭代的物件。
串流訂閱允許您暫停訂閱、在暫停後恢復訂閱,以及完全取消訂閱。您可以設定回呼,以便在每個資料事件或錯誤事件發生時以及串流關閉時呼叫。
其他資源
#請閱讀以下文件,以取得關於在 Dart 中使用串流和非同步程式設計的更多詳細資訊。
- 在 Dart 中建立串流,一篇關於建立您自己的串流的文章
- Futures 和錯誤處理,一篇解釋如何使用 Future API 處理錯誤的文章
- 非同步支援,語言導覽中的一個章節
- Stream API 參考文件
除非另有說明,否則本網站上的文件反映的是 Dart 3.7.1 版本。頁面上次更新於 2025-02-13。 檢視原始碼 或 回報問題。