Allow send arrow scheme and data as one data field #11790

Open
rekby opened this issue Nov 20, 2024 · 0 comments

rekby commented Nov 20, 2024

Proposal: allow sending Arrow data without a separate schema header, i.e. with the schema and the data in one data field.

For example, given the table:

CREATE TABLE t (
    id Int64 NOT NULL,
    val Text,
    PRIMARY KEY (id)
)

accept these bytes as the data field: https://gist.githubusercontent.com/rekby/620508c1ab889bf463eeb9b9b2f3b2d5/raw/e1288b16735716482038c1a7719f59cab4b04f44/gistfile1.txt

Currently I get this error:

SCHEME_ERROR (code = 400070, address = localhost:2136, nodeID = 1, issues = [{'Bulk upsert to table '/local/t'Wrong schema in bulk upsert data'}])
Example of broken code
func TestTableArrowBulkUpsertDataExample(t *testing.T) {
	ctx := context.Background()
	db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
	require.NoError(t, err)

	_ = db.Query().Exec(ctx, `DROP TABLE IF EXISTS t`)
	err = db.Query().Exec(ctx, `
CREATE TABLE t (
    id Int64 NOT NULL,
    val Text,
    PRIMARY KEY (id)
)
`, query.WithTxControl(query.DefaultTxControl()))
	require.NoError(t, err)

	schemaGo := arrow.NewSchema([]arrow.Field{
		arrow.Field{Name: "id", Type: &arrow.Int64Type{}},
		arrow.Field{Name: "val", Type: &arrow.StringType{}},
	}, nil)
	a := memory.NewGoAllocator()

	ids := array.NewInt64Builder(a)
	ids.AppendValues([]int64{123, 234}, []bool{true, true})
	defer ids.Release()

	vals := array.NewStringBuilder(a)
	vals.AppendValues([]string{"data1", "data2"}, []bool{true, true})
	defer vals.Release()

	b := &bytes.Buffer{}
	aw := ipc.NewWriter(b, ipc.WithSchema(schemaGo))
	err = aw.Write(array.NewRecord(schemaGo, []array.Interface{
		ids.NewArray(),
		vals.NewArray(),
	}, 2))
	require.NoError(t, err)

	goData := b.Bytes() // serialize schema, then data

	t.Log(goData)

	tablePath := "/local/t"
	err = db.Table().BulkUpsert(ctx, tablePath, table.BulkUpsertDataArrow(
		goData,
	))
	require.NoError(t, err)

}

The Go Arrow library has no simple way to serialize the schema and the data separately: it can serialize an Arrow stream (terminated by the end-of-stream marker), and it can serialize schema + data together.

To extract the data alone, one has to:

  1. serialize the schema
  2. cut the end-of-stream marker bytes from the schema bytes
  3. serialize schema + data
  4. cut the schema bytes from the head of the data
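The four steps above can be sketched on plain byte slices, independent of any Arrow library. This is only an illustration: `cutSchemaPrefix` and the placeholder payloads are hypothetical names, and the marker value assumes the Arrow IPC end-of-stream marker (the continuation indicator 0xFFFFFFFF followed by a zero length).

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// endOfStream is the assumed Arrow IPC end-of-stream marker:
// continuation indicator 0xFFFFFFFF followed by a zero metadata length.
var endOfStream = []byte{0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00}

// cutSchemaPrefix (hypothetical helper) takes the bytes of a schema-only
// stream and the bytes of a schema + data stream, and returns the data part.
func cutSchemaPrefix(schemaStream, full []byte) ([]byte, error) {
	// Step 2: the marker trails the schema, so it is removed as a suffix.
	schemaOnly := bytes.TrimSuffix(schemaStream, endOfStream)
	// Step 4: refuse to cut if the payload does not start with the schema.
	if !bytes.HasPrefix(full, schemaOnly) {
		return nil, errors.New("payload does not start with the serialized schema")
	}
	return full[len(schemaOnly):], nil
}

func main() {
	// Placeholder bytes stand in for real serialized Arrow messages.
	schemaMsg := []byte("SCHEMA--")
	recordMsg := []byte("RECORD--")

	schemaStream := append(append([]byte{}, schemaMsg...), endOfStream...) // step 1
	full := append(append([]byte{}, schemaMsg...), recordMsg...)          // step 3

	data, err := cutSchemaPrefix(schemaStream, full)
	fmt.Printf("%q %v\n", data, err) // prints "RECORD--" <nil>
}
```

Checking the prefix explicitly, instead of trimming silently, makes a schema mismatch visible on the client before the bytes reach the server.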
Cut example
func (rows *tableRows) Data() ([]byte, error) {
	b := &bytes.Buffer{}
	aw := ipc.NewWriter(b, ipc.WithSchema(rows.scheme))
	err := aw.Write(array.NewRecord(rows.scheme, []array.Interface{
		rows.ID.NewArray(),
		rows.Val.NewArray(),
	}, rows.rowsCount))

	if err != nil {
		return nil, err
	}

	goData := b.Bytes()
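	// Drop the end-of-stream marker that trails the serialized schema,
	// then strip the remaining schema bytes from the head of the payload.
	schemePrefix := bytes.TrimSuffix(rows.SchemeBytes, endStreamMarker[:])
	goData = bytes.TrimPrefix(goData, schemePrefix)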
	return goData, nil
}
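A possible variation on the cut above, offered only as a sketch and not as part of the proposal: instead of serializing the schema a second time, the length of the leading schema message can be read from the stream itself. This assumes the current Arrow IPC encapsulated message framing (a 4-byte continuation indicator 0xFFFFFFFF, then a 4-byte little-endian metadata length that includes padding) and that the first message is the schema, which carries no body. `splitSchemaAndData` is a hypothetical name, and the input in `main` is synthetic.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// continuation is the assumed Arrow IPC continuation indicator.
const continuation = 0xFFFFFFFF

// splitSchemaAndData (hypothetical helper) splits a schema + data stream
// after its first encapsulated message, assuming that message has no body.
func splitSchemaAndData(stream []byte) (schema, data []byte, err error) {
	if len(stream) < 8 {
		return nil, nil, errors.New("stream shorter than a message header")
	}
	if binary.LittleEndian.Uint32(stream[0:4]) != continuation {
		return nil, nil, errors.New("missing continuation indicator")
	}
	// The metadata length is a signed 32-bit little-endian integer.
	metaLen := int(int32(binary.LittleEndian.Uint32(stream[4:8])))
	if metaLen <= 0 || 8+metaLen > len(stream) {
		return nil, nil, errors.New("invalid metadata length")
	}
	end := 8 + metaLen // header plus metadata (padding included)
	return stream[:end], stream[end:], nil
}

func main() {
	// Synthetic input: header, 8 bytes of fake metadata, 3 bytes of fake data.
	stream := []byte{
		0xFF, 0xFF, 0xFF, 0xFF, 0x08, 0x00, 0x00, 0x00,
		1, 2, 3, 4, 5, 6, 7, 8,
		9, 9, 9,
	}
	schema, data, err := splitSchemaAndData(stream)
	fmt.Println(len(schema), len(data), err) // prints 16 3 <nil>
}
```

Streams written in the legacy framing without the continuation indicator are rejected by this sketch rather than split incorrectly.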
