def __init__(self, pipeline, proto_file):
    import os
    # Build the .proto file into an importable egg with fast-pb-build, then
    # attach the egg to the pipeline so remote workers can import the module.
    # (`error` is bigflow's error module, imported at module scope.)
    bigflow_python_home = os.getenv("BIGFLOW_PYTHON_HOME")
    module_name = proto_file.split('/')[-1].split('.')[0]
    proto_path = '/'.join(proto_file.split('/')[:-1])
    command = "%s/bigflow/bin/fast-pb-build %s %s %s" \
        % (bigflow_python_home, module_name, proto_path, proto_file)
    result = os.system(command)
    if result != 0:
        raise error.BigflowRuntimeException("Failed to fast-pb-build")
    egg_file = '%s/pb_modules/%s/dist/proto_wrapper-1.0-py2.7-linux-x86_64.egg' \
        % (bigflow_python_home, module_name)
    pipeline.add_egg_file(egg_file)
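# A minimal sketch of how the proto path is decomposed above, using a
# hypothetical file name; os.path.split/os.path.splitext would derive the
# same pieces more idiomatically than the manual '/' splitting:
import os

proto_file = "conf/schema/user_record.proto"  # hypothetical example path
proto_path, file_name = os.path.split(proto_file)
module_name = os.path.splitext(file_name)[0]
assert (module_name, proto_path) == ("user_record", "conf/schema")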
def _path_exists(self, path):
    # HDFS paths are checked through the Hadoop client; everything else
    # falls back to a local filesystem check.
    if path_util.is_hdfs_path(path):
        return self._hadoop_client().fs_test(path, self._hadoop_config)
    return os.path.exists(path)
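# A standalone sketch of the same scheme dispatch, with hdfs_test standing in
# for self._hadoop_client().fs_test (hypothetical stand-in; the real check
# goes through path_util.is_hdfs_path):
import os

def path_exists(path, hdfs_test=None):
    if path.startswith("hdfs://"):
        return hdfs_test(path) if hdfs_test else False
    return os.path.exists(path)

print(path_exists("/tmp"))          # True on most systems
print(path_exists("hdfs:///data"))  # False without a Hadoop client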
def _print_counters(self):
    # print counters after run
    c_dict = counter._get_all(grouped=True)
    if len(c_dict) > 0:
        logger.info("=========================================================")
        logger.info("all counters:")
        for group in sorted(c_dict.iterkeys()):
            logger.info("\t%s:" % group)
            for k, v in c_dict[group].iteritems():
                logger.info("\t\t%s=%d" % (k, v))
def _handle_new_writtens(self):
    # Log every URI written during this run, then clear the list in place.
    if len(self._uri_to_write) > 0:
        logger.info("=========================================================")
        logger.info("all outputs:")
        for uri in self._uri_to_write:
            logger.info("\t%s" % uri)
        self._uri_to_write[:] = []
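# Why the method clears with "self._uri_to_write[:] = []" rather than
# rebinding: slice assignment empties the list object in place, so any other
# reference to the same list sees the cleared state. A small demonstration:
uris = ["hdfs:///out/part-0"]
alias = uris
uris[:] = []                        # in-place clear
assert alias == [] and alias is uris

uris = ["hdfs:///out/part-1"]
alias = uris
uris = []                           # rebinding: alias keeps the old contents
assert alias == ["hdfs:///out/part-1"]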
def _generate_resource_message(self):
    # Ship every file in the entity folder to the worker-side entity folder.
    folder = entity.ENTITY_FOLDER
    if os.path.exists(folder):
        for file_name in os.listdir(folder):
            src_file = os.path.join(folder, file_name)
            target_file = os.path.join(entity.FLUME_WORKER_ENTITY_FOLDER, file_name)
            self.add_file(src_file, target_file)

    def _sorted_hooks(hooks_dict):
        result = []
        for name in sorted(hooks_dict.keys()):
            result.append(hooks_dict[name])
        return result

    # Serialize the init/fini hooks, ordered by name, into the resource so
    # workers can deserialize and run them.
    import copy
    resource = copy.deepcopy(self._resource)
    from bigflow.core.serde import cloudpickle
    resource.add_file_from_bytes(
        cloudpickle.dumps(_sorted_hooks(self._init_hooks)),
        ".init_hooks")
    resource.add_file_from_bytes(
        cloudpickle.dumps(_sorted_hooks(self._fini_hooks)),
        ".fini_hooks")
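# Hedged sketch of the _sorted_hooks contract: hooks run in sorted-name order,
# regardless of registration order (the hook names below are made up). The
# sorted lists are then cloudpickled into the resource as ".init_hooks" /
# ".fini_hooks" files for workers to load:
def sorted_hooks(hooks_dict):
    return [hooks_dict[name] for name in sorted(hooks_dict.keys())]

hooks = {"20_load_conf": "load conf", "10_init_log": "init log"}
assert sorted_hooks(hooks) == ["init log", "load conf"]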