Skip to content

Commit

Permalink
fix waitForEnd reject
Browse files Browse the repository at this point in the history
  • Loading branch information
UrielCh committed Nov 4, 2022
1 parent 1a3817f commit ac95343
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 26 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# CHANGELOG

## V4.1.13 (2022-11-04)
* improve file transfert

## V4.1.12 (2022-11-04)
* improve file transfert
* improve install APK
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@u4/adbkit",
"version": "4.1.12",
"version": "4.1.13",
"description": "A Typescript client for the Android Debug Bridge.",
"keywords": [
"adb",
Expand Down
20 changes: 10 additions & 10 deletions src/adb/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,11 @@ export default class Sync extends EventEmitter {
* @param mode Optional. The mode of the file. Defaults to `0644`.
* @returns A `PushTransfer` instance. See below for details.
*/
public async push(contents: string | Readable, path: string, mode?: number): Promise<PushTransfer> {
public async push(contents: string | Readable, path: string, mode?: number, streamName = 'stream'): Promise<PushTransfer> {
if (typeof contents === 'string') {
return this.pushFile(contents, path, mode);
} else {
return this.pushStream(contents, path, mode);
return this.pushStream(contents, path, mode, streamName);
}
}
/**
Expand All @@ -222,7 +222,8 @@ export default class Sync extends EventEmitter {
} catch (e) {
throw Error(`can not read file "${file}"`);
}
return this.pushStream(fs.createReadStream(file), path, mode);
const stream = fs.createReadStream(file);
return this.pushStream(stream, path, mode, file);
}

/**
Expand All @@ -233,10 +234,10 @@ export default class Sync extends EventEmitter {
* @param mode See `sync.push()` for details.
* @returns See `sync.push()` for details.
*/
public async pushStream(stream: Readable, path: string, mode = DEFAULT_CHMOD): Promise<PushTransfer> {
public async pushStream(stream: Readable, path: string, mode = DEFAULT_CHMOD, streamName = 'stream'): Promise<PushTransfer> {
mode |= Stats.S_IFREG;
await this.sendCommandWithArg(Protocol.SEND, `${path},${mode}`);
return this._writeData(stream, Math.floor(Date.now() / 1000));
return this._writeData(stream, Math.floor(Date.now() / 1000), streamName);
}

/**
Expand Down Expand Up @@ -268,15 +269,15 @@ export default class Sync extends EventEmitter {
return Sync.temp(path);
}

private _writeData(stream: Readable, timeStamp: number): PushTransfer {
private _writeData(stream: Readable, timeStamp: number, streamName: string): PushTransfer {
const transfer = new PushTransfer();

let readableListener: () => void;
let connErrorListener: (err: Error) => void;
let endListener: () => void;
let errorListener: (err: Error) => void;

const writeData = (): Promise<unknown> => new Promise((resolve, reject) => {
const writeData = () => new Promise<void>((resolve, reject) => {

const writer = Promise.resolve();
endListener = () => {
Expand All @@ -301,12 +302,12 @@ export default class Sync extends EventEmitter {

readableListener = () => writer.then(writeAll);
stream.on('readable', readableListener);
errorListener = (err) => reject(err);
errorListener = (err) => reject(new Error(`Source Error: ${err.message} while transfering ${streamName}`));
stream.on('error', errorListener);
connErrorListener = (err: Error) => {
stream.destroy(err);
this.connection.end();
reject(err);
reject(new Error(`Target Error: ${err.message} while transfering ${streamName}`));
};
this.connection.on('error', connErrorListener);
})
Expand All @@ -318,7 +319,6 @@ export default class Sync extends EventEmitter {
// writer.cancel();
});


// While I can't think of a case that would break this double-Promise
// writer-reader arrangement right now, it's not immediately obvious
// that the code is correct and it may or may not have some failing
Expand Down
23 changes: 14 additions & 9 deletions src/adb/sync/pulltransfer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,23 @@ export default class PullTransfer extends Stream.PassThrough {
})
}

private waitForEndPromise?: Promise<void>;
/**
* get end notification using Promise
*/
public waitForEnd(): Promise<void> {
return new Promise<void>((resolve, reject) => {
const unReg = (cb: () => void) => {
this.off('end', resolve);
this.off('error', reject);
cb();
}
this.on('end', () => unReg(resolve));
this.on('error', () => unReg(reject));
})
if (!this.waitForEndPromise) {
this.waitForEndPromise = new Promise<void>((resolve, reject) => {
const unReg = (cb: () => void, e?: Error) => {
this.off('end', resolve);
this.off('error', () => reject(e));
cb();
}
this.on('end', () => unReg(resolve));
this.on('error', (e) => unReg(reject, e));
})
}
return this.waitForEndPromise;

}
}
9 changes: 5 additions & 4 deletions src/adb/sync/pushtransfer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ interface IEmissions {
*/
export default class PushTransfer extends EventEmitter {
private stack: number[] = [];
private waitForEndPromise?: Promise<void>;
public stats = {
bytesTransferred: 0,
};
Expand Down Expand Up @@ -58,19 +57,21 @@ export default class PushTransfer extends EventEmitter {
return this.emit('end');
}


private waitForEndPromise?: Promise<void>;
/**
* get end notification using Promise
*/
public waitForEnd(): Promise<void> {
if (!this.waitForEndPromise) {
this.waitForEndPromise = new Promise<void>((resolve, reject) => {
const unReg = (cb: () => void) => {
const unReg = (cb: () => void, e?: Error) => {
this.off('end', resolve);
this.off('error', reject);
this.off('error', () => reject(e));
cb();
}
this.on('end', () => unReg(resolve));
this.on('error', () => unReg(reject));
this.on('error', (e) => unReg(reject, e));
})
}
return this.waitForEndPromise;
Expand Down
4 changes: 2 additions & 2 deletions test/adb/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import Sinon from 'sinon';
import Chai, { expect, assert } from 'chai';
import simonChai from 'sinon-chai';
Chai.use(simonChai);
import Adb, { Client } from '../../src/index';
import Adb, { Client } from '../../src/index';
import Sync, { ENOENT } from '../../src/adb/sync';
import Stats from '../../src/adb/sync/stats';
import Entry from '../../src/adb/sync/entry';
Expand Down Expand Up @@ -148,7 +148,7 @@ describe('Sync', () => {
}).finally(done);
});
dt('should return a PullTransfer instance', (done) => {
return forEachSyncDevice( async (sync) => {
return forEachSyncDevice(async (sync) => {
const rval = await sync.pull(SURELY_EXISTING_FILE);
expect(rval).to.be.an.instanceof(PullTransfer);
return rval.cancel();
Expand Down

0 comments on commit ac95343

Please sign in to comment.