-
Notifications
You must be signed in to change notification settings - Fork 216
Mongodb backend #6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
9128e7e
Adds Mongodb Backend with FIFO, LIFO, DFS and BFS algorithms.
bbotella 0b07766
Merge branch 'mongodb_backend' into development
bbotella d7de1b5
Fix request_error method.
bbotella 0fa45d0
Deploy commit.
bbotella 7e33928
Add stopwords filters
bbotella 566b3bd
Add referer middleware.
bbotella 237ccc7
Improves closing behaviour.
bbotella d092bb1
Add seeds logic.
bbotella File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -52,3 +52,7 @@ docs/_build/ | |
|
|
||
| # PyBuilder | ||
| target/ | ||
|
|
||
|
|
||
| # IDE files | ||
| .idea | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,203 @@ | ||
| import datetime | ||
| import random | ||
| import copy | ||
| from collections import OrderedDict | ||
| import string | ||
| from bson import json_util | ||
| from pymongo import MongoClient | ||
| from crawlfrontier import Backend, Request | ||
| import json | ||
| from crawlfrontier.exceptions import NotConfigured | ||
|
|
||
|
|
||
class MongodbBackend(Backend):
    """Crawl-frontier backend that persists request state in MongoDB.

    Each request is stored as one document in a single collection, keyed by
    ``_meta.fingerprint``. Subclasses define the crawl ordering (FIFO, LIFO,
    DFS, BFS) by implementing :meth:`_get_sorted_pages`.

    Required settings: ``BACKEND_MONGO_IP`` and ``BACKEND_MONGO_PORT``
    (raises :class:`NotConfigured` if either is missing). Optional:
    ``BACKEND_MONGO_DB_NAME`` (defaults to a random throwaway name),
    ``BACKEND_MONGO_COLLECTION_NAME`` (defaults to ``'links'``) and
    ``BACKEND_MONGO_PERSIST_INFO`` (defaults to True; when False the
    database is dropped on shutdown).
    """

    name = 'Mongodb Backend'

    class State:
        # Lifecycle states stored in each request document's ``state`` field.
        NOT_CRAWLED = 'NOT CRAWLED'
        QUEUED = 'QUEUED'
        CRAWLED = 'CRAWLED'
        ERROR = 'ERROR'

    def __init__(self, manager):
        settings = manager.settings
        mongo_ip = settings.get('BACKEND_MONGO_IP', None)
        mongo_port = settings.get('BACKEND_MONGO_PORT', None)
        if mongo_ip is None or mongo_port is None:
            raise NotConfigured
        self.client = MongoClient(mongo_ip, mongo_port)
        # Random fallback DB name, only used when BACKEND_MONGO_DB_NAME is
        # not configured (a throwaway database for this run).
        self.database_name = ''.join(
            random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(10))
        mongo_db = settings.get('BACKEND_MONGO_DB_NAME', self.database_name)
        mongo_collection = settings.get('BACKEND_MONGO_COLLECTION_NAME', 'links')
        self.db = self.client[mongo_db]
        self.collection = self.db[mongo_collection]
        self.manager = manager

    @classmethod
    def from_manager(cls, manager):
        """Standard crawl-frontier construction hook."""
        return cls(manager)

    def add_seeds(self, seeds):
        """Insert seed requests into the collection (idempotent per fingerprint)."""
        self.manager.logger.backend.debug('ADD_SEEDS n_links=%s' % len(seeds))
        for seed in seeds:
            # Get or create page from link; creation stores it in Mongo.
            request, _ = self._get_or_create_request(seed)

    def page_crawled(self, response, links):
        """Mark ``response``'s page CRAWLED and register extracted ``links``."""
        self.manager.logger.backend.debug('PAGE_CRAWLED page=%s status=%s links=%s' %
                                          (response, response.status_code, len(links)))

        # Update timestamps/status on the crawled page's document.
        backend_page = self._page_crawled(response)

        backend_page.state = self.State.CRAWLED
        self.collection.update({'_meta.fingerprint': backend_page._meta['fingerprint']}, {
            "$set": self._request_to_mongo(backend_page)}, upsert=False)

        # Register newly discovered links, one depth level below the parent.
        for link in links:
            self.manager.logger.backend.debug('ADD_LINK link=%s' % link)
            link_page, link_created = self._get_or_create_request(link)
            if link_created:
                link_page._meta['depth'] = response.meta['depth'] + 1
                self.collection.update({'_meta.fingerprint': link_page._meta['fingerprint']}, {
                    "$set": self._request_to_mongo(link_page)}, upsert=False)

    def request_error(self, page, error):
        """Mark ``page`` as ERROR in the collection and return the updated page."""
        self.manager.logger.backend.debug('PAGE_CRAWLED_ERROR page=%s error=%s' % (page, error))

        backend_page = self._page_crawled(page)

        backend_page.state = self.State.ERROR
        self.collection.update({'_meta.fingerprint': backend_page._meta['fingerprint']},
                               {"$set": self._request_to_mongo(backend_page)}, upsert=False)
        return backend_page

    def get_next_requests(self, max_next_pages):
        """Return up to ``max_next_pages`` requests in subclass-defined order.

        Selected requests are flagged QUEUED in Mongo. When every stored
        request has reached CRAWLED state the manager is told to finish.
        """
        self.manager.logger.backend.debug('GET_NEXT_PAGES max_next_pages=%s' % max_next_pages)
        now = datetime.datetime.utcnow()
        mongo_pages = self._get_sorted_pages(max_next_pages)
        requests = [self._request_from_mongo_dict(p) for p in mongo_pages]

        if max_next_pages:
            requests = requests[0:max_next_pages]
        for req in requests:
            req.state = self.State.QUEUED
            req.last_update = now
            self.collection.update({'_meta.fingerprint': req._meta['fingerprint']}, {
                "$set": self._request_to_mongo(req)}, upsert=False)
        # NOTE(review): pokes the manager's private ``_finished`` flag;
        # confirm crawl-frontier exposes no public way to signal completion.
        remaining = self.collection.find_one({'state': {'$ne': self.State.CRAWLED}})
        if remaining is None:
            self.manager._finished = True
        return requests

    def _page_crawled(self, response):
        """Fetch-or-create the page for ``response`` and refresh its fields."""
        now = datetime.datetime.utcnow()

        backend_page, created = self._get_or_create_request(response)

        if created:
            backend_page.created_at = now

        backend_page.last_update = now
        backend_page.status = response.status_code
        return backend_page

    def _request_to_mongo(self, request):
        # JSON round-trip via bson.json_util so datetimes survive the
        # conversion to a Mongo-storable dict.
        # NOTE(review): this serialize/deserialize is computationally
        # expensive (flagged in review); a direct dict conversion would be
        # cheaper if datetime handling is done explicitly.
        return json.loads(json.dumps(vars(request), sort_keys=True, indent=4, default=json_util.default),
                          object_hook=json_util.object_hook)

    def _request_from_mongo_dict(self, mongo_dict):
        """Rebuild a :class:`Request` from its stored Mongo document."""
        url = mongo_dict['_url']
        method = mongo_dict['_method']
        headers = mongo_dict['_headers']
        cookies = mongo_dict['_cookies']
        meta = mongo_dict['_meta']
        request = Request(url, method, headers, cookies, meta)
        return request

    def _get_or_create_request(self, request):
        """Return ``(request, created)``; insert a new document when unseen."""
        fingerprint = request.meta['fingerprint']
        existing_request = self.collection.find_one({'_meta.fingerprint': fingerprint})
        if existing_request is None:
            new_request = request.copy()
            new_request.meta['created_at'] = datetime.datetime.utcnow()
            new_request.meta['depth'] = 0
            new_request.state = self.State.NOT_CRAWLED
            self.collection.insert(
                json.loads(json.dumps(vars(new_request), sort_keys=True, indent=4, default=json_util.default),
                           object_hook=json_util.object_hook))
            self.manager.logger.backend.debug('Creating request %s' % new_request)
            return new_request, True
        else:
            request = self._request_from_mongo_dict(existing_request)
            self.manager.logger.backend.debug('Request exists %s' % request)
            return request, False

    def _get_sorted_pages(self, max_pages):
        """Ordering policy hook; subclasses must implement."""
        raise NotImplementedError

    def frontier_start(self):
        pass

    def frontier_stop(self):
        # BUG FIX: previously dropped ``self.database_name`` (the random
        # fallback name) even when BACKEND_MONGO_DB_NAME selected a different
        # database — dropping a nonexistent DB and leaking the real one.
        # Drop the database actually in use instead.
        if self.manager.settings.get('BACKEND_MONGO_PERSIST_INFO', True) is False:
            self.client.drop_database(self.db.name)
        self.client.close()
class MongodbFIFOBackend(MongodbBackend):
    """First-in-first-out ordering: serve requests in natural find() order."""

    name = 'FIFO Mongodb Backend'

    def _get_sorted_pages(self, max_pages):
        pending = {'state': self.State.NOT_CRAWLED}
        return self.collection.find(pending).limit(max_pages)
class MongodbLIFOBackend(MongodbBackend):
    """Last-in-first-out ordering: newest documents (by ``_id``) first."""

    name = 'LIFO Mongodb Backend'

    def _get_sorted_pages(self, max_pages):
        pending = {'state': self.State.NOT_CRAWLED}
        return self.collection.find(pending).sort('_id', -1).limit(max_pages)
class MongodbDFSBackend(MongodbBackend):
    """Depth-first ordering: deepest pending requests first."""

    name = 'DFS Mongodb Backend'

    def _get_sorted_pages(self, max_pages):
        pending = {'state': self.State.NOT_CRAWLED}
        return self.collection.find(pending).sort('depth', -1).limit(max_pages)
class MongodbBFSBackend(MongodbBackend):
    """Breadth-first ordering: shallowest pending requests first."""

    name = 'BFS Mongodb Backend'

    def _get_sorted_pages(self, max_pages):
        pending = {'state': self.State.NOT_CRAWLED}
        return self.collection.find(pending).sort('depth', 1).limit(max_pages)
# Short aliases so settings can select a backend by strategy name.
BASE = MongodbBackend
FIFO = MongodbFIFOBackend
LIFO = MongodbLIFOBackend
DFS = MongodbDFSBackend
BFS = MongodbBFSBackend
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,75 @@ | ||
| """ | ||
| Offsite Spider Middleware | ||
|
|
||
| See documentation in docs/topics/spider-middleware.rst | ||
| """ | ||
|
|
||
| import re | ||
|
|
||
| from scrapy import signals | ||
| from scrapy.http import Request | ||
| from scrapy.utils.httpobj import urlparse_cached | ||
| from scrapy import log | ||
| from crawlfrontier import Middleware | ||
| import tldextract | ||
|
|
||
|
|
||
class OffsiteMiddleware(Middleware):
    """Crawl-frontier middleware that records links pointing off-site.

    Maintains a whitelist of allowed domains; a seed response's own domain
    is added to the whitelist when first crawled. Links that do not match
    the whitelist are registered in the backend's request map (so they are
    treated as already seen and never scheduled).
    """

    def __init__(self, manager):
        self.manager = manager
        # Generalized: initial whitelist is configurable; the previous
        # hard-coded ['amazonaws.com'] remains the default for
        # backward compatibility.
        self.allowed_domains = list(
            manager.settings.get('OFFSITE_ALLOWED_DOMAINS', ['amazonaws.com']))
        self.host_regex = self.get_host_regex()
        self.domains_seen = set()

    @classmethod
    def from_manager(cls, manager):
        """Standard crawl-frontier construction hook."""
        return cls(manager)

    def frontier_start(self):
        pass

    def frontier_stop(self):
        pass

    def add_seeds(self, seeds):
        return seeds

    def request_error(self, request, error):
        pass

    def page_crawled(self, response, links):
        """Whitelist seed domains, then mark non-matching links as seen."""
        # BUG FIX: previously a bare ``except Exception: pass`` hid every
        # failure here (including typos and missing meta keys). Probe the
        # optional ``isSeed`` attribute explicitly and only tolerate a
        # missing 'domain'/'netloc' meta entry.
        request = getattr(response, 'request', None)
        if request is not None and getattr(request, 'isSeed', False) == True:
            try:
                self.allowed_domains.append(request.meta['domain']['netloc'])
                self.host_regex = self.get_host_regex()
            except KeyError:
                pass
        for link in links:
            fingerprint = link.meta['fingerprint']
            if fingerprint not in self.manager.backend.requests:
                should = self.should_follow(link)
                if not should:
                    # Registering the link makes the backend treat it as
                    # already known, effectively filtering it out.
                    self.manager.backend.requests[link.meta['fingerprint']] = link
        return response

    def should_follow(self, request):
        """Return True when ``request``'s host matches an allowed domain."""
        regex = self.host_regex
        # hostname can be None for wrong urls (like javascript links)
        host = urlparse_cached(request).hostname or ''
        return bool(regex.search(host))

    def get_host_regex(self):
        """Override this method to implement a different offsite policy."""
        allowed_domains = self.allowed_domains
        if not allowed_domains:
            return re.compile('')  # allow all by default
        regex = r'^(.*\.)?(%s)$' % '|'.join(re.escape(d) for d in allowed_domains if d is not None)
        return re.compile(regex)
62 changes: 62 additions & 0 deletions
62
crawlfrontier/contrib/middlewares/FilterHardStopwordsMiddleware.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,62 @@ | ||
| import re | ||
| from scrapy import signals | ||
| from scrapy.http import Request | ||
| from scrapy.utils.httpobj import urlparse_cached | ||
| from scrapy import log | ||
| from aluanabot.settings import * | ||
| from aluanabot.classes.Log.DomainLog import DomainLog | ||
| from aluanabot.classes.StopwordsFilter.HardStopwordsFilter import HardStopwordsFilter | ||
| from crawlfrontier import Middleware | ||
| from crawlfrontier.exceptions import NotConfigured | ||
| from crawlfrontier.utils.misc import load_object | ||
| from crawlfrontier.utils.url import canonicalize_url | ||
|
|
||
class FilterHardStopwordsMiddleware(Middleware):
    """Crawl-frontier middleware that drops links matching hard stopwords.

    Links whose URL matches the configured hard-stopword filter are
    registered in the backend's request map (so they are treated as seen
    and never scheduled) and the rejection is logged.
    """

    HARD_STOP_LIST = []
    fingerprint_function_name = ''

    def __init__(self, manager):
        self.manager = manager
        self.stop_words_filter = HardStopwordsFilter(HARD_STOPWORDS_FILTER)

    @classmethod
    def from_manager(cls, manager):
        """Standard crawl-frontier construction hook."""
        return cls(manager)

    def frontier_start(self):
        pass

    def frontier_stop(self):
        pass

    def add_seeds(self, seeds):
        return seeds

    def request_error(self, request, error):
        pass

    def page_crawled(self, response, links):
        """Filter out stopword-matching links among the extracted ones."""
        seen = self.manager.backend.requests
        for candidate in links:
            if candidate.meta['fingerprint'] in seen:
                continue
            should, token, term, u = self.should_follow(candidate)
            if should:
                continue
            # Mark the filtered link as known so it is never scheduled.
            seen[candidate.meta['fingerprint']] = candidate
            log.msg(message='Filtered hard stopword request to '+str(candidate.url),
                    level=log.DEBUG, domain=str(response.meta['domain']['netloc']), url=str(candidate.url),
                    module='hard_stopwords', token=token, term=term, filter_url=u, filtered=True)
        return response

    def should_follow(self, request):
        """Return ``(follow, token, term, url)`` for ``request``.

        ``follow`` is False exactly when the stopword filter reports a
        strict ``True`` match (preserves the original ``== True`` check).
        """
        matched, token, term, u = self.stop_words_filter.url_matched(request)
        return (matched != True), token, term, u
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This kind of logic is pretty specific. User could do the same in settings.py file, if such behavior is really needed.