Skip to content

Commit

Permalink
Ngsi LD
Browse files Browse the repository at this point in the history
  • Loading branch information
Javierlj committed Dec 3, 2019
1 parent c9570d4 commit 74ce143
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ case class Entity(id: String, `type`: String, attrs: Map[String, Attribute]) ex
* @param context Entity type
* @param attrs List of attributes
*/
case class EntityLD(id: String, `type`: String,context: Any, attrs: Map[String, AttributeLD]) extends scala.Serializable
case class EntityLD(id: String, `type`: String, attrs: Map[String, Map[String,Any]],context: Any) extends scala.Serializable

/**
* HttpBody
Expand All @@ -53,13 +53,6 @@ case class HttpBody( data: Seq[Map[String,Any]], subscriptionId: String ) extend
*/
case class Attribute(`type`: Any, value: Any, metadata:Any ) extends scala.Serializable

/**
* Attribute of an entity
* @param `type` Type of attribute
* @param value Value of the attribute
* @param unitCode Metadata map
*/
case class AttributeLD(`type`: Any, value: Any, unitCode: Any ) extends scala.Serializable


/**
Expand All @@ -79,14 +72,5 @@ object MapToAttributeConverter{
values.get("metadata").orNull)
}

def unapplyLD(values: Map[String,Any]) : AttributeLD = {

AttributeLD(
values.get("type").orNull,
values.get("value").orNull,
values.get("unitCode").orNull
)
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
* @author @sonsoleslp
* @param tryPort try to use this point, if this point is used then try a new port
*/
final class OrionSourceLD(
final class NGSILDSource(
tryPort: Int/*,
callbackUrl: Option[String] = None*/
) extends RichParallelSourceFunction[NgsiEventLD] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ abstract class OrionHttpHandlerInterface(
val service = headerEntries.get(SERVICE_HEADER).getValue()
val servicePath = headerEntries.get(SERVICE_PATH_HEADER).getValue()


// Retrieve body content and convert from Byte array to String
val content = req.content()
val byteBufUtil = ByteBufUtil.readBytes(content.alloc, content, content.readableBytes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class OrionHttpHandlerLD(sc: SourceContext[NgsiEventLD])
private lazy val logger = LoggerFactory.getLogger(getClass)

override def parseMessage(req : FullHttpRequest) : NgsiEventLD = {

try {
// Retrieve headers
val headerEntries = req.headers().entries()
Expand All @@ -30,25 +31,22 @@ class OrionHttpHandlerLD(sc: SourceContext[NgsiEventLD])
// Parse Body from JSON string to object and retrieve entities
val dataObj = parse(jsonBodyString).extract[HttpBody]
val parsedEntities = dataObj.data

val subscriptionId = dataObj.subscriptionId
val entities = parsedEntities.map(entity => {
// Retrieve entity id
val entityId = entity("id").toString


// Retrieve entity type
val entityType = entity("type").toString


//Retrieve entity context
val entityContext = entity("@context")
println(entityType)
val entityContext = entity.getOrElse("@context",Array("https://schema.lab.fiware.org/ld/context"))
// Retrieve attributes
val attrs = entity.filterKeys(x => x != "id" & x!= "type" & x!= "@context" )
//Convert attributes to Attribute objects
.transform((k,v) => MapToAttributeConverter
.unapplyLD(v.asInstanceOf[Map[String,Any]]))
EntityLD(entityId, entityType,entityContext, attrs)
.transform((k,v) => (v.asInstanceOf[Map[String,Any]]))
EntityLD(entityId, entityType, attrs,entityContext)
})
// Generate timestamp
val creationTime = System.currentTimeMillis
Expand All @@ -66,3 +64,5 @@ class OrionHttpHandlerLD(sc: SourceContext[NgsiEventLD])
sc.collect(ngsiEvent)
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ object FlinkJobTestLD {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create Orion Source. Receive notifications on port 9001
val eventStream = env.addSource(new OrionSourceLD(ConstantsTestLD.Port))
val eventStream = env.addSource(new NGSILDSource(ConstantsTestLD.Port))
// Process event stream
val processedDataStream = eventStream
.flatMap(event => event.entities)
.map(entity => {
val temp = entity.attrs("temperature").value.asInstanceOf[Number].floatValue()
val pres = entity.attrs("pressure").value.asInstanceOf[Number].floatValue()
val temp = entity.attrs("temperature")("value").asInstanceOf[BigInt].floatValue()
val pres = entity.attrs("pressure")("value").asInstanceOf[BigInt].floatValue()
EntityNode(entity.id, temp, pres)
})
.keyBy("id")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ package object simulatedNotification {
var maxTempVal = 0.0
var maxPresVal = 0.0
}

Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,4 @@ package object simulatedNotificationLD {
var maxTempVal = 0.0
var maxPresVal = 0.0
}

0 comments on commit 74ce143

Please sign in to comment.