Skip to content

Commit

Permalink
Implemented worker for updating game server state
Browse files Browse the repository at this point in the history
  • Loading branch information
Relrin committed Aug 8, 2018
1 parent 0eb0519 commit 5adda02
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 2 deletions.
4 changes: 2 additions & 2 deletions game-servers-pool/app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from sanic_mongodb_ext import MongoDbExtension
from sanic_amqp_ext import AmqpExtension

from app.workers import GetServerWorker, RegisterServerWorker
from app.workers import GetServerWorker, RegisterServerWorker, UpdateServerWorker


app = Sanic('microservice-auth')
Expand All @@ -17,7 +17,7 @@
# RabbitMQ workers
app.amqp.register_worker(GetServerWorker(app))
app.amqp.register_worker(RegisterServerWorker(app))

app.amqp.register_worker(UpdateServerWorker(app))

# Public API
async def health_check(request):
Expand Down
53 changes: 53 additions & 0 deletions game-servers-pool/app/game_servers/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,15 @@ class RequestGetServerSchema(Schema):
]
)

class Meta:
ordered = True


class RetrieveGameServerSchema(GameServer.schema.as_marshmallow_schema()):

class Meta:
model = GameServer
ordered = True
fields = (
'host',
'port',
Expand Down Expand Up @@ -87,6 +91,7 @@ def validate_id(self, value):

class Meta:
model = GameServer
ordered = True
fields = (
'id',
'host',
Expand All @@ -95,3 +100,51 @@ class Meta:
'credentials',
'game_mode',
)


class UpdateGameServerSchema(Schema):
id = fields.String(
required=True
)
freed_slots = fields.Integer(
load_from="freed-slots",
allow_none=False,
required=True,
validate=[
validate.Range(min=1, error="The value must be positive integer.")
]
)

@validates('id')
def validate_id(self, value):
if not ObjectId.is_valid(value):
raise ValidationError(
"'{}' is not a valid ObjectId, it must be a 12-byte "
"input or a 24-character hex string.".format(value)
)

class Meta:
model = GameServer
ordered = True
fields = (
'id',
'freed_slots',
)


class SimpleGameServerSchema(Schema):
id = fields.String(
dump_only=True
)
available_slots = fields.Integer(
dump_only=True,
dump_to="available-slots",
)

class Meta:
model = GameServer
ordered = True
fields = (
'id',
'available_slots',
)
1 change: 1 addition & 0 deletions game-servers-pool/app/workers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from app.workers.get_server import GetServerWorker # NOQA
from app.workers.microservice_register import MicroserviceRegisterWorker # NOQA
from app.workers.register_server import RegisterServerWorker # NOQA
from app.workers.update_server import UpdateServerWorker # NOQA
100 changes: 100 additions & 0 deletions game-servers-pool/app/workers/update_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import json

from aioamqp import AmqpClosedConnection
from bson import ObjectId
from marshmallow import ValidationError
from sanic_amqp_ext import AmqpWorker
from sage_utils.constants import VALIDATION_ERROR, NOT_FOUND_ERROR
from sage_utils.wrappers import Response


class UpdateServerWorker(AmqpWorker):
QUEUE_NAME = 'game-servers-pool.server.update'
REQUEST_EXCHANGE_NAME = 'open-matchmaking.game-server-pool.server.update.direct'
RESPONSE_EXCHANGE_NAME = 'open-matchmaking.responses.direct'
CONTENT_TYPE = 'application/json'

def __init__(self, app, *args, **kwargs):
super(UpdateServerWorker, self).__init__(app, *args, **kwargs)
from app.game_servers.documents import GameServer
from app.game_servers.schemas import UpdateGameServerSchema, SimpleGameServerSchema
self.game_server_document = GameServer
self.schema = UpdateGameServerSchema
self.response_schema = SimpleGameServerSchema

async def validate_data(self, raw_data):
try:
data = json.loads(raw_data.strip())
except json.decoder.JSONDecodeError:
data = {}
deserializer = self.schema()
result = deserializer.load(data)
if result.errors:
raise ValidationError(result.errors)

return result.data

async def update_game_server(self, raw_data):
try:
data = await self.validate_data(raw_data)
except ValidationError as exc:
return Response.from_error(VALIDATION_ERROR, exc.normalized_messages())

document_id = ObjectId(data['id'])
document = await self.game_server_document.find_one({'_id': document_id})

if not document:
return Response.from_error(
NOT_FOUND_ERROR,
"The requested game server was not found."
)

document.available_slots += data['freed_slots']
await document.commit()

serializer = self.response_schema()
return Response.with_content(serializer.dump(document).data)

async def process_request(self, channel, body, envelope, properties):
response = await self.update_game_server(body)
response.data[Response.EVENT_FIELD_NAME] = properties.correlation_id

if properties.reply_to:
await channel.publish(
json.dumps(response.data),
exchange_name=self.RESPONSE_EXCHANGE_NAME,
routing_key=properties.reply_to,
properties={
'content_type': self.CONTENT_TYPE,
'delivery_mode': 2,
'correlation_id': properties.correlation_id
},
mandatory=True
)

await channel.basic_client_ack(delivery_tag=envelope.delivery_tag)

async def consume_callback(self, channel, body, envelope, properties):
self.app.loop.create_task(self.process_request(channel, body, envelope, properties))

async def run(self, *args, **kwargs):
try:
_transport, protocol = await self.connect()
except AmqpClosedConnection as exc:
print(exc)
return

channel = await protocol.channel()
await channel.queue_declare(
queue_name=self.QUEUE_NAME,
durable=True,
passive=False,
auto_delete=False
)
await channel.queue_bind(
queue_name=self.QUEUE_NAME,
exchange_name=self.REQUEST_EXCHANGE_NAME,
routing_key=self.QUEUE_NAME
)
await channel.basic_qos(prefetch_count=1, prefetch_size=0, connection_global=False)
await channel.basic_consume(self.consume_callback, queue_name=self.QUEUE_NAME)

0 comments on commit 5adda02

Please sign in to comment.