Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Log records are rendered as "LEVEL: message".
formatter = logging.Formatter('%(levelname)s: %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)

parser = optparse.OptionParser("fix-songkick")
opts, args = parser.parse_args()

# The calendar is fetched from the URL given as the first positional
# argument; without any argument it is read from standard input (as bytes).
if args:
    url = args[0]
    f = urllib.request.urlopen(url)
else:
    f = sys.stdin.buffer
orig = Calendar.from_ical(f.read())
TRACKING_PREFIX = "You’re tracking this event.\n\n"
GOING_PREFIX = "You’re going.\n\n"


def fix_vevent(vevent):
    """Strip boilerplate from an event's description and summary.

    The event is modified in place:

    * A leading TRACKING_PREFIX, then a leading GOING_PREFIX, then a leading
      copy of the event's own URL are removed from DESCRIPTION.
    * When GOING_PREFIX was present, STATUS is set to 'CONFIRMED'.
    * A DESCRIPTION that ends up empty is removed altogether.
    * Every " at <location prefix> (<DD Mon YY>)" suffix that can be built
      from the comma-separated LOCATION and the DTSTART date is removed from
      SUMMARY.

    Args:
        vevent: mapping-like event with DESCRIPTION, URL, LOCATION, SUMMARY
            and DTSTART entries; ``vevent['DTSTART'].dt`` must support
            ``strftime``.

    Returns:
        The same ``vevent`` object.

    Raises:
        KeyError: if one of the entries listed above is missing.
    """
    original = str(vevent['DESCRIPTION'])
    description = original
    if description.startswith(TRACKING_PREFIX):
        description = description[len(TRACKING_PREFIX):]
    if description.startswith(GOING_PREFIX):
        vevent['STATUS'] = 'CONFIRMED'
        description = description[len(GOING_PREFIX):]
    url = str(vevent['URL'])
    if description.startswith(url):
        description = description[len(url):]

    # The clean-up runs exactly once. Running it again after the description
    # was deleted would look up a key that no longer exists.
    if not description:
        del vevent['DESCRIPTION']
    elif description != original:
        # Only overwrite when something was stripped, so an untouched
        # property value object is left as it was.
        vevent['DESCRIPTION'] = description

    location_parts = str(vevent['LOCATION']).split(',')
    # The date suffix is the same for every candidate, so build it once.
    date_suffix = ' (' + vevent['DTSTART'].dt.strftime('%d %b %y') + ')'
    summary = vevent['SUMMARY']
    # Try the longest location prefix first, then progressively shorter ones.
    for i in reversed(range(len(location_parts))):
        candidate = ' at ' + ','.join(location_parts[:i]) + date_suffix
        summary = summary.replace(candidate, '')
    vevent['SUMMARY'] = summary
    return vevent
# Rebuild the calendar: VEVENT components go through fix_vevent(), every
# other component is copied across unchanged, then the result is written to
# standard output as raw bytes.
out = Calendar()
for component in orig.subcomponents:
    cleaned = fix_vevent(component) if component.name == 'VEVENT' else component
    out.add_component(cleaned)
sys.stdout.buffer.write(out.to_ical())
for (href, status, propstat) in calendar_query(
url, ['{DAV:}getetag', '{urn:ietf:params:xml:ns:caldav}calendar-data'], filter):
by_status = {}
for propstatsub in propstat:
if propstatsub.tag == '{DAV:}status':
status = propstatsub.text
elif propstatsub.tag == '{DAV:}prop':
by_status[status] = propstatsub
else:
assert False, 'invalid %r' % propstatsub.tag
data = None
for prop in by_status.get('HTTP/1.1 200 OK', []):
if prop.tag == '{urn:ietf:params:xml:ns:caldav}calendar-data':
data = prop.text
assert data is not None, "data missing for %r" % href
yield href, Calendar.from_ical(data)
elif self.ics_url:
cal_data = urlopen(self.ics_url)
elif self.xml_file:
cal_data = open(self.xml_file, "rb")
elif self.ics_file:
cal_data = open(self.ics_file, "rb")
else:
raise UnboundLocalError("No calendar url or file path has been set.")
cal_str = cal_data.read()
cal_data.close()
if (self.xml_url or self.xml_file) and not force_ics:
self.calendar = BeautifulStoneSoup(_normalize(cal_str, True))
elif (self.ics_url or self.ics_file) and not force_xml:
self.calendar = Calendar.from_ical(cal_str)
return self.calendar
def calendar(self):
    """Return the parsed calendar for this object, parsing it on first use.

    The chunks in ``self.content`` are joined and handed to
    ``Calendar.from_ical``; the result is cached in ``self._calendar`` and
    returned directly on later calls.

    Raises:
        InvalidFileContents: when parsing raises ``ValueError``; carries the
            content type, the raw content and the parser's message.
    """
    if self._calendar is not None:
        return self._calendar
    try:
        parsed = Calendar.from_ical(b''.join(self.content))
    except ValueError as exc:
        raise InvalidFileContents(
            self.content_type, self.content, str(exc))
    self._calendar = parsed
    return self._calendar
async def report(self, environ, body, resources_by_hrefs,
                 properties, base_href, base_resource, depth):
    """Build a VFREEBUSY calendar for the time range named in ``body``.

    Every child of ``body`` must be a CalDAV ``time-range`` element; the last
    one seen supplies the start and end of the report. Busy periods are
    collected from ``iter_freebusy`` over the resources reachable from
    ``base_resource`` at the given ``depth``.

    Returns:
        A ``webdav.Response`` with status '200 OK' whose body is the
        serialized calendar.

    Raises:
        AssertionError: if ``body`` contains any other element, or if the
            parsed start or end has no timezone information.
    """
    time_range_el = None
    for child in body:
        if child.tag != '{urn:ietf:params:xml:ns:caldav}time-range':
            raise AssertionError("unexpected XML element")
        time_range_el = child

    calendar_tz = get_calendar_timezone(base_resource)

    def tzify(dt):
        # Interpret dt in the calendar's timezone, then convert to UTC.
        return as_tz_aware_ts(dt, calendar_tz).astimezone(pytz.utc)

    start, end = _parse_time_range(time_range_el)
    assert start.tzinfo
    assert end.tzinfo

    response_cal = ICalendar()
    response_cal['VERSION'] = '2.0'
    response_cal['PRODID'] = PRODID

    freebusy = FreeBusy()
    freebusy['DTSTAMP'] = vDDDTypes(tzify(datetime.datetime.now()))
    freebusy['DTSTART'] = vDDDTypes(start)
    freebusy['DTEND'] = vDDDTypes(end)

    candidates = webdav.traverse_resource(base_resource, base_href, depth)
    busy_periods = []
    async for period in iter_freebusy(candidates, start, end, tzify):
        busy_periods.append(period)
    freebusy['FREEBUSY'] = busy_periods

    response_cal.add_component(freebusy)
    return webdav.Response(status='200 OK', body=[response_cal.to_ical()])
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from dystros import filters, utils
parser = optparse.OptionParser("travel")
store_set_options = utils.StoreSetOptionGroup(parser)
parser.add_option_group(store_set_options)
opts, args = parser.parse_args()
stores = utils.StoreSet.from_options(opts)
vevents = filters.extract_vevents(stores.iter_calendars())

# Emit one FreeBusy component per event, carrying over its UID and its
# start/end.
out = Calendar()
for vevent in vevents:
    # A fresh component per event: reusing a single instance would leave
    # every entry in the output holding the values of the last event.
    freebusy = FreeBusy()
    freebusy['UID'] = vevent['UID']
    freebusy['DTSTART'] = vevent['DTSTART']
    freebusy['DTEND'] = vevent['DTEND']
    out.add_component(freebusy)

# to_ical() yields bytes; print() would show their repr (b'...') instead of
# the calendar data, so write them to the binary stdout stream.
sys.stdout.buffer.write(out.to_ical())
async def get_value_ext(self, base_href, resource, el, environ, requested):
    """Store the resource's calendar data as text on ``el``.

    When ``requested`` has no children the resource body is used verbatim.
    Otherwise a new calendar is filled by ``extract_from_calendar`` from the
    resource's calendar according to ``requested`` and serialized.

    Raises:
        KeyError: if ``requested`` is non-empty and the resource yields no
            calendar.
    """
    if len(requested) != 0:
        filtered = ICalendar()
        source_cal = calendar_from_resource(resource)
        if source_cal is None:
            raise KeyError
        extract_from_calendar(source_cal, filtered, requested)
        payload = filtered.to_ical()
    else:
        chunks = await resource.get_body()
        payload = b''.join(chunks)
    # TODO(jelmer): Don't hardcode encoding
    # TODO(jelmer): Strip invalid characters or raise an exception
    el.text = payload.decode('utf-8')
def get_pytz_from_text(tztext):
    """Return the pytz timezone whose id is extracted from ``tztext``.

    ``tztext`` is parsed as iCalendar data, ``extract_tzid`` picks the
    timezone identifier out of the parsed calendar, and that identifier is
    looked up with ``pytz.timezone``.
    """
    return pytz.timezone(extract_tzid(ICalendar.from_ical(tztext)))
# Copy the optional event fields into props; each one is skipped when unset.
if location is not None:
props['location'] = vText(location)
if opts.url:
props['url'] = vUri(opts.url)
# The free-text description is stored as the event's summary.
if description is not None:
props['summary'] = vText(description)
# Only the date part of dtend is kept.
if dtend is not None:
props['dtend'] = vDate(dtend.date())
if duration is not None:
props['duration'] = vDuration(duration)
# A newly generated UUID serves both as the event UID and as the file name.
uid = str(uuid.uuid1())
props['UID'] = uid
# Wrap the single event in a calendar of its own.
ev = Event(**props)
c = Calendar()
c.add_component(ev)
# Upload the calendar as <uid>.ics, resolved against opts.url.
# NOTE(review): opts.url is used both as the event's URL property and as the
# upload base, and is only checked for emptiness for the former -- confirm
# that this is intended.
fname = uid + '.ics'
url = urllib.parse.urljoin(opts.url, fname)
utils.put(url, c.to_ical())
logging.info('Wrote %s', url)