Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
project_config = self.make_snapcraft_project(
textwrap.dedent(
"""\
parts:
part1:
plugin: nil
part2:
plugin: nil
after: [part1]
"""
)
)
# Stage dependency
lifecycle.execute(steps.STAGE, project_config, part_names=["part1"])
# Build dependent
lifecycle.execute(steps.BUILD, project_config, part_names=["part2"])
def _fake_dirty_report(self, step):
    """Fake dirty-report lookup that flags only the stage step as dirty.

    Returns a ``pluginhandler.DirtyReport`` carrying one dirty property
    ("foo") and one dirty project option ("bar") when ``step`` equals
    ``steps.STAGE``; returns ``None`` for every other step.
    """
    return (
        pluginhandler.DirtyReport(
            dirty_properties={"foo"}, dirty_project_options={"bar"}
        )
        if step == steps.STAGE
        else None
    )
# Should stage no problem
with mock.patch.object(
pluginhandler.PluginHandler, "get_dirty_report", _fake_dirty_report
):
lifecycle.execute(steps.STAGE, project_config, part_names=["part1"])
if not part_name or part_name in ("main", "dependent", "nested-dependent"):
events.extend(
[
dict(part="main", step=steps.PULL),
dict(part="main", step=steps.BUILD),
dict(part="main", step=steps.STAGE),
]
)
if not part_name or part_name in ("dependent", "nested-dependent"):
events.extend(
[
dict(part="dependent", step=steps.PULL),
dict(part="dependent", step=steps.BUILD),
dict(part="dependent", step=steps.STAGE),
]
)
if not part_name or part_name == "nested-dependent":
events.extend(
[
dict(part="nested-dependent", step=steps.PULL),
dict(part="nested-dependent", step=steps.BUILD),
dict(part="nested-dependent", step=steps.STAGE),
]
)
return events
step = step.previous_step()
self.expectThat(step, Equals(steps.PULL))
self.assertIsNone(step.previous_step())
def test_next_step_handles_none(self):
    """``steps.next_step(None)`` must hand back ``steps.PULL``."""
    first_step = steps.next_step(None)
    self.assertThat(first_step, Equals(steps.PULL))
class NextStepsTestCase(unit.TestCase):
    """Checks ``next_steps()`` of each step against a fixed expectation."""

    # One (name, parameters) entry per step under test.  The test method
    # reads ``self.step`` and ``self.expected_steps``; presumably the
    # scenario machinery behind ``unit.TestCase`` injects them from these
    # dicts -- TODO confirm.
    scenarios = [
        (
            "pull",
            dict(
                step=steps.PULL,
                expected_steps=[steps.BUILD, steps.STAGE, steps.PRIME],
            ),
        ),
        ("build", dict(step=steps.BUILD, expected_steps=[steps.STAGE, steps.PRIME])),
        ("stage", dict(step=steps.STAGE, expected_steps=[steps.PRIME])),
        ("prime", dict(step=steps.PRIME, expected_steps=[])),
    ]

    def test_next_steps(self):
        """The step's ``next_steps()`` equals the scenario's expected list."""
        following = self.step.next_steps()
        self.assertThat(following, Equals(self.expected_steps))
class PreviousStepsTestCase(unit.TestCase):
scenarios = [
("pull", {"step": steps.PULL, "expected_steps": []}),
("build", {"step": steps.BUILD, "expected_steps": [steps.PULL]}),
def test_build_is_outdated(self):
    """Build becomes outdated once the pull state file is newer than it."""
    self.handler.mark_build_done()

    # Right after marking the build done it must not be reported outdated.
    self.assertFalse(
        self.handler.is_outdated(steps.BUILD),
        "Build step was unexpectedly outdated",
    )

    # Now mark the pull state as done, but ensure it has a later
    # timestamp, thereby making the build step out of date.
    state_dir = self.handler.plugin.statedir
    build_state = states.get_step_state_file(state_dir, steps.BUILD)
    pull_state = states.get_step_state_file(state_dir, steps.PULL)

    # Create (or truncate) the pull state file; its content is irrelevant.
    with open(pull_state, "w"):
        pass
    self.set_modified_time_later(pull_state, build_state)

    now_outdated = self.handler.is_outdated(steps.BUILD)
    self.assertTrue(now_outdated, "Expected build step to be outdated")
def _migrate_state_file(self):
    """Convert a legacy single-file state into a state directory.

    In previous versions of Snapcraft, the state directory was a file.
    Rather than die if we're running on output from an old version,
    migrate it for them: read the step name stored in the file, replace
    the file with a directory and mark that step as done.
    """
    state_path = self.plugin.statedir
    if not os.path.isfile(state_path):
        # Already a directory (or absent): nothing to migrate.
        return

    with open(state_path, "r") as legacy_state:
        recorded_step = legacy_state.read()

    if not recorded_step:
        # An empty legacy file names no step; it is left untouched.
        return

    os.remove(state_path)
    os.makedirs(state_path)
    self.mark_done(steps.get_step_by_name(recorded_step))
def run(self, step: steps.Step, part_names=None):
    """Run ``step`` and all of its previous steps for the selected parts.

    :param step: the final step to reach; every step returned by
        ``step.previous_steps()`` is processed before it, in order.
    :param part_names: optional collection of part names to restrict the
        run to.  When falsy, every part in ``self.config.all_parts`` is
        processed.

    Each (part, step) pair is delegated to ``self._handle_step``; once all
    steps are done ``self._create_meta`` is called with the processed part
    names.
    """
    if not part_names:
        selected_parts = self.config.all_parts
        processed_part_names = self.config.part_names
    else:
        self.parts_config.validate(part_names)
        # self.config.all_parts is already ordered, let's not lose that
        # and keep using a list.
        selected_parts = [
            candidate
            for candidate in self.config.all_parts
            if candidate.name in part_names
        ]
        processed_part_names = part_names

    with config.CLIConfig() as cli_config:
        ordered_steps = step.previous_steps() + [step]
        for current_step in ordered_steps:
            if current_step == steps.STAGE:
                # XXX check only for collisions on the parts that have
                # already been built --elopio - 20170713
                pluginhandler.check_for_collisions(self.config.all_parts)
            for part in selected_parts:
                # The caller's original part_names (possibly None) is
                # forwarded untouched, alongside the target step.
                self._handle_step(part_names, part, step, current_step, cli_config)

    self._create_meta(step, processed_part_names)
def _cleanup_parts_dir(parts_dir, local_plugins_dir, parts):
    """Empty the parts directory and mark every part's build and pull cleaned.

    :param parts_dir: path of the directory whose entries are removed.
    :param local_plugins_dir: full path of the one entry inside
        ``parts_dir`` that must be preserved.
    :param parts: iterable of objects exposing ``mark_cleaned(step)``.

    Entries are removed whether they are directories, regular files or
    symbolic links.  The parts are marked cleaned even when ``parts_dir``
    does not exist.
    """
    if os.path.exists(parts_dir):
        logger.info("Cleaning up parts directory")
        for subdirectory in os.listdir(parts_dir):
            path = os.path.join(parts_dir, subdirectory)
            if path == local_plugins_dir:
                continue
            if os.path.islink(path):
                # shutil.rmtree refuses symbolic links with a plain OSError
                # (not NotADirectoryError), so unlink them directly; the
                # link target is left alone.
                os.remove(path)
                continue
            try:
                shutil.rmtree(path)
            except NotADirectoryError:
                # Plain file sitting directly in the parts directory.
                os.remove(path)

    for part in parts:
        part.mark_cleaned(steps.BUILD)
        part.mark_cleaned(steps.PULL)
def _get_steps_run(part: pluginhandler.PluginHandler) -> Set[steps.Step]:
    """Return the steps in ``steps.STEPS`` that ``part`` reports need not run.

    A step is included exactly when ``part.should_step_run(step)`` is falsy.
    """
    return {step for step in steps.STEPS if not part.should_step_run(step)}