def run(self):
pickling_support.install()
while True:
try:
index = self.queue.get()
if index == 'stop':
self.queue.task_done()
break
except ConnectionRefusedError:
time.sleep(.1)
continue
item = self.session.items[index]
        try:
            run_test(self.session, item, None)
        except BaseException:
            # Assumption: keep the worker loop alive on any failure; the
            # pickling_support.install() call above suggests the original
            # handler pickles the exception and traceback for reporting
            # elsewhere, but that part is not shown.
            pass
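# The worker above is built around tblib's pickling_support.install(); once it
# has been called, a full (type, value, traceback) triple survives pickling, so
# a failure can be shipped to another process and re-raised with its original
# traceback. A small self-contained illustration, separate from the worker code:
from tblib import pickling_support
pickling_support.install()
import pickle
import sys
try:
    raise ValueError("boom")
except ValueError:
    payload = pickle.dumps(sys.exc_info())  # traceback included, thanks to tblib
etype, evalue, etb = pickle.loads(payload)
# raise evalue.with_traceback(etb) would re-raise it with the original traceback.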
import pickle
import sys
import multiprocessing
import multiprocessing.connection
import tempfile
import shutil
import os
import inspect
import importlib
import concurrent.futures
import contextlib
import time
import random
import string
from tblib import pickling_support
pickling_support.install()
import forge
import anyio
from async_generator import aclosing
@contextlib.contextmanager
def compile_temp_proto(*relative_proto_paths):
modules = []
with tempfile.TemporaryDirectory() as temp_dir:
sys.path.insert(0, temp_dir)
        try:
            for relative_proto_path in relative_proto_paths:
                proto_path = os.path.join(os.path.dirname(
                    inspect.currentframe().f_back.f_back.f_globals['__file__']),
                    relative_proto_path)
                # Assumption: the original compiles proto_path into temp_dir and
                # appends the imported generated module(s) to `modules`; that
                # step is not shown here.
            yield modules
        finally:
            sys.path.remove(temp_dir)
be put on the redis queue. Then, we swap back in the original exception
when reading off the queue. These operations are performed by
`swap_in_serializable` and `swap_back_original`, respectively.
"""
from __future__ import absolute_import
import dill
try:
from _pytest import outcomes
except ImportError:
from _pytest import runner as outcomes
from _pytest._code import code
from tblib import pickling_support
pickling_support.install()
class Skipped(Exception):
"""placeholder for outcomes.Skipped which is not serializable"""
class Failed(Exception):
"""placeholder for outcomes.Failed which is not serializable"""
class UnserializableException(Exception):
"""placeholder for any Exceptions that cannnot be serialized"""
SERIALIZE_TYPES = {outcomes.Skipped: Skipped,
outcomes.Failed: Failed}
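# The docstring above names `swap_in_serializable` and `swap_back_original`
# without showing them; the pair below is only a sketch of how such a swap
# could look, assuming the functions operate on (exc_type, exc_value, exc_tb)
# triples and using the placeholder classes and SERIALIZE_TYPES defined above.
def swap_in_serializable(exc_type, exc_value, exc_tb):
    """Replace a non-picklable pytest outcome with a picklable placeholder."""
    placeholder = SERIALIZE_TYPES.get(exc_type)
    if placeholder is not None:
        return placeholder, placeholder(*exc_value.args), exc_tb
    try:
        dill.dumps(exc_value)  # probe whether the exception pickles at all
        return exc_type, exc_value, exc_tb
    except Exception:
        return (UnserializableException,
                UnserializableException(repr(exc_value)), exc_tb)

def swap_back_original(exc_type, exc_value, exc_tb):
    """Restore the real pytest outcome when reading a result off the queue."""
    for original, placeholder in SERIALIZE_TYPES.items():
        if exc_type is placeholder:
            return original, original(*exc_value.args), exc_tb
    return exc_type, exc_value, exc_tb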
from hashlib import md5
from json import load, dumps as json_dumps
from base64 import b64decode, b64encode
from pickle import dumps, loads
from os import path
from . import OneServer
from tblib import pickling_support
from sys import exc_info
from six import reraise
from collections import OrderedDict
from gzip import open
from os import environ
pickling_support.install()
# Note: `open` here is gzip.open (imported above), so fixtures are stored as
# gzip-compressed JSON files.
def read_fixture_file(fixture_file):
    with open(fixture_file, "rt") as f:
        return load(f)
def write_fixture_file(fixture_file, obj):
    with open(fixture_file, "wb") as f:
        f.write(json_dumps(obj).encode())
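# A quick round trip through the helpers above; the temporary directory and the
# payload are purely illustrative.
import tempfile
with tempfile.TemporaryDirectory() as _tmp:
    _fixture = path.join(_tmp, "example_fixture.json.gz")
    write_fixture_file(_fixture, {"calls": []})
    assert read_fixture_file(_fixture) == {"calls": []}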
class OneServerTester(OneServer):
    '''OneServer subclass used by the test suite (its full docstring is not
    shown in this snippet).'''
"""
Set test result in internal dictionary. Updates UI.
Args:
test_id: An unique string test identifier.
"""
update_listbox = False
if not test_id in self.test_data:
self.test_data[test_id] = {
'id': test_id
}
update_listbox = True
if extracted_traceback:
py_traceback = Traceback.from_dict(extracted_traceback).as_traceback()
extracted_traceback = traceback.extract_tb(py_traceback)
output += ''.join(
traceback.format_list(extracted_traceback) +
[exc_value]
)
test_data = self.test_data[test_id]
test_data['exc_type'] = exc_type
test_data['exc_value'] = exc_value
test_data['exc_tb'] = extracted_traceback
if when == 'call' and last_failed_exempt is not None:
test_data['last_failed_exempt'] = last_failed_exempt
# Ignore success, except for the 'call' step
# ignore successive failure, take only the first
if (outcome != 'passed' or when == 'call') \
"""
    ids = []
    for v in request.args.get('ids', '').split(','):
        v = v.strip()
        # Skip blanks and non-numeric values instead of letting int() raise a 500.
        if v.isdigit() and int(v) > 0:
            ids.append(int(v))
    if not ids:
        raise BadRequest()
query = Address.query.filter(Address.id.in_(ids))
addresses = {address.id: AddressSchema().dump(address)
for address in query}
return json_response(addresses=addresses)
def delete_cart_product(id):
    """Remove a product from the shopping cart."""
    cart_product = CartProduct.query.filter(CartProduct.id == id).first()
    if cart_product is None:
        return json_response(ResponseCode.NOT_FOUND)
    # Serialize before deleting: once the delete is committed the instance is
    # expired and can no longer be refreshed from the database.
    dumped = CartProductSchema().dump(cart_product)
    session.delete(cart_product)
    session.commit()
    return json_response(cart_product=dumped)
"""
user_id = request.args.get('user_id', type=int)
order_direction = request.args.get('order_direction', 'desc')
limit = request.args.get(
'limit', current_app.config['PAGINATION_PER_PAGE'], type=int)
offset = request.args.get('offset', 0, type=int)
order_by = Order.id.asc() if order_direction == 'asc' else Order.id.desc()
query = Order.query
if user_id is not None:
query = query.filter(Order.user_id == user_id)
total = query.count()
query = query.order_by(order_by).limit(limit).offset(offset)
return json_response(orders=OrderSchema().dump(query, many=True), total=total)
def update_product(id):
"""更新商品
"""
data = request.get_json()
count = Product.query.filter(Product.id == id).update(data)
if count == 0:
return json_response(ResponseCode.NOT_FOUND)
product = Product.query.get(id)
session.commit()
return json_response(product=ProductSchema().dump(product))