Skip to content

Commit

Permalink
Merge pull request #109 from planetlabs/use-pqarrow
Browse files Browse the repository at this point in the history
Use the pqarrow file writer
  • Loading branch information
tschaub authored Oct 29, 2023
2 parents e01e089 + 2689577 commit ba009c9
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 19 deletions.
3 changes: 2 additions & 1 deletion internal/geoparquet/geoparquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/apache/arrow/go/v14/parquet"
"github.com/apache/arrow/go/v14/parquet/compress"
"github.com/apache/arrow/go/v14/parquet/file"
"github.com/apache/arrow/go/v14/parquet/pqarrow"
"github.com/apache/arrow/go/v14/parquet/schema"
"github.com/paulmach/orb/encoding/wkb"
"github.com/paulmach/orb/encoding/wkt"
Expand Down Expand Up @@ -154,7 +155,7 @@ func FromParquet(input parquet.ReaderAtSeeker, output io.Writer, convertOptions
return arrow.NewChunked(builder.Type(), transformed), nil
}

beforeClose := func(fileReader *file.Reader, fileWriter *file.Writer) error {
beforeClose := func(fileReader *file.Reader, fileWriter *pqarrow.FileWriter) error {
metadata := getMetadata(fileReader, convertOptions)
for name, geometryCol := range metadata.Columns {
if !datasetInfo.HasCollection(name) {
Expand Down
36 changes: 19 additions & 17 deletions internal/pqutil/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type TransformConfig struct {
RowGroupLength int
TransformSchema SchemaTransformer
TransformColumn ColumnTransformer
BeforeClose func(*file.Reader, *file.Writer) error
BeforeClose func(*file.Reader, *pqarrow.FileWriter) error
}

func getWriterProperties(config *TransformConfig, fileReader *file.Reader) (*parquet.WriterProperties, error) {
Expand Down Expand Up @@ -104,7 +104,16 @@ func TransformByColumn(config *TransformConfig) error {
return propErr
}

fileWriter := file.NewParquetWriter(config.Writer, outputSchema.Root(), file.WithWriterProps(writerProperties))
arrowSchema, arrowSchemaErr := pqarrow.FromParquet(outputSchema, &arrowReadProperties, fileReader.MetaData().KeyValueMetadata())
if arrowSchemaErr != nil {
return arrowSchemaErr
}

fileWriter, fileWriterErr := pqarrow.NewFileWriter(arrowSchema, config.Writer, writerProperties, pqarrow.DefaultWriterProps())
if fileWriterErr != nil {
return fileWriterErr
}

ctx := pqarrow.NewArrowWriteContext(context.Background(), nil)

if config.RowGroupLength > 0 {
Expand All @@ -120,7 +129,8 @@ func TransformByColumn(config *TransformConfig) error {
numRows := fileReader.NumRows()
numRowsWritten := int64(0)
for {
rowGroupWriter := fileWriter.AppendRowGroup()
fileWriter.NewRowGroup()
numRowsInGroup := 0
for fieldNum := 0; fieldNum < numFields; fieldNum += 1 {
colReader := columnReaders[fieldNum]
arr, readErr := colReader.NextBatch(int64(config.RowGroupLength))
Expand All @@ -139,18 +149,14 @@ func TransformByColumn(config *TransformConfig) error {
}
arr = transformed
}
colWriter, colWriterErr := pqarrow.NewArrowColumnWriter(arr, 0, int64(arr.Len()), outputManifest, rowGroupWriter, fieldNum)
if colWriterErr != nil {
return colWriterErr
if numRowsInGroup == 0 {
// TODO: propose fileWriter.RowGroupNumRows()
numRowsInGroup = arr.Len()
}
if err := colWriter.Write(ctx); err != nil {
if err := fileWriter.WriteColumnChunked(arr, 0, int64(arr.Len())); err != nil {
return err
}
}
numRowsInGroup, err := rowGroupWriter.NumRows()
if err != nil {
return err
}
numRowsWritten += int64(numRowsInGroup)
if numRowsWritten >= numRows {
break
Expand All @@ -160,7 +166,7 @@ func TransformByColumn(config *TransformConfig) error {
numRowGroups := fileReader.NumRowGroups()
for rowGroupIndex := 0; rowGroupIndex < numRowGroups; rowGroupIndex += 1 {
rowGroupReader := arrowReader.RowGroup(rowGroupIndex)
rowGroupWriter := fileWriter.AppendRowGroup()
fileWriter.NewRowGroup()
for fieldNum := 0; fieldNum < numFields; fieldNum += 1 {
arr, readErr := rowGroupReader.Column(fieldNum).Read(ctx)
if readErr != nil {
Expand All @@ -175,11 +181,7 @@ func TransformByColumn(config *TransformConfig) error {
}
arr = transformed
}
colWriter, colWriterErr := pqarrow.NewArrowColumnWriter(arr, 0, int64(arr.Len()), outputManifest, rowGroupWriter, fieldNum)
if colWriterErr != nil {
return colWriterErr
}
if err := colWriter.Write(ctx); err != nil {
if err := fileWriter.WriteColumnChunked(arr, 0, int64(arr.Len())); err != nil {
return err
}
}
Expand Down
66 changes: 66 additions & 0 deletions internal/pqutil/transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math"
"strconv"
"strings"
"testing"

"github.com/apache/arrow/go/v14/arrow"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/apache/arrow/go/v14/parquet"
"github.com/apache/arrow/go/v14/parquet/compress"
"github.com/apache/arrow/go/v14/parquet/file"
"github.com/apache/arrow/go/v14/parquet/pqarrow"
"github.com/apache/arrow/go/v14/parquet/schema"
"github.com/planetlabs/gpq/internal/pqutil"
"github.com/planetlabs/gpq/internal/test"
Expand Down Expand Up @@ -123,6 +125,70 @@ func TestTransformByColumn(t *testing.T) {
}
}

// makeOvertureData builds a minimal Overture-style Parquet file containing a
// list-of-struct column ("sources") and a non-nullable struct column ("bbox").
// It returns the expected JSON representation of the single row alongside the
// encoded Parquet bytes, for use as a round-trip fixture.
func makeOvertureData(t *testing.T) (string, []byte) {
	// Named arrowSchema (not "schema") to avoid shadowing the imported
	// parquet "schema" package.
	arrowSchema := arrow.NewSchema([]arrow.Field{
		{Name: "sources", Nullable: true, Type: arrow.ListOf(arrow.StructOf(
			arrow.Field{Name: "property", Nullable: true, Type: arrow.BinaryTypes.String},
			arrow.Field{Name: "dataset", Nullable: true, Type: arrow.BinaryTypes.String},
			arrow.Field{Name: "recordId", Nullable: true, Type: arrow.BinaryTypes.String},
			arrow.Field{Name: "confidence", Nullable: true, Type: arrow.PrimitiveTypes.Float64},
		))},
		{Name: "bbox", Nullable: false, Type: arrow.StructOf(
			arrow.Field{Name: "minx", Nullable: true, Type: arrow.PrimitiveTypes.Float64},
			arrow.Field{Name: "maxx", Nullable: true, Type: arrow.PrimitiveTypes.Float64},
			arrow.Field{Name: "miny", Nullable: true, Type: arrow.PrimitiveTypes.Float64},
			arrow.Field{Name: "maxy", Nullable: true, Type: arrow.PrimitiveTypes.Float64},
		)},
	}, nil)

	expected := `[
  {
    "sources": [
      {
        "property": "",
        "recordId": "record-1",
        "dataset": "test",
        "confidence": null
      }
    ],
    "bbox": {
      "minx": -180,
      "maxx": -180,
      "miny": -90,
      "maxy": -90
    }
  }
]`
	record, _, err := array.RecordFromJSON(memory.DefaultAllocator, arrowSchema, strings.NewReader(expected))
	require.NoError(t, err)
	// Release the record's Arrow buffers once the Parquet bytes are written.
	defer record.Release()

	output := &bytes.Buffer{}
	// nil writer properties: use the parquet defaults for this fixture.
	writer, err := pqarrow.NewFileWriter(arrowSchema, output, nil, pqarrow.DefaultWriterProps())
	require.NoError(t, err)

	require.NoError(t, writer.Write(record))
	require.NoError(t, writer.Close())

	return expected, output.Bytes()
}

// TestTransformOverture round-trips an Overture-style Parquet fixture through
// TransformByColumn and asserts the data survives unchanged. Minimal
// reproduction of https://github.com/planetlabs/gpq/issues/102.
func TestTransformOverture(t *testing.T) {
	expected, parquetData := makeOvertureData(t)

	transformed := &bytes.Buffer{}
	require.NoError(t, pqutil.TransformByColumn(&pqutil.TransformConfig{
		Reader: bytes.NewReader(parquetData),
		Writer: transformed,
	}))

	got := test.ParquetToJSON(t, bytes.NewReader(transformed.Bytes()))
	assert.JSONEq(t, expected, got)
}

func TestTransformByRowGroupLength(t *testing.T) {
numRows := 100
rows := make([]map[string]any, numRows)
Expand Down
3 changes: 2 additions & 1 deletion internal/validator/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/apache/arrow/go/v14/parquet"
"github.com/apache/arrow/go/v14/parquet/file"
"github.com/apache/arrow/go/v14/parquet/pqarrow"
"github.com/paulmach/orb"
"github.com/paulmach/orb/encoding/wkb"
"github.com/planetlabs/gpq/internal/geojson"
Expand Down Expand Up @@ -87,7 +88,7 @@ func (s *Suite) copyWithMetadata(input parquet.ReaderAtSeeker, output io.Writer,
config := &pqutil.TransformConfig{
Reader: input,
Writer: output,
BeforeClose: func(fileReader *file.Reader, fileWriter *file.Writer) error {
BeforeClose: func(fileReader *file.Reader, fileWriter *pqarrow.FileWriter) error {
return fileWriter.AppendKeyValueMetadata(geoparquet.MetadataKey, metadata)
},
}
Expand Down

0 comments on commit ba009c9

Please sign in to comment.