Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def run(size, chan=None, df=0, success=True, reader=None, cfg=None):
if cfg is None:
cfg = config.fastest()
tx_data = os.urandom(size)
tx_audio = BytesIO()
main.send(config=cfg, src=BytesIO(tx_data), dst=tx_audio, gain=0.5)
data = tx_audio.getvalue()
data = common.loads(data)
if chan is not None:
data = chan(data)
if df:
sampler = sampling.Sampler(data, sampling.Interpolator())
sampler.freq += df
data = sampler.take(len(data))
data = common.dumps(data)
rx_audio = BytesIO(data)
rx_data = BytesIO()
dump = BytesIO()
if reader:
rx_audio = reader(rx_audio)
try:
result = main.recv(config=cfg, src=rx_audio, dst=rx_data,
def test_dumps_loads():
x = np.array([.1, .4, .2, .6, .3, .5])
y = common.loads(common.dumps(x))
assert all(x == y)
def test_resample():
x = np.sin(2*np.pi * 10 * np.linspace(0, 1, 1001))
src = BytesIO(common.dumps(x))
dst = BytesIO()
sampling.resample(src=src, dst=dst, df=0.0)
y = common.loads(dst.getvalue())
err = x[:len(y)] - y
assert np.max(np.abs(err)) < 1e-4
dst = BytesIO()
sampling.resample(src=BytesIO(b'\x00\x00'), dst=dst, df=0.0)
assert dst.tell() == 0
def frame_iter(config, src, frame_length):
frame_size = frame_length * config.Nsym * config.sample_size
omegas = 2 * np.pi * np.array(config.frequencies) / config.Fs
while True:
data = src.read(frame_size)
if len(data) < frame_size:
return
data = common.loads(data)
frame = data - np.mean(data)
sampler = sampling.Sampler(frame)
symbols = dsp.Demux(sampler, omegas, config.Nsym)
symbols = np.array(list(symbols))
coeffs = np.mean(np.abs(symbols) ** 2, axis=0) ** 0.5
peak = np.max(np.abs(frame))
total = np.sqrt(np.dot(frame, frame) / (0.5 * len(frame)))
yield coeffs, peak, total
def recv(config, src, dst, dump_audio=None, pylab=None):
if dump_audio:
src = stream.Dumper(src, dump_audio)
reader = stream.Reader(src, data_type=common.loads)
signal = itertools.chain.from_iterable(reader)
log.debug('Skipping %.3f seconds', config.skip_start)
common.take(signal, int(config.skip_start * config.Fs))
pylab = pylab or common.Dummy()
detector = detect.Detector(config=config, pylab=pylab)
receiver = _recv.Receiver(config=config, pylab=pylab)
try:
log.info('Waiting for carrier tone: %.1f kHz', config.Fc / 1e3)
signal, amplitude, freq_error = detector.run(signal)
freq = 1 / (1.0 + freq_error) # receiver's compensated frequency
log.debug('Frequency correction: %.3f ppm', (freq - 1) * 1e6)
gain = 1.0 / amplitude