diff --git a/README.md b/README.md index 967431358..3acc4636d 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ This repo is a monorepo which contains the core [pg](https://github.com/brianc/n - [pg-query-stream](https://github.com/brianc/node-postgres/tree/master/packages/pg-query-stream) - [pg-connection-string](https://github.com/brianc/node-postgres/tree/master/packages/pg-connection-string) - [pg-protocol](https://github.com/brianc/node-postgres/tree/master/packages/pg-protocol) +- [pg-batch-query](https://github.com/brianc/node-postgres/tree/master/packages/pg-batch-query) ## Documentation diff --git a/packages/pg-batch-query/LICENSE b/packages/pg-batch-query/LICENSE new file mode 100644 index 000000000..6b06ef98d --- /dev/null +++ b/packages/pg-batch-query/LICENSE @@ -0,0 +1,9 @@ +The MIT License (MIT) + +Copyright (c) 2013 Ankush Chadda + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
diff --git a/packages/pg-batch-query/README.md b/packages/pg-batch-query/README.md new file mode 100644 index 000000000..6125bb715 --- /dev/null +++ b/packages/pg-batch-query/README.md @@ -0,0 +1,71 @@ +# pg-batch-query + +Batches queries by using the [Extended query protocol](https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY). +Essentially we do the following: +- send a single PARSE command to create a named statement. +- send a BIND, DESCRIBE and EXECUTE command for each set of values. +- finally send a SYNC to close the current transaction. + +As per the [benchmark tests](./bench.ts), the number of queries per second roughly triples when using batched queries. + +## installation + +```bash +$ npm install pg --save +$ npm install pg-batch-query --save +``` + +## use + +```js +const pg = require('pg') +var pool = new pg.Pool() +const BatchQuery = require('pg-batch-query') + +const batch = new BatchQuery({ + name: 'optional', + text: 'SELECT from foo where bar = $1', + values: [ + ['first'], + ['second'] + ] +}) + +pool.connect(async (err, client, done) => { + if (err) throw err + const results = await client.query(batch).execute() + for (const res of results) { + for (const row of res.rows) { + console.log(row) + } + } +}) +``` + +## contribution + +I'm very open to contribution! Open a pull request with your code or idea and we'll talk about it. If it's not way insane we'll merge it in too: isn't open source awesome? 
+ +## license + +The MIT License (MIT) + +Copyright (c) 2013-2020 Ankush Chadda + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. 
diff --git a/packages/pg-batch-query/bench.ts b/packages/pg-batch-query/bench.ts
new file mode 100644
index 000000000..29e75d43a
--- /dev/null
+++ b/packages/pg-batch-query/bench.ts
@@ -0,0 +1,97 @@
+import pg from 'pg'
+import BatchQuery from './src'
+
+// Number of value sets sent per batch; also used to scale the reported batch query counts.
+const BATCH_SIZE = 5
+
+const insert = (value) => ({
+  text: 'INSERT INTO foobar(name, age) VALUES ($1, $2)',
+  values: ['joe' + value, value],
+})
+
+const select = (value) => ({
+  text: 'SELECT FROM foobar where name = $1 and age = $2',
+  values: ['joe' + value, value],
+})
+
+// Suffix that gives every batch a distinct statement name on the shared connection.
+let counter = 0
+
+// Runs one ordinary parameterized query.
+const simpleExec = async (client, getQuery, count) => {
+  const query = getQuery(count)
+  await client.query({
+    text: query.text,
+    values: query.values,
+    rowMode: 'array',
+  })
+}
+
+// Runs one batch of BATCH_SIZE value sets against the same statement text.
+const batchExec = async (client, getQuery, count) => {
+  const query = getQuery(count)
+  const values = []
+  for (let i = 1; i <= BATCH_SIZE; i++) {
+    values.push(['joe' + i, count])
+  }
+
+  const batchQuery = new BatchQuery({
+    name: 'optional' + counter++,
+    text: query.text,
+    values,
+  })
+  await client.query(batchQuery).execute()
+}
+
+// Calls mainMethod in a loop for roughly `time` milliseconds and returns the number of calls made.
+const bench = async (client, mainMethod, q, time) => {
+  const start = Date.now()
+  let count = 0
+  while (true) {
+    await mainMethod(client, q, count)
+    count++
+    if (Date.now() - start > time) {
+      return count
+    }
+  }
+}
+
+const run = async () => {
+  const client = new pg.Client()
+  await client.connect()
+  console.log('start')
+  await client.query('CREATE TEMP TABLE foobar(name TEXT, age NUMERIC)')
+  console.log('warmup done')
+  const seconds = 5
+  const duration = seconds * 1000
+
+  for (let i = 0; i < 4; i++) {
+    let queries = await bench(client, simpleExec, insert, duration)
+    console.log('')
+    console.log('insert queries:', queries)
+    console.log('qps', queries / seconds)
+    console.log('on my laptop best so far seen 16115.4 qps')
+
+    queries = await bench(client, batchExec, insert, duration)
+    console.log('')
+    console.log('insert batch queries:', queries * BATCH_SIZE)
+    console.log('qps', (queries * BATCH_SIZE) / seconds)
+    console.log('on my laptop best so far seen 42646 qps')
+
+    queries = await bench(client, simpleExec, select, duration)
+    console.log('')
+    console.log('select queries:', queries)
+    console.log('qps', queries / seconds)
+    console.log('on my laptop best so far seen 18579.8 qps')
+
+    queries = await bench(client, batchExec, select, duration)
+    console.log('')
+    console.log('select batch queries:', queries * BATCH_SIZE)
+    console.log('qps', (queries * BATCH_SIZE) / seconds)
+    console.log('on my laptop best so far seen 44887 qps')
+  }
+
+  await client.end()
+}
+
+run().catch((e) => Boolean(console.error(e)) || process.exit(-1))
diff --git a/packages/pg-batch-query/package.json b/packages/pg-batch-query/package.json
new file mode 100644
index 000000000..62a5b96b2
--- /dev/null
+++ b/packages/pg-batch-query/package.json
@@ -0,0 +1,45 @@
+{
+  "name": "pg-batch-query",
+  "version": "1.0.0",
+  "description": "Postgres Batch Query for performant response time.",
+  "main": "./dist/index.js",
+  "types": "./dist/index.d.ts",
+  "scripts": {
+    "build": "rimraf dist && tsc",
+    "test": "mocha -r ts-node/register test/**/*.ts"
+  },
+  "repository": {
+    "type": "git",
+    "url": "git://github.com/brianc/node-postgres.git",
+    "directory": "packages/pg-batch-query"
+  },
+  "keywords": [
+    "postgres",
+    "batch-query",
+    "pg",
+    "query"
+  ],
+  "files": [
+    "/dist/*{js,ts,map}",
+    "/src"
+  ],
+  "author": "Ankush Chadda",
+  "license": "MIT",
+  "bugs": {
+    "url": "https://github.com/brianc/node-postgres/issues"
+  },
+  "devDependencies": {
+    "@types/chai": "^4.2.13",
+    "@types/mocha": "^8.0.3",
+    "@types/node": "^14.0.0",
+    "@types/pg": "^8.10.2",
+    "eslint-plugin-promise": "^6.0.1",
+    "mocha": "^7.1.2",
+    "pg": "^8.10.0",
+    "ts-node": "^10.9.1",
+    "typescript": "^5.0.4"
+  },
+  "peerDependencies": {
+    "pg": "^8"
+  }
+}
diff --git a/packages/pg-batch-query/src/index.ts b/packages/pg-batch-query/src/index.ts
new file mode 100644
index 000000000..2bc0063ab
--- /dev/null
+++ b/packages/pg-batch-query/src/index.ts
@@ -0,0 +1,163 @@
+import { Submittable, Connection, QueryResult } from 'pg'
+const Result = require('pg/lib/result.js')
+const utils = require('pg/lib/utils.js')
+let nextUniqueID = 1 // concept borrowed from org.postgresql.core.v3.QueryExecutorImpl
+
+interface BatchQueryConfig {
+  name?: string
+  text: string
+  values?: any[][]
+}
+
+/**
+ * Submittable that runs one statement text once for every set of values.
+ *
+ * submit() sends a single Parse, then a Bind/Describe/Execute triple per set of
+ * values, and finally one Sync. Each CommandComplete adds one Result to the list
+ * that execute() resolves with.
+ */
+class BatchQuery implements Submittable {
+  name: string | null
+  text: string
+  values: any[][]
+  connection: Connection | null
+  _portal: string | null
+  _portals: string[]
+  _result: typeof Result | null
+  _results: typeof Result[]
+  _outcome: { err: any; results?: any[] } | null
+  callback: Function | null
+
+  /**
+   * @param batchQuery.name optional statement name; the statement is parsed on every submit()
+   * @param batchQuery.text statement text
+   * @param batchQuery.values one array of parameters per execution; omitted means no executions
+   * @throws Error when values is not an array whose every entry is an array
+   */
+  public constructor(batchQuery: BatchQueryConfig) {
+    const { name, text } = batchQuery
+    const values = batchQuery.values == null ? [] : batchQuery.values
+
+    // Check every row, not the outer array, so a malformed row is rejected up front.
+    if (!Array.isArray(values) || !values.every((row) => Array.isArray(row))) {
+      throw new Error('Batch commands require each set of values to be an array. e.g. values: any[][]')
+    }
+
+    this.name = name
+    this.values = values
+    this.text = text
+    this.connection = null
+    this._portal = null
+    this._portals = []
+    this._result = new Result()
+    this._results = []
+    this._outcome = null
+    this.callback = null
+  }
+
+  /** Writes the whole batch to the connection: Parse, one Bind/Describe/Execute per row, Sync. */
+  public submit(connection: Connection): void {
+    this.connection = connection
+
+    // creates a named prepared statement
+    this.connection.parse(
+      {
+        text: this.text,
+        name: this.name,
+        types: [],
+      },
+      true
+    )
+
+    for (const val of this.values) {
+      const portal = 'C_' + nextUniqueID++
+      this._portal = portal
+      // Remembered in submission order so each completion can be matched to its own portal.
+      this._portals.push(portal)
+
+      this.connection.bind(
+        {
+          statement: this.name,
+          values: val,
+          portal,
+          valueMapper: utils.prepareValue,
+        },
+        true
+      )
+
+      // maybe we could avoid this for non-select queries
+      this.connection.describe({ type: 'P', name: portal }, true)
+
+      this.connection.execute({ portal }, true)
+    }
+
+    this.connection.sync()
+  }
+
+  /**
+   * Resolves with the collected Results once ReadyForQuery is handled, or rejects with the
+   * error passed to handleError. Also settles when the batch finished before this was called.
+   */
+  execute(): Promise<any[]> {
+    return new Promise((resolve, reject) => {
+      this.callback = (err, rows) => (err ? reject(err) : resolve(rows))
+      if (this._outcome) {
+        this.callback(this._outcome.err, this._outcome.results)
+      }
+    })
+  }
+
+  handleError(err, connection) {
+    // this.connection is only assigned in submit(); fall back to the argument before that.
+    const con = this.connection || connection
+    if (con) {
+      con.flush()
+    }
+    this._settle(err)
+  }
+
+  handleReadyForQuery(con) {
+    this._settle(null, this._results)
+  }
+
+  handleRowDescription(msg) {
+    this._result.addFields(msg.fields)
+  }
+
+  handleDataRow(msg) {
+    const row = this._result.parseRow(msg.fields)
+    this._result.addRow(row)
+  }
+
+  handleCommandComplete(msg) {
+    this._result.addCommandComplete(msg)
+    this._results.push(this._result)
+    this._result = new Result()
+    this._closeNextPortal()
+  }
+
+  handleEmptyQuery() {
+    // submit() already sent the Sync for the whole batch, so none is sent here; the portal
+    // that produced the empty response is still closed like any other.
+    this._closeNextPortal()
+  }
+
+  /** Hands the outcome to the execute() callback, or keeps the first one until execute() is called. */
+  private _settle(err, results?) {
+    if (this.callback) {
+      this.callback(err, results)
+    } else if (!this._outcome) {
+      this._outcome = { err, results }
+    }
+  }
+
+  /** Closes the oldest portal that has not been closed yet. */
+  private _closeNextPortal() {
+    const portal = this._portals.shift()
+    if (portal !== undefined) {
+      this.connection.close({ type: 'P', name: portal }, true)
+    }
+  }
+}
+
+export = BatchQuery
diff --git a/packages/pg-batch-query/test/test-error-handling.ts b/packages/pg-batch-query/test/test-error-handling.ts
new file mode 100644
index 000000000..8961ac3a5
--- /dev/null
+++ b/packages/pg-batch-query/test/test-error-handling.ts
@@ -0,0 +1,62 @@
+import assert from 'assert'
+import pg from 'pg'
+import { DatabaseError } from 'pg-protocol'
+import BatchQuery from "../src"
+
+// Accepts both wordings of the message: 'for integer' and 'for type integer'.
+const INVALID_INT = /invalid input syntax for (type )?integer: "xxx"/
+
+describe('BatchQuery error handling', function () {
+  beforeEach(async function () {
+    this.client = new pg.Client()
+    await this.client.connect()
+  })
+
+  afterEach(function () {
+    // returned so mocha waits for the connection to close
+    return this.client.end()
+  })
+
+  it('handles error in parsing but can continue with another client', async function () {
+    const batch = new BatchQuery({
+      text: 'INSERT INTO foo (name) VALUES ($1)',
+      values: [
+        ['first'],
+        ['second']
+      ]
+    })
+    // fails since table is not yet created; assert.rejects also fails the test if nothing is thrown
+    await assert.rejects(this.client.query(batch).execute(), /relation "foo" does not exist/)
+    await this.client.query('Select now()')
+  })
+
+  it('handles error in insert of some of the values provided and reverts transaction', async function () {
+    await this.client.query('CREATE TEMP TABLE foo(value int, id SERIAL PRIMARY KEY)')
+    const batch = new BatchQuery({
+      text: 'INSERT INTO foo (value) VALUES ($1)',
+      values: [
+        ['1'],
+        ['3'],
+        ['xxx']
+      ],
+    })
+    // fails since xxx is not an int
+    await assert.rejects(this.client.query(batch).execute(), INVALID_INT)
+    const response = await this.client.query('Select sum(value) from foo')
+    assert.equal(response.rows[0]['sum'], null)
+  })
+
+  it('handles error in select batch query', async function () {
+    await this.client.query('CREATE TEMP TABLE foo(value int, id SERIAL PRIMARY KEY)')
+    const batch = new BatchQuery({
+      text: 'SELECT * from foo where value = ($1)',
+      values: [
+        ['1'],
+        ['3'],
+        ['xxx']
+      ],
+    })
+    // fails since xxx is not an int
+    await assert.rejects(this.client.query(batch).execute(), INVALID_INT)
+  })
+})
\ No newline at end of file
diff --git a/packages/pg-batch-query/test/test-pool.ts b/packages/pg-batch-query/test/test-pool.ts
new file mode 100644
index 000000000..783202273
--- /dev/null
+++ b/packages/pg-batch-query/test/test-pool.ts
@@ -0,0 +1,57 @@
+import assert from 'assert'
+import BatchQuery from '../src'
+import pg from 'pg'
+
+describe('batch pool query', function () {
+  beforeEach(async function () {
+    this.pool = new pg.Pool({ max: 1 })
+  })
+
+  afterEach(function () {
+    return this.pool.end()
+  })
+
+  it('batch insert works', async function () {
+    const batchQuery = new BatchQuery({
+      text: 'INSERT INTO foo (name) VALUES ($1)',
+      values: [
+        ['first'],
+        ['second']
+      ]
+    })
+    // awaited so the assertions below run before the test is considered finished
+    const client = await this.pool.connect()
+    try {
+      await client.query('CREATE TEMP TABLE foo(name TEXT, id SERIAL PRIMARY KEY)')
+      await client.query(batchQuery).execute()
+      const resp = await client.query('SELECT COUNT(*) from foo')
+      assert.strictEqual(resp.rows[0]['count'], '2')
+    } finally {
+      client.release()
+    }
+  })
+
+  it('batch select works', async function () {
+    const batchQuery = new BatchQuery({
+      text: 'SELECT * from foo where name = $1',
+      values: [
+        ['first'],
+        ['second']
+      ],
+      name: 'optional'
+    })
+    const client = await this.pool.connect()
+    try {
+      await client.query('CREATE TEMP TABLE foo(name TEXT, id SERIAL PRIMARY KEY)')
+      await client.query('INSERT INTO foo (name) VALUES ($1)', ['first'])
+      await client.query('INSERT INTO foo (name) VALUES ($1)', ['second'])
+      const responses = await client.query(batchQuery).execute()
+      assert.strictEqual(responses.length, 2)
+      for (const response of responses) {
+        assert.strictEqual(response.rowCount, 1)
+      }
+    } finally {
+      client.release()
+    }
+  })
+})
diff --git a/packages/pg-batch-query/test/test-query.ts b/packages/pg-batch-query/test/test-query.ts
new file mode 100644
index 000000000..d923b434d
--- /dev/null
+++ b/packages/pg-batch-query/test/test-query.ts
@@ -0,0 +1,75 @@
+import assert from 'assert'
+import BatchQuery from '../src'
+import pg from 'pg'
+
+describe('batch query', function () {
+  beforeEach(async function () {
+    const client = (this.client = new pg.Client())
+    await client.connect()
+    await client.query('CREATE TEMP TABLE foo(name TEXT, id SERIAL PRIMARY KEY)')
+  })
+
+  afterEach(function () {
+    return this.client.end()
+  })
+
+  it('batch insert works', async function () {
+    await this.client.query(new BatchQuery({
+      text: 'INSERT INTO foo (name) VALUES ($1)',
+      values: [
+        ['first'],
+        ['second']
+      ]
+    })).execute()
+    const resp = await this.client.query('SELECT COUNT(*) from foo')
+    assert.strictEqual(resp.rows[0]['count'], '2')
+  })
+
+  it('batch select works', async function () {
+    await this.client.query('INSERT INTO foo (name) VALUES ($1)', ['first'])
+    await this.client.query('INSERT INTO foo (name) VALUES ($1)', ['second'])
+    const responses = await this.client.query(new BatchQuery({
+      text: 'SELECT * from foo where name = $1',
+      values: [
+        ['first'],
+        ['second']
+      ],
+      name: 'optional'
+    })).execute()
+    assert.strictEqual(responses.length, 2)
+    for (const response of responses) {
+      assert.strictEqual(response.rowCount, 1)
+    }
+  })
+
+  it('batch insert with non string values', async function () {
+    await this.client.query('CREATE TEMP TABLE bar(value INT, id SERIAL PRIMARY KEY)')
+    const batchInsert = new BatchQuery({
+      text: 'INSERT INTO bar (value) VALUES ($1)',
+      values: [
+        [1],
+        [2]
+      ]
+    })
+    await this.client.query(batchInsert).execute()
+    const resp = await this.client.query('SELECT SUM(value) from bar')
+    assert.strictEqual(resp.rows[0]['sum'], '3')
+  })
+
+  it('If query is for an array', async function () {
+    await this.client.query('INSERT INTO foo (name) VALUES ($1)', ['first'])
+    await this.client.query('INSERT INTO foo (name) VALUES ($1)', ['second'])
+    const responses = await this.client.query(new BatchQuery({
+      text: `SELECT * from foo where name = ANY($1)`,
+      values: [
+        [['first', 'third']],
+        [['second', 'fourth']]
+      ],
+      name: 'optional'
+    })).execute()
+    assert.equal(responses.length, 2)
+    for (const response of responses) {
+      assert.strictEqual(response.rowCount, 1)
+    }
+  })
+})
diff --git a/packages/pg-batch-query/tsconfig.json b/packages/pg-batch-query/tsconfig.json
new file mode 100644
index 000000000..15b962dd9
--- /dev/null
+++ b/packages/pg-batch-query/tsconfig.json
@@ -0,0 +1,26 @@
+{
+  "compilerOptions": {
+    "module": "commonjs",
+    "esModuleInterop": true,
+    "allowSyntheticDefaultImports": true,
+    "strict": false,
+    "target": "es6",
+    "noImplicitAny": false,
+    "moduleResolution": "node",
+    "sourceMap": true,
+    "pretty": true,
+    "outDir": "dist",
+    "incremental": true,
+    "baseUrl": ".",
+    "declaration": true,
+    "types": [
+      "node",
+      "pg",
+      "mocha",
+      "chai"
+    ]
+  },
+  "include": [
+    "src/**/*"
+  ]
+}
diff --git a/tsconfig.json b/tsconfig.json
index 53fb70c6e..7351372f3 100644
--- a/tsconfig.json
+++ b/tsconfig.json
@@ -7,6 +7,7 @@
   "include": [],
   "references": [
     {"path": "./packages/pg-query-stream"},
-    {"path": "./packages/pg-protocol"}
+    {"path": "./packages/pg-protocol"},
+    {"path": "./packages/pg-batch-query"}
   ]
 }