def test_expunge(self):
    config.load(configfile)

    # create test entries in cache with correct timestamps
    for entry in glob.glob(testentries):
        e = minidom.parse(entry)
        e.normalize()
        eid = e.getElementsByTagName('id')
        eupdated = e.getElementsByTagName('updated')
        # skip entries that lack an id or an updated element
        if not eid or not eupdated: continue
        efile = filename(workdir, eid[0].childNodes[0].nodeValue)
        emtime = time.mktime(
            feedparser._parse_date_w3dtf(eupdated[0].childNodes[0].nodeValue))
        shutil.copyfile(entry, efile)
        os.utime(efile, (emtime, emtime))

    # create test feeds in cache
    sources = config.cache_sources_directory()
    for feed in glob.glob(testfeeds):
        f = minidom.parse(feed)
        f.normalize()
        fid = f.getElementsByTagName('id')
        if not fid: continue
        ffile = filename(sources, fid[0].childNodes[0].nodeValue)
        shutil.copyfile(feed, ffile)
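
# Sketch (not part of the tests above): the timestamp handling used by
# test_expunge in isolation. feedparser._parse_date_w3dtf is an internal
# feedparser helper that returns a 9-element UTC time tuple; the 'updated'
# value below is illustrative only.
import time, feedparser

updated = '2006-01-01T00:00:00Z'
mtime = time.mktime(feedparser._parse_date_w3dtf(updated))
# os.utime(efile, (mtime, mtime)) then stamps the cached entry file so its
# mtime matches the entry's own <updated> time.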
def test_filename(self):
    self.assertEqual(os.path.join('.', 'example.com,index.html'),
        filename('.', 'http://example.com/index.html'))
    self.assertEqual(os.path.join('.',
        'planet.intertwingly.net,2006,testfeed1,1'),
        filename('.', u'tag:planet.intertwingly.net,2006:testfeed1,1'))
    self.assertEqual(os.path.join('.',
        '00000000-0000-0000-0000-000000000000'),
        filename('.', u'urn:uuid:00000000-0000-0000-0000-000000000000'))

    # IDNA support requires Python 2.3 or later
    try:
        import encodings.idna
    except ImportError:
        return
    self.assertEqual(os.path.join('.', 'xn--8ws00zhy3a.com'),
        filename('.', u'http://www.\u8a79\u59c6\u65af.com/'))
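
# Sketch: the same mappings called directly. Assumes planet.spider.filename
# is importable, as in the other snippets here; expected outputs are taken
# from the assertions in test_filename above.
from planet.spider import filename

print filename('.', 'http://example.com/index.html')
# -> ./example.com,index.html
print filename('.', u'urn:uuid:00000000-0000-0000-0000-000000000000')
# -> ./00000000-0000-0000-0000-000000000000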
def destroy():
    from planet import logger as log
    cache = config.cache_directory()
    index = os.path.join(cache, 'index')
    if not os.path.exists(index): return None
    idindex = filename(index, 'id')
    if os.path.exists(idindex): os.unlink(idindex)
    os.removedirs(index)
    log.info(idindex + " deleted")
# fragment: build the entry-id -> source-id index, falling back to minidom
# when libxml2 is unavailable
try:
    import libxml2
except ImportError:
    libxml2 = False
    from xml.dom import minidom

for file in glob(cache+"/*"):
    if os.path.isdir(file):
        continue
    elif libxml2:
        try:
            doc = libxml2.parseFile(file)
            ctxt = doc.xpathNewContext()
            ctxt.xpathRegisterNs('atom', 'http://www.w3.org/2005/Atom')
            entry = ctxt.xpathEval('/atom:entry/atom:id')
            source = ctxt.xpathEval('/atom:entry/atom:source/atom:id')
            if entry and source:
                index[filename('', entry[0].content)] = source[0].content
            doc.freeDoc()
        except:
            log.error(file)
    else:
        try:
            doc = minidom.parse(file)
            doc.normalize()
            ids = doc.getElementsByTagName('id')
            entry = [e for e in ids if e.parentNode.nodeName == 'entry']
            source = [e for e in ids if e.parentNode.nodeName == 'source']
            if entry and source:
                index[filename('', entry[0].childNodes[0].nodeValue)] = \
                    source[0].childNodes[0].nodeValue
            doc.unlink()   # minidom documents are released with unlink(), not freeDoc()
        except:
            log.error(file)
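
# Sketch only: consulting the id index built above. It assumes, as destroy()
# suggests, that the index is an anydbm database named via filename(dir, 'id')
# inside the cache's 'index' directory; lookup_source itself is hypothetical.
import os, anydbm
from planet import config
from planet.spider import filename

def lookup_source(entry_id):
    """Return the id of the feed a cached entry came from, or None."""
    index_dir = os.path.join(config.cache_directory(), 'index')
    db = anydbm.open(filename(index_dir, 'id'), 'r')
    try:
        return db[filename('', entry_id)]
    except KeyError:
        return None
    finally:
        db.close()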
def downloadReadingList(list, orig_config, callback, use_cache=True, re_read=True):
    from planet import logger
    import config
    from ConfigParser import ConfigParser
    try:
        import urllib2, StringIO
        from planet.spider import filename

        # list cache file name
        cache_filename = filename(config.cache_lists_directory(), list)

        # retrieve list options (e.g., etag, last-modified) from cache
        options = {}

        # add original options
        for key in orig_config.options(list):
            options[key] = orig_config.get(list, key)

        try:
            if use_cache:
                cached_config = ConfigParser()
                cached_config.read(cache_filename)
                for option in cached_config.options(list):
                    options[option] = cached_config.get(list, option)
        except:
            pass
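
        # Sketch only (not the original continuation): a plausible next step
        # reuses the cached etag / last-modified validators gathered above for
        # a conditional GET; the header names are standard HTTP, but how
        # downloadReadingList actually proceeds is not shown in this snippet.
        request = urllib2.Request(list)
        if options.has_key('etag'):
            request.add_header('If-None-Match', options['etag'])
        if options.has_key('last-modified'):
            request.add_header('If-Modified-Since', options['last-modified'])
        try:
            data = urllib2.urlopen(request).read()
        except urllib2.HTTPError, e:
            if e.code == 304:
                logger.debug("Reading list %s is unchanged", list)
            else:
                logger.warning("Unable to download reading list %s: %s", list, e)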
if not output_file:
    # filter
    context.push({'input': XMLParser(StringIO(doc))})
else:
    # template
    import time
    from planet import config, feedparser
    from planet.spider import filename

    # gather a list of subscriptions, feeds
    global subscriptions
    feeds = []
    sources = config.cache_sources_directory()
    for sub in config.subscriptions():
        data = feedparser.parse(filename(sources, sub))
        data.feed.config = norm(dict(config.parser.items(sub)))
        if data.feed.has_key('link'):
            feeds.append((data.feed.config.get('name', ''), data.feed))
        subscriptions.append(norm(sub))
    feeds.sort()

    # annotate each entry
    new_date_format = config.new_date_format()
    vars = feedparser.parse(StringIO(doc))
    vars.feeds = [value for name, value in feeds]
    last_feed = None
    last_date = None

    for entry in vars.entries:
        entry.source.config = find_config(config, entry.source)

        # add new_feed and new_date fields
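        # Sketch only: one way those fields could be filled in, assuming each
        # cached entry exposes source.id and updated_parsed the way feedparser
        # normally does; the original continuation is not shown in this snippet.
        entry.new_feed = entry.new_date = None
        if entry.source.id != last_feed:
            entry.new_feed = entry.source.id
            last_feed = entry.source.id
        date = time.strftime(new_date_format, entry.updated_parsed)
        if date != last_date:
            entry.new_date = date
            last_date = date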