Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def home():
    """Generates the home page view template.

    Builds the query for the sketches to list, reads the upload setting and,
    when the hidden name/description form validates, creates a new sketch for
    the current user and grants that user read, write and delete permissions.

    NOTE(review): the code shown here stops after the permission grants. It
    never adds the new sketch to a database session and contains no return
    statement, so as written the function returns None. The tail of the
    function was presumably lost -- confirm against the original module.

    Returns:
        Template with context (per the original docstring; no return
        statement is present in the code shown here).
    """
    form = HiddenNameDescriptionForm()
    sketches = Sketch.all_with_acl().filter(
        not_(Sketch.Status.status == u'deleted'),
        Sketch.Status.parent).order_by(Sketch.updated_at.desc())
    # Only render upload button if it is configured.
    upload_enabled = current_app.config[u'UPLOAD_ENABLED']

    # Handle form for creating a new sketch.
    if form.validate_on_submit():
        sketch = Sketch(
            name=form.name.data,
            description=form.description.data,
            user=current_user)
        sketch.status.append(sketch.Status(user=None, status=u'new'))
        # Give the requesting user permissions on the new sketch.
        for permission in (u'read', u'write', u'delete'):
            sketch.grant_permission(permission=permission, user=current_user)
def post(self):
    """Handles POST request to the resource.

    Validates the submitted name/description form, stores a new sketch for
    the current user and grants that user read, write and delete permissions
    on it.

    Returns:
        A sketch in JSON (instance of flask.wrappers.Response)
    """
    form = NameDescriptionForm.build(request)
    if not form.validate_on_submit():
        # abort() is presumably Flask's, which raises and ends the request
        # here -- confirm against the module's imports.
        abort(
            HTTP_STATUS_CODE_BAD_REQUEST, 'Unable to validate form data.')

    sketch = Sketch(
        name=form.name.data,
        description=form.description.data,
        user=current_user)
    sketch.status.append(sketch.Status(user=None, status='new'))
    # The sketch is committed before any permission is granted.
    db_session.add(sketch)
    db_session.commit()

    # Give the requesting user permissions on the new sketch.
    for permission in ('read', 'write', 'delete'):
        sketch.grant_permission(permission=permission, user=current_user)
    return self.to_json(sketch, status_code=HTTP_STATUS_CODE_CREATED)
# NOTE(review): fragment of a larger routine. The enclosing definition and
# the original indentation are not visible here, so the nesting of the
# statements below cannot be recovered from this view, and the final call is
# cut off mid-expression.
pass
# Normalise the timeline name: decode it when it is not already
# six.text_type, then replace underscores with spaces.
if not timeline_name:
if not isinstance(timeline_name, six.text_type):
timeline_name = codecs.decode(timeline_name, 'utf-8')
timeline_name = timeline_name.replace('_', ' ')
# Remove sketch ID if present in the filename.
# A leading all-digit word is treated as that ID and dropped.
timeline_parts = timeline_name.split()
if timeline_parts[0].isdigit():
timeline_name = ' '.join(timeline_name.split()[1:])
if not sketch:
# Create a new sketch.
# The same generated text is used as both name and description.
sketch_name = 'Sketch for: {0:s}'.format(timeline_name)
sketch = Sketch(
name=sketch_name, description=sketch_name, user=user)
# Need to commit here to be able to set permissions later.
db_session.add(sketch)
db_session.commit()
# Give the user read, write and delete permissions on the sketch.
sketch.grant_permission(permission='read', user=user)
sketch.grant_permission(permission='write', user=user)
sketch.grant_permission(permission='delete', user=user)
# Record the initial 'new' status and commit again.
sketch.status.append(sketch.Status(user=None, status='new'))
db_session.add(sketch)
db_session.commit()
# The hex form of a random UUID is used as the index name, decoded when it
# is not already six.text_type.
index_name = uuid.uuid4().hex
if not isinstance(index_name, six.text_type):
index_name = codecs.decode(index_name, 'utf-8')
# NOTE(review): the arguments of this call are missing from the fragment.
searchindex = SearchIndex.get_or_create(
def overview(sketch_id):
    """Generates the sketch overview template.

    Loads the sketch, instantiates the forms used on the page, reads the
    upload/graph settings and fills the group choices of the permission form.

    NOTE(review): the code shown here contains no return statement and never
    uses several of the locals it creates; the tail of the function was
    presumably lost -- confirm against the original module.

    Args:
        sketch_id: ID passed to Sketch.query.get_with_acl().

    Returns:
        Template with context (per the original docstring; no return
        statement is present in the code shown here).
    """
    sketch = Sketch.query.get_with_acl(sketch_id)
    sketch_form = NameDescriptionForm()
    permission_form = TogglePublic()
    status_form = StatusForm()
    trash_form = TrashForm()
    upload_enabled = current_app.config['UPLOAD_ENABLED']
    graphs_enabled = current_app.config['GRAPH_BACKEND_ENABLED']

    # Dynamically set the forms select options.
    # "== None" is kept on purpose (hence the pylint pragma): it is presumably
    # an ORM column comparison, which "is None" would not express.
    # pylint: disable=singleton-comparison
    permission_form.groups.choices = {
        (g.id, g.name)
        for g in Group.query.filter(
            or_(Group.user == current_user, Group.user == None))
    }
    permission_form.remove_groups.choices = {
        (g.id, g.name) for g in sketch.groups
    }
def get(self):
    """Handles GET request to the resource.

    Lists the sketches returned by Sketch.all_with_acl() that are not in
    status 'deleted', most recently updated first. Only page 1 with ten
    entries is requested; the offset parameter is not handled yet.

    Returns:
        List of sketches (instance of flask.wrappers.Response)
    """
    # TODO: Handle offset parameter
    sketches = Sketch.all_with_acl().filter(
        not_(Sketch.Status.status == 'deleted'),
        Sketch.Status.parent).order_by(Sketch.updated_at.desc())
    # Keyword arguments on purpose: assuming this is Flask-SQLAlchemy's
    # paginate(), newer releases no longer accept these positionally.
    paginated_result = sketches.paginate(
        page=1, per_page=10, error_out=False)

    # Neighbouring page numbers are only reported when such a page exists.
    has_next = paginated_result.has_next
    has_prev = paginated_result.has_prev
    meta = {
        'next': paginated_result.next_num if has_next else None,
        'previous': paginated_result.prev_num if has_prev else None,
        'offset': paginated_result.page,
        'limit': paginated_result.per_page
    }
    return self.to_json(paginated_result.items, meta=meta)
def __init__(self, sketch_id):
    """Initializes a Sketch object.

    Stores the ID and the matching row looked up via SQLSketch.query.get().

    Args:
        sketch_id: The Sketch ID.

    Raises:
        RuntimeError: If the lookup for sketch_id yields nothing.
    """
    self.id = sketch_id
    self.sql_sketch = SQLSketch.query.get(sketch_id)
    if not self.sql_sketch:
        raise RuntimeError('No such sketch')
def __init__(self, name, description, user):
    """Initialize the Sketch object.

    Calls the parent initializer without arguments and then stores the three
    values on the instance.

    Args:
        name: The name of the sketch
        description: Description of the sketch
        user: A user (instance of timesketch.models.user.User)
    """
    super(Sketch, self).__init__()
    self.name = name
    self.description = description
    self.user = user
def story(sketch_id, story_id=None):
    """Generates the story list template.

    Args:
        sketch_id: ID passed to Sketch.query.get_with_acl().
        story_id: Optional ID of a story to load; when falsy no story is
            looked up and the template receives story=None.

    Returns:
        Template with context.
    """
    sketch = Sketch.query.get_with_acl(sketch_id)
    graphs_enabled = current_app.config[u'GRAPH_BACKEND_ENABLED']
    # NOTE(review): the story is fetched by ID alone; nothing here checks
    # that it belongs to the sketch loaded above -- confirm this is intended.
    current_story = Story.query.get(story_id) if story_id else None
    return render_template(
        u'sketch/stories.html', sketch=sketch, story=current_story,
        graphs_enabled=graphs_enabled)
def __init__(self, sketch_id=None, index=None):
    """Initialize the aggregator object.

    Args:
        sketch_id: Sketch ID.
        index: List of elasticsearch index names. When empty or omitted, the
            index names of the sketch's active timelines are used instead.

    Raises:
        RuntimeError: If neither sketch_id nor index is given, or if the
            index names have to be taken from the sketch and the lookup for
            sketch_id yields nothing.
    """
    if not sketch_id and not index:
        raise RuntimeError('Need at least sketch_id or index')

    # Only query the database when there is an ID to look up.
    self.sketch = SQLSketch.query.get(sketch_id) if sketch_id else None
    self.index = index
    self.elastic = Elasticsearch(
        host=current_app.config['ELASTIC_HOST'],
        port=current_app.config['ELASTIC_PORT'])

    if not self.index:
        # No explicit indices: fall back to the sketch's active timelines.
        # Fail with a clear error instead of an AttributeError on None when
        # the sketch does not exist.
        if self.sketch is None:
            raise RuntimeError('No such sketch')
        active_timelines = self.sketch.active_timelines
        self.index = [t.searchindex.index_name for t in active_timelines]
def export(sketch_id):
"""Generates CSV from search result.
Args:
sketch_id: Primary key for a sketch.
Returns:
CSV string with header.
"""
sketch = Sketch.query.get_with_acl(sketch_id)
view = sketch.get_user_view(current_user)
query_filter = json.loads(view.query_filter)
query_dsl = json.loads(view.query_dsl)
indices = query_filter.get('indices', [])
# Export more than the 500 first results.
max_events_to_fetch = 10000
query_filter['terminate_after'] = max_events_to_fetch
query_filter['size'] = max_events_to_fetch
datastore = ElasticsearchDataStore(
host=current_app.config['ELASTIC_HOST'],
port=current_app.config['ELASTIC_PORT'])
result = datastore.search(
sketch_id,