Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _toft_path(self, path):
    """
    Convert ``path`` into a toft-style path.

    Delegates to ``requests.get_toft_style_path``, supplying the Hadoop
    config path stored on this object's job config.

    Args:
      path: the path to convert; forwarded unchanged to the helper

    Returns:
      the value returned by ``requests.get_toft_style_path``
    """
    hadoop_config_path = self._job_config.hadoop_config_path
    toft_style_path = requests.get_toft_style_path(path, hadoop_config_path)
    return toft_style_path
def reset_counter(self, name):
    """
    Reset a counter to zero.

    If ``name`` does not contain a group part, the matching counter under
    the Flume group is reset by default.

    Args:
      name (str): counter name; see the counter module documentation for
        the naming rules

    Raises:
      error.BigflowRuntimeException: this method must not be called inside a
        user-defined function (UDF) of a Bigflow transform; doing so raises
        this exception
    """
    # The presence of this environment variable (with any value, even an
    # empty string) is what the guard treats as "running at runtime".
    remote_side_marker = os.getenv("__PYTHON_IN_REMOTE_SIDE")
    if remote_side_marker is None:
        requests.reset_counter(name)
        return
    raise error.BigflowRuntimeException("reset_counter cannot be called at runtime")
serde: 设置dataset的serde对象
Returns:
PType: 表示该内存变量的P类型
"""
objector = options.get("serde", self.default_objector())
local_input_path = "./.local_input"
if os.path.isfile(local_input_path):
raise error.BigflowPlanningException("file ./.local_input exist, "
"cannot use it as temp directory")
if not os.path.exists(local_input_path):
os.makedirs(local_input_path)
file_name = os.path.abspath(local_input_path + "/" + str(uuid.uuid4()))
requests.write_record(file_name,
utils.flatten_runtime_value(dataset),
objector)
self._local_temp_files.append(file_name)
node = self.read(input.SequenceFile(file_name, **options)).node()
nested_level, ptype = utils.detect_ptype(dataset)
if nested_level < 0:
return utils.construct(self, node, ptype)
else:
from bigflow.transform_impls import group_by
for i in range(0, nested_level + 1):
node = group_by.node_group_by(