Skip to content

Commit

Permalink
Update usage.md
Browse files Browse the repository at this point in the history
  • Loading branch information
Javierlj committed Mar 18, 2020
1 parent c36874d commit b4b007b
Showing 1 changed file with 48 additions and 0 deletions.
48 changes: 48 additions & 0 deletions doc/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,54 @@ The received data is a DataStream of objects of the class **`NgsiEvent`**. This

- **`metadata`**: Additional metadata.


### NGSILDSource

- Import dependency.

```scala
import org.fiware.cosmos.orion.flink.connector.{NGSILDSource}
```

- Add source to Flink Environment. Indicate what port you want to listen to (e.g. 9001).

```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val eventStream = env.addSource(new NGSILDSource(9001))
```

- Parse the received data.

```scala
val processedDataStream = eventStream.
.flatMap(event => event.entities)
// ...processing
```

The received data is a DataStream of objects of the class **`NgsiEvent`**. This class has the following attributes:

- **`creationTime`**: Timestamp of arrival.

- **`service`**: FIWARE service extracted from the HTTP headers.

- **`servicePath`**: FIWARE service path extracted from the HTTP headers.

- **`entities`**: Sequence of entites included in the message. Each entity has the following attributes:

- **`id`**: Identifier of the entity.

- **`type`**: Node type.

- **`attrs`**: Map of attributes in which the key is the attribute name and the value is an object with the
following properties:

- **`type`**: Type of value (Float, Int,...).

- **`value`**: Value of the attribute.

- **`@context`**: Map of terms to URIs providing an unambiguous definition.


### OrionSink

- Import dependency.
Expand Down

0 comments on commit b4b007b

Please sign in to comment.