-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmodels.py
More file actions
135 lines (101 loc) · 4.16 KB
/
models.py
File metadata and controls
135 lines (101 loc) · 4.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""Pydantic models for OSINT scan: Entity, Node, Edge, ScanConfig, ScanResult."""
from enum import Enum
from typing import Any, Literal
from pydantic import BaseModel, Field
class EntityType(str, Enum):
    # Closed set of identifier kinds handled by the scan models.
    # The ``str`` mixin makes every member a real string that compares
    # equal to its lowercase value (e.g. ``EntityType.EMAIL == "email"``).
    USERNAME = "username"
    EMAIL = "email"
    PHONE = "phone"
    WALLET = "wallet"
    DOMAIN = "domain"
    IP = "ip"
class Entity(BaseModel):
    """An identity or identifier discovered during a scan."""

    # Kind of identifier (username, email, ...).
    type: EntityType
    # Raw identifier text as reported by the source.
    value: str
    source: str = Field(..., description="Resolver or source that found this entity")
    # Score constrained to the closed interval [0.0, 1.0]; defaults to 1.0.
    confidence: float = Field(ge=0.0, le=1.0, default=1.0)
    # Required, non-negative.
    depth: int = Field(ge=0, description="Hops from seed")

    def entity_key(self) -> str:
        """Return the canonical ``type:value`` key used for deduplication.

        The value is stripped of surrounding whitespace and lowercased for
        every entity type, then prefixed with the enum's string value, e.g.
        ``"email:alice@example.com"``.

        Returns:
            The normalized key as a string.
        """
        # NOTE(review): lowercasing is applied to all types, including
        # identifiers that may be case-sensitive (some wallet address
        # formats) -- confirm that collapsing case is intended there.
        # ``str()`` keeps the defensive coercion for instances built without
        # validation; for a plain ``str`` it is a no-op.
        normalized = str(self.value).strip().lower()
        return f"{self.type.value}:{normalized}"
class Node(BaseModel):
    """Graph node representing an entity."""

    # Required unique identifier of the node.
    id: str = Field(description="Same as entity_key for the entity")
    # Kind and raw text of the entity this node stands for.
    type: EntityType
    value: str
    # Free-form extra data; every instance gets its own fresh dict.
    metadata: dict[str, Any] = Field(default_factory=dict)
    # Defaults to 0 when not supplied.
    depth: int = Field(default=0)
class Edge(BaseModel):
    """Directed edge between two nodes."""

    # Both endpoints are required and refer to nodes by id.
    source: str = Field(description="Node id (entity_key)")
    target: str = Field(description="Node id (entity_key)")
    # Label for the connection; generic "linked_to" unless specified.
    relationship: str = "linked_to"
    # Score constrained to the closed interval [0.0, 1.0]; defaults to 1.0.
    confidence: float = Field(default=1.0, ge=0.0, le=1.0)
class ScanConfig(BaseModel):
    """Limits and options for a scan."""

    # Each limit is validated against an inclusive [ge, le] range.
    max_entities: int = Field(500, ge=1, le=10_000)
    max_depth: int = Field(3, ge=0, le=10)
    timeout_minutes: int = Field(20, ge=1, le=120)
    demo_mode: bool = Field(
        False,
        description=(
            "When True, caps the scan at max_depth=1, max_entities=50, timeout=3 min, "
            "and skips GPU post-processing to keep total wall-clock time under 3 minutes "
            "for live demonstrations."
        ),
    )

    def model_post_init(self, __context: object) -> None:
        """Clamp the limits to demo-sized values when ``demo_mode`` is on.

        Runs once after validation at construction time. Each limit is only
        ever lowered (via ``min``), never raised, so a caller-supplied value
        already below the cap is kept.
        """
        if not self.demo_mode:
            return
        demo_caps = (
            ("max_depth", 1),
            ("max_entities", 50),
            ("timeout_minutes", 3),
        )
        for field_name, cap in demo_caps:
            setattr(self, field_name, min(getattr(self, field_name), cap))
class ScanStatus(str, Enum):
    # Status values a scan can report. The ``str`` mixin makes each member
    # a real string equal to its lowercase value.
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
class ScanResult(BaseModel):
    """Result stored per scan_id (status + optional graph)."""

    # The only required field; everything else has a default.
    status: ScanStatus
    # Optional payloads, all None until populated.
    graph: dict[str, Any] | None = Field(None, description="nodes + edges when completed")
    report: str | None = Field(None, description="Final intelligence report markdown")
    error: str | None = Field(None, description="Error message when status is failed")
    # Progress counters, both starting at zero.
    entities_seen: int = Field(default=0)
    depth_reached: int = Field(default=0)
# Request/response schemas for API
class SeedRequest(BaseModel):
    # Seed identifier supplied with a scan request: a required type/value
    # pair plus optional hints.
    type: EntityType
    value: str
    # NOTE(review): presumably a known email address for the target --
    # confirm against the code that consumes this field.
    email: str | None = Field(default=None)
    real_name: str | None = Field(
        None,
        description=(
            "Known real name of the target (e.g. 'Darsh Poddar'). "
            "Used to anchor identity and reject mismatches faster."
        ),
    )
class ScanRequest(BaseModel):
    # Scan-creation payload: a required seed, an optional explicit config,
    # and a top-level demo_mode shorthand.
    seed: SeedRequest
    config: ScanConfig | None = None
    demo_mode: bool = Field(default=False, description=(
        "Shorthand to enable demo_mode without constructing a full config object. "
        "Caps scan at max_depth=1, max_entities=50, timeout=3 min, skips GPU post-processing."
    ))

    def model_post_init(self, __context: object) -> None:
        """Propagate the ``demo_mode`` shorthand into ``config``.

        When ``demo_mode`` is set, ``config`` is replaced by a ``ScanConfig``
        built through the normal constructor with ``demo_mode=True``, keeping
        any other values from a caller-supplied config.

        Going through the constructor matters: ``ScanConfig`` applies its
        demo caps in its own ``model_post_init``, which runs only at
        construction. Merely assigning ``config.demo_mode = True`` on an
        existing instance would flip the flag but leave ``max_depth``,
        ``max_entities`` and ``timeout_minutes`` uncapped.
        """
        if not self.demo_mode:
            return
        # Start from the supplied settings (if any) and force the flag on.
        settings = {} if self.config is None else self.config.model_dump()
        settings["demo_mode"] = True
        self.config = ScanConfig(**settings)
class ScanResponse(BaseModel):
    # Response carrying only the identifier of a scan.
    scan_id: str = Field()
class StatusResponse(BaseModel):
    # Status payload for one scan: the scan id plus the same status,
    # counter, error and report field names that ScanResult declares.
    scan_id: str
    status: ScanStatus
    # Counters default to zero.
    entities_seen: int = Field(default=0)
    depth_reached: int = Field(default=0)
    # Optional text fields, None unless set.
    error: str | None = Field(default=None)
    report: str | None = Field(default=None)
class GraphResponse(BaseModel):
    # Graph payload as two required lists of plain dicts.
    # NOTE(review): presumably serialized Node and Edge models -- confirm
    # against the code that builds this response.
    nodes: list[dict[str, Any]] = Field()
    edges: list[dict[str, Any]] = Field()