-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathmsgdiff.py
More file actions
executable file
·153 lines (126 loc) · 5.01 KB
/
msgdiff.py
File metadata and controls
executable file
·153 lines (126 loc) · 5.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#!/usr/bin/env python3
import argparse
from functools import partial
import logging
import multiprocessing.pool
import os
import pickle
from typing import Any, Dict, Iterator, Mapping, Optional, Sequence, Tuple # noqa
from respdiff import cli
from respdiff.dataformat import (
DiffReport,
Disagreements,
DisagreementsCounter,
FieldLabel,
QID,
)
from respdiff.database import DNSRepliesFactory, DNSReply, key2qid, LMDB, MetaDatabase
from respdiff.match import compare
from respdiff.typing import ResolverID
wrk_lmdb = None
def read_answers_lmdb(
    dnsreplies_factory: DNSRepliesFactory, qid: QID
) -> Mapping[ResolverID, DNSReply]:
    """Load the stored answer blob for *qid* from this worker's LMDB and parse it."""
    assert wrk_lmdb is not None, "LMDB wasn't initialized!"
    answers_db = wrk_lmdb.get_db(LMDB.ANSWERS)
    with wrk_lmdb.env.begin(answers_db) as txn:
        blob = txn.get(qid)
        # every qid streamed from the ANSWERS db must have a record
        assert blob
        return dnsreplies_factory.parse(blob)
def compare_lmdb_wrapper(
    criteria: Sequence[FieldLabel],
    target: ResolverID,
    dnsreplies_factory: DNSRepliesFactory,
    qid: QID,
) -> None:
    """Compare the answers for one query and persist any disagreement to LMDB.

    Queries on which every resolver agrees produce no record at all, which
    keeps the DIFFS database sparse.
    """
    assert wrk_lmdb is not None, "LMDB wasn't initialized!"
    replies = read_answers_lmdb(dnsreplies_factory, qid)
    others_agree, target_diffs = compare(replies, criteria, target)
    if not (others_agree and not target_diffs):
        record = pickle.dumps((others_agree, target_diffs))
        diffs_db = wrk_lmdb.get_db(LMDB.DIFFS)
        with wrk_lmdb.env.begin(diffs_db, write=True) as txn:
            txn.put(qid, record)
def export_json(lmdb: LMDB, filename: str, report: DiffReport):
    """Gather the diffs stored in LMDB into *report* and write it out as JSON.

    A pre-existing report file is first renamed to ``<filename>.bak``.
    """
    report.other_disagreements = DisagreementsCounter()
    report.target_disagreements = Disagreements()
    # walk every stored diff record and sort it into the two report buckets
    diffs_db = lmdb.get_db(LMDB.DIFFS)
    with lmdb.env.begin(diffs_db) as txn, txn.cursor() as cursor:
        for key, blob in cursor:
            qid = key2qid(key)
            others_agree, diff = pickle.loads(blob)
            if others_agree:
                for field, mismatch in diff.items():
                    report.target_disagreements.add_mismatch(field, mismatch, qid)
            else:
                report.other_disagreements.queries.add(qid)
    # NOTE: msgdiff is the first tool in the toolchain to generate report.json
    # thus it doesn't make sense to re-use an existing report.json file
    if os.path.exists(filename):
        backup_filename = filename + ".bak"
        os.rename(filename, backup_filename)
        logging.warning(
            "JSON report already exists, overwriting file. Original "
            "file backed up as %s",
            backup_filename,
        )
    report.export_json(filename)
def prepare_report(lmdb_, servers: Sequence[ResolverID]) -> DiffReport:
    """Build a fresh DiffReport seeded with timing and entry counts from LMDB."""
    queries_db = lmdb_.open_db(LMDB.QUERIES)
    answers_db = lmdb_.open_db(LMDB.ANSWERS)
    with lmdb_.env.begin() as txn:
        n_queries = txn.stat(queries_db)["entries"]
        n_answers = txn.stat(answers_db)["entries"]
    meta = MetaDatabase(lmdb_, servers)
    # argument evaluation is left-to-right, so start is read before end
    return DiffReport(
        meta.read_start_time(),
        meta.read_end_time(),
        n_queries,
        n_answers,
    )
def wrk_lmdb_init(envdir):
    """Pool-worker initializer: each worker process opens its own LMDB connection."""
    global wrk_lmdb
    connection = LMDB(envdir, fast=True)
    connection.open_db(LMDB.ANSWERS)
    connection.open_db(LMDB.DIFFS)
    # publish only after both dbs are open, so readers never see a half-set-up handle
    wrk_lmdb = connection
def main() -> None:
    """CLI entry point: compute per-query diffs from answers in LMDB.

    Reads the answers database under the given env dir, compares each
    query's replies against the configured target resolver in a process
    pool, stores the resulting diffs back into LMDB, and exports a JSON
    report (the datafile argument).
    """
    cli.setup_logging()
    parser = argparse.ArgumentParser(
        description="compute diff from answers stored in LMDB and write diffs to LMDB"
    )
    cli.add_arg_envdir(parser)
    cli.add_arg_config(parser)
    cli.add_arg_datafile(parser)
    args = parser.parse_args()
    # the report file is created by this tool, so it may not exist yet
    datafile = cli.get_datafile(args, check_exists=False)
    criteria = args.cfg["diff"]["criteria"]
    target = args.cfg["diff"]["target"]
    servers = args.cfg["servers"]["names"]
    # forkserver avoids inheriting open LMDB handles into workers
    multiprocessing.set_start_method("forkserver")
    with LMDB(args.envdir) as lmdb:
        # NOTE: To avoid an lmdb.BadRslotError, probably caused by weird
        # interaction when using multiple transaction / processes, open a separate
        # environment. Also, any dbs have to be opened before using MetaDatabase().
        report = prepare_report(lmdb, servers)
        cli.check_metadb_servers_version(lmdb, servers)
        # sanity check we have some answers
        lmdb.open_db(LMDB.ANSWERS)
        # prepare state shared by all workers
        lmdb.open_db(LMDB.DIFFS, create=True, drop=True)
    # re-open read-only for streaming; workers write diffs via their own handles
    with LMDB(args.envdir, readonly=True) as lmdb:
        lmdb.open_db(LMDB.ANSWERS)
        lmdb.open_db(LMDB.DIFFS)
        qid_stream = lmdb.key_stream(LMDB.ANSWERS)
        dnsreplies_factory = DNSRepliesFactory(servers)
        # bind the per-run constants; workers receive only the qid
        compare_func = partial(
            compare_lmdb_wrapper, criteria, target, dnsreplies_factory
        )
        with multiprocessing.pool.Pool(
            initializer=wrk_lmdb_init, initargs=(args.envdir,)
        ) as p:
            # drain the iterator; all results are written to LMDB as a side effect
            for _ in p.imap_unordered(compare_func, qid_stream, chunksize=10):
                pass
        export_json(lmdb, datafile, report)
# run only when executed as a script, not when imported
if __name__ == "__main__":
    main()