Skip to content

Commit

Permalink
Merge pull request #4 from OpenMatchmaking/feature-update-game-server
Browse files Browse the repository at this point in the history
Updating available slots of the game server
  • Loading branch information
Relrin authored Aug 9, 2018
2 parents 0eb0519 + da5408d commit b560d40
Show file tree
Hide file tree
Showing 5 changed files with 320 additions and 2 deletions.
4 changes: 2 additions & 2 deletions game-servers-pool/app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from sanic_mongodb_ext import MongoDbExtension
from sanic_amqp_ext import AmqpExtension

from app.workers import GetServerWorker, RegisterServerWorker
from app.workers import GetServerWorker, RegisterServerWorker, UpdateServerWorker


app = Sanic('microservice-auth')
Expand All @@ -17,7 +17,7 @@
# RabbitMQ workers
app.amqp.register_worker(GetServerWorker(app))
app.amqp.register_worker(RegisterServerWorker(app))

app.amqp.register_worker(UpdateServerWorker(app))

# Public API
async def health_check(request):
Expand Down
53 changes: 53 additions & 0 deletions game-servers-pool/app/game_servers/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,15 @@ class RequestGetServerSchema(Schema):
]
)

class Meta:
ordered = True


class RetrieveGameServerSchema(GameServer.schema.as_marshmallow_schema()):

class Meta:
model = GameServer
ordered = True
fields = (
'host',
'port',
Expand Down Expand Up @@ -87,6 +91,7 @@ def validate_id(self, value):

class Meta:
model = GameServer
ordered = True
fields = (
'id',
'host',
Expand All @@ -95,3 +100,51 @@ class Meta:
'credentials',
'game_mode',
)


class UpdateGameServerSchema(Schema):
id = fields.String(
required=True
)
freed_slots = fields.Integer(
load_from="freed-slots",
allow_none=False,
required=True,
validate=[
validate.Range(min=1, error="The value must be positive integer.")
]
)

@validates('id')
def validate_id(self, value):
if not ObjectId.is_valid(value):
raise ValidationError(
"'{}' is not a valid ObjectId, it must be a 12-byte "
"input or a 24-character hex string.".format(value)
)

class Meta:
model = GameServer
ordered = True
fields = (
'id',
'freed_slots',
)


class SimpleGameServerSchema(Schema):
id = fields.String(
dump_only=True
)
available_slots = fields.Integer(
dump_only=True,
dump_to="available-slots",
)

class Meta:
model = GameServer
ordered = True
fields = (
'id',
'available_slots',
)
1 change: 1 addition & 0 deletions game-servers-pool/app/workers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from app.workers.get_server import GetServerWorker # NOQA
from app.workers.microservice_register import MicroserviceRegisterWorker # NOQA
from app.workers.register_server import RegisterServerWorker # NOQA
from app.workers.update_server import UpdateServerWorker # NOQA
100 changes: 100 additions & 0 deletions game-servers-pool/app/workers/update_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import json

from aioamqp import AmqpClosedConnection
from bson import ObjectId
from marshmallow import ValidationError
from sanic_amqp_ext import AmqpWorker
from sage_utils.constants import VALIDATION_ERROR, NOT_FOUND_ERROR
from sage_utils.wrappers import Response


class UpdateServerWorker(AmqpWorker):
QUEUE_NAME = 'game-servers-pool.server.update'
REQUEST_EXCHANGE_NAME = 'open-matchmaking.game-server-pool.server.update.direct'
RESPONSE_EXCHANGE_NAME = 'open-matchmaking.responses.direct'
CONTENT_TYPE = 'application/json'

def __init__(self, app, *args, **kwargs):
super(UpdateServerWorker, self).__init__(app, *args, **kwargs)
from app.game_servers.documents import GameServer
from app.game_servers.schemas import UpdateGameServerSchema, SimpleGameServerSchema
self.game_server_document = GameServer
self.schema = UpdateGameServerSchema
self.response_schema = SimpleGameServerSchema

async def validate_data(self, raw_data):
try:
data = json.loads(raw_data.strip())
except json.decoder.JSONDecodeError:
data = {}
deserializer = self.schema()
result = deserializer.load(data)
if result.errors:
raise ValidationError(result.errors)

return result.data

async def update_game_server(self, raw_data):
try:
data = await self.validate_data(raw_data)
except ValidationError as exc:
return Response.from_error(VALIDATION_ERROR, exc.normalized_messages())

document_id = ObjectId(data['id'])
document = await self.game_server_document.find_one({'_id': document_id})

if not document:
return Response.from_error(
NOT_FOUND_ERROR,
"The requested game server was not found."
)

document.available_slots += data['freed_slots']
await document.commit()

serializer = self.response_schema()
return Response.with_content(serializer.dump(document).data)

async def process_request(self, channel, body, envelope, properties):
response = await self.update_game_server(body)
response.data[Response.EVENT_FIELD_NAME] = properties.correlation_id

if properties.reply_to:
await channel.publish(
json.dumps(response.data),
exchange_name=self.RESPONSE_EXCHANGE_NAME,
routing_key=properties.reply_to,
properties={
'content_type': self.CONTENT_TYPE,
'delivery_mode': 2,
'correlation_id': properties.correlation_id
},
mandatory=True
)

await channel.basic_client_ack(delivery_tag=envelope.delivery_tag)

async def consume_callback(self, channel, body, envelope, properties):
self.app.loop.create_task(self.process_request(channel, body, envelope, properties))

async def run(self, *args, **kwargs):
try:
_transport, protocol = await self.connect()
except AmqpClosedConnection as exc:
print(exc)
return

channel = await protocol.channel()
await channel.queue_declare(
queue_name=self.QUEUE_NAME,
durable=True,
passive=False,
auto_delete=False
)
await channel.queue_bind(
queue_name=self.QUEUE_NAME,
exchange_name=self.REQUEST_EXCHANGE_NAME,
routing_key=self.QUEUE_NAME
)
await channel.basic_qos(prefetch_count=1, prefetch_size=0, connection_global=False)
await channel.basic_consume(self.consume_callback, queue_name=self.QUEUE_NAME)
164 changes: 164 additions & 0 deletions game-servers-pool/tests/test_update_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
import pytest
from sage_utils.amqp.clients import RpcAmqpClient
from sage_utils.constants import VALIDATION_ERROR, NOT_FOUND_ERROR
from sage_utils.wrappers import Response

from app.game_servers.documents import GameServer
from app.workers.update_server import UpdateServerWorker


REQUEST_QUEUE = UpdateServerWorker.QUEUE_NAME
REQUEST_EXCHANGE = UpdateServerWorker.REQUEST_EXCHANGE_NAME
RESPONSE_EXCHANGE = UpdateServerWorker.RESPONSE_EXCHANGE_NAME


@pytest.mark.asyncio
async def test_worker_returns_an_updated_information_about_slots(sanic_server):
await GameServer.collection.delete_many({})

game_server = GameServer(**{
'host': '127.0.0.1',
'port': 9000,
'available_slots': 100,
'credentials': {
'token': 'super_secret_token'
},
'game_mode': '1v1'
})
await game_server.commit()

client = RpcAmqpClient(
sanic_server.app,
routing_key=REQUEST_QUEUE,
request_exchange=REQUEST_EXCHANGE,
response_queue='',
response_exchange=RESPONSE_EXCHANGE
)
data = {
'id': str(game_server.id),
'freed-slots': 10
}
response = await client.send(payload=data)

assert Response.EVENT_FIELD_NAME in response.keys()
assert Response.CONTENT_FIELD_NAME in response.keys()
content = response[Response.CONTENT_FIELD_NAME]

assert set(content.keys()) == {'id', 'available-slots'}
assert content['id'] == str(game_server.id)
assert content['available-slots'] == game_server.available_slots + data['freed-slots']

servers_count = await GameServer.collection.count_documents({})
assert servers_count == 1

await GameServer.collection.delete_many({})


@pytest.mark.asyncio
async def test_worker_returns_not_found_error_for_non_existing_game_server(sanic_server):
await GameServer.collection.delete_many({})

client = RpcAmqpClient(
sanic_server.app,
routing_key=REQUEST_QUEUE,
request_exchange=REQUEST_EXCHANGE,
response_queue='',
response_exchange=RESPONSE_EXCHANGE
)
data = {
'id': '5b6a085123cf24aef53b4c78',
'freed-slots': 10
}
response = await client.send(payload=data)

assert Response.ERROR_FIELD_NAME in response.keys()
error = response[Response.ERROR_FIELD_NAME]

assert Response.ERROR_TYPE_FIELD_NAME in error.keys()
assert error[Response.ERROR_TYPE_FIELD_NAME] == NOT_FOUND_ERROR

assert Response.ERROR_DETAILS_FIELD_NAME in error.keys()
assert error[Response.ERROR_DETAILS_FIELD_NAME] == 'The requested game server ' \
'was not found.'

servers_count = await GameServer.collection.count_documents({})
assert servers_count == 0

await GameServer.collection.delete_many({})


@pytest.mark.asyncio
async def test_worker_returns_a_validation_error_for_missing_fields(sanic_server):
await GameServer.collection.delete_many({})

client = RpcAmqpClient(
sanic_server.app,
routing_key=REQUEST_QUEUE,
request_exchange=REQUEST_EXCHANGE,
response_queue='',
response_exchange=RESPONSE_EXCHANGE
)
response = await client.send(payload={})

assert Response.ERROR_FIELD_NAME in response.keys()
error = response[Response.ERROR_FIELD_NAME]

assert Response.ERROR_TYPE_FIELD_NAME in error.keys()
assert error[Response.ERROR_TYPE_FIELD_NAME] == VALIDATION_ERROR

assert Response.ERROR_DETAILS_FIELD_NAME in error.keys()
assert len(error[Response.ERROR_DETAILS_FIELD_NAME]) == 2

for field in ['id', 'freed-slots']:
assert field in error[Response.ERROR_DETAILS_FIELD_NAME]
assert len(error[Response.ERROR_DETAILS_FIELD_NAME][field]) == 1
assert error[Response.ERROR_DETAILS_FIELD_NAME][field][0] == 'Missing data for ' \
'required field.'

servers_count = await GameServer.collection.count_documents({})
assert servers_count == 0

await GameServer.collection.delete_many({})


@pytest.mark.asyncio
async def test_worker_returns_a_validation_error_for_invalid_id_and_slots_type(sanic_server):
await GameServer.collection.delete_many({})

client = RpcAmqpClient(
sanic_server.app,
routing_key=REQUEST_QUEUE,
request_exchange=REQUEST_EXCHANGE,
response_queue='',
response_exchange=RESPONSE_EXCHANGE
)
data = {
'id': 'INVALID_OBJECT_ID',
'freed-slots': 'INVALID_VALUE'
}
response = await client.send(payload=data)

assert Response.ERROR_FIELD_NAME in response.keys()
error = response[Response.ERROR_FIELD_NAME]

assert Response.ERROR_TYPE_FIELD_NAME in error.keys()
assert error[Response.ERROR_TYPE_FIELD_NAME] == VALIDATION_ERROR

assert Response.ERROR_DETAILS_FIELD_NAME in error.keys()
assert len(error[Response.ERROR_DETAILS_FIELD_NAME]) == 2

assert 'id' in error[Response.ERROR_DETAILS_FIELD_NAME]
assert len(error[Response.ERROR_DETAILS_FIELD_NAME]['id']) == 1
assert error[Response.ERROR_DETAILS_FIELD_NAME]['id'][0] == "'INVALID_OBJECT_ID' is not a " \
"valid ObjectId, it must be a " \
"12-byte input or a " \
"24-character hex string."

assert 'freed-slots' in error[Response.ERROR_DETAILS_FIELD_NAME]
assert len(error[Response.ERROR_DETAILS_FIELD_NAME]['freed-slots']) == 1
assert error[Response.ERROR_DETAILS_FIELD_NAME]['freed-slots'][0] == 'Not a valid integer.'

servers_count = await GameServer.collection.count_documents({})
assert servers_count == 0

await GameServer.collection.delete_many({})

0 comments on commit b560d40

Please sign in to comment.