Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
])
cmd = database.Command.highlight_add(
self.current_user,
document,
hl,
obj.get('tags', []),
)
self.db.add(cmd)
self.db.commit()
self.db.refresh(cmd)
self.application.notify_project(document.project_id, cmd)
self.send_json({'id': hl.id})
class HighlightUpdate(BaseHandler):
@authenticated
def post(self, project_id, document_id, highlight_id):
obj = self.get_json()
document = self.get_document(project_id, document_id)
hl = self.db.query(database.Highlight).get(int(highlight_id))
if hl.document_id != document.id:
raise HTTPError(404)
if obj:
if 'start_offset' in obj:
hl.start_offset = obj['start_offset']
if 'end_offset' in obj:
hl.end_offset = obj['end_offset']
if 'tags' in obj:
(
self.db.query(database.HighlightTag)
.filter(database.HighlightTag.highlight == hl)
self.db.delete(tag)
cmd = database.Command.tag_delete(
self.current_user,
project.id,
tag.id,
)
self.db.add(cmd)
self.db.commit()
self.db.refresh(cmd)
self.application.notify_project(project.id, cmd)
self.set_status(204)
self.finish()
class HighlightAdd(BaseHandler):
@authenticated
def post(self, project_id, document_id):
obj = self.get_json()
document = self.get_document(project_id, document_id, True)
start, end = obj['start_offset'], obj['end_offset']
snippet = extract.extract(document.contents, start, end)
hl = database.Highlight(document=document,
start_offset=start,
end_offset=end,
snippet=snippet)
self.db.add(hl)
self.db.flush() # Need to flush to get hl.id
self.db.bulk_insert_mappings(database.HighlightTag, [
dict(
highlight_id=hl.id,
tag_id=tag,
self.db.rollback()
self.set_status(409)
return self.finish()
cmd = database.Command.tag_add(
self.current_user,
tag,
)
self.db.add(cmd)
self.db.commit()
self.db.refresh(cmd)
self.application.notify_project(project.id, cmd)
self.send_json({'id': tag.id})
class TagUpdate(BaseHandler):
@authenticated
def post(self, project_id, tag_id):
obj = self.get_json()
project = self.get_project(project_id)
tag = self.db.query(database.Tag).get(int(tag_id))
if tag.project_id != project.id:
raise HTTPError(404)
if obj:
if 'path' in obj:
tag.path = obj['path']
if 'description' in obj:
tag.description = obj['description']
cmd = database.Command.tag_add(
self.current_user,
tag,
)
# Add default set of tags
self.db.add(database.Tag(project=project, path='interesting',
description="Further review required"))
self.db.add(database.Tag(project=project, path='people',
description="Known people"))
self.db.commit()
self.redirect(self.reverse_url('project', project.id))
def render(self, template_name, **kwargs):
    """Render a template, filling in blank defaults for the form fields.

    'name', 'description' and 'error' default to the empty string when
    the caller does not pass them; all other keyword arguments are
    forwarded unchanged to the parent class's render().
    """
    blanks = dict.fromkeys(('name', 'description', 'error'), '')
    # Caller-supplied values take precedence over the blank defaults
    context = {**blanks, **kwargs}
    super(ProjectAdd, self).render(template_name, **context)
class Project(BaseHandler):
@authenticated
def get(self, project_id):
project = self.get_project(project_id)
documents_json = jinja2.Markup(json.dumps(
{
str(doc.id): {'id': doc.id, 'name': doc.name,
'description': doc.description}
for doc in project.documents
},
sort_keys=True,
))
tags_json = jinja2.Markup(json.dumps(
{
str(tag.id): {'id': tag.id,
'path': tag.path,
'description': tag.description}
"""
if '/' in name:
name = name[name.rindex('/') + 1:]
if sys.platform == 'win32' and '\\' in name:
# It seems that IE gets that wrong, at least when the file is from
# a network share
name = name[name.rindex('\\') + 1:]
name = _not_ascii_re.sub('', name).strip('._')
if not name:
return '_'
if os.name == 'nt' and name.split('.')[0].upper() in _windows_device_files:
name = '_' + name
return name
class DocumentAdd(BaseHandler):
@authenticated
async def post(self, project_id):
project = self.get_project(project_id)
name = self.get_body_argument('name')
description = self.get_body_argument('description')
file = self.request.files['file'][0]
content_type = file.content_type
filename = secure_filename(file.filename)
try:
body = await convert.to_html_chunks(file.body, content_type,
filename)
except convert.ConversionError as err:
self.set_status(400)
self.send_json({
@export_doc
def get(self, project_id, document_id):
    """Build the HTML export of one document with its highlights marked.

    Returns a ``(name, html)`` pair: the document's name and the
    rendered 'export_document.html' template.
    """
    doc = self.get_document(project_id, document_id, True)
    # Merge overlapping highlight ranges before marking up the contents
    spans = ((hl.start_offset, hl.end_offset) for hl in doc.highlights)
    merged = merge_overlapping_ranges(spans)
    marked_up = extract.highlight(doc.contents, merged)
    html = self.render_string(
        'export_document.html',
        name=doc.name,
        contents=Markup(marked_up),
    )
    return doc.name, html
class ExportCodebookCsv(BaseHandler):
    """Handler that serves a project's tags as a CSV download."""

    @authenticated
    def get(self, project_id):
        """Send 'codebook.csv': a header row, then one row per tag."""
        project = self.get_project(project_id)
        tags = list(project.tags)

        self.set_header('Content-Type', 'text/csv; charset=utf-8')
        self.set_header('Content-Disposition',
                        'attachment; filename="codebook.csv"')

        # Build the whole CSV in memory, then send it in one go
        out = io.StringIO()
        sheet = csv.writer(out)
        sheet.writerow(['tag', 'description'])
        sheet.writerows((tag.path, tag.description) for tag in tags)
        self.finish(out.getvalue())
class ExportCodebookDoc(BaseHandler):
document = self.get_document(project_id, document_id, True)
self.send_json({
'contents': [
{'offset': 0, 'contents': document.contents},
],
'highlights': [
{'id': hl.id,
'start_offset': hl.start_offset,
'end_offset': hl.end_offset,
'tags': [t.id for t in hl.tags]}
for hl in document.highlights
],
})
class TagAdd(BaseHandler):
@authenticated
def post(self, project_id):
obj = self.get_json()
project = self.get_project(project_id)
tag = database.Tag(project=project,
path=obj['path'], description=obj['description'])
try:
self.db.add(tag)
self.db.flush() # Need to flush to get tag.id
except IntegrityError:
self.db.rollback()
self.set_status(409)
return self.finish()
cmd = database.Command.tag_add(
self.current_user,
tag,
def _go_to_next(self):
next_ = self.get_argument('next')
if not next_:
next_ = self.reverse_url('index')
self.redirect(next_)
class Logout(BaseHandler):
    """Handler that logs the user out; only available in multi-user mode."""

    def get(self):
        """Log out and redirect to the index.

        Raises HTTPError(404) when the application is not multi-user.
        """
        if self.application.multiuser:
            self.logout()
            index_url = self.reverse_url('index')
            self.redirect(index_url)
        else:
            raise HTTPError(404)
class Register(BaseHandler):
def get(self):
    """Show the registration form.

    Raises HTTPError(404) when the application is not multi-user and
    HTTPError(403) when registration is disabled. If current_user is
    set, redirects to the index instead of rendering the form.
    """
    app = self.application
    if not app.multiuser:
        raise HTTPError(404)
    if not app.register_enabled:
        raise HTTPError(403)

    if not self.current_user:
        # The login template doubles as the registration form
        self.render('login.html', register=True)
        return
    self.redirect(self.reverse_url('index'))
def post(self):
if not self.application.multiuser:
raise HTTPError(404)
if not self.application.register_enabled:
raise HTTPError(403)
login = self.get_body_argument('login')
@authenticated
def get(self, project_id):
    """Send the project's tags as the attachment 'codebook.csv'.

    The CSV has a 'tag,description' header row followed by one row per
    tag (its path and description).
    """
    project = self.get_project(project_id)
    tags = list(project.tags)

    self.set_header('Content-Type', 'text/csv; charset=utf-8')
    self.set_header('Content-Disposition',
                    'attachment; filename="codebook.csv"')

    # Header first, then the tags, all written in a single call
    table = [['tag', 'description']]
    table.extend([tag.path, tag.description] for tag in tags)

    buf = io.StringIO()
    csv.writer(buf).writerows(table)
    self.finish(buf.getvalue())
class ExportCodebookDoc(BaseHandler):
    """Handler that exports a project's tags via a rendered template."""

    @authenticated
    @export_doc
    def get(self, project_id):
        """Return the name 'codebook' and the rendered codebook HTML."""
        project = self.get_project(project_id)
        html = self.render_string(
            'export_codebook.html',
            tags=list(project.tags),
        )
        return 'codebook', html
class ProjectEvents(BaseHandler):
@authenticated
async def get(self, project_id):
from_id = int(self.get_query_argument('from'))
project = self.get_project(project_id)
self.project_id = int(project_id)
# Merge right
while (right < len(merged) and
merged[right][0] <= rg[1]):
rg = (min(rg[0], merged[right][0]),
max(rg[1], merged[right][1]))
right += 1
# Insert
if left == right:
merged.insert(left, rg)
else:
merged[left:right] = [rg]
return merged
class ExportDocument(BaseHandler):
    """Handler that exports one document with its highlights marked up."""

    @authenticated
    @export_doc
    def get(self, project_id, document_id):
        """Return the document's name and its rendered export HTML.

        Overlapping highlight ranges are merged before the contents are
        marked up and passed to the 'export_document.html' template.
        """
        doc = self.get_document(project_id, document_id, True)
        ranges = ((hl.start_offset, hl.end_offset) for hl in doc.highlights)
        merged = merge_overlapping_ranges(ranges)
        body = Markup(extract.highlight(doc.contents, merged))
        html = self.render_string(
            'export_document.html',
            name=doc.name,
            contents=body,
        )
        return doc.name, html