Skip to content

Commit

Permalink
[SONTD] Add 802.11Q VLAN select for VLAN untagged/access setting
Browse files Browse the repository at this point in the history
  • Loading branch information
Son T Dang committed Dec 3, 2024
1 parent 429a294 commit 8c8d2e6
Show file tree
Hide file tree
Showing 7 changed files with 423 additions and 106 deletions.
4 changes: 4 additions & 0 deletions custom_components/tplink_easy_smart/binary_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ def _handle_coordinator_update(self) -> None:
self._attr_extra_state_attributes[
"speed_config"
] = DISPLAYED_PORT_SPEED.get(port_info.speed_config)
if port_info.port_based_vlanid:
self._attr_extra_state_attributes["current_vlan"] = 'VLAN-' + str(port_info.port_based_vlanid)
elif port_info.pvid_1q_vlanid:
self._attr_extra_state_attributes["current_vlan"] = 'VLAN-' + str(port_info.pvid_1q_vlanid)
else:
self._attr_available = False
self._attr_is_on = None
Expand Down
16 changes: 10 additions & 6 deletions custom_components/tplink_easy_smart/client/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ class PortState:
flow_control_actual: bool
speed_config: PortSpeed
speed_actual: PortSpeed

port_based_vlanid: int
pvid_1q_vlanid: int

# ---------------------------
# PortPoeState
Expand Down Expand Up @@ -148,17 +149,20 @@ class PoeState:


# ---------------------------
# PortVLAN
# PortBasedVLAN
# ---------------------------
@dataclass
class PortVLAN:
class PortBasedVLAN:
vlanid: int
ports: list[int]


# ---------------------------
# PortVLAN
# IEEE1QVLAN
# ---------------------------
@dataclass
class VLAN:
class IEEE1QVLAN:
vlanid: int
ports: list[int]
ungtag_ports: list[int]
tag_ports: list[int]
notmem_ports: list[int]
3 changes: 3 additions & 0 deletions custom_components/tplink_easy_smart/client/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
URL_PORTS_SETTINGS_GET: Final = "PortSettingRpm.htm"
URL_POE_SETTINGS_GET: Final = "PoeConfigRpm.htm"
URL_VLAN_PORT_BASED_GET: Final = "VlanPortBasicRpm.htm"
URL_VLAN_8021Q_GET: Final = "Vlan8021QRpm.htm"

URL_PORT_SETTINGS_SET: Final = "port_setting.cgi"
URL_POE_SETTINGS_SET: Final = "poe_global_config.cgi"
URL_POE_PORT_SETTINGS_SET: Final = "poe_port_config.cgi"
URL_VLAN_PORT_BASED_SET: Final = "pvlanSet.cgi"
URL_VLAN_1Q_SET: Final = "qvlanSet.cgi"
URL_VLAN_PVID_SET: Final = "vlanPvidSet.cgi"

FEATURE_POE: Final = "feature_poe"
172 changes: 147 additions & 25 deletions custom_components/tplink_easy_smart/client/tplink_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
PortSpeed,
PortState,
TpLinkSystemInfo,
PortVLAN,
VLAN,
PortBasedVLAN,
IEEE1QVLAN,
)
from .const import (
FEATURE_POE,
Expand All @@ -29,6 +29,9 @@
URL_PORTS_SETTINGS_GET,
URL_VLAN_PORT_BASED_GET,
URL_VLAN_PORT_BASED_SET,
URL_VLAN_8021Q_GET,
URL_VLAN_1Q_SET,
URL_VLAN_PVID_SET
)
from .coreapi import TpLinkWebApi, VariableType
from .utils import TpLinkFeaturesDetector
Expand Down Expand Up @@ -136,53 +139,50 @@ def get_value(key: str) -> str | None:
hardware=get_value("hardwareStr"),
)

async def get_port_vlan_info(self) -> (list[PortVLAN], {int: VLAN}):
async def get_port_based_vlan_info(self) -> (list[int], {int: PortBasedVLAN}):
"""Return the port states."""
data = await self._core_api.get_variable(
URL_VLAN_PORT_BASED_GET, "pvlan_ds", VariableType.Dict
)

ports: list[PortVLAN] = []
vlans: {int: VLAN} = {}
ports: list[int] = []
vlans: {int: PortBasedVLAN} = {}

enable = data.get("state")
if not enable:
return (ports, vlans)
return (None, None)

port_num = data.get("portNum")
if not port_num:
return (ports, vlans)
return (None, None)

vids = data.get("vids")
if not vids:
return (ports, vlans)
return (None, None)

mbrs = data.get("mbrs")
if not mbrs:
return (ports, vlans)
return (None, None)

port_idx = 1
while port_idx <= port_num:
done = False
for (idx, vid) in enumerate(vids):
mbr = mbrs[idx]
val = int(mbr) & (1 << (port_idx - 1))
_LOGGER.debug("get_port_vlan_info port=%s, vlan=%s, mbr=%s, val=%s", port_idx, vid, mbr, val)
_LOGGER.debug("get_port_based_vlan_info port=%s, vlan=%s, mbr=%s, val=%s", port_idx, vid, mbr, val)

if val != 0:
port_vlan = PortVLAN(
vlanid=vid,
)
_LOGGER.debug("get_port_vlan_info port_number=%s,vlan=%s", port_idx, vid)
port_vlan = vid
_LOGGER.debug("get_port_based_vlan_info port_number=%s,vlan=%s", port_idx, vid)
ports.append(port_vlan)
done = True
break;

# if not, then the port belongs to default VLAN-1
if not done:
port_vlan = PortVLAN(
vlanid=1,
)
_LOGGER.debug("get_port_vlan_info port=%s,vlan=1", port_idx)
port_vlan = 1
_LOGGER.debug("get_port_based_vlan_info port=%s,vlan=1", port_idx)
ports.append(port_vlan)

port_idx += 1
Expand All @@ -196,7 +196,7 @@ async def get_port_vlan_info(self) -> (list[PortVLAN], {int: VLAN}):
for (i, val) in enumerate(reversed(out)):
if val != 0: vlan_ports.append(i + 1)

vlan = VLAN(vid, vlan_ports)
vlan = PortBasedVLAN(vid, vlan_ports)
vlans[vid] = vlan

return (ports, vlans)
Expand Down Expand Up @@ -235,6 +235,8 @@ async def get_port_states(self) -> list[PortState]:
enabled=enabled_flags[number - 1] == 1,
flow_control_config=fc_config_flags[number - 1] == 1,
flow_control_actual=fc_actual_flags[number - 1] == 1,
port_based_vlanid=None,
pvid_1q_vlanid=None
)
result.append(state)

Expand Down Expand Up @@ -411,13 +413,12 @@ async def set_port_poe_settings(
result = await self._core_api.post(URL_POE_PORT_SETTINGS_SET, data)
_LOGGER.debug("POE_PORT_SETTINGS_SET_RESULT: %s", result)


async def set_vlan_ports(
async def set_port_based_vlan(
self,
vlanid: int,
port_list: list,
) -> None:
"""Change ports VLAN."""
"""Change ports PortBased VLAN."""
selPortsStr: string = '';
for port_number in port_list:
selPortsStr += f"selPorts={port_number}&"
Expand All @@ -429,13 +430,134 @@ async def set_vlan_ports(
)
await self._core_api.get(URL_VLAN_PORT_BASED_SET, query=query)

async def del_vlan(
async def del_port_based_vlan(
self,
vlanid: int,
) -> None:
"""Delete VLAN."""
"""Delete PortBased VLAN."""
query: str = (
f"selVlans={vlanid}&"
f"pvlan_del=Delete"
)
await self._core_api.get(URL_VLAN_PORT_BASED_SET, query=query)
await self._core_api.get(URL_VLAN_PORT_BASED_SET, query=query)

async def get_1q_vlan_info(self) -> (list[int], {int: IEEE1QVLAN}):
"""Return the port 1Q VLAN."""
data = await self._core_api.get_variable(
URL_VLAN_8021Q_GET, "qvlan_ds", VariableType.Dict
)

untag_ports: list[int] = []
vlans: {int: IEEE1QVLAN} = {}

enable = data.get("state")
if not enable:
return (None, None)

port_num = data.get("portNum")
if not port_num:
return (None, None)

vids = data.get("vids")
if not vids:
return (None, None)

untagMbrs = data.get("untagMbrs")
if not untagMbrs:
return (None, None)

tagMbrs = data.get("tagMbrs")
if not tagMbrs:
return (None, None)

port_idx = 1
while port_idx <= port_num:
done = False
for (idx, vid) in enumerate(vids):
untagMbr = untagMbrs[idx]
val = int(untagMbr) & (1 << (port_idx - 1))
_LOGGER.debug("get_1q_vlan_info port=%s, vlan=%s, untagMbr=%s, val=%s", port_idx, vid, untagMbr, val)

if val != 0:
port_vlan = vid
_LOGGER.debug("get_1q_vlan_info port_number=%s,vlan=%s", port_idx, vid)
untag_ports.append(port_vlan)
done = True
break;

# if not, then the port belongs to default VLAN-1
if not done:
port_vlan = 1
_LOGGER.debug("get_1q_vlan_info port=%s,vlan=1", port_idx)
untag_ports.append(port_vlan)

port_idx += 1

for (idx, vid) in enumerate(vids):
untagMbr = untagMbrs[idx]
vlan_untag_ports: list[int] = []
if untagMbr != 0:
bits = int(max(8, math.log(untagMbr, 2)+1))
out = [1 if untagMbr & (1 << (bits-1-n)) else 0 for n in range(bits)]
for (i, val) in enumerate(reversed(out)):
if val != 0: vlan_untag_ports.append(i + 1)

tagMbr = tagMbrs[idx]
vlan_tag_ports: list[int] = []
if tagMbr != 0:
bits = int(max(8, math.log(tagMbr, 2)+1))
out = [1 if tagMbr & (1 << (bits-1-n)) else 0 for n in range(bits)]
for (i, val) in enumerate(reversed(out)):
if val != 0: vlan_tag_ports.append(i + 1)

full_list = [*range(1, 8, 1)]
vlan_notmem_ports = [x for x in full_list if (x not in vlan_untag_ports and x not in vlan_tag_ports)]
vlan = IEEE1QVLAN(vid, vlan_untag_ports, vlan_tag_ports, vlan_notmem_ports)
vlans[vid] = vlan

return (untag_ports, vlans)


async def set_1q_untag_vlan(
self,
vlanid: int,
ungtag_ports: list[int],
tag_ports: list[int],
notmem_ports: list[int],
) -> None:
"""Change port members 802.11Q VLAN."""

_LOGGER.error("set_1q_untag_vlan vlanid=%s", vlanid)
_LOGGER.error("set_1q_untag_vlan ungtag_ports=%s", ungtag_ports)
_LOGGER.error("set_1q_untag_vlan tag_ports=%s", tag_ports)
_LOGGER.error("set_1q_untag_vlan notmem_ports=%s", notmem_ports)

selPortsStr: string = '';
for port in ungtag_ports:
selPortsStr += f"selType_{port}=0&"
for port in tag_ports:
selPortsStr += f"selType_{port}=1&"
for port in notmem_ports:
selPortsStr += f"selType_{port}=2&"

query: str = (
f"vid={vlanid}&"
f"{selPortsStr}"
f"qvlan_add=Add%2FModify"
)
await self._core_api.get(URL_VLAN_1Q_SET, query=query)


async def set_1q_port_pvid(
self,
number: int,
pvid: int
) -> None:
"""Change port PVID."""
pbm = str(pow(2, number - 1))

query: str = (
f"pbm={pbm}&"
f"pvid={pvid}"
)
await self._core_api.get(URL_VLAN_PVID_SET, query=query)
Loading

0 comments on commit 8c8d2e6

Please sign in to comment.