Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement zigpy network scanning #251

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions zigpy_znp/zigbee/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import asyncio
import logging
from typing import AsyncGenerator

import zigpy.zcl
import zigpy.zdo
Expand Down Expand Up @@ -248,6 +249,51 @@ async def set_tx_power(self, dbm: int) -> None:
"Requested TX power %d was adjusted to %d", dbm, rsp.StatusOrPower
)

async def network_scan(
self, channels: t.Channels, duration_exp: int
) -> AsyncGenerator[zigpy.types.NetworkBeacon, None]:
# XXX: Network scanning only works if the stack is not running!
await self._znp.reset()

async with self._znp.capture_responses(
[
c.ZDO.BeaconNotifyInd.Callback(partial=True),
c.ZDO.NwkDiscoveryCnf.Callback(partial=True),
]
) as updates:
await self._znp.request(
c.ZDO.NetworkDiscoveryReq.Req(
Channels=channels,
ScanDuration=duration_exp,
),
RspStatus=t.Status.SUCCESS,
)

while True:
update = await updates.get()

if isinstance(update, c.ZDO.NwkDiscoveryCnf.Callback):
break

for beacon in update.Beacons:
yield zigpy.types.NetworkBeacon(
pan_id=beacon.PanId,
extended_pan_id=beacon.ExtendedPanId,
channel=beacon.Channel,
permit_joining=beacon.PermitJoining,
stack_profile=beacon.StackProfile,
nwk_update_id=beacon.UpdateId,
lqi=beacon.LQI,
src=beacon.Src,
depth=beacon.Depth,
router_capacity=beacon.RouterCapacity,
device_capacity=beacon.DeviceCapacity,
protocol_version=beacon.ProtocolVersion,
# Only thing not supported is `rssi`
)

await self.start_network()

def get_dst_address(self, cluster: zigpy.zcl.Cluster) -> zdo_t.MultiAddress:
"""
Helper to get a dst address for bind/unbind operations.
Expand Down
Loading