From 5adda024a6a8df0c3fcde65cddab0c2c0cf8ea8d Mon Sep 17 00:00:00 2001 From: Valeryi Savich Date: Tue, 7 Aug 2018 20:00:17 +0300 Subject: [PATCH 1/2] Implemented worker for updating game server state --- game-servers-pool/app/__init__.py | 4 +- game-servers-pool/app/game_servers/schemas.py | 53 ++++++++++ game-servers-pool/app/workers/__init__.py | 1 + .../app/workers/update_server.py | 100 ++++++++++++++++++ 4 files changed, 156 insertions(+), 2 deletions(-) create mode 100644 game-servers-pool/app/workers/update_server.py diff --git a/game-servers-pool/app/__init__.py b/game-servers-pool/app/__init__.py index bb192eb..6df79b3 100644 --- a/game-servers-pool/app/__init__.py +++ b/game-servers-pool/app/__init__.py @@ -3,7 +3,7 @@ from sanic_mongodb_ext import MongoDbExtension from sanic_amqp_ext import AmqpExtension -from app.workers import GetServerWorker, RegisterServerWorker +from app.workers import GetServerWorker, RegisterServerWorker, UpdateServerWorker app = Sanic('microservice-auth') @@ -17,7 +17,7 @@ # RabbitMQ workers app.amqp.register_worker(GetServerWorker(app)) app.amqp.register_worker(RegisterServerWorker(app)) - +app.amqp.register_worker(UpdateServerWorker(app)) # Public API async def health_check(request): diff --git a/game-servers-pool/app/game_servers/schemas.py b/game-servers-pool/app/game_servers/schemas.py index 06ea75a..af4db66 100644 --- a/game-servers-pool/app/game_servers/schemas.py +++ b/game-servers-pool/app/game_servers/schemas.py @@ -27,11 +27,15 @@ class RequestGetServerSchema(Schema): ] ) + class Meta: + ordered = True + class RetrieveGameServerSchema(GameServer.schema.as_marshmallow_schema()): class Meta: model = GameServer + ordered = True fields = ( 'host', 'port', @@ -87,6 +91,7 @@ def validate_id(self, value): class Meta: model = GameServer + ordered = True fields = ( 'id', 'host', @@ -95,3 +100,51 @@ class Meta: 'credentials', 'game_mode', ) + + +class UpdateGameServerSchema(Schema): + id = fields.String( + required=True + ) + freed_slots = fields.Integer( + load_from="freed-slots", + allow_none=False, + required=True, + validate=[ + validate.Range(min=1, error="The value must be positive integer.") + ] + ) + + @validates('id') + def validate_id(self, value): + if not ObjectId.is_valid(value): + raise ValidationError( + "'{}' is not a valid ObjectId, it must be a 12-byte " + "input or a 24-character hex string.".format(value) + ) + + class Meta: + model = GameServer + ordered = True + fields = ( + 'id', + 'freed_slots', + ) + + +class SimpleGameServerSchema(Schema): + id = fields.String( + dump_only=True + ) + available_slots = fields.Integer( + dump_only=True, + dump_to="available-slots", + ) + + class Meta: + model = GameServer + ordered = True + fields = ( + 'id', + 'available_slots', + ) diff --git a/game-servers-pool/app/workers/__init__.py b/game-servers-pool/app/workers/__init__.py index b88d7fb..a3f2975 100644 --- a/game-servers-pool/app/workers/__init__.py +++ b/game-servers-pool/app/workers/__init__.py @@ -1,3 +1,4 @@ from app.workers.get_server import GetServerWorker # NOQA from app.workers.microservice_register import MicroserviceRegisterWorker # NOQA from app.workers.register_server import RegisterServerWorker # NOQA +from app.workers.update_server import UpdateServerWorker # NOQA diff --git a/game-servers-pool/app/workers/update_server.py b/game-servers-pool/app/workers/update_server.py new file mode 100644 index 0000000..0c31157 --- /dev/null +++ b/game-servers-pool/app/workers/update_server.py @@ -0,0 +1,100 @@ +import json + +from aioamqp import AmqpClosedConnection +from bson import ObjectId +from marshmallow import ValidationError +from sanic_amqp_ext import AmqpWorker +from sage_utils.constants import VALIDATION_ERROR, NOT_FOUND_ERROR +from sage_utils.wrappers import Response + + +class UpdateServerWorker(AmqpWorker): + QUEUE_NAME = 'game-servers-pool.server.update' + REQUEST_EXCHANGE_NAME = 'open-matchmaking.game-server-pool.server.update.direct' + RESPONSE_EXCHANGE_NAME = 'open-matchmaking.responses.direct' + CONTENT_TYPE = 'application/json' + + def __init__(self, app, *args, **kwargs): + super(UpdateServerWorker, self).__init__(app, *args, **kwargs) + from app.game_servers.documents import GameServer + from app.game_servers.schemas import UpdateGameServerSchema, SimpleGameServerSchema + self.game_server_document = GameServer + self.schema = UpdateGameServerSchema + self.response_schema = SimpleGameServerSchema + + async def validate_data(self, raw_data): + try: + data = json.loads(raw_data.strip()) + except json.decoder.JSONDecodeError: + data = {} + deserializer = self.schema() + result = deserializer.load(data) + if result.errors: + raise ValidationError(result.errors) + + return result.data + + async def update_game_server(self, raw_data): + try: + data = await self.validate_data(raw_data) + except ValidationError as exc: + return Response.from_error(VALIDATION_ERROR, exc.normalized_messages()) + + document_id = ObjectId(data['id']) + document = await self.game_server_document.find_one({'_id': document_id}) + + if not document: + return Response.from_error( + NOT_FOUND_ERROR, + "The requested game server was not found." + ) + + document.available_slots += data['freed_slots'] + await document.commit() + + serializer = self.response_schema() + return Response.with_content(serializer.dump(document).data) + + async def process_request(self, channel, body, envelope, properties): + response = await self.update_game_server(body) + response.data[Response.EVENT_FIELD_NAME] = properties.correlation_id + + if properties.reply_to: + await channel.publish( + json.dumps(response.data), + exchange_name=self.RESPONSE_EXCHANGE_NAME, + routing_key=properties.reply_to, + properties={ + 'content_type': self.CONTENT_TYPE, + 'delivery_mode': 2, + 'correlation_id': properties.correlation_id + }, + mandatory=True + ) + + await channel.basic_client_ack(delivery_tag=envelope.delivery_tag) + + async def consume_callback(self, channel, body, envelope, properties): + self.app.loop.create_task(self.process_request(channel, body, envelope, properties)) + + async def run(self, *args, **kwargs): + try: + _transport, protocol = await self.connect() + except AmqpClosedConnection as exc: + print(exc) + return + + channel = await protocol.channel() + await channel.queue_declare( + queue_name=self.QUEUE_NAME, + durable=True, + passive=False, + auto_delete=False + ) + await channel.queue_bind( + queue_name=self.QUEUE_NAME, + exchange_name=self.REQUEST_EXCHANGE_NAME, + routing_key=self.QUEUE_NAME + ) + await channel.basic_qos(prefetch_count=1, prefetch_size=0, connection_global=False) + await channel.basic_consume(self.consume_callback, queue_name=self.QUEUE_NAME) From da5408de065e097fd32a198ccb6dc4e702cceef2 Mon Sep 17 00:00:00 2001 From: Valeryi Savich Date: Wed, 8 Aug 2018 19:52:12 +0300 Subject: [PATCH 2/2] Implemented tests for updated game server info worker --- game-servers-pool/tests/test_update_server.py | 164 ++++++++++++++++++ 1 file changed, 164 insertions(+) create mode 100644 game-servers-pool/tests/test_update_server.py diff --git a/game-servers-pool/tests/test_update_server.py b/game-servers-pool/tests/test_update_server.py new file mode 100644 index 0000000..54e77f5 --- /dev/null +++ b/game-servers-pool/tests/test_update_server.py @@ -0,0 +1,164 @@ +import pytest +from sage_utils.amqp.clients import RpcAmqpClient +from sage_utils.constants import VALIDATION_ERROR, NOT_FOUND_ERROR +from sage_utils.wrappers import Response + +from app.game_servers.documents import GameServer +from app.workers.update_server import UpdateServerWorker + + +REQUEST_QUEUE = UpdateServerWorker.QUEUE_NAME +REQUEST_EXCHANGE = UpdateServerWorker.REQUEST_EXCHANGE_NAME +RESPONSE_EXCHANGE = UpdateServerWorker.RESPONSE_EXCHANGE_NAME + + +@pytest.mark.asyncio +async def test_worker_returns_an_updated_information_about_slots(sanic_server): + await GameServer.collection.delete_many({}) + + game_server = GameServer(**{ + 'host': '127.0.0.1', + 'port': 9000, + 'available_slots': 100, + 'credentials': { + 'token': 'super_secret_token' + }, + 'game_mode': '1v1' + }) + await game_server.commit() + + client = RpcAmqpClient( + sanic_server.app, + routing_key=REQUEST_QUEUE, + request_exchange=REQUEST_EXCHANGE, + response_queue='', + response_exchange=RESPONSE_EXCHANGE + ) + data = { + 'id': str(game_server.id), + 'freed-slots': 10 + } + response = await client.send(payload=data) + + assert Response.EVENT_FIELD_NAME in response.keys() + assert Response.CONTENT_FIELD_NAME in response.keys() + content = response[Response.CONTENT_FIELD_NAME] + + assert set(content.keys()) == {'id', 'available-slots'} + assert content['id'] == str(game_server.id) + assert content['available-slots'] == game_server.available_slots + data['freed-slots'] + + servers_count = await GameServer.collection.count_documents({}) + assert servers_count == 1 + + await GameServer.collection.delete_many({}) + + +@pytest.mark.asyncio +async def test_worker_returns_not_found_error_for_non_existing_game_server(sanic_server): + await GameServer.collection.delete_many({}) + + client = RpcAmqpClient( + sanic_server.app, + routing_key=REQUEST_QUEUE, + request_exchange=REQUEST_EXCHANGE, + response_queue='', + response_exchange=RESPONSE_EXCHANGE + ) + data = { + 'id': '5b6a085123cf24aef53b4c78', + 'freed-slots': 10 + } + response = await client.send(payload=data) + + assert Response.ERROR_FIELD_NAME in response.keys() + error = response[Response.ERROR_FIELD_NAME] + + assert Response.ERROR_TYPE_FIELD_NAME in error.keys() + assert error[Response.ERROR_TYPE_FIELD_NAME] == NOT_FOUND_ERROR + + assert Response.ERROR_DETAILS_FIELD_NAME in error.keys() + assert error[Response.ERROR_DETAILS_FIELD_NAME] == 'The requested game server ' \ + 'was not found.' + + servers_count = await GameServer.collection.count_documents({}) + assert servers_count == 0 + + await GameServer.collection.delete_many({}) + + +@pytest.mark.asyncio +async def test_worker_returns_a_validation_error_for_missing_fields(sanic_server): + await GameServer.collection.delete_many({}) + + client = RpcAmqpClient( + sanic_server.app, + routing_key=REQUEST_QUEUE, + request_exchange=REQUEST_EXCHANGE, + response_queue='', + response_exchange=RESPONSE_EXCHANGE + ) + response = await client.send(payload={}) + + assert Response.ERROR_FIELD_NAME in response.keys() + error = response[Response.ERROR_FIELD_NAME] + + assert Response.ERROR_TYPE_FIELD_NAME in error.keys() + assert error[Response.ERROR_TYPE_FIELD_NAME] == VALIDATION_ERROR + + assert Response.ERROR_DETAILS_FIELD_NAME in error.keys() + assert len(error[Response.ERROR_DETAILS_FIELD_NAME]) == 2 + + for field in ['id', 'freed-slots']: + assert field in error[Response.ERROR_DETAILS_FIELD_NAME] + assert len(error[Response.ERROR_DETAILS_FIELD_NAME][field]) == 1 + assert error[Response.ERROR_DETAILS_FIELD_NAME][field][0] == 'Missing data for ' \ + 'required field.' + + servers_count = await GameServer.collection.count_documents({}) + assert servers_count == 0 + + await GameServer.collection.delete_many({}) + + +@pytest.mark.asyncio +async def test_worker_returns_a_validation_error_for_invalid_id_and_slots_type(sanic_server): + await GameServer.collection.delete_many({}) + + client = RpcAmqpClient( + sanic_server.app, + routing_key=REQUEST_QUEUE, + request_exchange=REQUEST_EXCHANGE, + response_queue='', + response_exchange=RESPONSE_EXCHANGE + ) + data = { + 'id': 'INVALID_OBJECT_ID', + 'freed-slots': 'INVALID_VALUE' + } + response = await client.send(payload=data) + + assert Response.ERROR_FIELD_NAME in response.keys() + error = response[Response.ERROR_FIELD_NAME] + + assert Response.ERROR_TYPE_FIELD_NAME in error.keys() + assert error[Response.ERROR_TYPE_FIELD_NAME] == VALIDATION_ERROR + + assert Response.ERROR_DETAILS_FIELD_NAME in error.keys() + assert len(error[Response.ERROR_DETAILS_FIELD_NAME]) == 2 + + assert 'id' in error[Response.ERROR_DETAILS_FIELD_NAME] + assert len(error[Response.ERROR_DETAILS_FIELD_NAME]['id']) == 1 + assert error[Response.ERROR_DETAILS_FIELD_NAME]['id'][0] == "'INVALID_OBJECT_ID' is not a " \ + "valid ObjectId, it must be a " \ + "12-byte input or a " \ + "24-character hex string." + + assert 'freed-slots' in error[Response.ERROR_DETAILS_FIELD_NAME] + assert len(error[Response.ERROR_DETAILS_FIELD_NAME]['freed-slots']) == 1 + assert error[Response.ERROR_DETAILS_FIELD_NAME]['freed-slots'][0] == 'Not a valid integer.' + + servers_count = await GameServer.collection.count_documents({}) + assert servers_count == 0 + + await GameServer.collection.delete_many({})