
This issue was moved to a discussion.

confused about usage of parquet_decode and batched #2930

Closed
fearfate opened this issue Oct 11, 2024 · 1 comment

@fearfate
Contributor

I want to read Parquet files, apply a mapping to each row, and then encode the results back to Parquet:

```yaml
input:
  label: projects
  file:
    paths:
      - source/*.parquet
    scanner:
      to_the_end: {}
    auto_replay_nacks: true
pipeline:
  processors:
  - parquet_decode: {}
  - for_each:
    - mapping: |
        #!blobl
        root = this
        root.ID = "project:%v/%v".format(root.Type.lowercase(), root.Name)
        root.SnapshotAt = root.SnapshotAt.ts_unix_nano()
        if root.exists("OSSFuzz") && root.OSSFuzz != null {
          if root.OSSFuzz.exists("Date") && root.OSSFuzz.Date != null {
            root.OSSFuzz.Date = root.OSSFuzz.Date.ts_unix_nano()
          }
        }
        meta SnapshotAt = root.SnapshotAt
  - parquet_encode:
      schema:
      - name: SnapshotAt
        type: INT64
      - name: Type
        type: UTF8
      - name: Name
        type: UTF8
      - name: OpenIssuesCount
        optional: true
        type: INT64
      - name: StarsCount
        optional: true
        type: INT64
      - name: ForksCount
        optional: true
        type: INT64
      - name: Licenses
        repeated: true
        type: UTF8
      - name: Description
        optional: true
        type: UTF8
      - name: Homepage
        optional: true
        type: UTF8
      - name: OSSFuzz
        optional: true
        fields:
          - name: LineCount
            optional: true
            type: INT64
          - name: LineCoverCount
            optional: true
            type: INT64
          - name: Date
            optional: true
            type: INT64
          - name: ConfigURL
            type: UTF8
      default_compression: zstd

output:
  file:
    path: target/${! timestamp_unix_nano() }.parquet # No default (required)
    codec: all-bytes
```

With this config, the batch is broken up into individual messages, and every message is encoded into a separate file.

I tried using batched, but then how can I use the original filename rather than a generated one (${! timestamp_unix_nano() })?

@mihaitodor
Collaborator

Hey @fearfate 👋

> how can I use the original filename rather than a generated one (${! timestamp_unix_nano() })?

The file input adds metadata to each message, including path (see https://docs.redpanda.com/redpanda-connect/components/inputs/file/#metadata). Based on that, you can use ${! @path.filepath_split().index(-1) } to get just the original file name.
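A minimal sketch of the output section with that change applied. It assumes the path metadata set by the file input is still attached to the message that reaches the output; this is worth verifying, since parquet_encode combines a batch of messages into a single one. The rest of the config stays as it was:

```yaml
output:
  file:
    # @path holds the full input path, e.g. source/foo.parquet (example name).
    # filepath_split() returns [directory, file name], so .index(-1) keeps
    # only the file name, which gives target/foo.parquet here.
    # Assumption: the path metadata is still present after parquet_encode.
    path: target/${! @path.filepath_split().index(-1) }
    codec: all-bytes
```

With the all-bytes codec, an existing file with the same name in target/ is overwritten rather than appended to.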

PS: Moving to a discussion as per #2026.

@redpanda-data redpanda-data locked and limited conversation to collaborators Oct 11, 2024
@mihaitodor mihaitodor converted this issue into discussion #2931 Oct 11, 2024