diff --git a/connect-file-pulse-api/pom.xml b/connect-file-pulse-api/pom.xml index 6bc578f24..ab8b4181f 100644 --- a/connect-file-pulse-api/pom.xml +++ b/connect-file-pulse-api/pom.xml @@ -26,7 +26,7 @@ io.streamthoughts kafka-connect-filepulse-reactor - 2.10.0-SNAPSHOT + 2.10.0 kafka-connect-filepulse-api diff --git a/connect-file-pulse-dataformat/pom.xml b/connect-file-pulse-dataformat/pom.xml index fe722bc4e..54574446f 100644 --- a/connect-file-pulse-dataformat/pom.xml +++ b/connect-file-pulse-dataformat/pom.xml @@ -5,7 +5,7 @@ kafka-connect-filepulse-reactor io.streamthoughts - 2.10.0-SNAPSHOT + 2.10.0 4.0.0 diff --git a/connect-file-pulse-expression/pom.xml b/connect-file-pulse-expression/pom.xml index 2233e3703..98f1738de 100644 --- a/connect-file-pulse-expression/pom.xml +++ b/connect-file-pulse-expression/pom.xml @@ -24,7 +24,7 @@ kafka-connect-filepulse-reactor io.streamthoughts - 2.10.0-SNAPSHOT + 2.10.0 4.0.0 diff --git a/connect-file-pulse-filesystems/filepulse-amazons3-fs/pom.xml b/connect-file-pulse-filesystems/filepulse-amazons3-fs/pom.xml index 7c6ddbb96..42fa5eee7 100644 --- a/connect-file-pulse-filesystems/filepulse-amazons3-fs/pom.xml +++ b/connect-file-pulse-filesystems/filepulse-amazons3-fs/pom.xml @@ -23,7 +23,7 @@ io.streamthoughts kafka-connect-filepulse-filesystems - 2.10.0-SNAPSHOT + 2.10.0 4.0.0 diff --git a/connect-file-pulse-filesystems/filepulse-azure-storage-fs/pom.xml b/connect-file-pulse-filesystems/filepulse-azure-storage-fs/pom.xml index 0540635eb..30996f239 100644 --- a/connect-file-pulse-filesystems/filepulse-azure-storage-fs/pom.xml +++ b/connect-file-pulse-filesystems/filepulse-azure-storage-fs/pom.xml @@ -7,7 +7,7 @@ io.streamthoughts kafka-connect-filepulse-filesystems - 2.10.0-SNAPSHOT + 2.10.0 Kafka Connect Source File Pulse Azure Storage FS diff --git a/connect-file-pulse-filesystems/filepulse-commons-fs/pom.xml b/connect-file-pulse-filesystems/filepulse-commons-fs/pom.xml index bc211cea0..5d538b9e6 100644 --- a/connect-file-pulse-filesystems/filepulse-commons-fs/pom.xml +++ b/connect-file-pulse-filesystems/filepulse-commons-fs/pom.xml @@ -23,7 +23,7 @@ io.streamthoughts kafka-connect-filepulse-filesystems - 2.10.0-SNAPSHOT + 2.10.0 4.0.0 diff --git a/connect-file-pulse-filesystems/filepulse-google-cloud-storage-fs/pom.xml b/connect-file-pulse-filesystems/filepulse-google-cloud-storage-fs/pom.xml index 509833d05..ff3c8f1a6 100644 --- a/connect-file-pulse-filesystems/filepulse-google-cloud-storage-fs/pom.xml +++ b/connect-file-pulse-filesystems/filepulse-google-cloud-storage-fs/pom.xml @@ -7,7 +7,7 @@ io.streamthoughts kafka-connect-filepulse-filesystems - 2.10.0-SNAPSHOT + 2.10.0 Kafka Connect Source File Pulse Google Cloud Storage FS diff --git a/connect-file-pulse-filesystems/filepulse-local-fs/pom.xml b/connect-file-pulse-filesystems/filepulse-local-fs/pom.xml index 090d31921..159616c5f 100644 --- a/connect-file-pulse-filesystems/filepulse-local-fs/pom.xml +++ b/connect-file-pulse-filesystems/filepulse-local-fs/pom.xml @@ -23,7 +23,7 @@ io.streamthoughts kafka-connect-filepulse-filesystems - 2.10.0-SNAPSHOT + 2.10.0 4.0.0 diff --git a/connect-file-pulse-filesystems/pom.xml b/connect-file-pulse-filesystems/pom.xml index e2a6154d9..a1cea9d8a 100644 --- a/connect-file-pulse-filesystems/pom.xml +++ b/connect-file-pulse-filesystems/pom.xml @@ -23,7 +23,7 @@ kafka-connect-filepulse-reactor io.streamthoughts - 2.10.0-SNAPSHOT + 2.10.0 4.0.0 diff --git a/connect-file-pulse-filters/pom.xml b/connect-file-pulse-filters/pom.xml index e634f89ef..284bf03e9 
100644 --- a/connect-file-pulse-filters/pom.xml +++ b/connect-file-pulse-filters/pom.xml @@ -26,7 +26,7 @@ io.streamthoughts kafka-connect-filepulse-reactor - 2.10.0-SNAPSHOT + 2.10.0 kafka-connect-filepulse-filters diff --git a/connect-file-pulse-plugin/pom.xml b/connect-file-pulse-plugin/pom.xml index dd6a6a89f..e57d0c0e3 100644 --- a/connect-file-pulse-plugin/pom.xml +++ b/connect-file-pulse-plugin/pom.xml @@ -26,7 +26,7 @@ io.streamthoughts kafka-connect-filepulse-reactor - 2.10.0-SNAPSHOT + 2.10.0 kafka-connect-filepulse-plugin diff --git a/docs/config.toml b/docs/config.toml index 7dbeed6df..b9fe075bb 100644 --- a/docs/config.toml +++ b/docs/config.toml @@ -178,3 +178,6 @@ no = 'Sorry to hear that. Please + Learn about the concepts and the functionalities of the Connect File Pulse Plugin. +--- + +The Developer Guide section helps you learn about the functionalities of the File Pulse Connector and the concepts +File Pulse uses to process and transform your data, and helps you obtain a deeper understanding of how File Pulse Connector works. + diff --git a/docs/content/en/docs/Archives/v2.10.x/Developer Guide/accessing-data-and-metadata.md b/docs/content/en/docs/Archives/v2.10.x/Developer Guide/accessing-data-and-metadata.md new file mode 100644 index 000000000..068c0c6a1 --- /dev/null +++ b/docs/content/en/docs/Archives/v2.10.x/Developer Guide/accessing-data-and-metadata.md @@ -0,0 +1,700 @@ +--- +date: 2022-06-03 +title: "Accessing Data and Metadata" +linkTitle: "Accessing Data and Metadata" +weight: 60 +description: > + Learn how to use the Simple Connect Expression Language (SCeL) for accessing and manipulating data. +--- + +Some filters (e.g : [AppendFilter](/kafka-connect-file-pulse/docs/developer-guide/filters/#appendfilter)) can be configured using the *Simple Connect Expression Language*. + +*Simple Connect Expression Language* (ScEL for short) is an expression language that allows accessing and manipulating record fields and metadata. + +The syntaxes to define an expression are of the form : `` or `"{{ }}"`. + +## Expressions + +ScEL supports the following capabilities : + +* **Literal expressions** +* **Field Selector** +* **Nested Navigation** +* **String substitution** +* **Functions** + +### Literal expressions + +* String : `'Hello World'` +* Number : `42` +* Boolean: `True` +* Nullable: `null` + +### Field Selector + +The expression language can be used to easily select one field from the input record : + +`$.username` + +### Nested Navigation + +To navigate down a struct value, just use a period to indicate a nested field value: + +`$.address.city` + +### String substitution + +The expression language can be used to easily build a new string field that concatenate multiple ones: + +`The user {{ $.username }} is living in city {{ $.address.city }}` + +### Function + +The expression language provides built-in functions that can be used for easily transforming a field value: + +`The user {{ $.username }} is living in city {{ uppercase($.address.city) }}` + +Functions can also be nested to build more complex transformations. +For example, the below expression shows how to replace all whitespace characters after transforming a field's value into lowercase. + +``` +replace_all(lowercase($.field), '\\s', '-') +``` + +{{% alert title="Limitations" color="warning" %}} +Currently, FilePulse does not support user-defined functions (UDFs). So you cannot register your own functions to enrich the expression language. 
+{{% /alert %}} + +### Dynamic Field Selector + +String substitution can be used to dynamically select a field : + +The bellow example shows how to dynamically build a field selector by concatenating `$.` and +the first element present in the array field `$.values`. + +`{{ '$.'extract_array($.values, 0) }}` + +## Scopes + +In the previous section, we saw how to use the expression language to select a specific field was part of the record being processed. + +In addition to that, _ScEL_ allows you to access additional fields through the use of _Scopes_. +Basically, a _scope_ defined the _root object_ on which a selector expression will be evaluated. + +The syntax to define an expression with a _scope_ is of the form : "`$[].`". + +By default, if no _scope_ is defined in the expression, the scope `$value` is implicitly used. + +ScEL supports a number of predefined _scopes_ that can be used for example : + +- **To define the _topic_, the _key_, the _headers_, or the _timestamp_ for the record.** +- **To access to the metadata of the source file.** +- **To keep transient and contextual data between filters**. +- Etc. + +| Scope | Description | Type | +|--------------|------------------------------------------------------------|-----------------------| +| `$headers` | The record headers | `map[string, object]` | +| `$key` | The record key | `string` | +| `$metadata` | The file metadata | `struct` | +| `$offset` | The offset information of this record into the source file | `struct` | +| `$system` | The system environment variables and runtime properties | `struct` | +| `$timestamp` | The record timestamp | `long` | +| `$topic` | The output topic | `string` | +| `$value` | The record value | `struct` | +| `$variables` | The contextual filter-chain variables | `map[string, object]` | + +{{% alert title="Access error" color="info" %}} +In case of failures an additional `$error` scope will be added to the current filter context (see : [Handling Failures](/kafka-connect-file-pulse/docs/developer-guide/handling-failures/)) +{{% /alert %}} + +### Record Headers + +The scope `headers` allows defining the headers of the output record. + +### Record key + +The scope `key` allows defining the key of the output record. Only string key is currently supported. + +### Source Metadata + +The scope `metadata` allows read access to information about the file being processing. + +#### Commons Metadata + +| Predefined Fields (ScEL) | Description | Type | +|---------------------------------|-----------------------------------------------------------------------|-----------------------| +| `$metadata.name` | The URI of the source object. | `string` | +| `$metadata.uri` | The name of the source object. | `string` | +| `$metadata.contentLength` | The content-length of the source object. | `string` | +| `$metadata.lastModified` | The creation date or the last modified date, whichever is the latest. | `string` | +| `$metadata.contentDigest` | The digest of the source object content. | `string` | +| `$metadata.userDefinedMetadata` | The user-defined metadata. | `Map[string, object]` | + +The `userDefinedMetadata` object may contain additional information (i.e. properties) about the source object. 
+ +**Azure** + +* `azure.blob.storage.account` +* `azure.blob.storage.blobUrl` +* `azure.blob.storage.creationTime` +* `azure.blob.storage.contentEncoding` +* `azure.blob.storage.contentType` + +**AWS** + +* `s3.object.summary.bucketName` +* `s3.object.summary.key` +* `s3.object.summary.etag` +* `s3.object.summary.storageClass` +* `s3.object.user.metadata.` (optional) + +**GCP** + +* `gcs.blob.bucket` +* `gcs.blob.name` +* `gcs.blob.storageClass` (optional) +* `gcs.blob.contentEncodinge` (optional) +* `gcs.blob.contentType` (optional) +* `gcs.blob.createTime` (optional) +* `gcs.blob.ownerType` (optional) +* `gcs.blob.user.metadata.` (optional) + +#### Local File Object + +For files read from the local file system, the following additional metadata will be available. + +| Predefined Fields (ScEL) | Description | Type | +|--------------------------|------------------------------|----------| +| `$metadata.absolutePath` | The file absolute path | `string` | +| `$metadata.inode` | The file Unix inode | `long` | +| `$metadata.path` | The file directory path | `string` | + + +### Record Offset + +The scope `offset` allows read access to information about the original position of the record into the source file. +The available fields depend on the configured FileInputRecord. + +| Predefined Fields (ScEL) | Description | Type | +|--------------------------|-----------------------------------------------|--------| +| `$offset.timestamp` | The creation time of the record (millisecond) | `long` | + +Information only available if `RowFilterReader` is configured. + +| Predefined Fields (ScEL) | Description | Type | +|--------------------------|-------------------------------------------------------|--------| +| `$offset.startPosition` | The start position of the record into the source file | `long` | +| `$offset.endPosition` | The end position of the record into the source file | `long` | +| `$offset.size` | The size in bytes | `long` | +| `$offset.rows` | The number of rows already read from the source file. | `long` | + +Information only available if `BytesArrayInputReader` is configured. + +| Predefined Fields (ScEL) | Description | Type | +|--------------------------|-------------------------------------------------------------------------------|--------| +| `$offset.startPosition` | The start position of the record into the source file (always equals to 0) | `long` | +| `$offset.endPosition` | The end position of the record into the source file (equals to the file size) | `long` | + +Information only available if `AvroFilterInputReader` is configured. + +| Predefined Fields (ScEL) | Description | Type | +|--------------------------|---------------------------------------------------|--------| +| `$offset.blockStart` | The start position of the current block | `long` | +| `$offset.position` | The position into the current block. | `long` | +| `$offset.records` | The number of record read into the current block. | `long` | + +### System + +The scope `system` allows accessing to the system environment variables and runtime properties. + +| Predefined Fields (ScEL) | Description | Type | +|--------------------------|------------------------------------|-----------------------| +| `$system.env` | The system environment variables. | `map[string, string]` | +| `$system.props` | The system environment properties. | `map[string, string]` | + +### Timestamp + +The scope `$timestamp` allows defining the timestamp of the output record. 
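For illustration, a scope can be written to from a processing filter in the same way as any record field. The sketch below is only an example under assumptions (the filter alias, the source field `$.logdate`, and the date pattern are illustrative): it uses an `AppendFilter` together with the `to_timestamp` function to set the record timestamp from a parsed date field.

```properties
filters=SetRecordTimestamp
filters.SetRecordTimestamp.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
# $timestamp expects an epoch time in milliseconds, as returned by to_timestamp
filters.SetRecordTimestamp.field=$timestamp
filters.SetRecordTimestamp.values={{ to_timestamp($.logdate, 'yyyy-MM-dd HH:mm:ss') }}
```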
+ +### Topic + +The scope `$topic` allows defining the target topic of the output record. + +### Value + +The scope `$value` allows defining the fields of the output record + +### Variables + +The scope `$variables` allows read/write access to a simple key-value map structure. +This scope can be used to share user-defined variables between [Processing Filters](/kafka-connect-file-pulse/docs/developer-guide/filters/). + +{{% alert title="Warning" color="warning" %}} +Variables are not cached between records. +{{% /alert %}} + +## Built-in Functions + +ScEL supports a number of predefined functions that can be used to apply a single transformation on a field. + +### Numeric functions + +ScEL numeric functions are used primarily for numeric manipulation and/or mathematical calculations. + +#### `CONVERTS` + +| **Since**: **`-`** | +|-----------------------------------------------------------| +| **Syntax** : `{{ converts(, ) }}` | +| **Returned type** : `ANY` | + +> Converts one type to another. The following casts are supported: + +#### `GT` + +| **Since**: **`2.4.0`** | +|-------------------------------------------------------------------| +| **Syntax** : `{{ gt(, ) }}` | +| **Returned type** : `BOOLEAN` | + +> Executes "*less than operation*" on two values and returns `TRUE` if the first value is less than the second value, `FALSE`, otherwise. + +#### `LT` + +| **Since**: **`2.4.0`** | +|-------------------------------------------------------------------| +| **Syntax** : `{{ lt(, ) }}` | +| **Returned type** : `BOOLEAN` | + +> Executes "*greater than operation*" on two values and returns `TRUE` if the first value is greater than the second value, `FALSE`, otherwise. + +### Binary Functions + +#### `AND` + +| **Since**: **`2.4.0`** | +|-------------------------------------------------------------------------------| +| **Syntax** : `{{ and(, , [...]) }}` | +| **Returned type** : `BOOLEAN` | + +> Checks if all of the given conditional expressions are `TRUE`. + +#### `IF` + +| **Since**: **`2.4.0`** | +|---------------------------------------------------------------------------------------------------------| +| **Syntax** : `{{ if(, , ) }}` | +| **Returned type** : `BOOLEAN` | + +> Evaluates the given boolean expression and returns one value if `TRUE` and another value if `FALSE`. + +#### `NOT` + +| **Since**: **`2.4.0`** | +|------------------------------------------------| +| **Syntax** : `{{ not() }}` | +| **Returned type** : `BOOLEAN` | + +> Reverses a boolean value. + +#### `OR` + +| **Since**: **`2.4.0`** | +|------------------------------------------------------------------------------| +| **Syntax** : `{{ or(, , [...]) }}` | +| **Returned type** : `BOOLEAN` | + +> Checks if at least one of the given conditional expressions is `TRUE`. + +### Collection + +#### `EXCTRACT_ARRAY` + +| **Since**: **`-`** | +|-----------------------------------------------------------------| +| **Syntax** : `{{ extract_array(, ) }}` | +| **Returned type** : `ANY` | + +> Returns the element at the specified position of the specified array. 
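##### Examples

**Extract the first element of an array field** (illustrative; the field name is an assumption)

* _Expression_:

  `{{ extract_array($.values, 0) }}`

* _Output (type = `ANY`)_:

  the first element of the array field `$.values`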
+ +#### `LENGTH` + +| **Since**: **`2.4.0`** | +|-------------------------------------------------| +| **Syntax** : `{{ length() }}` | +| **Returned type** : `INTEGER` | + +> Returns the number of elements into an array or the length of a string field + +#### `CONTAINS` + +| **Since**: **`-`** | +|-----------------------------------------------------------------------| +| **Syntax** : `{{ contains(, ) }}` | +| **Returned type** : `BOOLEAN` | + +> Returns `TRUE` if an array contains a given value. + +### Date and time + +#### `TIMESTAMP_DIFF` + +| **Since**: **`2.4.0`** | +|-------------------------------------------------------------------------------------------| +| **Syntax** : `{{ timestamp_diff(unit, epoch_time_expression1, epoch_time_expression2) }}` | +| **Returned type** : `LONG` | + +> Calculates the amount of time between two epoch times in seconds or milliseconds. For more information on `unit` see [ChronoUnit](https://docs.oracle.com/javase/8/docs/api/java/time/temporal/ChronoUnit.html). + +#### `TO_TIMESTAMP` + +| **Since**: **`2.4.0`** | +|------------------------------------------------------------------------------------| +| **Syntax** : `{{ to_timestamp(, , []) }}` | +| **Returned type** : `LONG` | + +> Parses a given string value and returns the epoch-time in milliseconds. + +#### `UNIX_TIMESTAMP` + +| **Since**: **`2.4.0`** | +|---------------------------------------| +| **Syntax** : `{{ unix_timestamp() }}` | +| **Returned type** : `LONG` | + +> Returns the current time in milliseconds. + +### Nulls + +#### `IS_EMPTY` + +| **Since**: **`2.4.0`** | +|----------------------------------------------------| +| **Syntax** : `{{ is_empty() }}` | +| **Returned type** : `BOOLEAN` | + +> Returns `TRUE` if an array as no elements or a string field has no characters + +#### `IS_NULL` + +| **Since**: **`2.4.0`** | +|--------------------------------------------------| +| **Syntax** : `{{ is_null() }}` | +| **Returned type** : `BOOLEAN` | + +> Returns `TRUE` if a field's value is `NULL`. + +#### `NLV` + +| **Since**: **`-`** | +|--------------------------------------------------------------------| +| **Syntax** : `{{ nlv(, ) }}` | +| **Returned type** : `Any` | + +> Sets a default value if a field's value is `NULL` + +### Strings & Objects + +#### `CONCAT` + +| **Since**: **`-`** | +|---------------------------------------------------------------------------| +| **Syntax** : `{{ concat(, Returns a `STRING` value consisting of the concatenation of two or more string expressions. + +##### Examples + +**Concatenate two fields** + +* _Expression_: + + `{{ concat(world'hello','') }}` + +* _Output (type = `STRING`)_: + + `helloworld` + +#### `CONCAT_WS` + +| **Since**: **`-`** | +|----------------------------------------------------------------------------------------------------------------| +| **Syntax** : `{{ concat_ws(, , , , , ...) }}` | +| **Returned type** : `STRING` | + +> Returns a `STRING` value consisting of the concatenation of two or more string expressions, using the specified separator between each. + Optionally, the returned `STRING` may be prefixed and/or suffixed. + +##### Examples + +**Concatenate two fields** + +* _Expression_: + + `{{ concat(' ', '', '!', 'hello','world') }}` + +* _Output (type = `STRING`)_: + + `hello world!` + +##### `HASH` + +| **Since**: **`-`** | +|-----------------------------------------------| +| **Syntax** : `{{ hash() }}` | +| **Returned type** : `STRING` | + +> Returns the hashed of a given `STRING` expression, using murmur2 algorithm. 
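##### Examples

**Hash a field's value** (illustrative; the field name is an assumption)

* _Expression_:

  `{{ hash($.username) }}`

* _Output (type = `STRING`)_:

  the murmur2 hash of the value of `$.username`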
+ +#### `EQUALS` + +| **Since**: **`-`** | +|---------------------------------------------------------------------| +| **Syntax** : `{{ equals(, ) }}` | +| **Returned type** : `BOOLEAN` | + +> Returns `TRUE` if a `STRING` or number fields's value equals the specified value. + +#### `ENDS_WITH` + +| **Since**: **`-`** | +|--------------------------------------------------------------| +| **Syntax** : `{{ ends_with(, ) }}` | +| **Returned type** : `BOOLEAN` | + +> Returns `TRUE` if a string field's value end with the specified string suffix. + +##### Examples + +**Check whether a field ends with a given suffix** + +* _Expression_: + + `{{ ends_with('thumbnail.png', '.png') }}` + +* _Output (type = `BOOLEAN`)_: + + `true` + +#### `EXISTS` + +| **Since**: **`-`** | +|-----------------------------------------------------------| +| **Syntax** : `{{ exists(, ) }}` | +| **Returned type** : `BOOLEAN` | + +> Returns `TRUE` if a `STRUCT` has the specified field. + +#### `EXTRACT_STRUCT_FIELD` + +| **Since**: **`2.7.0`** | +|------------------------------------------------------------------------| +| **Syntax** : `{{ extract_struct_field(, ) }}` | +| **Returned type** : `ANY` | + +> Extracts the value at the specified field `path` from the `STRUCT` returned by the given `struct_expression`. +> If the requested `path` does not exist, the function returns `NULL`. + +#### `FROM_BYTES` + +| **Since**: **`2.7.0`** | +|--------------------------------------------------------------| +| **Syntax** : `{{ from_bytes(struct_expression, '') }}` | +| **Returned type** : `STRING` | + +> Converts a `BYTES` value to a `STRING` in the specified encoding type. +> The following list shows the supported encoding types: `hex`, `utf8`, `ascii` and `base64`. + +#### `LOWERCASE` + +| **Since**: **`-`** | +|----------------------------------------------------| +| **Syntax** : `{{ lowercase() }}` | +| **Returned type** : `STRING` | + +> Converts all of the characters in a `STRING` value to lower case. + +##### Examples + +**Converts a field to lowercase** + +* _Expression_: + + `{{ lowercase('Apache Kafka') }}` + +* _Output (type = `STRING`)_ + + `apache kafka` + +#### `MATCHES` + +| **Since**: **`-`** | +|-----------------------------------------------------------| +| **Syntax** : `{{ matches(, ) }}` | +| **Returned type** : `BOOLEAN` | + +> Returns `TRUE` if a field's value match the specified regex. + +#### `MD5` + +| **Since**: **`-`** | +|----------------------------------------------| +| **Syntax** : `{{ md5() }}` | +| **Returned type** : `STRING` | + +> Returns the MD5 digest of `STRING` value. + +#### `REPLACE_ALL` + +| **Since**: **`-`** | +|------------------------------------------------------------------------------| +| **Syntax** : `{{ replace_all(, , ) }}` | +| **Returned type** : `STRING` | + +> Replaces every subsequence of a `STRING` that matches the given pattern with the given replacement string. + +#### `SPLIT` + +| **Since**: **`-`** | +|--------------------------------------------------------------------| +| **Syntax** : `{{ split(, , []) }}` | +| **Returned type** : `ARRAY` | + +> Splits a `STRING` value using the specified regex or character and returns the resulting array. + +#### `STARTS_WITH` + +| **Since**: **`-`** | +|----------------------------------------------------------------| +| **Syntax** : `{{ starts_with(, ) }}` | +| **Returned type** : `BOOLEAN` | + +> Returns `STRING` if a string field's value start with the specified string prefix. 
+ +##### Examples + +**Check whether a field starts with a given prefix** + +* _Expression_: + + `{{ starts_with('fr_FR', 'fr') }}` + +* _Output (type = `BOOLEAN`)_: + + `true` + + +#### `TRIM` + +| **Since**: **`-`** | +|-----------------------------------------------| +| **Syntax** : `{{ trim() }}` | +| **Returned type** : `STRING` | + +> Trims the spaces from the beginning and end of a string. + +##### Examples + +**Remove leading and tailing blank spaces from strings** + +* _Expression_: + + `{{ trim(' FilePulse ') }}` + +* _Output (type = `STRING`):_ + + `FilePulse` + +#### `UPPERCASE` + +| **Since**: **`-`** | +|----------------------------------------------------| +| **Syntax** : `{{ uppercase() }}` | +| **Returned type** : `STRING` | + +> Converts all of the characters in a `STRING` value to upper case. + +##### Examples + +**Convert a field to uppercase** + +* _Expression_: + + `{{ uppercase('Apache Kafka') }}` + +* _Output_ (type = `STRING`): + + `APACHE KAFKA` + +#### `UUID` + +| **Since**: **`-`** | +|------------------------------| +| **Syntax** : `{{ uuid() }}` | +| **Returned type** : `STRING` | + +> Returns a Universally Unique Identifier (UUID) + +### URLs + +#### `PARSE_URL` + +| **Since**: **`2.7.0`** | +|--------------------------------------------------------------------| +| **Syntax** : `{{ parse_url(, []) }}` | + +> Parses a valid field-value URL/URI and return a struct consisting of all the components (fragment, host, path, port, query, scheme, userInfo). + +##### Examples + +**Parse a simple URL:** + +* _Expression_: + + `{{ parse_url('https://www.example.com') }}` + +* _Output_ (type=`STRUCT`): + + ```json + {"host":"www.example.com", "path":"","port":null,"scheme":"https", "fragment":null,"query":null, "userInfo": null} + ``` + +**Parse a complex URL that includes a path, a port number, and user information:** + +* _Expression_: + + `{{ parse_url('http://user:password@example.com:1234/index.html?user=1') }}` + +* _Output_ (type=`STRUCT`): + + ```json + {"host":"www.example.com", "path":"/index.html", "port":1234, "scheme":"http", "fragment":null, "query":"?user=1", "userInfo": "user:password"} + ``` + +**Parse an email URL:** + +* _Expression_: + + `{{ parse_url('mailto:abc@xyz.com') }}` + +* _Output_ (type=`STRUCT`): + + ```json + {"host":null, "path":"abc@xyz.com", "port":null, "scheme":"mailto", "fragment":null, "query":null, "userInfo":null} + ``` + +**Parse an invalid URL that is missing the scheme. missing _scheme_.** + +Set the `permissive<>` parameter set to `true` to indicate that the function should return an object that contains the error message. + +* _Expression_: + + `{{ parse_url('example.com', true) }}` + +* _Output_ (type=`STRUCT`): + + ```json + {"error":"Could not parse URL: scheme not specified"} + ``` \ No newline at end of file diff --git a/docs/content/en/docs/Archives/v2.10.x/Developer Guide/cleaning-completed-files.md b/docs/content/en/docs/Archives/v2.10.x/Developer Guide/cleaning-completed-files.md new file mode 100644 index 000000000..9ad3fc8b0 --- /dev/null +++ b/docs/content/en/docs/Archives/v2.10.x/Developer Guide/cleaning-completed-files.md @@ -0,0 +1,85 @@ +--- +date: 2022-03-02 +title: "File Cleanup Policies" +linkTitle: "File Cleanup Policies" +weight: 100 +description: > + The commons configuration for Connect File Pulse. 
+--- + +The connector can be configured with a specific [FileCleanupPolicy](https://github.com/streamthoughts/kafka-connect-file-pulse/blob/master/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/clean/FileCleanupPolicy.java) implementation. + +The cleanup policy can be configured with the below connect property : + +| Configuration | Description | Type | Default | Importance | +|---------------------------|----------------------------------------------------------------------|-------|---------|------------| +| `fs.cleanup.policy.class` | The fully qualified name of the class which is used to cleanup files | class | *-* | high | + + +## Generic Cleanup Policies + +### `DeleteCleanPolicy` + +This policy deletes all files regardless of their final status (completed or failed). + +To enable this policy, the property `fs.cleanup.policy.class` must be configured to : + +``` +io.streamthoughts.kafka.connect.filepulse.fs.clean.DeleteCleanupPolicy +``` + +### `LogCleanPolicy` + +This policy prints into logs some information after files completion. + +To enable this policy, the property `fs.cleanup.policy.class` must be configured to : + +``` +io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy +``` + +## Cleanup Policies: Local Filesystem + +### `LocalMoveCleanupPolicy` + +This policy attempts to move atomically files to configurable target directories. + +To enable this policy, the property `fs.cleanup.policy.class` must be configured to : + +``` +io.streamthoughts.kafka.connect.filepulse.fs.clean.LocalMoveCleanupPolicy +``` + +{{% alert title="Usage" color="warning" %}} +This policy only works when using the `LocalFSDirectoryListing`. +{{% /alert %}} + +#### Configuration + +| Configuration | Description | Type | Default | Importance | +|-------------------------------|------------------------------------------------|--------|------------|------------| +| `cleaner.output.failed.path` | Target directory for file proceed with failure | string | *.failure* | high | +| `cleaner.output.succeed.path` | Target directory for file proceed successfully | string | *.success* | high | + +## Cleanup Policies: Amazon + +### `AmazonMoveCleanupPolicy` + +This policy moves S3 objects atomically files to configurable target directories. + +To enable this policy, the property `fs.cleanup.policy.class` must be configured to : + +``` +io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy +``` + + +| Configuration | Description | Type | Default | Importance | +|--------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------------------------------|------------| +| `fs.cleanup.policy.move.success.aws.bucket.name` | The name of the destination S3 bucket for success objects (optional) | `string` | *Bucket name of the source S3 Object* | HIGH | +| `fs.cleanup.policy.move.success.aws.prefix.path` | The prefix to be used for defining the key of an S3 object to move into the destination bucket. 
| `string` | | HIGH | +| `fs.cleanup.policy.move.failure.aws.bucket.name` | The name of the destination S3 bucket for failure objects (optional) | `string` | *Bucket name of the source S3 Object* | HIGH | +| `fs.cleanup.policy.move.failure.aws.prefix.path` | The prefix to be used for defining the key of an S3 object to move into the destination bucket. | `string` | | HIGH | +| `aws.s3.default.object.storage.class` | The AWS storage class to associate with an S3 object when it is copied by the connector (e.g., during a move operation). Accepted values are: `STANDARD`, `GLACIER`, `REDUCED_REDUNDANCY`, `STANDARD_IA`,`ONEZONE_IA`,`INTELLIGENT_TIERING`,`DEEP_ARCHIVE` | `string` | | LOW | + +## Implementing your own policy \ No newline at end of file diff --git a/docs/content/en/docs/Archives/v2.10.x/Developer Guide/conditional-execution.md b/docs/content/en/docs/Archives/v2.10.x/Developer Guide/conditional-execution.md new file mode 100644 index 000000000..e9c0b079a --- /dev/null +++ b/docs/content/en/docs/Archives/v2.10.x/Developer Guide/conditional-execution.md @@ -0,0 +1,44 @@ +--- +date: 2022-03-02 +title: "Conditional Execution" +linkTitle: "Conditional Execution" +weight: 60 +description: > + Learn how to conditionally execute a transformation filter. +--- + +A conditional property `if` can be configured on each filter to determine if that filter should be applied or skipped. +When a filter is skipped, record flow to the next filter without any modification. + +`if` configuration accepts a [Simple Connect Expression](../accessing-data-and-metadata) that must return to `TRUE` or `FALSE`. +If the configured expression does not evaluate to a boolean value the filter chain will fail. + +The `BOOLEAN` value returned from the filter condition can be inverted by setting the property `invert` to `TRUE`. + +For example, the below filter will only be applied on records having a log message containing "BadCredentialsException" + +``` +filters.TagSecurityException.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter +filters.TagSecurityException.if={{ contains(data.logmessage, BadCredentialsException) }} +filters.TagSecurityException.invert=false +filters.TagSecurityException.field=tags +filters.TagSecurityException.values=SecurityAlert +``` + +The following list shows the supported functions that return a `BOOLEAN` type for use with the `if` config property: + +* [`and`](../accessing-data-and-metadata#and) +* [`contains`](../accessing-data-and-metadata#contains) +* [`ends_with`](../accessing-data-and-metadata#ends_with) +* [`equals`](../accessing-data-and-metadata#equals) +* [`exists`](../accessing-data-and-metadata#exists) +* [`gt`](../accessing-data-and-metadata#gt) +* [`is_null`](../accessing-data-and-metadata#is_null) +* [`lt`](../accessing-data-and-metadata#lt) +* [`matches`](../accessing-data-and-metadata#matches) +* [`starts_with`](../accessing-data-and-metadata#starts_with) +* [`or`](../accessing-data-and-metadata#or) + +{{% alert title="Limitations" color="warning" %}} +Conditions cannot be used to easily create pipeline branching. 
+{{% /alert %}} diff --git a/docs/content/en/docs/Archives/v2.10.x/Developer Guide/configuration.md b/docs/content/en/docs/Archives/v2.10.x/Developer Guide/configuration.md new file mode 100644 index 000000000..938dd94ac --- /dev/null +++ b/docs/content/en/docs/Archives/v2.10.x/Developer Guide/configuration.md @@ -0,0 +1,249 @@ +--- +date: 2022-03-02 +title: "Configuration" +linkTitle: "Configuration" +weight: 20 +description: > + The common configurations for deploying a File Pulse connector. +--- + +## Commons configuration + +Whatever the kind of files you are processing a connector should always be configured with the below properties. +These configurations are described in detail in subsequent chapters. + +**Common Kafka Connect properties** + +| Configuration | Description | Type | Default | Importance | +|---------------|------------------------------------------------------------------------|--------|---------|------------| +| `topic` | The default output topic to write | string | *-* | high | +| `tasks.max` | The maximum number of tasks that should be created for this connector. | string | *-* | high | + +**Properties for listing and cleaning object files ([FileSystemListing](/kafka-connect-file-pulse/docs/developer-guide/file-system-listing/))** + +| Configuration | Description | Type | Default | Importance | +|------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|---------------------------------------------------------------------------|------------| +| `fs.listing.class` | Class which is used to list eligible files from the scanned file system. | class | *-* | MEDIUM | +| `fs.listing.filters` | Filters use to list eligible input files | list | *-* | MEDIUM | +| `fs.listing.interval.ms` | Time interval (in milliseconds) at wish to scan input directory | long | *10000* | HIGH | +| `fs.listing.task.delegation.enabled` | Boolean indicating whether the file listing process should be delegated to tasks. | boolean | *false* | LOW | +| `fs.cleanup.policy.class` | The fully qualified name of the class which is used to cleanup files | class | *-* | HIGH | +| `fs.cleanup.policy.triggered.on` | Specify the status when a file get cleanup. Valid values are: `COMPLETED`, `COMMITTED` | string | *COMPLETED* | MEDIUM | +| `max.scheduled.files` | Maximum number of files that can be schedules to tasks. | long | *1000* | HIGH | +| `allow.tasks.reconfiguration.after.timeout.ms` | Specify the timeout (in milliseconds) for the connector to allow tasks to be reconfigured when new files are detected, even if some tasks are still being processed. | long | *-* | LOW | +| `task.partitioner.class` | The TaskPartitioner to be used for partitioning files to tasks. | class | `io.streamthoughts.kafka.connect.filepulse.source.DefaultTaskPartitioner` | HIGH | +| `tasks.halt.on.error` | Should a task halt when it encounters an error or continue to the next file. | boolean | *false* | HIGH | +| `tasks.file.processing.order.by` | The strategy to be used for sorting files for processing. Valid values are: `LAST_MODIFIED`, `URI`, `CONTENT_LENGTH`, `CONTENT_LENGTH_DESC`. | string | `LAST_MODIFIED` | MEDIUM | +| `tasks.empty.poll.wait.ms` | The amount of time in millisecond a tasks should wait if a poll returns an empty list of records. 
| long | *500* | HIGH | +| `ignore.committed.offsets` | Should a task ignore committed offsets while scheduling a file. | boolean | *false* | LOW | +| `value.connect.schema` | The schema for the record-value. | string | *-* | MEDIUM | + +**Properties for transforming object file record([Filters Chain Definition](/kafka-connect-file-pulse/docs/developer-guide/filters-chain-definition/))** + +| Configuration | Description | Type | Default | Importance | +|---------------|--------------------------------------------------------------------|------|---------|------------| +| `filters` | List of filters aliases to apply on each data (order is important) | list | *-* | MEDIUM | + +**Properties for reading object file record([FileReaders](/kafka-connect-file-pulse/docs/developer-guide/file-readers/))** + +| Configuration | Description | Type | Default | Importance | +|----------------------|----------------------------------------------------------------------------------|-------|---------|------------| +| `tasks.reader.class` | The fully qualified name of the class which is used by tasks to read input files | class | *-* | HIGH | + +**Properties for uniquely identifying object files and records ([FileReaders](/kafka-connect-file-pulse/docs/developer-guide/file-readers/))** + +| Configuration | Description | Type | Default | Importance | +|-----------------------|--------------------------------------------------------------------------------------------------------|---------|------------------------------------------------------------------------------|------------| +| `offset.policy.class` | Class which is used to determine the source partition and offset that uniquely identify a input record | `class` | *io.streamthoughts.kafka.connect.filepulse.offset.DefaultSourceOffsetPolicy* | HIGH | + +**Properties for synchronizing Connector and Tasks** + +| Configuration | Description | Type | Default | Importance | +|-----------------------------------|--------------------------------------------------------------------------------------------|---------|------------------------------------------------------------------------------------|------------| +| `tasks.file.status.storage.class` | The FileObjectStateBackingStore class to be used for storing status state of file objects. | `Class` | `io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore` | HIGH | + +**Available implementations are :** +* `io.streamthoughts.kafka.connect.filepulse.state.InMemoryFileObjectStateBackingStore` +* `io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore` + +{{% alert title="Limitation" color="warning" %}} +The `InMemoryFileObjectStateBackingStore` implement is not fault-tolerant and should be only when using Kafka Connect in standalone mode or a single worker. +{{% /alert %}} + +**Properties for configuring the `KafkaFileObjectStateBackingStore` class** + +| Configuration | Description | Type | Default | Importance | +|------------------------------------------------------|--------------------------------------------------------------------------------------------------------------|--------|-----------------------------|------------| +| `tasks.file.status.storage.topic` | Name of the internal topic used by tasks and connector to report and monitor file progression. | class | *connect-file-pulse-status* | HIGH | +| `tasks.file.status.storage.bootstrap.servers` | A list of host/port pairs uses by the reporter for establishing the initial connection to the Kafka cluster. 
| string | *-* | HIGH | +| `tasks.file.status.storage.topic.partitions` | The number of partitions to be used for the status storage topic. | int | *-* | LOW | +| `tasks.file.status.storage.topic.replication.factor` | The replication factor to be used for the status storage topic. | float | *-* | LOW | + + +**Properties for configuring the `InMemoryFileObjectStateBackingStore` class** + +| Configuration | Description | Type | Default | Importance | Since | +|-----------------------------------------------------|-------------------------------------------------------------|-------|---------|------------|--------| +| `tasks.file.status.storage.cache.max.size.capacity` | Specifies the max size capacity of the LRU in-memory cache. | `int` | *10000* | LOW | v2.5.0 | + +In addition, to override the default configuration for the internal consumer and producer clients, +you can use one of the following override prefixes : + +* `tasks.file.status.storage.consumer.` +* `tasks.file.status.storage.producer.` + +## Examples + +Some configuration examples are available [here](https://github.com/streamthoughts/kafka-connect-file-pulse/tree/master/examples). + +## Defining Connect Record Schema + +The optional `value.connect.schema` config property can be used to set the connect-record schema that should be used. +If there is no schema pass through the connector configuration, a schema will be resolved for each record produced. + +The `value.connect.schema` must be passed as a JSON string that respects the following schema (using Avro representation): + +```json +{ + "type":"record", + "name":"Schema", + "fields":[ + { + "name":"name", + "type":"string", + "doc": "The name of this schema" + }, + { + "name":"type", + "type":{ + "type":"enum", + "name":"Type", + "symbols":[ + "STRUCT", + "STRING", + "BOOLEAN", + "INT8", + "INT16", + "INT32", + "INT64", + "FLOAT32", + "FLOAT64", + "BYTES", + "MAP", + "ARRAY" + ] + }, + "doc": "The type of this schema" + }, + { + "name":"doc", + "type":[ + "null", + "string" + ], + "default":null, + "doc": "The documentation for this schema" + }, + { + "name":"fieldSchemas", + "type":[ + "null", + { + "type":"map", + "values":"Schema" + } + ], + "default":null, + "doc": "The fields for this Schema. Throws a DataException if this schema is not a struct." + }, + { + "name":"valueSchema", + "type":[ + "null", + { + "type":"map", + "values":"Schema" + } + ], + "default":null, + "doc": "The value schema for this map or array schema. Throws a DataException if this schema is not a map or array." + }, + { + "name":"keySchema", + "type":[ + "null", + { + "type":"map", + "values":"Schema" + } + ], + "default":null, + "doc": "The key schema for this map schema. Throws a DataException if this schema is not a map." + }, + { + "name":"defaultValue", + "type":[ + "null", + "string" + ], + "default":null + }, + { + "name":"isOptional", + "type":"boolean", + "default":false, + "doc": "true if this field is optional, false otherwise" + }, + { + "name":"version", + "type":[ + "null", + "integer" + ], + "default":null, + "doc": "The optional version of the schema. If a version is included, newer versions *must* be larger than older ones." 
+ } + ] +} +``` + +**Example:** + +```json +{ + "name":"com.example.User", + "type":"STRUCT", + "isOptional":false, + "fieldSchemas":{ + "id":{ + "type":"INT64", + "isOptional":false + }, + "first_name":{ + "type":"STRING", + "isOptional":true + }, + "last_name":{ + "type":"STRING", + "isOptional":true + }, + "email":{ + "type":"STRING", + "isOptional":true + }, + "gender":{ + "type":"STRING", + "isOptional":true + }, + "country":{ + "type":"STRING", + "isOptional":true + }, + "favorite_colors":{ + "type":"ARRAY", + "isOptional":true, + "valueSchema": {"type": "STRING"} + } + } +} +``` diff --git a/docs/content/en/docs/Archives/v2.10.x/Developer Guide/file-readers.md b/docs/content/en/docs/Archives/v2.10.x/Developer Guide/file-readers.md new file mode 100644 index 000000000..2594e2612 --- /dev/null +++ b/docs/content/en/docs/Archives/v2.10.x/Developer Guide/file-readers.md @@ -0,0 +1,104 @@ +--- +date: 2022-03-02 +title: "File Readers" +linkTitle: "File Readers" +weight: 40 +description: > + Learn how to configure Connect FilePulse for a specific file format. +--- + +The `FilePulseSourceTask` uses the [FileInputReader](https://github.com/streamthoughts/kafka-connect-file-pulse/blob/master/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/reader/FileInputReader.java). +configured in the connector's configuration for reading object files (i.e., `tasks.reader.class`). + +Currently, Connect FilePulse provides the following `FileInputReader` implementations : + +**Amazon S3** + +package: `io.streamthoughts.kafka.connect.filepulse.fs.reader` + +* `AmazonS3AvroFileInputReader` +* `AmazonS3BytesArrayInputReader` +* `AmazonS3RowFileInputReader` +* `AmazonS3XMLFileInputReader` +* `AmazonS3MetadataFileInputReader` + +**Azure Blob Storage** + +package: `io.streamthoughts.kafka.connect.filepulse.fs.reader` + +* `AzureBlobStorageAvroFileInputReader` +* `AzureBlobStorageBytesArrayInputReader` +* `AzureBlobStorageRowFileInputReader` +* `AzureBlobStorageXMLFileInputReader` +* `AzureBlobStorageMetadataFileInputReader` + +**Google Cloud Storage** + +package: `io.streamthoughts.kafka.connect.filepulse.fs.reader` + +* `GcsAvroFileInputReader` +* `GcsBytesArrayInputReader` +* `GcsRowFileInputReader` +* `GcsXMLFileInputReader` +* `GcsMetadataFileInputReader` + +**Local Filesystem** + +package: `io.streamthoughts.kafka.connect.filepulse.fs.reader` + +* `LocalAvroFileInputReader` +* `LocalBytesArrayInputReader` +* `LocalRowFileInputReader` +* `LocalXMLFileInputReader` +* `LocalMetadataFileInputReader` + +## RowFileInputReader (default) + +The `RowFileInputReader`s can be used to read files line by line. +This reader creates one record per row. It should be used for reading delimited text files, application log files, etc. + +### Configuration + +| Configuration | Description | Type | Default | Importance | +|-----------------------------|------------------------------------------------------------------------------------|-----------|---------|------------| +| `file.encoding` | The text file encoding to use | `String` | `UTF_8` | High | +| `buffer.initial.bytes.size` | The initial buffer size used to read input files. | `String` | `4096` | Medium | +| `min.read.records` | The minimum number of records to read from file before returning to task. | `Integer` | `1` | Medium | +| `skip.headers` | The number of rows to be skipped in the beginning of file. | `Integer` | `0` | Medium | +| `skip.footers` | The number of rows to be skipped at the end of file. 
| `Integer` | `0` | Medium | +| `read.max.wait.ms` | The maximum time to wait in milliseconds for more bytes after hitting end of file. | `Long` | `0` | Medium | + +## XxxBytesArrayInputReader + +The `BytesArrayInputReader`s create a single byte array record from a source file. + +## XxxAvroFileInputReader + +The `AvroFileInputReader`s can be used to read Avro files. + +## XxxXMLFileInputReader + +The `XMLFileInputReader`s can be used to read XML files. + +### Configuration + +| Configuration | Since | Description | Type | Default | Importance | +|-------------------------------------------------------|---------|---------------------------------------------------------------------------------------------------------------------------------------|-----------|-----------|------------| +| `reader.xpath.expression` | | The XPath expression used extract data from XML input files | `String` | `/` | High | +| `reader.xpath.result.type` | | The expected result type for the XPath expression in [NODESET, STRING] | `String` | `NODESET` | High | +| `reader.xml.force.array.on.fields` | | The comma-separated list of fields for which an array-type must be forced | `List` | - | High | +| `reader.xml.parser.validating.enabled` | `2.2.0` | Specifies that the parser will validate documents as they are parsed. | `boolean` | `false` | Low | +| `reader.xml.parser.namespace.aware.enabled` | `2.2.0` | Specifies that the XML parser will provide support for XML namespaces. | `boolean` | `false` | Low | +| `reader.xml.exclude.empty.elements` | `2.2.0` | Specifies that the reader should exclude element having no field. | `boolean` | `false` | Low | +| `reader.xml.exclude.node.attributes` | `2.4.0` | Specifies that the reader should exclude all node attributes. | `boolean` | `false` | Low | +| `reader.xml.exclude.node.attributes.in.namespaces` | `2.4.0` | Specifies that the reader should only exclude node attributes in the defined list of namespaces. | `list` | `false` | Low | +| `reader.xml.data.type.inference.enabled` | `2.3.0` | Specifies that the reader should try to infer the type of data nodes. | `boolean` | `false` | High | +| `reader.xml.attribute.prefix` | `2.4.0` | If set, the name of attributes will be prepended with the specified prefix when they are added to a record. | `string` | `""` | Low | +| `reader.xml.content.field.name` | `2.5.4` | Specifies the name to be used for naming the field that will contain the value of a TextNode element having attributes. | `string` | `value` | Low | +| `reader.xml.field.name.characters.regex.pattern` | `2.5.4` | Specifies the regex pattern to use for matching the characters in XML element name to replace when converting a document to a struct. | `string` | `[.\-]'` | Low | +| `reader.xml.field.name.characters.string.replacement` | `2.5.4` | Specifies the replacement string to be used when converting a document to a struct. | `string` | `_` | Low | +| `reader.xml.force.content.field.for.paths` | `2.5.4` | The comma-separated list of field for which a content-field must be forced. | `List` | - | Low | + +## XxxMetadataFileInputReader + +The `FileInputMetadataReader`s can be used to send a single record per file containing metadata, i.e.: `name`, `path`, `hash`, `lastModified`, `size`, etc. 
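As a quick sketch of how these options fit together (the property values below are illustrative assumptions), a connector reading local text files line by line with the `RowFileInputReader` described above could be configured as follows:

```properties
# Use the local row-oriented reader
tasks.reader.class=io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader
# Reader options (see the RowFileInputReader configuration table above)
file.encoding=UTF_8
skip.headers=1
read.max.wait.ms=5000
```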
\ No newline at end of file diff --git a/docs/content/en/docs/Archives/v2.10.x/Developer Guide/file-system-listing.md b/docs/content/en/docs/Archives/v2.10.x/Developer Guide/file-system-listing.md new file mode 100644 index 000000000..bef24853d --- /dev/null +++ b/docs/content/en/docs/Archives/v2.10.x/Developer Guide/file-system-listing.md @@ -0,0 +1,157 @@ +--- +date: 2022-04-13 +title: "FileSystem Listing" +linkTitle: "FileSystem Listing" +weight: 30 +description: > + Learn how to configure Connect FilePulse for listing files from local or remote storage system. +--- + +The `FilePulseSourceConnector` periodically lists object files that may be streamed into Kafka using the [FileSystemListing](https://github.com/streamthoughts/kafka-connect-file-pulse/blob/master/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/FileSystemListing.java) +configured in the connector's configuration. + +## Supported Filesystems + +Currently, Kafka Connect FilePulse supports the following implementations: + +* `AmazonS3FileSystemListing` +* `AzureBlobStorageFileSystemListing` +* `GcsFileSystemListing` +* `LocalFSDirectoryListing` (default) + +### Local Filesystem (default) + +The `LocalFSDirectoryListing` class can be used for listing files that exist in a local filesystem directory. + +#### How to use it ? + +`fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing` + +#### Configuration + +| Configuration | Description | Type | Default | Importance | +|--------------------------------|-----------------------------------------------------------------------|-----------|---------|------------| +| `fs.listing.directory.path` | The input directory to scan | `string` | - | HIGH | +| `fs.listing.recursive.enabled` | Flag indicating whether local directory should be recursively scanned | `boolean` | `true` | MEDIUM | + +#### Supported File types + +The `LocalFSDirectoryListing` will try to detect if a file needs to be decompressed by probing its content type or its extension (javadoc : [Files#probeContentType](https://docs.oracle.com/javase/8/docs/api/java/nio/file/Files.html#probeContentType-java.nio.file.Path)) +Supported content-types are: + +* **GZIP** : `application/x-gzip` +* **TAR** : `application/x-tar` +* **ZIP** : `application/x-zip-compressed` or `application/zip` + +### Amazon S3 + +The `AmazonS3FileSystemListing` class can be used for listing objects that exist in a specific Amazon S3 bucket. + +#### How to use it ? + +`fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3FileSystemListing` + +#### Configuration + +| Configuration | Description | Type | Default | Importance | +|---------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------------------------------------------------------------|------------| +| `aws.access.key.id` | AWS Access Key ID AWS | `string` | - | HIGH | +| `aws.secret.access.key` | AWS Secret Access Key | `string` | - | HIGH | +| `aws.secret.session.token` | AWS Secret Session Token | `string` | - | HIGH | +| `aws.credentials.provider.class` | The AWSCredentialsProvider to use if no access key id and secret access key is configured. | `class` | `com.amazonaws.auth.EnvironmentVariableCredentialsProvider` | LOW | +| `aws.s3.region` | The AWS S3 Region, e.g. 
us-east-1 | `string` | `Regions.DEFAULT_REGION.getName()` | MEDIUM | +| `aws.s3.service.endpoint` | AWS S3 custom service endpoint. | `string` | - | MEDIUM | +| `aws.s3.path.style.access.enabled` | Configures the client to use path-style access for all requests. | `string` | - | MEDIUM | +| `aws.s3.bucket.name` | The name of the Amazon S3 bucket. | `string` | - | HIGH | +| `aws.s3.bucket.prefix` | The prefix to be used for restricting the listing of the objects in the bucket | `string` | - | MEDIUM | +| `aws.s3.default.object.storage.class` | The AWS storage class to associate with an S3 object when it is copied by the connector (e.g., during a move operation). Accepted values are: `STANDARD`, `GLACIER`, `REDUCED_REDUNDANCY`, `STANDARD_IA`,`ONEZONE_IA`,`INTELLIGENT_TIERING`,`DEEP_ARCHIVE` | `string` | | LOW | +| `aws.s3.backoff.delay.ms` | The base back-off time (milliseconds) before retrying a request. | `int` | `100` | MEDIUM | +| `aws.s3.backoff.max.delay.ms` | The maximum back-off time (in milliseconds) before retrying a request. | `int` | `20_000` | MEDIUM | +| `aws.s3.backoff.max.retries` | The maximum number of retry attempts for failed retryable requests. | `int` | `3` | MEDIUM | + +### Google Cloud Storage + +The `GcsFileSystemListing` class can be used for listing objects that exist in a specific Google Cloud Storage bucket. + +#### How to use it ? + +`fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.GcsFileSystemListing` + +#### Configuration + +| Configuration | Description | Type | Default | Importance | +|---------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------|------------| +| `gcs.credentials.path` | The path to GCP credentials file. Cannot be set when `GCS_CREDENTIALS_JSON_CONFIG` is provided. If no credentials is specified the client library will look for credentials via the environment variable `GOOGLE_APPLICATION_CREDENTIALS`. | `string` | - | HIGH | +| `gcs.credentials.json` | The GCP credentials as JSON string. Cannot be set when `GCS_CREDENTIALS_PATH_CONFIG` is provided. If no credentials is specified the client library will look for credentials via the environment variable `GOOGLE_APPLICATION_CREDENTIALS`. | `string` | - | HIGH | +| `gcs.bucket.name` | The GCS bucket name to download the object files from. | `string` | - | HIGH | +| `gcs.blobs.filter.prefix` | The prefix to be used for filtering blobs whose names begin with it. | `string` | - | MEDIUM | + +### Azure Blob Storage + +The `AzureBlobStorageConfig` class can be used for listing objects that exist in a specific Azure Storage Container. + +#### How to use it ? + +`fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.AzureBlobStorageFileSystemListing` + +#### Configuration + +| Configuration | Description | Type | Default | Importance | +|-----------------------------------|----------------------------------------------------------------------------------|----------|---------|------------| +| `azure.storage.connection.string` | Azure storage account connection string. | `string` | - | HIGH | +| `azure.storage.account.name` | The Azure storage account name. | `string` | - | HIGH | +| `azure.storage.account.key` | The Azure storage account key. | `string` | - | HIGH | +| `azure.storage.container.name` | The Azure storage container name. 
| `string` | - | MEDIUM | +| `azure.storage.blob.prefix` | The prefix to be used for restricting the listing of the blobs in the container. | `string` | - | MEDIUM | + +## Filtering input files + +You can configure one or more `FileFilter` that will be used to determine if a file should be scheduled for processing or ignored. +All files that are filtered out are simply ignored and remain untouched on the file system until the next scan. +At the next scan, previously filtered files will be evaluated again to determine if they are now eligible for processing. + +FilePulse packs with the following built-in filters : + +### IgnoreHiddenFileFilter + +The `IgnoreHiddenFileFilter` can be used to filter hidden files from being read. + +**Configuration example** + +```properties +fs.listing.filters=io.streamthoughts.kafka.connect.filepulse.fs.filter.IgnoreHiddenFileListFilter +``` + +{{% alert title="Limitation" color="warning" %}} +This filter is only supported by the `LocalFSDirectoryListing`. +{{% /alert %}} + +### LastModifiedFileFilter + +The `LastModifiedFileFilter` can be used to filter all files that have been modified to recently based on their last modified date property. + +```properties +fs.listing.filters=io.streamthoughts.kafka.connect.filepulse.fs.filter.LastModifiedFileFilter +# The last modified time for a file can be accepted (default: 5000) +file.filter.minimum.age.ms=10000 +``` + +### RegexFileFilter + +The `RegexFileFilter` can be used to filter all files that do not match the specified regex. + +```properties +fs.listing.filters=io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter +# The regex pattern used to match input files +file.filter.regex.pattern="\\.log$" +``` + +### SizeFileListFilter + +The `SizeFileListFilter` can be used to filter all files that are smaller or larger than a specific byte size. + +```properties +fs.listing.filters=io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter +file.filter.minimum.size.bytes=0 +file.filter.maximum.size.bytes=9223372036854775807 +``` \ No newline at end of file diff --git a/docs/content/en/docs/Archives/v2.10.x/Developer Guide/filters-chain-definition.md b/docs/content/en/docs/Archives/v2.10.x/Developer Guide/filters-chain-definition.md new file mode 100644 index 000000000..205077390 --- /dev/null +++ b/docs/content/en/docs/Archives/v2.10.x/Developer Guide/filters-chain-definition.md @@ -0,0 +1,70 @@ +--- +date: 2022-06-08 +title: "Filter Chain Definition" +linkTitle: "Filter Chain Definition" +weight: 50 +description: > + Learn how to define complex pipelines to transform and structure your data before integration into Kafka. +--- + +The connector can be configured to apply complex transformations on messages before they are written to Kafka. + +## Configuration + +A [filter](../filters) chain can be specified in the connector configuration. + + * `filters` - List of aliases for the filter, specifying the order in which the filters will be applied. + * `filters.$alias.type` - Fully qualified class name for the filter. 
+ * `filters.$alias.$filterSpecificConf` - Configuration properties for the filter
+
+For example, let's parse a standard application log file written with log4j using the built-in filters :
+
+```properties
+filters=GroupMultilineException, ExtractFirstLine, ParseLog4jLog
+
+filters.GroupMultilineException.type=io.streamthoughts.kafka.connect.filepulse.filter.MultiRowFilter
+filters.GroupMultilineException.negate=false
+filters.GroupMultilineException.pattern="^[\\t]"
+
+filters.ExtractFirstLine.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
+filters.ExtractFirstLine.field=$.logmessage
+filters.ExtractFirstLine.values={{ extract_array($.message, 0) }}
+
+filters.ParseLog4jLog.type=io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter
+filters.ParseLog4jLog.match="%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{GREEDYDATA:thread} %{GREEDYDATA:logmessage}"
+filters.ParseLog4jLog.source=log
+filters.ParseLog4jLog.overwrite=logmessage
+```
+
+## List of Processing Filters Available
+
+These filters are available for use with Kafka Connect File Pulse:
+
+| Filter                                              | Description                                                                               | Since    |
+|-----------------------------------------------------|-------------------------------------------------------------------------------------------|----------|
+| [AppendFilter](../filters#appendfilter)             | Appends one or more values to an existing or non-existing array field                    |          |
+| [ConvertFilter](../filters#convertfilter)           | Converts a message field's value to a specific type                                      |          |
+| [CSVFilter](../filters#csvfilter)                   | Parses a message field's value containing columns delimited by a character into a struct | `v2.7.0` |
+| [DateFilter](../filters#datefilter)                 | Converts a field's value containing a date to a unix epoch time                          |          |
+| [DelimitedRowFilter](../filters#delimitedrowfilter) | Parses a message field's value containing columns delimited by a separator into a struct |          |
+| [DropFilter](../filters#dropfilter)                 | Drops messages satisfying a specific condition without throwing an exception.            |          |
+| [ExcludeFilter](../filters#excludefilter)           | Excludes one or more fields from the input record.                                       | `v1.4.0` |
+| [ExplodeFilter](../filters#explodefilter)           | Explodes an array or list field into separate records.
| `v1.4.0` | +| [FailFilter](../filters#failfilter) | Throws an exception when a message satisfy a specific condition | | +| [GrokFilter](../filters#grokfilter) | Parses an unstructured message field's value to a struct by combining Grok patterns | | +| [GroupRowFilter](../filters#grouprowfilter) | Regroups multiple following messages into a single message by composing a grouping key | | +| [JoinFilter](../filters#joinfilter) | Joins values of an array field with a specified separator | | +| [JSONFilter](../filters#jsonfilter) | Unmarshallings a JSON message field's value to a complex struct | | +| [MoveFilter](../filters#movefilter) | Moves an existing record field's value to a specific target path | `v1.5.0` | +| [MultiRowFilter](../filters#multirowfilter) | Combines following message lines into single one by combining patterns | | +| [NullValueFilter](../filters#nullvaluefilter) | Combines following message lines into single one by combining patterns | `v2.3.0` | +| [RenameFilter](../filters#renamefilter) | Renames a message field | | +| [SplitFilter](../filters#splitfilter) | Splits a message field's value to array | | +| [XmlToJsonFilter](../filters#xmltojsonfilter) | Parses an XML record-field and convert it to a JSON string | `v2.4.0` | +| [XmlToStructFilter](../filters#xmltostructfilter) | Parses an XML record-field into STRUCT | `v2.4.0` | + +## Difference between Kafka Connect Single Message Transforms (SMT) functionality + +Filters can be compared to Kafka Connect built-in [Transformers](https://kafka.apache.org/documentation/#connect_transforms). +However, filters allow more complex pipelines to be built for structuring file data. +For example, they can be used to split one input message to multiple messages or to temporarily buffer consecutive messages in order to regroup them by fields or a pattern. \ No newline at end of file diff --git a/docs/content/en/docs/Archives/v2.10.x/Developer Guide/filters.md b/docs/content/en/docs/Archives/v2.10.x/Developer Guide/filters.md new file mode 100644 index 000000000..641ebb53c --- /dev/null +++ b/docs/content/en/docs/Archives/v2.10.x/Developer Guide/filters.md @@ -0,0 +1,846 @@ +--- +date: 2022-06-03 +title: "Processing Filters" +linkTitle: "Processing Filters" +weight: 80 +description: > + The list of available transformation filters. +--- + +These filters are available for use with Kafka Connect File Pulse: + +| Filter | Description | Since | +|-------------------------------------------|------------------------------------------------------------------------------------------|----------| +| [AppendFilter](#appendfilter) | Appends one or more values to an existing or non-existing array field | | +| [ConvertFilter](#convertfilter) | Converts a message field's value to a specific type | | +| [CSVFilter](#csvfilter) | Parses a message field's value containing columns delimited by a character into a struct | `v2.7.0` | +| [DateFilter](#datefilter) | Converts a field's value containing a date to a unix epoch time | | +| [DelimitedRowFilter](#delimitedrowfilter) | Parses a message field's value containing columns delimited by a regex into a struct | | +| [DropFilter](#dropfilter) | Drops messages satisfying a specific condition without throwing exception. | | +| [ExcludeFilter](#excludefilter) | Excludes one or more fields from the input record. | `v1.4.0` | +| [ExplodeFilter](#explodefilter) | Explodes an array or list field into separate records. 
| `v1.4.0` | +| [FailFilter](#failfilter) | Throws an exception when a message satisfy a specific condition | | +| [GrokFilter](#grokfilter) | Parses an unstructured message field's value to a struct by combining Grok patterns | | +| [GroupRowFilter](#grouprowfilter) | Regroups multiple following messages into a single message by composing a grouping key | | +| [JoinFilter](#joinfilter) | Joins values of an array field with a specified separator | | +| [JSONFilter](#jsonfilter) | Unmarshalling a JSON message field's value to a complex struct | | +| [MoveFilter](#movefilter) | Moves an existing record field's value to a specific target path | `v1.5.0` | +| [MultiRowFilter](#multirowfilter) | Combines following message lines into single one by combining patterns | | +| [NullValueFilter](#nullvaluefilter) | Combines following message lines into single one by combining patterns | `v2.3.0` | +| [RenameFilter](#renamefilter) | Renames a message field | | +| [SplitFilter](#splitfilter) | Splits a message field's value to array | | +| [XmlToJsonFilter](#xmltojsonfilter) | Parses an XML record-field and convert it to a JSON string | `v2.4.0` | +| [XmlToStructFilter](#xmltostructfilter) | Parses an XML record-field into STRUCT | `v2.4.0` | + +## AppendFilter + +The following provides usage information for : `io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter` + +The `AppendFilter` is probably one of the most important processing filters to know. +It allows you to manipulate a source record by easily adding or replacing a field with a constant +value or a value extracted from another existing field +using [ScEL](/kafka-connect-file-pulse/docs/developer-guide/accessing-data-and-metadata/). + +### Configuration + +| Configuration | Description | Type | Default | Importance | +|---------------|---------------------------------------|--------------------------------------------------------------------------------------------------------|---------|------------| +| `field` | The name of the field to be added | string ([ScEL supported](/kafka-connect-file-pulse/docs/developer-guide/accessing-data-and-metadata/)) | *-* | high | +| `value` | The value of the field to be added | string ([ScEL supported](/kafka-connect-file-pulse/docs/developer-guide/accessing-data-and-metadata/)) | *-* | high | +| `overwrite` | Is existing field should be overwrite | boolean | *false* | high | + +### Examples + +The following examples shows how to use the `AppendFilter` to concat two values from the array field named `values` +using +a [substitution expression](/kafka-connect-file-pulse/docs/developer-guide/accessing-data-and-metadata/#string-substitution) +. +The concat value is then added to the field named `result`. + +**Configuration** + +```properties +filters=SubstituteFilter +filters.SubstituteFilter.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter +filters.SubstituteFilter.field="$.result" +filters.SubstituteFilter.value="{{ extract_array($.values,0) }}-{{ extract_array($.values,1) }}" +``` + +**Input** + +```json +{ + "record": { + "value": [ + "Hello", + "World" + ] + } +} +``` + +**Output** + +```json +{ + "record": { + "value": [ + "Hello", + "World" + ], + "result": "Hello-World" + } +} +``` + +In the previous example, we used the simple property expression `result` to indicate the target field to which our value +is added. +We have actually omitted +the [expression scope](/kafka-connect-file-pulse/docs/developer-guide/accessing-data-and-metadata/#scopes) `$value`. 
+By default, if no scope is defined in an expression, the scope `$value` is implicitly applied. +Hence, we could have used the fully expression `$value.result` which is similar to the simplified expression `result`. + +But, you can perfectly use another expression scope. For example, you can leverage the `AppendFilter` to dynamically +resolve the record-key or the output topic based on the record data. + +The following configuration show how to use the `$topic` scope : + +```properties +filters.SubstituteFilter.field="$topic" +filters.SubstituteFilter.value="my-topic-{{ lowercase(extract_array($.values,0)) }}" +``` + +**Input** + +```json +{ + "record": { + "value": [ + "Hello", + "World" + ] + } +} +``` + +**Output** + +```json +{ + "context": { + "topic": "my-topic-hello" + }, + "record": { + "value": [ + "Hello", + "World" + ] + } +} +``` + +Finally, the `AppendFilter` can also accept a substitution expression for the property field. +This allows to dynamically determine the name of the field to be added. + +The following examples show how to use a property expression to get the named of the field from a + +```properties +filters.SubstituteFilter.field="$.target" +filters.SubstituteFilter.value="{{ extract_array($.values, 0) }}-{{ extract_array($.values,1) }}" +``` + +**Input** + +```json +{ + "record": { + "target": "result", + "value": [ + "Hello", + "World" + ] + } +} +``` + +**Output** + +```json +{ + "record": { + "target": "result", + "value": [ + "Hello", + "World" + ], + "result": "Hello-World" + } +} +``` + +## ConvertFilter + +The following provides usage information for : `io.streamthoughts.kafka.connect.filepulse.filter.ConvertFilter` + +The `ConvertFilter` can be used to convert a field's value into a specific type. + +### Configuration + +| Configuration | Description | Type | Default | Importance | +|-----------------|--------------------------------------------------------------------------------------------------------------------------------------------------|---------|---------|------------| +| `field` | The field to convert (dot notation is supported) | string | *-* | high | +| `to` | The type to which the field must be converted | string | *,* | high | +| `default` | The default value to apply if the field cannot be converted | string | *,* | medium | +| `ignoreMissing` | If true and field does not exist the filter will be apply successfully without modifying the data. If field is null the schema will be modified. | boolean | *true* | high | + +Supported types are : + +* `SHORT` +* `INTEGER` +* `LONG` +* `FLOAT` +* `DOUBLE` +* `BOOLEAN` +* `STRING` +* `ARRAY` +* `BYTES` + +### Examples + +The following example shows how to convert a a field's value containing the string `yes` into a boolean. 
+ +**Configuration** + +```properties +filters.BooleanConverter.field="target" +filters.BooleanConverter.to="BOOLEAN" +``` + +**Input** + +```json +{ + "record": { + "target": "yes" + } +} +``` + +**Output** + +```json +{ + "record": { + "target": true + } +} +``` + +## CsvFilter + +The following provides usage information for: `io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter` + +The `CsvFilter` can be used to parse a message field's value containing columns delimited by a character into a struct + +### Configuration + +| Configuration | Description | Type | Default | Importance | +|------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------|---------|---------|------------| +| `separator` | Sets the delimiter to use for separating entries. | string | *,* | High | +| `ignore.quotations` | Sets the ignore quotations mode - if true, quotations are ignored. | boolean | *false* | Medium | +| `escape.char` | Sets the character to use for escaping a separator or quote. | string | *\\* | Medium | +| `ignore.leading.whitespace` | Sets the ignore leading whitespace setting - if true, white space in front of a quote in a field is ignored. | boolean | *true* | Medium | +| `quote.char` | Sets the character to use for quoted elements. | string | *"* | Medium | +| `strict.quotes` | Sets the strict quotes setting - if true, characters outside the quotes are ignored. | boolean | *false* | Medium | +| `trim.column` | Remove the leading and trailing whitespaces from all columns. | boolean | *false* | Low | +| `duplicate.columns.as.array` | Treat duplicate columns as an array. If false and a record contains duplicate columns an exception will be thrown. | boolean | *false* | High | +| `extract.column.name` | Define the field from which the schema should be detected (all columns will be of type 'string') | string | | High | +| `auto.generate.column.names` | Define whether column names should autogenerated or not (column names will of the form 'column1, column2') | string | | High | +| `columns` | The list of comma-separated column names in order they appear in each row. Columns must be in the form of: COL1_NAME:COL1_TYPE;COL2_NAME:COL2_TYPE | string | | High | + +### Examples + +The following example shows the use of the `CSVFilter` to split the `message` field using a comma (`,`) as a separator character. +The name of each column is extracted from the fields `headers`. + +```properties +filters=ParseCSVLine +filters.ParseCSVLine.type="io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter" +filters.ParseCSVLine.extract.column.name="headers" +filters.ParseCSVLine.separator="," +filters.ParseCSVLine.trim.column="true" +``` + +## DateFilter + +The following provides usage information for : `io.streamthoughts.kafka.connect.filepulse.filter.DateFilter` + +The `DateFilter` converts a field's value containing a date to a unix epoch time. + +### Configuration + +| Configuration | Description | Type | Default | Importance | +|---------------|---------------------------------------|-------------------------------------------------------------------------------------------------------|---------|------------| +| `field` | The field to get the date from . | string([ScEL supported](/kafka-connect-file-pulse/docs/developer-guide/accessing-data-and-metadata/)) | *-* | high | +| `target` | The target field. 
| string([ScEL supported](/kafka-connect-file-pulse/docs/developer-guide/accessing-data-and-metadata/)) | *-* | high | +| `timezone` | The timezone to use for parsing date. | string | *UTC* | high | +| `locale` | The locale to use for parsing date. | string | *en_EN* | high | +| `formats` | List of the expected date formats. | list | *-* | high | + +### Examples + +```properties +filters=ParseISODate +filters.ParseISODate.type=io.streamthoughts.kafka.connect.filepulse.filter.DateFilter +filters.ParseISODate.field="$.date" +filters.ParseISODate.target="$.timestamp" +filters.ParseISODate.formats="yyyy-MM-dd'T'HH:mm:ss" +``` + +**Input** + +```json +{ + "record": { + "date": "2001-07-04T12:08:56" + } +} +``` + +**Output** + +```json +{ + "record": { + "date": "2001-07-04T12:08:56", + "timestamp": 994248536000 + } +} +``` + +## DelimitedRowFilter + +The following provides usage information for : `io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter`. + +The `DelimitedRowFilter` can be used to parse and stream delimited row files (i.e CSV) into Kafka. +Each row is parsed and published into a configured topic as a single Kafka data. + +### Configuration + +| Configuration | Description | Type | Default | Importance | +|---------------------|---------------------------------------------------------------------------------------------------------------------|---------|---------|------------| +| `separator` | The character used as a delimiter/separator between each value | string | *;* | high | +| `trimColumn` | Remove the leading and trailing whitespaces from all columns. | boolean | *false* | low | +| `extractColumnName` | Define the field from which the schema should be detected (all columns will be of type 'string') | string | | high | +| `columns` | The list of comma-separated column names in order they appear in each row. columns must be in the form of NAME:TYPE | string | | high | + +### Examples + +The following example shows the use of the `DelimitedRowFilter` to split the `message` field using `|` as a separator +character. +The name of each column is extracted from the fields `headers`. + +```properties +filters=ParseDelimitedRow +filters.ParseDelimitedRow.extractColumnNam="headers" +filters.ParseDelimitedRow.separator="\\|" +filters.ParseDelimitedRow.trimColumn="true" +filters.ParseDelimitedRow.type="io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter" +``` + +{{% alert title="Important" color="info" %}} +Under the hood, the `DelimitedRowFilter` will use +the [`String#split`](https://docs.oracle.com/javase/9/docs/api/java/lang/String.html#split-java.lang.String-) method to +parse the input line. This +method accepts a regex as argument then any special character must be escaped. +{{% /alert %}} + +## DropFilter + +The following provides usage information for : `io.streamthoughts.kafka.connect.filepulse.filter.DropFilter`. + +The `DropFilter` can be used to prevent some messages (i.e records) to be written into Kafka. + +### Configuration + +| Configuration | Description | Type | Default | Importance | +|---------------|------------------------------------------------------------|-------------------------------------------------------------------------------------------------------|---------|------------| +| `if` | Condition to apply the filter on the current record. | string [ScEL supported](/kafka-connect-file-pulse/docs/developer-guide/accessing-data-and-metadata/)) | *-* | high | +| `invert` | Invert the boolean value return from the filter condition. 
| boolean | *false* | medium | + +For more information about `if` property, see : [Conditional execution](conditional-execution). + +### Examples + +The following example shows the usage of **DropFilter** to only keep records with a field `level` containing to `ERROR`. + +```properties +filters=Drop +filters.Drop.type=io.streamthoughts.kafka.connect.filepulse.filter.DropFilter +filters.Drop.if={{ equals($.level, 'ERROR') }} +filters.Drop.invert=true +``` + +## ExcludeFilter + +The following provides usage information for : `io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter`. + +The `ExcludeFilter` can be used to exclude one or more fields from the input record. + +### Configuration + +| Configuration | Description | Type | Default | Importance | +|---------------|----------------------------------------------------|------|---------|------------| +| `fields` | The comma-separated list of field names to exclude | list | ** | high | + +### Examples + +The following example shows the usage of **ExplodeFilter**. + +```properties +filters=Exclude +filters.Exclude.type=io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter +filters.Exclude.fields=message +``` + +**Input** + +```json +{ + "record": { + "message": "{\"name\":\"pulse\"}", + "name": "pulse" + } +} +``` + +**Output** + +```json +{ + "record": { + "name": "pulse" + } +} +``` + +## ExplodeFilter + +The following provides usage information for : `io.streamthoughts.kafka.connect.filepulse.filter.ExplodeFilter`. + +The `ExplodeFilter` can be used to explode an array or list field into separate records. + +### Configuration + +| Configuration | Description | Type | Default | Importance | +|---------------|----------------------------------------------|--------|-----------|------------| +| `source` | The input field on which to apply the filter | string | *message* | medium | + +### Examples + +The following example shows the usage of **ExplodeFilter**. + +```properties +filters=Explode +filters.Explode.type=io.streamthoughts.kafka.connect.filepulse.filter.ExplodeFilter +filters.Explode.source=measurements +``` + +**Input (single record)** + +```json +{ + "record": { + "id": "captor-0001", + "date": "2020-08-06T17:00:00", + "measurements": [ + 38, + 40, + 42, + 37 + ] + } +} +``` + +**Output (multiple records)** + +```json +{ + "record": { + "id": "captor-0001", + "date": "2020-08-06T17:00:00", + "measurements": 38 + } +} +{ + "record": { + "id": "captor-0001", + "date": "2020-08-06T17:00:00", + "measurements": 40 + } +} +{ + "record": { + "id": "captor-0001", + "date": "2020-08-06T17:00:00", + "measurements": 42 + } +} +{ + "record": { + "id": "captor-0001", + "date": "2020-08-06T17:00:00", + "measurements": 37 + } +} +``` + +## FailFilter + +The following provides usage information for : `io.streamthoughts.kafka.connect.filepulse.filter.FailFilter`. + +The fail filter can be used to throw an exception with a provided error message. +For example, this can be useful to stop processing a file when a non-conform record is read. + +### Configuration + +| Configuration | Description | Type | Default | Importance | +|---------------|-----------------------------------------------------------------------------------------------------------------------------------------|---------|---------|------------| +| `if` | Condition to apply the filter on the current record. | string | *-* | high | +| `invert` | Invert the boolean value return from the filter condition. 
| boolean | *false* | medium | +| `message` | The error message thrown by the filter. ([ScEL supported](/kafka-connect-file-pulse/docs/developer-guide/accessing-data-and-metadata/)) | string | *-* | high | + +### Examples + +The following example shows the usage of **FailFilter** to stop processing a file when a field is equals to `null`. + +```properties +filters=Fail +filters.Fail.type=io.streamthoughts.kafka.connect.filepulse.filter.FailFilter +filters.Fail.if={{ is_null($.user_id) }} +filters.Fail.message=Invalid row, user_id is missing : {{ $value }} +``` + +## GrokFilter + +The following provides usage information for : `io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter`. + +The `GrokFilter` allows you to parse unstructured data like applications logs to extract structured and meaningful data +fields. + +The `GrokFilter`is based on: https://github.com/streamthoughts/kafka-connect-transform-grok + +### Configuration + +| Configuration | Description | Type | Default | Importance | +|----------------------|-----------------------------------------------|---------|-----------|------------| +| `namedCapturesOnly` | If true, only store named captures from grok. | boolean | *true* | high | +| `pattern` | The Grok pattern to match. | string | *-* | high | +| `overwrite` | The fields to overwrite. | list | medium | +| `patternDefinitions` | Custom pattern definitions. | list | *-* | low | +| `patternsDir` | List of user-defined pattern directories | string | *-* | low | +| `source` | The input field on which to apply the filter | string | *message* | medium | + +### Examples + +The following example shows the usage of **GrokFilter** to parse and extract fields from application log message. + +```properties +filters=ParseLog4jLog +filters.ParseLog4jLog.pattern="%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{GREEDYDATA:message}" +filters.ParseLog4jLog.overwrite="message" +filters.ParseLog4jLog.source="message" +filters.ParseLog4jLog.type="io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter" +filters.ParseLog4jLog.ignoreFailure="true" +``` + +## GroupRowFilter + +The following provides usage information for : `io.streamthoughts.kafka.connect.filepulse.filter.GroupRowFilter`. + +### Configuration + +| Configuration | Description | Type | Default | Importance | +|------------------------|--------------------------------------------------------|---------|-----------|------------| +| `fields` | List of fields used to regroup records | list | high | +| `max.buffered.records` | The maximum number of records to group (default : -1). | integer | *-1* | high | +| `target` | The target array field to put the grouped field | integer | *records* | high | + +### Examples + +```properties +``` + +## JoinFilter + +The following provides usage information for : `io.streamthoughts.kafka.connect.filepulse.filter.JoinFilter`. + +### Configuration + +| Configuration | Description | Type | Default | Importance | +|---------------|----------------------------------------------|-------------------------------------------------------------------------------------------------------|---------|------------| +| `field` | The field to get the date from | string([ScEL supported](/kafka-connect-file-pulse/docs/developer-guide/accessing-data-and-metadata/)) | *-* | high | +| `target` | The target field | string([ScEL supported](/kafka-connect-file-pulse/docs/developer-guide/accessing-data-and-metadata/)) | *-* | high | +| `separator` | The separator used for joining array values. 
| string | *,* | high | + +### Examples + +## JSONFilter + +The following provides usage information for : `io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter`. + +The `JSONFilter` parses an input json field. + +### Configuration + +| Configuration | Description | Type | Default | Importance | +|-----------------|------------------------------------------------------------------------------------------------|---------|-----------|------------| +| `overwrite` | The fields to overwrite | list | *-* | medium | +| `source` | The input field on which to apply the filter | string | *message* | medium | +| `target` | The target field to put the parsed JSON data | string | *-* | high | +| `charset` | The charset to be used for reading the source field (if source if of type `BYTES` | string | *UTF-8* | medium | +| `explode.array` | A boolean that specifies whether to explode arrays into separate records | boolean | *false* | medium | +| `merge` | boolean that specifies whether to merge the JSON object into the top level of the input record | boolean | *false* | medium | + +### Examples + +```properties +filters=MyJsonFilter +filters.MyJsonFilter.type=io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter +filters.MyJsonFilter.source=message +filters.MyJsonFilter.target=payload +``` + +## MultiRowFilter + +The following provides usage information for : `io.streamthoughts.kafka.connect.filepulse.filter.MultiRowFilter`. + +The `MultiRowFilter` joins multiple lines into a single Struct using a regex pattern. + +### Configuration + +| Configuration | Description | Type | Default | Importance | +|----------------------|------------------------------------------------|---------|---------|------------| +| `negate` | Negate the regexp pattern (if not matched). | boolean | *-* | medium | +| `pattern` | The pattern to match multiline | string | *-* | high | +| `patternDefinitions` | Custom pattern definitions. | list | *-* | low | +| `patternsDir` | List of user-defined pattern directories | string | *-* | low | +| `separator` | The character to be used to concat multi lines | string | "\\n" | high | + +## MoveFilter + +The following provides usage information for : `io.streamthoughts.kafka.connect.filepulse.filter.MoveFilter`. + +The `MoveFilter` moves an existing record field's value to a specific target path. + +### Configuration + +| Configuration | Description | Type | Default | Importance | +|---------------|--------------------------------|--------|---------|------------| +| `source` | The path of the field to move" | string | *-* | high | +| `target` | The path to move the field | string | *-* | high | + +### Examples + +The following example shows the usage of the `MoveFilter`. + +```properties +filters=MyMoveFilter +filters.MyMoveFilter.type=io.streamthoughts.kafka.connect.filepulse.filter.MoveFilter +filters.MyMoveFilter.source=field.child +filters.MyMoveFilter.target=moved +``` + +**Input** + +```json +{ + "record": { + "field": { + "child": "foo" + } + } +} +``` + +**Output** + +```json +{ + "record": { + "moved": "foo" + } +} +``` + +## NullValueFilter + +The following provides usage information for : `io.streamthoughts.kafka.connect.filepulse.filter.NullValueFilter`. + +The `NullValueFilter` is used to empty a record-value to null. 
+ +### Example + +```properties +filters=NullValueIfDeleteOp +filters.NullValueIfDeleteOp.type=io.streamthoughts.kafka.connect.filepulse.filter.NullValueFilter +filters.NullValueIfDeleteOp.if={{ equals($value.op, 'DELETE') }} +``` + +## RenameFilter + +The following provides usage information for : `io.streamthoughts.kafka.connect.filepulse.filter.RenameFilter`. + +The `RenameFilter` is used to rename a specified field. + +### Configuration + +| Configuration | Description | Type | Default | Importance | +|-----------------|--------------------------------------------------------------------------------------------------------------------------------------------------|---------|---------|------------| +| `field` | The field to rename | string | *-* | high | +| `target` | The target name | string | *-* | high | +| `ignoreMissing` | If true and field does not exist the filter will be apply successfully without modifying the data. If field is null the schema will be modified. | boolean | *true* | high | + +### Examples + +```properties +filters=RenameInputField +filters.RenameInputField.type=io.streamthoughts.kafka.connect.filepulse.filter.RenameFilter +filters.RenameInputField.field=input +filters.RenameInputField.target=renamed +``` + +**Input** + +```json +{ + "record": { + "input": "foo" + } +} +``` + +**Output** + +```json +{ + "record": { + "renamed": "foo" + } +} +``` + +## SplitFilter + +The following provides usage information for : `io.streamthoughts.kafka.connect.filepulse.filter.SplitFilter`. + +The `SplitFilter` splits a field's value of type string into an array by using a specific separator. + +### Configuration + +| Configuration | Description | Type | Default | Importance | +|---------------|-------------------------------------------------------------------|--------|---------|------------| +| `split` | The comma-separated list of fields to split | string | *-* | high | +| `separator` | The separator used for splitting a message field's value to array | string | *,* | high | +| `target` | The target field to put the parsed JSON data | string | *-* | high | + +### Example + +**Configuration** + +```properties +filters=SplitInputField +filters.SplitInputField.type=io.streamthoughts.kafka.connect.filepulse.filter.SplitFilter +filters.SplitInputField.split=input +filters.SplitInputField.separator=, +``` + +**Input** + +```json +{ + "record": { + "input": "val0,val1,val2" + } +} +``` + +**Output** + +```json +{ + "record": { + "input": "val0,val1,val2", + "output": [ + "val0", + "val1", + "val2" + ] + } +} +``` + +## XmlToJsonFilter + +The following provides usage information for : `io.streamthoughts.kafka.connect.filepulse.filter.XmlToJsonFilter`. + +The `XmlToJsonFilter` parses and converts an XML record-field it to a JSON string. +This is filter is based on the `org.json.XML` library. + +### Configuration + +| Configuration | Description | Type | Default | Importance | +|---------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|-------------|------------| +| `source` | The input field on which to apply the filter. | string | `"message"` | high | +| `source.charset` | The charset to be used for reading the source. 
| string | `"UTF-8"` | high | +| `xml.parser.keep.strings` | When parsing the XML into JSON, specifies if values should be kept as strings (true), or if they should try to be guessed into JSON values (numeric, boolean, string) | boolean | `false` | high | +| `xml.parser.cDataTagName` | The name of the key in a JSON Object that indicates a CDATA section. | string | `"value"` | high | + +### Example + +**Configuration** + +```properties +filters=XmlToJson +filters.XmlToJson.type=io.streamthoughts.kafka.connect.filepulse.filter.XmlToJsonFilter +filters.XmlToJson.xml.parser.keep.strings=false +filters.XmlToJson.xml.parser.cDataTagName=data +``` + +## XmlToStructFilter + +The following provides usage information for : `io.streamthoughts.kafka.connect.filepulse.filter.XmlToStructFilter`. + +The `XmlToStructFilter` parses an XML record-field into STRUCT + +### Configuration + +| Configuration | Since | Description | Type | Default | Importance | +|-----------------------------------------------|-------|---------------------------------------------------------------------------------------------------------------------------------------|-----------|-------------|------------| +| `source` | | The input field on which to apply the filter. | string | `"message"` | High | +| `xml.force.array.on.fields` | | The comma-separated list of fields for which an array-type must be forced | `List` | | High | +| `xml.parser.validating.enabled` | | Specifies that the parser will validate documents as they are parsed. | `boolean` | `false` | Low | +| `xml.parser.namespace.aware.enabled` | | Specifies that the XML parser will provide support for XML namespaces. | `boolean` | `false` | Low | +| `xml.exclude.empty.elements` | | Specifies that the reader should exclude element having no field. | `boolean` | `false` | Low | +| `xml.exclude.node.attributes` | | Specifies that the reader should exclude all node attributes. | `boolean` | `false` | Low | +| `xml.exclude.node.attributes.in.namespaces` | | Specifies that the reader should only exclude node attributes in the defined list of namespaces. | `List` | `false` | Low | +| `xml.data.type.inference.enabled` | | Specifies that the reader should try to infer the type of data nodes. | `boolean` | `false` | High | +| `xml.attribute.prefix` | | If set, the name of attributes will be prepended with the specified prefix when they are added to a record. | `string` | `""` | Low | +| `xml.content.field.name` | 2.5.0 | Specifies the name to be used for naming the field that will contain the value of a TextNode element having attributes. | `string` | `value` | Low | +| `xml.field.name.characters.regex.pattern` | 2.5.0 | Specifies the regex pattern to use for matching the characters in XML element name to replace when converting a document to a struct. | `string` | `[.\-]' | Low | +| `xml.field.name.character.string.replacement` | 2.5.0 | Specifies the replacement string to be used when converting a document to a struct. | `string` | `_` | Low | +| `xml.force.content.field.for.paths` | 2.5.0 | The comma-separated list of field for which a content-field must be forced. 
| `List` | | Low | +### Example + +**Configuration** + +```properties +filters=XmlToStruct +filters.ParseXmlDocument.type=io.streamthoughts.kafka.connect.filepulse.filter.XmlToStructFilter +filters.ParseXmlDocument.source=message +filters.ParseXmlDocument.xml.parser.validating.enabled=true +filters.ParseXmlDocument.xml.parser.namespace.aware.enabled=true +filters.ParseXmlDocument.xml.exclude.empty.elements=true +filters.ParseXmlDocument.xml.data.type.inference.enabled=true +``` \ No newline at end of file diff --git a/docs/content/en/docs/Archives/v2.10.x/Developer Guide/handling-failures.md b/docs/content/en/docs/Archives/v2.10.x/Developer Guide/handling-failures.md new file mode 100644 index 000000000..e9b12ab5a --- /dev/null +++ b/docs/content/en/docs/Archives/v2.10.x/Developer Guide/handling-failures.md @@ -0,0 +1,77 @@ +--- +date: 2022-03-02 +title: "Handling Failures" +linkTitle: "Handling Failures" +weight: 70 +description: > + Learn how to handle failures thrown while processing of records by the filter chain. +--- + +The connector provides some mechanisms to handle failures while executing filters. + +By default, the filters chain will immediately fail after an exception is thrown. +But, you can also configure each filter to either ignore errors or to branch to a sub filters-chain. + +## Configuration + +| Configuration | Description | Type | Default | Importance | +|-----------------|-----------------------------------------------------------------------------------|---------|---------|------------| +| `withOnFailure` | List of filters aliases to apply on each data after failure (order is important). | list | *-* | medium | +| `ignoreFailure` | Ignore failure and continue pipeline filters | boolean | *false* | medium | + + +## Ignoring failure + +By setting the property `ignoreFailure` to `true`, the filter will be ignored if an exception is thrown. + +In that case, the exception is written to the output logs and current data record is simply forwarded to the next filter in the chain. + +Using `ignoreFailure=true` can be recommended for optional filters. + +### Example + +In the below example, the filter with alias `Log4jGrokFilter` will be skip in case of failure. + +``` +filters=Log4jGrokFilter + +filters.Log4jGrokFilter.type=io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter +filters.Log4jGrokFilter.match="%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{GREEDYDATA:message}" +filters.Log4jGrokFilter.source=message +filters.Log4jGrokFilter.ignoreFailure=true +``` + +## Defining error filter chain + +A more sophisticated way to handle failures is to define a sub filters-chain on each concern filters. + +Sub-filter chains can be defined using the property `withOnFailure`. + +### Accessing exception data + +Within an error filter chain, some additional fields are available to each filter context. + +| Predefined Fields / ScEL | Description | Type | +|------------------------------|-------------------------------------------------|----------| +| `$error.exceptionMessage` | The exception message | `string` | +| `$error.exceptionStacktrace` | The exception stack-trace | `string` | +| `$error.exceptionClassName` | The exception class name | `string` | +| `$error.filter` | The name of the filter that threw the exception | `string` | + +### Example + +In the below example, an `errorMessage` field is added to the record value if the filter with alias Log4jGrokFilter fails. 
+ +``` +filters=Log4jGrokFilter + +filters.Log4jGrokFilter.type=io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter +filters.Log4jGrokFilter.match="%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{GREEDYDATA:message}" +filters.Log4jGrokFilter.source=message +filters.Log4jGrokFilter.overwrite=message +filters.Log4jGrokFilter.withOnFailure=AppendError + +filters.AppendError.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter +filters.AppendError.field=$.exceptionMessage +filters.AppendError.value={{ $error.exceptionMessage }} +``` diff --git a/docs/content/en/docs/Archives/v2.10.x/Developer Guide/installation.md b/docs/content/en/docs/Archives/v2.10.x/Developer Guide/installation.md new file mode 100644 index 000000000..6e8dfc66a --- /dev/null +++ b/docs/content/en/docs/Archives/v2.10.x/Developer Guide/installation.md @@ -0,0 +1,49 @@ +--- +date: 2022-06-07 +title: "Installation" +linkTitle: "Installation" +weight: 10 +description: > + How to install Connect File Pulse +--- + + +**Connect FilePulse** can be installed either from [GitHub Releases Page](https://github.com/streamthoughts/kafka-connect-file-pulse/releases) or from [Confluent Hub](https://www.confluent.io/hub/streamthoughts/kafka-connect-file-pulse). + +{{% alert title="Caution" color="warning" %}} +You should note that the connector downloaded from Confluent Hub may not reflect the latest available version. +{{% /alert %}} + +**Confluent Hub CLI installation** + +Use the [Confluent Hub client](https://docs.confluent.io/current/confluent-hub/client.html) to install this connector with: + +```bash +confluent-hub install streamthoughts/kafka-connect-file-pulse:latest +``` + +**Download Installation** + +Download the distribution ZIP file for the latest available version. + +**Example :** + +```bash +export VERSION=2.6.0 +wget https://github.com/streamthoughts/kafka-connect-file-pulse/releases/download/v$VERSION/streamthoughts-kafka-connect-file-pulse-$VERSION.zip +``` + +Extract it into one of the directories that is listed on the `plugin.path` worker configuration property. + +You can also use the Confluent Hub CLI for installing it. + +```bash +confluent-hub install --no-prompt streamthoughts-kafka-connect-file-pulse-$VERSION.zip +``` + +{{% alert title="Important" color="info" %}} +When you run Connect workers in **distributed mode**, the connector-plugin must be installed **on each of machines** running Kafka Connect. +{{% /alert %}} + + + diff --git a/docs/content/en/docs/Archives/v2.10.x/Developer Guide/offsets.md b/docs/content/en/docs/Archives/v2.10.x/Developer Guide/offsets.md new file mode 100644 index 000000000..433b8195e --- /dev/null +++ b/docs/content/en/docs/Archives/v2.10.x/Developer Guide/offsets.md @@ -0,0 +1,18 @@ +--- +date: 2022-03-02 +title: "Identifying Files" +linkTitle: "Identifying Files" +weight: 45 +description: > + Learn how Kafka Connect FilePulse uniquely identifies files. +--- + +Kafka Connect FilePulse uses a pluggable interface called [`SourceOffsetPolicy`](https://github.com/streamthoughts/kafka-connect-file-pulse/blob/master/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/SourceOffsetPolicy.java) for +uniquely identifying files. Basically, the implementation passed in the connector's configuration is used for computing a unique identifier which is +used by Kafka Connect to persist the position of the connector for each file (i.e., the offsets saved in the `connect-offsets` topic). 
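+
+For example, assuming the default `DefaultSourceOffsetPolicy` described below, a connector could be configured to identify object files by their name and content hash rather than by their URI. The snippet is only a sketch; the accepted attributes are listed in the configuration table below:
+
+```properties
+# Hypothetical snippet: uniquely identify an object file by its name and hash
+offset.attributes.string=name+hash
+```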
+ +By default, Kafka Connect FilePulse use the default implementation `DefaultSourceOffsetPolicy` which accepts the following configuration: + +| Configuration | Description | Type | Default | Importance | +|----------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------|------------| +| `offset.attributes.string` | A separated list of attributes, using '+' character as separator, to be used for uniquely identifying an object file; must be one of [name, path, lastModified, inode, hash, uri] (e.g: name+hash). Note that order doesn't matter. | `string` | `uri` | HIGH | diff --git a/docs/content/en/docs/Archives/v2.10.x/Developer Guide/tracking-files-status.md b/docs/content/en/docs/Archives/v2.10.x/Developer Guide/tracking-files-status.md new file mode 100644 index 000000000..2ffd8cb22 --- /dev/null +++ b/docs/content/en/docs/Archives/v2.10.x/Developer Guide/tracking-files-status.md @@ -0,0 +1,80 @@ +--- +date: 2022-06-08 +title: "Tracking File Status" +linkTitle: "Tracking File Status" +weight: 90 +description: > + Learn how Connect FilePulse tracks the processing of each input file. +--- + +Connect File Pulse uses an internal topic (*default:`connect-file-pulse-status`*) to track the current state of files being processed. +This topic is used internally by Tasks to communicate to the SourceConnector instance, but you can easily use it to monitor files progression. + +## The message format +Status event are published into JSON with the following schema : + +```json +{ + "hostname": { + "type": "string", + "description": "The machine from which the source file is read." + }, + "status": { + "type": "string", + "description": "The current status" + }, + "metadata": { + "name": { + "type": "string", + "description": "The file name." + }, + "path": { + "type": "string", + "description": "The file absolute path." + }, + "size": { + "type": "int", + "description": "The file size." + }, + "lastModified": { + "type": "long", + "description": "The file last-modified property." + }, + "inode": { + "type": "int", + "description": "The file inode" + }, + "hash": { + "type": "int", + "description": "CRC32" + } + }, + "offset": { + "position": { + "type": "long", + "description": "The current position in the source file (default : -1)." + }, + "rows": { + "type": "long", + "description": "The number of rows already read from the source file (default : -1)." + }, + "timestamp": { + "type": "long", + "description": "The offset timestamp" + } + } +} +``` + +## List of File Status + +An object file can be in the following states : + +* \[1\] **SCHEDULED** : The file has been scheduled by the connector monitoring thread. +* \[2\] **INVALID** : The file can't be scheduled because it is not readable. +* \[2\] **STARTED** : The file is starting to be read by a task. +* \[3\] **READING** : The task is still processing the file. An event is wrote into Kafka while committing offsets. +* \[4\] **FAILED** : The file processing failed. +* \[4\] **COMPLETED** : The task completes the processing of the file. +* \[4\] **COMMITTED** : The task committed the offsets of the completed file. +* \[5\] **CLEANED** : The file has been successfully clean up by the connector (depends on the configured policy). 
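+
+For example, the status topic can be consumed directly to spot files whose processing failed. The following is only a sketch: it assumes a broker reachable at `localhost:9092`, the default status topic name, and the standard `kafka-console-consumer` and `jq` command-line tools:
+
+```bash
+# Watch the status topic and keep only the files in FAILED state
+kafka-console-consumer \
+  --topic connect-file-pulse-status \
+  --from-beginning \
+  --bootstrap-server localhost:9092 \
+  | jq -c 'select(.status == "FAILED") | {file: .metadata.name, position: .offset.position}'
+```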
\ No newline at end of file
diff --git a/docs/content/en/docs/Archives/v2.10.x/Examples/_index.md b/docs/content/en/docs/Archives/v2.10.x/Examples/_index.md
new file mode 100755
index 000000000..694fa0d65
--- /dev/null
+++ b/docs/content/en/docs/Archives/v2.10.x/Examples/_index.md
@@ -0,0 +1,33 @@
+---
+date: 2021-05-12
+title: "Tutorials and Shared Resources"
+linkTitle: "Tutorials and Shared Resources"
+weight: 60
+description: >
+  A summary of recommended walk-throughs, blog posts, examples and shared resources about Connect File Pulse
+
+---
+
+## Presentations
+
+
+## Blog Posts & Release Announcements
+
+* 2021-03-26 | [Loading CSV data into Confluent Cloud using the FilePulse connector](https://rmoff.net/2021/03/26/loading-csv-data-into-confluent-cloud-using-the-filepulse-connector/)
+* 2020-10-01 | [Ingesting XML data into Kafka - Option 3: Kafka Connect FilePulse connector](https://rmoff.net/2020/10/01/ingesting-xml-data-into-kafka-option-3-kafka-connect-filepulse-connector/)
+* 2020-09-10 | [Streaming data into Kafka S01/E03 - Loading JSON file](https://dev.to/fhussonnois/streaming-data-into-kafka-s01-e03-loading-json-file-3d76)
+* 2020-08-19 | [Streaming data into Kafka S01/E02 — Loading XML file](https://dev.to/fhussonnois/streaming-data-into-kafka-s01-e02-loading-xml-file-529i)
+* 2020-08-12 | [Streaming data into Kafka S01/E01 - Loading CSV file](https://dev.to/fhussonnois/streaming-csv-data-into-kafka-46a5)
+* 2020-01-24 | [Kafka Connect File Pulse, One Connector To Ingest Them All](https://medium.com/streamthoughts/kafka-connect-filepulse-one-connector-to-ingest-them-all-faed018a725c)
+
+
+## Demonstration projects
+
+
+## Code examples
+
+
+
+
+
+
diff --git a/docs/content/en/docs/Archives/v2.10.x/FAQ/_index.md b/docs/content/en/docs/Archives/v2.10.x/FAQ/_index.md
new file mode 100644
index 000000000..5d884313d
--- /dev/null
+++ b/docs/content/en/docs/Archives/v2.10.x/FAQ/_index.md
@@ -0,0 +1,76 @@
+---
+date: 2020-05-25
+title: "FAQ"
+linkTitle: "FAQ"
+weight: 97
+description: >
+  The most frequently asked questions.
+---
+
+## Can the FilePulse connector be deployed in distributed mode ?
+
+Connect File Pulse must run on the machine hosting the files to be ingested. It is recommended to deploy your connector in distributed mode. Multiple Kafka Connect workers can be deployed on the same machine and participate in the same cluster. The configured input directory will be scanned by the JVM running the SourceConnector. Then, all detected files will be scheduled among the tasks spread across your local cluster.
+
+## Is the FilePulse connector fault-tolerant ?
+
+Connect File Pulse guarantees no data loss by leveraging Kafka Connect fault-tolerance capabilities.
+Each task keeps a trace of the file offset of the last record written into Kafka. In case of a restart, tasks will continue where they stopped before the crash.
+Note that some duplicates may be written into Kafka.
+
+## Can the FilePulse connector be used in place of other solutions like Logstash ?
+
+Connect File Pulse has some features which are similar to the ones provided by Logstash [codecs](https://www.elastic.co/guide/en/logstash/current/codec-plugins.html)/[filters](https://www.elastic.co/guide/en/logstash/current/filter-plugins.html). Filters like GrokFilter are strongly inspired by Logstash. For example, you can use it to parse unstructured data like application logs.
+
+However, Connect File Pulse was not originally designed to collect dynamic application log files.
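+
+As an illustration of the Logstash-like filters mentioned above, the following sketch (not an official example; the pattern must be adapted to your log format) reuses the `GrokFilter` documented in the Developer Guide to parse raw log lines:
+
+```properties
+filters=ParseLog
+filters.ParseLog.type=io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter
+filters.ParseLog.pattern="%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{GREEDYDATA:message}"
+filters.ParseLog.source="message"
+filters.ParseLog.overwrite="message"
+```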
+ +## Is FilePulse connector support SASL/SSL authentication mechanisms and can be deployed on Confluent Cloud ? + +Yes, FilePulse connector can be deployed on any Kafka Cluster. However, the connector currently requires the use of an internal topic +to perform synchronization between the _SourceConnector_ instance and the _SourceTasks_ that process files. To do that, FilePulse +will create an internal producer and consumer that you need to configure when running against a secured Kafka Cluster. + +To override the default configuration for the internal consumer and producer clients, +you can use one of the following override prefixes : + +* `tasks.file.status.storage.consumer.` +* `tasks.file.status.storage.producer.` + +Example: +```json +{ +"tasks.file.status.storage.bootstrap.servers" : "CCLOUD_BROKER_SERVICE_URI:9092", +"tasks.file.status.storage.topic" : "connect-file-pulse-status", +"tasks.file.status.storage.producer.security.protocol" : "SASL_SSL", +"tasks.file.status.storage.producer.ssl.endpoint.identification.algorithm": "https", +"tasks.file.status.storage.producer.sasl.mechanism" : "PLAIN", +"tasks.file.status.storage.producer.sasl.jaas.config" : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_API_KEY\" password=\"CCLOUD_API_SECRET\";", +"tasks.file.status.storage.producer.request.timeout.ms" : "20000", +"tasks.file.status.storage.consumer.security.protocol" : "SASL_SSL", +"tasks.file.status.storage.consumer.ssl.endpoint.identification.algorithm": "https", +"tasks.file.status.storage.consumer.sasl.mechanism" : "PLAIN", +"tasks.file.status.storage.consumer.sasl.jaas.config" : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_API_KEY\" password=\"CCLOUD_API_SECRET\";", +"tasks.file.status.storage.consumer.request.timeout.ms" : "20000" +} +``` + +## What are the differences between FilePulse connector and others Kafka connectors for streaming files ? + +The following table shows a simple comparison between Connect File Pulse and other solutions : [Connect Spooldir](https://github.com/jcustenborder/kafka-connect-spooldir) and Connect [FileStreams](https://github.com/apache/kafka/tree/trunk/connect/file) + +| | Connect FilePulse | Connect Spooldir | Connect FileStreams | +|:--- | :---: | :---: | :---: | +| **Connector Type** | source | source | source / sink | +| **License** |Apache License 2.0 |Apache License 2.0| Apache License 2.0 | +| **Available on Confluent Hub** | YES | YES | YES | +| **Docker image** | YES | NO | NO | +| **Delivery semantics** | At-least-once | At-least-once | At-most-once | +| **Usable in production** | YES | YES | NO | +| **Supported file formats(out-of-the box)** | Delimited, Binary, JSON, Avro, XML (limited) | Delimited, JSON | Text file | YES | NO | +| **Support recursive directory scan** | YES | NO | NO | +| **Support Archive and Compressed files** | YES (`GZIP`, `TAR`, `ZIP`) | NO | NO | +| **Source partitions** | Configurable (filename, path, filename+hash) | filename | filename | +| **Support for multi-tasks** | YES | YES | NO | +| **Support for worker distributed mode** | YES (requires a shared volume) | NO | NO | +| **Support for streaming log files** | YES | NO | YES | +| **Support for transformation** | Single Message Transforms
Processing Filters (Grok, Append, JSON, etc) | Single Message Transforms| * Single Message Transforms +| **Support for tracking processing progress of files** | YES (using an internal topic) | NO | NO | \ No newline at end of file diff --git a/docs/content/en/docs/Archives/v2.10.x/Getting started/_index.md b/docs/content/en/docs/Archives/v2.10.x/Getting started/_index.md new file mode 100644 index 000000000..21373714b --- /dev/null +++ b/docs/content/en/docs/Archives/v2.10.x/Getting started/_index.md @@ -0,0 +1,163 @@ +--- +date: 2022-06-06 +title: "Getting Started" +linkTitle: "Getting Started" +weight: 10 +description: > + Get started with Connect File Pulse through a step by step tutorial. +--- + +In this tutorial we will explore how to deploy a basic Connect File Pulse connector step by step. + +The prerequisites for this tutorial are : + +* IDE or Text editor. +* Maven 3+ +* Docker (for running a Kafka Cluster 3.x). + +## Start Docker Environment + +Set the following environment variable to execute next commands. + +```bash +export GITHUB_REPO_MASTER=https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/master/ +``` + +**1 ) Run Confluent Platforms with Connect File Pulse** + +```bash +curl -sSL $GITHUB_REPO_MASTER/docker-compose.yml -o docker-compose.yml +docker-compose up -d +``` + +**2 ) Verify that Connect Worker is running (optional)** + +```bash +docker-compose logs "connect-file-pulse" +``` + +**3 ) Check that Connect File Pulse plugin is correctly loaded (optional)** + +```bash +curl -sX GET http://localhost:8083/connector-plugins | grep FilePulseSourceConnector +``` + +## Examples + +### Logs Parsing (Log4j) + +This example starts a new connector instance to parse the Kafka Connect container log4j logs before writing them into a configured topic. + +**1 ) Start a new connector instance** + +```bash + +curl -sSL $GITHUB_REPO_MASTER/examples/connect-file-pulse-quickstart-log4j.json -o connect-file-pulse-quickstart-log4j.json + +$ curl -sX PUT http://localhost:8083/connectors/connect-file-pulse-quickstart-log4j/config \ +-d @connect-file-pulse-quickstart-log4j.json \ +--header "Content-Type: application/json" | jq +``` + +**2 ) Check connector status** +```bash +curl -X GET http://localhost:8083/connectors/connect-file-pulse-quickstart-log4j/status | jq +``` + +**3 ) Consume output topics** +```bash +$ docker exec -it -e KAFKA_OPTS="" connect kafka-avro-console-consumer \ +--topic connect-file-pulse-quickstart-log4j \ +--from-beginning \ +--bootstrap-server broker:29092 \ +--property schema.registry.url=http://schema-registry:8081 +``` + +(output) +```json +... 
+
+**4 ) Observe Connect status**
+
+Connect File Pulse uses an internal topic to track the current state of the files being processed.
+
+```bash
+docker exec -it -e KAFKA_OPTS="" connect kafka-console-consumer \
+--topic connect-file-pulse-status \
+--from-beginning \
+--bootstrap-server broker:29092
+```
+
+(output)
+```json
+{"metadata":{"uri":"file:/var/log/kafka/kafka-connect.log","name":"kafka-connect.log","contentLength":110900,"lastModified":1654524649569,"contentDigest":{"digest":"2445040665","algorithm":"CRC32"},"userDefinedMetadata":{"system.inode":33827724,"system.hostname":"5c1e920f9a28"}},"offset":{"position":-1,"rows":0,"timestamp":1654524649578},"status":"SCHEDULED"}
+{"metadata":{"uri":"file:/var/log/kafka/kafka-connect.log","name":"kafka-connect.log","contentLength":111122,"lastModified":1654524649597,"contentDigest":{"digest":"1755089447","algorithm":"CRC32"},"userDefinedMetadata":{"system.inode":33827724,"system.hostname":"5c1e920f9a28"}},"offset":{"position":0,"rows":0,"timestamp":1654524649604},"status":"STARTED"}
+{"metadata":{"uri":"file:/var/log/kafka/kafka-connect.log","name":"kafka-connect.log","contentLength":111122,"lastModified":1654524649597,"contentDigest":{"digest":"1755089447","algorithm":"CRC32"},"userDefinedMetadata":{"system.inode":33827724,"system.hostname":"5c1e920f9a28"}},"offset":{"position":111530,"rows":1271,"timestamp":1654524654094},"status":"READING"}
+{"metadata":{"uri":"file:/var/log/kafka/kafka-connect.log","name":"kafka-connect.log","contentLength":111122,"lastModified":1654524649597,"contentDigest":{"digest":"1755089447","algorithm":"CRC32"},"userDefinedMetadata":{"system.inode":33827724,"system.hostname":"5c1e920f9a28"}},"offset":{"position":112158,"rows":1274,"timestamp":1654524664011},"status":"READING"}
+{"metadata":{"uri":"file:/var/log/kafka/kafka-connect.log","name":"kafka-connect.log","contentLength":111122,"lastModified":1654524649597,"contentDigest":{"digest":"1755089447","algorithm":"CRC32"},"userDefinedMetadata":{"system.inode":33827724,"system.hostname":"5c1e920f9a28"}},"offset":{"position":112786,"rows":1277,"timestamp":1654524674029},"status":"READING"}
+```
+
+**5 ) Stop all containers**
+```bash
+docker-compose down
+```
+
+### Parsing a CSV file
+
+This example starts a new connector instance that parses a CSV file and filters rows based on column values before writing records into Kafka.
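+
+Before creating the connector, it can help to peek at the first lines of the example dataset to get a feel for the input. This is just a convenience sketch reusing the `GITHUB_REPO_MASTER` variable defined above; the authoritative column layout is whatever the downloaded file actually contains:
+
+```bash
+# Sketch: preview the first lines of the example CSV dataset
+curl -sSL $GITHUB_REPO_MASTER/datasets/quickstart-musics-dataset.csv | head -n 3
+```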
+
+**1 ) Start a new connector instance**
+
+```bash
+curl -sSL $GITHUB_REPO_MASTER/examples/connect-file-pulse-quickstart-csv.json -o connect-file-pulse-quickstart-csv.json
+
+curl -sX PUT http://localhost:8083/connectors/connect-file-pulse-quickstart-csv/config \
+-d @connect-file-pulse-quickstart-csv.json \
+--header "Content-Type: application/json" | jq
+```
+
+**2 ) Copy the example CSV file into the container**
+
+```bash
+curl -sSL $GITHUB_REPO_MASTER/datasets/quickstart-musics-dataset.csv -o quickstart-musics-dataset.csv
+docker exec -it connect mkdir -p /tmp/kafka-connect/examples
+docker cp quickstart-musics-dataset.csv connect:/tmp/kafka-connect/examples/quickstart-musics-dataset.csv
+```
+
+**3 ) Check connector status**
+```bash
+curl -X GET http://localhost:8083/connectors/connect-file-pulse-quickstart-csv/status | jq
+```
+
+**4 ) Check for task completion**
+
+```bash
+docker logs --tail="all" -f connect | grep "source task is transitioning to IDLE state"
+```
+
+**5 ) Consume output topics**
+```bash
+docker exec -it connect kafka-avro-console-consumer \
+--topic connect-file-pulse-quickstart-csv \
+--from-beginning \
+--bootstrap-server broker:29092 \
+--property schema.registry.url=http://schema-registry:8081
+```
+
+(output)
+```json
+{"title":{"string":"40"},"album":{"string":"War"},"duration":{"string":"02:38"},"release":{"string":"1983"},"artist":{"string":"U2"},"type":{"string":"Rock"}}
+{"title":{"string":"Acrobat"},"album":{"string":"Achtung Baby"},"duration":{"string":"04:30"},"release":{"string":"1991"},"artist":{"string":"U2"},"type":{"string":"Rock"}}
+{"title":{"string":"Bullet the Blue Sky"},"album":{"string":"The Joshua Tree"},"duration":{"string":"04:31"},"release":{"string":"1987"},"artist":{"string":"U2"},"type":{"string":"Rock"}}
+{"title":{"string":"Drowning Man"},"album":{"string":"War"},"duration":{"string":"04:14"},"release":{"string":"1983"},"artist":{"string":"U2"},"type":{"string":"Rock"}}
+{"title":{"string":"Even Better Than the Real Thing"},"album":{"string":"Achtung Baby"},"duration":{"string":"03:41"},"release":{"string":"1991"},"artist":{"string":"U2"},"type":{"string":"Rock"}}
+{"title":{"string":"Exit"},"album":{"string":"The Joshua Tree"},"duration":{"string":"04:13"},"release":{"string":"1987"},"artist":{"string":"U2"},"type":{"string":"Rock"}}
+{"title":{"string":"In God's Country"},"album":{"string":"The Joshua Tree"},"duration":{"string":"02:56"},"release":{"string":"1987"},"artist":{"string":"U2"},"type":{"string":"Rock"}}
+...
+```
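+
+Optionally, you can confirm that the CSV file itself was fully processed by looking it up in the internal status topic introduced in the previous example. The sketch below simply filters the status records by file name; the exact sequence of status values (for example `SCHEDULED`, `READING`, and a final completion state) may vary with the connector version and configuration:
+
+```bash
+# Sketch: follow the processing status of the example CSV file (stop with Ctrl+C)
+docker exec -e KAFKA_OPTS="" connect kafka-console-consumer \
+--topic connect-file-pulse-status \
+--from-beginning \
+--bootstrap-server broker:29092 | grep "quickstart-musics-dataset.csv"
+```
+
+When you are done with this example, the connector can be removed with `curl -X DELETE http://localhost:8083/connectors/connect-file-pulse-quickstart-csv`.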
diff --git a/docs/content/en/docs/Archives/v2.10.x/Overview/Contribution guidelines/_index.md b/docs/content/en/docs/Archives/v2.10.x/Overview/Contribution guidelines/_index.md
new file mode 100644
index 000000000..f9a9aca38
--- /dev/null
+++ b/docs/content/en/docs/Archives/v2.10.x/Overview/Contribution guidelines/_index.md
@@ -0,0 +1,18 @@
+---
+date: 2020-05-22
+title: "Contribution Guidelines"
+linkTitle: "Contribution Guidelines"
+weight: 10
+description: >
+  How to become a Connect File Pulse contributor
+---
+
+## Contributing
+
+Any feedback, bug reports, and PRs are greatly appreciated!
+
+- Source Code: [https://github.com/streamthoughts/kafka-connect-file-pulse](https://github.com/streamthoughts/kafka-connect-file-pulse)
+- Issue Tracker: [https://github.com/streamthoughts/kafka-connect-file-pulse/issues](https://github.com/streamthoughts/kafka-connect-file-pulse/issues)
+- Coding Guidelines: [https://github.com/streamthoughts/kafka-connect-file-pulse/blob/master/CONTRIBUTING.md](https://github.com/streamthoughts/kafka-connect-file-pulse/blob/master/CONTRIBUTING.md)
+
+
diff --git a/docs/content/en/docs/Archives/v2.10.x/Overview/FilePulse/_index.md b/docs/content/en/docs/Archives/v2.10.x/Overview/FilePulse/_index.md
new file mode 100644
index 000000000..b9b8496f3
--- /dev/null
+++ b/docs/content/en/docs/Archives/v2.10.x/Overview/FilePulse/_index.md
@@ -0,0 +1,50 @@
+---
+date: 2020-05-20
+title: "What is Connect FilePulse?"
+linkTitle: "Connect FilePulse"
+weight: 10
+description: >
+  An introduction to Connect File Pulse
+---
+
+## What is it?
+
+**Connect FilePulse** is a versatile, scalable, and reliable Apache Kafka Connect plugin that makes it easy to parse, transform, and stream any file, in any format, into Apache Kafka™.
+
+{{% alert title="About Connect" color="info" %}}
+**[Kafka Connect](https://kafka.apache.org/documentation/#connect)** is a tool for scalably and reliably streaming data between Apache Kafka and other systems. (source: [Apache documentation](https://kafka.apache.org/documentation/#connect)).
+{{% /alert %}}
+
+### Key Features
+
+Connect FilePulse provides a set of built-in features for streaming local files into Kafka. This includes, among other things:
+
+* Support for recursive scanning of local directories.
+* Reading files and writing them into Kafka line by line.
+* Support for multiple input file formats (e.g., CSV, Avro, XML).
+* Parsing and transforming data using built-in or custom processing filters.
+* Error handler definition.
+* Monitoring of files while they are being written into Kafka.
+* Support for pluggable strategies to clean up completed files.
+* Etc.
+
+
+## Why do I want it?
+
+Connect FilePulse helps you stream local files into Apache Kafka.
+
+* **What is it good for?**: Connect FilePulse lets you define complex pipelines to transform and structure your data before integration into Kafka.
+
+* **What is it not good for?**: Connect FilePulse may not be the best solution for collecting application log files.
+
+## Where should I go next?
+
+Here are some suggested next steps:
+
+* **Getting Started**: [Get started with Connect FilePulse](/kafka-connect-file-pulse/docs/getting-started/)
+* **Examples**: [Check out some example code!](/kafka-connect-file-pulse/docs/examples/)
+
+
+
+
+
diff --git a/docs/content/en/docs/Archives/v2.10.x/Overview/_index.md b/docs/content/en/docs/Archives/v2.10.x/Overview/_index.md
new file mode 100644
index 000000000..e1d402d14
--- /dev/null
+++ b/docs/content/en/docs/Archives/v2.10.x/Overview/_index.md
@@ -0,0 +1,8 @@
+---
+date: 2020-05-09
+title: "Overview"
+linkTitle: "Overview"
+weight: 1
+description: >
+  Information about Connect File Pulse software, community, docs, and events.
+---
\ No newline at end of file
diff --git a/docs/content/en/docs/Archives/v2.10.x/Project Info/_index.md b/docs/content/en/docs/Archives/v2.10.x/Project Info/_index.md
new file mode 100644
index 000000000..9b5a052a5
--- /dev/null
+++ b/docs/content/en/docs/Archives/v2.10.x/Project Info/_index.md
@@ -0,0 +1,8 @@
+---
+date: 2020-05-09
+title: "Project Info"
+linkTitle: "Project Info"
+weight: 110
+description: >
+  Various information about the project for developers
+---
\ No newline at end of file
diff --git a/docs/content/en/docs/Archives/v2.10.x/Project Info/getting_the_code.md b/docs/content/en/docs/Archives/v2.10.x/Project Info/getting_the_code.md
new file mode 100644
index 000000000..3a030fc27
--- /dev/null
+++ b/docs/content/en/docs/Archives/v2.10.x/Project Info/getting_the_code.md
@@ -0,0 +1,35 @@
+---
+date: 2020-05-09
+title: "Getting the code"
+linkTitle: "Getting the code"
+weight: 10
+description: >
+  How to get and build the code?
+---
+
+## Prerequisites for building Connect File Pulse
+
+* Git
+* Maven (we recommend version 3.5.3)
+* Java 11
+
+## Building Connect File Pulse
+
+The code of Connect File Pulse is hosted on GitHub. You can check it out like this:
+
+```bash
+git clone https://github.com/streamthoughts/kafka-connect-file-pulse.git
+```
+
+
+The project uses Maven; you can build it like this:
+
+```bash
+cd kafka-connect-file-pulse
+mvn clean package -DskipTests
+```
+
+{{% alert title="Confluent Hub" color="info" %}}
+Connect File Pulse is packaged into an archive
+file compatible with the [confluent-hub client](https://docs.confluent.io/current/connect/managing/confluent-hub/client.html).
+{{% /alert %}}
\ No newline at end of file
diff --git a/docs/content/en/docs/Archives/v2.10.x/_index.md b/docs/content/en/docs/Archives/v2.10.x/_index.md
new file mode 100644
index 000000000..2b924f4be
--- /dev/null
+++ b/docs/content/en/docs/Archives/v2.10.x/_index.md
@@ -0,0 +1,6 @@
+---
+title: "Docs Release v2.10.x"
+linkTitle: "v2.10.x"
+url: "/v2-10/docs"
+---
+This section is where the user documentation for Connect File Pulse lives - all the information that users need to understand and successfully use Connect File Pulse.
diff --git a/pom.xml b/pom.xml index 61dde54ad..03795cb2c 100644 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,7 @@ io.streamthoughts kafka-connect-filepulse-reactor pom - 2.10.0-SNAPSHOT + 2.10.0 Kafka Connect Source File Pulse Reactor Connect File Pulse is a multipurpose source connector for streaming files from a local filesystem to