-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
114 lines (93 loc) · 3.73 KB
/
main.py
File metadata and controls
114 lines (93 loc) · 3.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
"""Bulk Email Verifier — pre-send list cleaning demo.
Use case: validate a list of B2B prospect emails before launching a cold-email
campaign. Each address is checked for syntax, MX record presence, deliverability,
disposable flag, role-based flag, and a risk score. Outputs JSON + CSV split
into a "send" tier and a "drop" tier.
Run:
pip install -r requirements.txt
cp .env.example .env
python main.py
Actor: https://apify.com/pro100chok/email-verifier?utm_source=github_pro100&utm_medium=readme&utm_campaign=bulk-email-verifier-python
"""
from __future__ import annotations
import csv
import json
import os
import sys
from pathlib import Path
from apify_client import ApifyClient
from apify_client.errors import ApifyApiError
from dotenv import load_dotenv
# Identifier of the Apify actor that main() invokes to perform the verification.
ACTOR_ID = "pro100chok/email-verifier"

# Demo input only. In production, feed this from a CRM export or from the
# output of an earlier contact-scraping run, and swap in the addresses you
# genuinely need to verify.
EMAILS_TO_CHECK = [
    "hello@example.com",
    "contact@example.org",
    "support@nonexistent-domain-12345.io",
    "valid.address@gmail.com",
    "throwaway@10minutemail.com",  # expected to be flagged as disposable
    "info@stripe.com",
    "no-reply@github.com",
    "ceo@startup.io",
    "test+filter@gmail.com",
    "press@anthropic.com",
]
def _first_present(item: dict, *keys: str):
    """Return the first value under ``keys`` that is neither ``None`` nor ``""``.

    A plain ``item.get(a) or item.get(b)`` chain throws away legitimate falsy
    values: a risk score of ``0`` or an ``isDisposable`` of ``False`` would be
    replaced by the (usually missing) fallback and end up as ``None``. This
    helper only skips values that are actually absent. When several keys are
    present, the earliest key in ``keys`` wins.
    """
    for key in keys:
        value = item.get(key)
        if value is not None and value != "":
            return value
    return None


def _build_row(item: dict) -> dict:
    """Normalise one dataset item into the flat row written to the CSV files.

    Each field is looked up under two alternative key spellings, since the
    original code reads both (presumably to cope with differing actor output
    schemas - confirm against the actor's documentation).
    """
    return {
        "email": _first_present(item, "address", "email"),
        "status": _first_present(item, "status", "result"),
        "deliverable": item.get("deliverable"),
        "is_disposable": _first_present(item, "isDisposable", "disposable"),
        "is_role": _first_present(item, "isRole", "roleBased"),
        "is_free_provider": _first_present(item, "isFreeProvider", "free"),
        "mx_found": _first_present(item, "mxFound", "mxRecord"),
        "risk_score": _first_present(item, "riskScore", "score"),
    }


def _is_sendable(row: dict) -> bool:
    """Return True when a row belongs in the "send" tier.

    A row is sendable when its status is ``valid`` or ``deliverable``
    (case-insensitive) and it is not flagged as disposable.
    """
    # str() guards against a non-string status, which has no .lower().
    status = str(row["status"] or "").lower()
    return status in {"valid", "deliverable"} and not row["is_disposable"]


def main() -> int:
    """Verify ``EMAILS_TO_CHECK`` through the Apify actor and write the results.

    Reads ``APIFY_API_TOKEN`` from the environment (after loading ``.env``),
    calls the actor named by ``ACTOR_ID``, then writes three files next to this
    script: ``output.json`` (raw items), ``send.csv`` and ``drop.csv``.

    Returns:
        ``0`` on success.

    Raises:
        SystemExit: with a human-readable message when the token is missing,
            the actor call fails, the run does not finish as ``SUCCEEDED``,
            or the resulting dataset is empty.
    """
    load_dotenv()
    token = os.environ.get("APIFY_API_TOKEN")
    if not token:
        sys.exit("APIFY_API_TOKEN missing — copy .env.example to .env first.")

    client = ApifyClient(token)
    print(f"Verifying {len(EMAILS_TO_CHECK)} emails via {ACTOR_ID}...")
    try:
        run = client.actor(ACTOR_ID).call(
            run_input={"emails": EMAILS_TO_CHECK, "concurrency": 5},
            timeout_secs=600,
        )
    except ApifyApiError as exc:
        sys.exit(f"Actor call failed: {exc}")
    if not run or run.get("status") != "SUCCEEDED":
        sys.exit(f"Run not SUCCEEDED: {run.get('status') if run else 'no run'}")

    items = list(client.dataset(run["defaultDatasetId"]).iterate_items())
    if not items:
        sys.exit("Empty dataset.")

    out = Path(__file__).parent
    (out / "output.json").write_text(
        json.dumps(items, indent=2, ensure_ascii=False), encoding="utf-8"
    )

    # Build every row once so the CSV tiers and the console summary agree.
    rows = [_build_row(item) for item in items]
    send: list[dict] = []
    drop: list[dict] = []
    for row in rows:
        (send if _is_sendable(row) else drop).append(row)

    write_csv(out / "send.csv", send)
    write_csv(out / "drop.csv", drop)

    print(f"\nVerified {len(items)} emails:")
    print(f" ✓ send: {len(send)} (output: send.csv)")
    print(f" ✗ drop: {len(drop)} (output: drop.csv)")
    print()
    for row in rows:
        # None rejects a width spec ("<42") with TypeError, so substitute a
        # placeholder and coerce to str before formatting.
        email = "<unknown>" if row["email"] is None else str(row["email"])
        print(f" {email:<42} {row['status']}")
    return 0
def write_csv(
    path: Path,
    rows: list[dict],
    *,
    fieldnames: list[str] | None = None,
) -> None:
    """Write ``rows`` to ``path`` as a UTF-8 CSV file with a header line.

    Args:
        path: Destination file; overwritten if it already exists.
        rows: Dictionaries to write, one per CSV record.
        fieldnames: Optional explicit column order. When omitted, the columns
            are the union of all row keys in first-seen order, so rows with
            differing keys no longer make ``csv.DictWriter`` raise; missing
            values are written as empty cells.

    Raises:
        ValueError: if ``fieldnames`` is given explicitly and a row contains a
            key that is not listed in it (``csv.DictWriter`` default).

    With no rows and no explicit ``fieldnames`` there is nothing to derive a
    header from, so an empty file is written. Passing ``fieldnames`` yields a
    header-only file instead.
    """
    if not rows and fieldnames is None:
        path.write_text("", encoding="utf-8")
        return

    if fieldnames is None:
        # dict.fromkeys de-duplicates while preserving first-seen order.
        fieldnames = list(dict.fromkeys(key for row in rows for key in row))

    # newline="" lets the csv module control line endings itself.
    with path.open("w", newline="", encoding="utf-8") as fh:
        writer = csv.DictWriter(fh, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)
# Script entry point: run main() and hand its return value to the interpreter
# as the process exit status.
if __name__ == "__main__":
    sys.exit(main())