diff --git a/nexus/profile.py b/nexus/profile.py
index bceb62d..3775a84 100644
--- a/nexus/profile.py
+++ b/nexus/profile.py
@@ -10,6 +10,8 @@
 from sklearn.cluster import KMeans
 import os
 from dotenv import load_dotenv
+import re
+import unicodedata
 
 load_dotenv()
 
@@ -19,6 +21,70 @@ def __init__(self):
         self.utils = Utils()
         self.atlas = AtlasClient()
 
+    def _sanitize_filename(self, username: str) -> str:
+        """
+        Sanitizes the input username for safe use in filenames:
+        - Removes any slashes, backslashes, or dot-dot patterns
+        - Removes illegal/suspicious characters
+        - Only allows alphanumeric, underscore, hyphen, and dot
+        - Always truncates the result to at most 128 characters
+        """
+        # Remove directory traversal patterns
+        username = re.sub(r'(\.\.[/\\])+', '', username)
+        # Remove all slashes and backslashes
+        username = username.replace('/', '').replace('\\', '')
+        # Remove all but allowed characters
+        username = re.sub(r'[^A-Za-z0-9._-]', '', username)
+        # Trim overly long usernames for safety
+        return username[:128]
+
+    @staticmethod
+    def _is_valid_tweet_content(text: str) -> bool:
+        """
+        Checks if the tweet text is valid for ingestion into ML pipeline.
+        - Checks for unicode control chars and invisibles
+        - Length thresholding (not too short/long)
+        - Detects repeated words and long single-character runs (simple backdoor attempts)
+        - Checks for abnormal non-latin characters (rudimentary language check)
+        - Blocks strings with excessive links, mentions, hashtags
+        """
+        if not isinstance(text, str):
+            return False
+        # Remove whitespace for effective length
+        content = text.strip()
+        # Length between 30 and 400 (Twitter hard max is 280, maybe longer for threads)
+        if len(content) < 30 or len(content) > 400:
+            return False
+        # Block excessive URLs (>2)
+        url_count = len(re.findall(r'https?://', content))
+        if url_count > 2:
+            return False
+        # Block excessive mentions (>5)
+        mention_count = len(re.findall(r'@\w+', content))
+        if mention_count > 5:
+            return False
+        # Block excessive hashtags (>7)
+        hashtag_count = len(re.findall(r'#\w+', content))
+        if hashtag_count > 7:
+            return False
+        # Block control/unprintable chars
+        if any(unicodedata.category(c)[0] == "C" and c not in ("\n", "\r", "\t") for c in content):
+            return False
+        # Check for non-latin (heuristic; could be improved)
+        nonlatin = re.sub(r"[a-zA-Z0-9 .,;:?!'\"()\-\[\]{}@#/]", '', content)
+        # If >30% of text is non-latin non-punctuation, suspicious
+        if len(nonlatin) > 0.3 * len(content):
+            return False
+        # Block repeated n-grams (simple backdoor triggers, e.g. 'xyz xyz xyz ...')
+        repeated_word_seq = re.search(r'(\b\w+\b)( \1\b)+', content)
+        if repeated_word_seq is not None:
+            return False
+        # Block long runs of one non-space character (e.g. 'A' * 50). Runs, not totals:
+        # ordinary prose can contain a single letter 40+ times within 400 characters.
+        if re.search(r'([^ ])\1{40,}', content) is not None:
+            return False
+        return True
+
     def create_social_profile_tweepy(self, map_name: str, map_description: str, users: List[str], outdir: str):
         """Create social profile with tweepy as tweet source
 
@@ -31,7 +97,10 @@ def create_social_profile_tweepy(self, map_name: str, map_description: str, user
         for user in users:
             tweets = [{"text": p.clean(tweet["full_text"]), "created_at": tweet["created_at"]}
                       for tweet in self.utils.user_lookup(user, lookup_amount)]
-            with jsonlines.open(f'{outdir}/{user}_tweets.jsonl', mode='a') as writer:
+            safe_user = self._sanitize_filename(user)
+            user_file_path = os.path.join(outdir, f'{safe_user}_tweets.jsonl')
+            os.makedirs(os.path.dirname(user_file_path), exist_ok=True)
+            with jsonlines.open(user_file_path, mode='a') as writer:
                 for idx, tweet in enumerate(tweets):
                     if len(tweet["text"]) < 10:
                         tweets.pop(idx)
@@ -70,20 +139,31 @@ def create_social_profile_sns(self,
         for user in tqdm(users):
             try:
                 logger.info(f"Loading {user}'s tweets from disk")
-                data_path = os.path.join(outdir, f"{user}_tweets.jsonl")
+                safe_user = self._sanitize_filename(user)
+                data_path = os.path.join(outdir, f"{safe_user}_tweets.jsonl")
                 with jsonlines.open(data_path, mode="r") as tweets:
                     for tweet in tweets:
-                        all_tweets.append(tweet)
+                        # VALIDATION: Filter for valid tweets only
+                        text_check = tweet.get("full_text", tweet.get("text", ""))
+                        if Profile._is_valid_tweet_content(str(text_check)):
+                            all_tweets.append(tweet)
+                        else:
+                            logger.warning(f"Filtered suspicious tweet from disk for user {user}: {text_check!r}")
             except BaseException:
                 logger.info(f"Not on disk! scraping {users}'s tweets now")
                 tweets = self.utils.user_lookup_sns(user, 10000)
-                with jsonlines.open(f'{outdir}/{user}_tweets.jsonl', mode='a') as writer:
+                safe_user = self._sanitize_filename(user)
+                user_file_path = os.path.join(outdir, f'{safe_user}_tweets.jsonl')
+                os.makedirs(os.path.dirname(user_file_path), exist_ok=True)
+                with jsonlines.open(user_file_path, mode='a') as writer:
                     for idx, tweet in enumerate(tweets):
                         tweet["full_text"] = p.clean(tweet["full_text"])
-                        if len(tweet["full_text"]) > 30:
+                        if Profile._is_valid_tweet_content(tweet["full_text"]):
                             tweet["created_at"] = str(tweet["created_at"])
                             all_tweets.append(tweet)
                             writer.write(tweet)
+                        else:
+                            logger.warning(f"Filtered suspicious scraped tweet for user {user}: {tweet['full_text']!r}")
 
 
         for idx, tweet in enumerate(all_tweets):
@@ -144,4 +224,4 @@ def create_social_profile_sns(self,
         map_description="A social profile of the latest POTUS Joe Biden, with Nomic's text embedder created by Yuvanesh Anand",
         users=["JoeBiden", "POTUS"],
         topics=True,
-        embedding_path="embeddings/JoeBiden.npy")
+        embedding_path="embeddings/JoeBiden.npy")
\ No newline at end of file