Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Pipfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[packages]
asyncio = ">=3.4.3"

[requires]
python_version = ">=3.11.2"
31 changes: 31 additions & 0 deletions Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 10 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,18 @@ A tool for scanning and auditing KNXnet/IP gateways on IP driven networks. KNXne

KNXmap heavily relies on the [asyncio](https://docs.python.org/3/library/asyncio.html) module and therefore requires Python 3.4 or newer. There are just a few optional dependencies that are required for some special features.

## Usage
## Install
```
sudo apt-get install python3.11-venv
python3 -m venv .venv
pip install pipenv
pipenv sync
```

## Usage
```
python setup.py install
knxmap -h
. .venv/bin/activate
# python main.py -h
```

## Documentation
Expand Down
90 changes: 39 additions & 51 deletions knxmap/bus/tunnel.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,11 @@ def datagram_received(self, data, addr):
knx_msg.set_peer(addr)
LOGGER.trace_incoming(knx_msg)
knx_service_type = knx_msg.header.get('service_type') >> 8
if knx_service_type is 0x02: # Core
if knx_service_type == 0x02: # Core
self.handle_core_services(knx_msg)
elif knx_service_type is 0x03: # Device Management
elif knx_service_type == 0x03: # Device Management
self.handle_configuration_services(knx_msg)
elif knx_service_type is 0x04: # Tunnelling
elif knx_service_type == 0x04: # Tunnelling
self.handle_tunnel_services(knx_msg)
else:
LOGGER.error('Service not implemented: {}'.format(
Expand Down Expand Up @@ -284,7 +284,7 @@ def handle_tunnel_services(self, knx_msg):
if cemi_tpci_type == CEMI_TPCI_TYPES.get('UCD'):
# TODO: will this even happen?
# TODO: doesn't it need to use knx_src instead of knx_dst?
if knx_msg.cemi.tpci.status is 1:
if knx_msg.cemi.tpci.status == 1:
# TODO: why checking status here? pls document why
if knx_dst in self.target_futures.keys():
if not self.target_futures[knx_dst].done():
Expand Down Expand Up @@ -347,7 +347,7 @@ def handle_tunnel_services(self, knx_msg):

elif isinstance(knx_msg, KnxTunnellingAck):
# TODO: do we have to increase any sequence here?
LOGGER.debug('Tunnelling ACK reqceived')
LOGGER.debug('Tunnelling ACK received')
if knx_msg.status:
LOGGER.error('An error occured during frame transmission')
else:
Expand Down Expand Up @@ -443,13 +443,12 @@ def knx_tpci_disconnect(self, target):
LOGGER.trace_outgoing(tunnel_request)
self.transport.sendto(tunnel_request.get_message())

@asyncio.coroutine
def get_device_type(self, target):
async def get_device_type(self, target):
"""A helper function that just returns the device type
returned by A_DeviceDescriptor_Read as an integer. This
can be used e.g. to determine whether a type requires
authorization (System 2/System7) or not (System 1)."""
descriptor = yield from self.apci_device_descriptor_read(target)
descriptor = await self.apci_device_descriptor_read(target)
if not descriptor:
return False
try:
Expand All @@ -459,14 +458,13 @@ def get_device_type(self, target):
_, desc_type, _ = KnxMessage.parse_device_descriptor(dev_desc)
return desc_type

@asyncio.coroutine
def apci_device_descriptor_read(self, target):
async def apci_device_descriptor_read(self, target):
tunnel_request = self.make_tunnel_request(target)
tunnel_request.apci_device_descriptor_read(
sequence=self.tpci_seq_counts.get(target))
LOGGER.trace_outgoing(tunnel_request)
value = yield from self.send_data(tunnel_request.get_message(), target)
yield from self.tpci_send_ncd(target)
value = await self.send_data(tunnel_request.get_message(), target)
await self.tpci_send_ncd(target)
if isinstance(value, KnxTunnellingRequest):
cemi = value.cemi
if cemi.apci.apci_type == CEMI_APCI_TYPES.get('A_DeviceDescriptor_Response') and \
Expand All @@ -475,8 +473,7 @@ def apci_device_descriptor_read(self, target):
else:
return False

@asyncio.coroutine
def apci_property_value_read(self, target, object_index=0, property_id=0x0f,
async def apci_property_value_read(self, target, object_index=0, property_id=0x0f,
num_elements=1, start_index=1):
tunnel_request = self.make_tunnel_request(target)
tunnel_request.apci_property_value_read(
Expand All @@ -486,16 +483,15 @@ def apci_property_value_read(self, target, object_index=0, property_id=0x0f,
num_elements=num_elements,
start_index=start_index)
LOGGER.trace_outgoing(tunnel_request)
value = yield from self.send_data(tunnel_request.get_message(), target)
yield from self.tpci_send_ncd(target)
value = await self.send_data(tunnel_request.get_message(), target)
await self.tpci_send_ncd(target)
if isinstance(value, KnxTunnellingRequest) and \
value.cemi.data:
return value.cemi.data[4:]
else:
return False

@asyncio.coroutine
def apci_property_description_read(self, target, object_index=0, property_id=0x0f,
async def apci_property_description_read(self, target, object_index=0, property_id=0x0f,
num_elements=1, start_index=1):
tunnel_request = self.make_tunnel_request(target)
tunnel_request.apci_property_description_read(
Expand All @@ -505,31 +501,30 @@ def apci_property_description_read(self, target, object_index=0, property_id=0x0
num_elements=num_elements,
start_index=start_index)
LOGGER.trace_outgoing(tunnel_request)
value = yield from self.send_data(tunnel_request.get_message(), target)
yield from self.tpci_send_ncd(target)
value = await self.send_data(tunnel_request.get_message(), target)
await self.tpci_send_ncd(target)
if isinstance(value, KnxTunnellingRequest) and \
value.cemi.data:
return value.cemi.data[4:]
else:
return False

@asyncio.coroutine
def apci_memory_read(self, target, memory_address=0x0060, read_count=1):
async def apci_memory_read(self, target, memory_address=0x0060, read_count=1):
tunnel_request = self.make_tunnel_request(target)
tunnel_request.apci_memory_read(
sequence=self.tpci_seq_counts.get(target),
memory_address=memory_address,
read_count=read_count)
LOGGER.trace_outgoing(tunnel_request)
knx_msg = yield from self.send_data(tunnel_request.get_message(), target)
knx_msg = await self.send_data(tunnel_request.get_message(), target)
# TODO: if that works, it should be implemented for all APCI functions!
if not isinstance(knx_msg, KnxTunnellingRequest) or \
knx_msg.cemi.apci.apci_type == CEMI_APCI_TYPES.get('A_Memory_Response') or \
int.from_bytes(knx_msg.cemi.data[:2], 'big') == memory_address:
# Put the response back in the queue
if not isinstance(knx_msg, bool):
self.response_queue.append(knx_msg)
yield from asyncio.sleep(.3)
await asyncio.sleep(.3)
knx_msg = None
for response in self.response_queue:
if isinstance(response, KnxTunnellingRequest) and \
Expand All @@ -540,14 +535,13 @@ def apci_memory_read(self, target, memory_address=0x0060, read_count=1):
self.response_queue.remove(response)
if not knx_msg:
LOGGER.debug('No proper response received')
yield from self.tpci_send_ncd(target)
await self.tpci_send_ncd(target)
if knx_msg and knx_msg.cemi.data:
return knx_msg.cemi.data[2:]
else:
return False

@asyncio.coroutine
def apci_memory_write(self, target, memory_address=0x0060, write_count=1,
async def apci_memory_write(self, target, memory_address=0x0060, write_count=1,
data=b'\x00'):
tunnel_request = self.make_tunnel_request(target)
tunnel_request.apci_memory_write(
Expand All @@ -556,32 +550,30 @@ def apci_memory_write(self, target, memory_address=0x0060, write_count=1,
write_count=write_count,
data=data)
LOGGER.trace_outgoing(tunnel_request)
value = yield from self.send_data(tunnel_request.get_message(), target)
yield from self.tpci_send_ncd(target)
value = await self.send_data(tunnel_request.get_message(), target)
await self.tpci_send_ncd(target)
if isinstance(value, KnxTunnellingRequest) and \
value.cemi.data:
return value.cemi.data[2:]
else:
return False

@asyncio.coroutine
def apci_key_write(self, target, level, key):
async def apci_key_write(self, target, level, key):
tunnel_request = self.make_tunnel_request(target)
tunnel_request.apci_key_write(
sequence=self.tpci_seq_counts.get(target),
level=level,
key=key)
LOGGER.trace_outgoing(tunnel_request)
value = yield from self.send_data(tunnel_request.get_message(), target)
yield from self.tpci_send_ncd(target)
value = await self.send_data(tunnel_request.get_message(), target)
await self.tpci_send_ncd(target)
if isinstance(value, KnxTunnellingRequest) and \
value.cemi.data:
return value.cemi.data[2:]
else:
return False

@asyncio.coroutine
def apci_authenticate(self, target, key=0xffffffff):
async def apci_authenticate(self, target, key=0xffffffff):
"""Send an A_Authorize_Request to target with the
supplied key. Returns the access level as an int
or False if an error occurred."""
Expand All @@ -590,60 +582,56 @@ def apci_authenticate(self, target, key=0xffffffff):
sequence=self.tpci_seq_counts.get(target),
key=key)
LOGGER.trace_outgoing(tunnel_request)
auth = yield from self.send_data(tunnel_request.get_message(), target)
yield from self.tpci_send_ncd(target)
auth = await self.send_data(tunnel_request.get_message(), target)
await self.tpci_send_ncd(target)
if isinstance(auth, KnxTunnellingRequest):
return int.from_bytes(auth.cemi.data, 'big')
else:
return False

@asyncio.coroutine
def apci_group_value_write(self, target, value=0):
async def apci_group_value_write(self, target, value=0):
tunnel_request = self.make_tunnel_request(target)
tunnel_request.apci_group_value_write(value=value)
LOGGER.trace_outgoing(tunnel_request)
value = yield from self.send_data(tunnel_request.get_message(), target)
value = await self.send_data(tunnel_request.get_message(), target)
if isinstance(value, KnxTunnellingRequest) and \
value.cemi.data:
return value.cemi.data[4:]
else:
return False

@asyncio.coroutine
def apci_individual_address_read(self, target):
async def apci_individual_address_read(self, target):
tunnel_request = self.make_tunnel_request(target)
tunnel_request.apci_individual_address_read(
sequence=self.tpci_seq_counts.get(target))
LOGGER.trace_outgoing(tunnel_request)
value = yield from self.send_data(tunnel_request.get_message(), target)
yield from self.tpci_send_ncd(target)
value = await self.send_data(tunnel_request.get_message(), target)
await self.tpci_send_ncd(target)
if isinstance(value, KnxTunnellingRequest) and \
value.cemi.data:
return value.cemi.data[4:]
else:
return False

@asyncio.coroutine
def apci_user_manufacturer_info_read(self, target):
async def apci_user_manufacturer_info_read(self, target):
tunnel_request = self.make_tunnel_request(target)
tunnel_request.apci_user_manufacturer_info_read(
sequence=self.tpci_seq_counts.get(target))
LOGGER.trace_outgoing(tunnel_request)
value = yield from self.send_data(tunnel_request.get_message(), target)
yield from self.tpci_send_ncd(target)
value = await self.send_data(tunnel_request.get_message(), target)
await self.tpci_send_ncd(target)
if isinstance(value, KnxTunnellingRequest) and \
value.cemi.data:
return value.cemi.data[4:]
else:
return False

@asyncio.coroutine
def apci_restart(self, target):
async def apci_restart(self, target):
tunnel_request = self.make_tunnel_request(target)
tunnel_request.apci_restart(
sequence=self.tpci_seq_counts.get(target))
LOGGER.trace_outgoing(tunnel_request)
value = yield from self.send_data(tunnel_request.get_message(), target)
value = await self.send_data(tunnel_request.get_message(), target)
if isinstance(value, KnxTunnellingRequest):
return True
else:
Expand Down
Loading