Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if chan is not None:
data = chan(data)
if df:
sampler = sampling.Sampler(data, sampling.Interpolator())
sampler.freq += df
data = sampler.take(len(data))
data = common.dumps(data)
rx_audio = BytesIO(data)
rx_data = BytesIO()
dump = BytesIO()
if reader:
rx_audio = reader(rx_audio)
try:
result = main.recv(config=cfg, src=rx_audio, dst=rx_data,
dump_audio=dump, pylab=None)
finally:
rx_audio.close()
rx_data = rx_data.getvalue()
assert data.startswith(dump.getvalue())
assert result == success
if success:
assert rx_data == tx_data
def receiver_thread():
with self._audio_interface() as interface:
src = interface.recorder()
dst = BytesIO()
amodem.main.recv(config=self.modem_config, src=src, dst=dst)
return dst.getvalue()
def receiveAudio():
with AUDIO_INTERFACE:
s = AUDIO_INTERFACE.recorder()
src = async.AsyncReader(stream=s, bufsize=s.bufsize)
dst = StringIO.StringIO()
mainAudio.recv(CONFIG, src=src, dst=dst)
self.debugWindow.setText(dst.getvalue())
def receiver_thread():
with self._audio_interface() as interface:
src = interface.recorder()
dst = BytesIO()
amodem.main.recv(config=self.modem_config, src=src, dst=dst)
return dst.getvalue()
main=lambda config, args: main.recv(
config, src=args.src, dst=wrap(Decompressor, args.dst, args.zlib),
pylab=args.pylab, dump_audio=args.dump
),