Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def iterlist(x, *args, **kwargs):
x = np.array(x)
return list(
(i, list(x))
for i, x in common.iterate(x, index=True, *args, **kwargs)
)
def _wait(self, samples):
counter = 0
bufs = collections.deque([], maxlen=self.maxlen)
for offset, buf in common.iterate(samples, self.Nsym, index=True):
if offset > self.max_offset:
raise ValueError('Timeout waiting for carrier')
bufs.append(buf)
coeff = dsp.coherence(buf, self.omega)
if abs(coeff) > self.COHERENCE_THRESHOLD:
counter += 1
else:
counter = 0
if counter == self.CARRIER_THRESHOLD:
return offset, bufs
raise ValueError('No carrier detected')
def _to_bytes(bits):
converter = BitPacker()
for chunk in common.iterate(data=bits, size=8,
func=tuple, truncate=True):
yield [converter.to_byte[chunk]]
def encode(self, bits):
for bits_tuple in common.iterate(bits, self.bits_per_symbol, tuple):
yield self.encode_map[bits_tuple]
def encode(self, bits):
for bits_tuple in common.iterate(bits, self.bits_per_symbol, tuple):
yield self.encode_map[bits_tuple]
def decode(self, symbols, error_handler=None):
symbols_vec = self.symbols
_dec = self.decode_list
for syms in common.iterate(symbols, self.buf_size, truncate=False):
for received in syms:
error = np.abs(symbols_vec - received)
index = np.argmin(error)
decoded, bits = _dec[index]
if error_handler:
error_handler(received=received, decoded=decoded)
yield bits
def modulate(self, bits):
bits = itertools.chain(bits, self.padding)
Nfreq = len(self.carriers)
symbols_iter = common.iterate(self.modem.encode(bits), size=Nfreq)
for i, symbols in enumerate(symbols_iter, 1):
self.write(np.dot(symbols, self.carriers))
if i % self.iters_per_report == 0:
total_bits = i * Nfreq * self.modem.bits_per_symbol
log.debug('Sent %10.3f kB', total_bits / 8e3)