diff --git a/src/main/scala/org/fiware/cosmos/orion/flink/connector/ClassTypes.scala b/src/main/scala/org/fiware/cosmos/orion/flink/connector/ClassTypes.scala index a66e185..c792bf9 100644 --- a/src/main/scala/org/fiware/cosmos/orion/flink/connector/ClassTypes.scala +++ b/src/main/scala/org/fiware/cosmos/orion/flink/connector/ClassTypes.scala @@ -36,7 +36,7 @@ case class Entity(id: String, `type`: String, attrs: Map[String, Attribute]) ex * @param context Entity type * @param attrs List of attributes */ -case class EntityLD(id: String, `type`: String,context: Any, attrs: Map[String, AttributeLD]) extends scala.Serializable +case class EntityLD(id: String, `type`: String, attrs: Map[String, Map[String,Any]],context: Any) extends scala.Serializable /** * HttpBody @@ -53,13 +53,6 @@ case class HttpBody( data: Seq[Map[String,Any]], subscriptionId: String ) extend */ case class Attribute(`type`: Any, value: Any, metadata:Any ) extends scala.Serializable -/** - * Attribute of an entity - * @param `type` Type of attribute - * @param value Value of the attribute - * @param unitCode Metadata map - */ -case class AttributeLD(`type`: Any, value: Any, unitCode: Any ) extends scala.Serializable /** @@ -79,14 +72,5 @@ object MapToAttributeConverter{ values.get("metadata").orNull) } - def unapplyLD(values: Map[String,Any]) : AttributeLD = { - - AttributeLD( - values.get("type").orNull, - values.get("value").orNull, - values.get("unitCode").orNull - ) - } - } diff --git a/src/main/scala/org/fiware/cosmos/orion/flink/connector/OrionSourceLD.scala b/src/main/scala/org/fiware/cosmos/orion/flink/connector/NGSILDSource.scala similarity index 98% rename from src/main/scala/org/fiware/cosmos/orion/flink/connector/OrionSourceLD.scala rename to src/main/scala/org/fiware/cosmos/orion/flink/connector/NGSILDSource.scala index 6b4146c..cae938e 100644 --- a/src/main/scala/org/fiware/cosmos/orion/flink/connector/OrionSourceLD.scala +++ b/src/main/scala/org/fiware/cosmos/orion/flink/connector/NGSILDSource.scala @@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont * @author @sonsoleslp * @param tryPort try to use this point, if this point is used then try a new port */ -final class OrionSourceLD( +final class NGSILDSource( tryPort: Int/*, callbackUrl: Option[String] = None*/ ) extends RichParallelSourceFunction[NgsiEventLD] { diff --git a/src/main/scala/org/fiware/cosmos/orion/flink/connector/OrionHttpHandlerInterface.scala b/src/main/scala/org/fiware/cosmos/orion/flink/connector/OrionHttpHandlerInterface.scala index 1ef01c0..e8af4bc 100644 --- a/src/main/scala/org/fiware/cosmos/orion/flink/connector/OrionHttpHandlerInterface.scala +++ b/src/main/scala/org/fiware/cosmos/orion/flink/connector/OrionHttpHandlerInterface.scala @@ -98,6 +98,7 @@ abstract class OrionHttpHandlerInterface( val service = headerEntries.get(SERVICE_HEADER).getValue() val servicePath = headerEntries.get(SERVICE_PATH_HEADER).getValue() + // Retrieve body content and convert from Byte array to String val content = req.content() val byteBufUtil = ByteBufUtil.readBytes(content.alloc, content, content.readableBytes) diff --git a/src/main/scala/org/fiware/cosmos/orion/flink/connector/OrionHttpHandlerLD.scala b/src/main/scala/org/fiware/cosmos/orion/flink/connector/OrionHttpHandlerLD.scala index 908aa76..eebe7d8 100644 --- a/src/main/scala/org/fiware/cosmos/orion/flink/connector/OrionHttpHandlerLD.scala +++ b/src/main/scala/org/fiware/cosmos/orion/flink/connector/OrionHttpHandlerLD.scala @@ -13,6 +13,7 @@ class OrionHttpHandlerLD(sc: SourceContext[NgsiEventLD]) private lazy val logger = LoggerFactory.getLogger(getClass) override def parseMessage(req : FullHttpRequest) : NgsiEventLD = { + try { // Retrieve headers val headerEntries = req.headers().entries() @@ -30,25 +31,22 @@ class OrionHttpHandlerLD(sc: SourceContext[NgsiEventLD]) // Parse Body from JSON string to object and retrieve entities val dataObj = parse(jsonBodyString).extract[HttpBody] val parsedEntities = dataObj.data + val subscriptionId = dataObj.subscriptionId val entities = parsedEntities.map(entity => { // Retrieve entity id val entityId = entity("id").toString - // Retrieve entity type val entityType = entity("type").toString - //Retrieve entity context - val entityContext = entity("@context") - println(entityType) + val entityContext = entity.getOrElse("@context",Array("https://schema.lab.fiware.org/ld/context")) // Retrieve attributes val attrs = entity.filterKeys(x => x != "id" & x!= "type" & x!= "@context" ) //Convert attributes to Attribute objects - .transform((k,v) => MapToAttributeConverter - .unapplyLD(v.asInstanceOf[Map[String,Any]])) - EntityLD(entityId, entityType,entityContext, attrs) + .transform((k,v) => (v.asInstanceOf[Map[String,Any]])) + EntityLD(entityId, entityType, attrs,entityContext) }) // Generate timestamp val creationTime = System.currentTimeMillis @@ -66,3 +64,5 @@ class OrionHttpHandlerLD(sc: SourceContext[NgsiEventLD]) sc.collect(ngsiEvent) } } + + diff --git a/src/test/scala/org.fiware.cosmos.orion.flink.connector.tests/FlinkJobTestLD.scala b/src/test/scala/org.fiware.cosmos.orion.flink.connector.tests/FlinkJobTestLD.scala index 4706d97..31343a9 100644 --- a/src/test/scala/org.fiware.cosmos.orion.flink.connector.tests/FlinkJobTestLD.scala +++ b/src/test/scala/org.fiware.cosmos.orion.flink.connector.tests/FlinkJobTestLD.scala @@ -24,13 +24,13 @@ object FlinkJobTestLD { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // Create Orion Source. Receive notifications on port 9001 - val eventStream = env.addSource(new OrionSourceLD(ConstantsTestLD.Port)) + val eventStream = env.addSource(new NGSILDSource(ConstantsTestLD.Port)) // Process event stream val processedDataStream = eventStream .flatMap(event => event.entities) .map(entity => { - val temp = entity.attrs("temperature").value.asInstanceOf[Number].floatValue() - val pres = entity.attrs("pressure").value.asInstanceOf[Number].floatValue() + val temp = entity.attrs("temperature")("value").asInstanceOf[BigInt].floatValue() + val pres = entity.attrs("pressure")("value").asInstanceOf[BigInt].floatValue() EntityNode(entity.id, temp, pres) }) .keyBy("id") diff --git a/src/test/scala/org.fiware.cosmos.orion.flink.connector.tests/SimulatedNotification.scala b/src/test/scala/org.fiware.cosmos.orion.flink.connector.tests/SimulatedNotification.scala index 1921e88..6b2bfaa 100644 --- a/src/test/scala/org.fiware.cosmos.orion.flink.connector.tests/SimulatedNotification.scala +++ b/src/test/scala/org.fiware.cosmos.orion.flink.connector.tests/SimulatedNotification.scala @@ -39,3 +39,4 @@ package object simulatedNotification { var maxTempVal = 0.0 var maxPresVal = 0.0 } + diff --git a/src/test/scala/org.fiware.cosmos.orion.flink.connector.tests/SimulatedNotificationLD.scala b/src/test/scala/org.fiware.cosmos.orion.flink.connector.tests/SimulatedNotificationLD.scala index 6abfceb..148b1e0 100644 --- a/src/test/scala/org.fiware.cosmos.orion.flink.connector.tests/SimulatedNotificationLD.scala +++ b/src/test/scala/org.fiware.cosmos.orion.flink.connector.tests/SimulatedNotificationLD.scala @@ -49,3 +49,4 @@ package object simulatedNotificationLD { var maxTempVal = 0.0 var maxPresVal = 0.0 } +