334 lines
11 KiB
Python
334 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""Manage WPT case status transitions in tests/external/wpt/wpt_manifest.toml."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import os
|
|
import re
|
|
import sys
|
|
import tempfile
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Iterable
|
|
|
|
PROJECT_ROOT = Path(__file__).resolve().parent.parent
|
|
DEFAULT_MANIFEST = PROJECT_ROOT / "tests" / "external" / "wpt" / "wpt_manifest.toml"
|
|
|
|
CASE_START = "[[case]]"
|
|
KEY_VALUE_RE = re.compile(r'^(\s*)([A-Za-z_][A-Za-z0-9_]*)\s*=\s*"(.*)"\s*$')
|
|
|
|
|
|
@dataclass
|
|
class CaseBlock:
|
|
start: int
|
|
end: int
|
|
id_value: str | None
|
|
|
|
|
|
def parse_args(argv: list[str]) -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(description="Manage WPT known-fail status in manifest")
|
|
parser.add_argument(
|
|
"--manifest",
|
|
type=Path,
|
|
default=DEFAULT_MANIFEST,
|
|
help=f"Path to manifest (default: {DEFAULT_MANIFEST})",
|
|
)
|
|
|
|
subparsers = parser.add_subparsers(dest="cmd", required=True)
|
|
|
|
promote = subparsers.add_parser("promote", help="Promote known_fail cases to pass")
|
|
promote.add_argument("--id", dest="ids", action="append", required=True, help="Case id")
|
|
promote.add_argument("--dry-run", action="store_true", help="Show changes without writing")
|
|
|
|
demote = subparsers.add_parser("demote", help="Demote pass cases to known_fail")
|
|
demote.add_argument("--id", dest="ids", action="append", required=True, help="Case id")
|
|
demote.add_argument("--reason", required=True, help="Reason for known_fail")
|
|
demote.add_argument(
|
|
"--allow-skip-to-known-fail",
|
|
action="store_true",
|
|
help="Allow skip -> known_fail transitions",
|
|
)
|
|
demote.add_argument("--dry-run", action="store_true", help="Show changes without writing")
|
|
|
|
show = subparsers.add_parser("show", help="Show status for specific cases")
|
|
show.add_argument("--id", dest="ids", action="append", required=True, help="Case id")
|
|
|
|
list_cmd = subparsers.add_parser("list", help="List cases by status")
|
|
list_cmd.add_argument(
|
|
"--status",
|
|
choices=["pass", "known_fail", "skip"],
|
|
required=True,
|
|
help="Status to list",
|
|
)
|
|
|
|
return parser.parse_args(argv)
|
|
|
|
|
|
def split_case_blocks(lines: list[str]) -> list[CaseBlock]:
|
|
starts = [idx for idx, line in enumerate(lines) if line.strip() == CASE_START]
|
|
blocks: list[CaseBlock] = []
|
|
for idx, start in enumerate(starts):
|
|
end = starts[idx + 1] if idx + 1 < len(starts) else len(lines)
|
|
case_id = extract_case_id(lines[start:end])
|
|
blocks.append(CaseBlock(start=start, end=end, id_value=case_id))
|
|
return blocks
|
|
|
|
|
|
def parse_key_value_line(line: str) -> tuple[str, str, str] | None:
|
|
match = KEY_VALUE_RE.match(line.rstrip("\n"))
|
|
if not match:
|
|
return None
|
|
indent, key, value = match.groups()
|
|
return indent, key, value
|
|
|
|
|
|
def extract_case_id(block_lines: Iterable[str]) -> str | None:
|
|
for line in block_lines:
|
|
parsed = parse_key_value_line(line)
|
|
if parsed is None:
|
|
continue
|
|
_indent, key, value = parsed
|
|
if key == "id":
|
|
return value
|
|
return None
|
|
|
|
|
|
def get_case_field(block_lines: list[str], field: str) -> tuple[int | None, str | None, str]:
|
|
for idx, line in enumerate(block_lines):
|
|
parsed = parse_key_value_line(line)
|
|
if parsed is None:
|
|
continue
|
|
indent, key, value = parsed
|
|
if key == field:
|
|
return idx, value, indent
|
|
return None, None, ""
|
|
|
|
|
|
def quote_toml_string(value: str) -> str:
|
|
escaped = value.replace("\\", "\\\\").replace('"', '\\"')
|
|
return f'"{escaped}"'
|
|
|
|
|
|
def set_field(block_lines: list[str], key: str, value: str, indent_hint: str = "") -> list[str]:
|
|
idx, _existing, indent = get_case_field(block_lines, key)
|
|
line = f"{indent or indent_hint}{key} = {quote_toml_string(value)}\n"
|
|
if idx is not None:
|
|
block_lines[idx] = line
|
|
return block_lines
|
|
|
|
status_idx, _status, status_indent = get_case_field(block_lines, "status")
|
|
insert_at = status_idx + 1 if status_idx is not None else len(block_lines)
|
|
block_lines.insert(insert_at, f"{status_indent or indent_hint}{key} = {quote_toml_string(value)}\n")
|
|
return block_lines
|
|
|
|
|
|
def remove_field(block_lines: list[str], key: str) -> list[str]:
|
|
return [
|
|
line
|
|
for line in block_lines
|
|
if not (parse_key_value_line(line) and parse_key_value_line(line)[1] == key)
|
|
]
|
|
|
|
|
|
def extract_case_status(block_lines: list[str]) -> str | None:
|
|
_idx, value, _indent = get_case_field(block_lines, "status")
|
|
return value
|
|
|
|
|
|
def index_cases_by_id(lines: list[str]) -> dict[str, CaseBlock]:
|
|
mapping: dict[str, CaseBlock] = {}
|
|
for block in split_case_blocks(lines):
|
|
if block.id_value is None:
|
|
continue
|
|
mapping[block.id_value] = block
|
|
return mapping
|
|
|
|
|
|
def dedup_preserve_order(values: Iterable[str]) -> list[str]:
|
|
seen: set[str] = set()
|
|
out: list[str] = []
|
|
for value in values:
|
|
if value in seen:
|
|
continue
|
|
seen.add(value)
|
|
out.append(value)
|
|
return out
|
|
|
|
|
|
def apply_promote(block_lines: list[str], case_id: str) -> tuple[list[str], str]:
|
|
status = extract_case_status(block_lines)
|
|
if status is None:
|
|
raise ValueError(f"case '{case_id}' is missing status")
|
|
if status == "pass":
|
|
return block_lines, "unchanged (already pass)"
|
|
if status == "skip":
|
|
raise ValueError(f"case '{case_id}' is skip; refusing skip -> pass transition")
|
|
if status != "known_fail":
|
|
raise ValueError(f"case '{case_id}' has unsupported status '{status}'")
|
|
|
|
_status_idx, _status_val, status_indent = get_case_field(block_lines, "status")
|
|
out = set_field(block_lines, "status", "pass", status_indent)
|
|
out = remove_field(out, "reason")
|
|
return out, "promoted known_fail -> pass"
|
|
|
|
|
|
def apply_demote(
|
|
block_lines: list[str],
|
|
case_id: str,
|
|
reason: str,
|
|
allow_skip_to_known_fail: bool,
|
|
) -> tuple[list[str], str]:
|
|
status = extract_case_status(block_lines)
|
|
if status is None:
|
|
raise ValueError(f"case '{case_id}' is missing status")
|
|
if status == "skip" and not allow_skip_to_known_fail:
|
|
raise ValueError(
|
|
f"case '{case_id}' is skip; refusing skip -> known_fail transition "
|
|
"(use --allow-skip-to-known-fail)"
|
|
)
|
|
if status not in ("pass", "known_fail", "skip"):
|
|
raise ValueError(f"case '{case_id}' has unsupported status '{status}'")
|
|
|
|
_status_idx, _status_val, status_indent = get_case_field(block_lines, "status")
|
|
out = set_field(block_lines, "status", "known_fail", status_indent)
|
|
out = set_field(out, "reason", reason, status_indent)
|
|
if status == "known_fail":
|
|
return out, "updated known_fail reason"
|
|
return out, f"demoted {status} -> known_fail"
|
|
|
|
|
|
def write_manifest_atomic(path: Path, content: str) -> None:
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
with tempfile.NamedTemporaryFile("w", delete=False, dir=path.parent, encoding="utf-8") as tmp:
|
|
tmp.write(content)
|
|
tmp_name = tmp.name
|
|
os.replace(tmp_name, path)
|
|
|
|
|
|
def run_mutating_command(args: argparse.Namespace) -> int:
|
|
manifest = args.manifest
|
|
lines = manifest.read_text(encoding="utf-8").splitlines(keepends=True)
|
|
case_map = index_cases_by_id(lines)
|
|
|
|
ids = dedup_preserve_order(args.ids)
|
|
missing = [case_id for case_id in ids if case_id not in case_map]
|
|
if missing:
|
|
for case_id in missing:
|
|
print(f"missing case id: {case_id}", file=sys.stderr)
|
|
return 1
|
|
|
|
replacements: dict[tuple[int, int], list[str]] = {}
|
|
failures: list[str] = []
|
|
actions: list[str] = []
|
|
|
|
for case_id in ids:
|
|
block = case_map[case_id]
|
|
original = lines[block.start:block.end]
|
|
try:
|
|
if args.cmd == "promote":
|
|
updated, action = apply_promote(original.copy(), case_id)
|
|
else:
|
|
updated, action = apply_demote(
|
|
original.copy(),
|
|
case_id,
|
|
args.reason,
|
|
args.allow_skip_to_known_fail,
|
|
)
|
|
except ValueError as err:
|
|
failures.append(str(err))
|
|
continue
|
|
replacements[(block.start, block.end)] = updated
|
|
actions.append(f"{case_id}: {action}")
|
|
|
|
for failure in failures:
|
|
print(failure, file=sys.stderr)
|
|
for action in actions:
|
|
print(action)
|
|
|
|
if failures:
|
|
return 1
|
|
|
|
if not replacements:
|
|
print("no changes")
|
|
return 0
|
|
|
|
new_lines: list[str] = []
|
|
cursor = 0
|
|
for start, end in sorted(replacements):
|
|
new_lines.extend(lines[cursor:start])
|
|
new_lines.extend(replacements[(start, end)])
|
|
cursor = end
|
|
new_lines.extend(lines[cursor:])
|
|
|
|
new_content = "".join(new_lines)
|
|
|
|
if getattr(args, "dry_run", False):
|
|
print("dry-run: manifest not written")
|
|
return 0
|
|
|
|
write_manifest_atomic(manifest, new_content)
|
|
print(f"updated manifest: {manifest}")
|
|
return 0
|
|
|
|
|
|
def run_show_command(args: argparse.Namespace) -> int:
|
|
lines = args.manifest.read_text(encoding="utf-8").splitlines(keepends=True)
|
|
case_map = index_cases_by_id(lines)
|
|
ids = dedup_preserve_order(args.ids)
|
|
missing = [case_id for case_id in ids if case_id not in case_map]
|
|
if missing:
|
|
for case_id in missing:
|
|
print(f"missing case id: {case_id}", file=sys.stderr)
|
|
return 1
|
|
|
|
for case_id in ids:
|
|
block = case_map[case_id]
|
|
block_lines = lines[block.start:block.end]
|
|
_status_idx, status, _status_indent = get_case_field(block_lines, "status")
|
|
_reason_idx, reason, _reason_indent = get_case_field(block_lines, "reason")
|
|
if status is None:
|
|
print(f"{case_id}: status=<missing>")
|
|
continue
|
|
if reason is None:
|
|
print(f"{case_id}: status={status}")
|
|
else:
|
|
print(f"{case_id}: status={status}; reason={reason}")
|
|
return 0
|
|
|
|
|
|
def run_list_command(args: argparse.Namespace) -> int:
|
|
lines = args.manifest.read_text(encoding="utf-8").splitlines(keepends=True)
|
|
blocks = split_case_blocks(lines)
|
|
found = 0
|
|
for block in blocks:
|
|
if block.id_value is None:
|
|
continue
|
|
block_lines = lines[block.start:block.end]
|
|
status = extract_case_status(block_lines)
|
|
if status == args.status:
|
|
print(block.id_value)
|
|
found += 1
|
|
print(f"count={found}", file=sys.stderr)
|
|
return 0
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
|
|
args = parse_args(argv if argv is not None else sys.argv[1:])
|
|
try:
|
|
if args.cmd in ("promote", "demote"):
|
|
return run_mutating_command(args)
|
|
if args.cmd == "show":
|
|
return run_show_command(args)
|
|
if args.cmd == "list":
|
|
return run_list_command(args)
|
|
print(f"unsupported command: {args.cmd}", file=sys.stderr)
|
|
return 2
|
|
except FileNotFoundError as err:
|
|
print(f"manifest not found: {err.filename}", file=sys.stderr)
|
|
return 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|