Skip to content

Commit

Permalink
Release v0.15.1
Browse files Browse the repository at this point in the history
  • Loading branch information
jcass77 committed Jul 30, 2020
2 parents 49dd380 + d692347 commit c732833
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 17 deletions.
7 changes: 7 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@
This changelog is used to track all major changes to WTFIX.


## v0.15.1 (2020-07-28)

**Fixes**

- Fix cancellation of various `asyncio` tasks causing the pipeline to hang during shutdown.


## v0.15.0 (2020-07-28)

**Enhancements**
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

setup(
name="wtfix",
version="0.15.0",
version="0.15.1",
author="John Cass",
author_email="john.cass77@gmail.com",
description="The Pythonic Financial Information eXchange (FIX) client for humans.",
Expand Down
12 changes: 6 additions & 6 deletions wtfix/apps/brokers.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,6 @@ async def _send_channel_reader(self):
# Cancellation request received - close connections....
logger.info(f"{self.name}: {asyncio.current_task().get_name()} cancelled!")

with await self.redis_pool as conn:
await conn.unsubscribe(self.SEND_CHANNEL)

self.redis_pool.close()
await self.redis_pool.wait_closed() # Closing all open connections

except aioredis.ChannelClosedError:
# Shutting down...
logger.info(f"{self.name}: Unsubscribed from {send_channel.name}.")
Expand All @@ -86,3 +80,9 @@ async def stop(self, *args, **kwargs):
if self._channel_reader_task is not None:
self._channel_reader_task.cancel()
await self._channel_reader_task

with await self.redis_pool as conn:
await conn.unsubscribe(self.SEND_CHANNEL)

self.redis_pool.close()
await self.redis_pool.wait_closed() # Closing all open connections
20 changes: 10 additions & 10 deletions wtfix/apps/sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,16 @@ async def stop(self, *args, **kwargs):
self._listener_task.cancel()
await self._listener_task

if self.writer is not None:
logger.info(
f"{self.name}: Initiating disconnect from "
f"{self.pipeline.settings.HOST}:{self.pipeline.settings.PORT}..."
)

self.writer.close()
await self.writer.wait_closed()
logger.info(f"{self.name}: Session closed!")

async def listen(self):
"""
Listen for new messages that are sent by the server.
Expand Down Expand Up @@ -238,16 +248,6 @@ async def listen(self):
# Cancellation request received - close writer
logger.info(f"{self.name}: {asyncio.current_task().get_name()} cancelled!")

if self.writer is not None:
logger.info(
f"{self.name}: Initiating disconnect from "
f"{self.pipeline.settings.HOST}:{self.pipeline.settings.PORT}..."
)

self.writer.close()
await self.writer.wait_closed()
logger.info(f"{self.name}: Session closed!")

except Exception as e:
logger.error(f"{self.name}: Unexpected error {e}. Initiating shutdown...")
asyncio.create_task(self.pipeline.stop())
Expand Down

0 comments on commit c732833

Please sign in to comment.