From 55744088ddb78a62dcf0deec4ba9832c0cdbadf5 Mon Sep 17 00:00:00 2001 From: SS <60110107+SS-4@users.noreply.github.com> Date: Mon, 27 Apr 2026 12:37:39 +1000 Subject: [PATCH] Refactor scraping logic for person data: fix KeyError when metadata.container is missing in People parser Fixes crash caused by missing "metadata.container" in Google responses. Replaced unsafe dictionary access with safe .get() checks across people.py. Prevents KeyError and allows parsing to continue when fields are missing. --- ghunt/parsers/people.py | 84 +++++++++++++++++++++++++++-------------- 1 file changed, 56 insertions(+), 28 deletions(-) diff --git a/ghunt/parsers/people.py b/ghunt/parsers/people.py index 43554121..4900a2e4 100644 --- a/ghunt/parsers/people.py +++ b/ghunt/parsers/people.py @@ -16,10 +16,11 @@ def __init__(self): def _scrape(self, gplus_data): self.contentRestriction = gplus_data.get("contentRestriction") - + if (isEnterpriseUser := gplus_data.get("isEnterpriseUser")): self.isEntrepriseUser = isEnterpriseUser + class PersonDynamiteExtendedData(Parser): def __init__(self): self.presence: str = "" @@ -31,14 +32,18 @@ def _scrape(self, dynamite_data): self.presence = dynamite_data.get("presence") self.entityType = dynamite_data.get("entityType") self.dndState = dynamite_data.get("dndState") - if (customerId := dynamite_data.get("organizationInfo", {}).get("customerInfo", {}). 
- get("customerId", {}).get("customerId")): + + if (customerId := dynamite_data.get("organizationInfo", {}) + .get("customerInfo", {}) + .get("customerId", {}) + .get("customerId")): self.customerId = customerId + class PersonExtendedData(Parser): def __init__(self): - self.dynamiteData: PersonDynamiteExtendedData = PersonDynamiteExtendedData() - self.gplusData: PersonGplusExtendedData = PersonGplusExtendedData() + self.dynamiteData: PersonDynamiteExtendedData = PersonDynamiteExtendedData() + self.gplusData: PersonGplusExtendedData = PersonGplusExtendedData() def _scrape(self, extended_data: Dict[str, any]): if (dynamite_data := extended_data.get("dynamiteExtendedData")): @@ -47,6 +52,7 @@ def _scrape(self, extended_data: Dict[str, any]): if (gplus_data := extended_data.get("gplusExtendedData")): self.gplusData._scrape(gplus_data) + class PersonPhoto(Parser): def __init__(self): self.url: str = "" @@ -56,23 +62,25 @@ def __init__(self): async def _scrape(self, as_client: httpx.AsyncClient, photo_data: Dict[str, any], photo_type: str): if photo_type == "profile_photo": self.url = photo_data.get("url") - self.isDefault, self.flathash = await is_default_profile_pic(as_client, self.url) - + elif photo_type == "cover_photo": - self.url = '='.join(photo_data.get("imageUrl").split("=")[:-1]) - if (isDefault := photo_data.get("isDefault")): - self.isDefault = isDefault + image_url = photo_data.get("imageUrl", "") + if image_url: + self.url = '='.join(image_url.split("=")[:-1]) + self.isDefault = photo_data.get("isDefault", False) else: raise GHuntAPIResponseParsingError(f'The provided photo type "{photo_type}" weren\'t recognized.') + class PersonEmail(Parser): def __init__(self): self.value: str = "" - + def _scrape(self, email_data: Dict[str, any]): self.value = email_data.get("value") + class PersonName(Parser): def __init__(self): self.fullname: str = "" @@ -80,10 +88,8 @@ def __init__(self): self.lastName: str = "" def _scrape(self, name_data: Dict[str, any]): - # 
self.fullname = unicode_patch(x) if (x := name_data.get("displayName")) else None - # self.firstName = unicode_patch(x) if (x := name_data.get("givenName")) else None - # self.lastName = unicode_patch(x) if (x := name_data.get("familyName")) else None - pass # Google patched the names :/ very sad + pass + class PersonProfileInfo(Parser): def __init__(self): @@ -91,7 +97,8 @@ def __init__(self): def _scrape(self, profile_data: Dict[str, any]): if "ownerUserType" in profile_data: - self.userTypes += profile_data.get("ownerUserType") + self.userTypes += profile_data.get("ownerUserType", []) + class PersonSourceIds(Parser): def __init__(self): @@ -101,22 +108,25 @@ def _scrape(self, source_ids_data: Dict[str, any]): if (timestamp := source_ids_data.get("lastUpdatedMicros")): self.lastUpdated = datetime.utcfromtimestamp(float(timestamp[:10])) + class PersonInAppReachability(Parser): def __init__(self): self.apps: List[str] = [] def _scrape(self, apps_data, container_name: str): for app in apps_data: - if app["metadata"]["container"] == container_name: - self.apps.append(app["appType"].title()) + if app.get("metadata", {}).get("container") == container_name: + self.apps.append(app.get("appType", "").title()) + class PersonContainers(dict): pass + class Person(Parser): def __init__(self): self.personId: str = "" - self.sourceIds: Dict[str, PersonSourceIds] = PersonContainers() # All the fetched containers + self.sourceIds: Dict[str, PersonSourceIds] = PersonContainers() self.emails: Dict[str, PersonEmail] = PersonContainers() self.names: Dict[str, PersonName] = PersonContainers() self.profileInfos: Dict[str, PersonProfileInfo] = PersonContainers() @@ -127,47 +137,65 @@ def __init__(self): async def _scrape(self, as_client: httpx.AsyncClient, person_data: Dict[str, any]): self.personId = person_data.get("personId") + if person_data.get("email"): for email_data in person_data["email"]: + container = email_data.get("metadata", {}).get("container") + if not container: + 
continue person_email = PersonEmail() person_email._scrape(email_data) - self.emails[email_data["metadata"]["container"]] = person_email + self.emails[container] = person_email if person_data.get("name"): for name_data in person_data["name"]: + container = name_data.get("metadata", {}).get("container") + if not container: + continue person_name = PersonName() person_name._scrape(name_data) - self.names[name_data["metadata"]["container"]] = person_name + self.names[container] = person_name if person_data.get("readOnlyProfileInfo"): for profile_data in person_data["readOnlyProfileInfo"]: + container = profile_data.get("metadata", {}).get("container") + if not container: + continue + person_profile = PersonProfileInfo() person_profile._scrape(profile_data) - self.profileInfos[profile_data["metadata"]["container"]] = person_profile + self.profileInfos[container] = person_profile if person_data.get("photo"): for photo_data in person_data["photo"]: person_photo = PersonPhoto() await person_photo._scrape(as_client, photo_data, "profile_photo") - self.profilePhotos[profile_data["metadata"]["container"]] = person_photo + self.profilePhotos[container] = person_photo if (source_ids := person_data.get("metadata", {}).get("identityInfo", {}).get("sourceIds")): for source_ids_data in source_ids: + container = source_ids_data.get("container") + if not container: + continue person_source_ids = PersonSourceIds() person_source_ids._scrape(source_ids_data) - self.sourceIds[source_ids_data["container"]] = person_source_ids + self.sourceIds[container] = person_source_ids if person_data.get("coverPhoto"): for cover_photo_data in person_data["coverPhoto"]: + container = cover_photo_data.get("metadata", {}).get("container") + if not container: + continue person_cover_photo = PersonPhoto() await person_cover_photo._scrape(as_client, cover_photo_data, "cover_photo") - container = cover_photo_data.get("metadata", {}).get("container", "unknown") self.coverPhotos[container] = 
person_cover_photo if (apps_data := person_data.get("inAppReachability")): containers_names = set() - for app_data in person_data["inAppReachability"]: - containers_names.add(app_data["metadata"]["container"]) + for app_data in apps_data: + container = app_data.get("metadata", {}).get("container") + if container: + containers_names.add(container) for container_name in containers_names: person_app_reachability = PersonInAppReachability() @@ -175,4 +203,4 @@ async def _scrape(self, as_client: httpx.AsyncClient, person_data: Dict[str, any self.inAppReachability[container_name] = person_app_reachability if (extended_data := person_data.get("extendedData")): - self.extendedData._scrape(extended_data) \ No newline at end of file + self.extendedData._scrape(extended_data)