This project was written as a submission for a coding task.
- MQTT Messaging Integration: Implement MQTT messaging via RabbitMQ. Emit MQTT messages every second with a "status" field containing a random value between 0 and 6.
- Message Processing: Develop a server script to process incoming MQTT messages. Store the processed messages in MongoDB.
- Data Retrieval Endpoint: Create an endpoint to accept start and end times. Return the count of each status within the specified time range using MongoDB's aggregate pipeline.
The following need to be in place before starting the setup:
Python : 3.10.12
RabbitMQ : 3.13.6
Erlang : 26.x
- The Python requirements can be installed from the requirements.txt file, e.g.:
pip install -r requirements.txt
- A .env file needs to be created for the project to load its variables. An example schema can be found in .env.example.
The project document called for three processes. Here are the respective files:
- Message Integration -> status_broadcaster.py
- Command:
python status_broadcaster.py -d
- Message Processing -> message_processor.py
- Command:
python message_processor.py -d
- Data Retrieval Endpoint -> data_endpoint.py
- Command:
uvicorn data_endpoint:app --host 0.0.0.0 --port 8000
- The status broadcaster script, when run, publishes one message per second to the RabbitMQ queue, each with a random status value (0 to 6), e.g.:
{"status": 3}
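The broadcast loop described above can be sketched roughly as follows. This is a minimal illustration and not the code in status_broadcaster.py: `publish` is a hypothetical callable standing in for whatever MQTT client call the script actually uses, and `build_status_message` and `broadcast` are names invented for this example.

```python
import json
import random
import time


def build_status_message(rng=random):
    """Return a JSON payload such as '{"status": 3}' with status in 0..6."""
    # randint is inclusive on both ends, so 0 and 6 are both possible.
    return json.dumps({"status": rng.randint(0, 6)})


def broadcast(publish, count=None, interval=1.0, sleep=time.sleep):
    """Call publish(payload) once per interval and return the number sent.

    publish is a hypothetical callable (an assumption of this sketch).
    count=None loops forever; a number stops after that many messages.
    """
    sent = 0
    while count is None or sent < count:
        publish(build_status_message())
        sent += 1
        sleep(interval)
    return sent
```

For instance, `broadcast(print, count=3)` prints three payloads, one per second, instead of sending them to a broker.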
- The message_processor script, when executed, fetches messages from the queue and commits them to the MongoDB collection along with each message's timestamp. When the queue is exhausted, it waits 10 seconds.
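The fetch, store, and wait cycle can be sketched as below. This is an illustration under stated assumptions and not the code in message_processor.py: a `queue.Queue` stands in for the RabbitMQ queue, `store` is a hypothetical callable (with pymongo it could be a collection's `insert_one`), the `timestamp` field name is assumed, and the timestamp is taken at processing time because the sketch has no access to broker metadata.

```python
import json
import queue
import time
from datetime import datetime, timezone


def process_messages(source, store, idle_wait=10, max_idle_rounds=None,
                     sleep=time.sleep):
    """Drain source into store, pausing idle_wait seconds when it is empty.

    source: a queue.Queue of JSON strings (stand-in for the broker queue).
    store: hypothetical callable that persists one document.
    max_idle_rounds: stop after this many empty polls in total;
        None keeps polling forever.
    """
    idle_rounds = 0
    stored = 0
    while max_idle_rounds is None or idle_rounds < max_idle_rounds:
        try:
            raw = source.get_nowait()
        except queue.Empty:
            # Queue exhausted: wait before polling again.
            idle_rounds += 1
            sleep(idle_wait)
            continue
        document = json.loads(raw)
        # Assumption: timestamp is assigned here, in UTC.
        document["timestamp"] = datetime.now(timezone.utc)
        store(document)
        stored += 1
    return stored
```

Passing `sleep` and `max_idle_rounds` as parameters keeps the loop easy to exercise without a broker or a real 10-second wait.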
- The data endpoint is a FastAPI app that exposes an endpoint.
- This endpoint accepts a POST request with start_time and end_time.
- Returned data looks as follows:
{ "0": 1, "1": 4, "2": 3, "3": 1, "4": 3, "5": 3, "6": 2 }
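A per-status count like the one returned above can be produced with a `$match` stage followed by a `$group` stage. The sketch below only builds the pipeline and reshapes the result; it is not the code in data_endpoint.py. The `timestamp` field name, the inclusive range, and the helper names `build_status_count_pipeline` and `rows_to_response` are assumptions made for illustration.

```python
from datetime import datetime


def build_status_count_pipeline(start_time, end_time):
    """Return an aggregation pipeline counting documents per status."""
    return [
        # Keep only documents inside the requested window (inclusive).
        {"$match": {"timestamp": {"$gte": start_time, "$lte": end_time}}},
        # One output row per distinct status, with its document count.
        {"$group": {"_id": "$status", "count": {"$sum": 1}}},
        # Order rows by status value.
        {"$sort": {"_id": 1}},
    ]


def rows_to_response(rows):
    """Turn rows like {"_id": 3, "count": 1} into {"3": 1}."""
    return {str(row["_id"]): row["count"] for row in rows}


# Example window; the dates are arbitrary.
pipeline = build_status_count_pipeline(datetime(2024, 1, 1),
                                       datetime(2024, 1, 2))
```

With pymongo, the pipeline would be passed to a collection's `aggregate` method and the resulting rows handed to `rows_to_response`.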