-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscript.py
More file actions
212 lines (188 loc) · 8.89 KB
/
Copy pathscript.py
File metadata and controls
212 lines (188 loc) · 8.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
import requests
import json
import time
import random
from config import config
class ImmowebScraper:
    """Poll an Immoweb search URL and announce new listings on Telegram.

    The search URL is expected to return JSON with a ``results`` list; every
    result carries an ``id`` plus nested ``property``, ``transaction``,
    ``price`` and ``media`` objects that the ``get_*`` helpers turn into
    Telegram MarkdownV2 fragments.

    Attributes:
        url: Search endpoint that is polled.
        token: Telegram bot token, or None to read ``config['token']`` when
            the first message is sent.
        chat_id: Telegram chat id, or None to read ``config['chat_id']`` when
            the first message is sent.
        last_result: Unused legacy attribute, kept for backward compatibility.
        last_results: Ids of the listings that were already seen.
    """

    # Seconds to wait for a single HTTP request before giving up.
    REQUEST_TIMEOUT = 30
    # Pause after each notification; Telegram answers "429 Too Many Requests"
    # when many listings are pushed at the same time.
    NOTIFY_DELAY_SECONDS = 60
    # Inclusive bounds of the random pause between two refreshes.
    REFRESH_DELAY_RANGE = (40, 60)
    # Every character that Telegram MarkdownV2 treats as markup, mapped to its
    # backslash-escaped form.
    _MARKDOWN_V2_ESCAPES = str.maketrans(
        {char: '\\' + char for char in '\\_*[]()~`>#+-=|{}.!'}
    )

    def __init__(self, url, token=None, chat_id=None):
        """Create a scraper for *url*.

        Args:
            url: Search endpoint returning JSON with a ``results`` list.
            token: Optional Telegram bot token (defaults to the config value).
            chat_id: Optional Telegram chat id (defaults to the config value).
        """
        self.url = url
        self.token = token
        self.chat_id = chat_id
        self.last_result = ''
        self.last_results = []

    # ------------------------------------- HELPERS ---------------------------

    @classmethod
    def _escape_markdown(cls, text):
        """Return *text* with all MarkdownV2 reserved characters escaped."""
        return str(text).translate(cls._MARKDOWN_V2_ESCAPES)

    @staticmethod
    def _is_missing(value):
        """Return True when *value* stands for "not provided" in the API data."""
        return value is None or str(value) in ('', 'None', 'null')

    def _post(self, method, data):
        """Call Telegram Bot API *method* with *data*; print and return the reply."""
        # Resolved lazily so the scraping helpers work without Telegram config.
        token = self.token if self.token is not None else config['token']
        chat_id = self.chat_id if self.chat_id is not None else config['chat_id']
        payload = {'chat_id': chat_id}
        payload.update(data)
        response = requests.post(
            url='https://api.telegram.org/bot{0}/{1}'.format(token, method),
            data=payload,
            timeout=self.REQUEST_TIMEOUT,
        ).json()
        print(response)
        return response

    # ------------------------------------- SCRAPING --------------------------

    def get_source_from_url(self):
        """Download the search URL and return the response body as text."""
        return str(requests.get(self.url, timeout=self.REQUEST_TIMEOUT).text)

    def get_results_from_source(self, source):
        """Parse *source* as JSON and return its ``results`` entry."""
        return json.loads(source)['results']

    def update(self):
        """Fetch the current results, notify about unseen ones, then sleep.

        The API does not surface new listings one at a time, so every result
        is checked against the ids that were already seen. A failed fetch or
        an unparsable response is printed and skipped so that a transient
        error does not end the polling loop.
        """
        try:
            results = self.get_results_from_source(self.get_source_from_url())
        except (requests.RequestException, ValueError, KeyError) as error:
            print(error)
            results = []

        for result in results:
            if result['id'] in self.last_results:
                continue
            try:
                # Remember the id before sending so a listing whose
                # notification keeps failing is not retried forever.
                self.last_results.append(result['id'])
                self.notify(result)
                time.sleep(self.NOTIFY_DELAY_SECONDS)
            except Exception as error:
                print(error)

        # Random pause between refreshes.
        time.sleep(random.randint(*self.REFRESH_DELAY_RANGE))

    def notify(self, result):
        """Send *result* to the Telegram chat.

        * No picture: the description is sent as a text message.
        * Two or more pictures: they are sent as a media group, followed by
          the description as a text message.
        * Otherwise: the first picture is sent with the description as caption.
        """
        message = (
            self.get_title(result)
            + self.get_address(result)
            + self.get_bedroomCount(result)
            + self.get_netHabitableSurface(result)
            + self.get_price(result)
            + ' \\([Immoweb](https://www.immoweb.be/en/classified/'
            + str(result['id'])
            + ')\\)'
        )
        text_payload = {
            'text': message,
            'parse_mode': 'MarkdownV2',
            'disable_web_page_preview': 'true',
        }

        cover = self.get_picture(result)
        if cover == 'error':
            self._post('sendMessage', text_payload)
            return

        pictures = self.get_pictures(result)
        # A Telegram media group needs at least two items; get_pictures may
        # also return the string 'error'.
        if isinstance(pictures, list) and len(pictures) >= 2:
            self._post('sendMediaGroup', {'media': json.dumps(pictures)})
            self._post('sendMessage', text_payload)
        else:
            self._post(
                'sendPhoto',
                {'photo': cover, 'caption': message, 'parse_mode': 'MarkdownV2'},
            )

    # ------------------------------------- GETTERS ---------------------------

    def get_title(self, result):
        """Return "<type> for rent|sale in <locality>", or 'error' if no type."""
        try:
            property_type = self._escape_markdown(result['property']['type'])
        except (KeyError, TypeError):
            return 'error'
        return (
            property_type
            + self.get_transaction_type(result)
            + 'in '
            + self.get_locality(result)
        )

    def get_transaction_type(self, result):
        """Return ' for rent ', ' for sale ', ' ' (other) or 'error'."""
        try:
            transaction_type = str(result['transaction']['type'])
        except (KeyError, TypeError):
            return 'error'
        if transaction_type == 'FOR_RENT':
            return ' for rent '
        if transaction_type == 'FOR_SALE':
            return ' for sale '
        return ' '

    def get_locality(self, result):
        """Return the MarkdownV2-escaped locality, or 'error' if unavailable.

        Localities can contain reserved characters such as '-', which Telegram
        rejects unless they are escaped.
        """
        try:
            locality = result['property']['location']['locality']
        except (KeyError, TypeError):
            return 'error'
        return self._escape_markdown(locality)

    def get_address(self, result):
        """Return the address as a Google Maps link fragment.

        Returns the street (plus house number when known) linked to Google
        Maps, a bare escaped full stop when the street is unknown, or 'error'
        when the location data is absent.
        """
        try:
            location = result['property']['location']
            street = location['street']
            number = location['number']
        except (KeyError, TypeError):
            return 'error'

        if self._is_missing(street):
            return '\\.'

        label = self._escape_markdown(street)
        if not self._is_missing(number):
            label = label + ' ' + self._escape_markdown(number)
        return (
            ' \\(['
            + label
            + '](https://www.google.be/maps/place/'
            + label
            + ' '
            + self.get_locality(result)
            + ')\\)\\.'
        )

    def get_bedroomCount(self, result):
        """Return ' N bedroom(s).' escaped, '' if unknown, 'error' if absent."""
        try:
            bedroom_count = result['property']['bedroomCount']
        except (KeyError, TypeError):
            return 'error'
        if self._is_missing(bedroom_count):
            return ''
        if str(bedroom_count) == '1':
            return ' 1 bedroom\\.'
        return ' ' + self._escape_markdown(bedroom_count) + ' bedrooms\\.'

    def get_netHabitableSurface(self, result):
        """Return ' N m².' escaped, '' if unknown, 'error' if absent."""
        try:
            surface = result['property']['netHabitableSurface']
        except (KeyError, TypeError):
            return 'error'
        if self._is_missing(surface):
            return ''
        return ' ' + self._escape_markdown(surface) + ' m²\\.'

    def get_price(self, result):
        """Return the escaped price sentence, or '' when no price is known."""
        try:
            price = result['price']['mainValue']
        except (KeyError, TypeError):
            return ''
        if self._is_missing(price):
            return ''
        price = self._escape_markdown(price)
        if self.get_transaction_type(result) == ' for rent ':
            return ' ' + price + ' euro per month\\.'
        return ' ' + price + ' euro\\.'

    def get_picture(self, result):
        """Return the first picture's large URL, or 'error' if there is none."""
        try:
            picture_url = str(result['media']['pictures'][0]['largeUrl'])
        except (KeyError, IndexError, TypeError):
            return 'error'
        return picture_url if picture_url != 'None' else 'error'

    def get_pictures(self, result):
        """Return Telegram ``InputMediaPhoto`` dicts for the listing pictures.

        Candidate URLs are derived from the first picture by replacing its
        ``_1.`` suffix with ``_1.jpg`` .. ``_9.jpg``; probing stops at the
        first URL that does not answer with HTTP 200. Returns 'error' when the
        listing has no first picture.
        """
        try:
            base_url = str(result['media']['pictures'][0]['largeUrl'])
        except (KeyError, IndexError, TypeError):
            return 'error'
        base_url = base_url.split('_1.')[0]

        pictures = []
        for index in range(1, 10):
            candidate = base_url + '_' + str(index) + '.jpg'
            try:
                response = requests.get(candidate, timeout=self.REQUEST_TIMEOUT)
            except requests.RequestException:
                # A failed probe skips this index only; later ones are tried.
                continue
            if response.status_code != 200:
                break
            pictures.append(dict(type='photo', media=candidate))
        return pictures
if __name__ == '__main__':
    # Telegram credentials and the search URL all come from the config module.
    token, chat_id, url = config['token'], config['chat_id'], config['url']

    scraper = ImmowebScraper(url)

    # Mark every listing that is already online as seen, so only listings
    # published after start-up trigger a notification.
    scraper.last_results.extend(
        listing['id']
        for listing in scraper.get_results_from_source(scraper.get_source_from_url())
    )

    # Poll forever; update() sleeps between refreshes.
    while True:
        scraper.update()