Skip to content
15 changes: 15 additions & 0 deletions conan/api/subapi/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from conan.errors import ConanException
from conan.api.model import PkgReference
from conan.api.model import RecipeReference
from conan.internal.rest.pkg_sign import PkgSignaturesPlugin
from conan.internal.util.dates import revision_timestamp_now
from conan.internal.util.files import rmdir, mkdir, remove, save

Expand Down Expand Up @@ -76,6 +77,20 @@ def check_integrity(self, package_list):
checker = IntegrityChecker(cache)
checker.check(package_list)

def sign(self, package_list):
"""Sign packages with the signing plugin"""
cache = PkgCache(self._conan_api.cache_folder, self._api_helpers.global_conf)
pkg_signer = PkgSignaturesPlugin(cache, self._conan_api.home_folder)
results = pkg_signer.sign(package_list, context="cache")
return {"results": results, "context": "cache", "action": "sign"}

def verify(self, package_list):
"""Verify packages with the signing plugin"""
cache = PkgCache(self._conan_api.cache_folder, self._api_helpers.global_conf)
pkg_signer = PkgSignaturesPlugin(cache, self._conan_api.home_folder)
results = pkg_signer.verify_pkglist(package_list, context="cache")
return {"results": results, "context": "cache", "action": "verify"}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why returning the "action"? It is clear that you called the verify() method, in the same way you are not returning the package_list as it is an input, both the context and action seem redundant, they are already define by the action of calling verify(pkg_list).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might sound redundant, but I think that having the information saved as a "log" might be useful if the json report is saved for later consumption or just for auditing


def clean(self, package_list, source=True, build=True, download=True, temp=True,
backup_sources=False):
"""
Expand Down
81 changes: 80 additions & 1 deletion conan/cli/commands/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from conan.api.conan_api import ConanAPI
from conan.api.model import ListPattern, MultiPackagesList
from conan.api.output import cli_out_write, ConanOutput
from conan.api.output import cli_out_write, ConanOutput, Color
from conan.cli import make_abs_path
from conan.cli.command import conan_command, conan_subcommand, OnceArgument
from conan.cli.commands.list import print_list_text, print_list_json
Expand All @@ -15,6 +15,25 @@ def json_export(data):
cli_out_write(json.dumps({"cache_path": data}))


def print_cache_sign_verify_text(data):
elements = data.get("results")
if elements:
title = "Verification" if data.get("action") == "verify" else "Signing"
cli_out_write(f"[Package signing plugin] {title} results:", fg=Color.BRIGHT_BLUE)
for ref, result in elements.items():
cli_out_write(f" {ref}", fg=Color.BRIGHT_BLUE)
if result is None:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming here that if the pkg signing plugin of the user only uses return, this means that it was correctly signed or verified

result = "Signed" if data.get("action") == "sign" else "Signature verified"
color = Color.BRIGHT_YELLOW if "warn" in result else Color.BRIGHT_WHITE
color = Color.BRIGHT_RED if "fail" in result else color
cli_out_write(f" :: {result}", fg=color)


def print_cache_sign_verify_json(data):
myjson = json.dumps(data, indent=4)
cli_out_write(myjson)


@conan_command(group="Consumer")
def cache(conan_api: ConanAPI, parser, *args):
"""
Expand Down Expand Up @@ -150,6 +169,66 @@ def cache_check_integrity(conan_api: ConanAPI, parser, subparser, *args):
ConanOutput().success("Integrity check: ok")


@conan_subcommand(formatters={"text": print_cache_sign_verify_text,
"json": print_cache_sign_verify_json})
def cache_sign(conan_api: ConanAPI, parser, subparser, *args):
"""
Sign packages with the Package Singing Plugin
"""
subparser.add_argument("pattern", nargs="?",
help="Selection pattern for references to be signed")
subparser.add_argument("-l", "--list", action=OnceArgument,
help="Package list of packages to be signed")
subparser.add_argument('-p', '--package-query', action=OnceArgument,
help="Only the packages matching a specific query, e.g., "
"os=Windows AND (arch=x86 OR compiler=gcc)")
args = parser.parse_args(*args)

if args.pattern is None and args.list is None:
raise ConanException("Missing pattern or package list file")
if args.pattern and args.list:
raise ConanException("Cannot specify both pattern and list")

if args.list:
listfile = make_abs_path(args.list)
multi_package_list = MultiPackagesList.load(listfile)
package_list = multi_package_list["Local Cache"]
else:
ref_pattern = ListPattern(args.pattern, rrev="*", package_id="*", prev="*")
package_list = conan_api.list.select(ref_pattern, package_query=args.package_query)
return conan_api.cache.sign(package_list)


@conan_subcommand(formatters={"text": print_cache_sign_verify_text,
"json": print_cache_sign_verify_json})
def cache_verify(conan_api: ConanAPI, parser, subparser, *args):
"""
Check the signature of packages with the Package Singing Plugin
"""
subparser.add_argument("pattern", nargs="?",
help="Selection pattern for references to verify their signature")
subparser.add_argument("-l", "--list", action=OnceArgument,
help="Package list of packages to verify their signature")
subparser.add_argument('-p', '--package-query', action=OnceArgument,
help="Only the packages matching a specific query, e.g., "
"os=Windows AND (arch=x86 OR compiler=gcc)")
args = parser.parse_args(*args)

if args.pattern is None and args.list is None:
raise ConanException("Missing pattern or package list file")
if args.pattern and args.list:
raise ConanException("Cannot specify both pattern and list")

if args.list:
listfile = make_abs_path(args.list)
multi_package_list = MultiPackagesList.load(listfile)
package_list = multi_package_list["Local Cache"]
else:
ref_pattern = ListPattern(args.pattern, rrev="*", package_id="*", prev="*")
package_list = conan_api.list.select(ref_pattern, package_query=args.package_query)
return conan_api.cache.verify(package_list)


@conan_subcommand(formatters={"text": print_list_text,
"json": print_list_json})
def cache_save(conan_api: ConanAPI, parser, subparser, *args):
Expand Down
186 changes: 161 additions & 25 deletions conan/internal/rest/pkg_sign.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,182 @@
import copy
import json
import os

from conan.api.output import ConanOutput
from conan.errors import ConanException
from conan.internal.cache.conan_reference_layout import METADATA
from conan.internal.cache.home_paths import HomePaths
from conan.internal.loader import load_python_file
from conan.internal.util.files import mkdir
from conan.internal.util.files import load, mkdir, save, sha256sum


class PkgSignaturesTools:

SIGN_SUMMARY_CONTENT = {
"provider": None,
"method": None,
"files": {}
}
SIGN_SUMMARY_FILENAME = "sign-summary.json"

def __init__(self, artifacts_folder, signature_folder):
self._artifacts_folder = artifacts_folder
self._signature_folder = signature_folder

def get_summary_file_path(self):
return os.path.join(self._signature_folder, self.SIGN_SUMMARY_FILENAME)

def is_pkg_signed(self):
try:
c = self.load_summary()
except FileNotFoundError:
return False
return bool(c.get("provider") and c.get("method"))

def create_summary_content(self):
"""
Creates the summary content as a dictionary for manipulation
@return: Dictionary with the summary content
"""
checksums = {}
for fname in os.listdir(self._artifacts_folder):
file_path = os.path.join(self._artifacts_folder, fname)
if os.path.isfile(file_path):
sha256 = sha256sum(file_path)
checksums[fname] = sha256
sorted_checksums = dict(sorted(checksums.items()))
content = copy.deepcopy(self.SIGN_SUMMARY_CONTENT)
Comment on lines +43 to +48
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is doing a checksum of all artifacts in the package?
What exactly for?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We agreed that this should be the way the plugin should work. Get the cheksums of all the contetns of the package, create a summary file with filenames and checkesums and use that file to sign the package

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But maybe, this is the kind of thing that shouldn't belong then to user space? This is something that Conan should kind of enforce in the plugin? Or do we still envision other possible signing approaches that do not sign this file?

content["files"] = sorted_checksums
return content

def load_summary(self):
""""
Loads the summary file from the signature folder
"""
return json.loads(load(self.get_summary_file_path()))

def save_summary(self, content):
"""
Saves the content of the summary to the signature folder using SIGN_SUMMARY_FILENAME as the
file name
@param content: Content of the summary file
@return:
"""
assert content.get("provider")
assert content.get("method")
save(self.get_summary_file_path(), json.dumps(content))


class PkgSignaturesPlugin:
def __init__(self, cache, home_folder):
self._cache = cache
signer = HomePaths(home_folder).sign_plugin_path
if os.path.isfile(signer):
mod, _ = load_python_file(signer)
# TODO: At the moment it requires both methods sign and verify, but that might be relaxed
self._plugin_sign_function = mod.sign
self._plugin_verify_function = mod.verify
else:
self._plugin_sign_function = self._plugin_verify_function = None
self.sign_plugin_path = HomePaths(home_folder).sign_plugin_path
self._plugin_sign_function = self._plugin_verify_function = None
self._output = ConanOutput(scope="[Package signing plugin]")
if os.path.isfile(self.sign_plugin_path):
mod, _ = load_python_file(self.sign_plugin_path)
try:
self._plugin_sign_function = mod.sign
except AttributeError:
pass
try:
self._plugin_verify_function = mod.verify
except AttributeError:
pass

def sign(self, upload_data):
def sign(self, upload_data, context="upload"): # cache, upload,
results = {}
if self._plugin_sign_function is None:
return
self._output.error(f"sign() function not found in {self.sign_plugin_path}")
return results

def _sign(ref, files, folder):
def _sign(ref, files, folder, context="upload"):
output = ConanOutput(scope=f"[Package signing plugin]\n {ref.repr_notime()}\n :")
metadata_sign = os.path.join(folder, METADATA, "sign")
mkdir(metadata_sign)
self._plugin_sign_function(ref, artifacts_folder=folder, signature_folder=metadata_sign)
sign_tools = PkgSignaturesTools(folder, metadata_sign)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why instantiating here PkgSignaturesTools with those arguments, then passing it to self._plugin_sign_function() as an argument if the function can create the tools inside?

I am not convinced by this API, passing an object with data this way is not that common, in general Conan defines helpers/tools that users can instantiate and use them themselves, or provide an input-output interface, in which Conan would automatically manage the resource, in this case sign-summary.json. If this file must exist for any signing plugin, then why not mandating the return interface of sign() to return the necessary data so Conan can enforce the correct generation of sign-summary.json?

try:
result = self._plugin_sign_function(ref,
artifacts_folder=folder,
signature_folder=metadata_sign,
output=output,
sign_tools=sign_tools)
except (ConanException, AssertionError) as e:
result = _handle_failure(e, context, ref)
# Add files to the pkglist/bundle
for f in os.listdir(metadata_sign):
files[f"{METADATA}/sign/{f}"] = os.path.join(metadata_sign, f)
if result and context == "upload":
output.info(result)
return {ref.repr_notime(): result}

for rref, packages in upload_data.items():
recipe_bundle = upload_data.recipe_dict(rref)
if recipe_bundle["upload"]:
_sign(rref, recipe_bundle["files"], self._cache.recipe_layout(rref).download_export())
for pref in packages:
pkg_bundle = upload_data.package_dict(pref)
if pkg_bundle["upload"]:
_sign(pref, pkg_bundle["files"], self._cache.pkg_layout(pref).download_package())
if context == "upload":
for rref, packages in upload_data.items():
recipe_bundle = upload_data.recipe_dict(rref)
if recipe_bundle["upload"]:
_sign(rref, recipe_bundle["files"], self._cache.recipe_layout(rref).download_export())
for pref in packages:
pkg_bundle = upload_data.package_dict(pref)
if pkg_bundle["upload"]:
_sign(pref, pkg_bundle["files"], self._cache.pkg_layout(pref).download_package())
else:
for rref, packages in upload_data.items():
recipe_bundle = upload_data.recipe_dict(rref)
if recipe_bundle:
result = _sign(rref, {}, self._cache.recipe_layout(rref).download_export(),
context)
results.update(result)
for pref in packages:
pkg_bundle = upload_data.package_dict(pref)
if pkg_bundle:
result = _sign(pref, {}, self._cache.pkg_layout(pref).download_package(),
context)
results.update(result)
return results

def verify(self, ref, folder, files):
def verify(self, ref, folder, files, context="install"):
output = ConanOutput(scope=f"[Package signing plugin]\n {ref.repr_notime()}\n :")
if self._plugin_verify_function is None:
return
self._output.error(f"verify() function not found in {self.sign_plugin_path}")
return {}
metadata_sign = os.path.join(folder, METADATA, "sign")
self._plugin_verify_function(ref, artifacts_folder=folder, signature_folder=metadata_sign,
files=files)
sign_tools = PkgSignaturesTools(folder, metadata_sign)
try:
result = self._plugin_verify_function(ref, artifacts_folder=folder,
signature_folder=metadata_sign, files=files,
output=output, sign_tools=sign_tools)
except (ConanException, AssertionError) as e:
result = _handle_failure(e, context, ref)
if result and context == "install":
output.info(result)
return {ref.repr_notime(): result}

def verify_pkglist(self, pkg_list, context="cache"): # cache, install, upload
results = {}
if self._plugin_verify_function is None:
self._output.error(f"verify() function not found in {self.sign_plugin_path}")
return results

for rref, packages in pkg_list.items():
recipe_bundle = pkg_list.recipe_dict(rref)
if recipe_bundle:
rref_folder = self._cache.recipe_layout(rref).download_export()
result = self.verify(rref, rref_folder, os.listdir(rref_folder), context)
results.update(result)
for pref in packages:
pkg_bundle = pkg_list.package_dict(pref)
if pkg_bundle:
pref_folder = self._cache.pkg_layout(pref).download_package()
result = self.verify(pref, pref_folder, os.listdir(pref_folder), context)
results.update(result)
return results


def _handle_failure(exception, action, ref):
exception_msg = str(exception)
if action in ["upload", "install"]:
# TODO: Mark folder with set_dirty(artifacts_folder)
raise ConanException(f"\n[Package signing plugin]\n {ref.repr_notime()}\n :: {exception_msg}")
else:
error_msg = f"Failed: {exception_msg}" if exception_msg else "Failed"
return error_msg
Loading
Loading