diff --git a/.github/workflows/reusable_test.yml b/.github/workflows/reusable_test.yml index 3a5f346..52f70b5 100644 --- a/.github/workflows/reusable_test.yml +++ b/.github/workflows/reusable_test.yml @@ -26,6 +26,8 @@ jobs: - name: Install PrivacyGuard library run: | + printf 'tree-sitter==0.20.4\nsetuptools<75\n' > /tmp/build-constraints.txt + UV_BUILD_CONSTRAINT=/tmp/build-constraints.txt uv pip install codebleu==0.6.0 uv pip install -e . - name: Tests and coverage diff --git a/privacy_guard/analysis/code_similarity/code_bleu_node.py b/privacy_guard/analysis/code_similarity/code_bleu_node.py new file mode 100644 index 0000000..6ecd95a --- /dev/null +++ b/privacy_guard/analysis/code_similarity/code_bleu_node.py @@ -0,0 +1,236 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
# pyre-strict

import logging
from collections import Counter
from dataclasses import dataclass, field
from typing import Any, cast

import pandas as pd
from codebleu.bleu import (  # @manual=fbsource//third-party/pypi/codebleu:codebleu
    corpus_bleu,
)
from codebleu.weighted_ngram_match import (  # @manual=fbsource//third-party/pypi/codebleu:codebleu
    corpus_bleu as corpus_bleu_weighted,
)
from privacy_guard.analysis.base_analysis_node import BaseAnalysisNode
from privacy_guard.analysis.base_analysis_output import BaseAnalysisOutput
from privacy_guard.analysis.code_similarity.code_similarity_analysis_input import (
    CodeBleuAnalysisInput,
)

# pyre-ignore[21]: tree-sitter doesn't have properly exposed type stubs
from tree_sitter import Node

logger: logging.Logger = logging.getLogger(__name__)


@dataclass
class CodeBleuNodeOutput(BaseAnalysisOutput):
    """Output of :class:`CodeBleuNode`.

    Attributes:
        num_samples: total number of sample rows.
        per_sample_code_bleu: DataFrame with a ``code_bleu`` column (plus a
            ``language`` column when the input carried one). Excluded from
            ``repr`` because it can be large.
        avg_code_bleu: average CodeBLEU across all pairs (``0.0`` when there
            are no rows).
        avg_code_bleu_by_language: per-language average similarity, or
            ``None`` when no ``language`` column is present.
    """

    num_samples: int
    per_sample_code_bleu: pd.DataFrame = field(repr=False)
    avg_code_bleu: float
    avg_code_bleu_by_language: dict[str, float] | None


class CodeBleuNode(BaseAnalysisNode):
    """Compute CodeBLEU similarity between two pieces of code.

    The metric represents a weighted sum of the following components:
        - alpha * ngram_match_score
        - beta * weighted_ngram_match_score, where language-specific, generic
          tokens are given less weight
        - gamma * syntax_match_score, where syntax_match_score is computed
          using the distance between ASTs of the code
        - theta * dataflow_match_score, where dataflow_match_score is computed
          using the distance between data flows of the code

    Args:
        analysis_input: a :class:`CodeBleuAnalysisInput` produced
            by :class:`CodeBleuAttack`.
    """

    def __init__(self, analysis_input: CodeBleuAnalysisInput) -> None:
        super().__init__(analysis_input=analysis_input)

    @staticmethod
    # pyre-ignore[11]: Annotation `Node` is not defined as a type.
    def syntax_match(target_tree: Node, generated_tree: Node) -> float:
        """Return the fraction of target AST subtrees also found in the generated AST.

        Each subtree is serialised to a position-independent s-expression built
        from node *types* only, so identifiers and literals do not influence the
        comparison. The root is always collected; any other node is collected
        only when it has children (leaf tokens are not subtrees of their own).

        Per §3.2 of https://arxiv.org/pdf/2009.10297.pdf::

            Match = |ST(T_candidate) ∩ ST(T_reference)| / |ST(T_reference)|

        The intersection is a multiset intersection: a subtree that occurs
        ``k`` times in the target can be matched at most ``k`` times.

        Args:
            target_tree: root node of the target (reference) AST.
            generated_tree: root node of the generated (candidate) AST.

        Returns:
            A score in ``[0, 1]``.
        """

        def _collect_sub_trees(root_node: Node) -> list[str]:
            """Serialise every collected subtree below ``root_node`` exactly once."""
            collected: list[str] = []

            def _visit(node: Node, is_root: bool) -> str:
                children = node.children
                if not children:
                    sexp = node.type
                else:
                    # Child s-expressions are produced by the same traversal, so
                    # no subtree is ever re-serialised from scratch.
                    sexp = f"({node.type} {' '.join(_visit(c, False) for c in children)})"
                if is_root or children:
                    collected.append(sexp)
                return sexp

            _visit(root_node, True)
            return collected

        target_sexps = _collect_sub_trees(target_tree)
        generated_sexps = _collect_sub_trees(generated_tree)

        # Purely defensive: the root is always collected, so this list is never
        # empty for a real tree. Kept so the division below can never fail.
        if not target_sexps:
            logger.warning("Empty target AST, syntax match score degenerates to 0.")
            return 0.0

        # Multiset intersection == "for each candidate subtree, consume one equal
        # reference subtree if any is left", but linear instead of quadratic.
        matched = Counter(target_sexps) & Counter(generated_sexps)
        match_count = sum(matched.values())

        return match_count / len(target_sexps)

    @staticmethod
    def dataflow_match(target_dfg: Any, generated_dfg: Any) -> float:
        """Return the fraction of target dataflow items also present in the generated DFG.

        Every generated item can satisfy at most one target item, so duplicated
        target items are only matched as often as they occur in the generated
        DFG.

        Args:
            target_dfg: sized iterable of normalized dataflow items for the target.
            generated_dfg: iterable of normalized dataflow items for the
                generated code. It is copied, never mutated.

        Returns:
            A score in ``[0, 1]``; ``0.0`` (with a warning) when the target DFG
            is empty.
        """
        total_count = len(target_dfg)

        if total_count == 0:
            logger.warning("Empty target DFG, dataflow match score degenerates to 0.")
            return 0.0

        # Items may contain lists (unhashable), so matching stays list-based
        # rather than using a Counter.
        remaining = list(generated_dfg)  # shallow copy to avoid mutating input
        match_count = 0

        for dataflow in target_dfg:
            if dataflow in remaining:
                match_count += 1
                remaining.remove(dataflow)

        return match_count / total_count

    @staticmethod
    def calc_codebleu(
        target_tokens: list[str],
        generated_tokens: list[str],
        target_tokens_with_weights: tuple[list[str], dict[str, float]],
        target_ast: Node,
        generated_ast: Node,
        target_normalized_dataflow: Any,
        generated_normalized_dataflow: Any,
        weights: tuple[float, float, float, float] = (0.25, 0.25, 0.25, 0.25),
    ) -> float:
        """Calculate the CodeBLEU similarity score between target and generated code.

        CodeBLEU is a composite metric that combines lexical, syntactic, and
        semantic similarity measures. The final score is a weighted sum of four
        components:

            score = α * ngram_match + β * weighted_ngram_match
                    + γ * syntax_match + θ * dataflow_match

        Where:
            - ngram_match: standard BLEU score measuring n-gram overlap
            - weighted_ngram_match: BLEU score computed from
              ``target_tokens_with_weights`` (per-token weights supplied by
              the caller)
            - syntax_match: fraction of target AST subtrees found in the
              generated AST
            - dataflow_match: fraction of target dataflow items found in the
              generated code

        Note that an empty target DFG yields a dataflow match of 0; the
        equivalent guard in :meth:`syntax_match` only triggers if no subtree
        could be collected from the target AST.

        See: https://arxiv.org/pdf/2009.10297

        Args:
            target_tokens: Tokenized target (reference) code.
            generated_tokens: Tokenized generated (hypothesis) code.
            target_tokens_with_weights: Target tokens with keyword weight dict
                ``[tokens, {token: weight}]``.
            target_ast: Parsed AST root node for target code.
            generated_ast: Parsed AST root node for generated code.
            target_normalized_dataflow: Normalized dataflow graph for target code.
            generated_normalized_dataflow: Normalized dataflow graph for
                generated code.
            weights: Tuple of (α, β, γ, θ) weights for the four components.
                Defaults to equal weighting (0.25, 0.25, 0.25, 0.25). The
                weights are used as given; they are not normalised.

        Returns:
            The weighted CodeBLEU score. With the default weights it lies in
            ``[0, 1]``, where 1 indicates identical code.
        """

        ngram_match_score = corpus_bleu([[target_tokens]], [generated_tokens])

        weighted_ngram_match_score = corpus_bleu_weighted(
            [[target_tokens_with_weights]], [generated_tokens]
        )

        syntax_match_score = CodeBleuNode.syntax_match(target_ast, generated_ast)

        dataflow_match_score = CodeBleuNode.dataflow_match(
            target_normalized_dataflow, generated_normalized_dataflow
        )

        alpha, beta, gamma, theta = weights
        return (
            alpha * ngram_match_score
            + beta * weighted_ngram_match_score
            + gamma * syntax_match_score
            + theta * dataflow_match_score
        )

    # ------------------------------------------------------------------
    # BaseAnalysisNode interface
    # ------------------------------------------------------------------

    def run_analysis(self) -> CodeBleuNodeOutput:
        """Score every row of the generation DataFrame and aggregate the results.

        Returns:
            A :class:`CodeBleuNodeOutput` holding the per-row scores, their
            overall mean (``0.0`` for an empty frame) and, when a ``language``
            column exists, the mean per language.
        """
        analysis_input = cast(CodeBleuAnalysisInput, self.analysis_input)
        df = analysis_input.generation_df

        # Built explicitly instead of ``df.apply(..., axis=1)``: on a zero-row
        # frame ``apply`` hands back a DataFrame rather than a Series, which
        # breaks the per-sample frame and the mean below.
        scores = [
            CodeBleuNode.calc_codebleu(
                row["target_tokens"],
                row["generated_tokens"],
                row["target_tokens_with_weights"],
                row["target_ast"],
                row["generated_ast"],
                row["target_normalized_dfg"],
                row["generated_normalized_dfg"],
            )
            for _, row in df.iterrows()
        ]
        similarities = pd.Series(scores, index=df.index, dtype="float64")
        per_sample = pd.DataFrame({"code_bleu": similarities})

        avg_code_bleu = float(similarities.mean()) if len(similarities) > 0 else 0.0

        avg_by_lang: dict[str, float] | None = None
        if "language" in df.columns:
            # ``.values`` sidesteps index alignment between the two frames.
            per_sample["language"] = df["language"].values
            grouped = per_sample.groupby("language")["code_bleu"].mean()
            avg_by_lang = {lang: float(score) for lang, score in grouped.items()}

        return CodeBleuNodeOutput(
            num_samples=len(df),
            per_sample_code_bleu=per_sample,
            avg_code_bleu=avg_code_bleu,
            avg_code_bleu_by_language=avg_by_lang,
        )
# ----------------------------------------------------------------------
# Patch section for
# privacy_guard/analysis/code_similarity/code_similarity_analysis_input.py
# The class below is appended after the existing ``generation_df`` property
# (``return self._df_train_user``) of the class that precedes it in that file.
# ``BaseAnalysisInput`` and ``pd`` are not imported in the visible hunk;
# presumably they come from that file's existing imports.
# ----------------------------------------------------------------------


class CodeBleuAnalysisInput(BaseAnalysisInput):
    """
    Analysis input for CodeBLEU similarity analysis.

    Stores a generation DataFrame containing target and model-generated code strings
    along with their tokenized representations, ASTs, and normalized dataflows.

    Required columns:
        - target_code_string: the original target code
        - model_generated_code_string: the model's generated code
        - target_tokens: tokenized target code (List[str])
        - generated_tokens: tokenized generated code (List[str])
        - target_tokens_with_weights: tokens with keyword weights for weighted BLEU
        - target_ast: parsed AST (tree_sitter.Node) for the target code
        - generated_ast: parsed AST (tree_sitter.Node) for the generated code
        - target_normalized_dfg: normalized dataflow graph for target code
        - generated_normalized_dfg: normalized dataflow graph for generated code

    Args:
        generation_df: DataFrame containing code strings and parsed representations

    Raises:
        ValueError: if any entry of ``REQUIRED_COLUMNS`` is missing from
            ``generation_df``.
    """

    REQUIRED_COLUMNS: list[str] = [
        "target_code_string",
        "model_generated_code_string",
        "target_tokens",
        "generated_tokens",
        "target_tokens_with_weights",
        "target_ast",
        "generated_ast",
        "target_normalized_dfg",
        "generated_normalized_dfg",
    ]

    def __init__(self, generation_df: pd.DataFrame) -> None:
        # Validate before handing the frame to the base class.
        missing = set(self.REQUIRED_COLUMNS) - set(generation_df.columns)
        if missing:
            raise ValueError(f"Missing required columns in generation_df: {missing}")

        # The generation frame is stored in the base class's "train" slot; the
        # "test" slot is filled with an empty frame.
        super().__init__(df_train_user=generation_df, df_test_user=pd.DataFrame())

    @property
    def generation_df(self) -> pd.DataFrame:
        """Property accessor for the generation DataFrame."""
        return self._df_train_user


# ----------------------------------------------------------------------
# Patch section (new file): privacy_guard/analysis/tests/test_code_bleu_node.py
# ----------------------------------------------------------------------
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# pyre-strict

import unittest

import pandas as pd
from privacy_guard.analysis.code_similarity.code_bleu_node import (
    CodeBleuNode,
    CodeBleuNodeOutput,
)
from privacy_guard.attacks.code_similarity.code_bleu_attack import CodeBleuAttack

# NOTE(review): the code fixtures in this module appear with single-space
# indentation in the patch view this was reviewed from; confirm against the
# repository before relying on their exact whitespace.


def _run_e2e(df: pd.DataFrame, default_language: str = "python") -> CodeBleuNodeOutput:
    """Run the full CodeBLEU pipeline: attack preprocessing → analysis node."""
    attack_output = CodeBleuAttack(
        data=df, default_language=default_language
    ).run_attack()
    return CodeBleuNode(analysis_input=attack_output).run_analysis()


def _attack_row(target: str, generated: str, language: str = "python") -> pd.Series:  # type: ignore[type-arg]
    """Return a single-row attack result as a pd.Series for static-helper tests."""
    df = pd.DataFrame(
        {
            "target_code_string": [target],
            "model_generated_code_string": [generated],
        }
    )
    attack_output = CodeBleuAttack(data=df, default_language=language).run_attack()
    return attack_output.generation_df.iloc[0]


class TestCodeBleuNode(unittest.TestCase):
    """Unit and end-to-end tests for :class:`CodeBleuNode`."""

    # ------------------------------------------------------------------
    # dataflow_match – pure tuple comparisons, no tree-sitter needed
    # ------------------------------------------------------------------

    def test_dataflow_match_identical(self) -> None:
        """Identical DFGs produce a match score of 1.0."""
        dfg = [("var_0", "comesFrom", []), ("var_1", "comesFrom", ["var_0"])]
        self.assertAlmostEqual(CodeBleuNode.dataflow_match(dfg, dfg), 1.0, places=5)

    def test_dataflow_match_empty_target(self) -> None:
        """Empty target DFG returns 0.0 (degenerate case)."""
        generated = [("var_0", "comesFrom", [])]
        self.assertAlmostEqual(
            CodeBleuNode.dataflow_match([], generated), 0.0, places=5
        )

    def test_dataflow_match_both_empty(self) -> None:
        """Both target and generated empty returns 0.0."""
        self.assertAlmostEqual(CodeBleuNode.dataflow_match([], []), 0.0, places=5)

    def test_dataflow_match_no_overlap(self) -> None:
        """Completely different relationships return 0.0."""
        target = [("var_0", "comesFrom", [])]
        generated = [("var_0", "assignTo", [])]
        self.assertAlmostEqual(
            CodeBleuNode.dataflow_match(target, generated), 0.0, places=5
        )

    def test_dataflow_match_partial(self) -> None:
        """Partially overlapping DFGs return an intermediate score."""
        target = [
            ("var_0", "comesFrom", []),
            ("var_1", "comesFrom", ["var_0"]),
        ]
        generated = [
            ("var_0", "comesFrom", []),  # matches
            ("var_1", "assignTo", ["var_0"]),  # different relationship – no match
        ]
        score = CodeBleuNode.dataflow_match(target, generated)
        self.assertGreater(score, 0.0)
        self.assertLess(score, 1.0)
        self.assertAlmostEqual(score, 0.5, places=5)

    def test_dataflow_match_does_not_double_count(self) -> None:
        """Each generated edge can only match one target edge (no double counting)."""
        target = [
            ("var_0", "comesFrom", []),
            ("var_0", "comesFrom", []),  # duplicate
        ]
        generated = [("var_0", "comesFrom", [])]  # only one copy
        # match_count should be 1, total_count 2 → score 0.5
        score = CodeBleuNode.dataflow_match(target, generated)
        self.assertAlmostEqual(score, 0.5, places=5)

    # ------------------------------------------------------------------
    # syntax_match – requires tree-sitter nodes via run_attack
    # ------------------------------------------------------------------

    def test_syntax_match_identical_code(self) -> None:
        """Identical code produces syntax match of 1.0."""
        code = "def foo():\n return 1\n"
        row = _attack_row(code, code)
        score = CodeBleuNode.syntax_match(row["target_ast"], row["generated_ast"])
        self.assertAlmostEqual(score, 1.0, places=5)

    def test_syntax_match_same_structure_different_names(self) -> None:
        """Same AST structure with different identifiers still scores 1.0."""
        # Both are single-assignment modules – identical structure
        row = _attack_row("x = 1\n", "y = 2\n")
        score = CodeBleuNode.syntax_match(row["target_ast"], row["generated_ast"])
        # AST node types match exactly, so score should be 1.0
        self.assertAlmostEqual(score, 1.0, places=5)

    def test_syntax_match_unrelated_code(self) -> None:
        """Structurally different code produces a lower syntax score than identical."""
        identical_row = _attack_row(
            "def foo():\n x = 1\n return x\n",
            "def foo():\n x = 1\n return x\n",
        )
        unrelated_row = _attack_row(
            "def foo():\n x = 1\n return x\n",
            "y = 42\n",
        )
        score_identical = CodeBleuNode.syntax_match(
            identical_row["target_ast"], identical_row["generated_ast"]
        )
        score_unrelated = CodeBleuNode.syntax_match(
            unrelated_row["target_ast"], unrelated_row["generated_ast"]
        )
        self.assertGreater(score_identical, score_unrelated)

    # ------------------------------------------------------------------
    # calc_codebleu – static composite score
    # ------------------------------------------------------------------

    def test_calc_codebleu_identical_code(self) -> None:
        """calc_codebleu returns a high score (> 0.8) for identical code."""
        code = "def foo():\n x = 1\n return x\n"
        row = _attack_row(code, code)
        score = CodeBleuNode.calc_codebleu(
            row["target_tokens"],
            row["generated_tokens"],
            row["target_tokens_with_weights"],
            row["target_ast"],
            row["generated_ast"],
            row["target_normalized_dfg"],
            row["generated_normalized_dfg"],
        )
        self.assertGreater(score, 0.8)

    def test_calc_codebleu_score_in_range(self) -> None:
        """calc_codebleu returns a value in [0, 1] for each sampled pair."""
        pairs = [
            ("def foo():\n return 1\n", "def foo():\n return 1\n"),
            ("def foo():\n return 1\n", "x = 42\n"),
            ("x = 1\n", "y = 2\n"),
        ]
        for target, generated in pairs:
            with self.subTest(target=target[:20]):
                row = _attack_row(target, generated)
                score = CodeBleuNode.calc_codebleu(
                    row["target_tokens"],
                    row["generated_tokens"],
                    row["target_tokens_with_weights"],
                    row["target_ast"],
                    row["generated_ast"],
                    row["target_normalized_dfg"],
                    row["generated_normalized_dfg"],
                )
                self.assertGreaterEqual(score, 0.0)
                self.assertLessEqual(score, 1.0)

    def test_calc_codebleu_custom_weights_sum_to_score(self) -> None:
        """With only one component active (weight 1.0), score equals that component."""
        code = "def foo():\n x = 1\n return x\n"
        row = _attack_row(code, code)
        args = (
            row["target_tokens"],
            row["generated_tokens"],
            row["target_tokens_with_weights"],
            row["target_ast"],
            row["generated_ast"],
            row["target_normalized_dfg"],
            row["generated_normalized_dfg"],
        )
        # All-syntax weight: result should equal syntax_match alone
        score_syntax_only = CodeBleuNode.calc_codebleu(
            *args, weights=(0.0, 0.0, 1.0, 0.0)
        )
        syntax_score = CodeBleuNode.syntax_match(
            row["target_ast"], row["generated_ast"]
        )
        self.assertAlmostEqual(score_syntax_only, syntax_score, places=5)

        # All-dataflow weight: result should equal dataflow_match alone
        score_df_only = CodeBleuNode.calc_codebleu(*args, weights=(0.0, 0.0, 0.0, 1.0))
        df_score = CodeBleuNode.dataflow_match(
            row["target_normalized_dfg"], row["generated_normalized_dfg"]
        )
        self.assertAlmostEqual(score_df_only, df_score, places=5)

    def test_calc_codebleu_weight_scaling(self) -> None:
        """Identical code scores above 0.8 under two different weightings.

        Only the threshold is checked for the default and the n-gram-heavy
        weights; the test does not assert any proportionality between them.
        """
        code = "def foo():\n x = 1\n return x\n"
        row = _attack_row(code, code)
        args = (
            row["target_tokens"],
            row["generated_tokens"],
            row["target_tokens_with_weights"],
            row["target_ast"],
            row["generated_ast"],
            row["target_normalized_dfg"],
            row["generated_normalized_dfg"],
        )
        # For identical code all components ≈ 1.0, so both configs should be > 0.8
        score_equal = CodeBleuNode.calc_codebleu(
            *args, weights=(0.25, 0.25, 0.25, 0.25)
        )
        score_ngram_heavy = CodeBleuNode.calc_codebleu(
            *args, weights=(0.5, 0.25, 0.25, 0.0)
        )
        self.assertGreater(score_equal, 0.8)
        self.assertGreater(score_ngram_heavy, 0.8)

    # ------------------------------------------------------------------
    # run_analysis – output structure
    # ------------------------------------------------------------------

    def test_run_analysis_returns_correct_type(self) -> None:
        """run_analysis() returns a CodeBleuNodeOutput instance."""
        df = pd.DataFrame(
            {
                "target_code_string": ["def foo():\n return 1\n"],
                "model_generated_code_string": ["def foo():\n return 1\n"],
            }
        )
        output = _run_e2e(df)
        self.assertIsInstance(output, CodeBleuNodeOutput)

    def test_run_analysis_num_samples(self) -> None:
        """num_samples matches the number of input rows."""
        df = pd.DataFrame(
            {
                "target_code_string": [
                    "def foo():\n return 1\n",
                    "def bar():\n return 2\n",
                    "x = 1\n",
                ],
                "model_generated_code_string": [
                    "def foo():\n return 1\n",
                    "def baz():\n return 3\n",
                    "y = 2\n",
                ],
            }
        )
        output = _run_e2e(df)
        self.assertEqual(output.num_samples, 3)

    def test_run_analysis_per_sample_column(self) -> None:
        """per_sample_code_bleu contains a 'code_bleu' column with one row per sample."""
        df = pd.DataFrame(
            {
                "target_code_string": [
                    "def foo():\n return 1\n",
                    "x = 1\n",
                ],
                "model_generated_code_string": [
                    "def foo():\n return 1\n",
                    "y = 2\n",
                ],
            }
        )
        output = _run_e2e(df)
        self.assertIn("code_bleu", output.per_sample_code_bleu.columns)
        self.assertEqual(len(output.per_sample_code_bleu), 2)

    def test_run_analysis_avg_equals_mean_of_per_sample(self) -> None:
        """avg_code_bleu equals the arithmetic mean of per_sample_code_bleu."""
        df = pd.DataFrame(
            {
                "target_code_string": [
                    "def foo():\n return 1\n",
                    "def bar():\n x = 1\n return x\n",
                ],
                "model_generated_code_string": [
                    "def foo():\n return 1\n",
                    "x = 42\n",
                ],
            }
        )
        output = _run_e2e(df)
        expected_mean = float(output.per_sample_code_bleu["code_bleu"].mean())
        self.assertAlmostEqual(output.avg_code_bleu, expected_mean, places=5)

    def test_run_analysis_scores_in_range(self) -> None:
        """All per-sample scores are in [0, 1], including an empty generation."""
        df = pd.DataFrame(
            {
                "target_code_string": [
                    "def foo():\n return 1\n",
                    "def bar():\n x = 1\n return x\n",
                    "x = 1\n",
                ],
                "model_generated_code_string": [
                    "def foo():\n return 1\n",
                    "x = 42\n",
                    "",
                ],
            }
        )
        output = _run_e2e(df)
        for score in output.per_sample_code_bleu["code_bleu"]:
            self.assertGreaterEqual(score, 0.0)
            self.assertLessEqual(score, 1.0)

    # ------------------------------------------------------------------
    # run_analysis – score semantics
    # ------------------------------------------------------------------

    def test_run_analysis_identical_code_high_score(self) -> None:
        """Identical code produces avg_code_bleu above 0.8."""
        df = pd.DataFrame(
            {
                "target_code_string": ["def foo():\n x = 1\n return x\n"],
                "model_generated_code_string": [
                    "def foo():\n x = 1\n return x\n"
                ],
            }
        )
        output = _run_e2e(df)
        self.assertGreater(output.avg_code_bleu, 0.8)

    def test_run_analysis_identical_scores_higher_than_unrelated(self) -> None:
        """Identical code scores higher than structurally unrelated code."""
        df_identical = pd.DataFrame(
            {
                "target_code_string": ["def foo():\n x = 1\n return x\n"],
                "model_generated_code_string": [
                    "def foo():\n x = 1\n return x\n"
                ],
            }
        )
        df_unrelated = pd.DataFrame(
            {
                "target_code_string": ["def foo():\n x = 1\n return x\n"],
                "model_generated_code_string": ["y = 42\n"],
            }
        )
        score_identical = _run_e2e(df_identical).avg_code_bleu
        score_unrelated = _run_e2e(df_unrelated).avg_code_bleu
        self.assertGreater(score_identical, score_unrelated)

    # ------------------------------------------------------------------
    # run_analysis – language grouping
    # ------------------------------------------------------------------

    def test_run_analysis_no_language_column(self) -> None:
        """avg_code_bleu_by_language is None when no 'language' column is present."""
        df = pd.DataFrame(
            {
                "target_code_string": ["def foo():\n return 1\n"],
                "model_generated_code_string": ["def foo():\n return 1\n"],
            }
        )
        output = _run_e2e(df)
        self.assertIsNone(output.avg_code_bleu_by_language)

    def test_run_analysis_with_language_column(self) -> None:
        """avg_code_bleu_by_language is populated with one key per language."""
        df = pd.DataFrame(
            {
                "target_code_string": [
                    "def foo():\n return 1\n",
                    "int main() { return 0; }",
                ],
                "model_generated_code_string": [
                    "def foo():\n return 1\n",
                    "int main() { return 0; }",
                ],
                "language": ["python", "cpp"],
            }
        )
        output = _run_e2e(df, default_language="python")
        self.assertIsNotNone(output.avg_code_bleu_by_language)
        by_lang = output.avg_code_bleu_by_language
        assert by_lang is not None  # narrow for type checker
        self.assertIn("python", by_lang)
        self.assertIn("cpp", by_lang)
        self.assertIsInstance(by_lang["python"], float)
        self.assertIsInstance(by_lang["cpp"], float)

    def test_run_analysis_language_scores_in_range(self) -> None:
        """Per-language average scores are in [0, 1]."""
        df = pd.DataFrame(
            {
                "target_code_string": [
                    "def foo():\n return 1\n",
                    "function add(a, b) { return a + b; }",
                    "func add(a, b int) int { return a + b }",
                ],
                "model_generated_code_string": [
                    "def bar():\n return 2\n",
                    "function sub(a, b) { return a - b; }",
                    "func add(a, b int) int { return a + b }",
                ],
                "language": ["python", "javascript", "go"],
            }
        )
        output = _run_e2e(df, default_language="python")
        by_lang = output.avg_code_bleu_by_language
        assert by_lang is not None
        for lang, score in by_lang.items():
            with self.subTest(language=lang):
                self.assertGreaterEqual(score, 0.0)
                self.assertLessEqual(score, 1.0)

    # ------------------------------------------------------------------
    # run_analysis – multi-language E2E
    # ------------------------------------------------------------------

    def test_run_analysis_java(self) -> None:
        """E2E pipeline works for Java code."""
        df = pd.DataFrame(
            {
                "target_code_string": [
                    "public int add(int a, int b) { return a + b; }",
                ],
                "model_generated_code_string": [
                    "public int add(int a, int b) { return a + b; }",
                ],
            }
        )
        output = _run_e2e(df, default_language="java")
        self.assertGreater(output.avg_code_bleu, 0.8)

    def test_run_analysis_javascript(self) -> None:
        """E2E pipeline works for JavaScript code."""
        df = pd.DataFrame(
            {
                "target_code_string": ["function add(a, b) { return a + b; }"],
                "model_generated_code_string": ["function add(a, b) { return a + b; }"],
            }
        )
        output = _run_e2e(df, default_language="javascript")
        self.assertGreater(output.avg_code_bleu, 0.8)

    def test_run_analysis_go(self) -> None:
        """E2E pipeline works for Go code."""
        df = pd.DataFrame(
            {
                "target_code_string": ["func add(a int, b int) int { return a + b }"],
                "model_generated_code_string": [
                    "func add(a int, b int) int { return a + b }"
                ],
            }
        )
        output = _run_e2e(df, default_language="go")
        self.assertGreater(output.avg_code_bleu, 0.8)

    def test_run_analysis_rust(self) -> None:
        """E2E pipeline works for Rust code."""
        df = pd.DataFrame(
            {
                "target_code_string": ["fn add(a: i32, b: i32) -> i32 { a + b }"],
                "model_generated_code_string": [
                    "fn add(a: i32, b: i32) -> i32 { a + b }"
                ],
            }
        )
        output = _run_e2e(df, default_language="rust")
        self.assertGreater(output.avg_code_bleu, 0.8)

    def test_run_analysis_ruby(self) -> None:
        """E2E pipeline works for Ruby code."""
        df = pd.DataFrame(
            {
                "target_code_string": ["def add(a, b)\n a + b\nend\n"],
                "model_generated_code_string": ["def add(a, b)\n a + b\nend\n"],
            }
        )
        output = _run_e2e(df, default_language="ruby")
        self.assertGreater(output.avg_code_bleu, 0.8)

    # ------------------------------------------------------------------
    # Complex real-world use cases
    # ------------------------------------------------------------------

    def test_renamed_variables_high_syntax_and_dfg_match(self) -> None:
        """Same algorithm with renamed variables: syntax and DFG match are perfect,
        which drives a high overall score despite the token strings differing.

        This tests a key property of CodeBLEU: variable-name normalization in the DFG
        and type-only AST comparison make the metric invariant to renaming.
        """
        target = (
            "def compute_sum(numbers):\n"
            " total = 0\n"
            " for num in numbers:\n"
            " total += num\n"
            " return total\n"
        )
        generated = (
            "def compute_sum(items):\n"
            " acc = 0\n"
            " for val in items:\n"
            " acc += val\n"
            " return acc\n"
        )
        row = _attack_row(target, generated)

        # AST node types are identical → syntax_match = 1.0
        syntax = CodeBleuNode.syntax_match(row["target_ast"], row["generated_ast"])
        self.assertAlmostEqual(syntax, 1.0, places=5)

        # Normalized DFGs are structurally identical → dataflow_match = 1.0
        dfg = CodeBleuNode.dataflow_match(
            row["target_normalized_dfg"], row["generated_normalized_dfg"]
        )
        self.assertAlmostEqual(dfg, 1.0, places=5)

        # Overall: with the default 0.25 weights, two perfect components alone
        # contribute 0.5, so the score exceeds 0.5 as soon as either n-gram
        # term is non-zero. It stays below 1.0 because the renamed identifiers
        # lower the n-gram BLEU terms.
        output = _run_e2e(
            pd.DataFrame(
                {
                    "target_code_string": [target],
                    "model_generated_code_string": [generated],
                }
            )
        )
        self.assertGreater(output.avg_code_bleu, 0.5)
        self.assertLess(output.avg_code_bleu, 1.0)

    def test_iterative_vs_recursive_factorial_score_ordering(self) -> None:
        """Iterative and recursive implementations of factorial share tokens and
        the function signature but differ structurally (if+call vs for+accumulator).

        Score should be strictly between identical and completely unrelated code,
        showing the metric captures partial structural similarity.
        """
        recursive = (
            "def factorial(n):\n"
            " if n == 0:\n"
            " return 1\n"
            " return n * factorial(n - 1)\n"
        )
        iterative = (
            "def factorial(n):\n"
            " result = 1\n"
            " for i in range(1, n + 1):\n"
            " result *= i\n"
            " return result\n"
        )
        unrelated = "def greet(name):\n print('hello', name)\n"

        def _score(t: str, g: str) -> float:
            return _run_e2e(
                pd.DataFrame(
                    {"target_code_string": [t], "model_generated_code_string": [g]}
                )
            ).avg_code_bleu

        score_identical = _score(recursive, recursive)
        score_diff_impl = _score(recursive, iterative)
        score_unrelated = _score(recursive, unrelated)

        # Identical must beat different implementation
        self.assertGreater(score_identical, score_diff_impl)
        # Different implementation shares signature/tokens, so beats unrelated
        self.assertGreater(score_diff_impl, score_unrelated)

    def test_sorting_algorithms_score_higher_than_unrelated(self) -> None:
        """Bubble sort and selection sort share substantial structure (nested loops,
        array indexing, conditional swap pattern, same signature) but differ in the
        inner update logic.

        The metric should give them a noticeably higher score than comparing either
        to an unrelated string-processing function, reflecting the shared skeleton.
        """
        bubble_sort = (
            "def bubble_sort(arr):\n"
            " n = len(arr)\n"
            " for i in range(n):\n"
            " for j in range(0, n - i - 1):\n"
            " if arr[j] > arr[j + 1]:\n"
            " arr[j], arr[j + 1] = arr[j + 1], arr[j]\n"
            " return arr\n"
        )
        selection_sort = (
            "def selection_sort(arr):\n"
            " n = len(arr)\n"
            " for i in range(n):\n"
            " min_idx = i\n"
            " for j in range(i + 1, n):\n"
            " if arr[min_idx] > arr[j]:\n"
            " min_idx = j\n"
            " arr[i], arr[min_idx] = arr[min_idx], arr[i]\n"
            " return arr\n"
        )
        unrelated = (
            "def tokenize(text):\n"
            " words = text.strip().split()\n"
            " return [w.lower() for w in words]\n"
        )

        def _score(t: str, g: str) -> float:
            return _run_e2e(
                pd.DataFrame(
                    {"target_code_string": [t], "model_generated_code_string": [g]}
                )
            ).avg_code_bleu

        score_sorting_pair = _score(bubble_sort, selection_sort)
        score_vs_unrelated = _score(bubble_sort, unrelated)

        # Two sorting algorithms score higher than bubble sort vs string processing
        self.assertGreater(score_sorting_pair, score_vs_unrelated)

        # Both sorting algorithms share enough structure that their score is non-trivial
        self.assertGreater(score_sorting_pair, 0.3)


if __name__ == "__main__":
    unittest.main()

# ----------------------------------------------------------------------
# Patch section (new file): privacy_guard/attacks/code_similarity/code_bleu_attack.py
# ----------------------------------------------------------------------
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
class CodeBleuAttack(BaseAttack):
    """Prepare target and generated code for similarity analysis using CodeBLEU.

    CodeBLEU combines BLEU scores, syntax similarity through the AST and
    semantic similarity through the data-flow graph (DFG).
    See: https://arxiv.org/pdf/2009.10297, or
    https://github.com/k4black/codebleu/tree/main.

    Expects a DataFrame with ``target_code_string`` and
    ``model_generated_code_string`` columns. Produces a
    :class:`CodeBleuAnalysisInput` with additional token, AST and DFG columns
    ready for downstream similarity analysis.

    Args:
        data: DataFrame with code string columns. The frame is copied, so the
            caller's object is not modified by :meth:`run_attack`.
        default_language: default language for parsing (e.g. "python", "cpp").
            Rows may override this via a ``language`` column; rows whose
            ``language`` value is missing (``None``/NaN) fall back to it.

    Raises:
        ValueError: if a required column is missing from ``data``.
    """

    REQUIRED_COLUMNS: list[str] = [
        "target_code_string",
        "model_generated_code_string",
    ]

    def __init__(
        self,
        data: pd.DataFrame,
        default_language: str = "python",
    ) -> None:
        missing = set(self.REQUIRED_COLUMNS) - set(data.columns)
        if missing:
            raise ValueError(f"Missing required columns: {missing}")

        # Work on a private copy: run_attack() adds columns in place.
        self._data: pd.DataFrame = data.copy()
        self._default_language: str = default_language

    # ------------------------------------------------------------------
    # Public static helpers
    # ------------------------------------------------------------------

    @staticmethod
    def tokenizer(s: str) -> list[str]:
        """Split ``s`` on runs of whitespace; an empty string yields ``[]``."""
        return s.split()

    @staticmethod
    def make_weights(
        reference_tokens: list[str], key_word_list: list[str]
    ) -> dict[str, float]:
        """Map each reference token to its weighted n-gram weight.

        Tokens found in ``key_word_list`` get weight ``1``; every other token
        gets ``0.2``. Duplicate tokens collapse to a single dictionary entry.
        """
        # Build the lookup once so membership is O(1) per token instead of a
        # linear scan of the keyword list.
        keywords = frozenset(key_word_list)
        return {token: 1 if token in keywords else 0.2 for token in reference_tokens}

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _resolve_language(self, value: object) -> str:
        """Return the language for one row, using the default for missing values."""
        if (
            not isinstance(value, str)
            and pd.api.types.is_scalar(value)
            and pd.isna(value)
        ):
            # A blank cell must not be stringified into "nan"/"None" and then
            # rejected as an unsupported language.
            return self._default_language
        return str(value)

    @staticmethod
    def _load_language_resources(lang: str) -> tuple[Any, Any, list[str]]:
        """Load the parser, DFG extractor and keyword list for ``lang``.

        Raises:
            ValueError: if ``lang`` is not in ``AVAILABLE_LANGS`` or has no
                entry in ``dfg_function``.
        """
        if lang not in AVAILABLE_LANGS:
            raise ValueError(f"Language {lang} not supported by CodeBLEU.")

        package_root = importlib.resources.files("codebleu")
        tree_sitter_language = Language(package_root / "my-languages.so", lang)
        # pyre-ignore[16]: Module `tree_sitter` has no attribute `Parser`.
        parser = Parser()
        parser.set_language(tree_sitter_language)

        dfg_func = dfg_function.get(lang)
        if dfg_func is None:
            raise ValueError(f"No DFG function available for language: {lang}")

        keywords_file = package_root / "keywords" / f"{lang}.txt"
        keywords = keywords_file.read_text(encoding="utf-8").splitlines()
        return parser, dfg_func, keywords

    @staticmethod
    def _strip_comments(code: str, lang: str) -> str:
        """Remove comments/docstrings, falling back to ``code`` on failure.

        Model-generated snippets are often truncated or otherwise malformed.
        A tokenizer-based stripper can reject such input (unterminated string,
        inconsistent dedent); one bad row should not abort the whole batch, so
        the unmodified source is analysed instead. NOTE(review): upstream
        CodeBLEU scoring appears to apply the same fallback -- confirm against
        the pinned codebleu version.
        """
        # Local import keeps this block self-contained; only needed to name
        # the tokenizer's exception type.
        import tokenize

        try:
            return remove_comments_and_docstrings(code, lang)
        except (tokenize.TokenError, SyntaxError) as err:
            logger.warning(
                "Could not strip comments from %s snippet (%s); "
                "using the unmodified source.",
                lang,
                err,
            )
            return code

    @classmethod
    def _analyze_snippet(
        cls, source: str, lang: str, parser: Any, dfg_func: Any
    ) -> tuple[Any, Any]:
        """Return ``(AST root node, normalized dataflow)`` for one snippet."""
        code = cls._strip_comments(source, lang)
        root = parser.parse(bytes(code, "utf8")).root_node
        dataflow = get_data_flow(code, [parser, dfg_func])
        return root, normalize_dataflow(dataflow)

    # ------------------------------------------------------------------
    # BaseAttack interface
    # ------------------------------------------------------------------

    def run_attack(self) -> CodeBleuAnalysisInput:
        """Parse every row's code strings into ASTs and extract normalized dataflows.

        Adds the following columns to the DataFrame:
        - ``target_tokens``: List[str]
        - ``generated_tokens``: List[str]
        - ``target_tokens_with_weights``: ``[tokens, {token: weight}]``
        - ``target_ast``: tree_sitter.Node
        - ``generated_ast``: tree_sitter.Node
        - ``target_normalized_dfg``: list of normalized dataflow items
        - ``generated_normalized_dfg``: list of normalized dataflow items

        Returns:
            A :class:`CodeBleuAnalysisInput` wrapping the
            augmented DataFrame.

        Raises:
            ValueError: if a row's language is not supported by CodeBLEU or
                has no DFG function.
        """
        df = self._data
        if "language" in df.columns:
            languages = [self._resolve_language(value) for value in df["language"]]
        else:
            languages = [self._default_language] * len(df)

        target_tokens: list[list[str]] = []
        generated_tokens: list[list[str]] = []
        target_tokens_with_weights: list[list[Any]] = []
        # pyre-ignore[11]: Annotation `Node` is not defined as a type
        target_asts: list[Node] = []
        generated_asts: list[Node] = []
        target_normalized_dfgs: list[Any] = []
        generated_normalized_dfgs: list[Any] = []

        # Parser, DFG function and keyword list are loaded once per language.
        resources: dict[str, tuple[Any, Any, list[str]]] = {}

        rows = zip(
            languages,
            df["target_code_string"],
            df["model_generated_code_string"],
        )
        for lang, target_raw, generated_raw in rows:
            if lang not in resources:
                resources[lang] = self._load_language_resources(lang)
            parser, dfg_func, keywords = resources[lang]

            # (1) Target code: tokens, keyword weights, AST and dataflow.
            # Weights are only needed on the target (reference) side.
            target_str = str(target_raw).strip()
            raw_target_tokens = self.tokenizer(target_str)
            target_tokens.append(raw_target_tokens)
            target_tokens_with_weights.append(
                [raw_target_tokens, self.make_weights(raw_target_tokens, keywords)]
            )
            target_tree, target_dfg = self._analyze_snippet(
                target_str, lang, parser, dfg_func
            )
            target_asts.append(target_tree)
            target_normalized_dfgs.append(target_dfg)

            # (2) Generated code: tokens, AST and dataflow.
            generated_str = str(generated_raw).strip()
            generated_tokens.append(self.tokenizer(generated_str))
            generated_tree, generated_dfg = self._analyze_snippet(
                generated_str, lang, parser, dfg_func
            )
            generated_asts.append(generated_tree)
            generated_normalized_dfgs.append(generated_dfg)

        df["target_tokens"] = target_tokens
        df["generated_tokens"] = generated_tokens
        df["target_tokens_with_weights"] = target_tokens_with_weights
        df["target_ast"] = target_asts
        df["generated_ast"] = generated_asts
        df["target_normalized_dfg"] = target_normalized_dfgs
        df["generated_normalized_dfg"] = generated_normalized_dfgs

        return CodeBleuAnalysisInput(generation_df=df)
# pyre-ignore[11]: Annotation `Parser` is not defined as a type
def _make_parser(language: str) -> Parser:
    """Build a tree-sitter parser for ``language`` from codebleu's bundled grammars."""
    grammar_library = importlib.resources.files("codebleu") / "my-languages.so"
    # pyre-ignore[16]: Module `tree_sitter` has no attribute `Parser`
    ts_parser = Parser()
    ts_parser.set_language(Language(grammar_library, language))
    return ts_parser


def _run_attack(target: str, generated: str, language: str = "python") -> pd.DataFrame:
    """Run :class:`CodeBleuAttack` on one target/generated pair; return the augmented frame."""
    pair = pd.DataFrame(
        {
            "target_code_string": [target],
            "model_generated_code_string": [generated],
        }
    )
    attack = CodeBleuAttack(data=pair, default_language=language)
    return attack.run_attack().generation_df


class CodeBleuAttackTest(unittest.TestCase):
    """Unit tests for CodeBleuAttack and the codebleu helpers it relies on."""

    # ------------------------------------------------------------------
    # tokenizer
    # ------------------------------------------------------------------

    def test_tokenizer(self) -> None:
        """Whitespace tokenization; the empty string yields no tokens."""
        expectations = [
            ("x = 1 + 2", ["x", "=", "1", "+", "2"]),
            ("", []),
        ]
        for text, expected in expectations:
            self.assertEqual(CodeBleuAttack.tokenizer(text), expected)

    # ------------------------------------------------------------------
    # make_weights
    # ------------------------------------------------------------------

    def test_make_weights(self) -> None:
        """Keywords weigh 1, everything else 0.2, one entry per token."""
        tokens = ["def", "return", "foo", "x"]
        weights = CodeBleuAttack.make_weights(tokens, ["def", "return", "if", "for"])
        for keyword in ("def", "return"):
            self.assertEqual(weights[keyword], 1)
        for identifier in ("foo", "x"):
            self.assertAlmostEqual(weights[identifier], 0.2)
        self.assertEqual(set(weights.keys()), set(tokens))

    # ------------------------------------------------------------------
    # get_data_flow (from codebleu package)
    # ------------------------------------------------------------------

    def test_get_data_flow_none_dfg_func_returns_empty(self) -> None:
        """codebleu's get_data_flow returns [] when dfg_func is None.
        Note: run_attack() now raises ValueError before reaching this state;
        this test documents the underlying library behaviour only.
        """
        dataflow = get_data_flow("x = 1 + 2", [_make_parser("python"), None])
        self.assertEqual(dataflow, [])

    def test_get_data_flow_python_captures_dependency(self) -> None:
        """Python DFG captures that y depends on x in 'x=1; y=x+2'."""
        python_parser = _make_parser("python")
        normalized = normalize_dataflow(
            get_data_flow("x = 1\ny = x + 2", [python_parser, dfg_function["python"]])
        )

        # x appears first → var_0; y depends on x, so var_0 should appear as a parent
        var_0_is_parent = any("var_0" in set(item[2]) for item in normalized)
        self.assertTrue(
            var_0_is_parent,
            f"Expected var_0 (x) as a parent in DFG, got: {normalized}",
        )

    def test_get_data_flow_java_captures_dependency(self) -> None:
        """Java DFG captures that y depends on x in an equivalent snippet."""
        java_parser = _make_parser("java")
        snippet = "class T { void f() { int x = 1; int y = x + 2; } }"
        normalized = normalize_dataflow(
            get_data_flow(snippet, [java_parser, dfg_function["java"]])
        )

        # At least one variable must list another as its source
        has_edge = any(len(item[2]) > 0 for item in normalized)
        self.assertTrue(
            has_edge,
            f"Expected at least one data-flow dependency in Java DFG, got: {normalized}",
        )

    # ------------------------------------------------------------------
    # normalize_dataflow (from codebleu package)
    # ------------------------------------------------------------------

    def test_normalize_dataflow_renames_variables(self) -> None:
        """Variables are renamed to var_<n> in order of first appearance."""
        # Raw DFG item format: (name, index, relationship, [parent_names], [...])
        normalized = normalize_dataflow(
            [
                ("x", 0, "comesFrom", [], []),
                ("y", 1, "comesFrom", ["x"], []),
                ("z", 2, "comesFrom", ["x", "y"], []),
            ]
        )
        expected_items = [
            ("var_0", "comesFrom", []),
            ("var_1", "comesFrom", ["var_0"]),
            ("var_2", "comesFrom", ["var_0", "var_1"]),
        ]
        for position, expected in enumerate(expected_items):
            self.assertEqual(normalized[position], expected)

    def test_normalize_dataflow_consistent_renaming(self) -> None:
        """Same structural DFG normalizes identically regardless of original variable names."""
        first = normalize_dataflow(
            [
                ("alpha", 0, "comesFrom", [], []),
                ("beta", 1, "comesFrom", ["alpha"], []),
            ]
        )
        second = normalize_dataflow(
            [
                ("foo", 0, "comesFrom", [], []),
                ("bar", 1, "comesFrom", ["foo"], []),
            ]
        )
        self.assertEqual(first, second)

    # ------------------------------------------------------------------
    # run_attack – tokens (exact values)
    # ------------------------------------------------------------------

    def test_run_attack_exact_tokens(self) -> None:
        """Tokenization of known code produces the exact expected token list."""
        first_row = _run_attack("x = 1", "y = 2").iloc[0]
        self.assertEqual(first_row["target_tokens"], ["x", "=", "1"])
        self.assertEqual(first_row["generated_tokens"], ["y", "=", "2"])

    def test_run_attack_tokens_with_weights_python_keywords(self) -> None:
        """Python keywords 'def'/'return' get weight 1; identifiers get 0.2."""
        result = _run_attack("def foo(): return 1", "x = 1")
        tokens, weight_dict = result["target_tokens_with_weights"].iloc[0]
        self.assertEqual(tokens, ["def", "foo():", "return", "1"])
        for keyword in ("def", "return"):
            self.assertEqual(weight_dict[keyword], 1)
        self.assertAlmostEqual(weight_dict["foo():"], 0.2)

    # ------------------------------------------------------------------
    # run_attack – AST content for Python and Java
    # ------------------------------------------------------------------

    def test_run_attack_ast_python(self) -> None:
        """Python AST root is a 'module' containing a 'function_definition'."""
        snippet = "def add(a, b):\n    return a + b\n"
        root = _run_attack(snippet, snippet)["target_ast"].iloc[0]
        self.assertEqual(root.type, "module")
        self.assertIn("function_definition", {child.type for child in root.children})

    def test_run_attack_ast_java(self) -> None:
        """Java AST root is a 'program' containing a 'class_declaration'."""
        snippet = "class Foo { int add(int a, int b) { return a + b; } }"
        root = _run_attack(snippet, snippet, language="java")["target_ast"].iloc[0]
        self.assertEqual(root.type, "program")
        self.assertIn("class_declaration", {child.type for child in root.children})

    def test_run_attack_ast_same_code_same_structure(self) -> None:
        """Identical target and generated code produce ASTs with the same structure."""
        snippet = "def foo(x):\n    return x * 2\n"
        result = _run_attack(snippet, snippet)
        target_root = result["target_ast"].iloc[0]
        generated_root = result["generated_ast"].iloc[0]
        self.assertEqual(target_root.type, generated_root.type)
        self.assertEqual(
            [child.type for child in target_root.children],
            [child.type for child in generated_root.children],
        )

    def test_run_attack_ast_different_code_different_structure(self) -> None:
        """Structurally different code (function def vs assignment) yields different AST child types."""
        result = _run_attack("def foo(x):\n    return x\n", "x = 1\n")
        target_child_types = {
            child.type for child in result["target_ast"].iloc[0].children
        }
        generated_child_types = {
            child.type for child in result["generated_ast"].iloc[0].children
        }
        self.assertIn("function_definition", target_child_types)
        self.assertNotIn("function_definition", generated_child_types)

    # ------------------------------------------------------------------
    # run_attack – DFG content for Python and Java
    # ------------------------------------------------------------------

    def test_run_attack_normalized_dfg_python(self) -> None:
        """Python normalized DFG for 'x=1; y=x+2' shows var_0 as y's parent."""
        snippet = "def foo():\n    x = 1\n    y = x + 2\n    return y\n"
        normalized_dfg = _run_attack(snippet, snippet)["target_normalized_dfg"].iloc[0]
        var_0_is_parent = any("var_0" in set(item[2]) for item in normalized_dfg)
        self.assertTrue(
            var_0_is_parent,
            f"Expected var_0 as a parent in Python DFG, got: {normalized_dfg}",
        )

    def test_run_attack_normalized_dfg_java(self) -> None:
        """Java normalized DFG for equivalent snippet shows at least one data-flow edge."""
        snippet = "class T { void f() { int x = 1; int y = x + 2; } }"
        result = _run_attack(snippet, snippet, language="java")
        normalized_dfg = result["target_normalized_dfg"].iloc[0]
        has_edge = any(len(item[2]) > 0 for item in normalized_dfg)
        self.assertTrue(
            has_edge,
            f"Expected at least one DFG dependency in Java, got: {normalized_dfg}",
        )

    def test_run_attack_identical_code_same_normalized_dfg(self) -> None:
        """Identical target and generated code produce equal normalized DFGs."""
        snippet = "def foo():\n    x = 1\n    y = x + 2\n    return y\n"
        first_row = _run_attack(snippet, snippet).iloc[0]
        self.assertEqual(
            first_row["target_normalized_dfg"],
            first_row["generated_normalized_dfg"],
        )

    # ------------------------------------------------------------------
    # run_attack – error handling and language column
    # ------------------------------------------------------------------

    def test_run_attack_missing_columns_raise(self) -> None:
        """Constructing the attack without a required column raises ValueError."""
        incomplete_frames = {
            "missing_target": {"model_generated_code_string": ["x = 1"]},
            "missing_generated": {"target_code_string": ["x = 1"]},
        }
        for label, columns in incomplete_frames.items():
            with self.subTest(label):
                with self.assertRaises(ValueError):
                    CodeBleuAttack(pd.DataFrame(columns))

    def test_run_attack_unsupported_language_raises(self) -> None:
        """run_attack raises ValueError for a language not in AVAILABLE_LANGS."""
        pair = pd.DataFrame(
            {
                "target_code_string": ["x = 1"],
                "model_generated_code_string": ["y = 2"],
            }
        )
        attack = CodeBleuAttack(data=pair, default_language="cobol")
        with self.assertRaises(ValueError):
            attack.run_attack()

    def test_run_attack_language_column_overrides_default(self) -> None:
        """Per-row 'language' column controls which parser is used."""
        python_snippet = "def foo():\n    return 1\n"
        java_snippet = "class Foo { int add(int a, int b) { return a + b; } }"
        frame = pd.DataFrame(
            {
                "target_code_string": [python_snippet, java_snippet],
                "model_generated_code_string": [python_snippet, java_snippet],
                "language": ["python", "java"],
            }
        )
        attack = CodeBleuAttack(data=frame, default_language="python")
        target_asts = attack.run_attack().generation_df["target_ast"]
        self.assertEqual(target_asts.iloc[0].type, "module")
        self.assertEqual(target_asts.iloc[1].type, "program")


if __name__ == "__main__":
    unittest.main()