b9874d0941
Add redbear-usb-storage-check in-guest binary that validates USB mass storage read and write I/O: discovers /scheme/disk/ devices, writes a test pattern to sector 2048, reads it back, verifies match, restores original content. Updates test-usb-storage-qemu.sh with write-proof verification step. Includes all accumulated Red Bear OS work: kernel patches, relibc patches, driver infrastructure, DRM/GPU, KDE recipes, firmware, validation tooling, build system hardening, and documentation.
190 lines
6.5 KiB
Bash
Executable File
190 lines
6.5 KiB
Bash
Executable File
#!/usr/bin/env python3
|
|
import argparse
|
|
import json
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path, PurePosixPath
|
|
|
|
|
|
def parse_args():
    """Parse the command line of the restore script.

    Recognised arguments:
        --release  (required) release identifier.
        --force    flag; true when present.
        recipes    zero or more positional recipe paths.

    Returns:
        The populated ``argparse.Namespace``.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--release", required=True)
    cli.add_argument("--force", action="store_true")
    cli.add_argument("recipes", nargs="*")
    namespace = cli.parse_args()
    return namespace
|
|
|
|
|
|
def load_manifest(manifest_path: Path):
    """Read and decode the release manifest JSON file.

    Args:
        manifest_path: Location of the manifest file.

    Returns:
        The decoded JSON document.

    Raises:
        FileNotFoundError: If ``manifest_path`` does not exist.
    """
    if manifest_path.exists():
        with manifest_path.open("r", encoding="utf-8") as stream:
            return json.load(stream)
    raise FileNotFoundError(f"Release manifest not found: {manifest_path}")
|
|
|
|
|
|
def archive_path(project_root: Path, release_dir: Path, archive_name: str):
    """Find the on-disk archive for ``archive_name``.

    Three directories are consulted, in this order:
    ``release_dir/tarballs``, ``release_dir/snapshots`` and
    ``project_root/sources/x86_64-unknown-redox``.

    An exact file-name match wins. If there is none and the name ends in
    ``-patched.tar.gz``, each directory is globbed for
    ``<prefix>*-patched.tar.gz`` and the lexicographically first hit of the
    first directory with any hit is returned.

    Returns:
        The matching ``Path``, or ``None`` when nothing is found.
    """
    roots = (
        release_dir / "tarballs",
        release_dir / "snapshots",
        project_root / "sources/x86_64-unknown-redox",
    )

    # Pass 1: exact name in any of the known directories.
    for root in roots:
        exact = root / archive_name
        if exact.is_file():
            return exact

    # Pass 2: only patched archives get the wildcard fallback.
    suffix = "-patched.tar.gz"
    if not archive_name.endswith(suffix):
        return None

    stem = archive_name[: -len(suffix)]
    pattern = f"{stem}*-patched.tar.gz"
    for root in roots:
        if root.is_dir():
            hits = sorted(root.glob(pattern))
            if hits:
                return hits[0]
    return None
|
|
|
|
|
|
def strip_components(archive: Path):
    """Compute the ``--strip-components`` value that unwraps ``source/``.

    The archive is listed with ``tar tf`` and only the first non-blank
    member name is inspected. If that name contains a ``source`` path
    component, the result is the number of leading components up to and
    including it; otherwise the result is 0.

    Args:
        archive: Path of the tar archive to inspect.

    Returns:
        Number of leading path components to strip (0 when the first
        member has no ``source`` component or the archive is empty).

    Raises:
        subprocess.CalledProcessError: If ``tar tf`` exits non-zero.
    """
    listing = subprocess.run(
        ["tar", "tf", str(archive)],
        check=True,
        capture_output=True,
        text=True,
    )
    first_entry = next(
        (line for line in listing.stdout.splitlines() if line.strip()), ""
    )

    # Count components the way tar's --strip-components does: every
    # slash-separated element counts, including "." (as in "./source/..."),
    # while leading or doubled slashes add nothing. PurePosixPath.parts
    # would drop "." and count a leading "/" as a part, giving a depth that
    # is off by one for such member names.
    parts = [part for part in first_entry.split("/") if part]
    try:
        return parts.index("source") + 1
    except ValueError:
        return 0
|
|
|
|
|
|
def extract_archive(archive: Path, target_source_dir: Path):
    """Unpack ``archive`` into a freshly created ``target_source_dir``.

    Anything already at ``target_source_dir`` is removed first: a real
    directory is deleted recursively, a symlink or other file is unlinked.
    The directory is then recreated and ``tar xzf`` extracts into it,
    stripping the number of leading components reported by
    ``strip_components``. tar's stdout and stderr are discarded.

    Raises:
        subprocess.CalledProcessError: If either tar invocation fails.
    """
    # Symlinks are tested first so a link to a directory is unlinked rather
    # than having its target tree deleted.
    if target_source_dir.is_symlink():
        target_source_dir.unlink()
    elif target_source_dir.is_dir():
        shutil.rmtree(target_source_dir)
    elif target_source_dir.exists():
        target_source_dir.unlink()
    target_source_dir.mkdir(parents=True, exist_ok=True)

    depth = strip_components(archive)
    tar_argv = ["tar", "xzf", str(archive), "-C", str(target_source_dir)]
    if depth > 0:
        tar_argv.append(f"--strip-components={depth}")
    subprocess.run(
        tar_argv,
        check=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
|
|
|
|
|
|
def make_relative_symlink(source_dir: Path, target_source_dir: Path):
    """Replace ``source_dir`` with a relative symlink to ``target_source_dir``.

    Whatever currently sits at ``source_dir`` is removed (a real directory
    recursively, a symlink or other file by unlinking). Missing parent
    directories are created, and the link target is expressed relative to
    ``source_dir``'s parent directory.
    """
    # Check for a symlink first so a link to a directory is unlinked rather
    # than having its target tree deleted.
    if source_dir.is_symlink():
        source_dir.unlink()
    elif source_dir.is_dir():
        shutil.rmtree(source_dir)
    elif source_dir.exists():
        source_dir.unlink()

    link_parent = source_dir.parent
    link_parent.mkdir(parents=True, exist_ok=True)
    source_dir.symlink_to(os.path.relpath(target_source_dir, link_parent))
|
|
|
|
|
|
def restore_recipe(
    recipe_path: str,
    entries: dict,
    project_root: Path,
    release_dir: Path,
    force: bool,
    seen: set,
    stats: dict,
):
    """Restore ``recipes/<recipe_path>/source`` from the release manifest.

    Outcomes are reported by printing a message and bumping one counter in
    ``stats``; nothing is returned.

    * ``same_as`` entries restore their ``target`` recipe first and then
      become a relative symlink to the target's source directory.
    * Other entries are extracted from the archive named by ``archive``.
      Entries of type ``path`` without an archive are skipped.
    * A non-empty existing source directory is skipped unless ``force``.

    Args:
        recipe_path: Manifest key; also the recipe's path under ``recipes/``.
        entries: Mapping of recipe path to manifest entry.
        project_root: Directory that contains ``recipes/``.
        release_dir: Release directory searched for archives.
        force: Overwrite a non-empty existing source directory when true.
        seen: Recipe paths already handled in this run; updated in place so
            each recipe is processed at most once (also stops ``same_as``
            recursion from looping).
        stats: Counters keyed ``restored`` / ``skipped`` / ``failed``;
            updated in place.
    """
    if recipe_path in seen:
        return
    seen.add(recipe_path)

    entry = entries.get(recipe_path)
    if entry is None:
        print(f"ERROR: no manifest entry for {recipe_path}", file=sys.stderr)
        stats["failed"] += 1
        return

    source_dir = project_root / "recipes" / recipe_path / "source"
    if source_dir.is_dir() and any(source_dir.iterdir()) and not force:
        print(f"WARN: source exists: recipes/{recipe_path}/source/ (use --force to overwrite)")
        stats["skipped"] += 1
        return

    entry_type = entry.get("type", "")
    if entry_type == "same_as":
        target = entry.get("target", "")
        if not target:
            print(f"ERROR: same_as entry missing target: {recipe_path}", file=sys.stderr)
            stats["failed"] += 1
            return
        restore_recipe(target, entries, project_root, release_dir, force, seen, stats)
        target_source_dir = project_root / "recipes" / target / "source"
        if not target_source_dir.is_dir() or not any(target_source_dir.iterdir()):
            print(f"ERROR: same_as target missing after restore: {target}", file=sys.stderr)
            stats["failed"] += 1
            return
        # A self-referential or cyclic same_as chain (or a symlinked recipe
        # directory) can make the target resolve to this recipe's own real
        # directory. Replacing it with a symlink would delete the only copy
        # of the sources and leave a link pointing at itself.
        if not source_dir.is_symlink() and source_dir.resolve() == target_source_dir.resolve():
            print(
                f"ERROR: same_as target resolves to the recipe itself: {recipe_path} -> {target}",
                file=sys.stderr,
            )
            stats["failed"] += 1
            return
        print(f"==> restoring: {recipe_path}")
        make_relative_symlink(source_dir, target_source_dir)
        stats["restored"] += 1
        return

    archive_name = entry.get("archive", "")
    if not archive_name:
        if entry_type == "path":
            print(f"WARN: local path source (no archive): {recipe_path}")
            stats["skipped"] += 1
            return
        print(f"ERROR: no archive found for {recipe_path}", file=sys.stderr)
        stats["failed"] += 1
        return

    archive = archive_path(project_root, release_dir, archive_name)
    if archive is None:
        print(f"ERROR: archive missing for {recipe_path}: {archive_name}", file=sys.stderr)
        stats["failed"] += 1
        return

    print(f"==> restoring: {recipe_path}")
    try:
        extract_archive(archive, source_dir)
    except (subprocess.CalledProcessError, OSError) as exc:
        # One broken archive (or a missing tar binary) must be recorded as a
        # failure for this recipe, not abort the remaining recipes.
        print(
            f"ERROR: extraction failed: {recipe_path} (archive: {archive}): {exc}",
            file=sys.stderr,
        )
        stats["failed"] += 1
        return

    if source_dir.is_dir() and any(source_dir.iterdir()):
        stats["restored"] += 1
    else:
        print(f"ERROR: extraction failed: {recipe_path} (archive: {archive})", file=sys.stderr)
        stats["failed"] += 1
|
|
|
|
|
|
def main():
    """Run the restore for the requested release and print a summary.

    The project root is taken as ``parents[2]`` of this file's resolved
    path, and the manifest is read from
    ``sources/redbear-<release>/manifest.json`` beneath it. When no recipes
    are given on the command line, every manifest entry is processed in
    sorted order.

    Returns:
        0 when no recipe failed; 1 when any recipe failed or the manifest
        file is missing.
    """
    args = parse_args()
    project_root = Path(__file__).resolve().parents[2]
    release_dir = project_root / "sources" / f"redbear-{args.release}"

    try:
        manifest = load_manifest(release_dir / "manifest.json")
    except FileNotFoundError as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        print(f"Run: ./local/scripts/provision-release.sh --release={args.release}", file=sys.stderr)
        return 1

    entries = manifest.get("entries", {})
    tally = {"restored": 0, "skipped": 0, "failed": 0}
    visited = set()
    requested = args.recipes or sorted(entries.keys())
    for recipe_path in requested:
        restore_recipe(recipe_path, entries, project_root, release_dir, args.force, visited, tally)

    rule = "========================================="
    summary = (
        "",
        rule,
        " Restore complete",
        f" Restored: {tally['restored']}",
        f" Skipped: {tally['skipped']}",
        f" Failed: {tally['failed']}",
        rule,
    )
    for line in summary:
        print(line)

    if tally["failed"] == 0:
        return 0
    return 1
|
|
|
|
|
|
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|