Skip to content

Commit

Permalink
Search index in a separate isolate. (#7037)
Browse files Browse the repository at this point in the history
  • Loading branch information
isoos authored Sep 14, 2023
1 parent 05e28f5 commit 539f6fb
Show file tree
Hide file tree
Showing 11 changed files with 554 additions and 69 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ AppEngine version, listed here to ease deployment and troubleshooting.
* Upgraded preview Flutter analysis SDK to `3.14.0-0.2.pre`.
* Note: isolates do not automatically restart after they are closed with uncaught error.
* Note: no control isolate is used, first isolate is used as the HTTP-serving frontend.
* Note: `search` has now a separate index in an isolate that is renewed every 15 minutes.

## `20230907t123500-all`
* Bumped runtimeVersion to `2023.09.05`.
Expand Down
3 changes: 3 additions & 0 deletions app/lib/search/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,9 @@ List<ApiDocPage> apiDocPagesFromPubData(PubDartdocData pubData) {
class _CombinedSearchIndex implements SearchIndex {
const _CombinedSearchIndex();

@override
bool isReady() => indexInfo().isReady;

@override
IndexInfo indexInfo() => _inMemoryPackageIndex.indexInfo();

Expand Down
3 changes: 1 addition & 2 deletions app/lib/search/handlers.dart
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ Future<shelf.Response> _livenessCheckHandler(shelf.Request request) async {

/// Handles /readiness_check requests.
Future<shelf.Response> _readinessCheckHandler(shelf.Request request) async {
final info = await searchIndex.indexInfo();
if (info.isReady) {
if (await searchIndex.isReady()) {
return htmlResponse('OK');
} else {
return htmlResponse('Service Unavailable', status: 503);
Expand Down
11 changes: 11 additions & 0 deletions app/lib/search/search_service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,21 @@ class IndexInfo {
'lastUpdateDelta': clock.now().difference(lastUpdated!).toString(),
'updatedPackages': updatedPackages,
};

factory IndexInfo.fromJson(Map<String, dynamic> map) {
final lastUpdated = map['lastUpdated'] as String?;
return IndexInfo(
isReady: map['isReady'] == true,
packageCount: map['packageCount'] as int,
lastUpdated: lastUpdated == null ? null : DateTime.parse(lastUpdated),
updatedPackages: (map['updatedPackages'] as List).cast<String>(),
);
}
}

/// Package search index and lookup.
abstract class SearchIndex {
FutureOr<bool> isReady();
FutureOr<PackageSearchResult> search(ServiceSearchQuery query);
FutureOr<IndexInfo> indexInfo();
}
Expand Down
143 changes: 103 additions & 40 deletions app/lib/service/entrypoint/_isolate.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,20 @@ import 'dart:async';
import 'dart:io';
import 'dart:isolate';

import 'package:collection/collection.dart';
import 'package:logging/logging.dart';
import 'package:stack_trace/stack_trace.dart';

import '../services.dart';
import '_messages.dart';
export '_messages.dart';

/// Wrapper method to replace [withServices] into [withFakeServices] for
/// local tests and development.
typedef ServicesWrapperFn = Future<void> Function(Future Function() fn);

/// Marker class for inter-isolate messages.
sealed class Message {}

/// Initializing message send from the controller isolate to the new one.
class EntryMessage extends Message {
final SendPort protocolSendPort;

EntryMessage({
required this.protocolSendPort,
});
}

/// Message sent from the isolate to indicate that it is ready with the initialization.
class ReadyMessage extends Message {
ReadyMessage();
}

/// Message sent from the isolate with arbitrary text.
class DebugMessage extends Message {
final String text;

DebugMessage(this.text);
}
/// The main method to run in the new isolate.
typedef EntryPointFn = Future<void> Function(EntryMessage message);

/// Starts, monitors, stops or restarts isolates that run the same code.
///
Expand All @@ -46,19 +28,27 @@ class DebugMessage extends Message {
class IsolateRunner {
final Logger logger;
final String kind;
final ServicesWrapperFn servicesWrapperFn;
final Future<void> Function(EntryMessage message) entryPoint;
final ServicesWrapperFn? servicesWrapperFn;
final EntryPointFn? entryPoint;
final Uri? spawnUri;

int started = 0;
final _isolates = <_Isolate>[];
bool _closing = false;

IsolateRunner({
IsolateRunner.fn({
required this.logger,
required this.kind,
required this.servicesWrapperFn,
required this.entryPoint,
});
required ServicesWrapperFn this.servicesWrapperFn,
required EntryPointFn this.entryPoint,
}) : spawnUri = null;

IsolateRunner.uri({
required this.logger,
required this.kind,
required Uri this.spawnUri,
}) : entryPoint = null,
servicesWrapperFn = null;

/// Starts [count] new isolates.
Future<void> start(int count) async {
Expand All @@ -74,13 +64,45 @@ class IsolateRunner {
required Duration wait,
}) async {
final isolatesToClose = [..._isolates];
for (final i in isolatesToClose) {
i.markedForRenew = true;
}
await start(count);
await Future.delayed(wait);
for (final i in isolatesToClose) {
await i.close();
}
}

/// Send [RequestMessage] and wait for [ReplyMessage] returning
/// [ReplyMessage.result], or throws [IsolateRequestException]
Future<Object?> sendRequest(
Object payload, {
required Duration timeout,
}) async {
final last = _isolates.lastWhereOrNull((i) =>
i.markedForRenew == false && i._readyMessage?.requestSendPort != null);
if (last == null) {
throw IsolateRequestException('No isolate to process request.');
}

final replyRecievePort = ReceivePort();
try {
final firstFuture = replyRecievePort.first;
final targetSendPort = last._readyMessage!.requestSendPort!;
final requestMessage = RequestMessage(payload, replyRecievePort.sendPort);
targetSendPort.send(requestMessage.encodeAsJson());
final first = await firstFuture.timeout(timeout) as Map<String, dynamic>;
final reply = Message.fromObject(first) as ReplyMessage;
if (reply.isError) {
throw IsolateRequestException(reply.error!);
}
return reply.result;
} finally {
replyRecievePort.close();
}
}

Future<void> _startOne() async {
if (_closing) return;
started++;
Expand All @@ -90,14 +112,19 @@ class IsolateRunner {
group: this,
logger: logger,
id: id,
servicesWrapperFn: servicesWrapperFn,
entryPoint: entryPoint,
);
_isolates.add(isolate);
unawaited(isolate.done.then((_) async {
_isolates.remove(isolate);
}));
await isolate.init();
if (entryPoint != null) {
await isolate.spawnFn(
servicesWrapperFn: servicesWrapperFn!,
entryPoint: entryPoint!,
);
} else {
await isolate.spawnUri(spawnUri: spawnUri!);
}
if (_closing) {
await isolate.close();
return;
Expand All @@ -116,10 +143,10 @@ class IsolateRunner {
/// Starts a worker isolate and returns its runner to control it.
Future<IsolateRunner> startWorkerIsolate({
required Logger logger,
required Future<void> Function(EntryMessage message) entryPoint,
required EntryPointFn entryPoint,
ServicesWrapperFn? servicesWrapperFn,
}) async {
final worker = IsolateRunner(
final worker = IsolateRunner.fn(
logger: logger,
kind: 'worker',
servicesWrapperFn: servicesWrapperFn ?? withServices,
Expand All @@ -129,14 +156,26 @@ Future<IsolateRunner> startWorkerIsolate({
return worker;
}

/// Starts an index isolate and returns its runner to control it.
Future<IsolateRunner> startQueryIsolate({
required Logger logger,
required Uri spawnUri,
}) async {
final worker = IsolateRunner.uri(
logger: logger,
kind: 'query',
spawnUri: spawnUri,
);
await worker.start(1);
return worker;
}

/// Represents a running isolate, with its current status and subscriptions.
class _Isolate {
/// Parent runner that owns this group
final IsolateRunner group;
final Logger logger;
final String id;
final ServicesWrapperFn servicesWrapperFn;
final Future<void> Function(EntryMessage message) entryPoint;

late Isolate _isolate;

Expand All @@ -154,16 +193,18 @@ class _Isolate {

final _doneCompleter = Completer();
late final done = _doneCompleter.future;
bool markedForRenew = false;

_Isolate({
required this.group,
required this.logger,
required this.id,
required this.servicesWrapperFn,
required this.entryPoint,
});

Future<void> init() async {
Future<void> spawnFn({
required ServicesWrapperFn servicesWrapperFn,
required Future<void> Function(EntryMessage message) entryPoint,
}) async {
_isolate = await Isolate.spawn(
_wrapper,
[
Expand All @@ -179,8 +220,30 @@ class _Isolate {
debugName: id,
);

await _init();
}

Future<void> spawnUri({
required Uri spawnUri,
}) async {
_isolate = await Isolate.spawnUri(
spawnUri,
[],
EntryMessage(
protocolSendPort: _protocolReceivePort.sendPort,
).encodeAsJson(),
onError: _errorReceivePort.sendPort,
onExit: _exitReceivePort.sendPort,
errorsAreFatal: true,
debugName: id,
);
await _init();
}

Future<void> _init() async {
final ready = Completer();
_protocolSubscription = _protocolReceivePort.listen((e) {
_protocolSubscription = _protocolReceivePort.listen((event) {
final e = Message.fromObject(event);
if (e is ReadyMessage && !ready.isCompleted) {
_readyMessage = e;
ready.complete();
Expand Down
Loading

0 comments on commit 539f6fb

Please sign in to comment.