Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PYTHON-2676 Add load balancer tests in EVG #625

Merged
merged 14 commits into from
May 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions .evergreen/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,11 @@ functions:
if [ -n "${SETDEFAULTENCODING}" ]; then
export SETDEFAULTENCODING="${SETDEFAULTENCODING}"
fi
if [ -n "${test_loadbalancer}" ]; then
export TEST_LOADBALANCER=1
export SINGLE_MONGOS_LB_URI="${SINGLE_MONGOS_LB_URI}"
export MULTI_MONGOS_LB_URI="${MULTI_MONGOS_LB_URI}"
fi

PYTHON_BINARY=${PYTHON_BINARY} \
GREEN_FRAMEWORK=${GREEN_FRAMEWORK} \
Expand Down Expand Up @@ -788,6 +793,22 @@ functions:
-v \
--fault revoked

"run load-balancer":
- command: shell.exec
params:
script: |
DRIVERS_TOOLS=${DRIVERS_TOOLS} MONGODB_URI=${MONGODB_URI} bash ${DRIVERS_TOOLS}/.evergreen/run-load-balancer.sh start
- command: expansions.update
params:
file: lb-expansion.yml
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How/where is this being used?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


"stop load-balancer":
- command: shell.exec
params:
script: |
cd ${DRIVERS_TOOLS}/.evergreen
DRIVERS_TOOLS=${DRIVERS_TOOLS} bash ${DRIVERS_TOOLS}/.evergreen/run-load-balancer.sh stop

"teardown_docker":
- command: shell.exec
params:
Expand Down Expand Up @@ -1537,6 +1558,13 @@ tasks:
- func: "run aws auth test with aws EC2 credentials"
- func: "run aws ECS auth test"

- name: load-balancer-test
commands:
- func: "bootstrap mongo-orchestration"
vars:
TOPOLOGY: "sharded_cluster"
- func: "run load-balancer"
- func: "run tests"
# }}}
- name: "coverage-report"
tags: ["coverage"]
Expand Down Expand Up @@ -1941,6 +1969,16 @@ axes:
variables:
ORCHESTRATION_FILE: "versioned-api-testing.json"

# Run load balancer tests?
- id: loadbalancer
display_name: "Load Balancer"
values:
- id: "enabled"
display_name: "Load Balancer"
variables:
test_loadbalancer: true
batchtime: 10080 # 7 days

buildvariants:
- matrix_name: "tests-all"
matrix_spec:
Expand Down Expand Up @@ -2463,6 +2501,17 @@ buildvariants:
- name: "aws-auth-test-4.4"
- name: "aws-auth-test-latest"

- matrix_name: "load-balancer"
matrix_spec:
platform: ubuntu-18.04
mongodb-version: ["latest"]
auth-ssl: "*"
python-version: ["3.6", "3.9"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whats the reason for only testing these Pythons? What about pypy? We needn't change it, but a comment with the rationale will be useful and maybe an accompanying ticket if the testing needs to be expanded at a later point.

Copy link
Member Author

@ShaneHarvey ShaneHarvey May 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now it's just for simplicity. I opened: https://jira.mongodb.org/browse/PYTHON-2731

I might just add all python versions while doing that ticket.

loadbalancer: "*"
display_name: "Load Balancer ${platform} ${python-version} ${mongodb-version} ${auth-ssl}"
tasks:
- name: "load-balancer-test"

- matrix_name: "Release"
matrix_spec:
platform: [ubuntu-20.04, windows-64-vsMulti-small, macos-1014]
Expand Down
12 changes: 11 additions & 1 deletion .evergreen/run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ fi
if [ "$SSL" != "nossl" ]; then
export CLIENT_PEM="$DRIVERS_TOOLS/.evergreen/x509gen/client.pem"
export CA_PEM="$DRIVERS_TOOLS/.evergreen/x509gen/ca.pem"

if [ -n "$TEST_LOADBALANCER" ]; then
export SINGLE_MONGOS_LB_URI="${SINGLE_MONGOS_LB_URI}&tls=true"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't the tls=true be part of the evergreen variable containing the URI instead of having to patch it up here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cribbed this from the Java driver. I think the underlying issue is that MO does not yet add SSL/TLS parameters to the uri it reports: 10gen/mongo-orchestration#287

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.

export MULTI_MONGOS_LB_URI="${MULTI_MONGOS_LB_URI}&tls=true"
fi
fi

# For createvirtualenv.
Expand Down Expand Up @@ -191,7 +196,12 @@ if [ -z "$GREEN_FRAMEWORK" ]; then
# causing this script to exit.
$PYTHON -c "from bson import _cbson; from pymongo import _cmessage"
fi
$PYTHON $COVERAGE_ARGS setup.py $C_EXTENSIONS test $TEST_ARGS $OUTPUT

if [ -n "$TEST_LOADBALANCER" ]; then
$PYTHON -m xmlrunner discover -s test/load_balancer -v --locals -o $XUNIT_DIR
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does --locals do?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It prints each function's local variables in stack traces: https://docs.python.org/3/library/unittest.html#cmdoption-unittest-locals

So this opaque trace:

FAIL: test_a_connection_can_be_shared_by_a_transaction_and_a_cursor (test_load_balancer.TestUnifiedTransactions)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/shane/git/mongo-python-driver/test/unified_format.py", line 1087, in test_case
    self.run_scenario(spec)
  File "/Users/shane/git/mongo-python-driver/test/unified_format.py", line 1074, in run_scenario
    self.check_events(spec.get('expectEvents', []))
  File "/Users/shane/git/mongo-python-driver/test/unified_format.py", line 1026, in check_events
    self.match_evaluator.match_event(
  File "/Users/shane/git/mongo-python-driver/test/unified_format.py", line 581, in match_event
    self.test.assertIsInstance(actual, ConnectionCheckedInEvent)
AssertionError: ConnectionReadyEvent(('127.0.0.1', 8001), 2) is not an instance of <class 'pymongo.monitoring.ConnectionCheckedInEvent'>

Becomes much more diagnosable:

FAIL: test_a_connection_can_be_shared_by_a_transaction_and_a_cursor (test_load_balancer.TestUnifiedTransactions)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/shane/git/mongo-python-driver/test/unified_format.py", line 1087, in test_case
    self.run_scenario(spec)
    self = <test_load_balancer.TestUnifiedTransactions testMethod=test_a_connection_can_be_shared_by_a_transaction_and_a_cursor>
    spec = {'description': 'a connection can be shared by a transaction and a cursor', 'operations': [{'name': 'startTransaction', 'object': 'session0'}, {'name': 'insertOne', 'object': 'collection0', 'arguments': {'document': {'x': 1}, 'session': 'session0'}}, {'name': 'assertNumberConnectionsCheckedOut', 'object': 'testRunner', 'arguments': {'client': 'client0', 'connections': 1}}, {'name': 'createFindCursor', 'object': 'collection0', 'arguments': {'filter': {}, 'batchSize': 2, 'session': 'session0'}, 'saveResultAsEntity': 'cursor0'}, {'name': 'assertNumberConnectionsCheckedOut', 'object': 'testRunner', 'arguments': {'client': 'client0', 'connections': 1}}, {'name': 'close', 'object': 'cursor0'}, {'name': 'assertNumberConnectionsCheckedOut', 'object': 'testRunner', 'arguments': {'client': 'client0', 'connections': 1}}, {'name': 'abortTransaction', 'object': 'session0'}, {'name': 'assertNumberConnectionsCheckedOut', 'object': 'testRunner', 'arguments': {'client': 'client0', 'connections': 0}}], 'expectEvents': [{'client': 'client0', 'events': [{'commandStartedEvent': {'commandName': 'insert'}}, {'commandStartedEvent': {'commandName': 'find'}}, {'commandStartedEvent': {'commandName': 'killCursors'}}, {'commandStartedEvent': {'commandName': 'abortTransaction'}}]}, {'client': 'client0', 'eventType': 'cmap', 'events': [{'connectionReadyEvent': {}}, {'connectionCheckedOutEvent': {}}, {'connectionCheckedInEvent': {}}]}]}
  File "/Users/shane/git/mongo-python-driver/test/unified_format.py", line 1074, in run_scenario
    self.check_events(spec.get('expectEvents', []))
    run_on_spec = []
    self = <test_load_balancer.TestUnifiedTransactions testMethod=test_a_connection_can_be_shared_by_a_transaction_and_a_cursor>
    skip_reason = None
    spec = {'description': 'a connection can be shared by a transaction and a cursor', 'operations': [{'name': 'startTransaction', 'object': 'session0'}, {'name': 'insertOne', 'object': 'collection0', 'arguments': {'document': {'x': 1}, 'session': 'session0'}}, {'name': 'assertNumberConnectionsCheckedOut', 'object': 'testRunner', 'arguments': {'client': 'client0', 'connections': 1}}, {'name': 'createFindCursor', 'object': 'collection0', 'arguments': {'filter': {}, 'batchSize': 2, 'session': 'session0'}, 'saveResultAsEntity': 'cursor0'}, {'name': 'assertNumberConnectionsCheckedOut', 'object': 'testRunner', 'arguments': {'client': 'client0', 'connections': 1}}, {'name': 'close', 'object': 'cursor0'}, {'name': 'assertNumberConnectionsCheckedOut', 'object': 'testRunner', 'arguments': {'client': 'client0', 'connections': 1}}, {'name': 'abortTransaction', 'object': 'session0'}, {'name': 'assertNumberConnectionsCheckedOut', 'object': 'testRunner', 'arguments': {'client': 'client0', 'connections': 0}}], 'expectEvents': [{'client': 'client0', 'events': [{'commandStartedEvent': {'commandName': 'insert'}}, {'commandStartedEvent': {'commandName': 'find'}}, {'commandStartedEvent': {'commandName': 'killCursors'}}, {'commandStartedEvent': {'commandName': 'abortTransaction'}}]}, {'client': 'client0', 'eventType': 'cmap', 'events': [{'connectionReadyEvent': {}}, {'connectionCheckedOutEvent': {}}, {'connectionCheckedInEvent': {}}]}]}
  File "/Users/shane/git/mongo-python-driver/test/unified_format.py", line 1026, in check_events
    self.match_evaluator.match_event(
    actual_events = [ConnectionReadyEvent(('127.0.0.1', 8001), 1), ConnectionCheckedOutEvent(('127.0.0.1', 8001), 1), ConnectionReadyEvent(('127.0.0.1', 8001), 2), ConnectionCheckedOutEvent(('127.0.0.1', 8001), 2), ConnectionCheckedInEvent(('127.0.0.1', 8001), 2), ConnectionCheckedInEvent(('127.0.0.1', 8001), 1)]
    client_name = 'client0'
    event_spec = {'client': 'client0', 'eventType': 'cmap', 'events': [{'connectionReadyEvent': {}}, {'connectionCheckedOutEvent': {}}, {'connectionCheckedInEvent': {}}]}
    event_type = 'cmap'
    events = [{'connectionReadyEvent': {}}, {'connectionCheckedOutEvent': {}}, {'connectionCheckedInEvent': {}}]
    expected_event = {'connectionCheckedInEvent': {}}
    idx = 2
    listener = <test.unified_format.EventListenerUtil object at 0x7fc94603f3a0>
    self = <test_load_balancer.TestUnifiedTransactions testMethod=test_a_connection_can_be_shared_by_a_transaction_and_a_cursor>
    spec = [{'client': 'client0', 'events': [{'commandStartedEvent': {'commandName': 'insert'}}, {'commandStartedEvent': {'commandName': 'find'}}, {'commandStartedEvent': {'commandName': 'killCursors'}}, {'commandStartedEvent': {'commandName': 'abortTransaction'}}]}, {'client': 'client0', 'eventType': 'cmap', 'events': [{'connectionReadyEvent': {}}, {'connectionCheckedOutEvent': {}}, {'connectionCheckedInEvent': {}}]}]
  File "/Users/shane/git/mongo-python-driver/test/unified_format.py", line 581, in match_event
    self.test.assertIsInstance(actual, ConnectionCheckedInEvent)
    actual = ConnectionReadyEvent(('127.0.0.1', 8001), 2)
    event_type = 'cmap'
    expectation = {'connectionCheckedInEvent': {}}
    name = 'connectionCheckedInEvent'
    self = <test.unified_format.MatchEvaluatorUtil object at 0x7fc945968cd0>
    spec = {}
AssertionError: ConnectionReadyEvent(('127.0.0.1', 8001), 2) is not an instance of <class 'pymongo.monitoring.ConnectionCheckedInEvent'>

else
$PYTHON $COVERAGE_ARGS setup.py $C_EXTENSIONS test $TEST_ARGS $OUTPUT
fi
else
# --no_ext has to come before "test" so there is no way to toggle extensions here.
$PYTHON green_framework_test.py $GREEN_FRAMEWORK $OUTPUT
Expand Down
18 changes: 10 additions & 8 deletions pymongo/client_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ def abort_transaction(self):
pass
finally:
self._transaction.state = _TxnState.ABORTED
self._unpin_mongos()
self._unpin()

def _finish_transaction_with_retry(self, command_name):
"""Run commit or abort with one retry after any retryable error.
Expand Down Expand Up @@ -779,13 +779,13 @@ def _pinned_address(self):
return self._transaction.pinned_address
return None

def _pin_mongos(self, server):
"""Pin this session to the given mongos Server."""
def _pin(self, server):
"""Pin this session to the given Server."""
self._transaction.sharded = True
self._transaction.pinned_address = server.description.address

def _unpin_mongos(self):
"""Unpin this session from any pinned mongos address."""
def _unpin(self):
"""Unpin this session from any pinned Server."""
self._transaction.pinned_address = None

def _txn_read_preference(self):
Expand Down Expand Up @@ -906,9 +906,11 @@ def get_server_session(self, session_timeout_minutes):
return _ServerSession(self.generation)

def return_server_session(self, server_session, session_timeout_minutes):
self._clear_stale(session_timeout_minutes)
if not server_session.timed_out(session_timeout_minutes):
self.return_server_session_no_lock(server_session)
if session_timeout_minutes is not None:
self._clear_stale(session_timeout_minutes)
if server_session.timed_out(session_timeout_minutes):
return
self.return_server_session_no_lock(server_session)

def return_server_session_no_lock(self, server_session):
# Discard sessions from an old pool to avoid duplicate sessions in the
Expand Down
19 changes: 11 additions & 8 deletions pymongo/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1197,15 +1197,16 @@ def _select_server(self, server_selector, session, address=None):
server = topology.select_server(server_selector)
# Pin this session to the selected server if it's performing a
# sharded transaction.
if server.description.mongos and (session and
session.in_transaction):
session._pin_mongos(server)
if (server.description.server_type in (
SERVER_TYPE.Mongos, SERVER_TYPE.LoadBalancer)
and session and session.in_transaction):
session._pin(server)
return server
except PyMongoError as exc:
# Server selection errors in a transaction are transient.
if session and session.in_transaction:
exc._add_error_label("TransientTransactionError")
session._unpin_mongos()
session._unpin()
raise

def _socket_for_writes(self, session):
Expand Down Expand Up @@ -1350,7 +1351,7 @@ def is_retrying():
_add_retryable_write_error(exc, max_wire_version)
retryable_error = exc.has_error_label("RetryableWriteError")
if retryable_error:
session._unpin_mongos()
session._unpin()
if is_retrying() or not retryable_error:
raise
if bulk:
Expand Down Expand Up @@ -1965,7 +1966,7 @@ def _add_retryable_write_error(exc, max_wire_version):
class _MongoClientErrorHandler(object):
"""Handle errors raised when executing an operation."""
__slots__ = ('client', 'server_address', 'session', 'max_wire_version',
'sock_generation', 'completed_handshake')
'sock_generation', 'completed_handshake', 'service_id')

def __init__(self, client, server, session):
self.client = client
Expand All @@ -1978,11 +1979,13 @@ def __init__(self, client, server, session):
# of the pool at the time the connection attempt was started."
self.sock_generation = server.pool.generation
self.completed_handshake = False
self.service_id = None

def contribute_socket(self, sock_info):
"""Provide socket information to the error handler."""
self.max_wire_version = sock_info.max_wire_version
self.sock_generation = sock_info.generation
self.service_id = sock_info.service_id
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is service_id when there is no LB?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's always None when not in LB mode. When in LB mode there is a single connection pool and service_id is used to identify connections to the same host (and differentiate connections to different hosts). Take a peak at #628 and https://github.com/mongodb/specifications/blob/master/source/load-balancers/load-balancers.rst#error-handling for more info.

self.completed_handshake = True

def __enter__(self):
Expand All @@ -2001,9 +2004,9 @@ def __exit__(self, exc_type, exc_val, exc_tb):
if issubclass(exc_type, PyMongoError):
if (exc_val.has_error_label("TransientTransactionError") or
exc_val.has_error_label("RetryableWriteError")):
self.session._unpin_mongos()
self.session._unpin()

err_ctx = _ErrorContext(
exc_val, self.max_wire_version, self.sock_generation,
self.completed_handshake)
self.completed_handshake, self.service_id)
self.client._topology.handle_error(self.server_address, err_ctx)
80 changes: 61 additions & 19 deletions pymongo/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,13 +512,16 @@ def register(listener):
class _CommandEvent(object):
"""Base class for command events."""

__slots__ = ("__cmd_name", "__rqst_id", "__conn_id", "__op_id")
__slots__ = ("__cmd_name", "__rqst_id", "__conn_id", "__op_id",
"__service_id")

def __init__(self, command_name, request_id, connection_id, operation_id):
def __init__(self, command_name, request_id, connection_id, operation_id,
service_id=None):
self.__cmd_name = command_name
self.__rqst_id = request_id
self.__conn_id = connection_id
self.__op_id = operation_id
self.__service_id = service_id

@property
def command_name(self):
Expand All @@ -535,6 +538,14 @@ def connection_id(self):
"""The address (host, port) of the server this command was sent to."""
return self.__conn_id

@property
def service_id(self):
"""The service_id this command was sent to, or ``None``.

.. versionadded:: 3.12
"""
return self.__service_id

@property
def operation_id(self):
"""An id for this series of events or None."""
Expand All @@ -551,15 +562,17 @@ class CommandStartedEvent(_CommandEvent):
- `connection_id`: The address (host, port) of the server this command
was sent to.
- `operation_id`: An optional identifier for a series of related events.
- `service_id`: The service_id this command was sent to, or ``None``.
"""
__slots__ = ("__cmd", "__db")

def __init__(self, command, database_name, *args):
def __init__(self, command, database_name, *args, service_id=None):
if not command:
raise ValueError("%r is not a valid command" % (command,))
# Command name must be first key.
command_name = next(iter(command))
super(CommandStartedEvent, self).__init__(command_name, *args)
super(CommandStartedEvent, self).__init__(
command_name, *args, service_id=service_id)
if command_name.lower() in _SENSITIVE_COMMANDS:
self.__cmd = {}
else:
Expand All @@ -577,9 +590,12 @@ def database_name(self):
return self.__db

def __repr__(self):
return "<%s %s db: %r, command: %r, operation_id: %s>" % (
self.__class__.__name__, self.connection_id, self.database_name,
self.command_name, self.operation_id)
return (
"<%s %s db: %r, command: %r, operation_id: %s, "
"service_id: %s>") % (
self.__class__.__name__, self.connection_id,
self.database_name, self.command_name, self.operation_id,
self.service_id)


class CommandSucceededEvent(_CommandEvent):
Expand All @@ -593,13 +609,15 @@ class CommandSucceededEvent(_CommandEvent):
- `connection_id`: The address (host, port) of the server this command
was sent to.
- `operation_id`: An optional identifier for a series of related events.
- `service_id`: The service_id this command was sent to, or ``None``.
"""
__slots__ = ("__duration_micros", "__reply")

def __init__(self, duration, reply, command_name,
request_id, connection_id, operation_id):
request_id, connection_id, operation_id, service_id=None):
super(CommandSucceededEvent, self).__init__(
command_name, request_id, connection_id, operation_id)
command_name, request_id, connection_id, operation_id,
service_id=service_id)
self.__duration_micros = _to_micros(duration)
if command_name.lower() in _SENSITIVE_COMMANDS:
self.__reply = {}
Expand All @@ -617,9 +635,12 @@ def reply(self):
return self.__reply

def __repr__(self):
return "<%s %s command: %r, operation_id: %s, duration_micros: %s>" % (
self.__class__.__name__, self.connection_id,
self.command_name, self.operation_id, self.duration_micros)
return (
"<%s %s command: %r, operation_id: %s, duration_micros: %s, "
"service_id: %s>") % (
self.__class__.__name__, self.connection_id,
self.command_name, self.operation_id, self.duration_micros,
self.service_id)


class CommandFailedEvent(_CommandEvent):
Expand All @@ -633,11 +654,12 @@ class CommandFailedEvent(_CommandEvent):
- `connection_id`: The address (host, port) of the server this command
was sent to.
- `operation_id`: An optional identifier for a series of related events.
- `service_id`: The service_id this command was sent to, or ``None``.
"""
__slots__ = ("__duration_micros", "__failure")

def __init__(self, duration, failure, *args):
super(CommandFailedEvent, self).__init__(*args)
def __init__(self, duration, failure, *args, service_id=None):
super(CommandFailedEvent, self).__init__(*args, service_id=service_id)
self.__duration_micros = _to_micros(duration)
self.__failure = failure

Expand All @@ -654,9 +676,10 @@ def failure(self):
def __repr__(self):
return (
"<%s %s command: %r, operation_id: %s, duration_micros: %s, "
"failure: %r>" % (
"failure: %r, service_id: %s>") % (
self.__class__.__name__, self.connection_id, self.command_name,
self.operation_id, self.duration_micros, self.failure))
self.operation_id, self.duration_micros, self.failure,
self.service_id)


class _PoolEvent(object):
Expand Down Expand Up @@ -721,10 +744,29 @@ class PoolClearedEvent(_PoolEvent):
:Parameters:
- `address`: The address (host, port) pair of the server this Pool is
attempting to connect to.
- `service_id`: The service_id this command was sent to, or ``None``.

.. versionadded:: 3.9
"""
__slots__ = ()
__slots__ = ("__service_id",)

def __init__(self, address, service_id=None):
super(PoolClearedEvent, self).__init__(address)
self.__service_id = service_id

@property
def service_id(self):
"""Connections with this service_id are cleared.

When service_id is ``None``, all connections in the pool are cleared.

.. versionadded:: 3.12
"""
return self.__service_id

def __repr__(self):
return '%s(%r, %r)' % (
self.__class__.__name__, self.address, self.__service_id)


class PoolClosedEvent(_PoolEvent):
Expand Down Expand Up @@ -1508,10 +1550,10 @@ def publish_pool_ready(self, address):
except Exception:
_handle_exception()

def publish_pool_cleared(self, address):
def publish_pool_cleared(self, address, service_id):
"""Publish a :class:`PoolClearedEvent` to all pool listeners.
"""
event = PoolClearedEvent(address)
event = PoolClearedEvent(address, service_id)
for subscriber in self.__cmap_listeners:
try:
subscriber.pool_cleared(event)
Expand Down
Loading