refactor(skills): slim AST diagnostic to single entry point

Trim ~600 LOC off the original contribution while keeping the same
operator-facing surface and detection coverage.

- Collapse three entry points (file / dir / bundle) into one
  ast_scan_path(path) that handles both files and directories.
- Drop AstFinding dataclass + severity field — replaced with plain
  (file, line, pattern_id, description) tuples. Severity ordering was
  display-only for a diagnostic that explicitly disclaims security
  verdicts, so the field added bookkeeping without earning its place.
- Replace Rich-markup formatter with plain text grouped by file.
- Drop the 'inspect --ast-deep' surface — same scanner, same output as
  'audit --deep', single CLI entry is enough. Operators audit after
  install; pre-install inspection signal isn't worth the second surface.
- Trim test file to the cases that earn their place: bypass payload,
  syntax error survival, RecursionError survival, false-positive guard
  (importer lookalike), literal-arg false-positive guard, non-.py
  ignored, directory recursion + cache-dir skipping, missing-path,
  getattr/__dict__ detection, formatter empty + populated.

Net: tools/skills_ast_audit.py 353 -> 133 LOC,
tests/tools/test_skills_ast_audit.py 299 -> 103 LOC, full diff
+704/-12 -> +264/-6. No change to tools/skills_guard.py — Skills Guard
verdicts remain untouched per SECURITY.md §2.4.
This commit is contained in:
teknium1 2026-05-23 16:36:37 -07:00 committed by Teknium
parent 7255050c99
commit 4254f7dd17
4 changed files with 175 additions and 609 deletions

View File

@ -12267,11 +12267,6 @@ Examples:
"inspect", help="Preview a skill without installing" "inspect", help="Preview a skill without installing"
) )
skills_inspect.add_argument("identifier", help="Skill identifier") skills_inspect.add_argument("identifier", help="Skill identifier")
skills_inspect.add_argument(
"--ast-deep",
action="store_true",
help="Run AST-level diagnostics on Python files before installing",
)
skills_list = skills_subparsers.add_parser("list", help="List installed skills") skills_list = skills_subparsers.add_parser("list", help="List installed skills")
skills_list.add_argument( skills_list.add_argument(

View File

@ -630,13 +630,8 @@ def do_install(identifier: str, category: str = "", force: bool = False,
c.print("[dim]Use /reset to start a new session now, or --now to activate immediately (invalidates prompt cache).[/]\n") c.print("[dim]Use /reset to start a new session now, or --now to activate immediately (invalidates prompt cache).[/]\n")
def do_inspect(identifier: str, console: Optional[Console] = None, def do_inspect(identifier: str, console: Optional[Console] = None) -> None:
ast_deep: bool = False) -> None: """Preview a skill's SKILL.md content without installing."""
"""Preview a skill's SKILL.md content without installing.
When ``ast_deep=True``, also runs AST-level diagnostics against Python
files in the fetched bundle before installation.
"""
from tools.skills_hub import GitHubAuth, create_source_router from tools.skills_hub import GitHubAuth, create_source_router
c = console or _console c = console or _console
@ -682,11 +677,6 @@ def do_inspect(identifier: str, console: Optional[Console] = None,
preview += f"\n\n... ({len(lines) - 50} more lines)" preview += f"\n\n... ({len(lines) - 50} more lines)"
c.print(Panel(preview, title="SKILL.md Preview", subtitle="hermes skills install <id> to install")) c.print(Panel(preview, title="SKILL.md Preview", subtitle="hermes skills install <id> to install"))
if bundle and ast_deep:
from tools.skills_ast_audit import ast_scan_bundle_files, format_ast_report
ast_findings = ast_scan_bundle_files(bundle.files)
c.print(format_ast_report(ast_findings, skill_name=meta.name))
c.print() c.print()
@ -920,8 +910,9 @@ def do_audit(name: Optional[str] = None, console: Optional[Console] = None,
deep: bool = False) -> None: deep: bool = False) -> None:
"""Re-run security scan on installed hub skills. """Re-run security scan on installed hub skills.
When ``deep=True``, also runs AST-level analysis on Python files When ``deep=True``, also runs an opt-in AST-level diagnostic on Python
(opt-in diagnostic, not a security gate). files (review aid only not a security gate; skills_guard.py verdicts
are unchanged).
""" """
from tools.skills_hub import HubLockFile, SKILLS_DIR from tools.skills_hub import HubLockFile, SKILLS_DIR
from tools.skills_guard import scan_skill, format_scan_report from tools.skills_guard import scan_skill, format_scan_report
@ -943,9 +934,8 @@ def do_audit(name: Optional[str] = None, console: Optional[Console] = None,
c.print(f"\n[bold]Auditing {len(targets)} skill(s)...[/]\n") c.print(f"\n[bold]Auditing {len(targets)} skill(s)...[/]\n")
ast_scan_skill = format_ast_report = None
if deep: if deep:
from tools.skills_ast_audit import ast_scan_skill, format_ast_report from tools.skills_ast_audit import ast_scan_path, format_ast_report
for entry in targets: for entry in targets:
skill_path = SKILLS_DIR / entry["install_path"] skill_path = SKILLS_DIR / entry["install_path"]
@ -957,8 +947,7 @@ def do_audit(name: Optional[str] = None, console: Optional[Console] = None,
c.print(format_scan_report(result)) c.print(format_scan_report(result))
if deep: if deep:
ast_findings = ast_scan_skill(skill_path) c.print(format_ast_report(ast_scan_path(skill_path), skill_name=entry["name"]))
c.print(format_ast_report(ast_findings, skill_name=entry["name"]))
c.print() c.print()
@ -1356,7 +1345,7 @@ def skills_command(args) -> None:
skip_confirm=getattr(args, "yes", False), skip_confirm=getattr(args, "yes", False),
name_override=getattr(args, "name", "") or "") name_override=getattr(args, "name", "") or "")
elif action == "inspect": elif action == "inspect":
do_inspect(args.identifier, ast_deep=getattr(args, "ast_deep", False)) do_inspect(args.identifier)
elif action == "list": elif action == "list":
do_list( do_list(
source_filter=args.source, source_filter=args.source,
@ -1414,7 +1403,6 @@ def handle_skills_slash(cmd: str, console: Optional[Console] = None) -> None:
/skills install openai/skills/skill-creator --force /skills install openai/skills/skill-creator --force
/skills install https://example.com/path/SKILL.md /skills install https://example.com/path/SKILL.md
/skills inspect openai/skills/skill-creator /skills inspect openai/skills/skill-creator
/skills inspect openai/skills/skill-creator --ast-deep
/skills list /skills list
/skills list --source hub /skills list --source hub
/skills check /skills check
@ -1514,11 +1502,10 @@ def handle_skills_slash(cmd: str, console: Optional[Console] = None) -> None:
name_override=name_override, console=c) name_override=name_override, console=c)
elif action == "inspect": elif action == "inspect":
non_flag_args = [arg for arg in args if not arg.startswith("--")] if not args:
if not non_flag_args: c.print("[bold red]Usage:[/] /skills inspect <identifier>\n")
c.print("[bold red]Usage:[/] /skills inspect <identifier> [--ast-deep]\n")
return return
do_inspect(non_flag_args[0], console=c, ast_deep="--ast-deep" in args) do_inspect(args[0], console=c)
elif action == "list": elif action == "list":
source_filter = "all" source_filter = "all"

View File

@ -1,299 +1,103 @@
""" """Tests for tools.skills_ast_audit — opt-in AST diagnostic scanner."""
Tests for tools.skills_ast_audit the opt-in AST diagnostic scanner.
These tests verify detection of dynamic import/access patterns that can
bypass line-by-line regex scanning, without crashing on hostile or
pathological input.
"""
import sys import sys
from pathlib import Path from pathlib import Path
from tools.skills_ast_audit import ( from tools.skills_ast_audit import ast_scan_path, format_ast_report
AstFinding,
ast_scan_bundle_files,
ast_scan_file,
ast_scan_skill,
format_ast_report,
)
# --------------------------------------------------------------------------- def _pids(findings):
# Core detection tests return [pid for (_f, _l, pid, _d) in findings]
# ---------------------------------------------------------------------------
class TestAstScanPython: def test_bypass_payload_detected(tmp_path):
"""AST scanner detects dynamic import and access patterns.""" """The exact bypass shape from #7072 is caught."""
f = tmp_path / "exfil.py"
def test_importlib_import_module_detected(self, tmp_path): f.write_text(
"""importlib.import_module() calls are flagged.""" "import importlib\n"
f = tmp_path / "evil.py" "parts = ['o', 's']\n"
f.write_text("import importlib\nm = importlib.import_module('os')\n") "m = importlib.import_module(''.join(parts))\n"
findings = ast_scan_file(f) "e = m.__dict__[''.join(['e','n','v'])]\n"
pids = [f.pattern_id for f in findings] )
assert "ast_dynamic_import" in pids pids = _pids(ast_scan_path(f))
assert "ast_importlib_import" in pids assert "dynamic_import" in pids
assert "importlib_import" in pids
def test_importlib_submodule_import_detected(self, tmp_path): assert "dict_access" in pids
"""`import importlib.util` and similar submodules are flagged."""
f = tmp_path / "evil.py"
f.write_text("import importlib.util\n")
findings = ast_scan_file(f)
pids = [f.pattern_id for f in findings]
assert "ast_importlib_import" in pids
def test_importlib_submodule_aliased_import_detected(self, tmp_path):
"""`import importlib.machinery as m` (aliased submodule) is flagged."""
f = tmp_path / "evil.py"
f.write_text("import importlib.machinery as m\n")
findings = ast_scan_file(f)
pids = [f.pattern_id for f in findings]
assert "ast_importlib_import" in pids
def test_from_importlib_import_detected(self, tmp_path):
"""`from importlib import import_module` is flagged."""
f = tmp_path / "evil.py"
f.write_text("from importlib import import_module\n")
findings = ast_scan_file(f)
pids = [f.pattern_id for f in findings]
assert "ast_importlib_import" in pids
def test_from_importlib_submodule_import_detected(self, tmp_path):
"""`from importlib.util import find_spec` is flagged."""
f = tmp_path / "evil.py"
f.write_text("from importlib.util import find_spec\n")
findings = ast_scan_file(f)
pids = [f.pattern_id for f in findings]
assert "ast_importlib_import" in pids
def test_importer_lookalike_not_flagged(self, tmp_path):
"""`import importer` must NOT match — prefix check is dot-bounded."""
f = tmp_path / "ok.py"
f.write_text("import importer\n")
findings = ast_scan_file(f)
pids = [f.pattern_id for f in findings]
assert "ast_importlib_import" not in pids
def test_from_importer_lookalike_not_flagged(self, tmp_path):
"""`from importer import something` must NOT match the importlib check."""
f = tmp_path / "ok.py"
f.write_text("from importer import something\n")
findings = ast_scan_file(f)
pids = [f.pattern_id for f in findings]
assert "ast_importlib_import" not in pids
def test_dunder_import_with_computed_arg_detected(self, tmp_path):
"""__import__ with non-literal argument is flagged."""
f = tmp_path / "evil.py"
f.write_text("name = 'os'\nm = __import__(name)\n")
findings = ast_scan_file(f)
pids = [f.pattern_id for f in findings]
assert "ast_dynamic_import_computed" in pids
def test_dunder_dict_computed_key_detected(self, tmp_path):
"""__dict__[<computed>] access is flagged."""
f = tmp_path / "evil.py"
f.write_text("key = 'environ'\nval = obj.__dict__[key]\n")
findings = ast_scan_file(f)
pids = [f.pattern_id for f in findings]
assert "ast_dict_access" in pids
def test_getattr_with_computed_name_detected(self, tmp_path):
"""getattr(obj, computed_name) is flagged."""
f = tmp_path / "evil.py"
f.write_text("name = 'system'\nfn = getattr(os, name)\n")
findings = ast_scan_file(f)
pids = [f.pattern_id for f in findings]
assert "ast_dynamic_getattr" in pids
def test_syntax_error_handled_gracefully(self, tmp_path):
"""Files with syntax errors should not crash the scanner."""
f = tmp_path / "bad.py"
f.write_text("def broken(\n")
findings = ast_scan_file(f)
assert isinstance(findings, list)
def test_literal_dunder_import_not_flagged_by_ast(self, tmp_path):
"""__import__('os') with literal string is NOT flagged by AST."""
f = tmp_path / "ok.py"
f.write_text("m = __import__('os')\n")
findings = ast_scan_file(f)
pids = [f.pattern_id for f in findings]
assert "ast_dynamic_import_computed" not in pids
def test_full_bypass_payload_now_detected(self, tmp_path):
"""The exact bypass payload from #7072 should now be caught."""
payload = """
import importlib
parts = ['o', 's']
m = importlib.import_module(''.join(parts))
e = m.__dict__[''.join(['e','n','v','i','r','o','n'])]
"""
f = tmp_path / "exfil.py"
f.write_text(payload)
findings = ast_scan_file(f)
pids = [f.pattern_id for f in findings]
assert "ast_dynamic_import" in pids
assert "ast_dict_access" in pids
assert "ast_importlib_import" in pids
def test_non_python_files_return_empty(self, tmp_path):
"""AST scan returns empty list for non-.py files."""
f = tmp_path / "script.sh"
f.write_text("import importlib\nimportlib.import_module('os')\n")
findings = ast_scan_file(f)
assert findings == []
def test_scan_handles_recursion_error_gracefully(self, tmp_path):
"""Deeply-nested expressions that blow the visitor recursion limit
must not crash the scan return whatever findings were collected so far."""
src = "a" + ".x" * 5000 + "\n"
f = tmp_path / "deep.py"
f.write_text(src)
original_limit = sys.getrecursionlimit()
sys.setrecursionlimit(200)
try:
findings = ast_scan_file(f)
finally:
sys.setrecursionlimit(original_limit)
assert isinstance(findings, list)
# --------------------------------------------------------------------------- def test_syntax_error_does_not_crash(tmp_path):
# Directory scanner tests f = tmp_path / "bad.py"
# --------------------------------------------------------------------------- f.write_text("def broken(\n")
assert ast_scan_path(f) == []
class TestAstScanSkill: def test_recursion_error_does_not_crash(tmp_path):
"""Directory-level scanning via ast_scan_skill().""" f = tmp_path / "deep.py"
f.write_text("a" + ".x" * 5000 + "\n")
def test_scans_all_py_files_in_tree(self, tmp_path): orig = sys.getrecursionlimit()
"""All .py files in a skill directory are scanned recursively.""" sys.setrecursionlimit(200)
skill = tmp_path / "my-skill" try:
skill.mkdir() result = ast_scan_path(f)
sub = skill / "subpkg" finally:
sub.mkdir() sys.setrecursionlimit(orig)
assert isinstance(result, list)
(skill / "main.py").write_text("import importlib\n")
(sub / "utils.py").write_text("import importlib.util\n")
findings = ast_scan_skill(skill)
pids = [f.pattern_id for f in findings]
# Both files should have importlib findings
assert pids.count("ast_importlib_import") == 2
def test_skips_ignored_dirs(self, tmp_path):
"""__pycache__, venv, .venv, and node_modules directories are skipped."""
skill = tmp_path / "my-skill"
skill.mkdir()
for dirname in ("__pycache__", "venv", ".venv", "node_modules"):
ignored = skill / dirname
ignored.mkdir()
(ignored / "cached.py").write_text("import importlib\n")
findings = ast_scan_skill(skill)
assert findings == []
def test_skips_non_existent_dir(self, tmp_path):
"""Non-existent directory returns empty list."""
findings = ast_scan_skill(Path("/nonexistent/skill/path"))
assert findings == []
def test_non_dir_path(self, tmp_path):
"""A file path (not a directory) returns empty list."""
f = tmp_path / "not_a_dir.py"
f.write_text("import importlib\n")
findings = ast_scan_skill(f)
assert findings == []
class TestAstScanBundleFiles: def test_importer_lookalike_not_flagged(tmp_path):
"""In-memory bundle scanning for pre-install inspect diagnostics.""" """`import importer` must NOT match — dot-bounded prefix."""
f = tmp_path / "ok.py"
def test_scans_python_files_from_bundle(self): f.write_text("import importer\nfrom importer import x\n")
"""Python files in source adapter bundle mappings are scanned.""" assert _pids(ast_scan_path(f)) == []
findings = ast_scan_bundle_files({
"SKILL.md": "---\nname: test\n---\n",
"scripts/run.py": "import importlib\n",
"references/readme.md": "import importlib\n",
})
assert [f.pattern_id for f in findings] == ["ast_importlib_import"]
assert findings[0].file == "scripts/run.py"
def test_decodes_bytes_bundle_content(self):
"""Bundle file content may be bytes; decode with replacement."""
findings = ast_scan_bundle_files({
"scripts/run.py": b"from importlib.util import find_spec\n",
})
assert [f.pattern_id for f in findings] == ["ast_importlib_import"]
def test_skips_bundle_cache_dirs(self):
"""Virtualenv/cache paths in a bundle are ignored."""
findings = ast_scan_bundle_files({
"venv/lib/run.py": "import importlib\n",
"__pycache__/cached.py": "import importlib\n",
})
assert findings == []
# --------------------------------------------------------------------------- def test_literal_dunder_import_not_flagged(tmp_path):
# Report formatting tests """__import__('os') with a literal is not flagged (regex catches those)."""
# --------------------------------------------------------------------------- f = tmp_path / "ok.py"
f.write_text("m = __import__('os')\n")
assert "dynamic_import_computed" not in _pids(ast_scan_path(f))
class TestFormatAstReport: def test_non_python_file_returns_empty(tmp_path):
"""Rich report formatting.""" f = tmp_path / "script.sh"
f.write_text("import importlib\n")
assert ast_scan_path(f) == []
def test_empty_findings(self):
"""Empty findings list produces a clean 'nothing found' message."""
report = format_ast_report([])
assert "No AST-level patterns detected" in report
def test_empty_with_skill_name(self): def test_directory_scans_recursively_and_skips_cache_dirs(tmp_path):
"""Report with skill name but no findings.""" skill = tmp_path / "s"
report = format_ast_report([], skill_name="test-skill") skill.mkdir()
assert "test-skill" in report (skill / "main.py").write_text("import importlib\n")
assert "No AST-level patterns detected" in report (skill / "sub").mkdir()
(skill / "sub" / "u.py").write_text("from importlib.util import find_spec\n")
for d in ("__pycache__", ".venv", "venv", "node_modules"):
ignored = skill / d
ignored.mkdir()
(ignored / "junk.py").write_text("import importlib\n")
pids = _pids(ast_scan_path(skill))
assert pids.count("importlib_import") == 2
def test_findings_grouped_by_file(self):
"""Findings from the same file appear together."""
findings = [
AstFinding(
pattern_id="ast_importlib_import",
severity="medium",
category="obfuscation",
file="main.py",
line=1,
match="import importlib",
description="importlib imported",
),
AstFinding(
pattern_id="ast_dynamic_import",
severity="high",
category="obfuscation",
file="main.py",
line=3,
match="importlib.import_module()",
description="dynamic import via importlib",
),
]
report = format_ast_report(findings)
assert "main.py" in report
assert "importlib imported" in report
assert "dynamic import via importlib" in report
assert "2 finding" in report # summary line
assert "Note: AST findings are diagnostic hints" in report
def test_severity_summary(self): def test_missing_path_returns_empty(tmp_path):
"""Report header includes severity counts.""" assert ast_scan_path(tmp_path / "does_not_exist") == []
findings = [
AstFinding("id1", "high", "x", "f.py", 1, "m", "desc"),
AstFinding("id2", "high", "x", "f.py", 2, "m", "desc"), def test_dynamic_getattr_and_dict_access_detected(tmp_path):
AstFinding("id3", "medium", "x", "f.py", 3, "m", "desc"), f = tmp_path / "g.py"
] f.write_text("name = 'x'\nv = getattr(o, name)\nv = o.__dict__[name]\n")
report = format_ast_report(findings) pids = _pids(ast_scan_path(f))
assert "2 high" in report assert "dynamic_getattr" in pids
assert "1 medium" in report assert "dict_access" in pids
def test_format_report_empty():
assert "No dynamic" in format_ast_report([])
def test_format_report_with_findings():
findings = [
("a.py", 1, "importlib_import", "import importlib — ..."),
("a.py", 3, "dynamic_import", "importlib.import_module() — ..."),
]
out = format_ast_report(findings, skill_name="test")
assert "test" in out and "a.py" in out and "L1" in out and "L3" in out
assert "diagnostic hints" in out

View File

@ -1,353 +1,133 @@
""" """
AST-level deep audit for skill Python files opt-in diagnostic, not a security gate. AST-level deep audit for skill Python files opt-in diagnostic, not a security gate.
This is a standalone diagnostic tool per SECURITY.md spirit: it helps operators Per SECURITY.md §2.4, Skills Guard is in-process heuristics ("useful — not
inspect skill code for patterns that *could* enable dynamic import/access boundaries"). This module is a separate opt-in diagnostic that flags dynamic
obfuscation, but it is NOT a security boundary. Every pattern flagged here has import / dynamic attribute access patterns operators may want to eyeball when
legitimate uses. Use your judgment. reviewing third-party skill code. Every pattern flagged here has legitimate
uses; findings are hints for human review, not verdicts.
Usage:: CLI: ``hermes skills audit --deep``
from tools.skills_ast_audit import ast_scan_skill, format_ast_report
findings = ast_scan_skill(Path("~/.hermes/skills/some-skill"))
if findings:
print(format_ast_report(findings))
CLI integration: ``hermes skills audit --deep``
""" """
from __future__ import annotations from __future__ import annotations
import ast import ast
from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import Mapping, List, Optional, Union from typing import List, Tuple
# (file, line, pattern_id, description)
Finding = Tuple[str, int, str, str]
_IGNORED_DIRS = {"__pycache__", ".venv", "venv", "node_modules"}
# --------------------------------------------------------------------------- def _scan_source(content: str, rel_path: str) -> List[Finding]:
# Data model
# ---------------------------------------------------------------------------
@dataclass
class AstFinding:
"""A single finding from AST-level analysis."""
pattern_id: str
"""Short identifier for deduplication and grouping (e.g. 'ast_importlib_import')."""
severity: str
"""One of 'high', 'medium', 'low' — for display only, not a security claim."""
category: str
"""Grouping label — currently always 'obfuscation'."""
file: str
"""Relative path to the file containing the finding."""
line: int
"""1-based line number."""
match: str
"""The matched source construct (human-readable snippet)."""
description: str
"""Why this pattern is worth reviewing."""
# ---------------------------------------------------------------------------
# Scanner
# ---------------------------------------------------------------------------
def _ast_scan_python(content: str, rel_path: str) -> List[AstFinding]:
"""Detect obfuscation via dynamic imports, attribute access, and string construction.
Hostile or pathological input (deeply-nested expressions, malformed source)
must not crash the scan. Both ``ast.parse`` and the visitor traversal are
guarded so parse/visit failures degrade gracefully to "no AST findings"
rather than raising.
"""
try: try:
tree = ast.parse(content) tree = ast.parse(content)
except (SyntaxError, ValueError, RecursionError): except (SyntaxError, ValueError, RecursionError):
return [] return []
findings: List[AstFinding] = [] findings: List[Finding] = []
class _Visitor(ast.NodeVisitor): class V(ast.NodeVisitor):
def visit_Call(self, node): def visit_Call(self, node):
# Detect importlib.import_module(...) f = node.func
if ( # importlib.import_module(...)
isinstance(node.func, ast.Attribute) if isinstance(f, ast.Attribute) and f.attr == "import_module":
and node.func.attr == "import_module" findings.append((rel_path, node.lineno, "dynamic_import",
): "importlib.import_module() — loads arbitrary modules at runtime"))
findings.append( # __import__(<computed>)
AstFinding( elif isinstance(f, ast.Name) and f.id == "__import__":
pattern_id="ast_dynamic_import",
severity="high",
category="obfuscation",
file=rel_path,
line=node.lineno,
match="importlib.import_module()",
description="dynamic import via importlib — can load arbitrary modules at runtime",
)
)
# Detect __import__ with non-literal argument
if isinstance(node.func, ast.Name) and node.func.id == "__import__":
if node.args and not isinstance(node.args[0], ast.Constant): if node.args and not isinstance(node.args[0], ast.Constant):
findings.append( findings.append((rel_path, node.lineno, "dynamic_import_computed",
AstFinding( "__import__ with non-literal module name"))
pattern_id="ast_dynamic_import_computed", # getattr(obj, <computed>)
severity="high", elif isinstance(f, ast.Name) and f.id == "getattr":
category="obfuscation", if len(node.args) >= 2 and not isinstance(node.args[1], ast.Constant):
file=rel_path, findings.append((rel_path, node.lineno, "dynamic_getattr",
line=node.lineno, "getattr with non-literal attribute name"))
match="__import__(<computed>)",
description="__import__ with dynamically constructed module name",
)
)
# Detect getattr with computed attribute name
if isinstance(node.func, ast.Name) and node.func.id == "getattr":
if len(node.args) >= 2 and not isinstance(
node.args[1], ast.Constant
):
findings.append(
AstFinding(
pattern_id="ast_dynamic_getattr",
severity="medium",
category="obfuscation",
file=rel_path,
line=node.lineno,
match="getattr(<obj>, <computed>)",
description="getattr with dynamically constructed attribute name",
)
)
self.generic_visit(node) self.generic_visit(node)
def visit_Subscript(self, node): def visit_Subscript(self, node):
# Detect obj.__dict__[<computed>] # obj.__dict__[<computed>]
if ( if (isinstance(node.value, ast.Attribute)
isinstance(node.value, ast.Attribute) and node.value.attr == "__dict__"
and node.value.attr == "__dict__" and not isinstance(node.slice, ast.Constant)):
): findings.append((rel_path, node.lineno, "dict_access",
if not isinstance(node.slice, ast.Constant): "__dict__[<computed>] — dynamic attribute access"))
findings.append(
AstFinding(
pattern_id="ast_dict_access",
severity="high",
category="obfuscation",
file=rel_path,
line=node.lineno,
match="__dict__[<computed>]",
description="dynamic attribute access via __dict__ with computed key",
)
)
self.generic_visit(node) self.generic_visit(node)
def visit_Import(self, node): def visit_Import(self, node):
# Flag importlib and any importlib.* submodule. for a in node.names:
for alias in node.names: if a.name == "importlib" or a.name.startswith("importlib."):
if alias.name == "importlib" or alias.name.startswith( findings.append((rel_path, node.lineno, "importlib_import",
"importlib." f"import {a.name} — enables dynamic module loading"))
):
findings.append(
AstFinding(
pattern_id="ast_importlib_import",
severity="medium",
category="obfuscation",
file=rel_path,
line=node.lineno,
match=f"import {alias.name}",
description="importlib imported — enables dynamic module loading",
)
)
self.generic_visit(node) self.generic_visit(node)
def visit_ImportFrom(self, node): def visit_ImportFrom(self, node):
module = node.module or "" m = node.module or ""
if module == "importlib" or module.startswith("importlib."): if m == "importlib" or m.startswith("importlib."):
findings.append( findings.append((rel_path, node.lineno, "importlib_import",
AstFinding( f"from {m} import ... — enables dynamic module loading"))
pattern_id="ast_importlib_import",
severity="medium",
category="obfuscation",
file=rel_path,
line=node.lineno,
match=f"from {module} import ...",
description="importlib imported — enables dynamic module loading",
)
)
self.generic_visit(node) self.generic_visit(node)
try: try:
_Visitor().visit(tree) V().visit(tree)
except (RecursionError, ValueError, RuntimeError): except (RecursionError, ValueError, RuntimeError):
# Visitor traversal can fail on hostile input even when ast.parse # Hostile/pathological input: return what we collected so far.
# succeeded (e.g. deeply-nested call/attribute chains). Return pass
# whatever findings we collected before the failure.
return findings
return findings return findings
def ast_scan_file(file_path: Path, rel_path: Optional[str] = None) -> List[AstFinding]: def ast_scan_path(path: Path) -> List[Finding]:
"""Scan a single Python file and return AST-level findings. """Scan a single .py file or recursively scan all .py under a directory.
Args: Returns a list of (file, line, pattern_id, description) tuples. Empty for
file_path: Absolute path to the .py file. non-Python paths, missing paths, or paths with no matching patterns.
rel_path: Relative path for display (defaults to file_path.name).
Returns:
List of :class:`AstFinding` empty if the file isn't Python or scan yields nothing.
""" """
if file_path.suffix.lower() != ".py": if path.is_file():
if path.suffix.lower() != ".py":
return []
try:
content = path.read_text(encoding="utf-8", errors="replace")
except OSError:
return []
return _scan_source(content, path.name)
if not path.is_dir():
return [] return []
if rel_path is None: out: List[Finding] = []
rel_path = file_path.name for py in sorted(path.rglob("*.py")):
if set(py.parent.parts) & _IGNORED_DIRS:
try:
content = file_path.read_text(encoding="utf-8", errors="replace")
except (OSError, UnicodeDecodeError):
return []
return _ast_scan_python(content, rel_path)
def ast_scan_skill(skill_path: Path) -> List[AstFinding]:
"""Recursively scan all Python files in a skill directory.
Args:
skill_path: Path to the installed skill directory.
Returns:
Combined list of :class:`AstFinding` across all .py files.
"""
if not skill_path.is_dir():
return []
all_findings: List[AstFinding] = []
for py_file in sorted(skill_path.rglob("*.py")):
# Skip __pycache__ and .venv/venv directories
parts = set(py_file.parent.parts)
if parts & {"__pycache__", ".venv", "venv", "node_modules"}:
continue continue
try: try:
rel = py_file.relative_to(skill_path).as_posix() content = py.read_text(encoding="utf-8", errors="replace")
except OSError:
continue
try:
rel = py.relative_to(path).as_posix()
except ValueError: except ValueError:
rel = py_file.name rel = py.name
all_findings.extend(ast_scan_file(py_file, rel)) out.extend(_scan_source(content, rel))
return out
return all_findings
def ast_scan_bundle_files( def format_ast_report(findings: List[Finding], skill_name: str = "") -> str:
files: Mapping[str, Union[str, bytes]], """Plain-text report (Rich-markup-free) grouped by file."""
) -> List[AstFinding]: header = f"AST deep scan: {skill_name}" if skill_name else "AST deep scan"
"""Scan Python files from an in-memory skill bundle.
This powers ``hermes skills inspect --ast-deep`` so operators can review
a skill before installing it. The input is the bundle's filename -> content
mapping, as returned by the skills hub source adapters.
"""
all_findings: List[AstFinding] = []
for rel_path, content in sorted(files.items()):
path = Path(rel_path)
if path.suffix.lower() != ".py":
continue
if set(path.parts) & {"__pycache__", ".venv", "venv", "node_modules"}:
continue
if isinstance(content, bytes):
text = content.decode("utf-8", errors="replace")
else:
text = str(content)
all_findings.extend(_ast_scan_python(text, rel_path))
return all_findings
# ---------------------------------------------------------------------------
# Rich formatting
# ---------------------------------------------------------------------------
def format_ast_report(
findings: List[AstFinding],
skill_name: str = "",
) -> str:
"""Format AST findings as a Rich-markup string.
Args:
findings: List of findings from :func:`ast_scan_skill`.
skill_name: Optional skill name for the report header.
Returns:
Rich-markup string suitable for ``console.print()``.
"""
if not findings: if not findings:
header = ( return f"{header}\n No dynamic import/access patterns detected."
f"[bold]AST Deep Scan: {skill_name}[/]"
if skill_name
else "[bold]AST Deep Scan[/]"
)
return f"{header}\n[dim green]No AST-level patterns detected.[/]"
lines: List[str] = [] lines = [header, f" {len(findings)} finding(s):"]
severity_order = {"high": 0, "medium": 1, "low": 2} current = None
findings_sorted = sorted( for f, line, pid, desc in sorted(findings):
findings, if f != current:
key=lambda f: ( current = f
severity_order.get(f.severity, 99), lines.append(f" {f}")
f.file, lines.append(f" L{line} {pid}{desc}")
f.line,
),
)
if skill_name:
lines.append(f"[bold]AST Deep Scan: {skill_name}[/]")
else:
lines.append("[bold]AST Deep Scan[/]")
total = len(findings_sorted)
high_count = sum(1 for f in findings_sorted if f.severity == "high")
med_count = sum(1 for f in findings_sorted if f.severity == "medium")
low_count = sum(1 for f in findings_sorted if f.severity == "low")
summary_parts = []
if high_count:
summary_parts.append(f"[bold red]{high_count} high[/]")
if med_count:
summary_parts.append(f"[yellow]{med_count} medium[/]")
if low_count:
summary_parts.append(f"[dim]{low_count} low[/]")
lines.append(
f"[dim]{total} finding(s)[/] — "
+ ", ".join(summary_parts)
if summary_parts
else f"[dim]{total} finding(s)[/]"
)
lines.append("") lines.append("")
lines.append(" Note: diagnostic hints for human review, not security verdicts.")
current_file = None
for f in findings_sorted:
if f.file != current_file:
current_file = f.file
lines.append(f" [bold cyan]{f.file}[/]")
sev_color = {"high": "bold red", "medium": "yellow", "low": "dim"}.get(
f.severity, "dim"
)
lines.append(
f" L{f.line:>4} [{sev_color}]{f.severity:6}[/] {f.description}"
)
lines.append(f" [dim]{f.match}[/]")
lines.append("")
lines.append(
"[dim]Note: AST findings are diagnostic hints, not security verdicts. "
"Review each pattern in context.[/]"
)
return "\n".join(lines) return "\n".join(lines)