Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import collections
import logging

import numpy as np

from . import common
from . import dsp
from . import equalizer
log = logging.getLogger(__name__)
class Detector:
    """Detector state built from the modem configuration.

    NOTE(review): this excerpt appears to end partway through ``_wait``;
    only the statements visible here are reproduced.
    """

    COHERENCE_THRESHOLD = 0.9

    # The remaining constants are all derived from the sum of the
    # equalizer prefix values.
    CARRIER_DURATION = sum(equalizer.prefix)
    CARRIER_THRESHOLD = int(0.9 * CARRIER_DURATION)
    SEARCH_WINDOW = int(0.1 * CARRIER_DURATION)
    START_PATTERN_LENGTH = SEARCH_WINDOW // 4

    def __init__(self, config, pylab):
        """Copy the parameters the detector needs out of ``config``.

        Args:
            config: object exposing ``Fc``, ``Fs``, ``Nsym``, ``Tsym``,
                ``baud`` and ``timeout`` attributes.
            pylab: plotting backend, stored as ``self.plt``.
        """
        self.freq = config.Fc
        # Carrier frequency expressed in radians per sample.
        self.omega = 2 * np.pi * self.freq / config.Fs
        self.Nsym = config.Nsym
        self.Tsym = config.Tsym
        self.maxlen = config.baud  # 1 second of symbols
        self.max_offset = config.timeout * config.Fs
        self.plt = pylab

    def _wait(self, samples):
        """Set up the counter and bounded buffer history.

        NOTE(review): the rest of this method is not visible in this
        excerpt; ``samples`` is unused by the statements shown.
        """
        counter = 0
        # A deque with maxlen discards its oldest entry once it is full.
        bufs = collections.deque([], maxlen=self.maxlen)
def test_detect():
    """Check Detector.run on an on-frequency tone and on failure cases."""
    P = sum(equalizer.prefix)
    t = np.arange(P * config.Nsym) * config.Ts

    # A cosine at exactly config.Fc: expect unit amplitude and no
    # frequency error.
    x = np.cos(2 * np.pi * config.Fc * t)
    detector = detect.Detector(config, pylab=common.Dummy())
    _, amp, freq_err = detector.run(x)
    assert abs(1 - amp) < 1e-12
    assert abs(freq_err) < 1e-12

    # A tone at twice the carrier frequency is expected to be rejected.
    x = np.cos(2 * np.pi * (2 * config.Fc) * t)
    with pytest.raises(ValueError):
        detector.run(x)

    # Setup stays outside the ``raises`` body so that only ``run`` can
    # satisfy the expectation.
    detector.max_offset = 0
    with pytest.raises(ValueError):
        detector.run(x)
def test_prefix():
    """Check Receiver._prefix on a clean prefix and on an all-zero signal."""
    omega = 2 * np.pi * config.Fc / config.Fs
    symbol = np.cos(omega * np.arange(config.Nsym))
    # One scaled copy of the symbol per prefix value.
    signal = np.concatenate([c * symbol for c in equalizer.prefix])

    def symbols_stream(samples):
        """Wrap raw samples in a single-carrier Demux."""
        sampler = sampling.Sampler(samples)
        return dsp.Demux(sampler=sampler, omegas=[omega], Nsym=config.Nsym)

    r = recv.Receiver(config, pylab=common.Dummy())
    r._prefix(symbols_stream(signal))

    # Build the fixture outside the ``raises`` body so it cannot be the
    # source of the expected ValueError.
    silence = 0 * signal
    with pytest.raises(ValueError):
        r._prefix(symbols_stream(silence))
def _prefix(self, symbols, gain=1.0):
    """Compare the leading symbols against ``equalizer.prefix``.

    Takes ``len(equalizer.prefix)`` items from ``symbols``, selects the
    ``self.carrier_index`` column, scales it by ``gain`` and rounds the
    magnitudes to integers. The constellation and the magnitudes are
    plotted through ``self.plt``.

    Args:
        symbols: source passed to ``common.take``.
        gain: multiplier applied to the selected column.

    Raises:
        ValueError: if any rounded magnitude differs from the prefix.
    """
    S = common.take(symbols, len(equalizer.prefix))
    S = S[:, self.carrier_index] * gain
    magnitudes = np.abs(S)
    sliced = np.round(magnitudes)

    self.plt.figure()
    self.plt.subplot(1, 2, 1)
    self._constellation(S, sliced, 'Prefix')

    bits = np.array(sliced, dtype=int)
    self.plt.subplot(1, 2, 2)
    self.plt.plot(magnitudes)
    self.plt.plot(equalizer.prefix)

    errors = (bits != equalizer.prefix)
    # Count mismatches in one vectorized pass rather than iterating the
    # array with the builtin any()/sum().
    error_count = np.count_nonzero(errors)
    if error_count:
        msg = 'Incorrect prefix: {0} errors'.format(error_count)
        raise ValueError(msg)
    log.debug('Prefix OK')
def _prefix(self, symbols, gain=1.0):
    """Compare the leading symbols against ``equalizer.prefix``.

    Takes ``len(equalizer.prefix)`` items from ``symbols``, selects the
    ``self.carrier_index`` column, scales it by ``gain`` and rounds the
    magnitudes to integers. The constellation and the magnitudes are
    plotted through ``self.plt``.

    Args:
        symbols: source passed to ``common.take``.
        gain: multiplier applied to the selected column.

    Raises:
        ValueError: if any rounded magnitude differs from the prefix.
    """
    S = common.take(symbols, len(equalizer.prefix))
    S = S[:, self.carrier_index] * gain
    magnitudes = np.abs(S)
    sliced = np.round(magnitudes)

    self.plt.figure()
    self.plt.subplot(1, 2, 1)
    self._constellation(S, sliced, 'Prefix')

    bits = np.array(sliced, dtype=int)
    self.plt.subplot(1, 2, 2)
    self.plt.plot(magnitudes)
    self.plt.plot(equalizer.prefix)

    errors = (bits != equalizer.prefix)
    # Count mismatches in one vectorized pass rather than iterating the
    # array with the builtin any()/sum().
    error_count = np.count_nonzero(errors)
    if error_count:
        msg = 'Incorrect prefix: {0} errors'.format(error_count)
        raise ValueError(msg)
    log.debug('Prefix OK')
def start(self):
    """Write the pilot prefix, then the training signal framed by silence.

    Each value of ``equalizer.prefix`` scales ``self.pilot`` for one
    write. The training symbols from ``self.equalizer`` are then
    modulated and written with ``self.silence`` before and after.
    """
    for level in equalizer.prefix:
        self.write(self.pilot * level)

    training = self.equalizer.modulator(
        self.equalizer.train_symbols(equalizer.equalizer_length)
    )
    for chunk in (self.silence, training, self.silence):
        self.write(chunk)
def _prefix(self, symbols, gain=1.0):
    """Compare the leading symbols against ``equalizer.prefix``.

    Takes ``len(equalizer.prefix)`` items from ``symbols``, selects the
    ``self.carrier_index`` column, scales it by ``gain`` and rounds the
    magnitudes to integers. The constellation and the magnitudes are
    plotted through ``self.plt``.

    Args:
        symbols: source passed to ``common.take``.
        gain: multiplier applied to the selected column.

    Raises:
        ValueError: if any rounded magnitude differs from the prefix.
    """
    S = common.take(symbols, len(equalizer.prefix))
    S = S[:, self.carrier_index] * gain
    magnitudes = np.abs(S)
    sliced = np.round(magnitudes)

    self.plt.figure()
    self.plt.subplot(1, 2, 1)
    self._constellation(S, sliced, 'Prefix')

    bits = np.array(sliced, dtype=int)
    self.plt.subplot(1, 2, 2)
    self.plt.plot(magnitudes)
    self.plt.plot(equalizer.prefix)

    errors = (bits != equalizer.prefix)
    # Count mismatches in one vectorized pass rather than iterating the
    # array with the builtin any()/sum().
    error_count = np.count_nonzero(errors)
    if error_count:
        msg = 'Incorrect prefix: {0} errors'.format(error_count)
        raise ValueError(msg)
    log.debug('Prefix OK')