Read csv files from zip file #451
-
I am trying to read the files inside a zip file using a transform stream, all wrapped into an iterator for convenience. I haven't gotten to the CSV part yet because I can't figure out how to use the […]. Another problem is that the iterator only yields one line if I put a […]. From reading the docs, I suspect I might need access to the defaultWriter to ease out more values and close it after processing, but the implementation of getData is really hard to follow.
import {Uint8ArrayWriter,Uint8ArrayReader,TextReader,ZipReader,ZipWriter} from 'https://deno.land/x/zipjs/index.js'
import {TextLineStream} from 'https://deno.land/std/streams/mod.ts'
import {readCSV,CSVReader} from 'https://deno.land/x/csv/mod.ts'
/**
 * Async-iterable view over the decoded text of a single zip entry.
 *
 * The entry's bytes are written into an identity `TransformStream`, decoded
 * by a `TextDecoderStream`, and collected chunk by chunk into `accumulator`.
 * Iterating the instance triggers extraction and then yields the collected
 * chunks in arrival order.
 *
 * NOTE(review): each instance owns one stream pipeline, so it is presumably
 * only safe to iterate an instance once — confirm against `getData()`.
 */
class FileIterator {
  /** Decoded text chunks, in the order the sink received them. */
  accumulator = [];

  /**
   * @param {object} entry - Zip entry; only `entry.getData()` is used, and it
   *   is called with the internal `TransformStream` as its destination.
   */
  constructor(entry) {
    const transform = new TransformStream();
    const queuingStrategy = new CountQueuingStrategy({ highWaterMark: 1 });
    const { accumulator } = this;

    this.writable = new WritableStream(
      {
        write(chunk) {
          console.log('line');
          accumulator.push(chunk);
        },
        close() {
          console.log('close csv file iterator');
        },
        abort(e) {
          console.error('abort csv file iterator: ', e);
        },
      },
      queuingStrategy,
    );

    // Keep the pipe promise so iteration can wait until every chunk has
    // actually reached the sink (and so a pipe failure is not left floating).
    // NOTE(review): insert `.pipeThrough(new TextLineStream())` before
    // `pipeTo` if individual lines, rather than raw decoded chunks, are wanted.
    this.pipeDone = transform.readable
      .pipeThrough(new TextDecoderStream())
      .pipeTo(this.writable);

    this.transform = transform;
    this.entry = entry;
  }

  /** @returns {AsyncGenerator<string>} Every collected chunk, from the start. */
  [Symbol.asyncIterator]() {
    return this.iterateFromOffset(0);
  }

  /**
   * Extracts the entry, waits for the pipeline to drain, then yields the
   * collected chunks starting at `offset`.
   *
   * @param {number} offset - Index of the first chunk to yield.
   * @yields {string} Decoded text chunks.
   */
  async *iterateFromOffset(offset) {
    // Awaiting both promises together surfaces an error from either side and
    // guarantees the accumulator is complete before anything is yielded.
    await Promise.all([this.entry.getData(this.transform), this.pipeDone]);

    // Bounds check instead of a truthiness check: an empty-string chunk is a
    // valid value and must not end the iteration early.
    for (let index = offset; index < this.accumulator.length; index += 1) {
      yield this.accumulator[index];
    }
  }
}
// Demo: build a two-entry CSV archive in memory, then print the decoded text
// of every entry through FileIterator.
const zipWriter = new ZipWriter(new Uint8ArrayWriter());
await Promise.all([
  zipWriter.add("a.csv", new TextReader("a,b,c\n1,2,3")),
  zipWriter.add("b.csv", new TextReader("d,e,f\n4,5,6")),
]);
const zipFile = await zipWriter.close();
const zipReader = new ZipReader(new Uint8ArrayReader(zipFile));
try {
  for (const entry of await zipReader.getEntries()) {
    console.log('file ', entry.filename);
    const iter = new FileIterator(entry);
    for await (const content of iter) {
      console.log(content);
    }
  }
} finally {
  // Close the reader even when listing or reading an entry throws.
  await zipReader.close();
}
console.log('END')

Log:
|
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 11 replies
This comment has been hidden.
This comment has been hidden.
-
import {
ZipReader,
ZipWriter,
terminateWorkers,
Uint8ArrayWriter,
Uint8ArrayReader,
TextReader
} from 'https://deno.land/x/zipjs@v2.7.29/index.js';
import {
initParser,
inferSchema
} from 'https://esm.sh/udsv@0.5.3';
// Demo: build an in-memory archive (a 100-row CSV plus a tiny one), then
// stream every entry through csvFromZipEntry and print each parsed row.
const zipWriter = new ZipWriter(new Uint8ArrayWriter());
const numberedRows = Array.from({ length: 100 }, (_, i) => `${i},${i},${i}`);
await Promise.all([
  zipWriter.add('a.csv', new TextReader(`a,b,c\n${numberedRows.join('\n')}`)),
  zipWriter.add('b.csv', new TextReader('d,e,f\n4,5,6')),
]);
const zipFile = await zipWriter.close();
const zipReader = new ZipReader(new Uint8ArrayReader(zipFile));
try {
  for (const entry of await zipReader.getEntries()) {
    console.log('file ', entry.filename);
    for await (const row of await csvFromZipEntry(entry)) {
      console.log('row =>', row);
    }
  }
} finally {
  // Release the reader and the zip.js workers even if parsing an entry fails.
  await zipReader.close();
  terminateWorkers();
}
/**
 * Streams the rows of a CSV zip entry.
 *
 * The entry's bytes flow through an identity `TransformStream`, are decoded
 * by a `TextDecoderStream`, and are then fed chunk by chunk to a parser
 * created with `initParser(inferSchema(firstChunk))`. Every row the parser
 * reports is enqueued on the returned stream.
 *
 * @param {object} entry - Zip entry; only `entry.getData(writable)` is used.
 * @returns {Promise<ReadableStream>} Stream of parsed rows. The function stays
 *   `async` so existing `await csvFromZipEntry(entry)` callers keep working.
 */
async function csvFromZipEntry(entry) {
  let csvParser;
  let rowController;
  const { readable, writable } = new TransformStream();

  const rowStream = new TransformStream({
    start(controller) {
      // Kept so an extraction failure can be forwarded to the consumer.
      rowController = controller;
    },
    transform(chunk, controller) {
      // The schema is inferred from the first decoded chunk only.
      csvParser ??= initParser(inferSchema(chunk));
      csvParser.chunk(chunk, csvParser.typedArrs, (rows) => {
        for (const row of rows) {
          controller.enqueue(row);
        }
      });
    },
    flush() {
      // An empty entry never produces a chunk, so no parser was created.
      csvParser?.end();
    },
  });

  const readableOutput = readable
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(rowStream);

  // Deliberately not awaited: nothing reads `readableOutput` until it has been
  // returned, so waiting for extraction here would stall on backpressure as
  // soon as the entry outgrows the stream buffers. A rejection is forwarded to
  // the output stream; `error()` has no effect if the stream already failed.
  void entry.getData(writable).catch((error) => rowController.error(error));

  return readableOutput;
}
Beta Was this translation helpful? Give feedback.
I don't think it's possible since it depends on `controller`. However, the performance could be improved by not using `await` when calling `getData` and `csvFromZipEntry`. The exception `getData` might throw can also be ignored, because the stream (i.e. `readable`) can propagate it.
Here's below the code with these changes applied.