Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _generate_resource_message(self):
folder = entity.ENTITY_FOLDER
if os.path.exists(folder):
for file_name in os.listdir(folder):
src_file = os.path.join(folder, file_name)
target_file = os.path.join(entity.FLUME_WORKER_ENTITY_FOLDER, file_name)
self.add_file(src_file, target_file)
def _sorted_hooks(hooks_dict):
result = []
for name in sorted(hooks_dict.keys()):
result.append(hooks_dict[name])
return result
import copy
resource = copy.deepcopy(self._resource)
from bigflow.core.serde import cloudpickle
resource.add_file_from_bytes(
cloudpickle.dumps(_sorted_hooks(self._init_hooks)),
".init_hooks")
resource.add_file_from_bytes(
cloudpickle.dumps(_sorted_hooks(self._fini_hooks)),
def _generate_resource_message(self):
folder = entity.ENTITY_FOLDER
if os.path.exists(folder):
for file_name in os.listdir(folder):
src_file = os.path.join(folder, file_name)
target_file = os.path.join(entity.FLUME_WORKER_ENTITY_FOLDER, file_name)
self.add_file(src_file, target_file)
def _sorted_hooks(hooks_dict):
result = []
for name in sorted(hooks_dict.keys()):
result.append(hooks_dict[name])
return result
import copy
resource = copy.deepcopy(self._resource)
from bigflow.core.serde import cloudpickle
resource.add_file_from_bytes(
def __init__(self, **options):
self._hadoop_client_instance = None
self._id = str(uuid.uuid4())
self._plan = logical_plan.LogicalPlan()
self._plan.set_environment(entity.PythonEnvironment())
self._resource = python_resource.Resource()
self._type_str = "RawPipeline"
self.estimate_concurrency = False
self._job_config = None
# todo(yexianjin): determine to use list or set
self._local_temp_files = list()
self._remote_temp_files = list()
self._cache_node_ids = list()
self._uri_to_write = list()
# pipeline before and after run hooks
self._before_run_hooks = dict()
self._after_run_hooks = dict()