-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathduplicate_handler.py
More file actions
208 lines (181 loc) · 10 KB
/
duplicate_handler.py
File metadata and controls
208 lines (181 loc) · 10 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import hashlib
import html
import re
from pathlib import Path
class DuplicateHandler:
"""
Handles post-processing deduplication.
Buffers processed entries, merges duplicates based on HTML content,
and writes the merger log.
"""
def __init__(self, output_base_name):
self.output_name = output_base_name
self.entries = [] # The final list of unique entries
self.mismatch_log = [] # list of (word, headword_span)
self.reinstated_log = []
self.seen_hashes = {} # hash -> index in self.entries
self.dropped_log = {} # hash -> list of dropped headwords [word1, word2]
self.quarantine = {} # (def_hash, word) -> (words, definition, headword_text)
def add(self, words: list[str], definition: str, debug_words=None, is_split_part: bool = False):
headword_text = ""
match = re.search(r'<span class="headword"><b>(.*?)</b></span>', definition)
if match:
headword_text = html.unescape(re.sub(r'<[^>]+>', '', match.group(1)))
# clean_hw retains bare opening parens (e.g. 'hois(s') intentionally
# paren_only relies on this to detect words that only exist as parenthetical roots.
clean_hw = re.sub(r'[ˈˌ]', '', headword_text)
hw_forms = self._expand_parens(clean_hw) # expand balanced parens first
hw_forms = [re.sub(r'\(', '', f) for f in hw_forms] # then strip bare opening parens
clean_hw_base = re.sub(r'\([^)]*\)', '', clean_hw) # e.g. '† ey(e)rer' → '† eyrer'
definition = definition.rstrip()
# Hash the processed definition, we need to unescape so `'` matches `'`
def_hash = hashlib.sha256(html.unescape(definition).encode('utf-8')).hexdigest()
if is_split_part and headword_text and not any(words[0] in form for form in hw_forms):
if debug_words:
print(f"\n\n--> Headword mismatch: '{words[0]}' not found in headword span")
print(f" Headword span: >> {headword_text} <<")
print(f" Quarantining entry pending duplicate check")
self.quarantine[(def_hash, words[0])] = (words, definition, headword_text)
return
if def_hash in self.seen_hashes:
# duplicate found
existing_idx = self.seen_hashes[def_hash]
existing_entry = self.entries[existing_idx]
new_word = words[0]
current_primary = existing_entry['words'][0]
candidates = [current_primary, new_word]
# Filter candidates that are actually in the text
valid_candidates = [c for c in candidates if
any(re.search(rf'(?<!\w){re.escape(c)}(?!\w)', form) for form in hw_forms) or
re.search(rf'(?<!\w){re.escape(c)}\(', clean_hw)]
winning_word = current_primary # Default to keeping current if uncertain
if debug_words:
print(f"\n\n--> Duplicated entry found: '{new_word}'")
if valid_candidates:
def sort_key(w):
"""
Calculates sorting criteria and metadata for a candidate word.
Pecking order:
1. Whoever appears earliest in the headword wins
2. Words that only exist by virtue of expanding a parenthetical form yield
3. If tied on position, a clean standalone word beats one that only exists as a parenthetical root
4. If still tied, the longer word wins
"""
# Look for the earliest occurrence across all forms
occurrences = [form.find(w) for form in hw_forms if w in form]
pos = min(occurrences) if occurrences else len(clean_hw_base)
# Boundary-aware regex to check for standalone existence
pattern = rf'(?<!\w){re.escape(w)}(?!\w)'
in_base = bool(re.search(pattern, clean_hw_base))
in_any_form = any(re.search(pattern, form) for form in hw_forms)
expanded_only = int(not in_base and in_any_form)
paren_only = int(not in_any_form and re.search(rf'(?<!\w){re.escape(w)}\(', clean_hw) is not None)
return (pos, expanded_only, paren_only, -len(w))
# Sort using the metadata tuple
valid_candidates.sort(key=sort_key)
if debug_words:
print(f" hw_forms: {hw_forms}")
print(f" candidates: {valid_candidates}")
for w in valid_candidates:
pos, exp_only, par_only, neg_len = sort_key(w)
print(f" * '{w}': pos={pos}, expanded_only={exp_only}, paren_only={par_only}, len={-neg_len}")
winning_word = valid_candidates[0]
if winning_word == new_word:
# Swap needed: New word is better suited as primary
dropped_headword = current_primary
# Reconstruct entry words with new winner at front
# Create set for unique, but preserve winning_word as 0
all_w = set(existing_entry['words'] + words)
all_w.discard(winning_word)
existing_entry['words'] = [winning_word] + list(all_w)
# Push this entry's sort index past the current tail so it sorts after
# any entries that were registered before the swap occurred.
existing_entry['idx'] = len(self.entries)
else: # No swap: Current is still best
dropped_headword = new_word
for w in words:
if w not in existing_entry['words']:
existing_entry['words'].append(w)
if dropped_headword != winning_word:
if def_hash not in self.dropped_log:
self.dropped_log[def_hash] = []
if dropped_headword not in self.dropped_log[def_hash]:
self.dropped_log[def_hash].append(dropped_headword)
if debug_words:
print(f" '{new_word}' is a duplicate of: '{current_primary}'")
print(f" From headword: >> {headword_text} <<")
print(f" Merging '{dropped_headword}' into '{winning_word}'")
else:
self.seen_hashes[def_hash] = len(self.entries)
self.entries.append({'words': list(words), 'definition': definition, 'idx': len(self.entries)})
def _expand_parens(self, text):
"""Expand 'ey(e)rer' into ['eyrer', 'eyerer']."""
results = [text]
for m in re.finditer(r'\(([^)]+)\)', text):
results = [
r.replace(m.group(0), '', 1) # without paren content
for r in results
] + [
r.replace(m.group(0), m.group(1), 1) # with paren content
for r in results
]
return results
def drain(self):
"""Yield entries in sorted order, clearing as we go."""
for entry in sorted(self.entries, key=lambda e: (e['words'][0], e['idx'])):
yield {'words': entry['words'], 'definition': entry['definition']}
entry['definition'] = None
self.entries.clear()
self.seen_hashes.clear()
self.dropped_log.clear()
self.mismatch_log.clear()
def quarantine_trial(self, debug_words=None):
"""Trial for quarantined entries — run after all adds, before drain."""
for (def_hash, _), (words, definition, headword_text) in self.quarantine.items():
if def_hash in self.seen_hashes:
if debug_words:
print(f"\n Quarantine trial: '{words[0]}' confirmed duplicate — sending to Gaol")
print(f" Headword span: >> {headword_text} <<")
self.mismatch_log.append((words[0], headword_text))
else:
if debug_words:
print(f"\n Quarantine trial: '{words[0]}' not a duplicate — reinstating")
print(f" Headword span: >> {headword_text} <<")
self.entries.append({'words': list(words), 'definition': definition, 'idx': len(self.entries)})
self.reinstated_log.append((words[0], headword_text))
self.quarantine.clear()
def write_logs(self):
if self.dropped_log:
self._write_log_file(
f"{Path(self.output_name).name}_dup_log.txt",
(f"{self.entries[self.seen_hashes[h]]['words'][0]}|{'|'.join(d)}\n"
for h, d in self.dropped_log.items()),
"Duplicate entries log",
)
if self.mismatch_log:
self._write_log_file(
f"{Path(self.output_name).name}_mismatch_log.txt",
(f"{w}|{s}\n" for w, s in self.mismatch_log),
"Headword mismatch log",
)
if self.reinstated_log:
self._write_log_file(
f"{Path(self.output_name).name}_reinstated_log.txt",
(f"{w}|{s}\n" for w, s in self.reinstated_log),
"Reinstated entries log",
)
def _write_log_file(self, filename, lines, label):
log_file = Path(self.output_name).parent / filename
try:
log_file.parent.mkdir(parents=True, exist_ok=True)
if log_file.exists():
log_file.unlink()
with open(log_file, 'w', encoding='utf-8') as lf:
lf.writelines(lines)
print(f"--> {label} written to '{log_file}'.")
except Exception as e:
print(f"--> Warning: Could not write {label.lower()}: {e}")
def get_stats(self):
"""Returns tuple: (unique_hashes_count, entries_with_dupes_count, mismatched_entries, total_dropped_count)"""
total_dropped = sum(len(drops) for drops in self.dropped_log.values()) + len(self.mismatch_log)
return len(self.seen_hashes), len(self.dropped_log), len(self.mismatch_log), total_dropped