# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_expiry():
    """A session configured with a 1-second TTL disappears once it lapses."""
    # Configure a one-second lifetime and confirm the backend reports it.
    redis_session.set_expiry(1)
    eq_(redis_session.get_expiry_age(), 1)

    # Persist a value so the session actually exists in the store.
    redis_session['key'] = 'expiring_value'
    redis_session.save()
    session_key = redis_session.session_key
    eq_(redis_session.exists(session_key), True)

    # After the TTL has elapsed the key must no longer exist.
    time.sleep(2)
    eq_(redis_session.exists(session_key), False)
# Set up configuration settings for links.
for rel, value in link_config.iteritems():
ConfigurationSetting.for_library(rel, self._default_library).value = value
# Set up settings for navigation links.
ConfigurationSetting.for_library(
Configuration.WEB_HEADER_LINKS, self._default_library
).value = json.dumps(["http://example.com/1", "http://example.com/2"])
ConfigurationSetting.for_library(
Configuration.WEB_HEADER_LABELS, self._default_library
).value = json.dumps(["one", "two"])
self.annotator.add_configuration_links(mock_feed)
# Ten links were added to the "feed"
eq_(10, len(mock_feed))
# They are the links we'd expect.
links = {}
for link in mock_feed:
rel = link.attrib['rel']
href = link.attrib['href']
if rel == 'help' or rel == 'related':
continue # Tested below
# Check that the configuration value made it into the link.
eq_(href, link_config[rel])
eq_("text/html", link.attrib['type'])
# There are three help links using different protocols.
help_links = [x.attrib['href'] for x in mock_feed
if x.attrib['rel'] == 'help']
eq_(set(["mailto:help@me", "http://help/", "uri:help"]),
def test_cron_schedule_1(self):
    """A default cron trigger fires once on every scheduler tick."""
    counter = [0]  # mutable cell so the decorated closure can update it

    @self.scheduler.scheduled_job('cron')
    def increment():
        counter[0] += 1

    first_run = increment.job.next_run_time
    # Two ticks, one second apart: the job should fire exactly once each.
    self.scheduler._process_jobs(first_run)
    self.scheduler._process_jobs(first_run + timedelta(seconds=1))
    eq_(counter[0], 2)
data = sample_data('metadata_isbn_response.opds', 'opds')
lookup = MockMetadataWranglerOPDSLookup.from_config(self._db, collection)
lookup.queue_response(
200, {'content-type' : OPDSFeed.ACQUISITION_FEED_TYPE}, data
)
monitor = MetadataWranglerCollectionUpdateMonitor(
collection, lookup)
monitor.run_once(None, None)
# The original Identifier has information from the
# mock Metadata Wrangler.
mw_source = DataSource.lookup(self._db, DataSource.METADATA_WRANGLER)
eq_(3, len(lp.identifier.links))
[quality] = lp.identifier.measurements
eq_(mw_source, quality.data_source)
expected = (filenames, filepaths)
dictionary = {'experiment_id': '001',
'id_column': 'A',
'candidate_column': 'B',
'train_file': 'path/to/train.tsv',
'test_file': 'path/to/test.tsv',
'features': 'path/to/features.csv',
"model": 'LinearRegression',
'subgroups': ['C']}
config = Configuration(dictionary)
values_for_reader = config.get_names_and_paths(['train_file', 'test_file',
'features'],
['train', 'test',
'feature_specs'])
eq_(values_for_reader, expected)
def _assert_json_response(self, expected, code, response):
    """Assert *response* carries the given status and JSON-decoded body.

    The server replies with the legacy 'text/javascript' content type
    rather than 'application/json', so that exact header is pinned here.
    """
    eq_(response.headers['Content-Type'],
        'text/javascript; charset=UTF-8')
    eq_(json.loads(response.body), expected)
    eq_(response.code, code)
minimum_event_length=1.0
)
nose.tools.eq_(len(meta), 3)
nose.tools.eq_(meta[0].filename, 'audio_001.wav')
nose.tools.eq_(meta[0].scene_label, 'office')
nose.tools.eq_(meta[0].event_label, 'speech')
nose.tools.eq_(meta[0].onset, 1.5)
nose.tools.eq_(meta[0].offset, 3.0)
nose.tools.eq_(meta[1].filename, 'audio_001.wav')
nose.tools.eq_(meta[1].scene_label, 'office')
nose.tools.eq_(meta[1].event_label, 'speech')
nose.tools.eq_(meta[1].onset, 4.0)
nose.tools.eq_(meta[1].offset, 6.0)
nose.tools.eq_(meta[2].filename, 'audio_001.wav')
nose.tools.eq_(meta[2].scene_label, 'office')
nose.tools.eq_(meta[2].event_label, 'speech')
nose.tools.eq_(meta[2].onset, 7.0)
nose.tools.eq_(meta[2].offset, 8.0)
meta = MetaDataContainer(content2).process_events(
minimum_event_gap=1.0,
minimum_event_length=1.0
)
nose.tools.eq_(len(meta), 1)
nose.tools.eq_(meta[0].filename, 'audio_001.wav')
nose.tools.eq_(meta[0].scene_label, 'office')
nose.tools.eq_(meta[0].event_label, 'speech')
def test_stats_patrons(self):
    """The admin dashboard stats endpoint reports patron/loan/hold counts
    both per-library and rolled up under the "total" key."""
    with self.request_context_with_admin("/"):
        # Stats are only available to system admins, so grant the role.
        self.admin.add_role(AdminRole.SYSTEM_ADMIN)
        # At first, there's one patron in the database.
        response = self.manager.admin_dashboard_controller.stats()
        library_data = response.get(self._default_library.short_name)
        total_data = response.get("total")
        # With a single library, the per-library numbers and the rollup
        # must agree: one patron, no loans or holds yet.
        for data in [library_data, total_data]:
            patron_data = data.get('patrons')
            eq_(1, patron_data.get('total'))
            eq_(0, patron_data.get('with_active_loans'))
            eq_(0, patron_data.get('with_active_loans_or_holds'))
            eq_(0, patron_data.get('loans'))
            eq_(0, patron_data.get('holds'))
        # Set up one licensed pool and one open-access pool to attach
        # loans and holds to.
        edition, pool = self._edition(with_license_pool=True, with_open_access_download=False)
        edition2, open_access_pool = self._edition(with_open_access_download=True)
        # patron1 has a loan.
        patron1 = self._patron()
        pool.loan_to(patron1, end=datetime.now() + timedelta(days=5))
        # patron2 has a hold.
        patron2 = self._patron()
        pool.on_hold_to(patron2)
        # patron3 has an open access loan with no end date, but it doesn't count
        # because we don't know if it is still active.
        # NOTE(review): the patron3 setup and the follow-up stats assertions
        # appear to be truncated here -- confirm against the full test file.
def test_access_bucket_publicreadwrite_object_publicread():
    """Bucket ACL public-read-write + object ACL public-read: the second
    account can read the public object and create new keys, but is denied
    reading back its own key via the second connection."""
    access = _setup_access(bucket_acl='public-read-write', object_acl='public-read')

    # The public-read object owned by account 1 is readable by account 2.
    eq(access.a2.get_contents_as_string(), 'foocontent')
    access.a2.set_contents_from_string('barcontent')

    ### TODO: i don't understand why this gets denied, but codifying what
    ### AWS does
    # eq(obj.b2.get_contents_as_string(), 'barcontent')
    check_access_denied(access.b2.get_contents_as_string)
    access.b2.set_contents_from_string('baroverwrite')

    # Bucket-level write access lets account 2 list keys and add a new one.
    eq(get_bucket_key_names(access.bucket2), frozenset(['foo', 'bar']))
    access.new.set_contents_from_string('newcontent')
nose.tools.eq_(meta[0].filename, 'audio_002.wav')
nose.tools.eq_(meta[0].scene_label, 'meeting')
nose.tools.eq_(meta[0].event_label, 'speech')
nose.tools.eq_(meta[0].onset, 1.0)
nose.tools.eq_(meta[0].offset, 9.0)
nose.tools.eq_(meta[1].filename, 'audio_002.wav')
nose.tools.eq_(meta[1].scene_label, 'meeting')
nose.tools.eq_(meta[1].event_label, 'printer')
nose.tools.eq_(meta[1].onset, 5.0)
nose.tools.eq_(meta[1].offset, 7.0)
# Test filter by scene_label
meta = MetaDataContainer(content).filter(scene_label='office')
nose.tools.eq_(len(meta), 3)
nose.tools.eq_(meta[0].filename, 'audio_001.wav')
nose.tools.eq_(meta[0].scene_label, 'office')
nose.tools.eq_(meta[0].event_label, 'speech')
nose.tools.eq_(meta[0].onset, 1.0)
nose.tools.eq_(meta[0].offset, 10.0)
nose.tools.eq_(meta[1].filename, 'audio_001.wav')
nose.tools.eq_(meta[1].scene_label, 'office')
nose.tools.eq_(meta[1].event_label, 'mouse clicking')
nose.tools.eq_(meta[1].onset, 3.0)
nose.tools.eq_(meta[1].offset, 5.0)
meta = MetaDataContainer(content).filter(scene_list=['meeting'])
nose.tools.eq_(len(meta), 2)
# Test filter by event_label