Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions features/productid.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# Notes: if you want to extend your tests, then it could be useful to get
# the list of RPMs in given repository using following command:
# $ dnf list --available --repo=never-enabled-content-37060


Feature: Management of productid certificate by libdnf5 plugin productid
The productid libdnf5 plugin should install product certificate
from testing repositories to /etc/pki/product and add record
to "database" in /var/lib/rhsm/productid.json. When the last RPM
from given repository is removed, then corresponding product
certificate should be removed, when it is not needed by any other
repository.


Scenario: Install one productid certificate for one installed RPM
Given system is registered against candlepin server
Given repositories are enabled
| repo_id |
| never-enabled-content-37060 |
When rpms are installed from RPM repository
| rpm_name |
| slow-eagle |
Then productid certificate "37060.pem" is installed in "/etc/pki/product"
And product database contains
| repo_id | product_id |
| never-enabled-content-37060 | 37060 |
Then rpms are removed from system
| rpm_name |
| slow-eagle |


Scenario: Install one productid certificate for two installed RPM
Given system is registered against candlepin server
Given repositories are enabled
| repo_id |
| never-enabled-content-37060 |
When rpms are installed from RPM repository
| rpm_name |
| slow-eagle |
| tricky-frog |
Then productid certificate "37060.pem" is installed in "/etc/pki/product"
And product database contains
| repo_id | product_id |
| never-enabled-content-37060 | 37060 |
Then rpms are removed from system
| rpm_name |
| slow-eagle |
| tricky-frog |


Scenario: Install two productid certificates for two installed RPM
Given system is registered against candlepin server
Given repositories are enabled
| repo_id |
| never-enabled-content-37060 |
| awesomeos-100000000000002 |
When rpms are installed from RPM repository
| rpm_name |
| slow-eagle |
| awesome-rabbit |
Then productid certificate "37060.pem" is installed in "/etc/pki/product"
And productid certificate "100000000000002.pem" is installed in "/etc/pki/product"
And product database contains
| repo_id | product_id |
| never-enabled-content-37060 | 37060 |
| awesomeos-100000000000002 | 100000000000002 |
When rpms are removed from system
| rpm_name |
| awesome-rabbit |
Then product database contains
| repo_id | product_id |
| never-enabled-content-37060 | 37060 |
Then rpms are removed from system
| rpm_name |
| slow-eagle |

Scenario: Install one productid certificates for two RPMs installed from different repository
Given system is registered against candlepin server
Given repositories are enabled
| repo_id |
| never-enabled-content-37060 |
| content-label-37060 |
When rpms are installed from RPM repository
| rpm_name |
| slow-eagle |
| cool-mouse |
Then productid certificate "37060.pem" is installed in "/etc/pki/product"
And product database contains
| repo_id | product_id |
| never-enabled-content-37060 | 37060 |
| content-label-37060 | 37060 |
Then rpms are removed from system
| rpm_name |
| slow-eagle |
| cool-mouse |
159 changes: 159 additions & 0 deletions features/steps/test_impl_productid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
from behave import *

import subprocess
import os
import json


def run(cmd, shell=True, cwd=None):
"""
Run a command.
Return exitcode, stdout, stderr
"""

proc = subprocess.Popen(
cmd,
shell=shell,
cwd=cwd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
errors="surrogateescape",
)
Comment on lines +14 to +22
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'Popen' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

Source: opengrep


stdout, stderr = proc.communicate()
return proc.returncode, stdout, stderr


def run_in_context(context, cmd, can_fail=False, expected_exit_code=None, **run_args):
"""
Run a command in the context of a behave scenario.
:param context: behave context
:param cmd: command to run
:param can_fail: whether the command can fail without raising an error
:param expected_exit_code: expected exit code of the command
:param run_args: additional arguments to pass to subprocess.Popen
:return: None
"""
if getattr(context, "faketime", None) is not None:
cmd = "NO_FAKE_STAT=1 " + context.faketime + cmd

if getattr(context, "fake_kernel_release", None) is not None:
cmd = context.fake_kernel_release + cmd

if getattr(context, "lc_all", None) is not None:
cmd = context.lc_all + cmd

context.cmd = cmd

if hasattr(context.scenario, "working_dir") and "cwd" not in run_args:
run_args["cwd"] = context.scenario.working_dir

context.cmd_exitcode, context.cmd_stdout, context.cmd_stderr = run(cmd, **run_args)

if not can_fail and context.cmd_exitcode != 0:
raise AssertionError(
'Running command "%s" failed: %s' % (cmd, context.cmd_exitcode)
)
elif expected_exit_code is not None and expected_exit_code != context.cmd_exitcode:
raise AssertionError(
'Running command "%s" had unexpected exit code: %s'
% (cmd, context.cmd_exitcode)
)


@given("system is registered against candlepin server")
def step_impl(context):
"""
Check if the system is registered against the candlepin server. If not, register it.
:param context: behave context
:return: None
"""
cmd = "rhc status --format json"
run_in_context(context, cmd, can_fail=True)
result = json.loads(context.cmd_stdout)
if not result["rhsm_connected"]:
cmd = "rhc connect --username admin --password admin --organization donaldduck"
run_in_context(context, cmd, can_fail=False)


@given("repositories are enabled")
def step_impl(context):
"""
Enable repositories for the system.
:param context: behave context
:return: None
"""
for row in context.table:
cmd = f"dnf5 config-manager setopt {row['repo_id']}.enabled=1"
run_in_context(context, cmd, can_fail=False)


@when('rpms are installed from RPM repository')
def step_impl(context):
"""
Install RPM packages from the RPM repository.
:param context: behave context
:return: None
"""
rpm_list = []
for row in context.table:
rpm_list.append(row['rpm_name'])
assert len(rpm_list) > 0
cmd = f"dnf5 install -y {' '.join(rpm_list)}"
run_in_context(context, cmd, can_fail=False)


@step("rpms are removed from system")
def step_impl(context):
"""
Remove RPMSs from the system.
:param context: behave context
:return: None
"""
rpm_list = []
for row in context.table:
rpm_list.append(row['rpm_name'])
assert len(rpm_list) > 0
cmd = f"dnf5 remove -y {' '.join(rpm_list)}"
run_in_context(context, cmd, can_fail=False)


@then(
'productid certificate "{product_cert_name}" is installed in "{product_cert_dir_path}"'
)
def step_impl(context, product_cert_name, product_cert_dir_path):
"""
Check if the productid certificate is installed in the specified directory.
:param context: behave context
:param product_cert_name: name of the productid certificate
:param product_cert_dir_path: path to the directory where the certificate should be installed
:return: None
"""
product_cert_path = os.path.join(product_cert_dir_path, product_cert_name)
assert os.path.exists(product_cert_path)


PRODUCTDB_PATH = "/var/lib/rhsm/productid.json"


@then("product database contains")
def step_impl(contex):
"""
Check if the product database contains the expected products and repositories.
:param contex: behave context
:return: None
"""
productdb_content = None
assert os.path.exists(PRODUCTDB_PATH)
with open(PRODUCTDB_PATH) as f:
productdb_content = json.loads(f.read())

assert productdb_content is not None
assert isinstance(productdb_content, dict)
for row in contex.table:
product_id = row["product_id"]
repo_id = row["repo_id"]
assert product_id in productdb_content
repo_list = productdb_content[product_id]
assert repo_id in repo_list