Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_demux():
    """Check that ``dsp.Demux`` recovers the symbols mixed onto two carriers."""
    freqs = np.array([1e3, 2e3])
    omegas = 2 * np.pi * freqs / config.Fs
    expected = [3, 2j]

    # One carrier per frequency, each weighted by its symbol and summed.
    carriers = [
        dsp.exp_iwt(2 * np.pi * f / config.Fs, config.Nsym) for f in freqs
    ]
    mixed = np.dot(expected, carriers)

    # Only the real part of the mixed signal is handed to the sampler.
    demux = dsp.Demux(sampling.Sampler(mixed.real), omegas, config.Nsym)
    recovered = np.array(list(demux))

    worst_error = np.max(np.abs(recovered - expected))
    assert worst_error < 1e-12
def symbols_stream(signal):
    """Return a ``dsp.Demux`` over *signal* for the single frequency ``omega``.

    ``omega`` is not a parameter; it is read from the enclosing scope.
    """
    return dsp.Demux(
        sampler=sampling.Sampler(signal),
        omegas=[omega],
        Nsym=config.Nsym,
    )
# Build a Receiver from the config, passing a common.Dummy() object as pylab.
r = recv.Receiver(config, pylab=common.Dummy())
def run(self, sampler, gain, output):
    """Demodulate the samples from *sampler* and write decoded frames to *output*.

    The steps run in a fixed order on the same sampler: ``self._prefix`` is
    given the demultiplexed symbols and *gain*, ``self._train`` returns a
    filter that is installed as ``sampler.equalizer``, and the output of
    ``self._demodulate`` is flattened and decoded into frames.
    ``self.output_size`` is increased by the length of every frame written.
    """
    log.debug('Receiving')
    demux = dsp.Demux(sampler, omegas=self.omegas, Nsym=self.Nsym)
    self._prefix(demux, gain=gain)

    equalizing_filter = self._train(sampler, order=10, lookahead=10)

    def equalize(x):
        # Materialize the filter output as a list.
        return list(equalizing_filter(x))

    sampler.equalizer = equalize

    # _demodulate yields iterables; flatten them into one stream.
    bits = itertools.chain.from_iterable(self._demodulate(sampler, demux))
    for frame in framing.decode_frames(bits):
        output.write(frame)
        self.output_size += len(frame)
def frame_iter(config, src, frame_length):
    """Yield ``(coeffs, peak, total)`` for each full frame read from *src*.

    Each frame is ``frame_length * config.Nsym * config.sample_size`` units
    as returned by ``src.read``; iteration stops at the first short read.
    For every frame the mean is removed, the frame is demultiplexed at the
    configured frequencies, and three values are produced:

    * ``coeffs`` -- root-mean-square symbol magnitude along axis 0,
    * ``peak``   -- largest absolute sample value in the frame,
    * ``total``  -- ``sqrt(sum(frame**2) / (len(frame) / 2))``.
    """
    bytes_per_frame = frame_length * config.Nsym * config.sample_size
    omegas = 2 * np.pi * np.array(config.frequencies) / config.Fs

    raw = src.read(bytes_per_frame)
    while len(raw) >= bytes_per_frame:
        samples = common.loads(raw)
        frame = samples - np.mean(samples)  # subtract the frame mean

        demux = dsp.Demux(sampling.Sampler(frame), omegas, config.Nsym)
        symbols = np.array(list(demux))

        coeffs = np.mean(np.abs(symbols) ** 2, axis=0) ** 0.5
        peak = np.max(np.abs(frame))
        total = np.sqrt(np.dot(frame, frame) / (0.5 * len(frame)))
        yield coeffs, peak, total

        # The next read happens only after the consumer resumes the generator.
        raw = src.read(bytes_per_frame)