Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Note: [int] means a list of ints and accepts any number of ints,
but [int, str] only accepts a list that has exactly 2 elements
(if the data to serialize has more than 2 elements, the remaining elements may be
lost).
"""
from bigflow.core import entity
if hasattr(rtype, 'serialize') and hasattr(rtype, 'deserialize'):
return rtype
if isinstance(rtype, Serde):
return rtype
if isinstance(rtype, entity.EntitiedBySelf):
raise Exception("Not Support this serde :", rtype)
if isinstance(rtype, type) and issubclass(rtype, message.Message):
return proto_of(rtype)
serde = {
tuple: lambda: tuple_of(*rtype),
list: lambda: list_of(*tuple(rtype)),
set: lambda: set_of(list(rtype)[0]),
dict: lambda: dict_of(rtype.keys()[0], rtype.values()[0])
}
if type(rtype) in serde:
return serde[type(rtype)]()
"""
import sys
import struct
import marshal
from google.protobuf import message
from bigflow.core.serde import omnitypes_objector
from bigflow.core.serde import cloudpickle
from bigflow.core import entity
from bigflow import error
from bigflow_python.proto import types_pb2
class Serde(entity.EntitiedBySelf):
"""
所有python实现的serde类要求继承于此类,用户可以通过继承Serde实现自定义的序列化/反序列化器
>>> from bigflow import serde
>>>
>>> class CustomSerde(serde.Serde):
... def serialize(self, obj):
... assert(isinstance(obj, MyCostomObject))
... return str(obj)
... def deserialize(self, buf):
... return MyCostomObject(buf)
在大部分的变换中都可以传递一个特殊的叫serde的参数,
设置该参数即可指定产出的PCollection的serde。
p.map(lambda x: MyCostomObject(x), serde=CustomSerde())