From d94e3c92056b031d41fb4fa827ad31b8e0b25826 Mon Sep 17 00:00:00 2001 From: Alexandru Cheltuitor Date: Mon, 7 Dec 2020 16:22:39 +0000 Subject: [PATCH 1/3] Add support for protonvpn-nm-lib v.0.3.0-1 As server_manager was refactored, this new improved handling method should be more flexible to use and implement. --- debian/changelog | 6 ++++++ protonvpn_cli/cli_dialog.py | 13 ++++++++----- protonvpn_cli/cli_wrapper.py | 17 ++++++++++++----- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/debian/changelog b/debian/changelog index a4d3e2f..723baa3 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +protonvpn-cli (3.2.0-1) UNRELEASED; urgency=low + + * Add support for protonvpn-nm-lib v0.3.0-1 + + -- Proton Technologies AG Mon, 07 Dec 2020 16:17:34 +0000 + protonvpn-cli (3.1.0-7) UNRELEASED; urgency=low * Add support for protonvpn-nm-lib v0.2.0-1 diff --git a/protonvpn_cli/cli_dialog.py b/protonvpn_cli/cli_dialog.py index c6a3dad..4fad99b 100644 --- a/protonvpn_cli/cli_dialog.py +++ b/protonvpn_cli/cli_dialog.py @@ -115,7 +115,7 @@ def display_country(self): def sort_servers(self): country_servers = self.countries[self.country] - other_servers = {} + non_match_tier_servers = {} match_tier_servers = {} for server in country_servers: @@ -125,12 +125,15 @@ def sort_servers(self): if tier == self.user_tier: match_tier_servers[server] = tier continue - elif (tier > self.user_tier or tier < self.user_tier) and not tier == 3: - other_servers[server] = tier + elif ( + (tier > self.user_tier or tier < self.user_tier) + and not tier == 3 + ): + non_match_tier_servers[server] = tier sorted_dict = dict( sorted( - other_servers.items(), + non_match_tier_servers.items(), key=lambda s: s[1], reverse=True ) @@ -242,7 +245,7 @@ def generate_country_dict(self, server_manager, servers): """ countries = {} for server in servers: - country = server_manager.extract_country_name(server["ExitCountry"]) + country = server_manager.extract_country_name(server["ExitCountry"]) # noqa if country not in countries.keys(): countries[country] = [] countries[country].append(server["Name"]) diff --git a/protonvpn_cli/cli_wrapper.py b/protonvpn_cli/cli_wrapper.py index a43eea6..461a65c 100644 --- a/protonvpn_cli/cli_wrapper.py +++ b/protonvpn_cli/cli_wrapper.py @@ -747,14 +747,21 @@ def get_cert_filename_and_domain(self, protocol, command): servername, protocol = self.protonvpn_dialog.start( self.session ) - - return self.server_manager.direct( - self.session, protocol, servername + command = ["servername", servername] + return self.server_manager.generate( + _method=self.CLI_COMMAND_DICT[command[0]], + command=command, + session=self.session, + protocol=protocol ) - return self.CLI_COMMAND_DICT[command[0]]( - self.session, protocol, command + return self.server_manager.generate( + _method=self.CLI_COMMAND_DICT[command[0]], + command=command, + session=self.session, + protocol=protocol ) + except (KeyError, TypeError, ValueError) as e: logger.exception("[!] Error: {}".format(e)) print("\nError: {}".format(e)) From f798b3ae38732ab57d64619c31f16fe4f316436e Mon Sep 17 00:00:00 2001 From: Alexandru Cheltuitor Date: Thu, 10 Dec 2020 11:57:58 +0000 Subject: [PATCH 2/3] Refactor configurations; Move to own class Implement compatibility changes for protonvpn-nm-lib 0.3.0 --- protonvpn_cli/cli_configure.py | 163 +++++++++++++++++++++ protonvpn_cli/cli_wrapper.py | 259 ++++++++------------------------- 2 files changed, 220 insertions(+), 202 deletions(-) create mode 100644 protonvpn_cli/cli_configure.py diff --git a/protonvpn_cli/cli_configure.py b/protonvpn_cli/cli_configure.py new file mode 100644 index 0000000..434557a --- /dev/null +++ b/protonvpn_cli/cli_configure.py @@ -0,0 +1,163 @@ + +import inspect +import sys +import time + +from protonvpn_nm_lib.constants import (FLAT_SUPPORTED_PROTOCOLS, + SUPPORTED_PROTOCOLS, + KillswitchStatusEnum, + ProtocolImplementationEnum, + UserSettingEnum, UserSettingStatusEnum) +from protonvpn_nm_lib.logger import logger + + +class CLIConfigure(): + def __init__(self, user_conf_manager, ks_manager): + self.user_conf_manager = user_conf_manager + self.ks_manager = ks_manager + + def set_protocol(self, args): + """Set default protocol setting. + + Args: + Namespace (object): list objects with cli args + """ + logger.info("Setting protocol to: {}".format(args)) + protocol_value = [args[1].pop()].pop() + + try: + index = FLAT_SUPPORTED_PROTOCOLS.index(protocol_value) + except ValueError: + logger.error("Select option is incorrect.") + print( + "\nSelected option \"{}\" is either incorrect ".format( + protocol_value + ) + "or protocol is (yet) not supported" + ) + sys.exit(1) + + protocol = FLAT_SUPPORTED_PROTOCOLS[index] + self.user_conf_manager.update_default_protocol( + protocol + ) + + logger.info("Default protocol has been updated.") + + if protocol in SUPPORTED_PROTOCOLS[ProtocolImplementationEnum.OPENVPN]: + protocol = "OpenVPN (" + protocol.upper() + ")" + + print("\nDefault connection protocol has been updated to {}".format( + protocol + )) + sys.exit() + + def set_dns(self, args): + """Set DNS setting. + + Args: + Namespace (object): list objects with cli args + """ + logger.info("Setting dns to: {}".format(args)) + dns_command = args[0] + + custom_dns_list = [] + + if dns_command == "list": + logger.info("Displaying custom DNS list") + user_configs = self.user_conf_manager.get_user_configurations() + dns_settings = user_configs[UserSettingEnum.CONNECTION]["dns"] + if len(dns_settings["custom_dns"]) > 0: + custom_dns_list = ", ".join(dns_settings["custom_dns"].split()) + print( + "\n{}".format( + "No custom DNS found" + if not len(dns_settings["custom_dns"]) else + "Custom DNS servers: " + custom_dns_list + ) + ) + sys.exit() + + reminder = "These changes will apply the next time you connect to VPN." # noqa + confirmation_message = "\nDNS automatic configuration enabled.\n" + reminder # noqa + user_choice = UserSettingStatusEnum.ENABLED + if dns_command == "ip": + user_choice = UserSettingStatusEnum.CUSTOM + custom_dns_ips = args[1] + if len(custom_dns_ips) > 3: + logger.error("More then 3 custom DNS IPs were provided") + print( + "\nYou provided more then 3 DNS servers. " + "Please enter up to 3 DNS server IPs." + ) + sys.exit(1) + for dns in custom_dns_ips: + if not self.user_conf_manager.is_valid_ip(dns): + logger.error("{} is an invalid IP".format(dns)) + print( + "\n{0} is invalid. " + "Please provide a valid IP DNS server.".format(dns) + ) + sys.exit(1) + + custom_dns_list = " ".join(dns for dns in custom_dns_ips) + print_custom_dns_list = ", ".join(dns for dns in custom_dns_ips) + confirmation_message = "\nDNS will be managed by "\ + "the provided custom IPs: \n\t{}\n{}".format( + print_custom_dns_list, + reminder + ) + + logger.info(confirmation_message) + + self.user_conf_manager.update_dns(user_choice, custom_dns_list) + print(confirmation_message) + sys.exit() + + def set_killswitch(self, args): + """Set kill switch setting. + + Args: + Namespace (object): list objects with cli args + """ + logger.info("Setting kill switch to: {}".format(args)) + user_choice_options_dict = dict( + always_on=KillswitchStatusEnum.HARD, + on=KillswitchStatusEnum.SOFT, + off=KillswitchStatusEnum.DISABLED + ) + contextual_conf_msg = { + KillswitchStatusEnum.HARD: "Always-on kill switch has been enabled.", # noqa + KillswitchStatusEnum.SOFT:"Kill switch has been enabled. Please reconnect to VPN to activate it.", # noqa + KillswitchStatusEnum.DISABLED: "Kill switch has been disabled." + } + for cls_attr in inspect.getmembers(args): + if cls_attr[0] in user_choice_options_dict and cls_attr[1]: + user_int_choice = user_choice_options_dict[cls_attr[0]] + + self.user_conf_manager.update_killswitch(user_int_choice) + self.ks_manager.manage(user_int_choice, True) + + print("\n" + contextual_conf_msg[user_int_choice]) + sys.exit() + + def restore_default_configurations(self, _): + """Restore default configurations.""" + user_choice = input( + "\nAre you sure you want to restore to " + "default configurations? [y/N]: " + ).lower().strip() + + if not user_choice == "y": + return + + logger.info("Restoring default configurations") + + print("Restoring default ProtonVPN configurations...") + time.sleep(0.5) + + # should it disconnect prior to resetting user configurations ? + + self.user_conf_manager.reset_default_configs() + + print("\nConfigurations were successfully restored back to defaults.") + sys.exit() diff --git a/protonvpn_cli/cli_wrapper.py b/protonvpn_cli/cli_wrapper.py index 461a65c..d4fc228 100644 --- a/protonvpn_cli/cli_wrapper.py +++ b/protonvpn_cli/cli_wrapper.py @@ -16,8 +16,7 @@ VIRTUAL_DEVICE_NAME) from protonvpn_nm_lib.enums import (ConnectionMetadataEnum, KillswitchStatusEnum, MetadataEnum, - ProtocolImplementationEnum, - UserSettingEnum, UserSettingStatusEnum) + ProtocolImplementationEnum) from protonvpn_nm_lib.logger import logger from protonvpn_nm_lib.services import capture_exception from protonvpn_nm_lib.services.certificate_manager import CertificateManager @@ -33,6 +32,7 @@ from .cli_dialog import ProtonVPNDialog from .vpn_state_monitor import ProtonVPNStateMonitor +from .cli_configure import CLIConfigure class CLIWrapper(): @@ -48,7 +48,6 @@ class CLIWrapper(): + "----------------" + "------------" ) - time_sleep_value = 1 reconector_manager = ReconnectorManager() user_conf_manager = UserConfigurationManager() ks_manager = KillSwitchManager(user_conf_manager) @@ -57,14 +56,14 @@ class CLIWrapper(): server_manager = ServerManager(CertificateManager(), user_manager) ipv6_lp_manager = IPv6LeakProtectionManager() protonvpn_dialog = ProtonVPNDialog(server_manager, user_manager) - CLI_COMMAND_DICT = dict( - servername=server_manager.direct, - fastest=server_manager.fastest, - random=server_manager.random_c, - cc=server_manager.country_f, - sc=server_manager.feature_f, - p2p=server_manager.feature_f, - tor=server_manager.feature_f, + CLI_CONNECT_DICT = dict( + servername=server_manager.get_config_for_specific_server, + fastest=server_manager.get_config_for_fastest_server, + random=server_manager.get_config_for_random_server, + cc=server_manager.get_config_for_fastest_server_in_country, + sc=server_manager.get_config_for_fastest_server_with_specific_feature, + p2p=server_manager.get_config_for_fastest_server_with_specific_feature, + tor=server_manager.get_config_for_fastest_server_with_specific_feature, ) def __init__(self): @@ -74,12 +73,13 @@ def __init__(self): "support being executed as root user." ) sys.exit(1) + self.connect_option = None + self.connect_option_value = None def connect(self, args): """Proxymethod to connect to ProtonVPN.""" self.server_manager.killswitch_status = self.user_conf_manager.killswitch # noqa - command = False exit_type = 1 protocol = self.determine_protocol(args) self.session = self.get_existing_session(exit_type) @@ -94,18 +94,21 @@ def connect(self, args): ) ) sys.exit(1) - + delattr(args, "help") + self.server_manager.validate_session(self.session) self.remove_existing_connection() - self.check_internet_conn() for cls_attr in inspect.getmembers(args): - if cls_attr[0] in self.CLI_COMMAND_DICT and cls_attr[1]: - command = list(cls_attr) + if cls_attr[0] in self.CLI_CONNECT_DICT and cls_attr[1]: + self.connect_option = cls_attr[0] + if isinstance(cls_attr[1], bool): + self.connect_option_value = cls_attr[0] + break - logger.info("CLI connect type: {}".format(command)) + self.connect_option_value = cls_attr[1] - conn_status = self.prepare_add_connection(protocol, command) + conn_status = self.setup_connection(protocol) print( "Connecting to ProtonVPN on {} with {}...".format( @@ -123,7 +126,7 @@ def connect(self, args): self.reconector_manager, self.session ) loop.run() - sys.exit(exit_type) + sys.exit() def disconnect(self): """Proxymethod to disconnect from ProtonVPN.""" @@ -188,6 +191,7 @@ def logout(self, session=None, _pass_check=None, _removed=None): if _pass_check is None and _removed is None: print("Logging out...") session = self.get_existing_session(exit_type) + self.server_manager.validate_session(session) try: session.logout() except exceptions.ProtonSessionWrapperError: @@ -232,6 +236,7 @@ def logout(self, session=None, _pass_check=None, _removed=None): def status(self): """Proxymethod to diplay connection status.""" conn_status = self.connection_manager.display_connection_status() + print(conn_status) if not conn_status: print("\nNo active ProtonVPN connection.") sys.exit() @@ -289,12 +294,13 @@ def status(self): def configure(self, args): """Configure user settings.""" logger.info("Starting to configure") + cli_configure = CLIConfigure(self.user_conf_manager, self.ks_manager) cli_config_commands = dict( - protocol=self.set_protocol, - dns=self.set_dns, - ip=self.set_dns, - list=self.set_dns, - default=self.restore_default_configurations, + protocol=cli_configure.set_protocol, + dns=cli_configure.set_dns, + ip=cli_configure.set_dns, + list=cli_configure.set_dns, + default=cli_configure.restore_default_configurations, ) for cls_attr in inspect.getmembers(args): @@ -303,152 +309,6 @@ def configure(self, args): cli_config_commands[command[0]](command) - def set_protocol(self, args): - """Set default protocol setting. - - Args: - Namespace (object): list objects with cli args - """ - logger.info("Setting protocol to: {}".format(args)) - protocol_value = [args[1].pop()].pop() - - try: - index = FLAT_SUPPORTED_PROTOCOLS.index(protocol_value) - except ValueError: - logger.error("Select option is incorrect.") - print( - "\nSelected option \"{}\" is either incorrect ".format( - protocol_value - ) + "or protocol is (yet) not supported" - ) - sys.exit(1) - - protocol = FLAT_SUPPORTED_PROTOCOLS[index] - self.user_conf_manager.update_default_protocol( - protocol - ) - - logger.info("Default protocol has been updated.") - - if protocol in SUPPORTED_PROTOCOLS[ProtocolImplementationEnum.OPENVPN]: - protocol = "OpenVPN (" + protocol.upper() + ")" - - print("\nDefault connection protocol has been updated to {}".format( - protocol - )) - sys.exit() - - def set_dns(self, args): - """Set DNS setting. - - Args: - Namespace (object): list objects with cli args - """ - logger.info("Setting dns to: {}".format(args)) - dns_command = args[0] - - custom_dns_list = [] - - if dns_command == "list": - logger.info("Displaying custom DNS list") - user_configs = self.user_conf_manager.get_user_configurations() - dns_settings = user_configs[UserSettingEnum.CONNECTION]["dns"] - if len(dns_settings["custom_dns"]) > 0: - custom_dns_list = ", ".join(dns_settings["custom_dns"].split()) - print( - "\n{}".format( - "No custom DNS found" - if not len(dns_settings["custom_dns"]) else - "Custom DNS servers: " + custom_dns_list - ) - ) - sys.exit() - - reminder = "These changes will apply the next time you connect to VPN." # noqa - confirmation_message = "\nDNS automatic configuration enabled.\n" + reminder # noqa - user_choice = UserSettingStatusEnum.ENABLED - if dns_command == "ip": - user_choice = UserSettingStatusEnum.CUSTOM - custom_dns_ips = args[1] - if len(custom_dns_ips) > 3: - logger.error("More then 3 custom DNS IPs were provided") - print( - "\nYou provided more then 3 DNS servers. " - "Please enter up to 3 DNS server IPs." - ) - sys.exit(1) - for dns in custom_dns_ips: - if not self.user_conf_manager.is_valid_ip(dns): - logger.error("{} is an invalid IP".format(dns)) - print( - "\n{0} is invalid. " - "Please provide a valid IP DNS server.".format(dns) - ) - sys.exit(1) - - custom_dns_list = " ".join(dns for dns in custom_dns_ips) - print_custom_dns_list = ", ".join(dns for dns in custom_dns_ips) - confirmation_message = "\nDNS will be managed by "\ - "the provided custom IPs: \n\t{}\n{}".format( - print_custom_dns_list, - reminder - ) - - logger.info(confirmation_message) - - self.user_conf_manager.update_dns(user_choice, custom_dns_list) - print(confirmation_message) - sys.exit() - - def set_killswitch(self, args): - """Set kill switch setting. - - Args: - Namespace (object): list objects with cli args - """ - logger.info("Setting kill switch to: {}".format(args)) - user_choice_options_dict = dict( - always_on=KillswitchStatusEnum.HARD, - on=KillswitchStatusEnum.SOFT, - off=KillswitchStatusEnum.DISABLED - ) - contextual_conf_msg = { - KillswitchStatusEnum.HARD: "Always-on kill switch has been enabled.", # noqa - KillswitchStatusEnum.SOFT:"Kill switch has been enabled. Please reconnect to VPN to activate it.", # noqa - KillswitchStatusEnum.DISABLED: "Kill switch has been disabled." - } - for cls_attr in inspect.getmembers(args): - if cls_attr[0] in user_choice_options_dict and cls_attr[1]: - user_int_choice = user_choice_options_dict[cls_attr[0]] - - self.user_conf_manager.update_killswitch(user_int_choice) - self.ks_manager.manage(user_int_choice, True) - - print("\n" + contextual_conf_msg[user_int_choice]) - sys.exit() - - def restore_default_configurations(self, _): - """Restore default configurations.""" - user_choice = input( - "\nAre you sure you want to restore to " - "default configurations? [y/N]: " - ).lower().strip() - - if not user_choice == "y": - return - - logger.info("Restoring default configurations") - - print("Restoring default ProtonVPN configurations...") - time.sleep(0.5) - - # should it disconnect prior to resetting user configurations ? - - self.user_conf_manager.reset_default_configs() - - print("\nConfigurations were successfully restored back to defaults.") - sys.exit() - def reconnect(self): """Reconnect to previously connected server.""" logger.info("Attemtping to recconnect to previous server") @@ -510,10 +370,9 @@ def reconnect(self): sys.exit(1) self.check_internet_conn() - print(previous_server, protocol) self.remove_existing_connection() - conn_status = self.prepare_add_connection( + conn_status = self.setup_connection( protocol, ["servername", previous_server] ) @@ -531,23 +390,32 @@ def reconnect(self): ) sys.exit() - def prepare_add_connection(self, protocol, command): + def setup_connection(self, protocol): exit_type = 1 openvpn_username, openvpn_password = self.get_ovpn_credentials( exit_type ) logger.info("OpenVPN credentials fetched") - (certificate_filename, domain, - entry_ip) = self.get_cert_filename_and_domain( - protocol, - command + ( + servername, domain, + server_feature, + filtered_servers, servers + ) = self.get_connection_configurations() + + ( + certificate_fp, + matching_domain, + entry_ip + ) = self.server_manager.generate_server_certificate( + servername, domain, server_feature, + protocol, servers, filtered_servers ) logger.info("Certificate, domain and entry ip were fetched.") self.add_vpn_connection( - certificate_filename, openvpn_username, openvpn_password, - domain, exit_type, entry_ip + certificate_fp, openvpn_username, openvpn_password, + matching_domain, exit_type, entry_ip ) conn_status = self.connection_manager.display_connection_status( @@ -583,10 +451,6 @@ def extract_server_info(self, servername): Returns: tuple: (country, load, features_list) """ - self.server_manager.cache_servers( - session=self.get_existing_session() - ) - servers = self.server_manager.extract_server_list() try: country_code = self.server_manager.extract_server_value( @@ -732,14 +596,12 @@ def get_ovpn_credentials(self, exit_type, retry=False): return openvpn_username, openvpn_password - def get_cert_filename_and_domain(self, protocol, command): + def get_connection_configurations(self): """Proxymethod to get certficate filename and server domain.""" is_dialog = False handle_error = False - try: - invoke_dialog = command[0] # noqa - except TypeError: + if self.connect_option is None: is_dialog = True try: @@ -747,19 +609,12 @@ def get_cert_filename_and_domain(self, protocol, command): servername, protocol = self.protonvpn_dialog.start( self.session ) - command = ["servername", servername] - return self.server_manager.generate( - _method=self.CLI_COMMAND_DICT[command[0]], - command=command, - session=self.session, - protocol=protocol - ) + self.connect_option = "servername" + self.connect_option_value = servername - return self.server_manager.generate( - _method=self.CLI_COMMAND_DICT[command[0]], - command=command, - session=self.session, - protocol=protocol + return self.CLI_CONNECT_DICT[self.connect_option]( + self.session, + self.connect_option_value ) except (KeyError, TypeError, ValueError) as e: @@ -814,7 +669,7 @@ def get_cert_filename_and_domain(self, protocol, command): if handle_error == 403: self.login(force=True) self.session = self.get_existing_session(exit_type=1) - self.get_cert_filename_and_domain(protocol, command) + return self.get_connection_configurations() def determine_protocol(self, args): """Determine protocol based on CLI input arguments.""" @@ -823,8 +678,8 @@ def determine_protocol(self, args): protocol = args.protocol.lower().strip() except AttributeError: protocol = self.user_conf_manager.default_protocol - else: - delattr(args, "protocol") + + delattr(args, "protocol") return protocol From 752a1309717e93fa8291dfb9b243fad28cd90cf7 Mon Sep 17 00:00:00 2001 From: Alexandru Cheltuitor Date: Thu, 10 Dec 2020 13:11:26 +0000 Subject: [PATCH 3/3] Update dependencies --- debian/changelog | 1 + debian/control | 2 +- setup.py | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/debian/changelog b/debian/changelog index 723baa3..b8672a1 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,6 +1,7 @@ protonvpn-cli (3.2.0-1) UNRELEASED; urgency=low * Add support for protonvpn-nm-lib v0.3.0-1 + * Update dependencies -- Proton Technologies AG Mon, 07 Dec 2020 16:17:34 +0000 diff --git a/debian/control b/debian/control index 3256566..1a04999 100644 --- a/debian/control +++ b/debian/control @@ -8,7 +8,7 @@ X-Python3-Version: >= 3.5 Package: protonvpn-cli Architecture: all -Depends: ${python3:Depends}, ${misc:Depends}, python3-dialog, python3-protonvpn-nm-lib +Depends: ${python3:Depends}, ${misc:Depends}, python3-dialog, python3-protonvpn-nm-lib (>=0.3.0) Description: ProtonVPN CLI (Python 3) Package installs official ProtonVPN CLI. diff --git a/setup.py b/setup.py index 660e025..3af2d0e 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ author_email="contact@protonvpn.com", long_description=long_description, install_requires=[ - "protonvpn-nm-lib", "pythondialog" + "protonvpn-nm-lib~=0.3.0", "pythondialog" ], include_package_data=True, license="GPLv3",