From 9128e7e27f37b74a09c86f04372d5fe3afb986c6 Mon Sep 17 00:00:00 2001
From: Bernardo Botella
Date: Thu, 18 Dec 2014 11:26:44 +0100
Subject: [PATCH 1/7] Adds Mongodb Backend with FIFO, LIFO, DFS and BFS
 algorithms.

---
 .gitignore                                    |   4 +
 .../contrib/backends/mongo/__init__.py        |   0
 crawlfrontier/contrib/backends/mongo/mongo.py | 241 ++++++++++++++++++
 requirements.txt                              |   5 +
 4 files changed, 250 insertions(+)
 create mode 100644 crawlfrontier/contrib/backends/mongo/__init__.py
 create mode 100644 crawlfrontier/contrib/backends/mongo/mongo.py

diff --git a/.gitignore b/.gitignore
index db4561eaa..48593f41d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -52,3 +52,7 @@
 docs/_build/
 # PyBuilder
 target/
+
+
+# IDE files
+.idea
\ No newline at end of file

diff --git a/crawlfrontier/contrib/backends/mongo/__init__.py b/crawlfrontier/contrib/backends/mongo/__init__.py
new file mode 100644
index 000000000..e69de29bb

diff --git a/crawlfrontier/contrib/backends/mongo/mongo.py b/crawlfrontier/contrib/backends/mongo/mongo.py
new file mode 100644
index 000000000..3a26e3b27
--- /dev/null
+++ b/crawlfrontier/contrib/backends/mongo/mongo.py
@@ -0,0 +1,241 @@
+import datetime
+import random
+import copy
+from collections import OrderedDict
+import string
+from pymongo import MongoClient
+from crawlfrontier import Backend, Page
+import json
+from crawlfrontier.contrib.middlewares.domain import Domain
+from crawlfrontier.exceptions import NotConfigured
+
+
+class MongodbBackend(Backend):
+    name = 'Mongodb Backend'
+
+    def __init__(self, manager):
+        settings = manager.settings
+        mongo_ip = settings.get('BACKEND_MONGO_IP', None)
+        mongo_port = settings.get('BACKEND_MONGO_PORT', None)
+        if mongo_ip is None or mongo_port is None:
+            raise NotConfigured
+        self.client = MongoClient(mongo_ip, mongo_port)
+        self.database_name = ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(10))
+        mongo_db = settings.get('BACKEND_MONGO_DB_NAME', self.database_name)
+        mongo_collection = settings.get('BACKEND_MONGO_COLLECTION_NAME', 'links')
+        self.db = self.client[mongo_db]
+        self.collection = self.db[mongo_collection]
+        self.manager = manager
+
+    @classmethod
+    def from_manager(cls, manager):
+        return cls(manager)
+
+    def add_seeds(self, links):
+        # Log
+        self.manager.logger.backend.debug('ADD_SEEDS n_links=%s' % len(links))
+
+        pages = []
+        for link in links:
+            # Get timestamp
+            now = datetime.datetime.utcnow()
+
+            # Get or create page from link
+            page, created = self._get_or_create_page_from_link(link, now)
+
+            # Update add fields
+            page.n_adds += 1
+            page.last_update = now
+            pages.append(page)
+            # self.collection.insert(json.loads(repr(page)))
+
+        # Return updated pages
+        return pages
+
+    def page_crawled(self, page, links):
+        # Log
+        self.manager.logger.backend.debug('PAGE_CRAWLED page=%s status=%s links=%s' %
+                                          (page, page.status, len(links)))
+
+        # process page crawled
+        backend_page = self._page_crawled(page)
+
+        # Update crawled fields
+        backend_page.n_crawls += 1
+        backend_page.state = self.manager.page_model.State.CRAWLED
+        self.collection.update({'fingerprint':backend_page.fingerprint}, {"$set": json.loads(repr(backend_page))}, upsert=False)
+
+        # Create links
+        for link in links:
+            self.manager.logger.backend.debug('ADD_LINK link=%s' % link)
+            link_page, link_created = self._get_or_create_page_from_link(link, datetime.datetime.utcnow())
+            if link_created:
+                link_page.depth = page.depth+1
+                self.collection.update({'fingerprint':link_page.fingerprint}, {"$set": json.loads(repr(link_page))},
+                                       upsert=False)
+
+        # Return updated page
+        return backend_page
+
+    def page_crawled_error(self, page, error):
+        # Log
+        self.manager.logger.backend.debug('PAGE_CRAWLED_ERROR page=%s error=%s' % (page, error))
+
+        # process page crawled
+        backend_page = self._page_crawled(page)
+
+        # Update error fields
+        backend_page.n_errors += 1
+        backend_page.state = self.manager.page_model.State.ERROR
+        self.collection.update({'fingerprint':backend_page.fingerprint}, {"$set": json.loads(repr(backend_page))}, upsert=False)
+
+        # Return updated page
+        return backend_page
+
+    def get_next_pages(self, max_next_pages):
+        # Log
+        self.manager.logger.backend.debug('GET_NEXT_PAGES max_next_pages=%s' % max_next_pages)
+
+        now = datetime.datetime.utcnow()
+        # pages = [page for page in self.pages.values() if page.state == self.manager.page_model.State.NOT_CRAWLED]
+
+        mongo_pages = self._get_sorted_pages(max_next_pages)
+
+        pages = []
+        for p in mongo_pages:
+            page = self._page_from_mongo_dict(p)
+            pages.append(page)
+        finished = False
+        # pages = self._sort_pages(pages)
+        if max_next_pages:
+            pages = pages[0:max_next_pages]
+        for page in pages:
+            page.state = self.manager.page_model.State.QUEUED
+            page.n_queued += 1
+            page.last_update = now
+            self.collection.update({'fingerprint':page.fingerprint}, {"$set": json.loads(repr(page))}, upsert=False)
+        remaining = self.collection.find_one({'state': {'$ne': self.manager.page_model.State.CRAWLED}})
+        if remaining is None:
+            self.manager._finished = True
+        return pages
+
+
+    def _page_crawled(self, page):
+        # Get timestamp
+        now = datetime.datetime.utcnow()
+
+        # Get or create page from incoming page
+        backend_page, created = self._get_or_create_page_from_page(page, now)
+
+        # Update creation fields
+        if created:
+            backend_page.created_at = now
+
+        # Update fields
+        backend_page.last_update = now
+        backend_page.status = page.status
+
+        return backend_page
+
+    def _page_from_mongo_dict(self, mongo_dict):
+        page = self.manager.page_model(mongo_dict['url'])
+        page.status = mongo_dict['status']
+        domain = Domain(mongo_dict['domain']['netloc'], mongo_dict['domain']['name'], mongo_dict['domain']['scheme'], mongo_dict['domain']['sld'], mongo_dict['domain']['tld'], mongo_dict['domain']['subdomain'])
+        page.domain = domain
+        page.n_crawls = mongo_dict['n_crawls']
+        page.created_at = mongo_dict['created_at']
+        page.state = mongo_dict['state']
+        page.last_update = mongo_dict['last_update']
+        page.depth = mongo_dict['depth']
+        page.meta = mongo_dict['meta']
+        page.n_errors = mongo_dict['n_errors']
+        page.fingerprint = mongo_dict['fingerprint']
+        page.n_queued = mongo_dict['n_queued']
+        page.n_adds = mongo_dict['n_adds']
+        return page
+
+
+    def _get_or_create_page_from_link(self, link, now):
+        fingerprint = link.fingerprint
+        existing_page = self.collection.find_one({'fingerprint': fingerprint})
+        if existing_page is None:
+            new_page = self.manager.page_model.from_link(link)
+            # self.pages[fingerprint] = new_page
+            new_page.created_at = now
+            self.collection.insert(json.loads(repr(new_page)))
+            self.manager.logger.backend.debug('Creating page %s from link %s' % (new_page, link))
+            return new_page, True
+        else:
+            page = self._page_from_mongo_dict(existing_page)
+            # page = self.pages[fingerprint]
+            self.manager.logger.backend.debug('Page %s exists' % page)
+            return page, False
+
+    def _get_or_create_page_from_page(self, page, now):
+        fingerprint = page.fingerprint
+        existing_page = self.collection.find_one({'fingerprint': fingerprint})
+        if existing_page is None:
+            new_page = copy.deepcopy(page)
+            # self.pages[fingerprint] = new_page
+            new_page.created_at = now
+            self.collection.insert(json.loads(repr(new_page)))
+            self.manager.logger.backend.debug('Creating page %s from page %s' % (new_page, page))
+            return new_page, True
+        else:
+            page = self._page_from_mongo_dict(existing_page)
+            self.manager.logger.backend.debug('Page %s exists' % page)
+            # page = self.pages[fingerprint]
+            return page, False
+
+    def _get_sorted_pages(self, max_pages):
+        raise NotImplementedError
+
+
+    def frontier_stop(self):
+        if self.manager.settings.get('BACKEND_MONGO_PERSIST_INFO', True) is False:
+            self.client.drop_database(self.database_name)
+        self.client.close()
+
+
+class MongodbFIFOBackend(MongodbBackend):
+    name = 'FIFO Mongodb Backend'
+
+    def _get_sorted_pages(self, max_pages):
+        mongoPages = self.collection.find({'state': self.manager.page_model.State.NOT_CRAWLED}).limit(max_pages)
+        return mongoPages
+
+
+
+class MongodbLIFOBackend(MongodbBackend):
+    name = 'LIFO Mongodb Backend'
+
+    def _get_sorted_pages(self, max_pages):
+        mongoPages = self.collection.find({'state': self.manager.page_model.State.NOT_CRAWLED}).sort('_id', -1).limit(max_pages)
+        return mongoPages
+
+
+
+class MongodbDFSBackend(MongodbBackend):
+    name = 'DFS Mongodb Backend'
+
+    def _get_sorted_pages(self, max_pages):
+        mongoPages = self.collection.find({'state': self.manager.page_model.State.NOT_CRAWLED}).sort('depth', -1).limit(max_pages)
+        return mongoPages
+
+
+
+
+class MongodbBFSBackend(MongodbBackend):
+    name = 'BFS Mongodb Backend'
+
+    def _get_sorted_pages(self, max_pages):
+        mongoPages = self.collection.find({'state': self.manager.page_model.State.NOT_CRAWLED}).sort('depth', 1).limit(max_pages)
+        return mongoPages
+
+
+
+
+BASE = MongodbBackend
+FIFO = MongodbFIFOBackend
+LIFO = MongodbLIFOBackend
+DFS = MongodbDFSBackend
+BFS = MongodbBFSBackend

diff --git a/requirements.txt b/requirements.txt
index be7958899..0c322639a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -16,3 +16,8 @@
 pyparsing==1.5.7
 --allow-unverified pydot
 pydot==1.0.28
+
+#-------------------------------
+# mongodb backend
+#-------------------------------
+pymongo==2.7.2
\ No newline at end of file

From d7de1b5f41ddc6e576689c42e917247d5342e314 Mon Sep 17 00:00:00 2001
From: Bernardo Botella
Date: Thu, 18 Dec 2014 17:45:21 +0100
Subject: [PATCH 2/7] Fix request_error method.

---
 crawlfrontier/contrib/backends/mongo.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/crawlfrontier/contrib/backends/mongo.py b/crawlfrontier/contrib/backends/mongo.py
index 4931aba3f..f575003c1 100644
--- a/crawlfrontier/contrib/backends/mongo.py
+++ b/crawlfrontier/contrib/backends/mongo.py
@@ -77,8 +77,8 @@ def request_error(self, page, error):
 
         # Update error fields
         backend_page.state = self.State.ERROR
-        self.collection.update({'fingerprint': backend_page.fingerprint}, {"$set": json.loads(repr(backend_page))},
-                               upsert=False)
+        self.collection.update({'_meta.fingerprintfingerprint': backend_page._meta['fingerprint']},
+                               {"$set": self._request_to_mongo(link_page)}, upsert=False)
 
         # Return updated page
         return backend_page
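[Note on PATCH 1/7] A minimal sketch of the settings needed to run this
backend. The BACKEND_MONGO_* names are exactly what MongodbBackend.__init__
reads; the BACKEND key and the dotted path follow stock crawl-frontier
conventions and are an assumption here, not part of the patch:

    # settings.py -- sketch; pick FIFO/LIFO/DFS/BFS/BASE as exported at the
    # bottom of mongo.py (FIFO = insertion order, LIFO = newest _id first,
    # DFS = deepest 'depth' first, BFS = shallowest 'depth' first).
    BACKEND = 'crawlfrontier.contrib.backends.mongo.mongo.FIFO'  # assumed key

    BACKEND_MONGO_IP = 'localhost'           # required, else NotConfigured
    BACKEND_MONGO_PORT = 27017               # required, passed to MongoClient
    BACKEND_MONGO_DB_NAME = 'frontier'       # optional, defaults to a random name
    BACKEND_MONGO_COLLECTION_NAME = 'links'  # optional, defaults to 'links'
    BACKEND_MONGO_PERSIST_INFO = True        # optional, False drops the db on stop

One caveat worth knowing: frontier_stop() drops self.database_name (the
random default) rather than the configured BACKEND_MONGO_DB_NAME, so turning
persistence off only has an effect when no explicit database name is set.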
From 0fa45d0ac68f1c519b454fc5227ef03833694493 Mon Sep 17 00:00:00 2001
From: Bernardo Botella
Date: Thu, 8 Jan 2015 17:28:21 +0100
Subject: [PATCH 3/7] Deploy commit.

---
 crawlfrontier/contrib/backends/mongo.py       |  4 +-
 .../contrib/middlewares/upload_graph.py       | 46 +++++++++++++++++++
 .../contrib/scrapy/middlewares/recording.py   |  3 ++
 3 files changed, 51 insertions(+), 2 deletions(-)
 create mode 100644 crawlfrontier/contrib/middlewares/upload_graph.py

diff --git a/crawlfrontier/contrib/backends/mongo.py b/crawlfrontier/contrib/backends/mongo.py
index f575003c1..6472292be 100644
--- a/crawlfrontier/contrib/backends/mongo.py
+++ b/crawlfrontier/contrib/backends/mongo.py
@@ -77,8 +77,8 @@ def request_error(self, page, error):
 
         # Update error fields
         backend_page.state = self.State.ERROR
-        self.collection.update({'_meta.fingerprintfingerprint': backend_page._meta['fingerprint']},
-                               {"$set": self._request_to_mongo(link_page)}, upsert=False)
+        self.collection.update({'_meta.fingerprint': backend_page._meta['fingerprint']},
+                               {"$set": self._request_to_mongo(backend_page)}, upsert=False)
 
         # Return updated page
         return backend_page

diff --git a/crawlfrontier/contrib/middlewares/upload_graph.py b/crawlfrontier/contrib/middlewares/upload_graph.py
new file mode 100644
index 000000000..4adb3c8ee
--- /dev/null
+++ b/crawlfrontier/contrib/middlewares/upload_graph.py
@@ -0,0 +1,46 @@
+import re
+
+from crawlfrontier.core.components import Middleware
+from crawlfrontier import graphs
+
+class UploadGraphMiddleware(Middleware):
+
+    component_name = 'Upload Graph Middleware'
+
+    def __init__(self, manager):
+        self.manager = manager
+
+    @classmethod
+    def from_manager(cls, manager):
+        return cls(manager)
+
+
+    def frontier_start(self):
+        pass
+
+
+    def frontier_stop(self):
+        print '**************************************************************'
+        print '**************************************************************'
+        print '**************************************************************'
+        print '**************************************************************'
+        print '**************************************************************'
+        print '**************************************************************'
+        g = graphs.Manager(engine='sqlite:///my_record.db')
+        g.render(filename='graph.png', label='A simple Graph', fontsize=15, node_fontsize=8, node_height=2, node_width=2, node_fixedsize=False)
+        pass
+
+
+
+    def add_seeds(self, seeds):
+        print 1
+        return seeds
+
+    def page_crawled(self, response, links):
+        print 2
+        return response
+
+    def request_error(self, request, error):
+        print 3
+        return request
+

diff --git a/crawlfrontier/contrib/scrapy/middlewares/recording.py b/crawlfrontier/contrib/scrapy/middlewares/recording.py
index 33c110696..19fa222a5 100644
--- a/crawlfrontier/contrib/scrapy/middlewares/recording.py
+++ b/crawlfrontier/contrib/scrapy/middlewares/recording.py
@@ -63,6 +63,9 @@ def spider_closed(self, spider, reason):
         self.graph.session.query(graphs.Page).filter_by(status=None).delete()
         self.graph.save()
 
+        g = graphs.Manager(engine='sqlite:///my_record.db')
+        g.render(filename='graphs/'+spider.domain.replace(':', '_').replace('.', '_').replace('/', '_')+'.png', label=spider.domain, fontsize=15, node_fontsize=8, node_height=2, node_width=2, node_fixedsize=False)
+
     def _get_url(self, response):
         return response.meta['redirect_urls'][0] if 'redirect_urls' in response.meta else response.request.url
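[Note on PATCHES 1/7-3/7] Every backend call does a find_one or update keyed
on the page fingerprint, and the FIFO/LIFO/DFS/BFS subclasses filter on
'state' and sort on '_id' or 'depth'; without indexes each of those is a
collection scan. A one-off setup sketch (not part of the series; the
database and collection names assume the defaults above, and ensure_index
is the pymongo 2.7 API pinned in requirements.txt):

    from pymongo import MongoClient, ASCENDING, DESCENDING

    coll = MongoClient('localhost', 27017)['frontier']['links']
    coll.ensure_index('fingerprint')        # get-or-create lookups (PATCH 1/7)
    coll.ensure_index('_meta.fingerprint')  # request_error update (PATCHES 2/7-3/7)
    coll.ensure_index([('state', ASCENDING), ('depth', ASCENDING)])   # BFS scans
    coll.ensure_index([('state', ASCENDING), ('depth', DESCENDING)])  # DFS scans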
From 7e339287c4e5b5040a4e36b2ac30efd966676bc6 Mon Sep 17 00:00:00 2001
From: Bernardo Botella
Date: Thu, 15 Jan 2015 01:22:09 +0100
Subject: [PATCH 4/7] Add stopwords filters

---
 .../FilterHardStopwordsMiddleware.py          | 62 +++++++++++++++++++
 .../FilterSoftStopwordsMiddleware.py          | 57 +++++++++++++++++
 .../contrib/scrapy/middlewares/frontier.py    |  2 +-
 .../contrib/scrapy/middlewares/recording.py   |  5 +-
 crawlfrontier/core/manager.py                 |  1 +
 5 files changed, 124 insertions(+), 3 deletions(-)
 create mode 100644 crawlfrontier/contrib/middlewares/FilterHardStopwordsMiddleware.py
 create mode 100644 crawlfrontier/contrib/middlewares/FilterSoftStopwordsMiddleware.py

diff --git a/crawlfrontier/contrib/middlewares/FilterHardStopwordsMiddleware.py b/crawlfrontier/contrib/middlewares/FilterHardStopwordsMiddleware.py
new file mode 100644
index 000000000..2564d18e9
--- /dev/null
+++ b/crawlfrontier/contrib/middlewares/FilterHardStopwordsMiddleware.py
@@ -0,0 +1,62 @@
+import re
+from scrapy import signals
+from scrapy.http import Request
+from scrapy.utils.httpobj import urlparse_cached
+from scrapy import log
+from aluanabot.settings import *
+from aluanabot.classes.Log.DomainLog import DomainLog
+from aluanabot.classes.StopwordsFilter.HardStopwordsFilter import HardStopwordsFilter
+from crawlfrontier import Middleware
+from crawlfrontier.exceptions import NotConfigured
+from crawlfrontier.utils.misc import load_object
+from crawlfrontier.utils.url import canonicalize_url
+
+class FilterHardStopwordsMiddleware(Middleware):
+
+    HARD_STOP_LIST = []
+    fingerprint_function_name = ''
+
+    def __init__(self, manager):
+        self.manager = manager
+        self.stop_words_filter = HardStopwordsFilter(HARD_STOPWORDS_FILTER)
+
+
+    @classmethod
+    def from_manager(cls, manager):
+        return cls(manager)
+
+
+    def frontier_start(self):
+        pass
+
+    def frontier_stop(self):
+        pass
+
+    def add_seeds(self, seeds):
+        return seeds
+
+    def request_error(self, request, error):
+        pass
+
+
+    def page_crawled(self, response, links):
+        for link in links:
+            fingerprint = link.meta['fingerprint']
+            if fingerprint not in self.manager.backend.requests:
+                should, token, term, u = self.should_follow(link)
+                if not should:
+                    self.manager.backend.requests[link.meta['fingerprint']] = link
+                    log.msg(message='Filtered hard stopword request to '+str(link.url),
+                            level=log.DEBUG, domain=str(response.meta['domain']['netloc']), url=str(link.url),
+                            module='hard_stopwords', token=token, term=term, filter_url=u, filtered=True)
+        return response
+
+
+
+    def should_follow(self, request):
+        result, token, term, u = self.stop_words_filter.url_matched(request)
+        if result == True:
+            return False, token, term, u
+        else:
+            return True, token, term, u
+

diff --git a/crawlfrontier/contrib/middlewares/FilterSoftStopwordsMiddleware.py b/crawlfrontier/contrib/middlewares/FilterSoftStopwordsMiddleware.py
new file mode 100644
index 000000000..6444ee5a9
--- /dev/null
+++ b/crawlfrontier/contrib/middlewares/FilterSoftStopwordsMiddleware.py
@@ -0,0 +1,57 @@
+import re
+from scrapy import signals
+from scrapy.http import Request
+from scrapy.utils.httpobj import urlparse_cached
+from scrapy import log
+from aluanabot.classes.Log.DomainLog import DomainLog
+from aluanabot.classes.StopwordsFilter.SoftStopwordsFilter import SoftStopwordsFilter
+from aluanabot.settings import *
+from crawlfrontier import Middleware
+
+
+class FilterSoftStopwordsMiddleware(Middleware):
+
+    SOFT_WHITE_STOP_LIST = []
+    SOFT_BLACK_STOP_LIST = []
+
+    def __init__(self, manager):
+        self.manager = manager
+        self.stop_words_filter = SoftStopwordsFilter(soft_white=SOFT_STOPWORDS_FILTER_WHITE, soft_black=SOFT_STOPWORDS_FILTER_BLACK)
+
+    @classmethod
+    def from_manager(cls, manager):
+        return cls(manager)
+
+
+    def frontier_start(self):
+        pass
+
+    def frontier_stop(self):
+        pass
+
+    def add_seeds(self, seeds):
+        return seeds
+
+    def request_error(self, request, error):
+        pass
+
+    def page_crawled(self, response, links):
+        for link in links:
+            fingerprint = link.meta['fingerprint']
+            if fingerprint not in self.manager.backend.requests:
+                should = self.should_follow(link)
+                if not should:
+                    self.manager.backend.requests[link.meta['fingerprint']] = link
+                    log.msg(message='Filtered soft stopword request to '+str(link.url),
+                            level=log.DEBUG, domain=str(response.meta['domain']['netloc']), url=str(link.url),
+                            module='soft_stopwords', filtered=True)
+
+        return response
+
+    def should_follow(self, request):
+        result = self.stop_words_filter.url_matched(request.url)
+        if result:
+            return False
+        else:
+            return True
+

diff --git a/crawlfrontier/contrib/scrapy/middlewares/frontier.py b/crawlfrontier/contrib/scrapy/middlewares/frontier.py
index fad71d20a..448f7c4ef 100644
--- a/crawlfrontier/contrib/scrapy/middlewares/frontier.py
+++ b/crawlfrontier/contrib/scrapy/middlewares/frontier.py
@@ -43,7 +43,7 @@ def __init__(self, crawler, stats):
         self.crawler.signals.connect(self.spider_opened, signals.spider_opened)
         self.crawler.signals.connect(self.spider_closed, signals.spider_closed)
         #self.crawler.signals.connect(self.response_received, signals.response_received)
-        self.crawler.signals.connect(self.spider_idle, signals.spider_idle)
+        # self.crawler.signals.connect(self.spider_idle, signals.spider_idle)
         self.crawler.signals.connect(self.download_error, frontier_download_error)
 
     @classmethod

diff --git a/crawlfrontier/contrib/scrapy/middlewares/recording.py b/crawlfrontier/contrib/scrapy/middlewares/recording.py
index 19fa222a5..761e0801f 100644
--- a/crawlfrontier/contrib/scrapy/middlewares/recording.py
+++ b/crawlfrontier/contrib/scrapy/middlewares/recording.py
@@ -54,6 +54,7 @@ def process_spider_output(self, response, result, spider):
                 link = self.graph.add_link(page=page, url=request.url)
                 request.meta['page'] = link
                 request.meta['referer'] = page
+                request.meta['anchor'] = ''
             yield request
 
     def spider_closed(self, spider, reason):
@@ -63,8 +64,8 @@ def spider_closed(self, spider, reason):
         self.graph.session.query(graphs.Page).filter_by(status=None).delete()
         self.graph.save()
 
-        g = graphs.Manager(engine='sqlite:///my_record.db')
-        g.render(filename='graphs/'+spider.domain.replace(':', '_').replace('.', '_').replace('/', '_')+'.png', label=spider.domain, fontsize=15, node_fontsize=8, node_height=2, node_width=2, node_fixedsize=False)
+        # g = graphs.Manager(engine='sqlite:///my_record.db')
+        # g.render(filename='/information-retrieval-bot/aluanabot/graphs/'+spider.domain.replace(':', '_').replace('.', '_').replace('/', '_')+'.png', label=spider.domain, fontsize=15, node_fontsize=8, node_height=2, node_width=2, node_fixedsize=False)
 
     def _get_url(self, response):
         return response.meta['redirect_urls'][0] if 'redirect_urls' in response.meta else response.request.url

diff --git a/crawlfrontier/core/manager.py b/crawlfrontier/core/manager.py
index 134d977a2..29157e5f4 100644
--- a/crawlfrontier/core/manager.py
+++ b/crawlfrontier/core/manager.py
@@ -123,6 +123,7 @@ def from_settings(cls, settings=None):
         :class:`Settings ` object instance. If no settings is given,
         :ref:`frontier default settings ` are used.
         """
+
         manager_settings = Settings(settings)
         return FrontierManager(request_model=manager_settings.REQUEST_MODEL,
                                response_model=manager_settings.RESPONSE_MODEL,

From 566b3bd6296882c3a66921e152ffcb88b97243c6 Mon Sep 17 00:00:00 2001
From: Bernardo Botella
Date: Thu, 15 Jan 2015 12:33:10 +0100
Subject: [PATCH 5/7] Add referer middleware.

---
 crawlfrontier/contrib/middlewares/referer.py | 35 ++++++++++++++++++++
 1 file changed, 35 insertions(+)
 create mode 100644 crawlfrontier/contrib/middlewares/referer.py

diff --git a/crawlfrontier/contrib/middlewares/referer.py b/crawlfrontier/contrib/middlewares/referer.py
new file mode 100644
index 000000000..df6faf4c1
--- /dev/null
+++ b/crawlfrontier/contrib/middlewares/referer.py
@@ -0,0 +1,35 @@
+"""
+RefererMiddleware: populates Request referer field, based on the Response which
+originated it.
+"""
+
+from scrapy.http import Request
+from scrapy.exceptions import NotConfigured
+from crawlfrontier import Middleware
+
+
+class RefererMiddleware(Middleware):
+
+    def __init__(self, manager):
+        self.manager = manager
+
+    @classmethod
+    def from_manager(cls, manager):
+        return cls(manager)
+
+    def frontier_start(self):
+        pass
+
+    def frontier_stop(self):
+        pass
+
+    def add_seeds(self, seeds):
+        return seeds
+
+    def page_crawled(self, response, links):
+        for link in links:
+            link.headers.setdefault('Referer', response.url)
+        return response
+
+    def request_error(self, request, error):
+        return request
\ No newline at end of file
From 237ccc754d05bc482f37d876f9e0e0262491a003 Mon Sep 17 00:00:00 2001
From: Bernardo Botella
Date: Thu, 15 Jan 2015 19:20:31 +0100
Subject: [PATCH 6/7] Improves closing behaviour.

---
 crawlfrontier/contrib/backends/memory.py      |  6 +++++-
 .../contrib/scrapy/middlewares/frontier.py    | 13 ++++++++++---
 crawlfrontier/core/manager.py                 |  1 +
 3 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/crawlfrontier/contrib/backends/memory.py b/crawlfrontier/contrib/backends/memory.py
index 4f96460e9..623cedf2a 100644
--- a/crawlfrontier/contrib/backends/memory.py
+++ b/crawlfrontier/contrib/backends/memory.py
@@ -33,7 +33,8 @@ def add_seeds(self, seeds):
             self.heap.push(request)
 
     def get_next_requests(self, max_next_requests):
-        return self.heap.pop(max_next_requests)
+        things = self.heap.pop(max_next_requests)
+        return things
 
     def page_crawled(self, response, links):
         for link in links:
@@ -42,17 +43,20 @@ def page_crawled(self, response, links):
             request.meta['depth'] = response.request.meta['depth']+1
             self.heap.push(request)
 
+
     def request_error(self, request, error):
         pass
 
     def _get_or_create_request(self, request):
         fingerprint = request.meta['fingerprint']
         if fingerprint not in self.requests:
+
             new_request = request.copy()
             new_request.meta['created_at'] = datetime.datetime.utcnow()
             new_request.meta['depth'] = 0
             self.requests[fingerprint] = new_request
             self.manager.logger.backend.debug('Creating request %s' % new_request)
+
             return new_request, True
         else:
             page = self.requests[fingerprint]

diff --git a/crawlfrontier/contrib/scrapy/middlewares/frontier.py b/crawlfrontier/contrib/scrapy/middlewares/frontier.py
index 448f7c4ef..7a608becd 100644
--- a/crawlfrontier/contrib/scrapy/middlewares/frontier.py
+++ b/crawlfrontier/contrib/scrapy/middlewares/frontier.py
@@ -1,6 +1,6 @@
 from twisted.internet.error import DNSLookupError, TimeoutError
 from twisted.internet.task import LoopingCall
-from scrapy.exceptions import NotConfigured, DontCloseSpider
+from scrapy.exceptions import NotConfigured, DontCloseSpider, CloseSpider
 from scrapy.http import Request
 from scrapy import signals
 
@@ -18,6 +18,7 @@ class CrawlFrontierSpiderMiddleware(object):
 
     def __init__(self, crawler, stats):
+        self.idle_counter = 0
         self.crawler = crawler
         self.stats = stats
 
@@ -43,7 +44,7 @@ def __init__(self, crawler, stats):
         self.crawler.signals.connect(self.spider_opened, signals.spider_opened)
         self.crawler.signals.connect(self.spider_closed, signals.spider_closed)
         #self.crawler.signals.connect(self.response_received, signals.response_received)
-        # self.crawler.signals.connect(self.spider_idle, signals.spider_idle)
+        self.crawler.signals.connect(self.spider_idle, signals.spider_idle)
         self.crawler.signals.connect(self.download_error, frontier_download_error)
 
     @classmethod
@@ -75,6 +76,7 @@ def process_spider_output(self, response, result, spider):
         self.frontier.page_crawled(scrapy_response=response,
                                    scrapy_links=links)
         self._remove_queued_request(response.request)
+        self.idle_counter = 0
 
     def download_error(self, request, exception, spider):
         # TODO: Add more errors...
@@ -87,14 +89,19 @@ def download_error(self, request, exception, spider):
         self._remove_queued_request(request)
 
     def spider_idle(self, spider):
-        if not self.frontier.manager.finished:
+        if self.idle_counter > 3:
+            raise CloseSpider()
+        elif not self.frontier.manager.finished:
+            self.idle_counter += 1
             raise DontCloseSpider()
 
+
     def _schedule_next_requests(self, spider):
         n_scheduled = len(self.queued_requests)
         if not self.frontier.manager.finished and n_scheduled < self.scheduler_concurrent_requests:
             n_requests_gap = self.scheduler_concurrent_requests - n_scheduled
             next_pages = self._get_next_requests(n_requests_gap)
+
             for request in next_pages:
                 self.crawler.engine.crawl(request, spider)

diff --git a/crawlfrontier/core/manager.py b/crawlfrontier/core/manager.py
index 29157e5f4..e22b95bb0 100644
--- a/crawlfrontier/core/manager.py
+++ b/crawlfrontier/core/manager.py
@@ -352,6 +352,7 @@ def page_crawled(self, response, links=None):
 
         :return: None.
         """
+
         self._check_startstop()
         self.logger.manager.debug(self._msg('PAGE_CRAWLED url=%s status=%s links=%s' %
                                             (response.url, response.status_code, len(links) if links else 0)))
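[Note on PATCH 6/7] The idle accounting works out as follows: each
spider_idle signal bumps the counter and keeps the spider alive, any crawled
page resets it, and the spider is closed once the counter exceeds 3, i.e. on
the fifth consecutive idle signal. A toy trace (not part of the patch):

    idle_counter = 0
    for event in ['idle', 'idle', 'page', 'idle', 'idle', 'idle', 'idle', 'idle']:
        if event == 'page':
            idle_counter = 0           # process_spider_output resets it
        elif idle_counter > 3:
            print('CloseSpider')       # spider_idle gives up
            break
        else:
            idle_counter += 1
            print('DontCloseSpider')   # spider_idle keeps the crawl running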
From d092bb17eab50dc483882b86a89a5bfd9e371325 Mon Sep 17 00:00:00 2001
From: Bernardo Botella
Date: Thu, 22 Jan 2015 17:48:43 +0100
Subject: [PATCH 7/7] Add seeds logic.

---
 crawlfrontier/contrib/backends/memory.py      |  1 +
 .../middlewares/AluanaFilterOffsite.py        | 75 +++++++++++++++++++
 2 files changed, 76 insertions(+)
 create mode 100644 crawlfrontier/contrib/middlewares/AluanaFilterOffsite.py

diff --git a/crawlfrontier/contrib/backends/memory.py b/crawlfrontier/contrib/backends/memory.py
index 623cedf2a..97e540cd1 100644
--- a/crawlfrontier/contrib/backends/memory.py
+++ b/crawlfrontier/contrib/backends/memory.py
@@ -30,6 +30,7 @@ def frontier_stop(self):
     def add_seeds(self, seeds):
         for seed in seeds:
             request, _ = self._get_or_create_request(seed)
+            request.isSeed = True
             self.heap.push(request)
 
     def get_next_requests(self, max_next_requests):

diff --git a/crawlfrontier/contrib/middlewares/AluanaFilterOffsite.py b/crawlfrontier/contrib/middlewares/AluanaFilterOffsite.py
new file mode 100644
index 000000000..34d3cb1f2
--- /dev/null
+++ b/crawlfrontier/contrib/middlewares/AluanaFilterOffsite.py
@@ -0,0 +1,75 @@
+"""
+Offsite Spider Middleware
+
+See documentation in docs/topics/spider-middleware.rst
+"""
+
+import re
+
+from scrapy import signals
+from scrapy.http import Request
+from scrapy.utils.httpobj import urlparse_cached
+from scrapy import log
+from crawlfrontier import Middleware
+import tldextract
+
+
+class OffsiteMiddleware(Middleware):
+
+    def __init__(self, manager):
+        self.manager = manager
+        self.allowed_domains = ['amazonaws.com']
+        self.host_regex = self.get_host_regex()
+        self.domains_seen = set()
+
+    @classmethod
+    def from_manager(cls, manager):
+        return cls(manager)
+
+
+    def frontier_start(self):
+        pass
+
+    def frontier_stop(self):
+        pass
+
+    def add_seeds(self, seeds):
+        return seeds
+
+    def request_error(self, request, error):
+        pass
+
+
+    def page_crawled(self, response, links):
+        try:
+            if response.request.isSeed == True:
+                self.allowed_domains.append(response.request.meta['domain']['netloc'])
+                self.host_regex = self.get_host_regex()
+        except Exception as e:
+            pass
+        for link in links:
+            fingerprint = link.meta['fingerprint']
+            if fingerprint not in self.manager.backend.requests:
+                should = self.should_follow(link)
+                if not should:
+                    self.manager.backend.requests[link.meta['fingerprint']] = link
+                    # log.msg(message='Filtered offsite '+str(link.url),
+                    #         level=log.DEBUG, domain=str(response.meta['domain']['netloc']), url=str(link.url),
+                    #         module='offsite_filter', filtered=True)
+        return response
+
+
+    def should_follow(self, request):
+        regex = self.host_regex
+        # hostname can be None for wrong urls (like javascript links)
+        host = urlparse_cached(request).hostname or ''
+        return bool(regex.search(host))
+
+    def get_host_regex(self):
+        """Override this method to implement a different offsite policy"""
+        allowed_domains = self.allowed_domains
+        if not allowed_domains:
+            return re.compile('')  # allow all by default
+        regex = r'^(.*\.)?(%s)$' % '|'.join(re.escape(d) for d in allowed_domains if d is not None)
+        return re.compile(regex)
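[Note on PATCH 7/7] The anchored pattern built by get_host_regex accepts the
allowed domain itself and any of its subdomains, and nothing else; seed
responses widen allowed_domains at crawl time through the isSeed flag set in
memory.py (the try/except in page_crawled covers requests that were rebuilt
without that attribute). A quick check, with 'example.com' standing in for a
seed netloc:

    import re

    regex = re.compile(r'^(.*\.)?(%s)$' % re.escape('example.com'))
    assert regex.search('example.com')               # the domain itself
    assert regex.search('shop.example.com')          # any subdomain
    assert not regex.search('example.com.evil.org')  # suffix spoofing rejected
    assert not regex.search('notexample.com')        # substring match rejected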