diff --git a/core_directory/management/commands/init_db.py b/core_directory/management/commands/init_db.py index ac638b8..ab9dff0 100644 --- a/core_directory/management/commands/init_db.py +++ b/core_directory/management/commands/init_db.py @@ -4,28 +4,16 @@ and stores both local file content and GitHub URLs in the database using a serializer. Utilizes environment variables for configuration. """ - -import os -import tempfile -import shutil -import stat - -from dataclasses import dataclass - -from io import BytesIO -from git import Repo -from git.exc import GitCommandError - -from github import Github, GithubException -from github.Auth import Token as GitHubAuthToken - from django.db import IntegrityError from django.core.exceptions import ValidationError as DjangoValidationError + +from django.core.files.base import ContentFile from django.core.management.base import BaseCommand from rest_framework.exceptions import ValidationError as DRFValidationError from core_directory.serializers import CoreSerializer +from core_directory.storages.github import GitHubStorage from core_directory.models import CorePackage class Command(BaseCommand): @@ -62,124 +50,71 @@ def handle(self, *args, **kwargs): """ if not CorePackage.objects.exists(): self.stdout.write(self.style.SUCCESS('Database is empty. Initializing with data...')) - self.download_and_load_data() + self.initialize_from_storage() self.stdout.write(self.style.SUCCESS('Database initialized successfully.')) else: self.stdout.write(self.style.WARNING('Database already initialized.')) - def get_repo_info(self): + def initialize_from_storage(self): """ - Retrieves repository information and default branch using the GitHub API. + Loads core and signature files from the configured GitHub repository into the database. - Returns: - tuple: (repo_name, access_token, default_branch) if successful, otherwise (None, None, None). - """ - repo_name = os.getenv('GITHUB_REPO') - access_token = os.getenv('GITHUB_ACCESS_TOKEN') - if not repo_name: - self.stdout.write(self.style.ERROR('GITHUB_REPO environment variable is not set.')) - return None, None, None - if not access_token: - self.stdout.write(self.style.ERROR('GITHUB_ACCESS_TOKEN environment variable is not set.')) - return None, None, None - g = Github(auth=GitHubAuthToken(os.getenv('GITHUB_ACCESS_TOKEN'))) - try: - repo = g.get_repo(repo_name) - default_branch = repo.default_branch - return repo_name, access_token, default_branch - except GithubException as e: - self.stdout.write(self.style.ERROR(f'GitHub API error fetching repo info: {e}')) - return None, None, None - except AttributeError as e: - self.stdout.write(self.style.ERROR(f'Attribute error fetching repo info: {e}')) - return None, None, None - - def download_and_load_data(self): - """ - Clones the GitHub repo locally and loads .core and .sig files from the root directory into the database. + This method uses the GitHubStorage backend to list and retrieve all `.core` files (and their + corresponding `.sig` files, if present) from the root of the repository. For each core file, + it creates a Django ContentFile object and passes it, along with the signature file (if available), + to the CoreSerializer for validation and saving. If the storage backend supports cache prefill, + the cache is prefilled before processing files. + + Any errors encountered during validation or saving are printed to the command output. + + Raises: + RuntimeError: If the storage cache cannot be prefilled or if required files are missing. + FileNotFoundError: If a core or signature file cannot be found in the repository. - Constructs GitHub URLs for each file and stores both the local file content and the GitHub URL - in the database using the CoreSerializer. + Side Effects: + - Populates the database with CorePackage and related objects for each valid core file. + - Prints progress and error messages to the command output. """ - @dataclass - class RepositoryData: - """ - Container for GitHub repository authentication and metadata. - - Attributes: - name (str): The full repository name in 'owner/repo' format. - access_token (str): The GitHub access token for authentication. - default_branch (str): The default branch of the repository. - - Properties: - url (str): The HTTPS URL of the repository. - url_with_access_token (str): The HTTPS URL of the repository including the access token. - base_url_raw_files (str): The base URL for downloading raw files from the repository's default branch. - """ - name: str - access_token: str - default_branch: str - - @property - def url(self): - """Returns the URL of the repository.""" - return f"https://github.com/{self.name}.git" - - @property - def url_with_access_token(self): - """Returns the URL of the repository including access token.""" - return f"https://{self.access_token}@github.com/{self.name}.git" - - @property - def base_url_raw_files(self): - """Returns the base URL for raw file download from github.""" - return f"https://raw.githubusercontent.com/{self.name}/refs/heads/{self.default_branch}" - - repo = RepositoryData(*self.get_repo_info()) - if not repo.name: - return - - temp_dir = tempfile.mkdtemp() - self.stdout.write(f'Cloning repository {repo.url} to {temp_dir}...') - try: - Repo.clone_from(repo.url_with_access_token, temp_dir) - except GitCommandError as e: - self.stdout.write(self.style.ERROR(f'Git error cloning repository: {e}')) - shutil.rmtree(temp_dir) - return - except OSError as e: - self.stdout.write(self.style.ERROR(f'Filesystem error cloning repository: {e}')) - shutil.rmtree(temp_dir) - return - - # Only process files in the root directory - files_in_root = os.listdir(temp_dir) + + storage = GitHubStorage() + + # Prefill cache if supported + prefill = getattr(storage, "prefill_cache", None) + if callable(prefill): + self.stdout.write('Prefilling storage cache from GitHub zip archive...') + try: + prefill() + self.stdout.write(self.style.SUCCESS('Cache prefilled.')) + except RuntimeError as e: + self.stdout.write(self.style.ERROR(f'Error during cache prefill: {e}')) + + _, files_in_root = storage.listdir('') core_files = [f for f in files_in_root if f.endswith('.core')] sig_files = {f.removesuffix('.sig'): f for f in files_in_root if f.endswith('.sig')} for core_filename in core_files: self.stdout.write(f'Processing {core_filename}...') - with open(os.path.join(temp_dir, core_filename), 'rb') as f: - core_file_object = BytesIO(f.read()) + with storage.open(core_filename, 'rb') as f: + core_file_object = ContentFile(f.read()) core_file_object.name = core_filename - core_file_object.size = core_file_object.getbuffer().nbytes + core_file_object.size = core_file_object.size data = { 'core_file': core_file_object, - 'core_url': f"{repo.base_url_raw_files}/{core_filename}" + 'core_url': storage.url(core_filename) } # Attach signature file if present if core_filename in sig_files: sig_filename = sig_files[core_filename] - with open(os.path.join(temp_dir, sig_filename), 'rb') as f: - sig_file_object = BytesIO(f.read()) + with storage.open(sig_filename, 'rb') as f: + sig_file_object = ContentFile(f.read()) sig_file_object.name = sig_filename - sig_file_object.size = sig_file_object.getbuffer().nbytes - data['sig_file'] = sig_file_object - data['sig_url'] = f"{repo.base_url_raw_files}/{sig_filename}" + sig_file_object.size = sig_file_object.size + data['signature_file'] = sig_file_object + data['sig_url'] = storage.url(sig_filename) # Use the serializer to create database entries serializer = CoreSerializer(data=data) @@ -191,18 +126,3 @@ def base_url_raw_files(self): self.stdout.write(self.style.ERROR(f'Error creating database object for {core_filename}: {e}')) else: self.stdout.write(self.style.ERROR(f'Errors in {core_filename}: {serializer.errors}')) - - shutil.rmtree(temp_dir, onexc=self._on_rm_exc) - - @staticmethod - def _on_rm_exc(func, path, excinfo): - """ - Error handler for `shutil.rmtree` using the `onexc` parameter (Python 3.12+). - - If the removal failed, make the file writable and try again. - """ - if not os.access(path, os.W_OK): - os.chmod(path, stat.S_IWUSR) - func(path) - else: - raise excinfo[1] diff --git a/core_directory/migrations/0001_initial.py b/core_directory/migrations/0001_initial.py index 01120a9..47e55a9 100644 --- a/core_directory/migrations/0001_initial.py +++ b/core_directory/migrations/0001_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 5.2.1 on 2025-09-10 14:44 +# Generated by Django 5.2.1 on 2025-10-09 17:04 import django.db.models.deletion import utils.spdx @@ -24,8 +24,8 @@ class Migration(migrations.Migration): ('version_minor', models.IntegerField(help_text="Minor version number (e.g. 2 for version '1.2.3-abc').")), ('version_patch', models.IntegerField(help_text="Patch version number (e.g. 3 for version '1.2.3-abc').")), ('version_prerelease', models.CharField(blank=True, help_text="Pre-release label (e.g. 'abc' for version '1.2.3-abc').", max_length=20, null=True)), - ('core_url', models.URLField(help_text='URL to download the .core file from GitHub or another source.')), - ('sig_url', models.URLField(blank=True, help_text='Optional URL to download the .sig file for signature verification.', null=True)), + ('core_file', models.FileField(help_text='The FuseSoC .core file for this package. This file is required and contains the core metadata.', upload_to='')), + ('signature_file', models.FileField(blank=True, help_text="Optional signature (.sig) file for verifying the core file's authenticity.", null=True, upload_to='')), ('description', models.CharField(help_text='A short description of the core package.', max_length=255)), ('spdx_license', models.CharField(blank=True, choices=utils.spdx.get_spdx_choices, help_text='SPDX license identifier (e.g., MIT, GPL-3.0-or-later, or LicenseRef-...)', max_length=64, null=True, validators=[utils.spdx.validate_spdx])), ], diff --git a/core_directory/models.py b/core_directory/models.py index 3de4526..3ec3ffd 100644 --- a/core_directory/models.py +++ b/core_directory/models.py @@ -114,13 +114,15 @@ class CorePackage(UniqueSanitizedNameMixin): null=True, help_text="Pre-release label (e.g. 'abc' for version '1.2.3-abc')." ) - core_url = models.URLField( - help_text='URL to download the .core file from GitHub or another source.' + core_file = models.FileField( + blank=False, + null=False, + help_text='The FuseSoC .core file for this package. This file is required and contains the core metadata.' ) - sig_url = models.URLField( + signature_file = models.FileField( blank=True, null=True, - help_text='Optional URL to download the .sig file for signature verification.' + help_text='Optional signature (.sig) file for verifying the core file\'s authenticity.' ) description = models.CharField( max_length=255, @@ -142,7 +144,7 @@ def is_signed(self): Returns True if sig_url is set and valid, False otherwise. You can add more validation logic if needed. """ - return bool(self.sig_url) + return bool(self.signature_file) @property def sanitized_vlnv(self): diff --git a/core_directory/serializers.py b/core_directory/serializers.py index 33678ee..9a4de85 100644 --- a/core_directory/serializers.py +++ b/core_directory/serializers.py @@ -34,11 +34,13 @@ from jsonschema import validate, ValidationError, SchemaError from fusesoc.capi2.coreparser import Core2Parser +from utils.files import filefield_value_for_storage from utils.sanitize import sanitize_string from utils.spdx import validate_spdx from utils.vlnv import VLNV from .models import Project, Vendor, Library, CorePackage, Fileset, FilesetDependency, Target, TargetConfiguration + class CoreSerializer(serializers.Serializer): """ Serializer for validating core and signature files. @@ -60,12 +62,8 @@ class CoreSerializer(serializers.Serializer): """ # User-uploaded files - core_file = serializers.FileField() - signature_file = serializers.FileField(required=False) - - # Optionally, allow user to provide URLs - core_url = serializers.URLField(required=False) - sig_url = serializers.URLField(required=False, allow_null=True) + core_file = serializers.FileField(required=True) + signature_file = serializers.FileField(required=False, allow_null=True) # Read-only fields extracted from the core file vlnv_name = serializers.CharField(read_only=True, max_length=255) @@ -77,6 +75,14 @@ class CoreSerializer(serializers.Serializer): description = serializers.CharField(read_only=True, max_length=255, required=False) spdx_license = serializers.CharField(read_only=True, max_length=64) + class Meta: + model = CorePackage + fields = [ + 'core_file', 'sig_file', + 'vlnv_name', 'sanitized_name', 'vendor_name', 'library_name', 'project_name', + 'version', 'description', 'spdx_license' + ] + def validate_core_file(self, value): """ Validates the core file's extension and size. @@ -207,7 +213,7 @@ def validate(self, attrs): return attrs - def create(self, validated_data): + def create(self, validated_data): # pylint: disable=too-many-locals with transaction.atomic(): # Get or create Vendor, Library, Project vendor, _ = Vendor.objects.get_or_create(name=validated_data['vendor_name']) @@ -218,15 +224,30 @@ def create(self, validated_data): name=validated_data['project_name'] ) + # Prepare file field values + core_file_obj = validated_data['core_file'] + core_file_name = core_file_obj.name + core_file_obj.name = f"{validated_data['sanitized_name']}.core" + + sig_file_obj = validated_data.get('signature_file') + sig_filename = sig_file_obj.name if sig_file_obj else None + if sig_file_obj: + sig_file_obj.name = f"{validated_data['sanitized_name']}.core.sig" + + # Use the helper to avoid duplicate uploads + core_file_field_value = filefield_value_for_storage(core_file_name, core_file_obj) + sig_file_field_value = filefield_value_for_storage(sig_filename, sig_file_obj) if sig_file_obj else None + + # Create an save the model instance instance = CorePackage.objects.create( project=project, vlnv_name=validated_data['vlnv_name'], version=validated_data['version'], - core_url=validated_data.get('core_url'), - sig_url=validated_data.get('sig_url'), description=validated_data.get('description'), - spdx_license=validated_data.get('spdx_license') + spdx_license=validated_data.get('spdx_license'), + core_file=core_file_field_value, + signature_file=sig_file_field_value ) # Create Filesets and their Dependencies diff --git a/core_directory/storages/__init__.py b/core_directory/storages/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/core_directory/storages/dummy_storage.py b/core_directory/storages/dummy_storage.py new file mode 100644 index 0000000..442fcc9 --- /dev/null +++ b/core_directory/storages/dummy_storage.py @@ -0,0 +1,61 @@ +""" +DummyStorage for Django tests. + +This in-memory storage backend is used to ensure that no files are written to disk +or external services during tests. All file operations are performed in memory. + +Usage: + Set DEFAULT_FILE_STORAGE = 'core_directory.storages.dummy_storage.DummyStorage' + in your test settings to use this storage for all FileFields during tests. +""" + +from django.core.files.storage import Storage +from django.core.files.base import ContentFile + +class DummyStorage(Storage): + """ + In-memory Django storage backend for use in tests. + """ + _files = {} + + def _open(self, name, _mode='rb'): # mode is required by Django Storage API + return ContentFile(self._files.get(name, b''), name=name) + + def _save(self, name, content): + content.seek(0) + self._files[name] = content.read() + return name + + def exists(self, name): + return name in self._files + + def delete(self, name): + if name in self._files: + del self._files[name] + + def url(self, name): + return f"/dummy/{name}" + + def get_accessed_time(self, name): + """Not supported for DummyStorage.""" + raise NotImplementedError("get_accessed_time is not supported by DummyStorage.") + + def get_created_time(self, name): + """Not supported for DummyStorage.""" + raise NotImplementedError("get_created_time is not supported by DummyStorage.") + + def get_modified_time(self, name): + """Not supported for DummyStorage.""" + raise NotImplementedError("get_modified_time is not supported by DummyStorage.") + + def path(self, name): + """Not supported for DummyStorage.""" + raise NotImplementedError("path() is not available for DummyStorage.") + + def listdir(self, path): + """Not supported for DummyStorage.""" + raise NotImplementedError("listdir() is not available for DummyStorage.") + + def size(self, name): + """Not supported for DummyStorage.""" + raise NotImplementedError("size() is not available for DummyStorage.") diff --git a/core_directory/storages/github.py b/core_directory/storages/github.py new file mode 100644 index 0000000..8096a31 --- /dev/null +++ b/core_directory/storages/github.py @@ -0,0 +1,253 @@ +""" +Django storage backend for GitHub repositories. + +This module provides a Django Storage class that stores files in a GitHub repository, +with optional local caching for efficient repeated access. It supports overwriting files, +deleting files, and retrieving direct raw URLs. The storage backend is fully compatible +with Django's standard storage API, making it interchangeable with other backends. + +Configuration is via environment variables or constructor arguments: + - GITHUB_REPO: GitHub repository in 'owner/repo' format + - GITHUB_ACCESS_TOKEN: Personal access token with repo access + - GITHUB_BRANCH: Branch to use (default: 'main') + - GITHUB_STORAGE_CACHE_DIR: Optional local cache directory + +Example usage: + from core_directory.storage import GitHubStorage + storage = GitHubStorage() + storage.save('foo.txt', ContentFile(b'hello')) + with storage.open('foo.txt') as f: + print(f.read()) + storage.delete('foo.txt') +""" + +import os +import zipfile +import shutil +import tempfile +import requests + +from django.core.files.storage import Storage +from django.core.files.base import ContentFile +from github import Github, GithubException, UnknownObjectException +from github.Auth import Token as GitHubAuthToken + +class GitHubStorage(Storage): + """ + Django storage backend for GitHub repository files, with overwrite and optional local caching. + Implements only the standard Django storage API methods. + """ + + def __init__(self, repo_name=None, access_token=None, branch=None, cache_dir=None): + self.repo_name = repo_name or os.getenv('GITHUB_REPO') + self.access_token = access_token or os.getenv('GITHUB_ACCESS_TOKEN') + self.branch = branch or os.getenv('GITHUB_BRANCH', 'main') + self.cache_dir = cache_dir or os.getenv('GITHUB_STORAGE_CACHE_DIR') + if not self.repo_name or not self.access_token: + raise ValueError("GITHUB_REPO and GITHUB_ACCESS_TOKEN must be set") + self._github = Github(auth=GitHubAuthToken(self.access_token)) + self._repo = self._github.get_repo(self.repo_name) + if self.cache_dir: + os.makedirs(self.cache_dir, exist_ok=True) + + def _cache_path(self, name): + if not self.cache_dir: + return None + return os.path.join(self.cache_dir, name) + + def _open(self, name, _mode='rb'): + """ + Retrieve file content from cache if available, else from GitHub and cache it. + """ + cache_path = self._cache_path(name) + if cache_path and os.path.exists(cache_path): + with open(cache_path, 'rb') as f: + return ContentFile(f.read(), name=name) + try: + file_content = self._repo.get_contents(name, ref=self.branch) + content = file_content.decoded_content + if cache_path: + os.makedirs(os.path.dirname(cache_path), exist_ok=True) + with open(cache_path, 'wb') as f: + f.write(content) + return ContentFile(content, name=name) + except UnknownObjectException as e: + raise FileNotFoundError(f"{name} not found in GitHub repo") from e + except GithubException as e: + raise IOError(f"GitHub error: {e}") from e + + def _save(self, name, content): + """ + Save or overwrite a file in the GitHub repo. + If the file exists, it is updated; otherwise, it is created. + """ + content.seek(0) + data = content.read() + if isinstance(data, bytes): + data = data.decode('utf-8') + try: + # Try to get the file (to update) + file_content = self._repo.get_contents(name, ref=self.branch) + self._repo.update_file( + name, + f"Update {name} via Django storage", + data, + file_content.sha, + branch=self.branch + ) + except UnknownObjectException: + # File does not exist, create it + self._repo.create_file( + name, + f"Add {name} via Django storage", + data, + branch=self.branch + ) + except GithubException as e: + raise IOError(f"GitHub error: {e}") from e + + # Optionally update cache + cache_path = self._cache_path(name) + if cache_path: + os.makedirs(os.path.dirname(cache_path), exist_ok=True) + with open(cache_path, 'wb') as f: + f.write(data.encode('utf-8')) + return name + + def delete(self, name): + """ + Delete a file from GitHub and remove it from the local cache if present. + """ + try: + file_content = self._repo.get_contents(name, ref=self.branch) + self._repo.delete_file( + name, + f"Delete {name} via Django storage", + file_content.sha, + branch=self.branch + ) + except UnknownObjectException: + pass # Already deleted + except GithubException as e: + raise IOError(f"GitHub error: {e}") from e + + # Invalidate cache + cache_path = self._cache_path(name) + if cache_path and os.path.exists(cache_path): + try: + os.remove(cache_path) + except OSError: + pass # Ignore errors if file is already gone + + def exists(self, name): + """ + Check if a file exists in the cache or on GitHub. + """ + cache_path = self._cache_path(name) + if cache_path and os.path.exists(cache_path): + return True + try: + self._repo.get_contents(name, ref=self.branch) + return True + except UnknownObjectException: + return False + + def get_available_name(self, name, max_length=None): + self.delete(name) + return name + + def url(self, name): + """ + Return the direct raw URL for the file on GitHub. + """ + return f"https://raw.githubusercontent.com/{self.repo_name}/{self.branch}/{name}" + + def size(self, name): + """ + Return the size of the file in bytes. + """ + try: + file_content = self._repo.get_contents(name, ref=self.branch) + return file_content.size + except UnknownObjectException: + return 0 + + def listdir(self, path): + """ + List files and directories in the given path (only root supported). + """ + if path not in ('', '/'): + raise NotImplementedError("Only root directory listing is supported") + contents = self._repo.get_contents('', ref=self.branch) + files = [c.name for c in contents if c.type == 'file'] + dirs = [c.name for c in contents if c.type == 'dir'] + return dirs, files + + def clear_cache(self): + """ + Remove all files and subdirectories from the given cache directory. + + Use this to ensure the cache is empty before prefilling, so only current repo files are cached. + """ + for filename in os.listdir(self.cache_dir): + file_path = os.path.join(self.cache_dir, filename) + try: + if os.path.isfile(file_path) or os.path.islink(file_path): + os.unlink(file_path) + elif os.path.isdir(file_path): + shutil.rmtree(file_path) + except OSError as e: + print(f'Failed to delete {file_path}. Reason: {e}') + + def prefill_cache(self): + """ + Download the GitHub repo as a zip and prefill the cache directory with its files. + The cache is cleared first to remove any old or stale files. + Only works if cache_dir is set. + """ + if not self.cache_dir: + raise RuntimeError("No cache_dir set for GitHubStorage; cannot prefill cache.") + + self.clear_cache() + + zip_url = f"https://api.github.com/repos/{self.repo_name}/zipball/{self.branch}" + headers = {'Authorization': f'token {self.access_token}'} + response = requests.get(zip_url, headers=headers, stream=True, timeout=30) + if response.status_code != 200: + raise RuntimeError(f"Failed to download repo archive: {response.status_code} {response.text}") + + with tempfile.TemporaryDirectory() as temp_dir: + zip_path = os.path.join(temp_dir, 'repo.zip') + with open(zip_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + with zipfile.ZipFile(zip_path, 'r') as zip_ref: + zip_ref.extractall(temp_dir) + extracted_dirs = [d for d in os.listdir(temp_dir) if os.path.isdir(os.path.join(temp_dir, d))] + if not extracted_dirs: + raise RuntimeError("No directory found in extracted archive.") + repo_root = os.path.join(temp_dir, extracted_dirs[0]) + files_in_root = os.listdir(repo_root) + for filename in files_in_root: + src_path = os.path.join(repo_root, filename) + if os.path.isfile(src_path): + cache_path = os.path.join(self.cache_dir, filename) + os.makedirs(os.path.dirname(cache_path), exist_ok=True) + with open(src_path, 'rb') as src, open(cache_path, 'wb') as dst: + dst.write(src.read()) + + def get_accessed_time(self, name): + """Not supported for GitHubStorage.""" + raise NotImplementedError("get_accessed_time is not supported by GitHubStorage.") + + def get_created_time(self, name): + """Not supported for GitHubStorage.""" + raise NotImplementedError("get_created_time is not supported by GitHubStorage.") + + def get_modified_time(self, name): + """Not supported for GitHubStorage.""" + raise NotImplementedError("get_modified_time is not supported by GitHubStorage.") + + def path(self, name): + """Not supported for GitHubStorage.""" + raise NotImplementedError("path() is not available for GitHubStorage.") diff --git a/core_directory/templates/web_ui/core_detail.html b/core_directory/templates/web_ui/core_detail.html index 19c088a..3607eed 100644 --- a/core_directory/templates/web_ui/core_detail.html +++ b/core_directory/templates/web_ui/core_detail.html @@ -109,11 +109,11 @@
Files
- + .core {% if core.is_signed %} - + .sig {% endif %} diff --git a/core_directory/templates/web_ui/core_package_summary.html b/core_directory/templates/web_ui/core_package_summary.html index cc5020d..142eac4 100644 --- a/core_directory/templates/web_ui/core_package_summary.html +++ b/core_directory/templates/web_ui/core_package_summary.html @@ -16,11 +16,11 @@ - + .core {% if core_package.is_signed %} - + .sig {% endif %} diff --git a/core_directory/tests/api/test_cores.py b/core_directory/tests/api/test_cores.py index ca5aa03..fbd3e88 100644 --- a/core_directory/tests/api/test_cores.py +++ b/core_directory/tests/api/test_cores.py @@ -30,7 +30,7 @@ def test_multiple_cores_success(client): version_major=1, version_minor=0, version_patch=0, - core_url="https://example.com/core1", + core_file="core1", description="desc" ) CorePackage.objects.create( @@ -40,7 +40,7 @@ def test_multiple_cores_success(client): version_major=1, version_minor=0, version_patch=0, - core_url="https://example.com/core2_v1.0.0", + core_file="core2_v1.0.0", description="desc" ) CorePackage.objects.create( @@ -50,7 +50,7 @@ def test_multiple_cores_success(client): version_major=0, version_minor=1, version_patch=0, - core_url="https://example.com/core2_v0.1.0", + core_file="core2_v0.1.0", description="desc" ) @@ -74,7 +74,7 @@ def test_cores_with_filter(client): version_major=1, version_minor=0, version_patch=0, - core_url="https://example.com/foo_core", + core_file="foo_core", description="desc" ) cp2 = CorePackage.objects.create( @@ -84,7 +84,7 @@ def test_cores_with_filter(client): version_major=1, version_minor=0, version_patch=0, - core_url="https://example.com/bar_core", + core_file="bar_core", description="desc" ) diff --git a/core_directory/tests/api/test_get_core.py b/core_directory/tests/api/test_get_core.py index 5fed934..acdbc28 100644 --- a/core_directory/tests/api/test_get_core.py +++ b/core_directory/tests/api/test_get_core.py @@ -1,7 +1,18 @@ import pytest +import io from django.urls import reverse + +from django.core.files.storage import default_storage, FileSystemStorage from core_directory.models import Vendor, Library, Project, CorePackage + +@pytest.fixture(autouse=True) +def patch_corepackage_storage(settings): + from ...storages.dummy_storage import DummyStorage + settings.DEFAULT_FILE_STORAGE = 'path.to.dummy_storage.DummyStorage' + CorePackage._meta.get_field('core_file').storage = DummyStorage() + CorePackage._meta.get_field('signature_file').storage = DummyStorage() + @pytest.mark.django_db def test_getcore_success(client, mocker): # Set up test data @@ -15,15 +26,12 @@ def test_getcore_success(client, mocker): version_major=1, version_minor=0, version_patch=0, - core_url="https://example.com/foo.core", + core_file="foo.core", description="desc" ) - - # Mock requests.get to return a fake file - mock_response = mocker.Mock() - mock_response.status_code = 200 - mock_response.content = b"core file content" - mocker.patch("requests.get", return_value=mock_response) + # Patch the storage used by the FileField + storage = CorePackage._meta.get_field('core_file').storage + mocker.patch.object(storage, 'open', return_value=io.BytesIO(b"core file content")) url = reverse('core_directory:core_get') response = client.get(url, {"core": "acme:lib1:foo:1.0.0"}) @@ -48,27 +56,25 @@ def test_getcore_file_not_found(client, mocker): vendor = Vendor.objects.create(name="Acme") library = Library.objects.create(vendor=vendor, name="Lib1") project = Project.objects.create(vendor=vendor, library=library, name="foo", description="desc") - core_package = CorePackage.objects.create( + mocker.patch.object(default_storage, 'open', side_effect=FileNotFoundError("No such file")) + CorePackage.objects.create( project=project, vlnv_name="acme:lib1:foo:1.0.0", version="1.0.0", version_major=1, version_minor=0, version_patch=0, - core_url="https://example.com/foo.core", + core_file="foo.core", description="desc" ) - # Mock requests.get to simulate file not found (404) - mock_response = mocker.Mock() - mock_response.status_code = 404 - mock_response.content = b"" - mocker.patch("requests.get", return_value=mock_response) + storage = CorePackage._meta.get_field('core_file').storage + mocker.patch.object(storage, 'open', side_effect=FileNotFoundError("No such file")) url = reverse('core_directory:core_get') response = client.get(url, {"core": "acme:lib1:foo:1.0.0"}) assert response.status_code == 404 - assert b"not found" in response.content or b"not found" in response.json().get("error", "").lower() + assert b"not available" in response.content or b"not available" in response.json().get("error", "").lower() @pytest.mark.django_db def test_getcore_missing_param(client): diff --git a/core_directory/tests/api/test_publish.py b/core_directory/tests/api/test_publish.py index c2924c2..a8128a5 100644 --- a/core_directory/tests/api/test_publish.py +++ b/core_directory/tests/api/test_publish.py @@ -4,172 +4,171 @@ from django.urls import reverse from django.core.files.uploadedfile import SimpleUploadedFile +from django.core.files.storage import default_storage from core_directory.models import Vendor, Library, Project, CorePackage +import pathlib + +FIXTURES = pathlib.Path(__file__).parent.parent / "fixtures" + +@pytest.fixture(autouse=True) +def patch_corepackage_storage(settings): + from ...storages.dummy_storage import DummyStorage + settings.DEFAULT_FILE_STORAGE = 'path.to.dummy_storage.DummyStorage' + CorePackage._meta.get_field('core_file').storage = DummyStorage() + CorePackage._meta.get_field('signature_file').storage = DummyStorage() + +def get_core_sig_pairs(directory): + for core_file in directory.glob("*.core"): + sig_file = core_file.with_suffix(".sig") + yield (core_file, sig_file if sig_file.exists() else None) + +# Precompute pairs and ids for valid and invalid +valid_pairs = list(get_core_sig_pairs(FIXTURES / "valid")) +valid_ids = [f"valid/{core_path.name}" for core_path, _ in valid_pairs] + +invalid_pairs = list(get_core_sig_pairs(FIXTURES / "invalid")) +invalid_ids = [f"invalid/{core_path.name}" for core_path, _ in invalid_pairs] + @pytest.mark.django_db -def test_publish_success(client, mocker): +def test_publish_no_core_file(client, mocker): url = reverse('core_directory:publish') - # Mock serializer - mock_serializer = mocker.patch("core_directory.views.api_views.CoreSerializer") - instance = mock_serializer.return_value - instance.is_valid.return_value = True - instance.validated_data = { - "vlnv_name": "vendor:lib:core:1.0.0", - "core_file": SimpleUploadedFile("test.core", b"dummy"), - "sanitized_name": "core", - "signature_file": None, - } - # Mock github repo - mock_repo = mocker.Mock() - # Define a mock exception that matches the view's except block - class GithubException(Exception): - pass - mock_repo.get_contents.side_effect = GithubException() - mock_repo.create_file.return_value = {"content": mocker.Mock(download_url="https://example.com/core")} - mock_github = mocker.patch("core_directory.views.api_views.Github") - mock_github.return_value.get_repo.return_value = mock_repo - mocker.patch("core_directory.views.api_views.GithubException", GithubException) - mocker.patch("os.getenv", side_effect=lambda key, default=None: "dummy_token" if key == "GITHUB_ACCESS_TOKEN" else default) - - response = client.post(url, data={"core_file": SimpleUploadedFile("test.core", b"dummy")}) - assert response.status_code is 201 - assert b"published" in response.content or b"valid" in response.content + mock_save = mocker.patch('django.core.files.storage.default_storage.save', return_value='test_core.core') + + response = client.post(url, data={}) + data = response.json() + assert response.status_code == 400 + assert "error" in data + mock_save.assert_not_called() @pytest.mark.django_db -def test_publish_core_already_exists_in_db(client, mocker): - # Set up test data: create a core with the same VLNV in the database - vendor = Vendor.objects.create(name="Acme") - library = Library.objects.create(vendor=vendor, name="Lib1") - project = Project.objects.create(vendor=vendor, library=library, name="foo", description="desc") - CorePackage.objects.create( - project=project, - vlnv_name="acme:lib1:foo:1.0.0", - version="1.0.0", - version_major=1, - version_minor=0, - version_patch=0, - core_url="https://example.com/foo.core", - description="desc" - ) - +@pytest.mark.parametrize( + "core_path,sig_path", + valid_pairs, + ids=valid_ids +) +def test_publish_valid_core_and_sig(client, mocker, core_path, sig_path): url = reverse('core_directory:publish') - # Mock serializer - mock_serializer = mocker.patch("core_directory.views.api_views.CoreSerializer") - instance = mock_serializer.return_value - instance.is_valid.return_value = True - instance.validated_data = { - "vlnv_name": "acme:lib1:foo:1.0.0", - "core_file": SimpleUploadedFile("test.core", b"dummy"), - "sanitized_name": "core", - "signature_file": None, - } - - response = client.post(url, data={"core_file": SimpleUploadedFile("test.core", b"dummy")}) - assert response.status_code == 409 - assert b"already exists" in response.content + + # Get the DummyStorage instance used by the FileFields + storage_core = CorePackage._meta.get_field('core_file').storage + storage_sig = CorePackage._meta.get_field('signature_file').storage + # Patch the save method on DummyStorage + mock_save_core = mocker.patch.object(storage_core, 'save', side_effect=lambda name, content, **kwargs: name) + mock_save_sig = mocker.patch.object(storage_sig, 'save', side_effect=lambda name, content, **kwargs: name) + with open(core_path, "rb") as f_core: + files = {'core_file': SimpleUploadedFile(core_path.name, f_core.read(), content_type="application/x-yaml")} + if sig_path: + with open(sig_path, "rb") as f_sig: + files['signature_file'] = SimpleUploadedFile(sig_path.name, f_sig.read(), content_type="application/x-yaml") + response = client.post(url, data=files) + + data = response.json() + assert response.status_code == 201 + assert "message" in data + assert "Core published successfully" in data["message"] + mock_save_core.assert_called_once() + mock_save_sig.assert_called_once() + @pytest.mark.django_db -def test_publish_already_exists_on_github(client, mocker): +@pytest.mark.parametrize( + "core_path,sig_path", + invalid_pairs, + ids=invalid_ids +) +def test_publish_invalid_core_and_sig(client, mocker, core_path, sig_path): url = reverse('core_directory:publish') - mock_serializer = mocker.patch("core_directory.views.api_views.CoreSerializer") - instance = mock_serializer.return_value - instance.is_valid.return_value = True - instance.validated_data = { - "vlnv_name": "vendor:lib:core:1.0.0", - "core_file": SimpleUploadedFile("test.core", b"dummy"), - "sanitized_name": "core", - "signature_file": None, - } - mock_repo = mocker.Mock() - mock_repo.get_contents.return_value = True # Simulate file exists - mock_github = mocker.patch("core_directory.views.api_views.Github") - mock_github.return_value.get_repo.return_value = mock_repo - mocker.patch("os.getenv", side_effect=lambda key, default=None: "dummy_token" if key == "GITHUB_ACCESS_TOKEN" else default) - - response = client.post(url, data={"core_file": SimpleUploadedFile("test.core", b"dummy")}) - assert response.status_code == 409 - assert b"already exists" in response.content + mock_save = mocker.patch('django.core.files.storage.default_storage.save', return_value='test_core.core') + + with open(core_path, "rb") as f_core: + files = {'core_file': SimpleUploadedFile(core_path.name, f_core.read(), content_type="application/x-yaml")} + if sig_path: + with open(sig_path, "rb") as f_sig: + files['signature_file'] = SimpleUploadedFile(sig_path.name, f_sig.read(), content_type="application/x-yaml") + response = client.post(url, data=files) + assert response.status_code == 400 + mock_save.assert_not_called() @pytest.mark.django_db -def test_publish_github_error(client, mocker): +@pytest.mark.parametrize( + "core_path", + list((FIXTURES / "valid_no_sig").glob("*.core")), + ids=lambda p: f"valid_no_sig/{p.name}" +) +def test_publish_valid_core_no_sig(client, mocker, core_path): url = reverse('core_directory:publish') - mock_serializer = mocker.patch("core_directory.views.api_views.CoreSerializer") - instance = mock_serializer.return_value - instance.is_valid.return_value = True - instance.validated_data = { - "vlnv_name": "vendor:lib:core:1.0.0", - "core_file": SimpleUploadedFile("test.core", b"dummy"), - "sanitized_name": "core", - "signature_file": None, - } - class UnknownObjectException(Exception): - pass - class GithubException(Exception): - data = "fail" - mock_repo = mocker.Mock() - # Raise UnknownObjectException to enter the except block - mock_repo.get_contents.side_effect = UnknownObjectException() - # Raise GithubException from create_file to simulate a GitHub error - mock_repo.create_file.side_effect = GithubException() - mock_github = mocker.patch("core_directory.views.api_views.Github") - mock_github.return_value.get_repo.return_value = mock_repo - mocker.patch("core_directory.views.api_views.UnknownObjectException", UnknownObjectException) - mocker.patch("core_directory.views.api_views.GithubException", GithubException) - mocker.patch("os.getenv", side_effect=lambda key, default=None: "dummy_token" if key == "GITHUB_ACCESS_TOKEN" else default) - - response = client.post(url, data={"core_file": SimpleUploadedFile("test.core", b"dummy")}) - assert response.status_code == 500 - assert b"GitHub error" in response.content or b"fail" in response.content + + # Get the DummyStorage instance used by the FileFields + storage = CorePackage._meta.get_field('core_file').storage + # Patch the save method on DummyStorage + mock_save = mocker.patch.object(storage, 'save', side_effect=lambda name, content, **kwargs: name) + + + with open(core_path, "rb") as f_core: + files = {'core_file': SimpleUploadedFile(core_path.name, f_core.read(), content_type="application/x-yaml")} + response = client.post(url, data=files) + data = response.json() + assert response.status_code == 201 + assert "message" in data + assert "Core published successfully" in data["message"] + mock_save.assert_called_once() @pytest.mark.django_db -def test_publish_invalid_serializer(client, mocker): +@pytest.mark.parametrize( + "core_path", + list((FIXTURES / "invalid_no_sig").glob("*.core")), + ids=lambda p: f"invalid_no_sig/{p.name}" +) +def test_publish_invalid_core_no_sig(client, mocker, core_path): url = reverse('core_directory:publish') - mock_serializer = mocker.patch("core_directory.views.api_views.CoreSerializer") - instance = mock_serializer.return_value - instance.is_valid.return_value = False - instance.errors = {"field": ["error"]} - mocker.patch("os.getenv", side_effect=lambda key, default=None: "dummy_token" if key == "GITHUB_ACCESS_TOKEN" else default) - response = client.post(url, data={"core_file": SimpleUploadedFile("test.core", b"dummy")}) + + # Get the DummyStorage instance used by the FileFields + storage_core = CorePackage._meta.get_field('core_file').storage + # Patch the save method on DummyStorage + mock_save_core = mocker.patch.object(storage_core, 'save', side_effect=lambda name, content, **kwargs: name) + + with open(core_path, "rb") as f_core: + files = {'core_file': SimpleUploadedFile(core_path.name, f_core.read(), content_type="application/x-yaml")} + response = client.post(url, data=files) assert response.status_code == 400 - assert b"error" in response.content + mock_save_core.assert_not_called() @pytest.mark.django_db -def test_publish_with_signature(client, mocker): +def test_republish_existing_core(client, mocker): url = reverse('core_directory:publish') - # Mock serializer - mock_serializer = mocker.patch("core_directory.views.api_views.CoreSerializer") - instance = mock_serializer.return_value - instance.is_valid.return_value = True - instance.validated_data = { - "vlnv_name": "vendor:lib:core:1.0.0", - "core_file": SimpleUploadedFile("test.core", b"dummy core"), - "sanitized_name": "core", - "signature_file": SimpleUploadedFile("test.core.sig", b"dummy sig"), - } - # Mock github repo - mock_repo = mocker.Mock() - class UnknownObjectException(Exception): - pass - mock_repo.get_contents.side_effect = UnknownObjectException() - # Simulate create_file for core and signature - mock_repo.create_file.side_effect = [ - {"content": mocker.Mock(download_url="https://example.com/core")}, - {"content": mocker.Mock(download_url="https://example.com/core.sig")}, - ] - mock_github = mocker.patch("core_directory.views.api_views.Github") - mock_github.return_value.get_repo.return_value = mock_repo - mocker.patch("core_directory.views.api_views.UnknownObjectException", UnknownObjectException) - mocker.patch("os.getenv", side_effect=lambda key, default=None: "dummy_token" if key == "GITHUB_ACCESS_TOKEN" else default) - - response = client.post( - url, - data={ - "core_file": SimpleUploadedFile("test.core", b"dummy core"), - "signature_file": SimpleUploadedFile("test.core.sig", b"dummy sig"), - } - ) - assert response.status_code in (200, 201) - assert b"published" in response.content or b"valid" in response.content - # Optionally, check that create_file was called twice (core and sig) - assert mock_repo.create_file.call_count == 2 \ No newline at end of file + + core_file_content = ( + 'CAPI=2:\n' + 'name: vendor:library:core:1.0.0\n' + 'description: "A valid core file for testing with signature."\n' + 'provider:\n' + ' name: github\n' + ' user: myuser\n' + ' repo: myrepo\n' + ' version: "v1.0.0"\n' + ).encode('utf-8') + + mock_save = mocker.patch('django.core.files.storage.default_storage.save', return_value='test_core.core') + mocker.patch('django.core.files.storage.default_storage.exists', side_effect=[False, True]) + + files = {'core_file': SimpleUploadedFile("test_core.core", core_file_content, content_type="application/x-yaml")} + + response = client.post(url, data=files) + data = response.json() + + assert response.status_code == 201 + assert "message" in data + assert "Core published successfully" in data["message"] + + files = {'core_file': SimpleUploadedFile("test_core.core", core_file_content, content_type="application/x-yaml")} + response = client.post(url, data=files) + + data = response.json() + + assert response.status_code == 409 + assert "error" in data + assert "already exists" in data["error"] + mock_save.assert_not_called() + \ No newline at end of file diff --git a/core_directory/tests/api/test_validate.py b/core_directory/tests/api/test_validate.py index d7b9c58..15ff528 100644 --- a/core_directory/tests/api/test_validate.py +++ b/core_directory/tests/api/test_validate.py @@ -24,7 +24,7 @@ def test_validate_no_core_file(client): data = response.json() assert response.status_code == 400 assert "error" in data - assert "No file provided" in data["error"] + assert "No core file provided" in data["error"] @pytest.mark.django_db @pytest.mark.parametrize( diff --git a/core_directory/tests/management/test_init_db.py b/core_directory/tests/management/test_init_db.py index 0a85314..ed24d13 100644 --- a/core_directory/tests/management/test_init_db.py +++ b/core_directory/tests/management/test_init_db.py @@ -1,20 +1,34 @@ import io -import os -import stat from unittest import mock import pytest -from django.core.management import call_command from django.db import IntegrityError -from github import GithubException -from git.exc import GitCommandError +from django.core.files.base import ContentFile +from django.core.management import call_command +from core_directory.management.commands.init_db import Command from core_directory.models import Vendor, Library, Project, CorePackage - # Path to the management command module COMMAND_PATH = "core_directory.management.commands.init_db" +@pytest.fixture +def fake_storage(): + storage = mock.Mock() + # Simulate two files: one core, one sig + storage.listdir.return_value = ([], ["foo.core", "foo.core.sig", "bar.core"]) + storage.url.side_effect = lambda name: f"https://example.com/{name}" + # Simulate file content + def fake_open(name, mode='rb'): + f = ContentFile(b"dummy content") + f.name = name + f.size = len(f.read()) + f.seek(0) + return f + storage.open.side_effect = fake_open + return storage + + @pytest.mark.django_db def test_command_skips_if_db_not_empty(): # Create all required related objects @@ -25,7 +39,7 @@ def test_command_skips_if_db_not_empty(): project=project, vlnv_name="dummy:dummy:dummy:1.0.0", version="1.0.0", - core_url="https://example.com/core", + core_file="dummy.core", description="desc" ) # Now run the command as before... @@ -36,7 +50,7 @@ def test_command_skips_if_db_not_empty(): assert "Database already initialized." in out.getvalue() @pytest.mark.django_db -@mock.patch(f"{COMMAND_PATH}.Command.download_and_load_data") +@mock.patch(f"{COMMAND_PATH}.Command.initialize_from_storage") def test_command_runs_download_if_db_empty(mock_download): out = io.StringIO() call_command("init_db", stdout=out) @@ -45,259 +59,79 @@ def test_command_runs_download_if_db_empty(mock_download): assert "Database initialized successfully." in out.getvalue() @pytest.mark.django_db -@mock.patch(f"{COMMAND_PATH}.Github") -def test_get_repo_info_env_missing(mock_github): - from core_directory.management.commands.init_db import Command - cmd = Command() - # Remove env vars - with mock.patch.dict(os.environ, {}, clear=True): - repo, token, branch = cmd.get_repo_info() - assert repo is None and token is None and branch is None - -@pytest.mark.django_db -@mock.patch(f"{COMMAND_PATH}.Github") -def test_get_repo_info_success(mock_github): - from core_directory.management.commands.init_db import Command +def test_initialize_from_storage_prefill_and_success(monkeypatch, fake_storage): cmd = Command() - # Set env vars - with mock.patch.dict(os.environ, {"GITHUB_REPO": "user/repo", "GITHUB_ACCESS_TOKEN": "token"}): - mock_repo = mock.Mock() - mock_repo.default_branch = "main" - mock_github.return_value.get_repo.return_value = mock_repo - repo, token, branch = cmd.get_repo_info() - assert repo == "user/repo" - assert token == "token" - assert branch == "main" - -@pytest.mark.django_db -@mock.patch(f"{COMMAND_PATH}.Repo.clone_from") -@mock.patch(f"{COMMAND_PATH}.Command.get_repo_info") -@mock.patch(f"{COMMAND_PATH}.CoreSerializer") -def test_download_and_load_data_success(mock_serializer, mock_get_repo_info, mock_clone_from, tmp_path): - from core_directory.management.commands.init_db import Command - # Setup repo info - mock_get_repo_info.return_value = ("user/repo", "token", "main") - # Create dummy files in temp dir - temp_dir = tmp_path - core_file = temp_dir / "test.core" - core_file.write_bytes(b"dummy core content") - sig_file = temp_dir / "test.sig" - sig_file.write_bytes(b"dummy sig content") - # Patch os.listdir to return our files - with mock.patch("os.listdir", return_value=["test.core", "test.sig"]), \ - mock.patch("tempfile.mkdtemp", return_value=str(temp_dir)), \ - mock.patch("shutil.rmtree"): - cmd = Command() - # Mock serializer - instance = mock_serializer.return_value - instance.is_valid.return_value = True - instance.save.return_value = None - cmd.stdout = io.StringIO() - cmd.download_and_load_data() - assert "Processing test.core" in cmd.stdout.getvalue() - assert instance.is_valid.called - assert instance.save.called - -@mock.patch(f"{COMMAND_PATH}.Repo.clone_from", side_effect=GitCommandError('clone', 1)) -@mock.patch(f"{COMMAND_PATH}.Command.get_repo_info") -def test_download_and_load_data_clone_error(mock_get_repo_info, mock_clone_from, tmp_path): - from core_directory.management.commands.init_db import Command - mock_get_repo_info.return_value = ("user/repo", "token", "main") - with mock.patch("tempfile.mkdtemp", return_value=str(tmp_path)), \ - mock.patch("shutil.rmtree"): - cmd = Command() - cmd.stdout = io.StringIO() - cmd.download_and_load_data() - assert "error cloning repository" in cmd.stdout.getvalue().lower() - - -@pytest.mark.django_db -@mock.patch(f"{COMMAND_PATH}.Repo.clone_from") -@mock.patch(f"{COMMAND_PATH}.Command.get_repo_info") -@mock.patch(f"{COMMAND_PATH}.CoreSerializer") -def test_download_and_load_data_with_signature( - mock_serializer, mock_get_repo_info, mock_clone_from, tmp_path -): - from core_directory.management.commands.init_db import Command + cmd.stdout = io.StringIO() - # Setup repo info - mock_get_repo_info.return_value = ("user/repo", "token", "main") + # Patch GitHubStorage to return our fake storage + monkeypatch.setattr("core_directory.management.commands.init_db.GitHubStorage", lambda: fake_storage) + # Add a prefill_cache method + fake_storage.prefill_cache = mock.Mock() - # Create dummy files in temp dir - temp_dir = tmp_path - core_file = temp_dir / "test.core" - core_file.write_bytes(b"dummy core content") - sig_file = temp_dir / "test.core.sig" - sig_file.write_bytes(b"dummy sig content") + # Patch CoreSerializer + fake_serializer = mock.Mock() + fake_serializer.is_valid.return_value = True + fake_serializer.save.return_value = None + monkeypatch.setattr("core_directory.management.commands.init_db.CoreSerializer", lambda data: fake_serializer) - # Patch os.listdir to return our files - with mock.patch("os.listdir", return_value=["test.core", "test.core.sig"]), \ - mock.patch("tempfile.mkdtemp", return_value=str(temp_dir)), \ - mock.patch("shutil.rmtree"): - cmd = Command() - # Mock serializer - instance = mock_serializer.return_value - instance.is_valid.return_value = True - instance.save.return_value = None - cmd.stdout = io.StringIO() - cmd.download_and_load_data() + cmd.initialize_from_storage() - # Check that the serializer was called with both core_file and sig_file - called_data = mock_serializer.call_args[1]["data"] - assert "core_file" in called_data - assert "sig_file" in called_data - assert called_data["sig_file"].name == "test.core.sig" - assert called_data["sig_url"].endswith("test.core.sig") - assert "Processing test.core" in cmd.stdout.getvalue() - assert instance.is_valid.called - assert instance.save.called + # Check prefill_cache was called + fake_storage.prefill_cache.assert_called_once() + # Should process both core files + assert "Processing foo.core" in cmd.stdout.getvalue() + assert "Processing bar.core" in cmd.stdout.getvalue() + # Should call serializer.save() for each core file + assert fake_serializer.save.call_count == 2 @pytest.mark.django_db -@mock.patch(f"{COMMAND_PATH}.Command.get_repo_info") -def test_download_and_load_data_returns_early_if_no_repo(mock_get_repo_info, tmp_path): - from core_directory.management.commands.init_db import Command - # Simulate get_repo_info returning (None, None, None) - mock_get_repo_info.return_value = (None, None, None) - with mock.patch("tempfile.mkdtemp", return_value=str(tmp_path)), \ - mock.patch("shutil.rmtree"): - cmd = Command() - cmd.stdout = io.StringIO() - # Should return early, not attempt to clone or process files - cmd.download_and_load_data() - # Optionally, check that nothing was processed - output = cmd.stdout.getvalue() - # There should be no "Processing" or "Cloning" messages - assert "Processing" not in output - assert "Cloning" not in output - -@pytest.mark.django_db -@mock.patch(f"{COMMAND_PATH}.Repo.clone_from", side_effect=OSError("filesystem error")) -@mock.patch(f"{COMMAND_PATH}.Command.get_repo_info") -def test_download_and_load_data_oserror(mock_get_repo_info, mock_clone_from, tmp_path): - from core_directory.management.commands.init_db import Command - mock_get_repo_info.return_value = ("user/repo", "token", "main") - with mock.patch("tempfile.mkdtemp", return_value=str(tmp_path)), \ - mock.patch("shutil.rmtree"): - cmd = Command() - cmd.stdout = io.StringIO() - cmd.download_and_load_data() - output = cmd.stdout.getvalue().lower() - assert "filesystem error cloning repository" in output +def test_initialize_from_storage_prefill_error(monkeypatch, fake_storage): + cmd = Command() + cmd.stdout = io.StringIO() -@pytest.mark.django_db -@mock.patch(f"{COMMAND_PATH}.Repo.clone_from") -@mock.patch(f"{COMMAND_PATH}.Command.get_repo_info") -@mock.patch(f"{COMMAND_PATH}.CoreSerializer") -def test_download_and_load_data_invalid_serializer( - mock_serializer, mock_get_repo_info, mock_clone_from, tmp_path -): - from core_directory.management.commands.init_db import Command + monkeypatch.setattr("core_directory.management.commands.init_db.GitHubStorage", lambda: fake_storage) + # Simulate prefill_cache raising RuntimeError + fake_storage.prefill_cache = mock.Mock(side_effect=RuntimeError("fail prefill")) - mock_get_repo_info.return_value = ("user/repo", "token", "main") - temp_dir = tmp_path - core_file = temp_dir / "test.core" - core_file.write_bytes(b"dummy core content") + fake_serializer = mock.Mock() + fake_serializer.is_valid.return_value = True + fake_serializer.save.return_value = None + monkeypatch.setattr("core_directory.management.commands.init_db.CoreSerializer", lambda data: fake_serializer) - with mock.patch("os.listdir", return_value=["test.core"]), \ - mock.patch("tempfile.mkdtemp", return_value=str(temp_dir)), \ - mock.patch("shutil.rmtree"): - cmd = Command() - instance = mock_serializer.return_value - instance.is_valid.return_value = False - instance.errors = {"field": ["error"]} - cmd.stdout = io.StringIO() - cmd.download_and_load_data() - output = cmd.stdout.getvalue().lower() - assert "errors in test.core" in output - assert "error" in output + cmd.initialize_from_storage() + assert "Error during cache prefill: fail prefill" in cmd.stdout.getvalue() @pytest.mark.django_db -@mock.patch(f"{COMMAND_PATH}.Repo.clone_from") -@mock.patch(f"{COMMAND_PATH}.Command.get_repo_info") -@mock.patch(f"{COMMAND_PATH}.CoreSerializer") -def test_download_and_load_data_save_exception( - mock_serializer, mock_get_repo_info, mock_clone_from, tmp_path -): - from core_directory.management.commands.init_db import Command - - mock_get_repo_info.return_value = ("user/repo", "token", "main") - temp_dir = tmp_path - core_file = temp_dir / "test.core" - core_file.write_bytes(b"dummy core content") +def test_initialize_from_storage_serializer_invalid(monkeypatch, fake_storage): + cmd = Command() + cmd.stdout = io.StringIO() - with mock.patch("os.listdir", return_value=["test.core"]), \ - mock.patch("tempfile.mkdtemp", return_value=str(temp_dir)), \ - mock.patch("shutil.rmtree"): - cmd = Command() - instance = mock_serializer.return_value - instance.is_valid.return_value = True - instance.save.side_effect = IntegrityError("save failed") - cmd.stdout = io.StringIO() - cmd.download_and_load_data() - output = cmd.stdout.getvalue().lower() - assert "error creating database object for test.core" in output - assert "save failed" in output + monkeypatch.setattr("core_directory.management.commands.init_db.GitHubStorage", lambda: fake_storage) + fake_storage.prefill_cache = mock.Mock() -@pytest.mark.django_db -@mock.patch(f"{COMMAND_PATH}.Github") -def test_get_repo_info_no_access_token(mock_github): - from core_directory.management.commands.init_db import Command - cmd = Command() - # Set only GITHUB_REPO, not GITHUB_ACCESS_TOKEN - with mock.patch.dict(os.environ, {"GITHUB_REPO": "user/repo"}, clear=True): - repo, token, branch = cmd.get_repo_info() - assert repo is None and token is None and branch is None + fake_serializer = mock.Mock() + fake_serializer.is_valid.return_value = False + fake_serializer.errors = {"core_file": ["invalid"]} + monkeypatch.setattr("core_directory.management.commands.init_db.CoreSerializer", lambda data: fake_serializer) -@pytest.mark.django_db -@mock.patch(f"{COMMAND_PATH}.Github") -def test_get_repo_info_github_exception(mock_github): - from core_directory.management.commands.init_db import Command - cmd = Command() - # Set both env vars - with mock.patch.dict(os.environ, {"GITHUB_REPO": "user/repo", "GITHUB_ACCESS_TOKEN": "token"}, clear=True): - # Simulate GithubException when calling get_repo - mock_github.return_value.get_repo.side_effect = GithubException(500, "fail", None) - repo, token, branch = cmd.get_repo_info() - assert repo is None and token is None and branch is None + cmd.initialize_from_storage() + assert "Errors in foo.core" in cmd.stdout.getvalue() + assert "invalid" in cmd.stdout.getvalue() @pytest.mark.django_db -@mock.patch(f"{COMMAND_PATH}.Github") -def test_get_repo_info_attribute_error(mock_github): - from core_directory.management.commands.init_db import Command +def test_initialize_from_storage_save_exception(monkeypatch, fake_storage): cmd = Command() - # Set both env vars - with mock.patch.dict(os.environ, {"GITHUB_REPO": "user/repo", "GITHUB_ACCESS_TOKEN": "token"}, clear=True): - # Simulate AttributeError when calling get_repo - mock_github.return_value.get_repo.side_effect = AttributeError("fail") - repo, token, branch = cmd.get_repo_info() - assert repo is None and token is None and branch is None - -def test_on_rm_exc_makes_file_writable_and_retries(tmp_path): - from core_directory.management.commands.init_db import Command - - file_path = tmp_path / "dummy.txt" - file_path.write_text("test") - file_path.chmod(0o400) - - called = {} - - # Instead of actually removing, just record the call - def fake_remove(path): - called["called"] = True - # Simulate successful removal (do nothing) + cmd.stdout = io.StringIO() - with mock.patch("os.chmod") as mock_chmod, \ - mock.patch("os.access", return_value=False): - Command._on_rm_exc(fake_remove, str(file_path), (PermissionError, PermissionError("denied"), None)) - mock_chmod.assert_called_with(str(file_path), stat.S_IWUSR) - assert called["called"] + monkeypatch.setattr("core_directory.management.commands.init_db.GitHubStorage", lambda: fake_storage) + fake_storage.prefill_cache = mock.Mock() -def test_on_rm_exc_raises_other_errors(tmp_path): - from core_directory.management.commands.init_db import Command + fake_serializer = mock.Mock() + fake_serializer.is_valid.return_value = True + fake_serializer.save.side_effect = IntegrityError("save failed") - file_path = tmp_path / "dummy.txt" - file_path.write_text("test") + monkeypatch.setattr("core_directory.management.commands.init_db.CoreSerializer", lambda data: fake_serializer) - with mock.patch("os.access", return_value=True): - with pytest.raises(PermissionError): - Command._on_rm_exc(lambda p: None, str(file_path), (PermissionError, PermissionError("denied"), None)) + cmd.initialize_from_storage() + assert "Error creating database object for foo.core" in cmd.stdout.getvalue() + assert "save failed" in cmd.stdout.getvalue() \ No newline at end of file diff --git a/core_directory/tests/storages/__init__.py b/core_directory/tests/storages/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/core_directory/tests/storages/test_github_storage.py b/core_directory/tests/storages/test_github_storage.py new file mode 100644 index 0000000..0968947 --- /dev/null +++ b/core_directory/tests/storages/test_github_storage.py @@ -0,0 +1,327 @@ +import io +import os +import pytest +from unittest import mock + +from django.core.files.base import ContentFile + +from core_directory.storages.github import GitHubStorage +from github import UnknownObjectException, GithubException + +@pytest.fixture +def mock_github(monkeypatch): + # Patch Github and repo + mock_repo = mock.Mock() + mock_github = mock.Mock() + mock_github.get_repo.return_value = mock_repo + monkeypatch.setattr("core_directory.storages.github.Github", lambda **kwargs: mock_github) + monkeypatch.setattr("core_directory.storages.github.GitHubAuthToken", lambda token: token) + return mock_repo + +@pytest.fixture +def storage(mock_github, tmp_path, monkeypatch): + # Patch os.makedirs to avoid real dirs + monkeypatch.setattr(os, "makedirs", lambda *a, **k: None) + # Use a temp cache dir + return GitHubStorage( + repo_name="user/repo", + access_token="token", + branch="main", + cache_dir=str(tmp_path) + ) + +def test_init_env_vars(monkeypatch): + monkeypatch.setenv("GITHUB_REPO", "user/repo") + monkeypatch.setenv("GITHUB_ACCESS_TOKEN", "token") + monkeypatch.setenv("GITHUB_BRANCH", "main") + monkeypatch.setenv("GITHUB_STORAGE_CACHE_DIR", "/tmp/cache") + with mock.patch("core_directory.storages.github.Github") as mock_github, \ + mock.patch("core_directory.storages.github.GitHubAuthToken"): + storage = GitHubStorage() + assert storage.repo_name == "user/repo" + assert storage.access_token == "token" + assert storage.branch == "main" + assert storage.cache_dir == "/tmp/cache" + +def test_init_raises_if_no_repo_or_token(monkeypatch): + monkeypatch.delenv("GITHUB_REPO", raising=False) + monkeypatch.delenv("GITHUB_ACCESS_TOKEN", raising=False) + with pytest.raises(ValueError): + GitHubStorage(repo_name=None, access_token=None) + +def test_init_makes_cache_dir(monkeypatch): + # Patch Github and GitHubAuthToken to avoid real network + monkeypatch.setattr("core_directory.storages.github.Github", lambda **kwargs: mock.Mock(get_repo=lambda repo_name: mock.Mock())) + monkeypatch.setattr("core_directory.storages.github.GitHubAuthToken", lambda token: token) + + called = {} + def fake_makedirs(path, exist_ok): + called["path"] = path + called["exist_ok"] = exist_ok + monkeypatch.setattr("os.makedirs", fake_makedirs) + GitHubStorage(repo_name="r", access_token="t", cache_dir="/tmp/mycache") + assert called["path"] == "/tmp/mycache" + assert called["exist_ok"] is True + +def test_open_reads_from_cache(storage, tmp_path, monkeypatch): + # Write a file to cache + cache_file = tmp_path / "foo.txt" + cache_file.write_bytes(b"hello") + result = storage._open("foo.txt") + assert result.read() == b"hello" + assert result.name == "foo.txt" + +def test_open_reads_from_github(storage, mock_github, tmp_path, monkeypatch): + # Remove cache file + cache_file = tmp_path / "bar.txt" + if cache_file.exists(): + cache_file.unlink() + # Mock repo.get_contents + mock_file = mock.Mock() + mock_file.decoded_content = b"from github" + mock_github.get_contents.return_value = mock_file + result = storage._open("bar.txt") + assert result.read() == b"from github" + assert result.name == "bar.txt" + # Should write to cache + assert (tmp_path / "bar.txt").exists() + +def test_open_file_not_found(storage, mock_github): + mock_github.get_contents.side_effect = UnknownObjectException(404, "Not found", None) + with pytest.raises(FileNotFoundError): + storage._open("missing.txt") + +def test_open_github_exception(storage, mock_github): + mock_github.get_contents.side_effect = GithubException(500, "fail", None) + with pytest.raises(IOError): + storage._open("fail.txt") + +def test_save_creates_file(storage, mock_github, tmp_path): + # Simulate file does not exist + mock_github.get_contents.side_effect = UnknownObjectException(404, "Not found", None) + content = ContentFile(b"abc", name="foo.txt") + storage._repo.create_file.return_value = None + result = storage._save("foo.txt", content) + assert result == "foo.txt" + storage._repo.create_file.assert_called_once() + # Should write to cache + assert (tmp_path / "foo.txt").exists() + +def test_save_updates_file(storage, mock_github, tmp_path): + # Simulate file exists + mock_file = mock.Mock() + mock_file.sha = "sha123" + mock_github.get_contents.return_value = mock_file + content = ContentFile(b"def", name="foo.txt") + storage._repo.update_file.return_value = None + result = storage._save("foo.txt", content) + assert result == "foo.txt" + storage._repo.update_file.assert_called_once() + # Should write to cache + assert (tmp_path / "foo.txt").exists() + +def test_save_github_exception(storage, mock_github): + mock_github.get_contents.side_effect = GithubException(500, "fail", None) + content = ContentFile(b"abc", name="foo.txt") + with pytest.raises(IOError): + storage._save("foo.txt", content) + +def test_delete_removes_file(storage, mock_github, tmp_path): + # Simulate file exists + mock_file = mock.Mock() + mock_file.sha = "sha123" + mock_github.get_contents.return_value = mock_file + storage._repo.delete_file.return_value = None + # Write to cache + cache_file = tmp_path / "foo.txt" + cache_file.write_bytes(b"abc") + storage.delete("foo.txt") + storage._repo.delete_file.assert_called_once() + assert not cache_file.exists() + +def test_delete_file_not_found(storage, mock_github, tmp_path): + # Simulate file does not exist + mock_github.get_contents.side_effect = UnknownObjectException(404, "Not found", None) + # Write to cache + cache_file = tmp_path / "foo.txt" + cache_file.write_bytes(b"abc") + storage.delete("foo.txt") + # Should remove cache file + assert not cache_file.exists() + +def test_delete_github_exception(storage, mock_github): + mock_github.get_contents.side_effect = GithubException(500, "fail", None) + with pytest.raises(IOError): + storage.delete("foo.txt") + +def test_exists_checks_cache_and_github(storage, mock_github, tmp_path): + # File in cache + cache_file = tmp_path / "foo.txt" + cache_file.write_bytes(b"abc") + assert storage.exists("foo.txt") + # Not in cache, but in github + cache_file.unlink() + mock_github.get_contents.return_value = mock.Mock() + assert storage.exists("foo.txt") + # Not in cache, not in github + mock_github.get_contents.side_effect = UnknownObjectException(404, "Not found", None) + assert not storage.exists("foo.txt") + +def test_get_available_name_calls_delete(storage, monkeypatch): + called = {} + def fake_delete(name): + called["deleted"] = name + return None + monkeypatch.setattr(storage, "delete", fake_delete) + name = storage.get_available_name("foo.txt") + assert name == "foo.txt" + assert called["deleted"] == "foo.txt" + +def test_url(storage): + url = storage.url("foo.txt") + assert url == "https://raw.githubusercontent.com/user/repo/main/foo.txt" + +def test_size_returns_size(storage, mock_github): + mock_file = mock.Mock() + mock_file.size = 123 + mock_github.get_contents.return_value = mock_file + assert storage.size("foo.txt") == 123 + +def test_size_returns_zero_if_not_found(storage, mock_github): + mock_github.get_contents.side_effect = UnknownObjectException(404, "Not found", None) + assert storage.size("foo.txt") == 0 + +def test_listdir(storage, mock_github): + file1 = mock.Mock() + file1.name = "foo.txt" + file1.type = "file" + dir1 = mock.Mock() + dir1.name = "bar" + dir1.type = "dir" + mock_github.get_contents.return_value = [file1, dir1] + dirs, files = storage.listdir("") + assert dirs == ["bar"] + assert files == ["foo.txt"] + +def test_listdir_not_implemented(storage): + with pytest.raises(NotImplementedError): + storage.listdir("not-root") + +def test_listdir_not_root_raises(monkeypatch): + # Patch Github and GitHubAuthToken to avoid real network + monkeypatch.setattr("core_directory.storages.github.Github", lambda **kwargs: mock.Mock(get_repo=lambda repo_name: mock.Mock())) + monkeypatch.setattr("core_directory.storages.github.GitHubAuthToken", lambda token: token) + storage = GitHubStorage(repo_name="r", access_token="t") + with pytest.raises(NotImplementedError): + storage.listdir("not-root") + +def test_clear_cache(tmp_path, monkeypatch): + # Patch Github and GitHubAuthToken to avoid real network + monkeypatch.setattr("core_directory.storages.github.Github", lambda **kwargs: mock.Mock(get_repo=lambda repo_name: mock.Mock())) + monkeypatch.setattr("core_directory.storages.github.GitHubAuthToken", lambda token: token) + + # Create files and dirs in cache + cache_dir = tmp_path / "cache" + cache_dir.mkdir() + (cache_dir / "file1.txt").write_text("abc") + (cache_dir / "dir1").mkdir() + (cache_dir / "dir1" / "file2.txt").write_text("def") + storage = GitHubStorage( + repo_name="user/repo", + access_token="token", + branch="main", + cache_dir=str(cache_dir) + ) + storage.clear_cache() + assert not any(cache_dir.iterdir()) + +def test_prefill_cache_success(monkeypatch, tmp_path): + # Patch Github and GitHubAuthToken to avoid real network + monkeypatch.setattr("core_directory.storages.github.Github", lambda **kwargs: mock.Mock(get_repo=lambda repo_name: mock.Mock())) + monkeypatch.setattr("core_directory.storages.github.GitHubAuthToken", lambda token: token) + + # Patch requests.get, zipfile.ZipFile, etc. + cache_dir = tmp_path / "cache" + cache_dir.mkdir() + storage = GitHubStorage( + repo_name="user/repo", + access_token="token", + branch="main", + cache_dir=str(cache_dir) + ) + # Patch clear_cache + storage.clear_cache = mock.Mock() + # Patch requests.get + fake_response = mock.Mock() + fake_response.status_code = 200 + fake_response.iter_content = lambda chunk_size: [b"zipdata"] + monkeypatch.setattr("core_directory.storages.github.requests.get", lambda *a, **k: fake_response) + # Patch zipfile.ZipFile + class FakeZip: + def __enter__(self): return self + def __exit__(self, *a): pass + def extractall(self, path): + # Create a fake extracted dir and file + extracted_dir = os.path.join(path, "repo-main") + os.makedirs(extracted_dir, exist_ok=True) + with open(os.path.join(extracted_dir, "foo.txt"), "wb") as f: + f.write(b"abc") + monkeypatch.setattr("core_directory.storages.github.zipfile.ZipFile", lambda *a, **k: FakeZip()) + # Patch os.listdir to simulate extracted dir + def fake_listdir(path): + if "repo-main" in path: + return ["foo.txt"] + if os.path.basename(path).startswith("tmp"): + return ["repo-main"] + return [] + monkeypatch.setattr("os.listdir", fake_listdir) + # Patch os.path.isdir to match our fake structure + monkeypatch.setattr("os.path.isdir", lambda path: "repo-main" in path or "cache" in path) + storage.prefill_cache() + # Should have file in cache + assert (cache_dir / "foo.txt").exists() + +def test_prefill_cache_no_cache_dir(monkeypatch): + monkeypatch.setattr("core_directory.storages.github.Github", lambda **kwargs: mock.Mock(get_repo=lambda repo_name: mock.Mock())) + monkeypatch.setattr("core_directory.storages.github.GitHubAuthToken", lambda token: token) + storage = GitHubStorage(repo_name="r", access_token="t", cache_dir=None) + with pytest.raises(RuntimeError): + storage.prefill_cache() + +def test_prefill_cache_bad_status(monkeypatch, tmp_path): + monkeypatch.setattr("core_directory.storages.github.Github", lambda **kwargs: mock.Mock(get_repo=lambda repo_name: mock.Mock())) + monkeypatch.setattr("core_directory.storages.github.GitHubAuthToken", lambda token: token) + cache_dir = tmp_path / "cache" + cache_dir.mkdir() + storage = GitHubStorage(repo_name="r", access_token="t", cache_dir=str(cache_dir)) + storage.clear_cache = mock.Mock() + fake_response = mock.Mock() + fake_response.status_code = 404 + fake_response.text = "not found" + monkeypatch.setattr("core_directory.storages.github.requests.get", lambda *a, **k: fake_response) + with pytest.raises(RuntimeError) as excinfo: + storage.prefill_cache() + assert "Failed to download repo archive" in str(excinfo.value) + +def test_prefill_cache_no_extracted_dir(monkeypatch, tmp_path): + monkeypatch.setattr("core_directory.storages.github.Github", lambda **kwargs: mock.Mock(get_repo=lambda repo_name: mock.Mock())) + monkeypatch.setattr("core_directory.storages.github.GitHubAuthToken", lambda token: token) + cache_dir = tmp_path / "cache" + cache_dir.mkdir() + storage = GitHubStorage(repo_name="r", access_token="t", cache_dir=str(cache_dir)) + storage.clear_cache = mock.Mock() + fake_response = mock.Mock() + fake_response.status_code = 200 + fake_response.iter_content = lambda chunk_size: [b"zipdata"] + monkeypatch.setattr("core_directory.storages.github.requests.get", lambda *a, **k: fake_response) + class FakeZip: + def __enter__(self): return self + def __exit__(self, *a): pass + def extractall(self, path): pass # Do not create any dirs + monkeypatch.setattr("core_directory.storages.github.zipfile.ZipFile", lambda *a, **k: FakeZip()) + # Patch os.listdir to always return empty list for temp_dir + monkeypatch.setattr("os.listdir", lambda path: []) + with pytest.raises(RuntimeError) as excinfo: + storage.prefill_cache() + assert "No directory found in extracted archive" in str(excinfo.value) + \ No newline at end of file diff --git a/core_directory/tests/test_models.py b/core_directory/tests/test_models.py index fbb88d8..8585872 100644 --- a/core_directory/tests/test_models.py +++ b/core_directory/tests/test_models.py @@ -38,7 +38,7 @@ def test_corepackage_creation_and_version_parsing(): project=proj, vlnv_name="acme:lib1:core1:1.2.3-rc1", version="1.2.3-rc1", - core_url="https://example.com/core", + core_file="test.core", description="desc" ) assert cp.version_major == 1 @@ -48,7 +48,7 @@ def test_corepackage_creation_and_version_parsing(): assert str(cp) == f"{proj}:1.2.3-rc1" assert not cp.is_signed # Now add sig_url - cp.sig_url = "https://example.com/core.sig" + cp.signature_file = "test.core.sig" cp.save() assert cp.is_signed @@ -61,7 +61,7 @@ def test_corepackage_invalid_version(): project=proj, vlnv_name="acme:lib1:core1:bad", version="bad", - core_url="https://example.com/core", + core_file="test.core", description="desc" ) with pytest.raises(ValidationError): @@ -76,7 +76,7 @@ def test_unique_constraints(): project=proj, vlnv_name="acme:lib1:core1:1.2.3", version="1.2.3", - core_url="https://example.com/core", + core_file="test.core", description="desc" ) # Duplicate version for same project should fail @@ -85,7 +85,7 @@ def test_unique_constraints(): project=proj, vlnv_name="acme:lib1:core1:1.2.3", version="1.2.3", - core_url="https://example.com/core2", + core_file="test.core", description="desc" ) @@ -98,7 +98,7 @@ def test_fileset_and_dependency(): project=proj, vlnv_name="acme:lib1:core1:1.2.3", version="1.2.3", - core_url="https://example.com/core", + core_file="test.core", description="desc" ) fs = Fileset.objects.create(core_package=cp, name="fs1") @@ -119,7 +119,7 @@ def test_target_and_target_configuration(): project=proj, vlnv_name="acme:lib1:core1:1.2.3", version="1.2.3", - core_url="https://example.com/core", + core_file="test.core", description="desc" ) fs = Fileset.objects.create(core_package=cp, name="fs1") @@ -140,7 +140,7 @@ def test_corepackage_with_valid_spdx_license(): project=proj, vlnv_name="acme:lib1:core1:1.0.0", version="1.0.0", - core_url="https://example.com/core", + core_file="test.core", description="desc", spdx_license=valid_license ) @@ -156,7 +156,7 @@ def test_corepackage_with_license_ref_fails(): project=proj, vlnv_name="acme:lib1:core1:2.0.0", version="2.0.0", - core_url="https://example.com/core", + core_file="test.core", description="desc", spdx_license="LicenseRef-MyCustomLicense" ) @@ -172,7 +172,7 @@ def test_corepackage_with_invalid_license_fails(): project=proj, vlnv_name="acme:lib1:core1:3.0.0", version="3.0.0", - core_url="https://example.com/core", + core_file="test.core", description="desc", spdx_license="NOT_A_VALID_LICENSE" ) @@ -188,7 +188,7 @@ def test_corepackage_with_blank_license(): project=proj, vlnv_name="acme:lib1:core1:4.0.0", version="4.0.0", - core_url="https://example.com/core", + core_file="test.core", description="desc" ) assert cp.spdx_license is None or cp.spdx_license == "" diff --git a/core_directory/tests/test_serializers.py b/core_directory/tests/test_serializers.py index b6b18b8..be63425 100644 --- a/core_directory/tests/test_serializers.py +++ b/core_directory/tests/test_serializers.py @@ -7,6 +7,8 @@ from rest_framework import serializers from rest_framework.exceptions import ValidationError +from django.core.files.uploadedfile import SimpleUploadedFile +from django.core.files.storage import FileSystemStorage from jsonschema import ValidationError as JsonSchemaValidationError, SchemaError import json @@ -14,6 +16,13 @@ from core_directory.models import Vendor, Library, Project, CorePackage from utils.spdx import get_spdx_license_ids +@pytest.fixture(autouse=True) +def patch_corepackage_storage(settings): + from ..storages.dummy_storage import DummyStorage + settings.DEFAULT_FILE_STORAGE = 'path.to.dummy_storage.DummyStorage' + CorePackage._meta.get_field('core_file').storage = DummyStorage() + CorePackage._meta.get_field('signature_file').storage = DummyStorage() + # --- Helper to create a fake file object --- class FakeFile(io.BytesIO): def __init__(self, content, name="test.core", size=None): @@ -213,9 +222,10 @@ def test_core_serializer_create(): "library_name": "Lib1", "project_name": "Core1", "vlnv_name": "Acme:Lib1:Core1:1.0.0", + "sanitized_name": "acme_lib1_core1_1_0_0", "version": "1.0.0", - "core_url": "https://example.com/core", - "sig_url": None, + "core_file": SimpleUploadedFile("acme_lib1_core1_1_0_0.core", b"CAPI=2:\nname: Acme:Lib1:Core1:1.0.0\n"), + "signature_file": None, "description": "A test core package.", "core_content_yaml": { "filesets": { @@ -235,7 +245,6 @@ def test_core_serializer_create(): } } } - serializer = CoreSerializer() instance = serializer.create(validated_data) @@ -245,7 +254,7 @@ def test_core_serializer_create(): assert instance.project.library.name == "Lib1" assert instance.project.name == "Core1" assert instance.version == "1.0.0" - assert instance.core_url == "https://example.com/core" + assert instance.core_file == "acme_lib1_core1_1_0_0.core" assert instance.description == "A test core package." # Fileset and Target should also exist filesets = instance.filesets.all() @@ -269,7 +278,8 @@ def test_core_serializer_create_with_dependencies(): "project_name": "Core1", "vlnv_name": "Acme:Lib1:Core1:1.0.0", "version": "1.0.0", - "core_url": "https://example.com/core", + "sanitized_name": "acme_lib1_core1_1_0_0", + "core_file": SimpleUploadedFile("acme_lib1_core1_1_0_0.core", b"CAPI=2:\nname: Acme:Lib1:Core1:1.0.0\n"), "sig_url": None, "description": "A test core package.", "core_content_yaml": { @@ -294,7 +304,6 @@ def test_core_serializer_create_with_dependencies(): } } } - serializer = CoreSerializer() instance = serializer.create(validated_data) @@ -326,7 +335,8 @@ def test_core_serializer_create_with_valid_spdx_license(): "project_name": "Core1", "vlnv_name": "Acme:Lib1:Core1:1.0.0", "version": "1.0.0", - "core_url": "https://example.com/core", + "sanitized_name": "acme_lib1_core1_1_0_0", + "core_file": SimpleUploadedFile("acme_lib1_core1_1_0_0.core", b"CAPI=2:\nname: Acme:Lib1:Core1:1.0.0\n"), "sig_url": None, "description": "A test core package.", "spdx_license": valid_license, @@ -348,6 +358,7 @@ def test_core_serializer_create_with_valid_spdx_license(): } } } + serializer = CoreSerializer() instance = serializer.create(validated_data) assert isinstance(instance, CorePackage) @@ -361,7 +372,8 @@ def test_core_serializer_create_with_invalid_license_fails(): "project_name": "Core1", "vlnv_name": "Acme:Lib1:Core1:1.0.0", "version": "1.0.0", - "core_url": "https://example.com/core", + "sanitized_name": "acme_lib1_core1_1_0_0", + "core_file": SimpleUploadedFile("acme_lib1_core1_1_0_0.core", b"CAPI=2:\nname: Acme:Lib1:Core1:1.0.0\n"), "sig_url": None, "description": "A test core package.", "spdx_license": "NOT_A_VALID_LICENSE", @@ -383,6 +395,7 @@ def test_core_serializer_create_with_invalid_license_fails(): } } } + serializer = CoreSerializer() with pytest.raises(Exception): # Could be ValidationError or IntegrityError depending on your model serializer.create(validated_data) @@ -417,6 +430,7 @@ def test_core_serializer_create_with_license_ref_fails(): } } } + serializer = CoreSerializer() with pytest.raises(Exception): # Could be ValidationError or IntegrityError depending on your model serializer.create(validated_data) @@ -429,7 +443,8 @@ def test_core_serializer_create_with_missing_license(): "project_name": "Core1", "vlnv_name": "Acme:Lib1:Core1:1.0.0", "version": "1.0.0", - "core_url": "https://example.com/core", + "sanitized_name": "acme_lib1_core1_1_0_0", + "core_file": SimpleUploadedFile("acme_lib1_core1_1_0_0.core", b"CAPI=2:\nname: Acme:Lib1:Core1:1.0.0\n"), "sig_url": None, "description": "A test core package.", # No spdx_license field @@ -450,7 +465,7 @@ def test_core_serializer_create_with_missing_license(): } } } - } + } serializer = CoreSerializer() instance = serializer.create(validated_data) assert isinstance(instance, CorePackage) diff --git a/core_directory/tests/test_sitemap.py b/core_directory/tests/test_sitemap.py index 6252042..9b865c5 100644 --- a/core_directory/tests/test_sitemap.py +++ b/core_directory/tests/test_sitemap.py @@ -18,7 +18,7 @@ def test_sitemap_with_data(client): project=project, vlnv_name="acme:lib1:core1:1.0.0", version="1.0.0", - core_url="https://example.com/core", + core_file="acme_lib1_core1_1_0_0.core", description="desc" ) diff --git a/core_directory/tests/test_urls.py b/core_directory/tests/test_urls.py index d2c7cdb..1a23fda 100644 --- a/core_directory/tests/test_urls.py +++ b/core_directory/tests/test_urls.py @@ -18,25 +18,7 @@ def test_url_resolves_and_returns(client, mocker, url_name, kwargs, expected_sta Test that each named URL can be reversed, resolved, and returns a valid response. For core_get and core_list, mock the GitHub API call. """ - mocker.patch.dict("os.environ", { - "GITHUB_REPO": "dummy/repo", - "GITHUB_ACCESS_TOKEN": "dummy_token" - }) - - # Mock GitHub for endpoints that use it - if url_name in ("core_get", "core_list"): - mock_github = mocker.patch("core_directory.views.api_views.Github") - mock_repo = mock_github.return_value.get_repo.return_value - if url_name == "core_get": - mock_contents = mocker.Mock() - mock_contents.decoded_content = b"dummy core file content" - mock_repo.get_contents.return_value = mock_contents - elif url_name == "core_list": - mock_content = mocker.Mock() - mock_content.type = "file" - mock_content.path = "foo.core" - mock_repo.get_contents.return_value = [mock_content] - + url = reverse(f"core_directory:{url_name}", kwargs=kwargs) match = resolve(url) assert match.view_name == f"core_directory:{url_name}" diff --git a/core_directory/tests/web_ui/test_core_detail.py b/core_directory/tests/web_ui/test_core_detail.py index f0589e9..ada971c 100644 --- a/core_directory/tests/web_ui/test_core_detail.py +++ b/core_directory/tests/web_ui/test_core_detail.py @@ -14,7 +14,7 @@ def test_core_detail_view_by_pk(client): project=p, vlnv_name="acme:lib1:core1:1.0.0", version="1.0.0", - core_url="https://example.com/core", + core_file="acme_lib1_core1_1_0_0.core", description="desc" ) url = reverse("core-detail", kwargs={"pk": cp.pk}) @@ -38,7 +38,7 @@ def test_core_detail_by_vlnv_view(client): project=p, vlnv_name="acme:lib1:core1:1.0.0", version="1.0.0", - core_url="https://example.com/core", + core_file="acme_lib1_core1_1_0_0.core", description="desc" ) url = reverse("core-detail-vlnv", kwargs={ @@ -59,7 +59,7 @@ def test_core_detail_by_vlnv_view_without_lib(client): project=p, vlnv_name="acme:lib1:core1:1.0.0", version="1.0.0", - core_url="https://example.com/core", + core_file="acme_lib1_core1_1_0_0.core", description="desc" ) url = reverse("core-detail-vlnv", kwargs={ @@ -81,7 +81,7 @@ def test_core_detail_with_target_and_filesets(client): project=project, vlnv_name="acme:lib1:core1:1.0.0", version="1.0.0", - core_url="https://example.com/core", + core_file="acme_lib1_core1_1_0_0.core", description="desc" ) # Add fileset diff --git a/core_directory/tests/web_ui/tests.py b/core_directory/tests/web_ui/tests.py index effdf22..cc04190 100644 --- a/core_directory/tests/web_ui/tests.py +++ b/core_directory/tests/web_ui/tests.py @@ -51,7 +51,7 @@ def test_core_package_list_view(client): project=p, vlnv_name="acme:lib1:core1:1.0.0", version="1.0.0", - core_url="https://example.com/core", + core_file="acme_lib1_core1_1_0_0.core", description="desc" ) url = reverse("core-package-list") diff --git a/core_directory/views/api_views.py b/core_directory/views/api_views.py index 3da3e68..2d9d70f 100644 --- a/core_directory/views/api_views.py +++ b/core_directory/views/api_views.py @@ -1,21 +1,13 @@ -"""API views for core_directory""" -import os - -from dataclasses import dataclass - -import requests - +"""API views for FuseSoC Package Directory.""" +from django.db import IntegrityError, DatabaseError from django.http import HttpResponse from django.views.generic import TemplateView from drf_spectacular.types import OpenApiTypes from drf_spectacular.utils import extend_schema, OpenApiParameter, OpenApiResponse -from github import Github -from github import GithubException, UnknownObjectException -from github.Auth import Token as GitHubAuthToken - from rest_framework import status +from rest_framework.exceptions import ValidationError as DRFValidationError from rest_framework.parsers import FormParser, MultiPartParser from rest_framework.response import Response from rest_framework.views import APIView @@ -140,9 +132,8 @@ def get(self, request): try: core_object = CorePackage.objects.get(vlnv_name=requested_core_vlnv) - requested_file = requests.get(core_object.core_url, timeout=10) - if requested_file.status_code == 200: - response = HttpResponse(requested_file.content, content_type='application/octet-stream') + if core_object: + response = HttpResponse(core_object.core_file.file, content_type='application/octet-stream') response['Content-Disposition'] = f'attachment; filename={core_object.sanitized_vlnv}.core' return response return Response( @@ -154,6 +145,12 @@ def get(self, request): {'error': f'FuseSoC Core Package {requested_core_vlnv} not available.'}, status=status.HTTP_404_NOT_FOUND ) + except FileNotFoundError: + return Response( + {'error': f'FuseSoC Core Package {requested_core_vlnv} not available.'}, + status=status.HTTP_404_NOT_FOUND + ) + class Publish(APIView): """Endpoint for publishing a new core file to FuseSoC Package Directory.""" parser_classes = (MultiPartParser, FormParser) @@ -196,119 +193,40 @@ def post(self, request, *args, **kwargs): Returns: Response: Success message or error message. """ - @dataclass - class CoreData: - """ - Container for core file publishing data. - - Attributes: - vlnv_name (str): The VLNV (Vendor:Library:Name:Version) name of the core. - sanitized_name (str): A sanitized version of the core name, suitable for filenames. - core_file (Any): The uploaded core file object. - signature_file (Any, optional): The uploaded signature file object, if provided. - core_url (str, optional): The URL of the core file in the GitHub repository. - sig_url (str, optional): The URL of the signature file in the GitHub repository. - """ - vlnv_name: str - sanitized_name: str - core_file: any - signature_file: any = None - core_url: str = None - sig_url: str = None - - @property - def core_file_name(self): - """Returns the filename for the core file.""" - return f'{self.sanitized_name}.core' - - @property - def signature_file_name(self): - """Returns the filename for the signature file.""" - return f'{self.sanitized_name}.core.sig' - - def read_core_content(self): - """Reads and decodes the core file content as UTF-8.""" - self.core_file.seek(0) - return self.core_file.read().decode('utf-8') - - def read_signature_content(self): - """Reads and decodes the signature file content as UTF-8, if present.""" - if self.signature_file: - self.signature_file.seek(0) - return self.signature_file.read().decode('utf-8') - return None + file_obj = request.data.get('core_file') + if not file_obj: + return Response({'error': 'No core file provided'}, status=status.HTTP_400_BAD_REQUEST) serializer = CoreSerializer(data=request.data) if serializer.is_valid(): - vlnv_name = serializer.validated_data['vlnv_name'] - # Check if a core with this VLNV already exists in the database if CorePackage.objects.filter(vlnv_name=vlnv_name).exists(): return Response( {'error': f'Core \'{vlnv_name}\' already exists in FuseSoC Package Directory.'}, status=status.HTTP_409_CONFLICT ) - core_data = CoreData( - vlnv_name = serializer.validated_data['vlnv_name'], - core_file = serializer.validated_data['core_file'], - sanitized_name = serializer.validated_data['sanitized_name'], - signature_file = serializer.validated_data.get('signature_file') - ) - - # Initialize GitHub client - g = Github(auth=GitHubAuthToken(os.getenv('GITHUB_ACCESS_TOKEN'))) - repo = g.get_repo(os.getenv('GITHUB_REPO')) - - # Read and encode the core file content - encoded_core_content = core_data.read_core_content() - + # Save new core in DB (this will upload files via the storage backend) try: - # Try to get the core from the repository - _ = repo.get_contents(core_data.core_file_name) - # The core already exists -> do not create again + serializer.save() + return Response( - {'message': f'Core \'{core_data.vlnv_name}\' already exists in FuseSoC Package Directory.'}, - status=status.HTTP_409_CONFLICT + { + 'message': 'Core published successfully', + }, + status=status.HTTP_201_CREATED + ) + except (IntegrityError, DatabaseError, DRFValidationError) as e: + return Response( + {'error': f'Error saving core: {str(e)}'}, + status=status.HTTP_500_INTERNAL_SERVER_ERROR + ) + except (OSError, IOError) as e: + return Response( + {'error': f'Unexpected error: {str(e)}'}, + status=status.HTTP_500_INTERNAL_SERVER_ERROR ) - except (UnknownObjectException, IndexError, GithubException): - try: - # If the core does not exist, create it - result = repo.create_file( - core_data.core_file_name, - f'Add FuseSoC core {core_data.vlnv_name}', - encoded_core_content, - branch='main') - - # Get core url from GitHub and add core to database - serializer.validated_data['core_url'] = result['content'].download_url - - # Handle the optional signature file - if encoded_signature_content := core_data.read_signature_content(): - result = repo.create_file( - core_data.signature_file_name, - f'Add signature for {core_data.vlnv_name}', - encoded_signature_content, - branch='main' - ) - - serializer.validated_data['sig_url'] = result['content'].download_url - - # Save new core in DB - serializer.save() - - return Response( - {'message': 'Core published successfully'}, - status=status.HTTP_201_CREATED - ) - except GithubException as err: - # Handle specific GitHub API errors - return Response( - {'error': f'GitHub error: {err.data}'}, - status=status.HTTP_500_INTERNAL_SERVER_ERROR - ) - return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) class Validate(APIView): @@ -350,7 +268,7 @@ def post(self, request, *args, **kwargs): """ file_obj = request.data.get('core_file') if not file_obj: - return Response({'error': 'No file provided'}, status=status.HTTP_400_BAD_REQUEST) + return Response({'error': 'No core file provided'}, status=status.HTTP_400_BAD_REQUEST) serializer = CoreSerializer(data=request.data) if serializer.is_valid(): diff --git a/project/settings.py b/project/settings.py index b385af1..5f85da9 100644 --- a/project/settings.py +++ b/project/settings.py @@ -174,6 +174,16 @@ DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField' +# Default file storage backend +# https://docs.djangoproject.com/en/stable/ref/settings/#default-file-storage +STORAGES = { + "default": { + "BACKEND": "core_directory.storages.github.GitHubStorage", + }, + "staticfiles": { + "BACKEND": "django.contrib.staticfiles.storage.StaticFilesStorage", + }, +} REST_FRAMEWORK = { 'DEFAULT_SCHEMA_CLASS': 'drf_spectacular.openapi.AutoSchema', diff --git a/project/settings_test.py b/project/settings_test.py new file mode 100644 index 0000000..2bbe655 --- /dev/null +++ b/project/settings_test.py @@ -0,0 +1,28 @@ +# pylint: skip-file +# flake8: noqa +""" +Django settings for running tests. + +This settings module overrides certain production settings to ensure that +tests run quickly, safely, and without side effects. In particular, it sets +DEFAULT_FILE_STORAGE to use DummyStorage so that no files are written to disk +or external services during tests. + +Usage: + Set DJANGO_SETTINGS_MODULE=project.settings_test when running tests, + or configure pytest.ini accordingly. + +Note: + Linting is disabled for this file to avoid warnings about unused imports + or test-specific overrides. +""" +from .settings import * + +STORAGES = { + "default": { + "BACKEND": "core_directory.storages.dummy_storage.DummyStorage", + }, + "staticfiles": { + "BACKEND": "django.contrib.staticfiles.storage.StaticFilesStorage", + }, +} diff --git a/pytest.ini b/pytest.ini index 43883f7..2370b4d 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,3 +1,3 @@ [pytest] -DJANGO_SETTINGS_MODULE = project.settings +DJANGO_SETTINGS_MODULE = project.settings_test python_files = tests.py test_*.py *_tests.py \ No newline at end of file diff --git a/utils/files.py b/utils/files.py new file mode 100644 index 0000000..d20d3c6 --- /dev/null +++ b/utils/files.py @@ -0,0 +1,20 @@ +""" +Utility functions for file and storage handling in the FuseSoC package database. + +Includes helpers for working with Django FileFields and storage backends, +such as checking for file existence and avoiding duplicate uploads. +""" + +from django.core.files.storage import default_storage + +def filefield_value_for_storage(filename, fileobj): + """ + Returns the correct value to assign to a FileField: + - If the file exists in storage, returns the filename (string). + - If not, returns the file object (triggers upload). + """ + if fileobj is None: + return None + if default_storage.exists(filename): + return filename + return fileobj diff --git a/utils/tests/test_files.py b/utils/tests/test_files.py new file mode 100644 index 0000000..ca7fcab --- /dev/null +++ b/utils/tests/test_files.py @@ -0,0 +1,37 @@ +import pytest +from unittest import mock + +from django.core.files.base import ContentFile + +from utils.files import filefield_value_for_storage + +def test_returns_none_if_fileobj_is_none(): + assert filefield_value_for_storage("foo.txt", None) is None + +def test_returns_filename_if_exists(monkeypatch): + # Patch default_storage.exists to return True + monkeypatch.setattr("django.core.files.storage.default_storage.exists", lambda name: True) + fileobj = ContentFile(b"dummy", name="foo.txt") + result = filefield_value_for_storage("foo.txt", fileobj) + assert result == "foo.txt" + +def test_returns_fileobj_if_not_exists(monkeypatch): + # Patch default_storage.exists to return False + monkeypatch.setattr("django.core.files.storage.default_storage.exists", lambda name: False) + fileobj = ContentFile(b"dummy", name="foo.txt") + result = filefield_value_for_storage("foo.txt", fileobj) + assert result is fileobj + +def test_returns_fileobj_even_if_filename_differs(monkeypatch): + # Patch default_storage.exists to return False + monkeypatch.setattr("django.core.files.storage.default_storage.exists", lambda name: False) + fileobj = ContentFile(b"dummy", name="bar.txt") + result = filefield_value_for_storage("foo.txt", fileobj) + assert result is fileobj + +def test_returns_filename_even_if_fileobj_given(monkeypatch): + # Patch default_storage.exists to return True + monkeypatch.setattr("django.core.files.storage.default_storage.exists", lambda name: True) + fileobj = ContentFile(b"dummy", name="bar.txt") + result = filefield_value_for_storage("foo.txt", fileobj) + assert result == "foo.txt"