Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def signupfunc():
if request.method == 'POST':
robj = request.get_json()
user = User(username=robj['name'], email=robj['email'])
user.set_password(robj['password'])
db.session.add(user)
db.session.commit()
return 'request'
def save_file(name, data):
f = ext.storage.save(name, data)
name = f.name.decode('utf-8')
url = f.url.decode('utf-8')
ext.db.session.add(Upload(name=name, url=url))
ext.db.session.commit()
c = db.session.query(CI.ci_id).filter(CI.ci_id == second_ci).first()
if c is None:
return abort(404, "second_ci {0} is not existed".format(
second_ci))
existed = db.session.query(CIRelation.cr_id).filter(
CIRelation.first_ci_id == first_ci).filter(
CIRelation.second_ci_id == second_ci).first()
if existed is not None:
return existed.cr_id
cr = CIRelation()
cr.first_ci_id = first_ci
cr.second_ci_id = second_ci
if more is not None:
cr.more = more
cr.relation_type = relation_type
db.session.add(cr)
try:
db.session.commit()
except Exception as e:
db.session.rollback()
current_app.logger.error("add CIRelation is error, {0}".format(
str(e)))
return abort(400, "add CIRelation is error, {0}".format(str(e)))
# write history
his_manager = CIRelationHistoryManager()
his_manager.add(cr.cr_id, cr.first_ci_id, cr.second_ci_id,
relation_type, operate_type="add")
return cr.cr_id
sha="master")
for commit in commit_pages.iterator():
if last_commit and commit.sha == last_commit.sha:
print 'Next'
break
c = Commit.query.filter_by(sha=commit.sha).first()
if c is None:
c = Commit()
c.sha = commit.sha
c.message = commit.commit.message
c.repository = repo
c.date = commit.commit.author.date
if commit.author:
c.user = self.get_or_create_user(commit.author)
print 'New commit: ' + c.sha
db.session.add(c)
db.session.commit()
repo.last_commit_date = repo.last_commit.date
db.session.commit()
attr = db.session.query(CIAttribute).filter_by(attr_id=attr_id).first()
if not attr:
return False, "CI attribute you want to update is not existed"
choice_value = kwargs.get("choice_value", False)
is_choice = False
if choice_value:
is_choice = True
attr.attr_name = args[0]
attr.attr_alias = args[1]
if not args[1]:
attr.attr_alias = args[0]
attr.is_choice = is_choice
attr.is_multivalue = kwargs.get("is_multivalue", False)
attr.is_uniq = kwargs.get("is_uniq", False)
attr.value_type = kwargs.get("value_type", "text")
db.session.add(attr)
db.session.flush()
if is_choice:
self._add_choice_value(choice_value, attr.attr_id, attr.value_type)
try:
db.session.commit()
except Exception as e:
db.session.rollback()
current_app.logger.error("update attribute error, {0}".format(
str(e)))
return False, str(e)
CIAttributeCache.clean(attr)
return True, attr.attr_id
def get_orgs(self):
url = 'http://registry.usa.gov/accounts.json?service_id=github'
r = requests.get(url)
if r.status_code == 200:
for acct in r.json['accounts']:
org = Organization.query.filter_by(username=acct['account']).first()
if org is None:
org = Organization()
org.name = acct['organization']
org.username = acct['account']
db.session.add(org)
db.session.commit()
else:
print 'Error importing organizations'
def get_or_create_user(self, user):
u = User.query.filter_by(gh_id=user.id).first()
if u is None:
u = User()
u.gh_id = user.id
u.login = user.login
u.avatar_url = user.avatar_url
db.session.add(u)
db.session.commit()
return u