Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions autograder/autograder.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ def build_pipeline(
StepName.SANDBOX,
StepName.PRE_FLIGHT,
StepName.AI_BATCH,
StepName.STRUCTURAL_ANALYSIS,
StepName.GRADE,
StepName.FOCUS,
StepName.FEEDBACK,
Expand Down
9 changes: 8 additions & 1 deletion autograder/models/abstract/test_function.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from abc import ABC, abstractmethod
from typing import List, Optional
from typing import List, Optional, Type

from pydantic import BaseModel

from autograder.models.dataclass.submission import SubmissionFile
from autograder.models.dataclass.test_result import TestResult
Expand All @@ -12,6 +14,11 @@ class TestFunction(ABC):
An abstract base class for a single, executable test function.
"""

@property
def config_schema(self) -> Optional[Type[BaseModel]]:
    """
    Pydantic model used to validate this test's parameters, if any.

    The base implementation returns None, meaning no schema is provided.
    The schema is intended to be applied while the criteria tree is built.
    """
    return None

@property
@abstractmethod
def name(self) -> str:
Expand Down
1 change: 1 addition & 0 deletions autograder/models/dataclass/step_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class StepName(Enum):
PRE_FLIGHT = "PreFlightStep"
SANDBOX = "SandboxStep"
AI_BATCH = "AiBatchStep"
STRUCTURAL_ANALYSIS = "StructuralAnalysisStep"
GRADE = "GradeStep"
FOCUS = "FocusStep"
FEEDBACK = "FeedbackStep"
Expand Down
16 changes: 16 additions & 0 deletions autograder/models/dataclass/structural_analysis_result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from dataclasses import dataclass
from typing import Dict, Optional, TYPE_CHECKING

if TYPE_CHECKING:
from ast_grep_py import SgRoot

@dataclass
class StructuralAnalysisResult:
    """
    Outcome of the structural analysis performed on a submission.

    Attributes:
        roots: Maps each filename to the ast-grep root node parsed from it,
            or to None when that file could not be parsed.
    """

    roots: Dict[str, Optional["SgRoot"]]
13 changes: 13 additions & 0 deletions autograder/models/pipeline_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from autograder.models.criteria_tree import CriteriaTree
from autograder.models.dataclass.focus import Focus
from autograder.models.dataclass.grade_step_result import GradeStepResult
from autograder.models.dataclass.structural_analysis_result import StructuralAnalysisResult
from autograder.models.result_tree import ResultTree
from sandbox_manager.sandbox_container import SandboxContainer

Expand Down Expand Up @@ -116,6 +117,18 @@ def get_built_criteria_tree(self) -> "CriteriaTree":
self._require_step_data(StepName.BUILD_TREE, "criteria tree"),
)

def get_structural_analysis_result(self) -> Optional["StructuralAnalysisResult"]:
    """
    Retrieve the StructuralAnalysisResult produced during the pipeline, if any.

    Returns:
        The data stored by the STRUCTURAL_ANALYSIS step, or None when the step
        has no recorded result or its recorded result carries no data.
    """
    if not self.has_step_result(StepName.STRUCTURAL_ANALYSIS):
        return None

    # The step can be recorded without data (e.g. StepResult.fail when
    # ast-grep is unavailable). Going through _require_step_data() would
    # raise in that case, so read the data directly and report "no analysis"
    # instead, letting later steps proceed without structural information.
    data = self.get_step_result(StepName.STRUCTURAL_ANALYSIS).data
    if data is None:
        return None

    # Imported locally, as before. NOTE(review): presumably the module-level
    # import is only available for type checking; confirm before hoisting.
    from autograder.models.dataclass.structural_analysis_result import StructuralAnalysisResult
    return cast(StructuralAnalysisResult, data)

def get_sandbox(self) -> Optional["SandboxContainer"]:
"""
Retrieves the SandboxContainer object if it was created during the pipeline.
Expand Down
17 changes: 14 additions & 3 deletions autograder/services/criteria_tree_service.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
from typing import List, Optional

from pydantic import ValidationError

from autograder.models.abstract.template import Template
from autograder.models.abstract.test_function import TestFunction
from autograder.models.config.category import CategoryConfig
Expand Down Expand Up @@ -99,17 +101,26 @@ def __parse_test(self, config: TestConfig) -> TestNode:
raise ValueError(f"Couldn't find test function '{function_name}'")

file_target = [config.file] if config.file else None
test_params = config.get_kwargs_dict() or {}

# Perform early validation if the test function provides a schema.
if test_function.config_schema:
try:
# Instantiating the schema runs Pydantic's validation on the parameters.
test_function.config_schema(**test_params)

Copilot AI Apr 25, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment says this uses Pydantic’s model_validate, but the code is instantiating the model via config_schema(**test_params). Either update the comment to match the implementation or switch to calling model_validate (Pydantic v2) for clarity/consistency.

Suggested change
test_function.config_schema(**test_params)
test_function.config_schema.model_validate(test_params)

Copilot uses AI. Check for mistakes.
except ValidationError as e:
raise ValueError(
f"Invalid parameters for test '{config.name}' ({function_name}): {e}"
) from e

test = TestNode(
config.name,
test_function,
config.get_kwargs_dict() or {},
test_params,
file_target,
config.weight if config.weight is not None else 100.0,
)



return test

def __parse_category(self, category_name, config: CategoryConfig) -> CategoryNode:
Expand Down
14 changes: 14 additions & 0 deletions autograder/services/grader_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def grade_from_tree(
submission_language=None,
locale: str = "en",
pre_computed_results: Optional[Dict[str, TestResult]] = None,
structural_analysis=None,
) -> ResultTree:
"""Traverse the generic built criteria tree to resolve inputs, grades and report to ResultTree."""
base_result = self.process_category(
Expand All @@ -43,6 +44,7 @@ def grade_from_tree(
submission_language=submission_language,
locale=locale,
pre_computed_results=pre_computed_results,
structural_analysis=structural_analysis,
)
root = RootResultNode(name="root", base=base_result)

Expand All @@ -54,6 +56,7 @@ def grade_from_tree(
submission_language=submission_language,
locale=locale,
pre_computed_results=pre_computed_results,
structural_analysis=structural_analysis,
)
root.bonus = bonus_result

Expand All @@ -65,6 +68,7 @@ def grade_from_tree(
submission_language=submission_language,
locale=locale,
pre_computed_results=pre_computed_results,
structural_analysis=structural_analysis,
)
root.penalty = penalty_result

Expand Down Expand Up @@ -104,6 +108,7 @@ def __process_holder(
submission_language=None,
locale: str = "en",
pre_computed_results: Optional[Dict[str, TestResult]] = None,
structural_analysis=None,
) -> CategoryResultNode | SubjectResultNode:
"""Process a category or subject node and create corresponding result node."""

Expand All @@ -128,6 +133,7 @@ def __process_holder(
submission_language=submission_language,
locale=locale,
pre_computed_results=pre_computed_results,
structural_analysis=structural_analysis,
)
for inner_subject in holder.subjects
]
Expand All @@ -144,6 +150,7 @@ def __process_holder(
submission_language=submission_language,
locale=locale,
pre_computed_results=pre_computed_results,
structural_analysis=structural_analysis,
)
for test in holder.tests
]
Expand Down Expand Up @@ -174,6 +181,7 @@ def process_subject(
submission_language=None,
locale: str = "en",
pre_computed_results: Optional[Dict[str, TestResult]] = None,
structural_analysis=None,
) -> SubjectResultNode:
"""Process a subject node from criteria tree and create result node."""
return self.__process_holder(
Expand All @@ -183,6 +191,7 @@ def process_subject(
submission_language=submission_language,
locale=locale,
pre_computed_results=pre_computed_results,
structural_analysis=structural_analysis,
)

def process_test(
Expand All @@ -193,6 +202,7 @@ def process_test(
submission_language=None,
locale: str = "en",
pre_computed_results: Optional[Dict[str, TestResult]] = None,
structural_analysis=None,
) -> TestResultNode:
"""Execute a test and create a test result node.

Expand Down Expand Up @@ -230,6 +240,8 @@ def process_test(
sandbox=sandbox,
locale=locale,
pre_computed_results=pre_computed_results,
structural_analysis=structural_analysis,
submission_language=submission_language,
**test_params,
)
return TestResultNode(
Expand Down Expand Up @@ -274,6 +286,7 @@ def process_category(
submission_language=None,
locale: str = "en",
pre_computed_results: Optional[Dict[str, TestResult]] = None,
structural_analysis=None,
) -> CategoryResultNode:
"""Process a category node from criteria tree and create result node."""
return self.__process_holder(
Expand All @@ -283,4 +296,5 @@ def process_category(
submission_language=submission_language,
locale=locale,
pre_computed_results=pre_computed_results,
structural_analysis=structural_analysis,
)
3 changes: 3 additions & 0 deletions autograder/steps/grade_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,16 @@ def _execute(self, pipeline_exec: PipelineExecution) -> PipelineExecution:
if pipeline_exec.has_step_result(StepName.AI_BATCH):
pre_computed_results = pipeline_exec.get_step_result(StepName.AI_BATCH).data

structural_analysis = pipeline_exec.get_structural_analysis_result()

result_tree = self._grader_service.grade_from_tree(
criteria_tree=criteria_tree,
submission_files=pipeline_exec.submission.submission_files,
sandbox=sandbox,
submission_language=pipeline_exec.submission.language,
locale=pipeline_exec.locale,
pre_computed_results=pre_computed_results,
structural_analysis=structural_analysis,
)

# Create grading result
Expand Down
5 changes: 5 additions & 0 deletions autograder/steps/step_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from autograder.steps.pre_flight_step import PreFlightStep
from autograder.steps.sandbox_step import SandboxStep
from autograder.steps.ai_batch_step import AiBatchStep
from autograder.steps.structural_analysis_step import StructuralAnalysisStep
from autograder.steps.grade_step import GradeStep
from autograder.steps.focus_step import FocusStep
from autograder.steps.feedback_step import FeedbackStep
Expand Down Expand Up @@ -37,6 +38,7 @@ def __init__(self, config: Dict[str, Any]):
StepName.PRE_FLIGHT: self._build_pre_flight,
StepName.SANDBOX: self._build_sandbox,
StepName.AI_BATCH: self._build_ai_batch,
StepName.STRUCTURAL_ANALYSIS: self._build_structural_analysis,
StepName.GRADE: self._build_grade,
StepName.FOCUS: self._build_focus,
StepName.FEEDBACK: self._build_feedback,
Expand Down Expand Up @@ -65,6 +67,9 @@ def _build_sandbox(self) -> Optional[Step]:
def _build_ai_batch(self) -> Optional[Step]:
    """Create the step registered under StepName.AI_BATCH."""
    return AiBatchStep()

def _build_structural_analysis(self) -> Optional[Step]:
    """Create the step registered under StepName.STRUCTURAL_ANALYSIS."""
    return StructuralAnalysisStep()

def _build_grade(self) -> Optional[Step]:
    """Create the step registered under StepName.GRADE."""
    return GradeStep()

Expand Down
73 changes: 73 additions & 0 deletions autograder/steps/structural_analysis_step.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import logging
from typing import Dict, Optional

from autograder.models.abstract.step import Step
from autograder.models.pipeline_execution import PipelineExecution
from autograder.models.dataclass.step_result import StepResult, StepName
from autograder.models.dataclass.structural_analysis_result import StructuralAnalysisResult
from sandbox_manager.models.sandbox_models import Language

logger = logging.getLogger(__name__)

try:
from ast_grep_py import SgRoot
except ImportError:
SgRoot = None

class StructuralAnalysisStep(Step):
    """
    Parses submission files into ast-grep SgRoot objects.

    This enables structural pattern matching in subsequent grading steps.
    On success the step records a StructuralAnalysisResult whose ``roots``
    maps each parsed filename to its root node, or to None when parsing
    that file failed.
    """

    # Suffixes skipped by _is_code_file: common binary/config/doc files.
    # Stored as a tuple so str.endswith can test all of them in one call.
    _IGNORED_EXTENSIONS = (
        '.png', '.jpg', '.jpeg', '.gif', '.pdf', '.zip', '.tar', '.gz',
        '.json', '.yaml', '.yml', '.md', '.txt',
    )

    @property
    def step_name(self) -> StepName:
        """Name under which this step's result is recorded."""
        return StepName.STRUCTURAL_ANALYSIS

    def _execute(self, pipeline_exec: PipelineExecution) -> PipelineExecution:
        """
        Parse every code file of the submission and record the result.

        Records a successful, empty result when the submission has no language
        or the language has no ast-grep mapping, and a failed result when the
        ast-grep-py package could not be imported.
        """
        submission = pipeline_exec.submission
        language = submission.language

        if not language:
            logger.warning("No language specified for submission; skipping structural analysis.")
            return pipeline_exec.add_step_result(
                StepResult.success(self.step_name, StructuralAnalysisResult(roots={}))
            )

        # SgRoot is None when the optional ast_grep_py import failed at module load.
        if SgRoot is None:
            logger.error("ast-grep-py is not installed; structural analysis will be skipped.")
            return pipeline_exec.add_step_result(
                StepResult.fail(self.step_name, "ast-grep-py not installed")
            )

        ast_grep_lang = self._map_language(language)
        if not ast_grep_lang:
            logger.warning("Language %s is not supported by ast-grep; skipping.", language.value)
            return pipeline_exec.add_step_result(
                StepResult.success(self.step_name, StructuralAnalysisResult(roots={}))
            )

        roots: Dict[str, Optional[SgRoot]] = {}
        for filename, sub_file in submission.submission_files.items():
            # Only parse files that likely contain code.
            if not self._is_code_file(filename):
                continue

            try:
                roots[filename] = SgRoot(sub_file.content, ast_grep_lang)
            except Exception as e:
                # Deliberately best-effort: one unparsable file must not abort
                # the analysis of the others, so it is recorded as None.
                logger.warning("Failed to parse %s with ast-grep: %s", filename, e)
                roots[filename] = None

        result = StructuralAnalysisResult(roots=roots)
        return pipeline_exec.add_step_result(StepResult.success(self.step_name, result))

    def _map_language(self, language: Language) -> Optional[str]:
        """Return the ast-grep language name for ``language``, or None if unmapped."""
        mapping = {
            Language.PYTHON: "python",
            Language.JAVA: "java",
            Language.NODE: "javascript",
            Language.CPP: "cpp",
            Language.C: "c",
        }
        return mapping.get(language)

    def _is_code_file(self, filename: str) -> bool:
        """Heuristic to avoid parsing non-code files.

        Returns False when the lower-cased filename ends with one of the
        ignored binary/config/doc extensions, True otherwise.
        """
        return not filename.lower().endswith(self._IGNORED_EXTENSIONS)
Loading
Loading