Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Open self.filename with CfbIO and fetch its "\001CompObj" entry by name.
io = CfbIO(self.filename)
me = io["\001CompObj"]
# The entry starts out at position 0, and seek() returns the new position.
self.assertEqual(me.tell(), 0)
self.assertEqual(me.seek(32), 32)
self.assertEqual(me.read(23), b('Microsoft Word-Dokument'))
# Reading 23 bytes must advance the position by 23.
self.assertEqual(me.tell(), 32 + 23)
# Relative seek: 5 bytes forward from the current position.
self.assertEqual(me.seek(5, SEEK_CUR), 32 + 23 + 5)
self.assertEqual(me.read(9), b('MSWordDoc'))
# NOTE(review): the expected position (16 * 5 - 1 = 79) suggests the offset is
# counted back from the end of the stream -- confirm against the seek()
# implementation under test.
self.assertEqual(me.seek(27, SEEK_END), 16 * 5 - 1)
self.assertEqual(me.read(8), b('Document'))
# Rewind and read everything: the length must equal the reported size, and
# the marker string must be found at offset 32.
self.assertEqual(me.seek(0), 0)
data = me.read()
self.assertEqual(me.size, len(data))
self.assertTrue(b('Microsoft Word-Dokument') in data)
self.assertEqual(data.find(b('Microsoft Word-Dokument')), 32)
# seek(1024) must succeed and a following read must return empty bytes
# (presumably because 1024 lies past the end of the stream -- TODO confirm).
self.assertEqual(me.seek(1024), 1024)
self.assertEqual(me.read(16), b(''))
ACCOUNT_HEADER + six.b('\x06\x00\x00\x00\x00\x01\x00\x00')
]
)
def test_account_start(fake_socket, packets):
# Build a client with a fixed session id and swap in the supplied fake socket.
client = TACACSClient('127.0.0.1', 49, None, session_id=12345)
client._sock = fake_socket
# Send an accounting request flagged START with three argument strings; the
# reply must report itself as valid.
reply = client.account('username', TAC_PLUS_ACCT_FLAG_START,
arguments=[b"service=shell", b"cmd=show", b"cmdargs=version"])
assert reply.valid
# Rewind the fake socket's buffer and decode its first 12 bytes as a header.
fake_socket.buff.seek(0)
first_header = TACACSHeader.unpacked(fake_socket.buff.read(12))
assert (first_header.version_max, first_header.version_min) == (12, 0)
# The body that follows is read using the header's length field.
first_body = fake_socket.buff.read(first_header.length)
# NOTE(review): the assertion below is cut off in this excerpt; only the
# constructor arguments are visible.
assert TACACSAccountingStart(
'username', TAC_PLUS_ACCT_FLAG_START, TAC_PLUS_AUTHEN_METH_TACACSPLUS, TAC_PLUS_PRIV_LVL_MIN,
TAC_PLUS_AUTHEN_TYPE_ASCII, [b"service=shell", b"cmd=show", b"cmdargs=version"],
req.get_header('transfer-encoding') is not None
)
)
# Content-Type validation, performed only when requires_content_type is truthy.
if requires_content_type:
# Reduce the header to its bare media type: take the part before the first
# ';', strip whitespace, and fall back to '' when the header is unset/empty.
content_type = (req.content_type.split(';', 1)[0].strip()
if req.content_type else '')
if not content_type:
# No usable Content-Type value at all.
raise falcon.HTTPMissingHeader('Content-Type')
elif content_type not in valid_content_types:
# A media type was supplied but is not among valid_content_types; the
# message echoes the full original header value, parameters included.
# NOTE(review): six.b() is meant for string literals -- confirm how this
# round trip behaves for header values with non-ASCII characters.
message = (
"Unexpected content type: {type}. Expected content types "
"are: {expected}."
).format(
type=six.b(req.content_type).decode('utf-8'),
expected=valid_content_types
)
raise falcon.HTTPUnsupportedMediaType(description=message)
def send_hundred_continue_response(self):
    """Write an interim "100 Continue" response to ``self.wfile``.

    The output consists of ``self.wfile_line``, then one formatted line per
    entry of ``self.hundred_continue_headers`` (skipped entirely when that
    attribute is ``None``), then the terminating blank line.  The stream is
    flushed and ``self.chunk_length`` is reset to ``-1`` afterwards.
    """
    # 100 Continue status line comes first.
    response = [self.wfile_line]

    # Optional headers; each entry is formatted through '%s: %s\r\n'.
    extra_headers = self.hundred_continue_headers
    if extra_headers is not None:
        response.extend(
            six.b('%s: %s\r\n' % entry) for entry in extra_headers)

    # Blank line closes the response.
    response.append(b'\r\n')

    out = self.wfile
    out.writelines(response)
    out.flush()

    # More request data is expected, so reinitialize chunk_length.
    self.chunk_length = -1
def ready(count):
    """Build a RDY command carrying ``count``.

    Args:
        count: Non-negative integer ready count.

    Returns:
        The result of ``_command(RDY, None, <count formatted as bytes>)``.

    Raises:
        TypeError: If ``count`` is not an integer (``bool`` is rejected too).
        ValueError: If ``count`` is negative.
    """
    # bool is a subclass of int, so it has to be excluded explicitly;
    # otherwise True/False would be formatted as b'True'/b'False'.
    if isinstance(count, bool) or not isinstance(count, int):
        raise TypeError('ready count must be an integer')

    if count < 0:
        raise ValueError('ready count cannot be negative')

    return _command(RDY, None, six.b('{}'.format(count)))
# HTTPS connection using the supplied context (the condition guarding this
# branch is outside this excerpt).
self._conn = http_client.HTTPSConnection(
server, port, context=context
)
else:
# Otherwise use a plain HTTP connection.
self._conn = http_client.HTTPConnection(server, port)
# Start a POST to self._url with chunked transfer encoding, add every
# entry of `headers`, then finish the header section.
self._conn.putrequest('POST', self._url)
self._conn.putheader('Transfer-Encoding', 'chunked')
for header in headers:
self._conn.putheader(header, headers[header])
self._conn.endheaders()
# Switching the socket to non-blocking mode keeps recv
# from blocking while waiting for a response.
self._conn.sock.setblocking(False)
# Start with an empty byte buffer; _reset_retries() presumably resets the
# retry bookkeeping -- confirm against its definition.
self._bytes = six.b('')
self._reset_retries()
# NOTE(review): the reason for this half-second pause is not visible here.
time.sleep(0.5)
def _to_binary_string_py3(text):
    """
    Converts a string to a binary string if it is not already one. Returns a str
    in Python 2 and a bytes in Python3.

    Do not use directly, use to_binary_string instead.

    :param text: a binary string (returned unchanged) or a text string.
    :return: ``text`` itself if it is already binary, otherwise ``six.b(text)``.
    :raises TypeError: if ``text`` is neither a binary nor a text string.
    """
    if isinstance(text, six.binary_type):
        return text
    if isinstance(text, six.string_types):
        # NOTE(review): on Python 3 six.b() encodes with latin-1, so text
        # containing characters above U+00FF raises UnicodeEncodeError here.
        return six.b(text)
    # TypeError is still an Exception subclass, so existing handlers keep
    # working, and it names the failure more precisely than a bare Exception.
    raise TypeError('only takes string types')
def produce_one_sample(data,
index,
split_id,
max_turn_num,
max_turn_len,
turn_cut_type='tail',
term_cut_type='tail'):
'''max_turn_num=10
max_turn_len=50
return y, nor_turns_nor_c, nor_r, turn_len, term_len, r_len
'''
# NOTE(review): the docstring names a return tuple, but no return statement
# is visible in this excerpt.
# Fetch entry `index` from the data stored under the keys 'c', 'r' and 'y'
# (the keys go through six.b()); `r` is additionally sliced with [:],
# presumably to work on a copy -- TODO confirm.
c = data[six.b('c')][index]
r = data[six.b('r')][index][:]
y = data[six.b('y')][index]
# Break `c` into turns using split_id (split_c is defined elsewhere).
turns = split_c(c, split_id)
# Normalize the list of turns so that nor_turns has length max_turn_num;
# normalize_length returns the normalized sequence plus a length value.
nor_turns, turn_len = normalize_length(turns, max_turn_num, turn_cut_type)
nor_turns_nor_c = []
term_len = []
# nor_turns_nor_c gets one entry per normalized turn (max_turn_num in total);
# each entry is a list of length max_turn_len.
# NOTE(review): the loop variable reuses the name `c` from above.
for c in nor_turns:
# Normalize a single turn so that nor_c has length max_turn_len.
nor_c, nor_c_len = normalize_length(c, max_turn_len, term_cut_type)
nor_turns_nor_c.append(nor_c)
term_len.append(nor_c_len)
# `r` gets the same max_turn_len normalization as each turn.
nor_r, r_len = normalize_length(r, max_turn_len, term_cut_type)
elements[0], elements[1], elements[2], s)
# The archive for item `s` is named '<s>-8.zip' inside `directory`.
zip_filename = join(directory, s + '-8.zip')
self.logger.debug('Reading text from {}'.format(zip_filename))
with ZipFile(zip_filename) as zip_file:
# First look for the text file inside a folder named after `s`.
# NOTE(review): if `join` is os.path.join, this builds a backslash-separated
# name on Windows, which would not match zip member names -- confirm.
filename = join(s, s + '-8.txt')
try:
with zip_file.open(filename) as f:
encoded_text = f.read()
except KeyError:
# KeyError is taken to mean the nested member does not exist (assuming
# ZipFile is zipfile.ZipFile).
# There might be zip files where the data file is in the root
filename = s + '-8.txt'
with zip_file.open(filename) as f:
encoded_text = f.read()
# Choose the codec from the character-set declaration found in the raw bytes;
# anything other than the two recognised declarations is rejected.
if encoded_text.find(b('Character set encoding: ISO-8859-1')) != -1:
text = encoded_text.decode('ISO-8859-1')
elif encoded_text.find(b('Character set encoding: ISO Latin-1')) != -1:
text = encoded_text.decode('Latin-1')
else:
raise LookupError('Unknown encoding for file {}'.format(filename))
# Return either the result of extract_text() or the full decoded text.
if extract_body:
extracted_text = extract_text(text)
return extracted_text
else:
return text
def to_bytes(s):
    """Pass string-type values through ``six.b``; return anything else as-is.

    A value counts as a string when it is an instance of
    ``six.string_types``.
    """
    return six.b(s) if isinstance(s, six.string_types) else s