from scrapy.conf import settings
from scrapy.selector import Selector
from scrapy.spider import Spider
from scrapy.http.request import Request
from scrapy_imdb.items import ImdbItem


class ImdbSpider(Spider):
    """
    This class defines the rules according to which we extract information
    from the urls we scrape. We first collect all the urls of the given
    list and then issue a request per url. Each request results in the
    page of a movie in the given list, from which we extract relevant
    information such as title, duration, etc.
    """
    name = "imdb"
    allowed_domains = ["imdb.com"]
    start_urls = settings['START_URLS']
    # data members
    protocol = "http"
    base_url = "www.imdb.com"

    def parse(self, response):
        """
        For every url in start_urls (each one corresponds to a movie
        list) we extract the urls of the movies in the list.
        """
        sel = Selector(response)
        # xpath rule for extracting movies' urls
        url_list = sel.xpath('//tbody[@class="lister-list"]/tr'
                             '/td[@class="titleColumn"]/a/@href').extract()

import sys

from scrapy.crawler import CrawlerProcess
from scrapy.settings.deprecated import check_deprecated_settings
from scrapy.utils.project import get_project_settings


def execute(argv=None, settings=None):
    if argv is None:
        argv = sys.argv
    # --- backwards compatibility for scrapy.conf.settings singleton ---
    if settings is None and 'scrapy.conf' in sys.modules:
        from scrapy import conf
        if hasattr(conf, 'settings'):
            settings = conf.settings
    # ------------------------------------------------------------------
    if settings is None:
        settings = get_project_settings()
    check_deprecated_settings(settings)
    # --- backwards compatibility for scrapy.conf.settings singleton ---
    import warnings
    from scrapy.exceptions import ScrapyDeprecationWarning
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", ScrapyDeprecationWarning)
        from scrapy import conf
        conf.settings = settings
    # ------------------------------------------------------------------
    crawler = CrawlerProcess(settings)
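
The fragment above stops right after the CrawlerProcess is created. For
context, a minimal sketch of driving a crawl with the same non-deprecated
objects it relies on (get_project_settings and CrawlerProcess), assuming a
regular Scrapy project in which the 'imdb' spider from the first snippet is
discoverable by name:

from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

process = CrawlerProcess(get_project_settings())
process.crawl('imdb')  # any spider reachable through the project's SPIDER_MODULES
process.start()        # blocks until the crawl finishes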

def __init__(self):
    self.access_key = settings['AWS_ACCESS_KEY_ID']
    self.secret_key = settings['AWS_SECRET_ACCESS_KEY']

def __init__(self):
    self.mapping = settings['SOLR_MAPPING'].items()
    self.solr = pysolr.Solr(settings['SOLR_URL'], timeout=10)
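
The __init__ snippets above read the global scrapy.conf.settings singleton at
construction time. A minimal sketch of the equivalent from_crawler pattern
used by newer Scrapy components; the class name is an assumption, while the
settings keys are taken from the snippet above:

import pysolr


class SolrPipeline(object):  # hypothetical name
    def __init__(self, mapping, solr_url):
        self.mapping = mapping
        self.solr = pysolr.Solr(solr_url, timeout=10)

    @classmethod
    def from_crawler(cls, crawler):
        # crawler.settings replaces the scrapy.conf.settings singleton
        return cls(mapping=crawler.settings['SOLR_MAPPING'].items(),
                   solr_url=crawler.settings['SOLR_URL'])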

def __init__(self):
    super(SimpledbStatsCollector, self).__init__()
    self._sdbdomain = settings['STATS_SDB_DOMAIN']
    self._access_key = settings['AWS_ACCESS_KEY_ID']
    self._secret_key = settings['AWS_SECRET_ACCESS_KEY']
    self._async = settings.getbool('STATS_SDB_ASYNC')
    import boto
    self.connect_sdb = boto.connect_sdb
    self.connect_sdb(aws_access_key_id=self._access_key,
                     aws_secret_access_key=self._secret_key).create_domain(self._sdbdomain)

def update_ipaddress(self):
    '''
    Updates the scheduler so it knows its own ip address
    '''
    # assign local ip in case of exception
    self.old_ip = self.my_ip
    self.my_ip = '127.0.0.1'
    try:
        obj = urllib.request.urlopen(settings.get('PUBLIC_IP_URL',
                                                  'http://ip.42.pl/raw'))
        results = self.ip_regex.findall(obj.read())
        if len(results) > 0:
            self.my_ip = results[0]
        else:
            raise IOError("Could not get valid IP Address")
        obj.close()
        self.logger.debug("Current public ip: {ip}".format(ip=self.my_ip))
    except IOError:
        self.logger.error("Could not reach out to get public ip")
    if self.old_ip != self.my_ip:
        self.logger.info("Changed Public IP: {old} -> {new}".format(
            old=self.old_ip, new=self.my_ip))

def __init__(self):
    if not settings['ITEMSAMPLER_FILE']:
        raise NotConfigured

def __init__(self):
    if not settings.getbool('CLUSTER_WORKER_ENABLED'):
        raise NotConfigured
    self.maxproc = settings.getint('CLUSTER_WORKER_MAXPROC')
    self.logdir = settings['CLUSTER_LOGDIR']
    self.running = {}   # dict of domain->ScrapyProcessControl
    self.crawlers = {}  # dict of pid->scrapy process remote pb connection
    self.starttime = datetime.datetime.utcnow()
    self.prerun_hooks = [load_object(f) for f in
                         settings.getlist('CLUSTER_WORKER_PRERUN_HOOKS', [])]
    port = settings.getint('CLUSTER_WORKER_PORT')
    factory = pb.PBServerFactory(self, unsafeTracebacks=True)
    scrapyengine.listenTCP(port, factory)
    log.msg("Using sys.path: %s" % repr(sys.path), level=log.DEBUG)
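
The snippet above only reads the CLUSTER_* keys; a short sketch of how they
might be declared in the project's settings.py, with hypothetical values
(only the setting names come from the code above):

CLUSTER_WORKER_ENABLED = True
CLUSTER_WORKER_MAXPROC = 4            # hypothetical value
CLUSTER_LOGDIR = '/var/log/cluster'   # hypothetical value
CLUSTER_WORKER_PRERUN_HOOKS = []
CLUSTER_WORKER_PORT = 8790            # hypothetical value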

def __init__(self):
    self.header_accept = settings.get('REQUEST_HEADER_ACCEPT')
    self.header_accept_language = settings.get('REQUEST_HEADER_ACCEPT_LANGUAGE')

def process_options(self, args, opts):
    ScrapyCommand.process_options(self, args, opts)
    if opts.nopipeline:
        settings.overrides['ITEM_PIPELINES'] = []
    if opts.nocache:
        settings.overrides['CACHE2_DIR'] = None
    if opts.restrict:
        settings.overrides['RESTRICT_TO_URLS'] = args
    if opts.nofollow:
        settings.overrides['CRAWLSPIDER_FOLLOW_LINKS'] = False
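
For completeness, a sketch of how the options consumed above might be declared
with the optparse-based ScrapyCommand.add_options of the same Scrapy era; the
flag spellings and help texts are assumptions, only the dest names come from
the snippet:

def add_options(self, parser):
    ScrapyCommand.add_options(self, parser)
    parser.add_option("--nopipeline", dest="nopipeline", action="store_true",
                      help="disable item pipelines")
    parser.add_option("--nocache", dest="nocache", action="store_true",
                      help="disable the page cache")
    parser.add_option("--restrict", dest="restrict", action="store_true",
                      help="restrict the crawl to the urls given as arguments")
    parser.add_option("--nofollow", dest="nofollow", action="store_true",
                      help="do not follow links from crawled pages")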