Skip to content

Commit

Permalink
Add an option to return each row as an object keyed by column name (#25)
Browse files Browse the repository at this point in the history
* Add an option to return each row as an object keyed by column name

* rename option to rowFormat and address feedback
  • Loading branch information
ctranstrum authored Aug 13, 2024
1 parent 400c736 commit d13d52b
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 8 deletions.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,21 @@ await parquetRead({
})
```

## Column names

By default, data passed to the `onComplete` function is an array of rows, where each row is an array of values ordered by column.
If you would like each row to be an object whose keys are the column names, set the option `rowFormat` to `object`.

```js
import { parquetRead } from 'hyparquet'

await parquetRead({
file,
rowFormat: 'object',
onComplete: data => console.log(data),
})
```

## Advanced Usage

### AsyncBuffer
Expand Down
4 changes: 2 additions & 2 deletions demo/demo.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ async function render(asyncBuffer, metadata, name) {
compressors,
file: asyncBuffer,
rowEnd: 1000,
onComplete(/** @type {any[][]} */ data) {
onComplete(/** @type {any[][] | Record<string, any>[]} */ data) {
const ms = performance.now() - startTime
console.log(`parsed ${name} in ${ms.toFixed(0)} ms`)
content.appendChild(renderTable(header, data))
Expand Down Expand Up @@ -144,7 +144,7 @@ fileInput?.addEventListener('change', () => {

/**
* @param {string[]} header
* @param {any[][]} data
* @param {any[][] | Record<string, any>[]} data
* @returns {HTMLTableElement}
*/
function renderTable(header, data) {
Expand Down
4 changes: 3 additions & 1 deletion src/hyparquet.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export type { AsyncBuffer, Compressors, FileMetaData, SchemaTree }
* @param {AsyncBuffer} options.file file-like object containing parquet data
* @param {FileMetaData} [options.metadata] parquet file metadata
* @param {string[]} [options.columns] columns to read, all columns if undefined
* @param {string} [options.rowFormat] desired format of each row passed to the onComplete function
* @param {number} [options.rowStart] first requested row index (inclusive)
* @param {number} [options.rowEnd] last requested row index (exclusive)
* @param {Function} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range.
Expand Down Expand Up @@ -111,10 +112,11 @@ export interface ParquetReadOptions {
file: AsyncBuffer // file-like object containing parquet data
metadata?: FileMetaData // parquet metadata, will be parsed if not provided
columns?: string[] // columns to read, all columns if undefined
rowFormat?: string // format of each row passed to the onComplete function
rowStart?: number // inclusive
rowEnd?: number // exclusive
onChunk?: (chunk: ColumnData) => void // called when a column chunk is parsed. chunks may be outside the requested range.
onComplete?: (rows: any[][]) => void // called when all requested rows and columns are parsed
onComplete?: (rows: any[][] | Record<string, any>[]) => void // called when all requested rows and columns are parsed
compressors?: Compressors // custom decompressors
utf8?: boolean // decode byte arrays as utf8 strings (default true)
}
Expand Down
24 changes: 19 additions & 5 deletions src/read.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

import { assembleNested } from './assemble.js'
import { getColumnRange, readColumn } from './column.js'
import { parquetMetadataAsync } from './metadata.js'
Expand All @@ -24,10 +23,11 @@ import { concat } from './utils.js'
* @param {AsyncBuffer} options.file file-like object containing parquet data
* @param {FileMetaData} [options.metadata] parquet file metadata
* @param {string[]} [options.columns] columns to read, all columns if undefined
* @param {string} [options.rowFormat] format of each row passed to the onComplete function
* @param {number} [options.rowStart] first requested row index (inclusive)
* @param {number} [options.rowEnd] last requested row index (exclusive)
* @param {(chunk: ColumnData) => void} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range.
* @param {(rows: any[][]) => void} [options.onComplete] called when all requested rows and columns are parsed
* @param {(rows: any[][] | Record<string, any>[]) => void} [options.onComplete] called when all requested rows and columns are parsed
* @param {Compressors} [options.compressors] custom decompressors
* @returns {Promise<void>} resolves when all requested rows and columns are parsed
*/
Expand Down Expand Up @@ -74,8 +74,9 @@ export async function parquetRead(options) {
* @param {AsyncBuffer} options.file file-like object containing parquet data
* @param {FileMetaData} [options.metadata] parquet file metadata
* @param {string[]} [options.columns] columns to read, all columns if undefined
* @param {string} [options.rowFormat] format of each row passed to the onComplete function
* @param {(chunk: ColumnData) => void} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range.
* @param {(rows: any[][]) => void} [options.onComplete] called when all requested rows and columns are parsed
* @param {(rows: any[][] | Record<string, any>[]) => void} [options.onComplete] called when all requested rows and columns are parsed
* @param {Compressors} [options.compressors]
* @param {RowGroup} rowGroup row group to read
* @param {number} groupStart row index of the first row in the group
Expand Down Expand Up @@ -186,12 +187,25 @@ export async function readRowGroup(options, rowGroup, groupStart, rowLimit) {
if (options.onComplete) {
// transpose columns into rows
const groupData = new Array(rowLimit)
const includedColumns = children
const includedColumnNames = children
.map(child => child.element.name)
.filter(name => !columns || columns.includes(name))
const includedColumns = includedColumnNames
.map(name => subcolumnData.get(name))

for (let row = 0; row < rowLimit; row++) {
groupData[row] = includedColumns.map(column => column[row])
if (options.rowFormat === 'object') {
// return each row as an object
/** @type {Record<string, any>} */
const rowData = {}
includedColumnNames.forEach((name, index) => {
rowData[name] = includedColumns[index][row]
})
groupData[row] = rowData
} else {
// return each row as an array
groupData[row] = includedColumns.map(column => column[row])
}
}
return groupData
}
Expand Down
26 changes: 26 additions & 0 deletions test/read.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,30 @@ describe('parquetRead', () => {
},
})
})

it('format row as object', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
await parquetRead({
file,
columns: ['c'],
rowFormat: 'object',
onChunk: chunk => {
expect(toJson(chunk)).toEqual({
columnName: 'c',
columnData: [2, 3, 4, 5, 2],
rowStart: 0,
rowEnd: 5,
})
},
onComplete: (rows) => {
expect(toJson(rows)).toEqual([
{ c: 2 },
{ c: 3 },
{ c: 4 },
{ c: 5 },
{ c: 2 },
])
},
})
})
})

0 comments on commit d13d52b

Please sign in to comment.