Skip to content

Commit

Permalink
Conbee III support (#232)
Browse files Browse the repository at this point in the history
* Initial support for the Conbee III

* Add a delay when changing network state

* Do not write security mode for Conbee III

* Skip restoring neighbors for current CB3 firmwares

* Account for EmberZNet ZDO energy scanning bug

* Fix logic for neighbor restoration for CB2

* Fix existing unit tests

* Add a new unit test for CB3 energy scanning

* Add unit test for wrong device state callback handling

* Fix new unit tests

* Remove unnecessary `_change_network_state` from `load_network_info`

* Add a `probe` method

* Fix probing schema

* Handle state change polling failures gracefully

* Bump minimum zigpy version to 0.60.0

* Use new zigpy probing methods

* Use zigpy watchdog

* Fix API using removed config

* Implement `permit_with_link_key`

* Remove watchdog from unit tests

* Fix unit tests

* Parse model info during `load_network_info`
  • Loading branch information
puddly authored Nov 16, 2023
1 parent 7264e6a commit ecaf376
Show file tree
Hide file tree
Showing 9 changed files with 274 additions and 248 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ license = {text = "GPL-3.0"}
requires-python = ">=3.8"
dependencies = [
"voluptuous",
"zigpy>=0.54.1",
"zigpy>=0.60.0",
'async-timeout; python_version<"3.11"',
]

Expand Down
44 changes: 44 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -949,3 +949,47 @@ async def test_add_neighbour(api, mock_command_rsp):
mac_capability_flags=0x12,
)
]


async def test_cb3_device_state_callback_bug(api, mock_command_rsp):
mock_command_rsp(
command_id=deconz_api.CommandId.version,
params={"reserved": t.uint8_t(0)},
rsp={
"status": deconz_api.Status.SUCCESS,
"frame_length": t.uint16_t(9),
"version": deconz_api.FirmwareVersion(0x26450900),
},
replace=True,
)

await api.connect()

device_state = deconz_api.DeviceState(
network_state=deconz_api.NetworkState2.CONNECTED,
device_state=deconz_api.DeviceStateFlags.APSDE_DATA_CONFIRM,
)

assert api._device_state != device_state

_, rx_schema = deconz_api.COMMAND_SCHEMAS[deconz_api.CommandId.device_state]
api.data_received(
deconz_api.Command(
command_id=deconz_api.CommandId.device_state,
seq=api._seq,
payload=t.serialize_dict(
{
"status": deconz_api.Status.SUCCESS,
"frame_length": t.uint16_t(8),
"device_state": device_state,
"reserved1": t.uint8_t(0),
"reserved2": t.uint8_t(0),
},
rx_schema,
),
).serialize()
)

await asyncio.sleep(0.01)

assert api._device_state == device_state
185 changes: 81 additions & 104 deletions tests/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import zigpy.application
import zigpy.config
import zigpy.device
from zigpy.types import EUI64, Channels
from zigpy.types import EUI64, Channels, KeyData
import zigpy.zdo.types as zdo_t

from zigpy_deconz import types as t
Expand Down Expand Up @@ -41,6 +41,7 @@ def api():
return_value=deconz_api.DeviceState(deconz_api.NetworkState.CONNECTED)
)
api.write_parameter = AsyncMock()
api.firmware_version = deconz_api.FirmwareVersion(0x26580700)

# So the protocol version is effectively infinite
api._protocol_version.__ge__.return_value = True
Expand Down Expand Up @@ -112,7 +113,7 @@ def addr_nwk_and_ieee(nwk, ieee):
return addr


@patch("zigpy_deconz.zigbee.application.CHANGE_NETWORK_WAIT", 0.001)
@patch("zigpy_deconz.zigbee.application.CHANGE_NETWORK_POLL_TIME", 0.001)
@pytest.mark.parametrize(
"proto_ver, target_state, returned_state",
[
Expand Down Expand Up @@ -200,16 +201,12 @@ async def test_connect_failure(app):


async def test_disconnect(app):
reset_watchdog_task = app._reset_watchdog_task = MagicMock()
api_close = app._api.close = MagicMock()

await app.disconnect()

assert app._api is None
assert app._reset_watchdog_task is None

assert api_close.call_count == 1
assert reset_watchdog_task.cancel.call_count == 1


async def test_disconnect_no_api(app):
Expand All @@ -224,19 +221,34 @@ async def test_disconnect_close_error(app):
await app.disconnect()


async def test_permit_with_key_not_implemented(app):
with pytest.raises(NotImplementedError):
await app.permit_with_key(node=MagicMock(), code=b"abcdef")
async def test_permit_with_link_key(app):
app._api.write_parameter = AsyncMock()
app.permit = AsyncMock()

await app.permit_with_link_key(
node=t.EUI64.convert("00:11:22:33:44:55:66:77"),
link_key=KeyData.convert("aa:bb:cc:dd:aa:bb:cc:dd:aa:bb:cc:dd:aa:bb:cc:dd"),
)

assert app._api.write_parameter.mock_calls == [
mock.call(
deconz_api.NetworkParameter.link_key,
deconz_api.LinkKey(
ieee=t.EUI64.convert("00:11:22:33:44:55:66:77"),
key=KeyData.convert("aa:bb:cc:dd:aa:bb:cc:dd:aa:bb:cc:dd:aa:bb:cc:dd"),
),
)
]

assert app.permit.mock_calls == [mock.call(mock.ANY)]


async def test_deconz_dev_add_to_group(app, nwk, device_path):
group = MagicMock()
app._groups = MagicMock()
app._groups.add_group.return_value = group

deconz = application.DeconzDevice(
deconz_api.FirmwareVersion(0), device_path, app, sentinel.ieee, nwk
)
deconz = application.DeconzDevice("Conbee II", app, sentinel.ieee, nwk)
deconz.endpoints = {
0: sentinel.zdo,
1: sentinel.ep1,
Expand All @@ -254,9 +266,7 @@ async def test_deconz_dev_add_to_group(app, nwk, device_path):
async def test_deconz_dev_remove_from_group(app, nwk, device_path):
group = MagicMock()
app.groups[sentinel.grp_id] = group
deconz = application.DeconzDevice(
deconz_api.FirmwareVersion(0), device_path, app, sentinel.ieee, nwk
)
deconz = application.DeconzDevice("Conbee II", app, sentinel.ieee, nwk)
deconz.endpoints = {
0: sentinel.zdo,
1: sentinel.ep1,
Expand All @@ -268,38 +278,16 @@ async def test_deconz_dev_remove_from_group(app, nwk, device_path):


def test_deconz_props(nwk, device_path):
deconz = application.DeconzDevice(
deconz_api.FirmwareVersion(0), device_path, app, sentinel.ieee, nwk
)
deconz = application.DeconzDevice("Conbee II", app, sentinel.ieee, nwk)
assert deconz.manufacturer is not None
assert deconz.model is not None


@pytest.mark.parametrize(
"name, firmware_version, device_path",
[
("ConBee", deconz_api.FirmwareVersion(0x00000500), "/dev/ttyUSB0"),
("ConBee II", deconz_api.FirmwareVersion(0x00000700), "/dev/ttyUSB0"),
("RaspBee", deconz_api.FirmwareVersion(0x00000500), "/dev/ttyS0"),
("RaspBee II", deconz_api.FirmwareVersion(0x00000700), "/dev/ttyS0"),
("RaspBee", deconz_api.FirmwareVersion(0x00000500), "/dev/ttyAMA0"),
("RaspBee II", deconz_api.FirmwareVersion(0x00000700), "/dev/ttyAMA0"),
],
)
def test_deconz_name(nwk, name, firmware_version, device_path):
deconz = application.DeconzDevice(
firmware_version, device_path, app, sentinel.ieee, nwk
)
assert deconz.model == name


async def test_deconz_new(app, nwk, device_path, monkeypatch):
mock_init = AsyncMock()
monkeypatch.setattr(zigpy.device.Device, "_initialize", mock_init)

deconz = await application.DeconzDevice.new(
app, sentinel.ieee, nwk, deconz_api.FirmwareVersion(0), device_path
)
deconz = await application.DeconzDevice.new(app, sentinel.ieee, nwk, "Conbee II")
assert isinstance(deconz, application.DeconzDevice)
assert mock_init.call_count == 1
mock_init.reset_mock()
Expand All @@ -311,9 +299,7 @@ async def test_deconz_new(app, nwk, device_path, monkeypatch):
22: MagicMock(),
}
app.devices[sentinel.ieee] = mock_dev
deconz = await application.DeconzDevice.new(
app, sentinel.ieee, nwk, deconz_api.FirmwareVersion(0), device_path
)
deconz = await application.DeconzDevice.new(app, sentinel.ieee, nwk, "Conbee II")
assert isinstance(deconz, application.DeconzDevice)
assert mock_init.call_count == 0

Expand Down Expand Up @@ -346,18 +332,21 @@ def test_tx_confirm_unexpcted(app, caplog):

async def test_reset_watchdog(app):
"""Test watchdog."""
with patch.object(app._api, "write_parameter") as mock_api:
dog = asyncio.create_task(app._reset_watchdog())
await asyncio.sleep(0.3)
dog.cancel()
assert mock_api.call_count == 1
app._api.protocol_version = application.PROTO_VER_WATCHDOG
app._api.get_device_state = AsyncMock()
app._api.write_parameter = AsyncMock()

with patch.object(app._api, "write_parameter") as mock_api:
mock_api.side_effect = zigpy_deconz.exception.CommandError
dog = asyncio.create_task(app._reset_watchdog())
await asyncio.sleep(0.3)
dog.cancel()
assert mock_api.call_count == 1
await app._watchdog_feed()
assert len(app._api.get_device_state.mock_calls) == 1
assert len(app._api.write_parameter.mock_calls) == 1

app._api.protocol_version = application.PROTO_VER_WATCHDOG - 1
app._api.get_device_state.reset_mock()
app._api.write_parameter.reset_mock()

await app._watchdog_feed()
assert len(app._api.get_device_state.mock_calls) == 1
assert len(app._api.write_parameter.mock_calls) == 0


async def test_force_remove(app):
Expand Down Expand Up @@ -426,11 +415,8 @@ async def test_delayed_scan():
app.topology.scan.assert_called_once_with(devices=[coord])


@patch("zigpy_deconz.zigbee.application.CHANGE_NETWORK_WAIT", 0.001)
@pytest.mark.parametrize("support_watchdog", [False, True])
async def test_change_network_state(app, support_watchdog):
app._reset_watchdog_task = MagicMock()

@patch("zigpy_deconz.zigbee.application.CHANGE_NETWORK_POLL_TIME", 0.001)
async def test_change_network_state(app):
app._api.get_device_state = AsyncMock(
side_effect=[
deconz_api.DeviceState(deconz_api.NetworkState.OFFLINE),
Expand All @@ -439,25 +425,11 @@ async def test_change_network_state(app, support_watchdog):
]
)

if support_watchdog:
app._api._protocol_version = application.PROTO_VER_WATCHDOG
app._api.protocol_version = application.PROTO_VER_WATCHDOG
else:
app._api._protocol_version = application.PROTO_VER_WATCHDOG - 1
app._api.protocol_version = application.PROTO_VER_WATCHDOG - 1

old_watchdog_task = app._reset_watchdog_task
cancel_mock = app._reset_watchdog_task.cancel = MagicMock()
app._api._protocol_version = application.PROTO_VER_WATCHDOG
app._api.protocol_version = application.PROTO_VER_WATCHDOG

await app._change_network_state(deconz_api.NetworkState.CONNECTED, timeout=0.01)

if support_watchdog:
assert cancel_mock.call_count == 1
assert app._reset_watchdog_task is not old_watchdog_task
else:
assert cancel_mock.call_count == 0
assert app._reset_watchdog_task is old_watchdog_task


ENDPOINT = zdo_t.SimpleDescriptor(
endpoint=None,
Expand Down Expand Up @@ -552,43 +524,14 @@ async def read_param(param_id, index):
)


@patch("zigpy_deconz.zigbee.application.asyncio.sleep", new_callable=AsyncMock)
@patch(
"zigpy_deconz.zigbee.application.ControllerApplication.initialize",
side_effect=[RuntimeError(), None],
)
@patch(
"zigpy_deconz.zigbee.application.ControllerApplication.connect",
side_effect=[RuntimeError(), None, None],
)
async def test_reconnect(mock_connect, mock_initialize, mock_sleep, app):
assert app._reconnect_task is None
app.connection_lost(RuntimeError())

assert app._reconnect_task is not None
await app._reconnect_task

assert mock_connect.call_count == 3
assert mock_initialize.call_count == 2


async def test_disconnect_during_reconnect(app):
assert app._reconnect_task is None
app.connection_lost(RuntimeError())
await asyncio.sleep(0)
await app.disconnect()

assert app._reconnect_task is None


async def test_reset_network_info(app):
app.form_network = AsyncMock()
await app.reset_network_info()

app.form_network.assert_called_once()


async def test_energy_scan(app):
async def test_energy_scan_conbee_2(app):
with mock.patch.object(
zigpy.application.ControllerApplication,
"energy_scan",
Expand All @@ -601,6 +544,40 @@ async def test_energy_scan(app):
assert results == {c: c * 3 for c in Channels.ALL_CHANNELS}


async def test_energy_scan_conbee_3(app):
app._api.firmware_version = deconz_api.FirmwareVersion(0x26580900)

type(app)._device = AsyncMock()

app._device.zdo.Mgmt_NWK_Update_req = AsyncMock(
side_effect=zigpy.exceptions.DeliveryError("error")
)

with pytest.raises(zigpy.exceptions.DeliveryError):
await app.energy_scan(channels=Channels.ALL_CHANNELS, duration_exp=0, count=1)

app._device.zdo.Mgmt_NWK_Update_req = AsyncMock(
side_effect=[
asyncio.TimeoutError(),
list(
{
"Status": zdo_t.Status.SUCCESS,
"ScannedChannels": Channels.ALL_CHANNELS,
"TotalTransmissions": 0,
"TransmissionFailures": 0,
"EnergyValues": [i for i in range(11, 26 + 1)],
}.values()
),
]
)

results = await app.energy_scan(
channels=Channels.ALL_CHANNELS, duration_exp=0, count=1
)

assert results == {c: c for c in Channels.ALL_CHANNELS}


async def test_channel_migration(app):
app._api.write_parameter = AsyncMock()
app._change_network_state = AsyncMock()
Expand Down
Loading

0 comments on commit ecaf376

Please sign in to comment.