Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Submit #1

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions MessageProcessorModule/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.lotus.iot.mpm</groupId>
<artifactId>message-processor-module</artifactId>
<version>1.0.0</version>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.6.RELEASE</version>
</parent>

<properties>
<java.version>1.8</java.version>
<junit.version>4.12</junit.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>

</dependencies>


<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

<repositories>
<repository>
<id>spring-releases</id>
<url>https://repo.spring.io/libs-release</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-releases</id>
<url>https://repo.spring.io/libs-release</url>
</pluginRepository>
</pluginRepositories>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package com.lotus.iot.mpm;

import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.lotus.iot.mpm.api.objectmodel.request.SubscribeRequest;
import com.lotus.iot.mpm.api.objectmodel.response.AllAssetsInStateResponse;
import com.lotus.iot.mpm.api.objectmodel.response.AssetCurrentStatusResponse;
import com.lotus.iot.mpm.api.objectmodel.response.GeneralResponse;
import com.lotus.iot.mpm.eventhandler.AssetStatusChangeQueue;
import com.lotus.iot.mpm.eventhandler.EventHandlerFacade;
import com.lotus.iot.mpm.eventhandler.objectmodel.AssetStatusEvent;
import com.lotus.iot.mpm.eventhandler.objectmodel.SampleListener;
import com.lotus.iot.mpm.handler.MessageHandler;
import com.lotus.iot.mpm.objectmodel.AssetStatusMessage;
import com.lotus.iot.mpm.objectmodel.type.StatusType;
import com.lotus.iot.mpm.report.ReportHandler;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;

import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* @author Niloufar Mazloumpour
* @mail niloufar@mazloumpour.net
* @since 2018-05-22
*/
@SpringBootApplication
public class Application {
public static void main(String[] args) {
ConfigurableApplicationContext applicationContext = SpringApplication.run(Application.class);

subscribeSomeListeners(applicationContext);

if (args.length > 0) {
File file = new File(args[0]);
sendMessages(file, applicationContext);
}

waitForProcessToFinish(applicationContext);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary to wait here for 10ms? Doesn't look like sendMessage() called asynchronously so the execution would not proceed untill the for loop finishes

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this waiting time so the application will finish all its process before I start to get reports. It is not necessary for the outcome but I wanted the console output to be clear and readable. First program will do all the process and print notifications in the console. Then it will print the reports.
I could have also divide output to separate files for this matter.


System.out.println();
System.out.println("-------------------");
System.out.println("Do some Reporting..");
System.out.println("-------------------");
System.out.println();

currentStatusReport(applicationContext);

assetsInStateReport(applicationContext);
}

private static void subscribeSomeListeners(ConfigurableApplicationContext applicationContext) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a need for the whole ConfigurableApplicationContext here? Looks like you're just using an instance of an EventHandlerFacade

EventHandlerFacade eventHandlerFacade = applicationContext.getBean(EventHandlerFacade.class);
List<String> rows = loadFromFile("subscribe.txt");
for (String row : rows) {
String[] subscribeInfo = row.split(",");
try {
AssetStatusEvent event = AssetStatusEvent.valueOf(subscribeInfo[1]);
String assetId = subscribeInfo[0];
SubscribeRequest subscribeRequest = new SubscribeRequest(assetId, event, new SampleListener());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would subscribe to events for a particular asset only. We need to be able to subscribe to status changes of any asset since we don't know about possible assets in advance. Also, given that there could be many assets, we would require a lot of subscriptions.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should have ask you to clarify this for me. Actually, in a real situation I would ask what does "subscribe to certain asset status events" mean exactly. But here I made an assumption that it means "subscribe to a specific asset rather than all assets". My mistake.

eventHandlerFacade.subscribe(subscribeRequest);
} catch (IllegalArgumentException e) {
System.out.println(String.format("AssetStatusEvent %s does not exist", subscribeInfo[1]));
}
}
}

private static void assetsInStateReport(ConfigurableApplicationContext applicationContext) {
ReportHandler reportHandler = applicationContext.getBean(ReportHandler.class);
List<String> statusTypeList = loadFromFile("all_assets_in_state_report.txt");
for (String status : statusTypeList) {
GeneralResponse response = reportHandler.assetsInState(status);
try {
AllAssetsInStateResponse allAssetsInStateResponse = (AllAssetsInStateResponse) response;
List<String> assetIdList = allAssetsInStateResponse.getAssetIdList();
System.out.println(String.format("\nThere are %s assets which currently have an %s status:", assetIdList.size(), status));
assetIdList.forEach(id -> System.out.println("'" + id + "'"));
} catch (ClassCastException e) {
System.out.println(String.format("\nstatus: %s, %s", status, response.getMessage()));
}
}
}

private static void currentStatusReport(ConfigurableApplicationContext applicationContext) {
ReportHandler reportHandler = applicationContext.getBean(ReportHandler.class);
List<String> assetIdList = loadFromFile("asset_status_report.txt");
for (String assetId : assetIdList) {
GeneralResponse response = reportHandler.assetCurrentStatus(assetId);
try {
AssetCurrentStatusResponse assetCurrentStatusResponse = (AssetCurrentStatusResponse) response;
StatusType currentStatus = assetCurrentStatusResponse.getStatusType();
System.out.println(String.format("Current status for asset '%s' is '%s'", assetId, currentStatus));
} catch (ClassCastException e) {
System.out.println(String.format("assetId: %s, %s", assetId, response.getMessage()));

}
}
}

private static List<String> loadFromFile(String fileName) {
try {
Resource resource = new ClassPathResource("input/" + fileName);
InputStream inputStream = resource.getInputStream();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
Stream<String> lines = bufferedReader.lines();
return lines.collect(Collectors.toList());
} catch (IOException e) {
return new ArrayList<>();
}
}

private static void waitForProcessToFinish(ConfigurableApplicationContext applicationContext) {
AssetStatusChangeQueue assetStatusChangeQueue = applicationContext.getBean(AssetStatusChangeQueue.class);

while (assetStatusChangeQueue.size() > 0) {
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

private static void sendMessages(File file, ConfigurableApplicationContext applicationContext) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a need for the whole ConfigurableApplicationContext here? Looks like you're just using an instance of a MessageHandler

try {
Gson gson = new Gson();
FileReader fileReader = new FileReader(file);
JsonParser parser = new JsonParser();
Object obj = parser.parse(fileReader);
JsonArray jsonArray = (JsonArray) obj;
for (JsonElement element : jsonArray) {
AssetStatusMessage message = gson.fromJson(element, AssetStatusMessage.class);
MessageHandler messageHandler = applicationContext.getBean(MessageHandler.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This messageHandler is going to be redefined on every JSON message since it's in the loop. Why not pull it outside of the for loop?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry I missed it.

messageHandler.assetStatus(message);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it's a message handler I'd call this method:

messageHandler.handle(message);

usually we use nouns for constructors but methods are named after their actions using verbs

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your advice, I will keep that in mind.

}
} catch (FileNotFoundException e) {
System.out.println(String.format("file not found, %s", file));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.lotus.iot.mpm.api;

import com.lotus.iot.mpm.api.objectmodel.response.GeneralResponse;
import com.lotus.iot.mpm.api.objectmodel.response.GeneralResponseCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

/**
* @author Niloufar Mazloumpour
* @mail niloufar@mazloumpour.net
* @since 2018-05-22
*/
@RestController
public abstract class AbstractController {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractController.class);

ResponseEntity generateResponse(GeneralResponse response) {
return new ResponseEntity<>(response, HttpStatus.valueOf(response.getStatus()));
}

@ExceptionHandler(value = Exception.class)
@ResponseBody
@ResponseStatus(value = HttpStatus.BAD_REQUEST)
public ResponseEntity handleException(Exception e) {
LOGGER.error(e.getMessage());
return generateResponse(GeneralResponseCode.INVALID_INPUT);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.lotus.iot.mpm.api;

import com.lotus.iot.mpm.api.objectmodel.response.GeneralResponse;
import com.lotus.iot.mpm.report.ReportHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

/**
* @author Niloufar Mazloumpour
* @mail niloufar@mazloumpour.net
* @since 2018-05-22
*/
@RestController
@RequestMapping("report")
public class ReportController extends AbstractController {
private final ReportHandler reportHandler;

@Autowired
public ReportController(ReportHandler reportHandler) {
this.reportHandler = reportHandler;
}

@RequestMapping(value = "/asset/{asset_id}/status", method = RequestMethod.GET)
public ResponseEntity assetCurrentStatus(@PathVariable("asset_id") String assetId) {
GeneralResponse response = reportHandler.assetCurrentStatus(assetId);
return generateResponse(response);
}

@RequestMapping(value = "/asset/status/{status_type}/all", method = RequestMethod.GET)
public ResponseEntity assetsInStatus(@PathVariable("status_type") String statusType) {
GeneralResponse response = reportHandler.assetsInState(statusType);
return generateResponse(response);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.lotus.iot.mpm.api;

import com.lotus.iot.mpm.api.objectmodel.request.SubscribeRequest;
import com.lotus.iot.mpm.api.objectmodel.response.GeneralResponseCode;
import com.lotus.iot.mpm.eventhandler.EventHandlerFacade;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

/**
* @author Niloufar Mazloumpour
* @mail niloufar@mazloumpour.net
* @since 2018-05-22
*/
@RestController
@RequestMapping("subscription")
public class SubscriptionController extends AbstractController {
private EventHandlerFacade eventHandlerFacade;

@Autowired
public SubscriptionController(EventHandlerFacade eventHandlerFacade) {
this.eventHandlerFacade = eventHandlerFacade;
}

@RequestMapping(value = "asset/event/subscribe", method = RequestMethod.POST, consumes = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity subscribeToAssetStatusEvent(@RequestBody SubscribeRequest subscribeRequest) {
eventHandlerFacade.subscribe(subscribeRequest);
return generateResponse(GeneralResponseCode.SUCCESS);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.lotus.iot.mpm.api.objectmodel.request;

import com.lotus.iot.mpm.eventhandler.objectmodel.AssetStatusEvent;
import com.lotus.iot.mpm.eventhandler.objectmodel.SampleListener;
import com.lotus.iot.mpm.objectmodel.Observer;


/**
* @author Niloufar Mazloumpour
* @mail niloufar@mazloumpour.net
* @since 2018-05-22
*/
public class SubscribeRequest {
private String assetId;
private AssetStatusEvent statusEvent;
private SampleListener observer;

public SubscribeRequest() {
}

public SubscribeRequest(String assetId, AssetStatusEvent statusEvent, SampleListener observer) {
this.assetId = assetId;
this.statusEvent = statusEvent;
this.observer = observer;
}

public String getAssetId() {
return assetId;
}

public AssetStatusEvent getStatusEvent() {
return statusEvent;
}

public Observer getObserver() {
return observer;
}
}
Loading