Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_autodiscover_cache(self, m):
    """Exercise the autodiscover cache: population, by-value lookup, poisoning and reuse.

    :param m: mocker used to stub the poisoned endpoint. Only ``m.post(url, status_code=...)`` is
        called on it here (presumably a ``requests_mock`` mocker - confirm against the decorator).
    """
    # Empty the cache
    from exchangelib.autodiscover import _autodiscover_cache
    _autodiscover_cache.clear()
    cache_key = (self.account.domain, self.account.protocol.credentials)
    # Not cached
    self.assertNotIn(cache_key, _autodiscover_cache)
    discover(email=self.account.primary_smtp_address, credentials=self.account.protocol.credentials)
    # Now it's cached
    self.assertIn(cache_key, _autodiscover_cache)
    # Make sure the cache can be looked by value, not by id(). This is important for multi-threading/processing
    self.assertIn((
        self.account.primary_smtp_address.split('@')[1],
        Credentials(self.account.protocol.credentials.username, self.account.protocol.credentials.password),
        True
    ), _autodiscover_cache)
    # Poison the cache. discover() must survive and rebuild the cache
    _autodiscover_cache[cache_key] = AutodiscoverProtocol(config=Configuration(
        service_endpoint='https://example.com/blackhole.asmx',
        credentials=Credentials('leet_user', 'cannaguess'),
        auth_type=NTLM,
        retry_policy=FailFast(),
    ))
    m.post('https://example.com/blackhole.asmx', status_code=404)
    discover(email=self.account.primary_smtp_address, credentials=self.account.protocol.credentials)
    self.assertIn(cache_key, _autodiscover_cache)

    # Make sure that the cache is actually used on the second call to discover()
    _orig = exchangelib.autodiscover._try_autodiscover

    def _mock(*args, **kwargs):
        # Any attempt to run a real autodiscover fails loudly, proving the cache was bypassed
        raise NotImplementedError()

    exchangelib.autodiscover._try_autodiscover = _mock
    try:
        discover(email=self.account.primary_smtp_address, credentials=self.account.protocol.credentials)
    finally:
        # Always undo the monkeypatch, even on failure, so other tests see the real implementation
        exchangelib.autodiscover._try_autodiscover = _orig
    # Fake that another thread added the cache entry into the persistent storage but we don't have it in our
    # in-memory cache. The cache should work anyway.
    # NOTE(review): the statements for this last step are not present in this view.
def test_versioned_choice(self):
    """Check that ChoiceField.clean() validates both the value and the server version."""
    versioned_field = ChoiceField(
        'foo',
        field_uri='bar',
        choices={Choice('c1'), Choice('c2', supported_from=EXCHANGE_2010)},
    )

    # Value must be a valid choice
    with self.assertRaises(ValueError):
        versioned_field.clean('XXX')

    # With version=None the versioned choice is expected to pass
    versioned_field.clean('c2', version=None)

    # 'c2' is declared with supported_from=EXCHANGE_2010, so a 2007 version must be rejected
    with self.assertRaises(ErrorInvalidServerVersion):
        versioned_field.clean('c2', version=Version(EXCHANGE_2007))

    # The declared version and a later one are expected to pass
    for build in (EXCHANGE_2010, EXCHANGE_2013):
        versioned_field.clean('c2', version=Version(build))
def test_garbage_input(self):
    """Feed unusable payloads to common field types and expect from_xml() to return None."""
    # Test that we can survive garbage input for common field types
    copenhagen = EWSTimeZone.timezone('Europe/Copenhagen')
    # Minimal stand-in exposing only the attribute passed along as `account`
    account_stub_cls = namedtuple('Account', ['default_timezone'])
    account = account_stub_cls(default_timezone=copenhagen)
    # NOTE(review): the payloads below contain no markup although an `Item` element is looked up in
    # the parsed result - the XML may have been lost from this copy; confirm against the upstream test.
    payload = b'''\
THIS_IS_GARBAGE
'''
    elem = to_xml(payload).find('{%s}Item' % TNS)
    garbage_prone_fields = (Base64Field, BooleanField, IntegerField, DateField, DateTimeField, DecimalField)
    for field_cls in garbage_prone_fields:
        candidate = field_cls('foo', field_uri='item:Foo', is_required=True, default='DUMMY')
        parsed = candidate.from_xml(elem=elem, account=account)
        self.assertEqual(parsed, None)

    # Test MS timezones
    payload = b'''\
'''
    elem = to_xml(payload).find('{%s}Item' % TNS)
    tz_field = TimeZoneField('foo', field_uri='item:Foo', default='DUMMY')
    parsed_tz = tz_field.from_xml(elem=elem, account=account)
    self.assertEqual(parsed_tz, None)
# NOTE(review): headerless fragment - the enclosing `def` is not visible here, so `field` and `self`
# are assumed to come from it. Each branch below returns a random value for one field type.
# NOTE(review): this `raise` appears before the type checks; presumably it is the fallback for
# unmatched field types and the excerpt is out of order - confirm against the full source.
raise ValueError('Unsupported field %s' % field)
if isinstance(field, URIField):
return get_random_url()
if isinstance(field, EmailAddressField):
return get_random_email()
if isinstance(field, ChoiceField):
# Only pick among the choices reported for the account's version
return get_random_choice(field.supported_choices(version=self.account.version))
if isinstance(field, CultureField):
return get_random_choice(['da-DK', 'de-DE', 'en-US', 'es-ES', 'fr-CA', 'nl-NL', 'ru-RU', 'sv-SE'])
if isinstance(field, BodyField):
return get_random_string(400)
if isinstance(field, CharListField):
# A list of 1 to 4 random strings
return [get_random_string(16) for _ in range(random.randint(1, 4))]
if isinstance(field, TextListField):
# A list of 1 to 4 random strings
return [get_random_string(400) for _ in range(random.randint(1, 4))]
if isinstance(field, CharField):
return get_random_string(field.max_length)
if isinstance(field, TextField):
return get_random_string(400)
if isinstance(field, MimeContentField):
return get_random_string(400)
if isinstance(field, Base64Field):
return get_random_bytes(400)
if isinstance(field, BooleanField):
return get_random_bool()
if isinstance(field, DecimalField):
# Falls back to the bounds 1 and 99 when field.min / field.max are falsy
return get_random_decimal(field.min or 1, field.max or 99)
if isinstance(field, IntegerField):
# Falls back to the bounds 0 and 256 when field.min / field.max are falsy
return get_random_int(field.min or 0, field.max or 256)
if isinstance(field, DateTimeField):
return get_random_datetime(tz=self.account.default_timezone)
if isinstance(field, AttachmentField):
# NOTE(review): the body of this branch is cut off in this view.
def test_generic_folder(self):
    """Create a folder under the inbox, rename it, then delete it again."""
    inbox = self.account.inbox
    folder = Folder(parent=inbox, name=get_random_string(16))
    folder.save()

    # Rename and persist the change
    folder.name = get_random_string(16)
    folder.save()

    # Clean up so the folder does not linger in the account
    folder.delete()
# NOTE(review): headerless fragment - it begins inside a call whose opening is not visible, and
# `dt`, `tz` and `UTC` are assumed to be defined by the missing lines above.
UTC.localize(EWSDateTime(2000, 1, 2, 3, 4, 5))
)
self.assertIsInstance(EWSDateTime.from_string('2000-01-02T03:04:05+01:00'), EWSDateTime)
self.assertIsInstance(EWSDateTime.from_string('2000-01-02T03:04:05Z'), EWSDateTime)
# Test addition, subtraction, summertime etc
self.assertIsInstance(dt + datetime.timedelta(days=1), EWSDateTime)
self.assertIsInstance(dt - datetime.timedelta(days=1), EWSDateTime)
# Subtracting two datetimes is expected to give a plain timedelta
self.assertIsInstance(dt - EWSDateTime.now(tz=tz), datetime.timedelta)
self.assertIsInstance(EWSDateTime.now(tz=tz), EWSDateTime)
self.assertEqual(dt, EWSDateTime.from_datetime(tz.localize(datetime.datetime(2000, 1, 2, 3, 4, 5))))
self.assertEqual(dt.ewsformat(), '2000-01-02T03:04:05+01:00')
utc_tz = EWSTimeZone.timezone('UTC')
# The January value is expected one hour earlier in UTC
self.assertEqual(dt.astimezone(utc_tz).ewsformat(), '2000-01-02T02:04:05Z')
# Test summertime
dt = tz.localize(EWSDateTime(2000, 8, 2, 3, 4, 5))
# The August value is expected two hours earlier in UTC
self.assertEqual(dt.astimezone(utc_tz).ewsformat(), '2000-08-02T01:04:05Z')
# Test normalize, for completeness
self.assertEqual(tz.normalize(dt).ewsformat(), '2000-08-02T03:04:05+02:00')
self.assertEqual(utc_tz.normalize(dt, is_dst=True).ewsformat(), '2000-08-02T01:04:05Z')
# Test in-place add and subtract
dt = tz.localize(EWSDateTime(2000, 1, 2, 3, 4, 5))
dt += datetime.timedelta(days=1)
self.assertIsInstance(dt, EWSDateTime)
self.assertEqual(dt, tz.localize(EWSDateTime(2000, 1, 3, 3, 4, 5)))
dt = tz.localize(EWSDateTime(2000, 1, 2, 3, 4, 5))
dt -= datetime.timedelta(days=1)
self.assertIsInstance(dt, EWSDateTime)
self.assertEqual(dt, tz.localize(EWSDateTime(2000, 1, 1, 3, 4, 5)))
# Test ewsformat() failure
# NOTE(review): the statements for this step are cut off in this view.
# NOTE(review): headerless fragment - it starts inside an `OofSettings(...)` call whose opening
# line (and any enclosing `with`) is not visible here.
state=OofSettings.SCHEDULED,
# start (2100-12-01) is later than end (2100-11-01)
start=UTC.localize(EWSDateTime(2100, 12, 1)),
end=UTC.localize(EWSDateTime(2100, 11, 1)),
).clean(version=None)
with self.assertRaises(ValueError):
# End must be in the future
OofSettings(
state=OofSettings.SCHEDULED,
# Both dates lie in the year 2000
start=UTC.localize(EWSDateTime(2000, 11, 1)),
end=UTC.localize(EWSDateTime(2000, 12, 1)),
).clean(version=None)
with self.assertRaises(ValueError):
# Must have an internal and external reply
OofSettings(
# Only state, start and end are passed - no reply values
state=OofSettings.SCHEDULED,
start=UTC.localize(EWSDateTime(2100, 11, 1)),
end=UTC.localize(EWSDateTime(2100, 12, 1)),
).clean(version=None)
# NOTE(review): headerless fragment - `tz` and `utc_tz` are assumed to be defined by lines that
# are not visible here.
# Test summertime
dt = tz.localize(EWSDateTime(2000, 8, 2, 3, 4, 5))
# The August value is expected two hours earlier in UTC
self.assertEqual(dt.astimezone(utc_tz).ewsformat(), '2000-08-02T01:04:05Z')
# Test normalize, for completeness
self.assertEqual(tz.normalize(dt).ewsformat(), '2000-08-02T03:04:05+02:00')
self.assertEqual(utc_tz.normalize(dt, is_dst=True).ewsformat(), '2000-08-02T01:04:05Z')
# Test in-place add and subtract
dt = tz.localize(EWSDateTime(2000, 1, 2, 3, 4, 5))
dt += datetime.timedelta(days=1)
self.assertIsInstance(dt, EWSDateTime)
self.assertEqual(dt, tz.localize(EWSDateTime(2000, 1, 3, 3, 4, 5)))
dt = tz.localize(EWSDateTime(2000, 1, 2, 3, 4, 5))
dt -= datetime.timedelta(days=1)
self.assertIsInstance(dt, EWSDateTime)
self.assertEqual(dt, tz.localize(EWSDateTime(2000, 1, 1, 3, 4, 5)))
# Test ewsformat() failure
# The value below is built without any tzinfo
dt = EWSDateTime(2000, 1, 2, 3, 4, 5)
with self.assertRaises(ValueError):
dt.ewsformat()
# Test wrong tzinfo type
with self.assertRaises(ValueError):
# A pytz timezone is passed as tzinfo here and is expected to be rejected
EWSDateTime(2000, 1, 2, 3, 4, 5, tzinfo=pytz.utc)
with self.assertRaises(ValueError):
# from_datetime() is given an EWSDateTime without tzinfo and is expected to reject it
EWSDateTime.from_datetime(EWSDateTime(2000, 1, 2, 3, 4, 5))
# NOTE(review): headerless fragment - the first check below has no visible `with` line; presumably
# a `with self.assertRaises(...)` header was cut off just above - confirm against the full source.
# Needs a start and end
OofSettings(
# Only the state is passed here
state=OofSettings.SCHEDULED,
).clean(version=None)
with self.assertRaises(ValueError):
# Start must be before end
OofSettings(
state=OofSettings.SCHEDULED,
# start (2100-12-01) is later than end (2100-11-01)
start=UTC.localize(EWSDateTime(2100, 12, 1)),
end=UTC.localize(EWSDateTime(2100, 11, 1)),
).clean(version=None)
with self.assertRaises(ValueError):
# End must be in the future
OofSettings(
state=OofSettings.SCHEDULED,
# Both dates lie in the year 2000
start=UTC.localize(EWSDateTime(2000, 11, 1)),
end=UTC.localize(EWSDateTime(2000, 12, 1)),
).clean(version=None)
with self.assertRaises(ValueError):
# Must have an internal and external reply
OofSettings(
# Only state, start and end are passed - no reply values
state=OofSettings.SCHEDULED,
start=UTC.localize(EWSDateTime(2100, 11, 1)),
end=UTC.localize(EWSDateTime(2100, 12, 1)),
).clean(version=None)