Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
raise NotImplementedError('Pipeline type "{0}" not permitted.'.format(pipeline_type))
if not onetime:
if pipeline_type == 'lambda':
spinnakerpipeline = pipeline.SpinnakerPipelineLambda(**kwargs)
elif pipeline_type == 's3':
spinnakerpipeline = pipeline.SpinnakerPipelineS3(**kwargs)
elif pipeline_type == 'datapipeline':
spinnakerpipeline = pipeline.SpinnakerPipelineDataPipeline(**kwargs)
elif pipeline_type in consts.MANUAL_TYPES:
spinnakerpipeline = pipeline.SpinnakerPipelineManual(**kwargs)
else:
# Handles all other pipelines
spinnakerpipeline = pipeline.SpinnakerPipeline(**kwargs)
else:
spinnakerpipeline = pipeline.SpinnakerPipelineOnetime(onetime=onetime, **kwargs)
spinnakerpipeline.create_pipeline()
def create_pipeline(self, onetime=None):
    """Build the Spinnaker pipeline object for this app and create it.

    Args:
        onetime: When falsy (the default), a regular
            ``pipeline.SpinnakerPipeline`` is used. Any truthy value selects
            ``pipeline.SpinnakerPipelineOnetime`` and is forwarded to it as
            the ``onetime`` keyword argument.
    """
    utils.banner("Creating Pipeline")

    # Both pipeline flavours receive the same core keyword arguments.
    common_args = {
        'app': self.app,
        'trigger_job': self.trigger_job,
        'prop_path': self.json_path,
        'base': None,
        'token_file': self.gitlab_token_path,
    }

    if onetime:
        builder = pipeline.SpinnakerPipelineOnetime(onetime=onetime, **common_args)
    else:
        builder = pipeline.SpinnakerPipeline(**common_args)

    builder.create_pipeline()