Skip to content
40 changes: 30 additions & 10 deletions src/client/testing/testController/common/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,34 @@ export function createTestingDeferred(): Deferred<void> {
return createDeferred<void>();
}

// Maximum time (ms) to wait for the result pipe to drain after the subprocess exits.
// Acts as a backstop in case `reader.onClose` never fires (e.g. abnormal subprocess exit,
// platform-specific named-pipe quirks) so the adapter's `finally` block can never hang.
export const RESULT_PIPE_DRAIN_TIMEOUT_MS = 5_000;

/**
* Awaits `deferred.promise` but resolves after at most `timeoutMs` so callers
* cannot hang indefinitely if the underlying event source never fires.
*/
export async function awaitDeferredWithTimeout<T>(deferred: Deferred<T>, timeoutMs: number): Promise<void> {
let timeoutHandle: NodeJS.Timeout | undefined;
try {
await Promise.race([
deferred.promise,
new Promise<void>((resolve) => {
timeoutHandle = setTimeout(() => {
traceVerbose(`awaitDeferredWithTimeout: timed out after ${timeoutMs}ms; resolving anyway.`);
resolve();
}, timeoutMs);
Comment on lines +45 to +49
}),
]);
} finally {
if (timeoutHandle) {
clearTimeout(timeoutHandle);
}
}
}

interface ExecutionResultMessage extends Message {
params: ExecutionTestPayload;
}
Expand Down Expand Up @@ -89,6 +117,8 @@ export async function startRunResultNamedPipe(
traceVerbose('Starting Test Result named pipe');
const pipeName: string = generateRandomPipeName('python-test-results');

// `cancellationToken` only cancels pipe creation; disposal is driven by
// `reader.onClose` so buffered results are not dropped on cancel.
const reader = await createReaderPipe(pipeName, cancellationToken);
traceVerbose(`Test Results named pipe ${pipeName} connected`);
let disposables: Disposable[] = [];
Expand All @@ -99,14 +129,6 @@ export async function startRunResultNamedPipe(
deferredTillServerClose.resolve();
});

if (cancellationToken) {
disposables.push(
cancellationToken?.onCancellationRequested(() => {
traceLog(`Test Result named pipe ${pipeName} cancelled`);
disposable.dispose();
}),
);
}
disposables.push(
reader,
reader.listen((data: Message) => {
Expand All @@ -115,9 +137,7 @@ export async function startRunResultNamedPipe(
dataReceivedCallback((data as ExecutionResultMessage).params as ExecutionTestPayload);
}),
reader.onClose(() => {
// this is called once the server close, once per run instance
traceVerbose(`Test Result named pipe ${pipeName} closed. Disposing of listener/s.`);
// dispose of all data listeners and cancelation listeners
disposable.dispose();
}),
reader.onError((error) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ export class PytestTestExecutionAdapter implements ITestExecutionAdapter {
project,
);
} finally {
await deferredTillServerClose.promise;
await utils.awaitDeferredWithTimeout(deferredTillServerClose, utils.RESULT_PIPE_DRAIN_TIMEOUT_MS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ export class UnittestTestExecutionAdapter implements ITestExecutionAdapter {
cSource.token, // token to cancel
);
runInstance.token.onCancellationRequested(() => {
console.log(`Test run cancelled, resolving 'till TillAllServerClose' deferred for ${uri.fsPath}.`);
// if canceled, stop listening for results
deferredTillServerClose.resolve();
console.log(`Test run cancelled for ${uri.fsPath}; waiting for result pipe to drain.`);
Comment on lines 72 to +73
// Don't resolve the deferred here: the pipe must drain first.
// `reader.onClose` in `startRunResultNamedPipe` will resolve it
// once the subprocess closes its end of the pipe.
});
try {
await this.runTestsNew(
Expand All @@ -89,7 +90,7 @@ export class UnittestTestExecutionAdapter implements ITestExecutionAdapter {
} catch (error) {
traceError(`Error in running unittest tests: ${error}`);
} finally {
await deferredTillServerClose.promise;
await utils.awaitDeferredWithTimeout(deferredTillServerClose, utils.RESULT_PIPE_DRAIN_TIMEOUT_MS);
}
}

Expand Down
202 changes: 200 additions & 2 deletions src/test/testing/testController/utils.unit.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,29 @@ import * as assert from 'assert';
import * as sinon from 'sinon';
import * as fs from 'fs';
import * as path from 'path';
import { CancellationToken, TestController, TestItem, Uri, Range, Position } from 'vscode';
import { writeTestIdsFile, populateTestTree } from '../../../client/testing/testController/common/utils';
import { CancellationToken, CancellationTokenSource, TestController, TestItem, Uri, Range, Position } from 'vscode';
import {
Emitter,
Event,
MessageReader,
PartialMessageInfo,
Disposable as RpcDisposable,
DataCallback,
} from 'vscode-jsonrpc';
import { Message } from 'vscode-jsonrpc';
import {
writeTestIdsFile,
populateTestTree,
startRunResultNamedPipe,
awaitDeferredWithTimeout,
} from '../../../client/testing/testController/common/utils';
import { createDeferred, Deferred } from '../../../client/common/utils/async';
import * as namedPipes from '../../../client/common/pipes/namedPipes';
import { EXTENSION_ROOT_DIR } from '../../../client/constants';
import {
DiscoveredTestNode,
DiscoveredTestItem,
ExecutionTestPayload,
ITestResultResolver,
} from '../../../client/testing/testController/common/types';
import { RunTestTag, DebugTestTag } from '../../../client/testing/testController/common/testItemUtilities';
Expand Down Expand Up @@ -752,3 +769,184 @@ suite('populateTestTree tests', () => {
assert.deepStrictEqual(mockTestItem2.range, new Range(new Position(6, 0), new Position(7, 0)));
});
});

suite('startRunResultNamedPipe drain-on-cancel tests', () => {
let sandbox: sinon.SinonSandbox;
let createReaderPipeStub: sinon.SinonStub;

// Minimal `MessageReader` fake exposing only what `startRunResultNamedPipe` uses.
class FakeMessageReader implements MessageReader {
private _onClose = new Emitter<void>();

private _onError = new Emitter<Error>();

private _onPartialMessage = new Emitter<PartialMessageInfo>();

private _callback: DataCallback | undefined;

public disposed = false;

public onError: Event<Error> = this._onError.event;

public onClose: Event<void> = this._onClose.event;

public onPartialMessage: Event<PartialMessageInfo> = this._onPartialMessage.event;

public listen(callback: DataCallback): RpcDisposable {
this._callback = callback;
return {
dispose: () => {
this._callback = undefined;
},
};
}

public dispose(): void {
this.disposed = true;
this._onClose.dispose();
this._onError.dispose();
this._onPartialMessage.dispose();
}

// Test helpers.
public emit(message: Message): void {
this._callback?.(message);
}

public hasListener(): boolean {
return this._callback !== undefined;
}

public fireClose(): void {
this._onClose.fire();
}
}

setup(() => {
sandbox = sinon.createSandbox();
});

teardown(() => {
sandbox.restore();
});

function makeMessage(payload: Partial<ExecutionTestPayload>): Message {
return ({ jsonrpc: '2.0', params: payload } as unknown) as Message;
}
Comment on lines +833 to +835

test('cancellation alone does NOT resolve deferredTillServerClose and does NOT detach the listener (drain not interrupted)', async () => {
const reader = new FakeMessageReader();
createReaderPipeStub = sandbox.stub(namedPipes, 'createReaderPipe').resolves(reader);

const received: ExecutionTestPayload[] = [];
const deferredTillServerClose: Deferred<void> = createDeferred<void>();
const cancelSource = new CancellationTokenSource();

await startRunResultNamedPipe((payload) => received.push(payload), deferredTillServerClose, cancelSource.token);

assert.ok(createReaderPipeStub.calledOnce, 'createReaderPipe should be called once');
assert.ok(reader.hasListener(), 'reader should have a listener registered before cancel');

// Trigger cancellation.
cancelSource.cancel();

// Yield to let any synchronous-then-microtask handlers run.
await new Promise((r) => setImmediate(r));

assert.strictEqual(
reader.disposed,
false,
'reader must NOT be disposed by cancellation alone (otherwise buffered data would be lost)',
);
assert.ok(reader.hasListener(), 'data listener must remain attached after cancel so the drain can continue');
assert.strictEqual(
(deferredTillServerClose as Deferred<void>).completed,
false,
'deferredTillServerClose must NOT resolve on cancellation; it should only resolve when the pipe closes',
);

cancelSource.dispose();
});

test('data emitted after cancellation is still delivered to the callback (drain works)', async () => {
const reader = new FakeMessageReader();
sandbox.stub(namedPipes, 'createReaderPipe').resolves(reader);

const received: ExecutionTestPayload[] = [];
const deferredTillServerClose: Deferred<void> = createDeferred<void>();
const cancelSource = new CancellationTokenSource();

await startRunResultNamedPipe((payload) => received.push(payload), deferredTillServerClose, cancelSource.token);

// Simulate the debug-path race: cancel fires while results are still buffered.
cancelSource.cancel();
await new Promise((r) => setImmediate(r));

// Buffered messages arrive after cancellation.
reader.emit(makeMessage({ cwd: 'a' }));
reader.emit(makeMessage({ cwd: 'b' }));

assert.strictEqual(received.length, 2, 'messages emitted after cancel must still reach the callback');
assert.deepStrictEqual(
received.map((p) => p.cwd),
['a', 'b'],
'all buffered results delivered in order',
);

// Subprocess closes its end of the pipe -> onClose fires -> dispose.
reader.fireClose();
await deferredTillServerClose.promise;

assert.strictEqual(reader.disposed, true, 'reader disposed via onClose path');

cancelSource.dispose();
});

test('reader.onClose resolves deferredTillServerClose and disposes the reader (natural completion path, no cancellation)', async () => {
const reader = new FakeMessageReader();
sandbox.stub(namedPipes, 'createReaderPipe').resolves(reader);

const deferredTillServerClose: Deferred<void> = createDeferred<void>();

await startRunResultNamedPipe(
() => {
/* no-op */
},
deferredTillServerClose,
undefined,
);

assert.strictEqual(
(deferredTillServerClose as Deferred<void>).completed,
false,
'deferred unresolved before close',
);

reader.fireClose();
await deferredTillServerClose.promise;

assert.strictEqual(reader.disposed, true, 'reader disposed when onClose fires');
});
});

suite('awaitDeferredWithTimeout', () => {
test('resolves promptly when the deferred resolves before the timeout', async () => {
const deferred = createDeferred<void>();
const started = Date.now();
const waiter = awaitDeferredWithTimeout(deferred, 5000);
setTimeout(() => deferred.resolve(), 10);
await waiter;
const elapsed = Date.now() - started;
assert.ok(elapsed < 1000, `should resolve well before timeout, took ${elapsed}ms`);
});

test('resolves after the timeout when the deferred never settles (no hang)', async () => {
const deferred = createDeferred<void>();
const started = Date.now();
await awaitDeferredWithTimeout(deferred, 50);
const elapsed = Date.now() - started;
assert.ok(elapsed >= 50, `should wait at least the timeout, took ${elapsed}ms`);
assert.ok(elapsed < 2000, `should not hang well beyond timeout, took ${elapsed}ms`);
assert.strictEqual(deferred.completed, false, 'underlying deferred remains unresolved');
});
});
Loading