內容

此頁面討論一些使用 Isolate API 來實作 Isolate 的範例。

當您的應用程式處理的運算大到足以暫時阻擋其他運算時,您應該使用 Isolate。最常見的範例是在 Flutter 應用程式中,當您需要執行可能會導致 UI 沒有回應的大型運算時。

沒有任何規則說明您必須在何時使用 Isolate,但以下是一些它們可能派得上用場的更多情況

  • 剖析和解碼異常大型的 JSON 資訊塊。
  • 處理並壓縮照片、音訊和影片。
  • 轉換音訊和影片檔案。
  • 對大型清單或檔案系統執行複雜的搜尋和篩選。
  • 執行 I/O,例如與資料庫通訊。
  • 處理大量網路要求。

實作一個簡單的執行緒 Isolate

#

這些範例實作一個產生簡單工作者 Isolate 的主要 Isolate。 Isolate.run() 簡化了設定和管理工作者 Isolate 的步驟

  1. 產生 (啟動並建立) 一個 Isolate。
  2. 在產生的 Isolate 上執行一個函式。
  3. 擷取結果。
  4. 將結果傳回主要 Isolate。
  5. 工作完成後終止 Isolate。
  6. 檢查、擷取並將例外和錯誤擲回主要 Isolate。

在新的 Isolate 中執行現有方法

#
  1. 呼叫 run() 以產生一個新的 Isolate (一個 背景工作者),直接在 主要 Isolate 中,同時 main() 等待結果
dart
const String filename = 'with_keys.json';

void main() async {
  // Read some data.
  final jsonData = await Isolate.run(_readAndParseJson);

  // Use that data.
  print('Number of JSON keys: ${jsonData.length}');
}
  1. 將您要讓工作者 Isolate 執行的函式傳遞給它,作為其第一個引數。在此範例中,它是現有的函式 _readAndParseJson()
dart
Future<Map<String, dynamic>> _readAndParseJson() async {
  final fileData = await File(filename).readAsString();
  final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
  return jsonData;
}
  1. Isolate.run() 會取得 _readAndParseJson() 傳回的結果,並將值傳回主要 Isolate,關閉工作者 Isolate。

  2. 工作者 Isolate 會將儲存結果的記憶體傳輸到主要 Isolate。它不會複製資料。工作者 Isolate 會執行驗證,以確保允許傳輸這些物件。

_readAndParseJson() 是現有的非同步函式,可以輕易地在主要 Isolate 中直接執行。改用 Isolate.run() 來執行它,可以啟用並行處理。工作者 Isolate 會完全抽象化 _readAndParseJson() 的運算。它可以在不阻擋主要 Isolate 的情況下完成。

Isolate.run() 的結果永遠是 Future,因為主要 Isolate 中的程式碼會繼續執行。工作者 Isolate 執行的運算是同步還是非同步,並不會影響主要 Isolate,因為它無論如何都會並行執行。

如需完整的程式,請查看 send_and_receive.dart 範例。

使用 Isolate 傳送封閉函數

#

您也可以使用 run() 建立一個簡單的工作者 Isolate,方法是在主要 Isolate 中直接使用函式文字或閉包。

dart
const String filename = 'with_keys.json';

void main() async {
  // Read some data.
  final jsonData = await Isolate.run(() async {
    final fileData = await File(filename).readAsString();
    final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
    return jsonData;
  });

  // Use that data.
  print('Number of JSON keys: ${jsonData.length}');
}

此範例與前一個範例達成相同的目的。一個新的 Isolate 會產生、運算一些東西,並傳回結果。

不過,現在 Isolate 會傳送一個 閉包。閉包的限制比典型的命名函式少,無論是在其功能上,還是如何寫入程式碼中。在此範例中,Isolate.run() 會執行看起來像是本地程式碼的內容,並行執行。在這個意義上,您可以想像 run() 的作用就像一個控制流程運算子,用於「並行執行」。

使用埠在 Isolate 之間傳送多則訊息

#

短暫存在的 Isolate 很方便使用,但需要效能開銷才能產生新的 Isolate,並將物件從一個 Isolate 複製到另一個 Isolate。如果您的程式碼依賴於使用 Isolate.run 重複執行相同的運算,您可以透過建立不會立即結束的長駐 Isolate 來提升效能。

為執行此操作,您可以使用一些低階 Isolate API,其中 Isolate.run 會抽象化

本節將說明在一個新產生的 Isolate 與 主 Isolate 之間建立雙向通訊所需的步驟。第一個範例 基本通訊埠 會在高階層級介紹此程序。第二個範例 穩健的通訊埠 會逐步為第一個範例加入更多實用的實際功能。

ReceivePortSendPort

#

在 Isolate 之間設定長駐通訊需要兩個類別(除了 Isolate 之外):ReceivePortSendPort。這些通訊埠是 Isolate 之間唯一可以通訊的方式。

ReceivePort 是處理從其他 Isolate 傳送訊息的物件。這些訊息會透過 SendPort 傳送。

通訊埠的行為類似於 Stream 物件(事實上,接收通訊埠會實作 Stream!)您可以將 SendPortReceivePort 視為 Stream 的 StreamController 和監聽器。SendPort 類似於 StreamController,因為您可以使用 SendPort.send() 方法 將訊息「加入」其中,而這些訊息會由監聽器處理,在此情況下為 ReceivePortReceivePort 會處理它接收到的訊息,方法是將這些訊息作為引數傳遞給您提供的回呼函式。

設定通訊埠

#

一個新產生的 Isolate 只有透過 Isolate.spawn 呼叫接收到的資訊。如果您需要主 Isolate 在建立之後繼續與產生的 Isolate 通訊,您必須設定一個通訊頻道,讓產生的 Isolate 可以將訊息傳送給主 Isolate。Isolate 只能透過訊息傳遞來通訊。它們無法「看到」彼此的記憶體,這就是「Isolate」這個名稱的由來。

若要設定這個雙向通訊,請先在主 Isolate 中建立一個 ReceivePort,然後在使用 Isolate.spawn 產生它時,將它的 SendPort 作為引數傳遞給新的 Isolate。新的 Isolate 然後會建立它自己的 ReceivePort,並將 它的 SendPort 傳回主 Isolate 傳遞給它的 SendPort 上。主 Isolate 會接收這個 SendPort,現在雙方都有開放的頻道來傳送和接收訊息。

A figure showing events being fed, one by one, into the event loop

  1. 在主隔離中建立一個 ReceivePortSendPort 會自動建立為 ReceivePort 上的屬性。
  2. 使用 Isolate.spawn() 產生工作執行緒隔離。
  3. ReceivePort.sendPort 的參照傳遞為第一則訊息給工作執行緒隔離。
  4. 在工作執行緒隔離中建立另一個新的 ReceivePort
  5. 將工作執行緒隔離的 ReceivePort.sendPort 的參照傳遞為第一則訊息返回給主隔離。

除了建立連接埠和設定通訊外,您還需要告訴連接埠在收到訊息時要執行什麼動作。這會使用各個 ReceivePort 上的 listen 方法來執行。

A figure showing events being fed, one by one, into the event loop

  1. 透過主隔離的參照傳送訊息給工作執行緒隔離的 SendPort
  2. 透過工作執行緒隔離的 ReceivePort 上的監聽器接收並處理訊息。這是執行您想要從主隔離中移出的運算的地方。
  3. 透過工作執行緒隔離的參照傳送回傳訊息給主隔離的 SendPort
  4. 透過主隔離的 ReceivePort 上的監聽器接收訊息。

基本埠範例

#

此範例說明如何設定一個長駐的工作執行緒隔離,並在它與主隔離之間建立雙向通訊。此程式碼使用將 JSON 文字傳送到新隔離的範例,在此隔離中會對 JSON 進行剖析和解碼,然後再傳送回主隔離。

步驟 1:定義工作執行緒類別

#

首先,為您的背景工作執行緒隔離建立一個類別。此類別包含您需要的所有功能,以

  • 產生隔離。
  • 傳送訊息給該隔離。
  • 讓隔離解碼一些 JSON。
  • 將解碼的 JSON 傳送回主隔離。

此類別公開兩個公開方法:一個產生工作執行緒隔離,另一個處理傳送訊息給該工作執行緒隔離。

本範例中其餘各節將逐步示範如何填入類別方法。

dart
class Worker {
  Future<void> spawn() async {
    // TODO: Add functionality to spawn a worker isolate.
  }

  void _handleResponsesFromIsolate(dynamic message) {
    // TODO: Handle messages sent back from the worker isolate.
  }

  static void _startRemoteIsolate(SendPort port) {
    // TODO: Define code that should be executed on the worker isolate.
  }

  Future<void> parseJson(String message) async {
    // TODO: Define a public method that can
    // be used to send messages to the worker isolate.
  }
}

步驟 2:產生工作者 Isolate

#

Worker.spawn 方法是您將用來群組建立工作者 Isolate 以及確保它可以接收和傳送訊息的程式碼。

  • 首先,建立一個 ReceivePort。這允許主 Isolate 接收從新產生的工作者 Isolate 傳送的訊息。
  • 接著,新增一個監聽器至接收埠,以處理工作者 Isolate 將傳送回來的訊息。傳遞給監聽器的回呼函式 _handleResponsesFromIsolate 將於 步驟 4 中說明。
  • 最後,使用 Isolate.spawn 產生工作者 Isolate。它預期兩個引數:一個要在工作者 Isolate 上執行的函式(於 步驟 3 中說明),以及接收埠的 sendPort 屬性。
dart
Future<void> spawn() async {
  final receivePort = ReceivePort();
  receivePort.listen(_handleResponsesFromIsolate);
  await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}

receivePort.sendPort 引數將在工作者 Isolate 上呼叫時,作為引數傳遞給回呼函式 (_startRemoteIsolate)。這是確保工作者 Isolate 有辦法將訊息傳送回主 Isolate 的第一步。

步驟 3:在工作者 Isolate 上執行程式碼

#

在此步驟中,您定義方法 _startRemoteIsolate,該方法傳送至工作者 Isolate,以便在產生時執行。此方法就像工作者 Isolate 的「main」方法。

  • 首先,建立另一個新的 ReceivePort。此埠接收來自主 Isolate 的未來訊息。
  • 接著,將該埠的 SendPort 傳送回主 Isolate。
  • 最後,新增一個監聽器至新的 ReceivePort。此監聽器處理主 Isolate 傳送至工作者 Isolate 的訊息。
dart
static void _startRemoteIsolate(SendPort port) {
  final receivePort = ReceivePort();
  port.send(receivePort.sendPort);

  receivePort.listen((dynamic message) async {
    if (message is String) {
      final transformed = jsonDecode(message);
      port.send(transformed);
    }
  });
}

工作者 ReceivePort 上的監聽器會解碼從主 Isolate 傳遞的 JSON,然後將解碼的 JSON 傳送回主 Isolate。

此監聽器是從主 Isolate 傳送至工作者 Isolate 的訊息的進入點。這是您唯一一次機會告訴工作者 Isolate 在未來執行什麼程式碼。

步驟 4:在主 Isolate 上處理訊息

#

最後,您需要告訴主 Isolate 如何處理從工作者 Isolate 傳送回主 Isolate 的訊息。為此,您需要填寫 _handleResponsesFromIsolate 方法。回想一下,這個方法會傳遞給 receivePort.listen 方法,如 步驟 2 所述

dart
Future<void> spawn() async {
  final receivePort = ReceivePort();
  receivePort.listen(_handleResponsesFromIsolate);
  await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}

也回想一下,您在 步驟 3 中傳送一個 SendPort 回到主 Isolate。這個方法會處理接收那個 SendPort,以及處理未來的訊息(將會是已解碼的 JSON)。

  • 首先,檢查訊息是否為 SendPort。如果是,將那個埠指派給類別的 _sendPort 屬性,以便稍後可以用來傳送訊息。
  • 接著,檢查訊息是否為 Map<String, dynamic> 類型,這是已解碼 JSON 的預期類型。如果是,使用您的應用程式特定邏輯來處理那個訊息。在本範例中,會列印訊息。
dart
void _handleResponsesFromIsolate(dynamic message) {
  if (message is SendPort) {
    _sendPort = message;
    _isolateReady.complete();
  } else if (message is Map<String, dynamic>) {
    print(message);
  }
}

步驟 5:加入一個 Completer 以確保您的 Isolate 已設定

#

若要完成類別,定義一個名為 parseJson 的公開方法,負責傳送訊息到工作者 Isolate。它也需要確保在 Isolate 完全設定之前可以傳送訊息。若要處理這個問題,請使用 Completer

  • 首先,加入一個名為 Completer 的類別層級屬性,並將它命名為 _isolateReady
  • 接著,在 _handleResponsesFromIsolate 方法(在 步驟 4 中建立)中,如果訊息為 SendPort,請呼叫 Completer 上的 complete()
  • 最後,在 parseJson 方法中,在加入 _sendPort.send 之前,加入 await _isolateReady.future。這可確保在產生工作者 Isolate 已將它的 SendPort 傳回主 Isolate 之前,不會傳送任何訊息到工作者 Isolate。
dart
Future<void> parseJson(String message) async {
  await _isolateReady.future;
  _sendPort.send(message);
}

完整的範例

#
展開以查看完整的範例
dart
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

void main() async {
  final worker = Worker();
  await worker.spawn();
  await worker.parseJson('{"key":"value"}');
}

class Worker {
  late SendPort _sendPort;
  final Completer<void> _isolateReady = Completer.sync();

  Future<void> spawn() async {
    final receivePort = ReceivePort();
    receivePort.listen(_handleResponsesFromIsolate);
    await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
  }

  void _handleResponsesFromIsolate(dynamic message) {
    if (message is SendPort) {
      _sendPort = message;
      _isolateReady.complete();
    } else if (message is Map<String, dynamic>) {
      print(message);
    }
  }

  static void _startRemoteIsolate(SendPort port) {
    final receivePort = ReceivePort();
    port.send(receivePort.sendPort);

    receivePort.listen((dynamic message) async {
      if (message is String) {
        final transformed = jsonDecode(message);
        port.send(transformed);
      }
    });
  }

  Future<void> parseJson(String message) async {
    await _isolateReady.future;
    _sendPort.send(message);
  }
}

穩健埠範例

#

前一個範例 說明了設定具有雙向通訊的長駐 Isolate 所需的基本建構區塊。如前所述,那個範例缺少一些重要的功能,例如錯誤處理、在不再使用埠時關閉埠的能力,以及在某些情況下訊息順序不一致的問題。

這個範例透過建立一個具有這些附加功能等更多功能且遵循更佳設計模式的長駐工作者 Isolate,來擴充第一個範例中的資訊。雖然這段程式碼與第一個範例有相似之處,但它並非那個範例的延伸。

步驟 1:定義工作執行緒類別

#

首先,為您的背景工作執行緒隔離建立一個類別。此類別包含您需要的所有功能,以

  • 產生隔離。
  • 傳送訊息給該隔離。
  • 讓隔離解碼一些 JSON。
  • 將解碼的 JSON 傳送回主隔離。

此類別公開三個方法:一個用於建立工作者隔離區,一個用於處理傳送訊息至該工作者隔離區,以及一個在不再使用時關閉埠的方法。

dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  Future<Object?> parseJson(String message) async {
    // TODO: Ensure the port is still open.
    _commands.send(message);
  }

  static Future<Worker> spawn() async {
    // TODO: Add functionality to create a new Worker object with a
    //  connection to a spawned isolate.
    throw UnimplementedError();
  }

  Worker._(this._commands, this._responses) {
    // TODO: Initialize main isolate receive port listener.
  }

  void _handleResponsesFromIsolate(dynamic message) {
    // TODO: Handle messages sent back from the worker isolate.
  }

  static void _handleCommandsToIsolate(ReceivePort rp, SendPort sp) async {
    // TODO: Handle messages sent back from the worker isolate.
  }

  static void _startRemoteIsolate(SendPort sp) {
    // TODO: Initialize worker isolate's ports.
  }
}

步驟 2:在 Worker.spawn 方法中建立 RawReceivePort

#

在衍生隔離區之前,您需要建立 RawReceivePort,這是一個較低階的 ReceivePort。使用 RawReceivePort 是較佳的模式,因為它讓您可以將隔離區啟動邏輯與處理隔離區上訊息傳遞的邏輯分開。

Worker.spawn 方法中

  • 首先,建立 RawReceivePort。此 ReceivePort 僅負責接收來自工作者隔離區的初始訊息,這會是一個 SendPort
  • 接下來,建立一個 Completer,這會指出隔離區何時準備好接收訊息。當它完成時,它會傳回一個包含 ReceivePortSendPort 的紀錄。
  • 接下來,定義 RawReceivePort.handler 屬性。此屬性是一個 Function?,其行為類似於 ReceivePort.listener。當此埠收到訊息時,就會呼叫此函式。
  • 在處理函式中,呼叫 connection.complete()。此方法預期會有一個包含 ReceivePortSendPort紀錄 作為引數。SendPort 是從工作者隔離區傳送的初始訊息,它會在下一步中指派至類別層級的 SendPort 名為 _commands
  • 接著,使用 ReceivePort.fromRawReceivePort 建構函式建立新的 ReceivePort,並傳入 initPort
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler.
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };
// ···
  }

先建立 RawReceivePort,再建立 ReceivePort,稍後就能為 ReceivePort.listen 新增新的回呼。反之,如果你直接建立 ReceivePort,就只能新增一個 listener,因為 ReceivePort 實作 Stream,而不是 BroadcastStream

實際上,這能讓你將隔離啟動邏輯與在建立通訊後處理接收訊息的邏輯分開。隨著其他方法中的邏輯增加,這個好處會越來越明顯。

步驟 3:使用 Isolate.spawn 產生工作執行緒隔離

#

這個步驟會繼續填入 Worker.spawn 方法。你會新增產生隔離所需的程式碼,並從這個類別傳回 Worker 的執行個體。在此範例中,呼叫 Isolate.spawn 會包在 try/catch 區塊 中,這能確保如果隔離無法啟動,initPort 會關閉,而且不會建立 Worker 物件。

  • 首先,嘗試在 try/catch 區塊中產生工作執行緒隔離。如果產生工作執行緒隔離失敗,請關閉在先前步驟中建立的接收埠。傳遞給 Isolate.spawn 的方法會在後續步驟中介紹。
  • 接著,等待 connection.future,並從它傳回的記錄中解構傳送埠和接收埠。
  • 最後,呼叫 Worker 的私人建構函式傳入來自該完成器的埠,並傳回 Worker 的執行個體。
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };
    // Spawn the isolate.
    try {
      await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
    } on Object {
      initPort.close();
      rethrow;
    }

    final (ReceivePort receivePort, SendPort sendPort) =
        await connection.future;

    return Worker._(sendPort, receivePort);
  }

請注意,在此範例中(與 前一個範例 相比),Worker.spawn 會作為這個類別的非同步靜態建構函式,而且是建立 Worker 執行個體的唯一方法。這會簡化 API,讓建立 Worker 執行個體的程式碼更簡潔。

步驟 4:完成隔離設定程序

#

在此步驟中,您將完成基本的隔離設定程序。這幾乎完全與前一個範例相關,而且沒有新的概念。有一個細微的變更,在於程式碼被拆分為更多方法,這是一種設計實務,可讓您透過本範例的其餘部分來新增更多功能。如需設定隔離的基本程序的深入說明,請參閱基本埠範例

首先,建立從 `Worker.spawn` 方法傳回的私人建構函式。在建構函式主體中,將監聽器新增至主隔離所使用的接收埠,並將一個尚未定義的方法傳遞至該監聽器,稱為 `_handleResponsesFromIsolate`。

dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
// ···
  Worker._(this._responses, this._commands) {
    _responses.listen(_handleResponsesFromIsolate);
  }

接著,將程式碼新增至 `_startRemoteIsolate`,負責初始化工作人員隔離的埠。回想該方法已傳遞至 `Isolate.spawn` 中的 `Worker.spawn` 方法,而且它會將主隔離的 `SendPort` 傳遞為引數。

  • 建立新的 `ReceivePort`。
  • 將該埠的 `SendPort` 傳送回主隔離。
  • 呼叫名為 `_handleCommandsToIsolate` 的新方法,並將新的 `ReceivePort` 和來自主隔離的 `SendPort` 傳遞為引數。
dart
static void _startRemoteIsolate(SendPort sendPort) {
  final receivePort = ReceivePort();
  sendPort.send(receivePort.sendPort);
  _handleCommandsToIsolate(receivePort, sendPort);
}

接著,新增 `_handleCommandsToIsolate` 方法,負責接收來自主隔離的訊息、在工作人員隔離中解碼 json,並將解碼後的 json 傳送回作為回應。

  • 首先,在工作人員隔離的 `ReceivePort` 上宣告監聽器。
  • 在新增至監聽器的回呼中,嘗試在try/catch 區塊中解碼從主隔離傳遞的 JSON。如果解碼成功,將解碼後的 JSON 傳送回主隔離。
  • 如果發生錯誤,傳送回RemoteError
dart
static void _handleCommandsToIsolate(
    ReceivePort receivePort, SendPort sendPort) {
  receivePort.listen((message) {
    try {
      final jsonData = jsonDecode(message as String);
      sendPort.send(jsonData);
    } catch (e) {
      sendPort.send(RemoteError(e.toString(), ''));
    }
  });
}

接著,新增 `_handleResponsesFromIsolate` 方法的程式碼。

  • 首先,檢查訊息是否為 `RemoteError`,如果是,您應該 `throw` 該錯誤。
  • 否則,列印訊息。在後續步驟中,您會更新此程式碼以傳回訊息,而不是列印它們。
dart
void _handleResponsesFromIsolate(dynamic message) {
  if (message is RemoteError) {
    throw message;
  } else {
    print(message);
  }
}

最後,新增 `parseJson` 方法,這是一個公開方法,允許外部程式碼將 JSON 傳送至工作者隔離區進行解碼。

dart
Future<Object?> parseJson(String message) async {
  _commands.send(message);
}

您將在下一步更新此方法。

步驟 5:同時處理多則訊息

#

目前,如果您快速將訊息傳送至工作者隔離區,隔離區會以「完成順序」傳送解碼後的 JSON 回應,而非傳送順序。您無法判斷哪個回應對應到哪則訊息。

在此步驟中,您將透過為每則訊息提供識別碼,並使用 `Completer` 物件來確保外部程式碼呼叫 `parseJson` 時,傳回給該呼叫者的回應是正確的回應,來修正此問題。

首先,為 `Worker` 新增兩個類別層級屬性

  • Map<int, Completer<Object?>> _activeRequests
  • int _idCounter
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
  final Map<int, Completer<Object?>> _activeRequests = {};
  int _idCounter = 0;

`_activeRequests` 地圖將傳送至工作者隔離區的訊息與 `Completer` 關聯起來。 `_activeRequests` 中使用的金鑰取自 `_idCounter`,而隨著傳送更多訊息,`_idCounter` 會遞增。

接著,更新 `parseJson` 方法,以便在將訊息傳送至工作者隔離區之前建立完成器。

  • 首先建立 `Completer`。
  • 接著,遞增 `_idCounter`,以便將每個 `Completer` 與唯一的數字關聯起來。
  • 在 `_activeRequests` 地圖中新增一則項目,其中金鑰為 `_idCounter` 的目前數字,而完成器為值。
  • 將訊息傳送至工作者隔離區,並附上識別碼。由於您只能透過 `SendPort` 傳送一個值,因此請將識別碼和訊息包裝在 記錄 中。
  • 最後,傳回完成器的未來,最終將包含來自工作者隔離區的回應。
dart
Future<Object?> parseJson(String message) async {
  final completer = Completer<Object?>.sync();
  final id = _idCounter++;
  _activeRequests[id] = completer;
  _commands.send((id, message));
  return await completer.future;
}

您還需要更新 `_handleResponsesFromIsolate` 和 `_handleCommandsToIsolate` 來處理此系統。

在 `_handleCommandsToIsolate` 中,您需要考量 `message` 是包含兩個值的記錄,而非僅有 JSON 文字。透過從 `message` 解構值來執行此操作。

然後,在解碼 JSON 後,更新對 `sendPort.send` 的呼叫,以使用記錄將識別碼和解碼後的 JSON 傳回至主隔離區。

dart
static void _handleCommandsToIsolate(
    ReceivePort receivePort, SendPort sendPort) {
  receivePort.listen((message) {
    final (int id, String jsonText) = message as (int, String); // New
    try {
      final jsonData = jsonDecode(jsonText);
      sendPort.send((id, jsonData)); // Updated
    } catch (e) {
      sendPort.send((id, RemoteError(e.toString(), '')));
    }
  });
}

最後,更新 `_handleResponsesFromIsolate`。

  • 首先,再次從訊息引數中解構識別碼和回應。
  • 然後,從 `_activeRequests` 地圖中移除對應於此請求的完成器。
  • 最後,不要擲回錯誤或列印解碼後的 JSON,而是完成完成器,傳入回應。完成後,回應將傳回至在主隔離區呼叫 `parseJson` 的程式碼。
dart
void _handleResponsesFromIsolate(dynamic message) {
  final (int id, Object? response) = message as (int, Object?); // New
  final completer = _activeRequests.remove(id)!; // New

  if (response is RemoteError) {
    completer.completeError(response); // Updated
  } else {
    completer.complete(response); // Updated
  }
}

步驟 6:新增關閉連接埠的功能

#

當您的程式碼不再使用分離時,您應該關閉主分離和工作者分離的連接埠。

  • 首先,新增一個類別層級的布林值,用來追蹤連接埠是否已關閉。
  • 然後,新增 `Worker.close` 方法。在此方法中
    • 將 `_closed` 更新為 true。
    • 傳送最後一則訊息給工作者分離。這則訊息是一個字串,內容為「關閉」,但它可以是您想要的任何物件。您會在下一段程式碼片段中使用它。
  • 最後,檢查 `_activeRequests` 是否為空。如果是,關閉名為 `_responses` 的主分離 `ReceivePort`。
dart
class Worker {
  bool _closed = false;
// ···
  void close() {
    if (!_closed) {
      _closed = true;
      _commands.send('shutdown');
      if (_activeRequests.isEmpty) _responses.close();
      print('--- port closed --- ');
    }
  }
  • 接下來,您需要在工作者分離中處理「關閉」訊息。將以下程式碼新增到 `_handleCommandsToIsolate` 方法。這段程式碼會檢查訊息是否為字串「關閉」。如果是,它會關閉工作者分離的 `ReceivePort`,然後傳回。
dart
static void _handleCommandsToIsolate(
  ReceivePort receivePort,
  SendPort sendPort,
) {
  receivePort.listen((message) {
    // New if-block.
    if (message == 'shutdown') {
      receivePort.close();
      return;
    }
    final (int id, String jsonText) = message as (int, String);
    try {
      final jsonData = jsonDecode(jsonText);
      sendPort.send((id, jsonData));
    } catch (e) {
      sendPort.send((id, RemoteError(e.toString(), '')));
    }
  });
}
  • 最後,您應該新增程式碼,在嘗試傳送訊息之前檢查連接埠是否已關閉。在 `Worker.parseJson` 方法中新增一行。
dart
Future<Object?> parseJson(String message) async {
  if (_closed) throw StateError('Closed'); // New
  final completer = Completer<Object?>.sync();
  final id = _idCounter++;
  _activeRequests[id] = completer;
  _commands.send((id, message));
  return await completer.future;
}

完整的範例

#
在此展開以查看完整範例
dart
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

void main() async {
  final worker = await Worker.spawn();
  print(await worker.parseJson('{"key":"value"}'));
  print(await worker.parseJson('"banana"'));
  print(await worker.parseJson('[true, false, null, 1, "string"]'));
  print(
      await Future.wait([worker.parseJson('"yes"'), worker.parseJson('"no"')]));
  worker.close();
}

class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
  final Map<int, Completer<Object?>> _activeRequests = {};
  int _idCounter = 0;
  bool _closed = false;

  Future<Object?> parseJson(String message) async {
    if (_closed) throw StateError('Closed');
    final completer = Completer<Object?>.sync();
    final id = _idCounter++;
    _activeRequests[id] = completer;
    _commands.send((id, message));
    return await completer.future;
  }

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };

    // Spawn the isolate.
    try {
      await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
    } on Object {
      initPort.close();
      rethrow;
    }

    final (ReceivePort receivePort, SendPort sendPort) =
        await connection.future;

    return Worker._(receivePort, sendPort);
  }

  Worker._(this._responses, this._commands) {
    _responses.listen(_handleResponsesFromIsolate);
  }

  void _handleResponsesFromIsolate(dynamic message) {
    final (int id, Object? response) = message as (int, Object?);
    final completer = _activeRequests.remove(id)!;

    if (response is RemoteError) {
      completer.completeError(response);
    } else {
      completer.complete(response);
    }

    if (_closed && _activeRequests.isEmpty) _responses.close();
  }

  static void _handleCommandsToIsolate(
    ReceivePort receivePort,
    SendPort sendPort,
  ) {
    receivePort.listen((message) {
      if (message == 'shutdown') {
        receivePort.close();
        return;
      }
      final (int id, String jsonText) = message as (int, String);
      try {
        final jsonData = jsonDecode(jsonText);
        sendPort.send((id, jsonData));
      } catch (e) {
        sendPort.send((id, RemoteError(e.toString(), '')));
      }
    });
  }

  static void _startRemoteIsolate(SendPort sendPort) {
    final receivePort = ReceivePort();
    sendPort.send(receivePort.sendPort);
    _handleCommandsToIsolate(receivePort, sendPort);
  }

  void close() {
    if (!_closed) {
      _closed = true;
      _commands.send('shutdown');
      if (_activeRequests.isEmpty) _responses.close();
      print('--- port closed --- ');
    }
  }
}