9
9
10
10
import asyncio
11
11
from functools import partial
12
+ from ipaddress import IPv4Address , IPv4Network
12
13
import logging
13
14
from typing import TYPE_CHECKING
14
15
15
16
import netifaces
16
17
from xknx .knxip import (
17
18
HPAI ,
19
+ DIBDeviceInformation ,
18
20
DIBServiceFamily ,
19
21
DIBSuppSVCFamilies ,
20
22
KNXIPFrame ,
21
23
KNXIPServiceType ,
22
24
SearchRequest ,
23
25
SearchResponse ,
24
26
)
27
+ from xknx .telegram import IndividualAddress
25
28
26
29
from .udp_client import UDPClient
27
30
@@ -43,6 +46,7 @@ def __init__(
43
46
local_ip : str ,
44
47
supports_tunnelling : bool = False ,
45
48
supports_routing : bool = False ,
49
+ individual_address : IndividualAddress | None = None ,
46
50
):
47
51
"""Initialize GatewayDescriptor class."""
48
52
self .name = name
@@ -52,16 +56,11 @@ def __init__(
52
56
self .local_ip = local_ip
53
57
self .supports_routing = supports_routing
54
58
self .supports_tunnelling = supports_tunnelling
59
+ self .individual_address = individual_address
55
60
56
61
def __str__ (self ) -> str :
57
62
"""Return object as readable string."""
58
- return (
59
- f'<GatewayDescriptor name="{ self .name } " '
60
- f'addr="{ self .ip_addr } :{ self .port } " '
61
- f'local="{ self .local_ip } @{ self .local_interface } " '
62
- f'routing="{ self .supports_routing } " '
63
- f'tunnelling="{ self .supports_tunnelling } " />'
64
- )
63
+ return f"{ self .individual_address } - { self .name } @ { self .ip_addr } :{ self .port } "
65
64
66
65
67
66
class GatewayScanFilter :
@@ -146,12 +145,15 @@ async def _send_search_requests(self) -> None:
146
145
try :
147
146
af_inet = netifaces .ifaddresses (interface )[netifaces .AF_INET ]
148
147
ip_addr = af_inet [0 ]["addr" ]
149
- await self ._search_interface (interface , ip_addr )
148
+ netmask = af_inet [0 ]["netmask" ]
149
+ await self ._search_interface (interface , ip_addr , netmask )
150
150
except KeyError :
151
151
logger .info ("Could not connect to an KNX/IP device on %s" , interface )
152
152
continue
153
153
154
- async def _search_interface (self , interface : str , ip_addr : str ) -> None :
154
+ async def _search_interface (
155
+ self , interface : str , ip_addr : str , netmask : str
156
+ ) -> None :
155
157
"""Send a search request on a specific interface."""
156
158
logger .debug ("Searching on %s / %s" , interface , ip_addr )
157
159
@@ -163,7 +165,7 @@ async def _search_interface(self, interface: str, ip_addr: str) -> None:
163
165
)
164
166
165
167
udp_client .register_callback (
166
- partial (self ._response_rec_callback , interface = interface ),
168
+ partial (self ._response_rec_callback , interface = interface , netmask = netmask ),
167
169
[KNXIPServiceType .SEARCH_RESPONSE ],
168
170
)
169
171
await udp_client .connect ()
@@ -177,13 +179,25 @@ async def _search_interface(self, interface: str, ip_addr: str) -> None:
177
179
udp_client .send (KNXIPFrame .init_from_body (search_request ))
178
180
179
181
def _response_rec_callback (
180
- self , knx_ip_frame : KNXIPFrame , udp_client : UDPClient , interface : str = ""
182
+ self ,
183
+ knx_ip_frame : KNXIPFrame ,
184
+ udp_client : UDPClient ,
185
+ interface : str = "" ,
186
+ netmask : str = "" ,
181
187
) -> None :
182
188
"""Verify and handle knxipframe. Callback from internal udpclient."""
183
189
if not isinstance (knx_ip_frame .body , SearchResponse ):
184
190
logger .warning ("Could not understand knxipframe" )
185
191
return
186
192
193
+ address : IPv4Address = IPv4Address (knx_ip_frame .body .control_endpoint .ip_addr )
194
+ network : IPv4Network = IPv4Network (
195
+ f"{ udp_client .local_addr [0 ]} /{ netmask } " , False
196
+ )
197
+
198
+ if address not in network :
199
+ return
200
+
187
201
gateway = GatewayDescriptor (
188
202
name = knx_ip_frame .body .device_name ,
189
203
ip_addr = knx_ip_frame .body .control_endpoint .ip_addr ,
@@ -202,6 +216,16 @@ def _response_rec_callback(
202
216
except StopIteration :
203
217
pass
204
218
219
+ try :
220
+ device_infos = next (
221
+ device_infos
222
+ for device_infos in knx_ip_frame .body .dibs
223
+ if isinstance (device_infos , DIBDeviceInformation )
224
+ )
225
+ gateway .individual_address = device_infos .individual_address
226
+ except StopIteration :
227
+ pass
228
+
205
229
self ._add_found_gateway (gateway )
206
230
207
231
def _add_found_gateway (self , gateway : GatewayDescriptor ) -> None :
0 commit comments