Skip to content

Commit

Permalink
Adds local signing for metadata
Browse files Browse the repository at this point in the history
This enables the use of a locally stored metadata to create a
signature. It also saves the created signature locally.

Closes #423
  • Loading branch information
enyinna1234 committed Nov 16, 2023
1 parent 7c025d4 commit cb5872d
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 30 deletions.
81 changes: 51 additions & 30 deletions repository_service_tuf/cli/admin/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#
# SPDX-License-Identifier: MIT
import copy
import json
import sys
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
Expand Down Expand Up @@ -112,15 +113,16 @@
# Metadata Signing
Metadata signing allows sending signature of pending Repository Service for TUF
(RSTUF) role metadata to an existing RSTUF API deployment.
(RSTUF) role metadata to an existing RSTUF API deployment. Or saving the
signature locally.
The Metadata Signing does the following steps:
- retrieves the metadata pending for signatures from RSTUF API
- retrieves the metadata pending for signatures from RSTUF API or local file
- selects the metadata role for signing
- loads the private key for signing
After loading the key it will sign the role metadata and send the request to
the RSTUF API with the signature.
the RSTUF API with the signature, or save the signature locally.
"""


Expand Down Expand Up @@ -599,26 +601,32 @@ def update(


def _get_pending_roles(
settings: Any, api_server: Optional[str]
settings: Any,
api_server: Optional[str],
offline_metadata: Optional[click.File],
) -> Dict[str, Any]:
if api_server:
settings.SERVER = api_server
response_data: Dict[str, Any]
if offline_metadata:
response_data = json.load(offline_metadata) # type: ignore
else:
if api_server:
settings.SERVER = api_server

if settings.get("SERVER") is None:
api_server = prompt.Prompt.ask("\n[cyan]API[/] URL address")
settings.SERVER = api_server
if settings.get("SERVER") is None:
api_server = prompt.Prompt.ask("\n[cyan]API[/] URL address")
settings.SERVER = api_server

response = request_server(
settings.SERVER, URL.METADATA_SIGN.value, Methods.GET
)
if response.status_code != 200:
raise click.ClickException(
f"Failed to retrieve metadata for signing. Error: {response.text}"
response = request_server(
settings.SERVER, URL.METADATA_SIGN.value, Methods.GET
)
if response.status_code != 200:
raise click.ClickException(
f"Failed to fetch metadata for signing. Error: {response.text}"
)

response_data: Dict[str, Any] = response.json().get("data")
if response_data is None:
raise click.ClickException(response.text)
response_data = response.json().get("data")
if response_data is None:
raise click.ClickException(response.text)

pending_roles: Dict[str, Any] = response_data.get("metadata", {})
if len(pending_roles) == 0:
Expand Down Expand Up @@ -689,16 +697,22 @@ def _sign_metadata(role_info: MetadataInfo, rstuf_key: RSTUFKey) -> Signature:
required=False,
is_flag=True,
)
@click.argument("offline_metadata", required=False, type=click.File("r"))
@click.pass_context
def sign(context, api_server: Optional[str], delete: Optional[bool]) -> None:
def sign(
context,
api_server: Optional[str],
delete: Optional[bool],
offline_metadata: Optional[click.File],
) -> None:
"""
Start metadata signature.
"""
console.print(markdown.Markdown(METADATA_SIGNING), width=100)

settings = context.obj["settings"]

pending_roles = _get_pending_roles(settings, api_server)
pending_roles = _get_pending_roles(settings, api_server, offline_metadata)
role_info: MetadataInfo
rolename: str

Expand Down Expand Up @@ -743,15 +757,22 @@ def sign(context, api_server: Optional[str], delete: Optional[bool]) -> None:

rstuf_key = _get_signing_key(role_info)
signature = _sign_metadata(role_info, rstuf_key)

payload = {"role": rolename, "signature": signature.to_dict()}
console.print("\nSending signature")
task_id = send_payload(
settings,
URL.METADATA_SIGN.value,
payload,
"Metadata sign accepted.",
"Metadata sign",
)
task_status(task_id, settings, "Metadata sign status:")

if offline_metadata:
with open("signature.json", "w") as write_signature:
json.dump(payload, write_signature)

else:
console.print("\nSending signature")
task_id = send_payload(
settings,
URL.METADATA_SIGN.value,
payload,
"Metadata sign accepted.",
"Metadata sign",
)
task_status(task_id, settings, "Metadata sign status:")

console.print_json(json.dumps(payload))
console.print("\nMetadata Signed! 🔑\n")
27 changes: 27 additions & 0 deletions tests/unit/cli/admin/test_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -1210,3 +1210,30 @@ def test_metadata_sign_delete(self, client, test_context):
"Signing process status: ",
)
]

def test_metadata_sign_offline(self, client, test_context):
input_step = [
"root", # Choose a metadata to sign [root]
"y", # Do you still want to sign root? [y]
"Jimi Hendrix", # Choose a private key to load [Jimi Hendrix]
"", # Choose Jimi Hendrix key type [ed25519/ecdsa/rsa]
"tests/files/key_storage/JimiHendrix.key", # Enter the Jimi Hendrix`s private key path # noqa
"strongPass", # Enter the Jimi Hendrix`s private key password
]
with open("tests/files/das-root.json", "r") as file:
das_root = file.read()
fake_offline_md = {"metadata": json.loads(das_root)}
json.load = pretend.call_recorder(lambda *a: fake_offline_md)
json.dump = pretend.call_recorder(lambda *a: "fake_write")
test_result = client.invoke(
metadata.sign,
["tests/files/das-root.json"],
input="\n".join(input_step),
obj=test_context,
catch_exceptions=False,
)

assert test_result.exit_code == 0, test_result.output
assert "Metadata Signed! 🔑" in test_result.output
assert "SIGNING KEYS" in test_result.output
assert "PENDING KEYS" in test_result.output

0 comments on commit cb5872d

Please sign in to comment.