Skip to content

Commit

Permalink
Add the option to write to STDOUT, add mu to lock when necessary
Browse files Browse the repository at this point in the history
  • Loading branch information
sfomuseumbot committed Aug 17, 2024
1 parent b1dfda9 commit 8d8297c
Showing 1 changed file with 17 additions and 3 deletions.
20 changes: 17 additions & 3 deletions geoparquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/url"
"os"
"strconv"
"sync"

"github.com/apache/arrow/go/v16/parquet"
"github.com/tidwall/gjson"
Expand All @@ -28,6 +29,7 @@ type GeoParquetWriter struct {
feature_writer *geoparquet.FeatureWriter
buffer []*geo.Feature
append_properties []string
mu *sync.RWMutex
}

func init() {
Expand Down Expand Up @@ -55,16 +57,16 @@ func NewGeoParquetWriter(ctx context.Context, uri string) (writer.Writer, error)
if u.Path == "" {
io_writer = os.Stdout
} else {

wr, err := os.OpenFile(u.Path, os.O_RDWR|os.O_CREATE, 0644)

if err != nil {
return nil, fmt.Errorf("Failed to open %s for writing, %w", u.Path, err)
}

io_writer = wr
}

min := 10
max := 100
compression := "zstd"
Expand Down Expand Up @@ -118,11 +120,14 @@ func NewGeoParquetWriter(ctx context.Context, uri string) (writer.Writer, error)

buffer := make([]*geo.Feature, 0)

mu := new(sync.RWMutex)

gpq := &GeoParquetWriter{
convert_options: convert_options,
io_writer: io_writer,
buffer: buffer,
append_properties: append_properties,
mu: mu,
}

return gpq, nil
Expand Down Expand Up @@ -191,6 +196,9 @@ func (gpq *GeoParquetWriter) Write(ctx context.Context, key string, r io.ReadSee
return 0, fmt.Errorf("Failed to ensure feature writer (%s), %w", key, err)
}

gpq.mu.Lock()
defer gpq.mu.Unlock()

if !ready {
gpq.buffer = append(gpq.buffer, f)
return -1, nil
Expand All @@ -214,6 +222,9 @@ func (gpq *GeoParquetWriter) Write(ctx context.Context, key string, r io.ReadSee
// Close will close the underlying GeoParquet database.
func (gpq *GeoParquetWriter) Close(ctx context.Context) error {

gpq.mu.Lock()
defer gpq.mu.Unlock()

err := gpq.flushBuffer(ctx)

if err != nil {
Expand All @@ -233,6 +244,9 @@ func (gpq *GeoParquetWriter) Close(ctx context.Context) error {

func (gpq *GeoParquetWriter) ensureFeatureWriter(ctx context.Context, f *geo.Feature) (bool, error) {

gpq.mu.Lock()
defer gpq.mu.Unlock()

if gpq.feature_writer != nil {
return true, nil
}
Expand Down

0 comments on commit 8d8297c

Please sign in to comment.