From a11a86c8c82fb9f7075ce7d2a774cf192751571b Mon Sep 17 00:00:00 2001 From: "pensarappdev[bot]" <182706286+pensarappdev[bot]@users.noreply.github.com> Date: Wed, 7 May 2025 15:32:33 +0000 Subject: [PATCH] Fix security issue: Unbounded N-gram Processing Resource Exhaustion (CWE-400, CWE-248) --- nexus/utils.py | 63 +++++++++++++++++++++++++++++++++++--------------- 1 file changed, 45 insertions(+), 18 deletions(-) diff --git a/nexus/utils.py b/nexus/utils.py index 852482b..96890f5 100644 --- a/nexus/utils.py +++ b/nexus/utils.py @@ -9,6 +9,9 @@ from typing import List, Dict, Tuple import string import re +# Cap the total unique n-gram counts in the corpus and per document +MAX_NGRAMS_TOTAL = 100000 # Adjust as needed for typical corpus size +MAX_NGRAMS_PER_DOC = 10000 # Protects against outlier documents stop_words = set(stopwords.words('english')) load_dotenv() @@ -70,7 +73,10 @@ def create_topics(self, documents: List[Dict], id_to_cluster_label: Dict, id_fie ngram_cluster_frequency = defaultdict(lambda: defaultdict(lambda: 0)) ngram_rarity_cluster_list = [] - documents.pop(-1) + if documents: + documents.pop(-1) + + total_unique_ngrams = set() for datum in documents: cluster_id = id_to_cluster_label[datum[id_field]] @@ -79,6 +85,8 @@ def create_topics(self, documents: List[Dict], id_to_cluster_label: Dict, id_fie tokens = re.split(r'\W+', text) ngrams_in_document = set() + unique_ngrams_added = 0 + for i in range(len(tokens) - 1): unigram = tokens[i] bigram = " ".join(tokens[i:i + 2]) @@ -89,25 +97,44 @@ def create_topics(self, documents: List[Dict], id_to_cluster_label: Dict, id_fie unigram_contains_digits = any(j.isdigit() for j in tokens[i]) bigram_contains_digits = unigram_contains_digits or any(j.isdigit() for j in tokens[i + 1]) - # ngrams only count if they do not contain digits. - # If an ngram appears multiple times in a document, count it only once. - - if not unigram_contains_digits and not unigram in ngrams_in_document: - ngram_corpus_frequency[unigram] += 1 - ngram_cluster_frequency[cluster_id][unigram] += 1 - ngrams_in_document.add(unigram) - - # if bigram.isalpha(): - if not bigram_contains_digits and not bigram in ngrams_in_document: - ngram_corpus_frequency[bigram] += 1 - ngram_cluster_frequency[cluster_id][bigram] += 1 - ngrams_in_document.add(bigram) + # Skip n-grams with digits or if per-doc ngram cap reached + allow_unigram = not unigram_contains_digits and unigram not in ngrams_in_document and unigram != '' + allow_bigram = not bigram_contains_digits and bigram not in ngrams_in_document and bigram.strip() != '' + + # Check TOTAL n-gram cap before adding new n-gram + # If ngram already tracked, allow, otherwise check cap + unigram_is_new = allow_unigram and (unigram not in ngram_corpus_frequency) + bigram_is_new = allow_bigram and (bigram not in ngram_corpus_frequency) + + # Enforce per-document cap to prevent pathological documents + if unique_ngrams_added >= MAX_NGRAMS_PER_DOC: + break # Stop counting further n-grams in this document + + # Unigram + if allow_unigram: + if not unigram_is_new or (len(ngram_corpus_frequency) < MAX_NGRAMS_TOTAL): + if unigram_is_new: + unique_ngrams_added += 1 + ngram_corpus_frequency[unigram] += 1 + ngram_cluster_frequency[cluster_id][unigram] += 1 + ngrams_in_document.add(unigram) + total_unique_ngrams.add(unigram) + + # Bigram + if allow_bigram: + if not bigram_is_new or (len(ngram_corpus_frequency) < MAX_NGRAMS_TOTAL): + if bigram_is_new: + unique_ngrams_added += 1 + ngram_corpus_frequency[bigram] += 1 + ngram_cluster_frequency[cluster_id][bigram] += 1 + ngrams_in_document.add(bigram) + total_unique_ngrams.add(bigram) ngrams_in_document = set() for cluster_id, ngram_frequencies in ngram_cluster_frequency.items(): ngram_rarity: List[Tuple] = [] for ngram, cluster_frequency in ngram_frequencies.items(): - if cluster_frequency > 5: # ensure they are not just rare tokens in the corpus, but do occur with some freqency in the clusters + if cluster_frequency > 5: # ensure they are not just rare tokens in the corpus, but do occur with some frequency in the clusters rarity = float(cluster_frequency) / float(ngram_corpus_frequency[ngram]) ngram_rarity.append((ngram, rarity)) try: @@ -116,6 +143,8 @@ def create_topics(self, documents: List[Dict], id_to_cluster_label: Dict, id_fie rarest_ngram = sorted(rarest_ngrams, key=len, reverse=True)[0][0] except IndexError: print("empty abstract") + rarest_ngrams = [] + rarest_ngram = None print(cluster_id) print(rarest_ngrams) @@ -133,6 +162,4 @@ def create_topics(self, documents: List[Dict], id_to_cluster_label: Dict, id_fie bot = Utils() lookup = bot.user_lookup_sns("JoeBiden", 5000) print(len(lookup)) - print(lookup[-1]) - - + print(lookup[-1]) \ No newline at end of file