
Commit

Merge pull request #27 from GreenmaskIO/cmd_drivers
Refactored external command interaction API
wwoytenko authored Mar 15, 2024
2 parents ad470ac + 617b653 commit 7eb2009
Showing 16 changed files with 550 additions and 181 deletions.
178 changes: 137 additions & 41 deletions docs/built_in_transformers/standard_transformers/cmd.md
@@ -7,54 +7,62 @@ Transform data via external program using `stdin` and `stdout` interaction.
| columns | A list of column names to be affected. If empty, the entire tuple is used. Read about the structure further. | | Yes | Any |
| executable | The path to the executable file | | Yes | - |
| args | A list of parameters for the executable | | No | - |
| driver | The row driver with parameters that is used for interacting with cmd. `{"name": "text, csv, json", "params": { "format": "[text bytes]"}}`. The `text` and `bytes` options are available only for `json`. | `{"name": "csv"}` | No | - |
| driver | The row driver with parameters that is used for interacting with cmd. See details below. | `{"name": "csv"}` | No | - |
| validate | Performs a decoding operation using the PostgreSQL driver for data received from the command to ensure the data format is correct | `false` | No | - |
| timeout | Timeout for sending and receiving data from the external command | `2s` | No | - |
| expected_exit_code | The expected exit code on SIGTERM signal. If the exit code is unexpected, the transformation exits with an error. | `0` | No | - |
| skip_on_behaviour | Skips transformation call if one of the provided columns has a `null` value (`any`) or each of the provided columns has `null` values (`all`). This option works together with the `skip_on_null_input` parameter on columns. Possible values: `all`, `any`. | `all` | No | - |

!!! warning

The parameter `validate=true` may cause an error if the type does not have a PostgreSQL driver decoder implementation. Most of the types, such as `int`, `float`, `text`, `varchar`, `date`, `timestamp`, etc., have encoders and decoders, as well as inherited types like domain types based on them.
The parameter `validate=true` may cause an error if the type does not have a PostgreSQL driver decoder
implementation. Most of the types, such as `int`, `float`, `text`, `varchar`, `date`, `timestamp`, etc., have
encoders and decoders, as well as inherited types like domain types based on them.

## Description

The `Cmd` transformer allows you to send original data to an external program via `stdin` and receive transformed data from `stdout`. It supports various interaction formats such as `json`, `csv`, or plain `text` for one-column transformations. The interaction is performed line by line, so each record sent must end with a newline character (`\n`).
The `Cmd` transformer allows you to send original data to an external program via `stdin` and receive transformed data
from `stdout`. It supports various interaction formats such as `json`, `csv`, or plain `text` for one-column
transformations. The interaction is performed line by line, so each record sent must end with a newline
character (`\n`).

### Types of interaction modes

#### text

Textual driver that is used only for one-column transformations, so you cannot provide more than one column here. The value is encoded literally as a string. For example, `2023-01-03 01:00:00.0+03`.
Textual driver that is used only for one-column transformations, so you cannot provide more than one column here.
The value is encoded literally as a string. For example, `2023-01-03 01:00:00.0+03`.

#### json

JSON line driver. It has two formats that can be passed through `driver.params`: `[text|bytes]`. Use the `bytes` format for binary datatypes. Use the `text` format for non-binary datatypes and for those that can be represented as string literals.
JSON line driver. It has two formats that can be passed through `driver.json_data_format`: `[text|bytes]`. Use
the `bytes` format for binary datatypes. Use the `text` format for non-binary datatypes and for those that can be
represented as string literals. The default `json_data_format` is `text`.

=== "The text format example"
=== "Text format with indexes"

```json
{
1: {
"column1": {
"d": "some_value1",
"n": false,
},
2: {
"column2": {
"d": "some_value2",
"n": false,
}
}
```

=== "The bytes format example"
=== "Bytes format with indexes"

```json
{
1: {
"column1": {
"d": "aGVsbG8gd29ybHNeODcxMjE5MCUlJSUlJQ==",
"n": false,
},
2: {
"column2": {
"d": "aGVsbG8gd29ybHNeODcxMjE5MCUlJSUlJQ==",
"n": false,
}
@@ -64,52 +72,140 @@ JSON line driver. It has two formats that can be passed through `driver.params`:
where:

* Each line is a JSON line with a map of attribute numbers to their values
* `d` — the raw data represented as base64 encoding for the bytes format or Unicode text for the text format. The base64 encoding is needed because data can be binary.
* `d` — the raw data represented as base64 encoding for the bytes format or Unicode text for the text format. The base64
encoding is needed because data can be binary.
* `n` — indicates if NULL is present
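
As a rough illustration of the `d` and `n` fields, the following sketch decodes one JSON line in the `bytes` format, upper-cases every non-NULL value, and encodes the result back. The function name `transform_line` is hypothetical, and the content of `d` for NULL attributes is an assumption (the sketch never reads it).

```python
import base64
import json


def transform_line(line: str) -> str:
    """Upper-case every non-NULL value in one JSON line (bytes format)."""
    record = json.loads(line)
    for attr in record.values():
        if attr["n"]:
            # NULL value: leave the attribute untouched
            continue
        # In the bytes format, "d" holds base64-encoded raw data
        raw = base64.b64decode(attr["d"])
        attr["d"] = base64.b64encode(raw.upper()).decode("ascii")
    return json.dumps(record)


# Hypothetical input line; what "d" contains for a NULL value is an assumption
sample = json.dumps({
    "column1": {"d": base64.b64encode(b"hello").decode("ascii"), "n": False},
    "column2": {"d": "", "n": True},
})
result = json.loads(transform_line(sample))
```

For the `text` format the same loop would apply without the base64 steps, since `d` already holds Unicode text.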

#### csv

CSV driver (comma-separated). The number of attributes is the same as the number of table columns, but the
columns that were not mentioned in the `columns` list are empty. The `NULL` value is represented as `\N`. Each attribute is escaped by a quote (`"`). For example, if the transformed table has attributes `id`, `title`, and `created_at`, and only `id` and `created_at` require transformation, then the CSV line will look as follows:
columns that were not mentioned in the `columns` list are empty. The `NULL` value is represented as `\N`. Each attribute
is escaped by a quote (`"`). For example, if the transformed table has attributes `id`, `title`, and `created_at`, and
only `id` and `created_at` require transformation, then the CSV line will look as follows:

``` csv title="csv line example"
"123","","2023-01-03 01:00:00.0+03"
```
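
A minimal sketch of how an external command might read and write such a line with Python's standard `csv` module. The helper names are hypothetical, and writing the `\N` marker inside quotes like any other attribute is an assumption that is not confirmed above.

```python
import csv
import io

NULL_MARKER = r"\N"  # NULL representation described above


def parse_csv_line(line, column_names):
    """Map one CSV line to {column: value}, turning \\N into None."""
    row = next(csv.reader(io.StringIO(line)))
    return {
        name: (None if value == NULL_MARKER else value)
        for name, value in zip(column_names, row)
    }


def format_csv_line(values):
    """Quote every attribute and end the line with a newline.

    Assumption: the NULL marker is quoted like the other attributes.
    """
    buf = io.StringIO()
    writer = csv.writer(buf, quoting=csv.QUOTE_ALL, lineterminator="\n")
    writer.writerow([NULL_MARKER if v is None else v for v in values])
    return buf.getvalue()


parsed = parse_csv_line(
    '"123","","2023-01-03 01:00:00.0+03"\n',
    ["id", "title", "created_at"],
)
```

Here `title` comes back as an empty string because it was not listed in `columns`, which is distinct from a `NULL` value.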

Column object attributes:
### Column object attributes

* `name` — the name of the column. This value is required. Depending on the attributes that follow, this column may be used just as a value and is not affected in any way.
* `not_affected` — indicates whether the column is affected in the transformation. This attribute is required for the validation procedure when Greenmask is called with `greenmask dump --validate`. Setting `not_affected=true` can be helpful when the command transformer transforms data depending on the value of another column. For example, if you want to generate an `updated_at` column value depending on the `created_at` column value, you can set `created_at` to `not_affected=true`. The default value is `false`.
* `skip_original_data` — indicates whether the original data is required for the transformer. This attribute can be helpful for decreasing the interaction time. One use case is when the command works as a generator and returns the value without relying on the original data. The default value is `false`.
* `skip_on_null_input` — specifies whether to skip transformation when the original value is `null`. This attribute works in conjunction with the `skip_on_behaviour` parameter. For example, if you have two affected columns with `skip_on_null_input=true` and one column is `null`, then, if `skip_on_behaviour=any`, the transformation will be skipped, or, if `skip_on_behaviour=all`, the transformation will be performed. The default is `false`.
* `name` — the name of the column. This value is required. Depending on the attributes that follow, this column
may be used just as a value and is not affected in any way.
* `not_affected` — indicates whether the column is affected in the transformation. This attribute is required for the
validation procedure when Greenmask is called with `greenmask dump --validate`. Setting `not_affected=true` can be
helpful when the command transformer transforms data depending on the value of another column. For example, if you
want to generate an `updated_at` column value depending on the `created_at` column value, you can set `created_at`
to `not_affected=true`. The default value is `false`.
* `skip_original_data` — indicates whether the original data is required for the transformer. This attribute can be
helpful for decreasing the interaction time. One use case is when the command works as a generator and returns the
value without relying on the original data. The default value is `false`.
* `skip_on_null_input` — specifies whether to skip transformation when the original value is `null`. This attribute
works in conjunction with the `skip_on_behaviour` parameter. For example, if you have two affected columns
with `skip_on_null_input=true` and one column is `null`, then, if `skip_on_behaviour=any`, the transformation will be
skipped, or, if `skip_on_behaviour=all`, the transformation will be performed. The default is `false`.
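
The interplay between `skip_on_null_input` and `skip_on_behaviour` can be sketched roughly as follows. This is an illustration of the semantics described above, not Greenmask's actual implementation; the function `need_skip` and its arguments are hypothetical.

```python
def need_skip(values, skip_on_null_columns, behaviour="all"):
    """Decide whether the transformation call should be skipped.

    values               -- mapping of column name to value (None means NULL)
    skip_on_null_columns -- names of columns with skip_on_null_input=true
    behaviour            -- "any" or "all", as in skip_on_behaviour
    """
    nulls = [values[name] is None for name in skip_on_null_columns]
    if not nulls:
        # No column opted in, so nothing can trigger a skip
        return False
    return any(nulls) if behaviour == "any" else all(nulls)


# Two affected columns, only one of which is NULL
row = {"jobtitle": None, "loginid": "adventure-works\\ken0"}
skip_any = need_skip(row, ["jobtitle", "loginid"], "any")  # skipped
skip_all = need_skip(row, ["jobtitle", "loginid"], "all")  # performed
```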

## Example: Apply transformation performed by external command

In the following example, `actual_arrival` and `scheduled_arrival` columns are transformed via external command transformer.
## Example: Apply transformation performed by external command in TEXT format

```yaml title="Cmd transformer example"
- name: "Cmd"
params:
executable: "cmd_test.sh"
driver:
name: "json"
In the following example, the `jobtitle` column is transformed via the external command transformer.

```python title="External transformer in python example"
#!/usr/bin/env python3
import signal
import sys

signal.signal(signal.SIGTERM, lambda sig, frame: exit(0))


# If we want to implement a simple generator, we need to read the line from stdin and write any result to stdout
for _ in sys.stdin:
    # Writing the result to stdout with new line and flushing the buffer
    sys.stdout.write("New Job Title")
    sys.stdout.write("\n")
    sys.stdout.flush()

```

```yaml title="Cmd transformer config example"
- schema: "humanresources"
name: "employee"
transformers:
- name: "Cmd"
params:
format: "bytes" # (4)
timeout: "60s"
validate_output: true # (1)
expected_exit_code: -1
skip_on_behaviour: "any" # (2)
columns:
- name: "actual_arrival"
skip_original_data: true
skip_on_null_input: true # (3)
- name: "scheduled_arrival"
skip_original_data: true
skip_on_null_input: true # (3)
driver:
name: "text"
expected_exit_code: -1
skip_on_null_input: true
validate: true
skip_on_behaviour: "any"
timeout: 60s
executable: "/var/lib/playground/test.py"
columns:
- name: "jobtitle"
skip_original_data: true
skip_on_null_input: true
```
## Example: Apply transformation performed by external command in JSON format
In the following example, `jobtitle` and `loginid` columns are transformed via external command
transformer.

```python title="External transformer in python example"
#!/usr/bin/env python3
import json
import signal
import sys
signal.signal(signal.SIGTERM, lambda sig, frame: exit(0))
for line in sys.stdin:
    res = json.loads(line)
    # Setting dummy values
    res["jobtitle"] = {"d": "New Job Title", "n": False}
    res["loginid"]["d"] = "123"
    # Writing the result to stdout with new line and flushing the buffer
    sys.stdout.write(json.dumps(res))
    sys.stdout.write("\n")
    sys.stdout.flush()
```

```yaml title="Cmd transformer config example"
- schema: "humanresources"
  name: "employee"
  transformers:
    - name: "Cmd"
      params:
        driver:
          name: "json" # (1)
          json_data_format: "text" # (4)
        expected_exit_code: -1
        skip_on_null_input: true
        validate: true
        skip_on_behaviour: "any" # (2)
        timeout: 60s
        executable: "/var/lib/playground/test.py"
        columns:
          - name: "jobtitle"
            skip_original_data: true
            skip_on_null_input: true # (3)
          - name: "loginid"
            skip_original_data: false # (5)
            skip_on_null_input: true # (3)
```

{ .annotate }

1. Validate the received data via decode procedure using the PostgreSQL driver. Note that this may cause an error if the type is not supported in the PostgreSQL driver.
1. Validate the received data via decode procedure using the PostgreSQL driver. Note that this may cause an error if the
type is not supported in the PostgreSQL driver.
2. Skip transformation (keep the values) if one of the affected columns (`not_affected=false`) has a null value.
3. If a column has a null value, then skip it. This works in conjunction with `skip_on_behaviour`. Since it has the value any, if one of the columns (`actual_arrival` or `scheduled_arrival`) has a `null` value, then skip the transformation call.
4. The format of JSON can be either `text` or `bytes`.
3. If a column has a null value, then skip it. This works in conjunction with `skip_on_behaviour`. Since it has the
value any, if one of the columns (`jobtitle` or `loginid`) has a `null` value, then skip the
transformation call.
4. The format of JSON can be either `text` or `bytes`. The default value is `text`.
5. If the `skip_original_data` attribute is set to `true`, the data will not be transferred to the command, and the
column will contain empty original data.
4 changes: 3 additions & 1 deletion internal/db/postgres/context/table.go
@@ -185,13 +185,15 @@ func getColumnsConfig(ctx context.Context, tx pgx.Tx, oid toolkit.Oid) ([]*toolk
}
defer rows.Close()

idx := 0
for rows.Next() {
var column toolkit.Column
column := toolkit.Column{Idx: idx}
if err = rows.Scan(&column.Name, &column.TypeOid, &column.TypeName,
&column.NotNull, &column.Length, &column.Num); err != nil {
return nil, fmt.Errorf("cannot scan tableColumnQuery: %w", err)
}
res = append(res, &column)
idx++
}

return res, nil
38 changes: 24 additions & 14 deletions internal/db/postgres/transformers/cmd.go
@@ -46,6 +46,13 @@ const (
skipOnAllName = "all"
)

var defaultRowDriverParams = &toolkit.DriverParams{
Name: toolkit.JsonModeName,
JsonDataFormat: toolkit.JsonTextDataFormatName,
JsonAttributesFormat: toolkit.JsonAttributesNamesFormatName,
CsvAttributesFormat: toolkit.CsvAttributesConfigNumeratingFormatName,
}

var cmdTransformerName = "Cmd"

var CmdTransformerDefinition = utils.NewDefinition(
@@ -129,12 +136,12 @@ type Cmd struct {
timeout time.Duration
expectedExitCode int
affectedColumns map[int]string
affectedColumnsIdx []int
transferringColumnsIdx []int
affectedColumnsIdx []*toolkit.Column
transferringColumnsIdx []*toolkit.Column
allColumnsIdx []int
skipOnBehaviour int
checkSkip bool
rowDriverParams *toolkit.RowDriverParams
rowDriverParams *toolkit.DriverParams

driver *toolkit.Driver
t *time.Ticker
@@ -155,7 +162,7 @@ func NewCmd(
var skipOnBehaviourName string
var skipOnBehaviour = skipOnAll
var checkSkip bool
rowDriverParams := &toolkit.RowDriverParams{}
rowDriverParams := &(*defaultRowDriverParams)

p := parameters["columns"]
if _, err := p.Scan(&columns); err != nil {
@@ -176,6 +183,9 @@ func NewCmd(
if _, err := p.Scan(rowDriverParams); err != nil {
return nil, nil, fmt.Errorf(`unable to scan "mode" param: %w`, err)
}
if err := rowDriverParams.Validate(); err != nil {
return nil, nil, fmt.Errorf("error validating driver params: %w", err)
}

p = parameters["validate"]
if _, err := p.Scan(&validate); err != nil {
@@ -202,8 +212,8 @@ func NewCmd(

affectedColumns := make(map[int]string)
var warnings toolkit.ValidationWarnings
var affectedColumnsIdx []int
var transferringColumnsIdx []int
var affectedColumnsIdx []*toolkit.Column
var transferringColumnsIdx []*toolkit.Column
var allColumnsIdx []int

if len(columns) > 0 {
@@ -227,14 +237,14 @@ func NewCmd(
if !c.NotAffected {
added = true
affectedColumns[idx] = c.Name
affectedColumnsIdx = append(affectedColumnsIdx, idx)
affectedColumnsIdx = append(affectedColumnsIdx, column)
warns := utils.ValidateSchema(driver.Table, column, nil)
warnings = append(warnings, warns...)

}
if !c.SkipOriginalData {
added = true
transferringColumnsIdx = append(transferringColumnsIdx, idx)
transferringColumnsIdx = append(transferringColumnsIdx, column)
}

if !added {
@@ -247,9 +257,9 @@ func NewCmd(
}
}
} else {
for idx := range driver.Table.Columns {
transferringColumnsIdx = append(transferringColumnsIdx, idx)
affectedColumnsIdx = append(affectedColumnsIdx, idx)
for _, c := range driver.Table.Columns {
transferringColumnsIdx = append(transferringColumnsIdx, c)
affectedColumnsIdx = append(affectedColumnsIdx, c)
}
}

@@ -401,10 +411,10 @@ func (c *Cmd) needSkip(r *toolkit.Record) (bool, error) {
}

func (c *Cmd) validate(r *toolkit.Record) error {
for _, idx := range c.affectedColumnsIdx {
_, err := r.GetColumnValueByIdx(idx)
for _, col := range c.affectedColumnsIdx {
_, err := r.GetColumnValueByIdx(col.Idx)
if err != nil {
return fmt.Errorf("error validating received attribute \"%s\" value from cmd: %w", r.Driver.Table.Columns[idx].Name, err)
return fmt.Errorf("error validating received attribute \"%s\" value from cmd: %w", r.Driver.Table.Columns[col.Idx].Name, err)
}
}
return nil
@@ -29,11 +29,6 @@ const (
DefaultAutoDiscoveryTimeout = 10 * time.Second
)

var defaultRowDriver = &toolkit.RowDriverParams{
Name: CsvModeName,
Params: make(map[string]interface{}),
}

func BootstrapCustomTransformers(ctx context.Context, registry *utils.TransformerRegistry, customTransformers []*TransformerDefinition) (err error) {
for _, ctd := range customTransformers {
var td *utils.Definition
@@ -55,7 +50,7 @@ func BootstrapCustomTransformers(ctx context.Context, registry *utils.Transforme
}

if ctd.Driver == nil {
ctd.Driver = defaultRowDriver
ctd.Driver = &(*toolkit.DefaultRowDriverParams)
}

if ctd.AutoDiscover {
4 changes: 2 additions & 2 deletions internal/db/postgres/transformers/custom/custom_cmd.go
@@ -71,8 +71,8 @@ func NewCustomCmdTransformer(
if err != nil {
return nil, nil, fmt.Errorf("error getting affeected and transferring columns: %w", err)
}
for _, idx := range affectedColumnsIdx {
affectedColumns[idx] = driver.Table.Columns[idx].Name
for _, c := range affectedColumnsIdx {
affectedColumns[c.Idx] = c.Name
}

api, err := toolkit.NewApi(ctd.Driver, transferringColumnsIdx, affectedColumnsIdx, driver)