Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from .response import Response, response_xref
from .part import Part
from .primitives import Int, Bool
error_struct = struct.Struct("!" + Int.fmt)
log = logging.getLogger(__name__)
class MultiHeader(Part):
"""
"""
parts = (
("type", Int),
("done", Bool),
("error", Int),
)
class TransactionRequest(Request):
"""
"""
opcode = 14
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.requests = []
def add(self, request):
self.requests.append(request)
def parse(cls, buff, offset):
"""
Parses a raw buffer at offset and returns the resulting array value.
Starts off by `parse()`-ing the 32-bit element count, followed by
parsing items out of the buffer "count" times.
"""
count, offset = Int.parse(buff, offset)
values = []
for _ in range(count):
value, new_offset = cls.item_class.parse(buff, offset)
values.append(value)
offset = new_offset
return values, offset