Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_prep_value(self, value):
# Value in DB should be null.
if value is None:
return None
if not isinstance(value, isodate.duration.Duration):
raise ValidationError(
'Cannot convert objects that are not Durations.')
return isodate.duration_isoformat(value)
def hide_transcript(current_event, caliper_event):
"""
When a user toggles Show Transcript to suppress display of the video transcript, the browser or mobile app emits a
hide_transcript event.
:param current_event: default log
:param caliper_event: log containing both basic and default attribute
:return: final created log
"""
current_event_details = json.loads(current_event['event'])
caliper_event.update({
'action': 'DisabledClosedCaptioning',
'type': 'MediaEvent',
'object': {
'duration': duration_isoformat(
timedelta(seconds=current_event_details['duration'])),
'extensions': {
'code': current_event_details['code'],
'id': current_event_details['id']
},
'id': current_event['referer'],
'type': 'VideoObject'
},
'target': {
'currentTime': duration_isoformat(
timedelta(seconds=current_event_details['current_time'])),
'id': current_event['referer'],
'type': 'MediaLocation'
}
})
caliper_event['actor'].update({
def xmlvalue(self, value):
return isodate.duration_isoformat(value)
def build_sampling_rate(min_freq, max_freq):
min_sampling_rate = isodate.duration_isoformat(timedelta(seconds=min_freq))
max_sampling_rate = isodate.duration_isoformat(timedelta(seconds=max_freq))
sampling_rate = oadr_20b.oadrSamplingRateType(oadrMinPeriod=min_sampling_rate,
oadrMaxPeriod=max_sampling_rate,
oadrOnChange=False)
return sampling_rate
def to_primitive(self, value, context=None):
return duration_isoformat(value)
'duration': lambda d: isodate.duration_isoformat(d),
'geopoint': lambda d: list(map(float, d)),
isinstance(obj, datetime.datetime) or \
isinstance(obj, datetime.time):
## Note this issue with isodate module: "time formating does not allow
## to create fractional representations".
## Hence we use standard Python isoformat()
##
s = obj.isoformat()
if hasattr(obj, 'tzinfo') and obj.tzinfo is None and s[-1] != 'Z':
## assume UTC and append 'Z' for ISO format compliance!
return s + 'Z'
else:
return s
elif isinstance(obj, datetime.timedelta):
#return (datetime.datetime.min + obj).time().isoformat()
return isodate.duration_isoformat(obj)
else:
return super(CustomJsonEncoder, self).default(obj)
def fail(self, error_message="", capture_exception=True):
"""
Send a message to the controller that this message failed to process
"""
self.controller_queue.send_json({
'ramp_unique_id': self.ramp_unique_id,
'ack_value': -1,
'content': {
'process_name': self.process_name,
'msg_type': 'fail',
'duration': duration_isoformat(datetime.datetime.now() - self.init_time),
'message_content': self.content
},
'producer_uuid': self.producer_uuid,
'destination_uuid': self.producer_uuid,
'error_message': error_message if not capture_exception else traceback.format_exc(),
})
'ip': current_event.get('ip')
})
caliper_event['referrer']['type'] = 'WebPage'
caliper_event['actor'].update({
'name': current_event.get('username'),
'type': 'Person'
})
event_info = json.loads(current_event['event'])
caliper_event['object'] = {
'id': current_event.get('referer'),
'type': 'VideoObject',
'duration': duration_isoformat(timedelta(
seconds=event_info.pop('duration')
)),
'extensions': {
'id': event_info.get('id'),
'code': event_info.get('code'),
}
}
return caliper_event
When a user selects the video player's pause control,
the player emits a pause_video event. For videos that
are streamed in a browser, when the player reaches the
end of the video file and play automatically stops it
emits both this event and a stop event (as of June 2014).
:param current_event: default event log generated.
:param caliper_event: caliper_event log having some basic attributes.
:return: updated caliper_event.
"""
current_event_details = json.loads(current_event['event'])
caliper_event.update({
'action': 'Paused',
'type': 'MediaEvent',
'object': {
'duration': duration_isoformat(timedelta(
seconds=current_event_details['duration']
)),
'extensions': {
'code': current_event_details['code'],
'id': current_event_details['id']
},
'id': current_event['referer'],
'type': 'VideoObject'
},
'target': {
'currentTime': duration_isoformat(timedelta(
seconds=current_event_details['currentTime']
)),
'id': current_event['referer'],
'type': 'MediaLocation'
}