Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def transfer_data():
    """Fetch missing data and stop the script if the transfer fails.

    Awaits ``_transfer_data()``. When it reports success (a truthy
    result) nothing else happens. Otherwise the manager is sent an
    UPDATE command carrying a changeset with the error status and a
    ``process_error`` message, then an ABORT command for which no reply
    is awaited, and finally the process exits with status code 1.
    """
    if await _transfer_data():
        # Transfer succeeded, nothing to report.
        return

    failure_changeset = {
        "process_error": ["Failed to transfer data."],
        "status": DATA_META["STATUS_ERROR"],
    }
    await send_manager_command(
        ExecutorProtocol.UPDATE,
        extra_fields={ExecutorProtocol.UPDATE_CHANGESET: failure_changeset},
    )
    # ABORT is fire-and-forget: the script exits right after sending it.
    await send_manager_command(ExecutorProtocol.ABORT, expect_reply=False)
    sys.exit(1)
async def _sequential():
    """Run the executor stages one after another, awaiting each in turn.

    The stages are: obtain the stdout/jsonout files, transfer data, run
    the executor with those files, and collect the produced files.

    If obtaining the files raises ``FileExistsError`` the error is
    logged and an ABORT command is sent to the manager without waiting
    for a reply; no further stage is run.
    """
    try:
        # Try to obtain an exclusive lock over the stdout and jsonout
        # files. When the lock cannot be obtained, this task is probably
        # a duplicate spawned by celery, so treat it as a lock error and
        # skip processing.
        stdout_file, jsonout_file = get_stdout_json_file()
        await transfer_data()
        await run_executor(stdout_file, jsonout_file)
        await collect_files()
    except FileExistsError:
        logger.error("Stdout or jsonout file already exists, aborting.")
        await send_manager_command(ExecutorProtocol.ABORT, expect_reply=False)
return True
except DataTransferError:
logger.exception(
"Data transfer error downloading data with id {}, retry {} out of".format(
data_id, retry, RETRIES
)
)
except Exception:
logger.exception(
"Unknown error downloading data with id {}, retry {} out of".format(
data_id, retry, RETRIES
)
)
finally:
if access_log_id is not None:
await send_manager_command(
ExecutorProtocol.STORAGE_LOCATION_UNLOCK,
expect_reply=False,
extra_fields={
ExecutorProtocol.STORAGE_ACCESS_LOG_ID: access_log_id
},
)
# None of the retries has been successful, abort the download.
await send_manager_command(
ExecutorProtocol.DOWNLOAD_ABORTED,
expect_reply=False,
extra_fields={ExecutorProtocol.STORAGE_LOCATION_ID: to_storage_location_id},
)
return False
message = "".join(traceback.format_exception(exc_type, exc_value, exc_traceback))
logger.error("Unhandled exception in executor: {}".format(message))
loop.run_until_complete(
asyncio.gather(
*logging_future_list,
manager_commands.send_manager_command(
ExecutorProtocol.UPDATE,
extra_fields={
ExecutorProtocol.UPDATE_CHANGESET: {
"process_error": ["Unhandled exception in executor."],
"status": DATA_META["STATUS_ERROR"],
}
},
),
manager_commands.send_manager_command(
ExecutorProtocol.ABORT, expect_reply=False
),
def handle_exception(exc_type, exc_value, exc_traceback):
"""Log unhandled exceptions."""
message = "".join(traceback.format_exception(exc_type, exc_value, exc_traceback))
logger.error("Unhandled exception in executor: {}".format(message))
loop.run_until_complete(
asyncio.gather(
*logging_future_list,
manager_commands.send_manager_command(
ExecutorProtocol.UPDATE,
extra_fields={
ExecutorProtocol.UPDATE_CHANGESET: {
"process_error": ["Unhandled exception in executor."],
"status": DATA_META["STATUS_ERROR"],
}
},
),
manager_commands.send_manager_command(
ExecutorProtocol.ABORT, expect_reply=False
),
# Make file importable from outside executor environment
from .global_settings import DATA, EXECUTOR_SETTINGS
from .manager_commands import send_manager_command
logger.debug("Collecting files for data object with id {}".format(DATA["id"]))
reply = await send_manager_command(ExecutorProtocol.GET_REFERENCED_FILES)
refs = reply[ExecutorProtocol.REFERENCED_FILES]
base_dir = EXECUTOR_SETTINGS["DATA_DIR"]
collected, _ = collect_and_purge(base_dir, refs)
base_dir = Path(base_dir)
collected_objects = [
get_transfer_object(base_dir / object_, base_dir) for object_ in collected
]
await send_manager_command(
ExecutorProtocol.REFERENCED_FILES,
extra_fields={ExecutorProtocol.REFERENCED_FILES: collected_objects},
)
},
)
access_log_id = response[ExecutorProtocol.STORAGE_ACCESS_LOG_ID]
if objects is None:
response = await send_manager_command(
ExecutorProtocol.GET_FILES_TO_DOWNLOAD,
extra_fields={
ExecutorProtocol.STORAGE_LOCATION_ID: from_storage_location_id,
},
)
objects = response[ExecutorProtocol.REFERENCED_FILES]
t = Transfer(from_connector, to_connector)
t.transfer_objects(missing_data["url"], objects)
await send_manager_command(
ExecutorProtocol.DOWNLOAD_FINISHED,
extra_fields={
ExecutorProtocol.STORAGE_LOCATION_ID: to_storage_location_id
},
)
return True
except DataTransferError:
logger.exception(
"Data transfer error downloading data with id {}, retry {} out of".format(
data_id, retry, RETRIES
)
)
except Exception:
logger.exception(
"Unknown error downloading data with id {}, retry {} out of".format(
data_id, retry, RETRIES
async def _transfer_data():
"""Fetch missing data.
Get a list of missing storage locations from the manager and fetch
data using appropriate storage connectors.
"""
result = await send_manager_command(ExecutorProtocol.MISSING_DATA_LOCATIONS)
data_to_transfer = result[ExecutorProtocol.STORAGE_DATA_LOCATIONS]
to_connector = connectors["local"]
base_path = to_connector.base_path
data_downloading = []
# Notify manager to change status of the data object.
if data_to_transfer:
await send_manager_command(
ExecutorProtocol.UPDATE,
extra_fields={
ExecutorProtocol.UPDATE_CHANGESET: {
"status": DATA_META["STATUS_PREPARING"]
}
},
)
async def _send_manager_command(self, *args, **kwargs):
    """Forward a command to the manager; terminate the process on failure.

    All positional and keyword arguments are passed unchanged to
    ``send_manager_command``. Its reply is discarded. If the call raises
    ``RuntimeError``, the exception is swallowed and ``self.terminate()``
    is awaited instead.
    """
    try:
        await send_manager_command(*args, **kwargs)
    except RuntimeError:
        # The manager could not be updated, so shut this process down.
        await self.terminate()