Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class Account(ResponseMixin, View):
renderers = (JSONRenderer,)
@api_login_required
def get(self, request):
info = {}
email = request.user.username
info['email'] = email
info['usage'] = seafserv_threaded_rpc.get_user_quota_usage(email)
info['total'] = seafserv_threaded_rpc.get_user_quota(email)
info['feedback'] = settings.DEFAULT_FROM_EMAIL
response = Response(200, [info])
return self.render(response)
class ReposView(ResponseMixin, View):
renderers = (JSONRenderer,)
@api_login_required
def get(self, request):
email = request.user.username
owned_repos = seafserv_threaded_rpc.list_owned_repos(email)
calculate_repo_info (owned_repos, email)
owned_repos.sort(lambda x, y: cmp(y.latest_modify, x.latest_modify))
n_repos = seafserv_threaded_rpc.list_share_repos(email,
'to_email', -1, -1)
calculate_repo_info (n_repos, email)
owned_repos.sort(lambda x, y: cmp(y.latest_modify, x.latest_modify))
repos_json = []
return api_error(request, '400')
parent_dir = os.path.dirname(path)
new_dir_name = os.path.basename(path)
new_dir_name = check_filename_with_rename(repo_id, parent_dir, new_dir_name)
try:
seafserv_threaded_rpc.post_dir(repo_id, parent_dir, new_dir_name,
request.user.username)
except SearpcError, e:
return api_error(request, '421', e.msg)
return reloaddir_if_neccessary (request, repo_id, parent_dir)
class OpRenameView(ResponseMixin, View):
@api_login_required
def get(self, request, repo_id):
return api_error(request, '407')
@api_login_required
def post(self, request, repo_id):
resp = check_repo_access_permission(request, get_repo(repo_id))
if resp:
return resp
path = request.GET.get('p')
newname = request.POST.get("newname")
if not path or path[0] != '/' or not newname:
return api_error(request, '400')
if form.is_valid():
auth_login(request, form.get_user())
info = {}
email = request.user.username
info['email'] = email
info['feedback'] = settings.DEFAULT_FROM_EMAIL
info['sessionid'] = request.session.session_key
info['usage'] = seafserv_threaded_rpc.get_user_quota_usage(email)
info['total'] = seafserv_threaded_rpc.get_user_quota(email)
return HttpResponse(json.dumps([info]), status=200, content_type=json_content_type)
else:
return api_error(request, '408')
class Ping(ResponseMixin, View):
def get(self, request):
response = HttpResponse(json.dumps("pong"), status=200, content_type=json_content_type)
if request.user.is_authenticated():
response["logined"] = True
else:
response["logined"] = False
return response
class Account(ResponseMixin, View):
renderers = (JSONRenderer,)
@api_login_required
def get(self, request):
info = {}
email = request.user.username
"owner":group.group_name,
"name":r.name,
"desc":r.desc,
"mtime":r.latest_modify,
"root":r.root,
"size":r.size,
"encrypted":r.encrypted,
"password_need":r.password_need,
}
repos_json.append(repo)
response = Response(200, repos_json)
return self.render(response)
class RepoView(ResponseMixin, View):
renderers = (JSONRenderer,)
@api_login_required
def get_repo_info(request, repo_id):
# check whether user can view repo
repo = get_repo(repo_id)
if not repo:
return api_error(request, '404')
if not can_access_repo(request, repo.id):
return api_error(request, '403')
# check whether use is repo owner
if validate_owner(request, repo_id):
owner = "self"
else:
mail_sended.send(sender=None, user=request.user.username,
email=to_email)
c = {
'email': request.user.username,
'to_email': to_email,
'file_shared_link': file_shared_link,
}
try:
send_mail('您的好友通过SeaCloud分享了一个文件给您',
t.render(Context(c)), None, [to_email],
fail_silently=False)
except:
return api_error(request, '502')
return HttpResponse(json.dumps(file_shared_link), status=200, content_type=json_content_type)
class RepoFileIdView(ResponseMixin, View):
renderers = (JSONRenderer,)
@api_login_required
def get(self, request, repo_id, file_id):
repo = get_repo(repo_id)
resp = check_repo_access_permission(request, repo)
if resp:
return resp
file_name = request.GET.get('file_name', file_id)
return get_repo_file (request, repo_id, file_id, file_name, 'download')
class RepoFilePathView(ResponseMixin, View):
if oldname == newname:
return api_error(request, '420', 'The new name is the same to the old')
newname = check_filename_with_rename(repo_id, parent_dir, newname)
try:
seafserv_threaded_rpc.rename_file (repo_id, parent_dir, oldname,
newname, request.user.username)
except SearpcError,e:
return api_error(request, '420', "SearpcError:" + e.msg)
return reloaddir_if_neccessary (request, repo_id, parent_dir)
class OpMoveView(ResponseMixin, View):
@api_login_required
def get(self, request):
return api_error(request, '407')
@api_login_required
def post(self, request):
src_repo_id = request.POST.get('src_repo')
src_dir = unquote(request.POST.get('src_dir')).decode('utf-8')
dst_repo_id = request.POST.get('dst_repo')
dst_dir = unquote(request.POST.get('dst_dir')).decode('utf-8')
op = request.POST.get('operation')
obj_names = request.POST.get('obj_names')
if not (src_repo_id and src_dir and dst_repo_id \
and dst_dir and op and obj_names):
@api_login_required
def post(self, request, repo_id):
resp = check_repo_access_permission(request, get_repo(repo_id))
if resp:
return resp
op = request.GET.get('op', 'setpassword')
if op == 'setpassword':
return HttpResponse(json.dumps("success"), status=200,
content_type=json_content_type)
return HttpResponse(json.dumps("unsupported operation"), status=200,
content_type=json_content_type)
class RepoDirPathView(ResponseMixin, View):
renderers = (JSONRenderer,)
@api_login_required
def get(self, request, repo_id):
repo = get_repo(repo_id)
resp = check_repo_access_permission(request, repo)
if resp:
return resp
current_commit = get_commits(repo_id, 0, 1)[0]
path = request.GET.get('p', '/')
if path[-1] != '/':
path = path + '/'
dir_id = None
try:
try:
if op == 'cp':
seafserv_threaded_rpc.copy_file (src_repo_id, src_dir, obj_name,
dst_repo_id, dst_dir, new_obj_name,
request.user.username)
elif op == 'mv':
seafserv_threaded_rpc.move_file (src_repo_id, src_dir, obj_name,
dst_repo_id, dst_dir, new_obj_name,
request.user.username)
except SearpcError, e:
return api_error(request, '419', "SearpcError:" + e.msg)
return reloaddir_if_neccessary (request, dst_repo_id, dst_dir)
class OpUploadView(ResponseMixin, View):
@api_login_required
def post(self, request):
return api_error(request, '407')
@api_login_required
def get(self, request, repo_id):
repo = get_repo(repo_id)
if check_permission(repo_id, request.user.username) == 'rw':
token = seafserv_rpc.web_get_access_token(repo_id,
'dummy',
'upload',
request.user.username)
else:
return api_error(request, '403')
class RepoFileIdView(ResponseMixin, View):
renderers = (JSONRenderer,)
@api_login_required
def get(self, request, repo_id, file_id):
repo = get_repo(repo_id)
resp = check_repo_access_permission(request, repo)
if resp:
return resp
file_name = request.GET.get('file_name', file_id)
return get_repo_file (request, repo_id, file_id, file_name, 'download')
class RepoFilePathView(ResponseMixin, View):
@api_login_required
def get(self, request, repo_id):
repo = get_repo(repo_id)
resp = check_repo_access_permission(request, repo)
if resp:
return resp
path = request.GET.get('p', None)
if not path:
return api_error(request, '413')
file_id = None
try:
file_id = seafserv_threaded_rpc.get_file_id_by_path(repo_id,
path.encode('utf-8'))
if not dir_id:
return api_error(request, '410')
old_oid = request.GET.get('oid', None)
if old_oid and old_oid == dir_id :
response = HttpResponse(json.dumps("uptodate"), status=200,
content_type=json_content_type)
response["oid"] = dir_id
return response
else:
return get_dir_entrys_by_id(request, dir_id)
class RepoDirIdView(ResponseMixin, View):
renderers = (JSONRenderer,)
@api_login_required
def get(self, request, repo_id, dir_id):
repo = get_repo(repo_id)
resp = check_repo_access_permission(request, repo)
if resp:
return resp
return get_dir_entrys_by_id(request, dir_id)
def get_shared_link(request, repo_id, path):
l = FileShare.objects.filter(repo_id=repo_id).filter(\
username=request.user.username).filter(path=path)
token = None
if len(l) > 0: