Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 86 additions & 6 deletions nexus/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from sklearn.cluster import KMeans
import os
from dotenv import load_dotenv
import re
import unicodedata

load_dotenv()

Expand All @@ -19,6 +21,70 @@ def __init__(self):
self.utils = Utils()
self.atlas = AtlasClient()

def _sanitize_filename(self, username: str) -> str:
"""
Sanitizes the input username for safe use in filenames:
- Removes any slashes, backslashes, or dot-dot patterns
- Removes illegal/suspicious characters
- Only allows alphanumeric, underscore, hyphen, and dot
- Optionally trims length if needed
"""
# Remove directory traversal patterns
username = re.sub(r'(\.\.[/\\])+', '', username)
# Remove all slashes and backslashes
username = username.replace('/', '').replace('\\', '')
# Remove all but allowed characters
username = re.sub(r'[^A-Za-z0-9._-]', '', username)
# Optionally, trim overly long usernames for safety
return username[:128]

@staticmethod
def _is_valid_tweet_content(text: str) -> bool:
"""
Checks if the tweet text is valid for ingestion into ML pipeline.
- Checks for unicode control chars and invisibles
- Length thresholding (not too short/long)
- Detects repeated substrings (simple backdoor attempts)
- Checks for abnormal non-latin characters (rudimentary language check)
- Blocks strings with excessive links, mentions, hashtags
"""
if not isinstance(text, str):
return False
# Remove whitespace for effective length
content = text.strip()
# Length between 30 and 400 (Twitter hard max is 280, maybe longer for threads)
if len(content) < 30 or len(content) > 400:
return False
# Block excessive URLs (>2)
url_count = len(re.findall(r'https?://', content))
if url_count > 2:
return False
# Block excessive mentions (>5)
mention_count = len(re.findall(r'@\w+', content))
if mention_count > 5:
return False
# Block excessive hashtags (>7)
hashtag_count = len(re.findall(r'#\w+', content))
if hashtag_count > 7:
return False
# Block control/unprintable chars
if any(unicodedata.category(c)[0] == "C" and c not in ("\n", "\r", "\t") for c in content):
return False
# Check for non-latin (heuristic; could be improved)
nonlatin = re.sub(r"[a-zA-Z0-9 .,;:?!'\"()\-\[\]{}@#/]", '', content)
# If >30% of text is non-latin non-punctuation, suspicious
if len(nonlatin) > 0.3 * len(content):
return False
# Block repeated n-grams (simple backdoor triggers, e.g. 'xyz xyz xyz ...')
repeated_word_seq = re.search(r'(\b\w+\b)( \1\b)+', content)
if repeated_word_seq is not None:
return False
# Block extremely high character repetition (e.g. 'A' * 50)
for c in set(content):
if content.count(c) > 40 and c != ' ':
return False
return True

def create_social_profile_tweepy(self, map_name: str, map_description: str, users: List[str], outdir: str):
"""Create social profile with tweepy as tweet source

Expand All @@ -31,7 +97,10 @@ def create_social_profile_tweepy(self, map_name: str, map_description: str, user
for user in users:
tweets = [{"text": p.clean(tweet["full_text"]), "created_at": tweet["created_at"]} for tweet in
self.utils.user_lookup(user, lookup_amount)]
with jsonlines.open(f'{outdir}/{user}_tweets.jsonl', mode='a') as writer:
safe_user = self._sanitize_filename(user)
user_file_path = os.path.join(outdir, f'{safe_user}_tweets.jsonl')
os.makedirs(os.path.dirname(user_file_path), exist_ok=True)
with jsonlines.open(user_file_path, mode='a') as writer:
for idx, tweet in enumerate(tweets):
if len(tweet["text"]) < 10:
tweets.pop(idx)
Expand Down Expand Up @@ -70,20 +139,31 @@ def create_social_profile_sns(self,
for user in tqdm(users):
try:
logger.info(f"Loading {user}'s tweets from disk")
data_path = os.path.join(outdir, f"{user}_tweets.jsonl")
safe_user = self._sanitize_filename(user)
data_path = os.path.join(outdir, f"{safe_user}_tweets.jsonl")
with jsonlines.open(data_path, mode="r") as tweets:
for tweet in tweets:
all_tweets.append(tweet)
# VALIDATION: Filter for valid tweets only
text_check = tweet.get("full_text", tweet.get("text", ""))
if Profile._is_valid_tweet_content(str(text_check)):
all_tweets.append(tweet)
else:
logger.warning(f"Filtered suspicious tweet from disk for user {user}: {text_check!r}")
except BaseException:
logger.info(f"Not on disk! scraping {users}'s tweets now")
tweets = self.utils.user_lookup_sns(user, 10000)
with jsonlines.open(f'{outdir}/{user}_tweets.jsonl', mode='a') as writer:
safe_user = self._sanitize_filename(user)
user_file_path = os.path.join(outdir, f'{safe_user}_tweets.jsonl')
os.makedirs(os.path.dirname(user_file_path), exist_ok=True)
with jsonlines.open(user_file_path, mode='a') as writer:
for idx, tweet in enumerate(tweets):
tweet["full_text"] = p.clean(tweet["full_text"])
if len(tweet["full_text"]) > 30:
if Profile._is_valid_tweet_content(tweet["full_text"]):
tweet["created_at"] = str(tweet["created_at"])
all_tweets.append(tweet)
writer.write(tweet)
else:
logger.warning(f"Filtered suspicious scraped tweet for user {user}: {tweet['full_text']!r}")


for idx, tweet in enumerate(all_tweets):
Expand Down Expand Up @@ -144,4 +224,4 @@ def create_social_profile_sns(self,
map_description="A social profile of the latest POTUS Joe Biden, with Nomic's text embedder created by Yuvanesh Anand",
users=["JoeBiden", "POTUS"],
topics=True,
embedding_path="embeddings/JoeBiden.npy")
embedding_path="embeddings/JoeBiden.npy")