-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcrawler.py
More file actions
148 lines (129 loc) · 4.45 KB
/
crawler.py
File metadata and controls
148 lines (129 loc) · 4.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import logging
import time
import os
import sys
import importlib
sys.path.append(os.path.abspath("../"))
from DataGatewayClient import DataGatewayClient
def load_hotels(target):
    """Read <target>/hotels.txt and return a list of hotel argument lists.

    File format: one hotel per line, comma-separated fields; fields are
    stripped of spaces/tabs.  Blank lines are ignored.  Lines starting
    with '#' are comments; the special comment '# encoding = <name>'
    switches the text encoding used to decode subsequent lines
    (default utf-8).
    """
    hotels = []
    encoding = "utf-8"
    # Binary mode + explicit decode keeps the in-file encoding directive
    # working identically on Python 2 and Python 3.
    with open(target + "/hotels.txt", "rb") as f:
        for raw in f:
            # Directive/comment lines are expected to be ASCII, so decoding
            # with the currently active encoding is safe even before the
            # directive has been seen.
            line = raw.decode(encoding).strip(" \t\r\n")
            if not line:
                continue
            if line.startswith('#'):
                body = line[1:].lstrip(" \t")
                if body.startswith("encoding") and '=' in body:
                    encoding = body.split('=')[1].strip(" \t").lower()
                # BUG FIX: non-directive comment lines previously fell
                # through and were appended as hotel data; skip every
                # comment line.
                continue
            hotels.append([a.strip(" \t") for a in line.split(',')])
    return hotels
def fetch_latest(target, history):
    """Run crawler <target>'s fetch() for every configured hotel and
    return only the records not seen in the previous round.

    history maps "<target>-<arg1>-<arg2>-..." to the list of record
    hashes from the previous round; it is updated in place.  Returns
    None when the crawler module cannot be imported, otherwise a list
    of new records (possibly empty).
    """
    try:
        mod = importlib.import_module('%s.fetch' % target)
        fetch = mod.fetch
    except ImportError:
        logging.error("** Crawler '%s' not found!" % target)
        return None
    except Exception as e:
        logging.error("** Something is wrong with crawler '%s': %s" % (target, str(e)))
        return None
    results = []
    for hotel_arg in load_hotels(target):
        try:
            records = fetch(hotel_arg)
            if not records:
                continue
        except Exception as e:
            # A failing hotel must not abort the whole round; log and move on.
            logging.error("** Crawler [%s] exception: %s" % (target, str(e)))
            continue
        hotel_key = target + '-' + '-'.join(hotel_arg)
        hl = history.get(hotel_key, [])
        newhl = []
        for r in records:
            h = r.hash
            if h not in hl:
                results.append(r)
            # Record every current hash (not only new ones) so a record
            # that persists across rounds is not re-reported later.
            newhl.append(h)
        history[hotel_key] = newhl
    return results
def push_record(client, record):
    """Push one record to the data gateway, retrying once on transport errors.

    Returns True when the gateway accepted the record (code 200), False
    when the gateway rejected it or both attempts failed with exceptions.
    """
    retries = 0
    while retries < 2:
        try:
            if not client.is_connected():
                client.reconnect()
            code, reason = client.push("hotel_review", record, "mysql")
            if code != 200:
                # The gateway answered but refused the record; retrying
                # would not help, so give up immediately.
                logging.warning("DataGatewayClient reports " + reason)
                break
            return True
        except Exception as e:
            # Transport failure: drop the connection and try once more.
            client.close()
            retries += 1
            logging.error("DataGatewayClient error, retry[" + str(retries) + "] " + str(e))
    return False
def run_crawler(target, period, gateway):
    """Crawl <target> every <period> seconds forever, pushing new records
    to the data gateway at <gateway> (a (host, port) tuple).

    Returns -1 if the initial gateway connection fails; otherwise this
    loops forever.
    """
    history = {}
    try:
        client = DataGatewayClient(gateway)
    except Exception as e:
        logging.error(str(e))
        logging.error("*** Can't connect to gateway server " + str(gateway))
        return -1
    while True:
        try:
            logging.info("crawler [%s] is started..." % target)
            records = fetch_latest(target, history)
            count = 0
            # fetch_latest returns None when the crawler module is
            # missing; treat that as "no new records" instead of raising
            # TypeError every round.
            for r in records or []:
                push_record(client, r)
                count += 1
            logging.info("crawler [%s] got %d new records" % (target, count))
        except Exception as e:
            # Keep the daemon alive across crawler bugs; log and retry
            # on the next round.
            logging.error("Unhandled exception from '%s': %s" % (target, str(e)))
        logging.info("crawler [%s] is slept" % target)
        time.sleep(period)
def getarg(argv, name, default=None):
    """Return the token immediately following *name* in argv.

    Scans left to right; the first occurrence of *name* wins.  Returns
    *default* when *name* is absent or is the last token.
    """
    for idx, token in enumerate(argv):
        if token == name and idx + 1 < len(argv):
            return argv[idx + 1]
    return default
if __name__ == "__main__":
    # Discover crawler packages: any subdirectory of the working
    # directory containing fetch.py, hotels.txt and __init__.py.
    crawlers = []
    for entry in os.listdir("./"):
        if not os.path.isdir(entry):
            continue
        if (os.path.exists(entry + "/fetch.py")
                and os.path.exists(entry + "/hotels.txt")
                and os.path.exists(entry + "/__init__.py")):
            crawlers.append(entry)
    if len(sys.argv) < 2:
        print("Usage: crawler.py <CRAWLER-ID> [OPTIONS]")
        print("Currently available crawlers are:")
        print(" " + ' '.join(crawlers))
        print("")
        print("OPTIONS may be:")
        print(" -t <PERIOD> period of crawling, in seconds. Default is 3600.")
        # BUG FIX: help text said <GATEWAR>.
        print(" -g <GATEWAY> address of data gateway server. Default is localhost:8086")
        print(" -l <LEVEL> log level, within range [0, 50]. Default is 30(warning).")
        exit(1)
    # set log level
    lvl = int(getarg(sys.argv, '-l', "30"))
    logging.getLogger().setLevel(lvl)
    target = sys.argv[1]
    if target not in crawlers:
        print("Crawler '%s' not found!" % target)
        # BUG FIX: previously execution fell through and ran the missing
        # crawler anyway; abort with a non-zero status instead.
        exit(1)
    period = int(getarg(sys.argv, "-t", "3600"))
    addr = getarg(sys.argv, "-g", "localhost:8086").split(':')
    gateway = (addr[0], int(addr[1]))
    run_crawler(target, period, gateway)