Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
332 changes: 185 additions & 147 deletions nexus/profile.py
Original file line number Diff line number Diff line change
@@ -1,147 +1,185 @@
from typing import List, Optional
import preprocessor as p
from nomic import AtlasClient, CohereEmbedder
from utils import Utils
import jsonlines
import json
from tqdm import tqdm
from loguru import logger
import numpy as np
from sklearn.cluster import KMeans
import os
from dotenv import load_dotenv

load_dotenv()


class Profile:
def __init__(self):
self.utils = Utils()
self.atlas = AtlasClient()

def create_social_profile_tweepy(self, map_name: str, map_description: str, users: List[str], outdir: str):
"""Create social profile with tweepy as tweet source

:param map_name: name of atlas map
:param map_description: description of atlas map
:param users: handle of twitter user that is used to create the social profile
:param outdir: specified directory of where the tweets(in JSON format) shoudl go
"""
lookup_amount = 10000
for user in users:
tweets = [{"text": p.clean(tweet["full_text"]), "created_at": tweet["created_at"]} for tweet in
self.utils.user_lookup(user, lookup_amount)]
with jsonlines.open(f'{outdir}/{user}_tweets.jsonl', mode='a') as writer:
for idx, tweet in enumerate(tweets):
if len(tweet["text"]) < 10:
tweets.pop(idx)
continue
writer.write(tweet)

self.atlas.map_text(data=tweets,
indexed_field='text',
is_public=True,
map_name=map_name,
map_description=map_description,
organization_name=None, # defaults to your current user.
num_workers=10
)

def create_social_profile_sns(self,
map_name: str,
map_description: str,
users: List[str],
outdir: Optional[str] = '',
topics: bool = False,
embedding_path: str = ''
):
"""

:param embedding_path: path of npy file for topic extraction
:param topics: indicating whether or not you want to automatic topic extraction
:param map_name: name of atlas map
:param map_description: description of atlas map
:param users: handle of twitter user that is used to create the social profile
:param outdir: specified directory of where the tweets(in JSON format) should go
"""
all_tweets = []


for user in tqdm(users):
try:
logger.info(f"Loading {user}'s tweets from disk")
data_path = os.path.join(outdir, f"{user}_tweets.jsonl")
with jsonlines.open(data_path, mode="r") as tweets:
for tweet in tweets:
all_tweets.append(tweet)
except BaseException:
logger.info(f"Not on disk! scraping {users}'s tweets now")
tweets = self.utils.user_lookup_sns(user, 10000)
with jsonlines.open(f'{outdir}/{user}_tweets.jsonl', mode='a') as writer:
for idx, tweet in enumerate(tweets):
tweet["full_text"] = p.clean(tweet["full_text"])
if len(tweet["full_text"]) > 30:
tweet["created_at"] = str(tweet["created_at"])
all_tweets.append(tweet)
writer.write(tweet)


for idx, tweet in enumerate(all_tweets):
tweet["id"] = str(idx)

if topics:
n_cluster_docs = [40]
for n_clusters in n_cluster_docs:
logger.info(f"computing {n_clusters} cluster layer")
try:
with open(f"data/cluster_labels/{users[0]}_id_to_cluster_label_{n_clusters}", "r") as f:
id_to_cluster_label = json.load(f)
logger.info("Loaded all resources from disk")
print(id_to_cluster_label[-1])
except BaseException:
# remake clusters
id_to_cluster_label = {}

try:
logger.info("Loading embeddings from disk.")
embeddings = np.load(embedding_path)
except BaseException:
logger.info("Embedding with Cohere")
cohere_api_key = os.getenv("COHERE_KEY")
embedder = CohereEmbedder(cohere_api_key=cohere_api_key)
embeddings = np.array(embedder.embed(texts=[datum['full_text'] for datum in all_tweets])).squeeze()
with open(embedding_path, 'wb') as f:
np.save(f, embeddings)
logger.info("Running Kmeans to generate clusters")
kmeans = KMeans(n_clusters=n_clusters, random_state=0).fit(embeddings)
for datum, cluster_id in zip(all_tweets, [int(i) for i in list(kmeans.labels_)]):
id_to_cluster_label[datum['id']] = cluster_id

with open(f'data/cluster_labels/{users[0]}_id_to_cluster_label_{n_clusters}', 'w') as f:
json.dump(id_to_cluster_label, f)
print(len(all_tweets))
logger.info("Computing Topics")
cluster_rarity_list = self.utils.create_topics(all_tweets, id_to_cluster_label=id_to_cluster_label)

for idx, datum in enumerate(all_tweets):
datum_cluster = id_to_cluster_label[str(idx)]
datum[f"cluster_{n_clusters}"] = datum_cluster
datum[f"topic_{n_clusters}"] = cluster_rarity_list[datum_cluster][1]

self.atlas.map_text(data=all_tweets,
indexed_field='full_text',
is_public=True,
map_name=map_name,
map_description=map_description,
colorable_fields=["topic_40","user"],
)


if __name__ == "__main__":
profiler = Profile()
profiler.create_social_profile_sns(outdir='data/',
map_name='Social Profile of the current POTUS',
map_description="A social profile of the latest POTUS Joe Biden, with Nomic's text embedder created by Yuvanesh Anand",
users=["JoeBiden", "POTUS"],
topics=True,
embedding_path="embeddings/JoeBiden.npy")
from typing import List, Optional
import preprocessor as p
from nomic import AtlasClient, CohereEmbedder
from utils import Utils
import jsonlines
import json
from tqdm import tqdm
from loguru import logger
import numpy as np
from sklearn.cluster import KMeans
import os
from dotenv import load_dotenv

load_dotenv()

EMBEDDINGS_DIR = "embeddings"


def is_safe_embedding_path(path: str) -> str:
"""
Validates and constructs an absolute, safe path for embedding files.
Ensures the path is within the EMBEDDINGS_DIR directory, prevents path
traversal and absolute path usage.

:param path: User-supplied relative embedding file path.
:return: The safe, absolute path to use.
:raises ValueError: If the path is unsafe or not inside EMBEDDINGS_DIR.
"""
if not path:
raise ValueError("Empty embedding path provided.")
if os.path.isabs(path):
raise ValueError("Absolute paths are not allowed for embedding_path.")

# Ensure embedding_path is relative to EMBEDDINGS_DIR
abs_embeddings_dir = os.path.abspath(EMBEDDINGS_DIR)
joined_path = os.path.join(EMBEDDINGS_DIR, path)
abs_target_path = os.path.abspath(joined_path)

if not abs_target_path.startswith(abs_embeddings_dir + os.sep):
raise ValueError(
f"Insecure embedding_path '{path}' detected. "
f"Path must be within '{EMBEDDINGS_DIR}/' directory."
)

# Ensure the target directory exists
target_dir = os.path.dirname(abs_target_path)
if not os.path.exists(target_dir):
os.makedirs(target_dir, exist_ok=True)

return abs_target_path


class Profile:
def __init__(self):
self.utils = Utils()
self.atlas = AtlasClient()

def create_social_profile_tweepy(self, map_name: str, map_description: str, users: List[str], outdir: str):
"""Create social profile with tweepy as tweet source

:param map_name: name of atlas map
:param map_description: description of atlas map
:param users: handle of twitter user that is used to create the social profile
:param outdir: specified directory of where the tweets(in JSON format) shoudl go
"""
lookup_amount = 10000
for user in users:
tweets = [{"text": p.clean(tweet["full_text"]), "created_at": tweet["created_at"]} for tweet in
self.utils.user_lookup(user, lookup_amount)]
with jsonlines.open(f'{outdir}/{user}_tweets.jsonl', mode='a') as writer:
for idx, tweet in enumerate(tweets):
if len(tweet["text"]) < 10:
tweets.pop(idx)
continue
writer.write(tweet)

self.atlas.map_text(data=tweets,
indexed_field='text',
is_public=True,
map_name=map_name,
map_description=map_description,
organization_name=None, # defaults to your current user.
num_workers=10
)

def create_social_profile_sns(self,
map_name: str,
map_description: str,
users: List[str],
outdir: Optional[str] = '',
topics: bool = False,
embedding_path: str = ''
):
"""

:param embedding_path: path of npy file for topic extraction
:param topics: indicating whether or not you want to automatic topic extraction
:param map_name: name of atlas map
:param map_description: description of atlas map
:param users: handle of twitter user that is used to create the social profile
:param outdir: specified directory of where the tweets(in JSON format) should go
"""
all_tweets = []


for user in tqdm(users):
try:
logger.info(f"Loading {user}'s tweets from disk")
data_path = os.path.join(outdir, f"{user}_tweets.jsonl")
with jsonlines.open(data_path, mode="r") as tweets:
for tweet in tweets:
all_tweets.append(tweet)
except BaseException:
logger.info(f"Not on disk! scraping {users}'s tweets now")
tweets = self.utils.user_lookup_sns(user, 10000)
with jsonlines.open(f'{outdir}/{user}_tweets.jsonl', mode='a') as writer:
for idx, tweet in enumerate(tweets):
tweet["full_text"] = p.clean(tweet["full_text"])
if len(tweet["full_text"]) > 30:
tweet["created_at"] = str(tweet["created_at"])
all_tweets.append(tweet)
writer.write(tweet)


for idx, tweet in enumerate(all_tweets):
tweet["id"] = str(idx)

if topics:
n_cluster_docs = [40]
for n_clusters in n_cluster_docs:
logger.info(f"computing {n_clusters} cluster layer")
try:
with open(f"data/cluster_labels/{users[0]}_id_to_cluster_label_{n_clusters}", "r") as f:
id_to_cluster_label = json.load(f)
logger.info("Loaded all resources from disk")
print(id_to_cluster_label[-1])
except BaseException:
# remake clusters
id_to_cluster_label = {}

try:
logger.info("Loading embeddings from disk.")
safe_embedding_path = is_safe_embedding_path(embedding_path)
embeddings = np.load(safe_embedding_path)
except BaseException:
logger.info("Embedding with Cohere")
cohere_api_key = os.getenv("COHERE_KEY")
embedder = CohereEmbedder(cohere_api_key=cohere_api_key)
embeddings = np.array(embedder.embed(texts=[datum['full_text'] for datum in all_tweets])).squeeze()
safe_embedding_path = is_safe_embedding_path(embedding_path)
with open(safe_embedding_path, 'wb') as f:
np.save(f, embeddings)
logger.info("Running Kmeans to generate clusters")
kmeans = KMeans(n_clusters=n_clusters, random_state=0).fit(embeddings)
for datum, cluster_id in zip(all_tweets, [int(i) for i in list(kmeans.labels_)]):
id_to_cluster_label[datum['id']] = cluster_id

with open(f'data/cluster_labels/{users[0]}_id_to_cluster_label_{n_clusters}', 'w') as f:
json.dump(id_to_cluster_label, f)
print(len(all_tweets))
logger.info("Computing Topics")
cluster_rarity_list = self.utils.create_topics(all_tweets, id_to_cluster_label=id_to_cluster_label)

for idx, datum in enumerate(all_tweets):
datum_cluster = id_to_cluster_label[str(idx)]
datum[f"cluster_{n_clusters}"] = datum_cluster
datum[f"topic_{n_clusters}"] = cluster_rarity_list[datum_cluster][1]

self.atlas.map_text(data=all_tweets,
indexed_field='full_text',
is_public=True,
map_name=map_name,
map_description=map_description,
colorable_fields=["topic_40","user"],
)


if __name__ == "__main__":
profiler = Profile()
profiler.create_social_profile_sns(outdir='data/',
map_name='Social Profile of the current POTUS',
map_description="A social profile of the latest POTUS Joe Biden, with Nomic's text embedder created by Yuvanesh Anand",
users=["JoeBiden", "POTUS"],
topics=True,
embedding_path="JoeBiden.npy")