Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_head(case, expected):
    """Assert that ``_.head(case)`` equals ``expected``."""
    actual = _.head(case)
    assert actual == expected
# Only act when this exact (user, player) combination is not stored yet.
if not _pairings.where(user_unique_id=user_unique_id, player_id=player_id):
    # Insert pairing
    pairing = DataObject(user_unique_id=user_unique_id, player_id=player_id)
    _pairings.insert(pairing)

    # Insert pair's user to cache: drop any cached entry with the same
    # unique_id first, then store a fresh copy carrying the current name.
    user = _.head(_users.where(unique_id=user_unique_id))
    if user:
        _cached_users.delete(unique_id=user.unique_id)
        _cached_users.insert(DataObject(unique_id=user.unique_id, name=user.name))

    # Insert pair's player to cache: the battle table is consulted first,
    # then the prebattle table, then the "me" table; the first non-empty
    # lookup result wins.
    matches = (
        _battle_players.where(id=player_id)
        or _prebattle_players.where(id=player_id)
        or _me_player.where(id=player_id)
    )
    player = _.head(matches)
    if player:
        _cached_players.delete(id=player.id)
        _cached_players.insert(DataObject(id=player.id, name=player.name))

    # Notify listeners
    messages.publish(PairingMessage("added", pairing))
# Tables for the new cache layout (the `users` table is created before this
# excerpt begins).
players = Table()
players.create_index('id', unique=True)
pairings = Table()
pairings.create_index('player_id')
pairings.create_index('user_unique_id')

# Load old 0.6.x cache file
parser = ConfigParser.ConfigParser()
with open(source_filepath, "rb") as cache_file:
    parser.readfp(cache_file)

# Build new cache structures
users.insert_many(
    DataObject(unique_id=entry_id, name=entry_name)
    for entry_name, entry_id in parser.items("TeamSpeakUsers")
)
players.insert_many(
    DataObject(id=int(entry_id), name=entry_name)
    for entry_name, entry_id in parser.items("GamePlayers")
)
for user_name, player_names in parser.items("UserPlayerPairings"):
    paired_user = _.head(users.where(name=user_name))
    userid = paired_user.unique_id
    # The option value is parsed as a single CSV row of player names.
    csv_rows = list(csv.reader([player_names]))
    for player_name in csv_rows[0]:
        paired_player = _.head(players.where(name=player_name))
        playerid = paired_player.id
        pairings.insert(DataObject(player_id=int(playerid), user_unique_id=userid))

# Remove users & players which do not exist in pairings
# (iterate over clones so the tables can be modified while looping)
for user in users.clone():
    if pairings.where(user_unique_id=user.unique_id):
        continue
    users.remove(user)
for player in players.clone():
    if pairings.where(player_id=player.id):
        continue
    players.remove(player)

# create destination directory if it doesn't exist yet
destination_exists = os.path.isdir(dest_dirpath)
if not destination_exists:
    os.makedirs(dest_dirpath)
def self_desc(cls):
    '''
    Method to get self description, used at init.

    Produces a newline-joined string: a header line made of
    `get_class_name(cls)` followed by a colon, then one `- key = value` line
    for every item returned by `get_class_attr(cls)`.

    Value rendering:
    - the `spec` attribute is shown by its `name` entry only
    - a dict, or a value whose first element is a dict, is pretty-printed
    - anything else is interpolated as-is
    '''
    lines = [f'{get_class_name(cls)}:']
    for attr_name, attr_value in get_class_attr(cls).items():
        if attr_name == 'spec':
            shown = attr_value['name']
        elif ps.is_dict(attr_value) or ps.is_dict(ps.head(attr_value)):
            shown = pformat(attr_value)
        else:
            shown = attr_value
        lines.append(f'- {attr_name} = {shown}')
    return '\n'.join(lines)
def remove_user(id):
    """Remove the user with the given id from `_users` and publish a
    "removed" `UserMessage` for it.

    Does nothing when no user with that id is found.
    """
    matching_users = _users.where(id=id)
    old_user = _.head(matching_users)
    if old_user:
        _users.remove(old_user)
        messages.publish(UserMessage("removed", old_user))
def get_my_vehicle():
    """Return the first vehicle whose `player_id` matches the first entry of
    `_me_player`.

    Returns None when `_me_player` yields no player; otherwise returns
    whatever `_.head` gives for the matching vehicles.
    """
    me = _.head(_me_player)
    if not me:
        return None
    my_vehicles = _vehicles.where(player_id=me.id)
    return _.head(my_vehicles)
def remove_vehicle(id):
    """Remove the vehicle with the given id from `_vehicles` and publish a
    "removed" `VehicleMessage` for it.

    Does nothing when no vehicle with that id is found.
    """
    matching_vehicles = _vehicles.where(id=id)
    vehicle = _.head(matching_vehicles)
    if not vehicle:
        return
    _vehicles.remove(vehicle)
    messages.publish(VehicleMessage("removed", vehicle))
# Pairings table for the new cache layout (the `users` and `players` tables
# are created before this excerpt begins).
pairings = Table()
pairings.create_index('player_id')
pairings.create_index('user_unique_id')

# Load old 0.6.x cache file
parser = ConfigParser.ConfigParser()
with open(source_filepath, "rb") as cache_file:
    parser.readfp(cache_file)

# Build new cache structures
users.insert_many(
    DataObject(unique_id=entry_id, name=entry_name)
    for entry_name, entry_id in parser.items("TeamSpeakUsers")
)
players.insert_many(
    DataObject(id=int(entry_id), name=entry_name)
    for entry_name, entry_id in parser.items("GamePlayers")
)
for user_name, player_names in parser.items("UserPlayerPairings"):
    paired_user = _.head(users.where(name=user_name))
    userid = paired_user.unique_id
    # The option value is parsed as a single CSV row of player names.
    csv_rows = list(csv.reader([player_names]))
    for player_name in csv_rows[0]:
        paired_player = _.head(players.where(name=player_name))
        playerid = paired_player.id
        pairings.insert(DataObject(player_id=int(playerid), user_unique_id=userid))

# Remove users & players which do not exist in pairings
# (iterate over clones so the tables can be modified while looping)
for user in users.clone():
    if pairings.where(user_unique_id=user.unique_id):
        continue
    users.remove(user)
for player in players.clone():
    if pairings.where(player_id=player.id):
        continue
    players.remove(player)

# create destination directory if it doesn't exist yet
destination_exists = os.path.isdir(dest_dirpath)
if not destination_exists:
    os.makedirs(dest_dirpath)

# write out the new cache files
users.json_export(users_filepath)