diff --git a/nexus/profile.py b/nexus/profile.py
index bceb62d..774d6aa 100644
--- a/nexus/profile.py
+++ b/nexus/profile.py
@@ -9,11 +9,49 @@
 import numpy as np
 from sklearn.cluster import KMeans
 import os
+import re
 from dotenv import load_dotenv
 
 load_dotenv()
 
 
+def sanitize_handle(user: str) -> str:
+    """Return ``user`` unchanged if it is safe to embed in a filename.
+
+    Raises:
+        ValueError: if ``user`` contains anything other than ASCII letters,
+            digits, and underscores.
+    """
+    # fullmatch rather than match(r'^...$'): '$' also matches just before a
+    # trailing newline, which would let a handle such as "name\n" through.
+    if not re.fullmatch(r'[A-Za-z0-9_]+', user):
+        raise ValueError(f"Unsafe username '{user}'. Only letters, digits, and underscores are allowed.")
+    # Prevent path traversal (should be redundant above, but for extra safety)
+    if os.sep in user or '/' in user or '\\' in user or '..' in user:
+        raise ValueError(f"Unsafe username '{user}'. Path separators are not allowed.")
+    return user
+
+
+def safe_user_tweet_path(outdir: str, user: str) -> str:
+    """Return the path of the ``<user>_tweets.jsonl`` file inside ``outdir``.
+
+    Raises:
+        ValueError: if ``user`` is not a valid handle or the resulting path
+            would fall outside ``outdir``.
+    """
+    handle = sanitize_handle(user)
+    filename = f"{handle}_tweets.jsonl"
+    # Compose path and ensure it's inside the intended outdir (defense-in-depth)
+    file_path = os.path.join(outdir, filename)
+    abs_outdir = os.path.abspath(outdir)
+    abs_file_path = os.path.abspath(file_path)
+    # commonpath compares whole path components, so this also works when
+    # outdir is the filesystem root (where outdir + os.sep would be "//").
+    if os.path.commonpath([abs_outdir, abs_file_path]) != abs_outdir:
+        raise ValueError(f"Unsafe path resolution for user '{user}'.")
+    return file_path
+
+
 class Profile:
     def __init__(self):
         self.utils = Utils()
@@ -29,9 +67,10 @@ def create_social_profile_tweepy(self, map_name: str, map_description: str, user
         """
         lookup_amount = 10000
         for user in users:
+            safe_path = safe_user_tweet_path(outdir, user)
             tweets = [{"text": p.clean(tweet["full_text"]), "created_at": tweet["created_at"]} for tweet in
                       self.utils.user_lookup(user, lookup_amount)]
-            with jsonlines.open(f'{outdir}/{user}_tweets.jsonl', mode='a') as writer:
+            with jsonlines.open(safe_path, mode='a') as writer:
                 for idx, tweet in enumerate(tweets):
                     if len(tweet["text"]) < 10:
                         tweets.pop(idx)
@@ -70,14 +109,16 @@ def create_social_profile_sns(self,
         for user in tqdm(users):
             try:
                 logger.info(f"Loading {user}'s tweets from disk")
-                data_path = os.path.join(outdir, f"{user}_tweets.jsonl")
+                safe_path = safe_user_tweet_path(outdir, user)
+                data_path = safe_path
                 with jsonlines.open(data_path, mode="r") as tweets:
                     for tweet in tweets:
                         all_tweets.append(tweet)
             except BaseException:
                 logger.info(f"Not on disk! scraping {users}'s tweets now")
                 tweets = self.utils.user_lookup_sns(user, 10000)
-                with jsonlines.open(f'{outdir}/{user}_tweets.jsonl', mode='a') as writer:
+                safe_path = safe_user_tweet_path(outdir, user)
+                with jsonlines.open(safe_path, mode='a') as writer:
                     for idx, tweet in enumerate(tweets):
                         tweet["full_text"] = p.clean(tweet["full_text"])
                         if len(tweet["full_text"]) > 30: