Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Expected JSON text for a parent object holding two child objects; every
# object carries a "py/object" entry naming its class.
obj_json_unpickable_true = (
    # parent fields
    '{"id": 1,'
    ' "key1": "key1",'
    ' "key2": "key2",'
    # first child
    ' "object1": {"id": 11,'
    ' "key1": "keychild1",'
    ' "key2": "keychild2",'
    ' "py/object": "tests.util.test_json.MyChildClass"},'
    # second child
    ' "object2": {"id": 12,'
    ' "key1": "keychild1",'
    ' "key2": "keychild2",'
    ' "py/object": "tests.util.test_json.MyChildClass"},'
    # parent class tag
    ' "py/object": "tests.util.test_json.MyClass"}'
)
# Force sorted keys so that encoded output can be compared against the
# expected JSON strings. This sets sort_keys=True for jsonpickle's
# 'simplejson' backend only.
# NOTE(review): the original comment adds "Python 3 sorts by default"; that
# cannot be confirmed from this file -- verify before relying on it.
jsonpickle.set_encoder_options('simplejson', sort_keys=True)
def test_object_from_json():
    """Check that from_json() on the class-tagged JSON text compares equal to ``obj``."""
    decoded = from_json(obj_json_unpickable_true)
    assert obj == decoded
def test_dict_from_json():
    """Check that from_json() on ``obj_json`` compares equal to ``obj_dict``."""
    decoded = from_json(obj_json)
    assert obj_dict == decoded
def test_object_to_json():
    """Check that to_json() applied to ``obj`` produces exactly ``obj_json``."""
    encoded = to_json(obj)
    assert obj_json == encoded
def test_dict_to_json():
    """Check that to_json() applied to ``obj_dict`` produces exactly ``obj_json``."""
    encoded = to_json(obj_dict)
    assert obj_json == encoded
def random_string(prefix, maxlen):
    """Return ``prefix`` followed by a random tail of fewer than ``maxlen`` characters.

    The tail length is drawn with ``random.randrange(maxlen)`` (so it lies in
    ``0 .. maxlen - 1``), and each character is picked from ASCII letters,
    digits, punctuation and the space character; the space is listed ten
    times in the pool, which makes it ten times as likely as any other
    single symbol.
    """
    alphabet = string.ascii_letters + string.digits + string.punctuation + " " * 10
    tail_length = random.randrange(maxlen)
    tail = "".join(random.choice(alphabet) for _ in range(tail_length))
    return prefix + tail
# Test data: one group whose fields are all empty, followed by n groups whose
# fields are a fixed prefix plus random text.
# (A disabled variant iterated over every empty/random combination of name,
# header and footer instead of generating n fully random groups.)
testdata = [Group(name="", header="", footer="")]
testdata += [
    Group(
        name=random_string("name", 10),
        header=random_string("header", 7),
        footer=random_string("footer", 5),
    )
    for _ in range(n)
]

# The output file f is resolved relative to the parent of this script's directory.
file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", f)

with open(file, "w") as out:
    jsonpickle.set_encoder_options("json", indent=2)
    encoded = jsonpickle.encode(testdata)
    out.write(encoded)
def write_json(self):
    """Write the galaxy, each sector and the two graphs as JSON files.

    Every file is created inside ``self.galaxy.output_path``:

    * ``galaxy.json`` -- ``self.galaxy`` encoded with ``unpicklable=False``
      and ``max_depth=14``;
    * ``<sector name> Sector.sector.json`` -- one per entry of
      ``self.galaxy.sectors``, encoded with the same options;
    * ``ranges.json`` and ``stars.json`` -- the node-link form of
      ``self.galaxy.ranges`` and ``self.galaxy.stars``.
    """
    out_dir = self.galaxy.output_path
    jsonpickle.set_encoder_options('simplejson', sort_keys=True, indent=2)

    galaxy_text = jsonpickle.encode(self.galaxy, unpicklable=False, max_depth=14)
    with open(os.path.join(out_dir, 'galaxy.json'), 'w+', encoding='utf8') as out:
        out.write(galaxy_text)

    for sector in self.galaxy.sectors.values():
        sector_file = os.path.join(out_dir, sector.sector_name() + " Sector.sector.json")
        sector_text = jsonpickle.encode(sector, unpicklable=False, max_depth=14)
        with open(sector_file, 'w+', encoding='utf8') as out:
            out.write(sector_text)

    # The two graphs are converted to node-link data before being encoded.
    graph_files = (
        ('ranges.json', self.galaxy.ranges),
        ('stars.json', self.galaxy.stars),
    )
    for file_name, graph in graph_files:
        with open(os.path.join(out_dir, file_name), "w+", encoding='utf8') as out:
            out.write(jsonpickle.encode(json_graph.node_link_data(graph)))
def getJson(self):
    """Return ``self.data`` encoded as a JSON string.

    Calls ``self.load(None)`` first, then encodes ``self.data`` with
    ``unpicklable=False`` after setting the 'json' backend options to sorted
    keys and an indent of 3. The encoded text is also logged at debug level.
    """
    self.load(None)
    jsonpickle.set_encoder_options('json', sort_keys=True, indent=3)
    encoded = jsonpickle.encode(self.data, unpicklable=False)
    log.L.debug('Encoded SDP JSON: %s' % encoded)
    return encoded
# -*- coding: utf-8 -*-
from skpy import Skype, SkypeEventLoop, SkypeNewMessageEvent
import requests
from pprint import pprint
import jsonpickle, json
import logging
from logging.handlers import RotatingFileHandler
from flask import Flask, request, redirect, url_for, jsonify
import re
# Prefer jsonpickle's 'json' backend and pass ensure_ascii=False to it.
jsonpickle.set_preferred_backend('json')
jsonpickle.set_encoder_options('json', ensure_ascii=False)

app = Flask(__name__)

# Placeholders that must be replaced with real values before running.
clusterChat = 'YOUR_CHAT_GROUP_ID_HERE'
skypeLogin = "LOGIN_HERE"  # for newly registered Skype accounts this is usually the email, not the login!
skypePass = "PASS_HERE"
slackGetUserUrl = 'https://slack.com/api/users.info'
slackToken = 'SLACK_TOCKEN_HERE'

# This should be set up as a separate application that is invoked by an event
# from Slack. Keep in mind that connections need to be cached.
def remove_html_tags(text):
return super(customPickler, self)._get_flattener(obj)
class UserError(Exception):
    """
    User errors should be caught and re-thrown with this.

    Be warned that constructing this exception can itself throw an exception.
    Yes, you read that right. I apologize in advance.

    :param message: the error message handed to ``Exception``
    :param varsSoFar: the user variables collected so far; they are passed
        through ``pickle_user_vars`` and the result is stored on the instance
        as ``varsSoFar``. Omitting it (or passing ``None``) means "no variables".
    :raises: ValueError (varsSoFar gets pickled into JSON, which may result in any number of errors depending on what types are inside)
    """

    def __init__(self, message, varsSoFar=None):
        super().__init__(message)
        # Use a fresh dict for every call: a literal ``{}`` default is created
        # once at definition time and would be shared by all instantiations.
        if varsSoFar is None:
            varsSoFar = {}
        self.varsSoFar = pickle_user_vars(varsSoFar)
jsonpickle.pickler.Pickler = customPickler

# Both options go into ONE call: set_encoder_options stores the options for a
# backend by replacing whatever was stored before, so a second call carrying
# only allow_nan would silently discard ensure_ascii=False.
# allow_nan=False because NaN is not deserializable by JavaScript.
jsonpickle.set_encoder_options('json', ensure_ascii=False, allow_nan=False)

for handler in handlers:
    jsonpickle.handlers.register(handler['type'], handler['handler'])

# Copy all special vars (we want exec'd code to have similar locals to the
# actual code).
# Builtins are not copied because exec adds them itself; besides, when these
# locals are deep-copied later on, deepcopy cannot handle builtins anyway.
startingLocals = {}
specialVars = ['__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__']
# Deliberately a plain loop: inside a comprehension, locals() may refer to the
# comprehension's own scope instead of this one.
for var in specialVars:
    startingLocals[var] = locals()[var]

oldSavedLines = []
savedLocals = {}
def get_imports(parsedText, text):
"""
def save(self, file: str = None) -> None:
    """Serialize this object with jsonpickle and write it to a file.

    :param file: destination path; when ``None``, ``self.save_file`` is used.

    The 'json' backend is set to sorted keys with an indent of 4. If the
    destination has a directory component, that directory is created first
    (already existing is fine). A debug message is logged after the write.
    """
    target = self.save_file if file is None else file
    jsonpickle.set_encoder_options('json', sort_keys=True, indent=4)

    parent_dir = os.path.dirname(target)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with open(target, 'w') as out:
        out.write(jsonpickle.encode(self))
        logger(__name__).debug('Saved {}'.format(target))
# Chain the remaining steps onto the deferred, make sure the reactor is
# stopped on success or failure, then run the reactor.
d.addCallback(validateRouterDetails)
d.addCallback(initiateRouterNormalization)
d.addBoth(stop_reactor)
reactor.run()

# Report per device whether the "write mem" output contains an [OK] marker.
if d.result is not None:
    for device, outputs in d.result.items():
        if re.search(r"\[OK\]", outputs["write mem"]) is None:
            print("Device {}: Warning no [OK] in Output".format(device))
        else:
            print("Device {}: Configuration Saved".format(device))
# Persist the routers as JSON with sorted keys and an indent of 4.
# The text is encoded before the file is opened, so a failing encode does not
# leave a truncated routerstate.json behind, and the with-block guarantees the
# handle is flushed and closed (it was previously left open).
jsonpickle.set_encoder_options('json', sort_keys=True, indent=4)
data = jsonpickle.encode(routers)
with open("routerstate.json", "w") as stateFile:
    stateFile.write(data)
import requests
import jsonpickle
from requests_oauthlib import OAuth1
from urllib.parse import parse_qs, urlencode
import cherrypy
from collections import defaultdict
import json
import os
import re
from collections import defaultdict
# For readable serializations: have jsonpickle's 'json' backend emit sorted
# keys and indent nested levels by 4.
jsonpickle.set_encoder_options('json', sort_keys=True, indent=4)
class LocalCache(object):
""" Generic class for encapsulating twitter credential caching """
server_data_template = "{}.server"
user_data_template = "{0}.user.{1}"
def __init__(self, backup = "tmp/twitter.cache"):
    """Set up the in-memory cache, then call ``self.deserialize()``.

    :param backup: unique identifier for the backup of this cache.

    ``self.memcache`` has two sections, "users" and "server"; in both, a
    missing key starts out as an empty dict.
    """
    # Unique identifier for the backup of this cache
    self.backup = backup

    users_section = defaultdict(lambda: {})
    server_section = defaultdict(lambda: {})
    self.memcache = {"users": users_section, "server": server_section}

    self.deserialize()
def users(self):