Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
return self.render('project_new.html',
name=name, description=description,
error="Please enter a name")
# Create project
project = database.Project(name=name, description=description)
self.db.add(project)
# Add user as admin
membership = database.ProjectMember(
project=project,
user_login=self.current_user,
privileges=database.Privileges.ADMIN
)
self.db.add(membership)
# Add default set of tags
self.db.add(database.Tag(project=project, path='interesting',
description="Further review required"))
self.db.add(database.Tag(project=project, path='people',
description="Known people"))
self.db.commit()
self.redirect(self.reverse_url('project', project.id))
try:
validate.project_name(name)
validate.project_description(description)
# Create project
project = database.Project(name=name, description=description)
self.db.add(project)
# Add user as admin
membership = database.ProjectMember(
project=project,
user_login=self.current_user,
privileges=database.Privileges.ADMIN
)
self.db.add(membership)
# Add default tags
self.db.add(database.Tag(
project=project,
# TRANSLATORS: Default tag name
path=self.gettext("interesting"),
# TRANSLATORS: Default tag description
description=self.gettext("Further review required")),
)
self.db.commit()
return self.redirect(self.reverse_url('project', project.id))
except validate.InvalidFormat as e:
logger.info("Error validating ProjectAdd: %r", e)
return self.render('project_new.html',
name=name, description=description,
error=self.gettext(e.message))
error="Please enter a name")
# Create project
project = database.Project(name=name, description=description)
self.db.add(project)
# Add user as admin
membership = database.ProjectMember(
project=project,
user_login=self.current_user,
privileges=database.Privileges.ADMIN
)
self.db.add(membership)
# Add default set of tags
self.db.add(database.Tag(project=project, path='interesting',
description="Further review required"))
self.db.add(database.Tag(project=project, path='people',
description="Known people"))
self.db.commit()
self.redirect(self.reverse_url('project', project.id))
def post(self, project_id):
    """Create a tag in a project from the JSON request body.

    Reads ``path`` and ``description`` from the decoded JSON, stores the new
    tag, records a ``tag_add`` command for it and notifies the project.
    Responds with status 409 if inserting the tag raises ``IntegrityError``.
    """
    payload = self.get_json()
    project = self.get_project(project_id)
    new_tag = database.Tag(
        project=project,
        path=payload['path'],
        description=payload['description'],
    )
    try:
        self.db.add(new_tag)
        # The flush assigns new_tag.id, which the command below records
        self.db.flush()
    except IntegrityError:
        # Undo the failed insert and answer with 409
        self.db.rollback()
        self.set_status(409)
        return self.finish()

    command = database.Command.tag_add(self.current_user, new_tag)
    self.db.add(command)
    self.db.commit()
    self.db.refresh(command)
    self.application.notify_project(project.id, command)
# Handle a request to create a tag in the project `project_id`.
# Answers 403 with a JSON error when the privileges object refuses
# can_add_tag(), and 409 when inserting the tag raises IntegrityError.
# NOTE(review): the outer `try:` below has no visible `except` clause; this
# excerpt appears to be cut off after the refresh -- confirm against the
# full handler.
def post(self, project_id):
# get_project() yields the project together with a privileges object
project, privileges = self.get_project(project_id)
if not privileges.can_add_tag():
self.set_status(403)
return self.send_json({'error': "Unauthorized"})
try:
obj = self.get_json()
# Both fields are validated before the Tag object is built
validate.tag_path(obj['path'])
validate.tag_description(obj['description'])
tag = database.Tag(project=project,
path=obj['path'],
description=obj['description'])
try:
self.db.add(tag)
self.db.flush() # Need to flush to get tag.id
except IntegrityError:
# The insert failed: undo it and answer 409 with an empty body
self.db.rollback()
self.set_status(409)
return self.finish()
# Record the creation as a tag_add command stored alongside the tag
cmd = database.Command.tag_add(
self.current_user,
tag,
)
self.db.add(cmd)
self.db.commit()
# Reload the command's attributes from the database after the commit
self.db.refresh(cmd)
# Collect the highlights of project `project_id` for an export.
# When `path` is truthy, only highlights carrying a tag whose path starts
# with `path` are selected; otherwise the `else` branch handles the
# "all highlights" case.
# NOTE(review): this excerpt ends inside the `else` branch, so the rest of
# the function (including what it returns) is not visible here.
def get_highlights_for_export(self, project_id, path):
# The second value returned by get_project() is not used here
project, _ = self.get_project(project_id)
if path:
tag = aliased(database.Tag)
hltag = aliased(database.HighlightTag)
# Highlight -> HighlightTag -> Tag join, restricted to this project's tags
# whose path begins with `path`; each highlight's document is loaded in
# the same query via joinedload
highlights = (
self.db.query(database.Highlight)
.options(joinedload(database.Highlight.document))
.join(hltag, hltag.highlight_id == database.Highlight.id)
.join(tag, hltag.tag_id == tag.id)
.filter(tag.path.startswith(path))
.filter(tag.project == project)
.order_by(database.Highlight.document_id,
database.Highlight.start_offset)
).all()
name = None
else:
# Special case to select all highlights: we also need to select
# highlights that have no tag at all
document = aliased(database.Document)
sheet = workbook.add_worksheet('highlights')
header = workbook.add_format({'bold': True})
sheet.write(0, 0, 'id', header)
sheet.write(0, 1, 'document', header)
sheet.write(0, 2, 'tag', header)
sheet.write(0, 3, 'content', header)
sheet.set_column(0, 0, 5.0)
sheet.set_column(1, 1, 15.0)
sheet.set_column(2, 2, 15.0)
sheet.set_column(3, 3, 80.0)
row = 1
for hl in highlights:
tags = hl.tags
assert all(isinstance(t, database.Tag) for t in tags)
if tags:
tags = [t.path for t in tags]
else:
tags = ['']
for tag_path in tags:
sheet.write(row, 0, str(hl.id))
sheet.write(row, 1, hl.document.name)
sheet.write(row, 2, tag_path)
sheet.write(row, 3, convert.html_to_plaintext(hl.snippet))
row += 1
workbook.close()
with open(filename, 'rb') as fp:
chunk = fp.read(4096)
self.write(chunk)
while len(chunk) == 4096:
chunk = fp.read(4096)
highlight_id = Column(Integer, ForeignKey('highlights.id',
ondelete='CASCADE'),
primary_key=True)
highlight = relationship('Highlight')
tag_id = Column(Integer, ForeignKey('tags.id',
ondelete='CASCADE'),
primary_key=True)
tag = relationship('Tag')
# Attach a computed, read-only column to Tag: the number of HighlightTag rows
# whose tag_id equals the tag's own id. It is assigned after both classes
# exist because the expression refers to HighlightTag and Tag together.
Tag.highlights_count = column_property(
select(
[functions.count(HighlightTag.highlight_id)],
).where(
HighlightTag.tag_id == Tag.id,
# correlate_except(HighlightTag) keeps HighlightTag in this SELECT's own FROM
# clause while Tag is taken from the enclosing query
).correlate_except(HighlightTag)
)
# Create the SQLAlchemy engine for the database URL `db_url`.
# NOTE(review): the docstring below mentions an environment variable, but in
# the visible code the URL comes from the `db_url` parameter -- the
# docstring looks outdated.
# NOTE(review): this excerpt ends on the event-listener decorator; the
# listener function and the rest of connect() are not visible here.
def connect(db_url):
"""Connect to the database using an environment variable.
"""
logger.info("Connecting to SQL database %r", db_url)
kwargs = {}
if db_url.startswith('sqlite:'):
# Passed through to the SQLite driver: turn off its same-thread check
kwargs['connect_args'] = {'check_same_thread': False}
engine = create_engine(db_url, **kwargs)
# logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
if db_url.startswith('sqlite:'):
# For SQLite URLs, a listener is registered on the Engine "connect" event
@sqlalchemy.event.listens_for(sqlalchemy.engine.Engine, "connect")
# Handle a request to update tag `tag_id` of project `project_id`.
# Answers 403 with a JSON error when the privileges object refuses
# can_update_tag(), and 404 when the tag does not exist or belongs to a
# different project.
# NOTE(review): this excerpt ends inside the inner `try:`; the commit, the
# error handling and the response are not visible here.
def post(self, project_id, tag_id):
project, privileges = self.get_project(project_id)
if not privileges.can_update_tag():
self.set_status(403)
return self.send_json({'error': "Unauthorized"})
try:
obj = self.get_json()
# Look the tag up by primary key; tag_id is converted with int() first
tag = self.db.query(database.Tag).get(int(tag_id))
if tag is None or tag.project_id != project.id:
self.set_status(404)
return self.send_json({'error': "No such tag"})
if obj:
# Only the fields present in the request body are validated and updated
if 'path' in obj:
validate.tag_path(obj['path'])
tag.path = obj['path']
if 'description' in obj:
validate.tag_description(obj['description'])
tag.description = obj['description']
# The change is recorded with a tag_add command built from the tag
cmd = database.Command.tag_add(
self.current_user,
tag,
)
try:
self.db.add(cmd)
def tag_add(cls, user_login, tag):
    """Build the command recording ``tag`` on behalf of ``user_login``.

    The new ``cls`` instance is attached to the tag's project and carries a
    ``'tag_add'`` payload with the tag's id, path and description.
    """
    assert isinstance(tag, Tag)
    owning_project_id = tag.project_id
    # Keep this payload in sync with the format described above
    tag_payload = {
        'type': 'tag_add',
        'id': tag.id,
        'path': tag.path,
        'description': tag.description,
    }
    return cls(
        user_login=user_login,
        project_id=owning_project_id,
        payload=tag_payload,
    )