Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Copyright (c) 2017 VMware, Inc. All Rights Reserved.
# SPDX-License-Identifier: GPL-3.0
from testtools import TestCase
from column import APIRunner
class TestAPIRunner(TestCase):
def setUp(self):
super(TestAPIRunner, self).setUp()
def test_run_module_on_localhost(self):
api_runner = APIRunner()
api_runner.run_module('localhost', remote_user=None)
def test_run_playbook_on_localhost(self):
api_runner = APIRunner()
pb_path = './tests/fixtures/playbooks/hello_world.yml'
result = api_runner.run_playbook(pb_path, 'localhost,',
remote_user=None, connection='local')
self.assertEqual('', result['error_msg'])
self.assertEqual(0, len(result['unreachable_hosts']))
self.assertEqual(0, len(result['failed_hosts']))
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
import testtools
from keystoneclient.middleware import memcache_crypt
class MemcacheCryptPositiveTests(testtools.TestCase):
def _setup_keys(self, strategy):
return memcache_crypt.derive_keys(b'token', b'secret', strategy)
def test_constant_time_compare(self):
# make sure it works as a compare, the "constant time" aspect
# isn't appropriate to test in unittests
ctc = memcache_crypt.constant_time_compare
self.assertTrue(ctc('abcd', 'abcd'))
self.assertTrue(ctc('', ''))
self.assertFalse(ctc('abcd', 'efgh'))
self.assertFalse(ctc('abc', 'abcd'))
self.assertFalse(ctc('abc', 'abc\x00'))
self.assertFalse(ctc('', 'abc'))
# For Python 3, we want to test these functions with both str and bytes
# as input.
if not os.path.isfile(ssh_key):
continue
# Ensure that it is a real ssh key
with open(ssh_key, "rb") as stream:
if stream.readline() != b"-----BEGIN RSA PRIVATE KEY-----\n":
continue
candidates.append(ssh_key)
# Sort the keys by modification time, pick the most recent key
candidates.sort(key=lambda f: os.stat(f).st_mtime, reverse=True)
logger.debug("Available ssh public keys: %r", candidates)
if not candidates:
raise LookupError("Unable to find any private ssh key")
return candidates[0]
class SnapsTestCase(testtools.TestCase):
def __init__(self, *args, **kwargs):
# match base snap src path on current
relative_path = os.path.relpath(
os.path.dirname(inspect.getfile(self.__class__)), os.path.dirname(__file__)
)
self.src_dir = os.path.join(*re.findall("(.*?)_tests/?(.*)", relative_path)[0])
super().__init__(*args, **kwargs)
def setUp(self):
filter_ = config.get("filter", None)
if filter_:
if not re.match(filter_, self.snap_content_dir):
self.skipTest(
"{} does not match the filter {}".format(
self.snap_content_dir, filter_
)
# under the License.
import glob
import re
import docutils.core
import testtools
# Used for new sections introduced during a release.
# - "History" introduced in Liberty should be
# mandatory for M.
OPTIONAL_SECTIONS = ("History",)
class TestTitles(testtools.TestCase):
def _get_title(self, section_tree):
section = {
'subtitles': [],
}
for node in section_tree:
if node.tagname == 'title':
section['name'] = node.rawsource
elif node.tagname == 'section':
subsection = self._get_title(node)
section['subtitles'].append(subsection['name'])
return section
def _get_titles(self, spec):
titles = {}
for node in spec:
if node.tagname == 'section':
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import testtools
from molecule.provisioners import DockerProvisioner
from molecule.core import Molecule
import yaml
from molecule.ansible_playbook import AnsiblePlaybook
class TestDockerProvisioner(testtools.TestCase):
def setUp(self):
super(TestDockerProvisioner, self).setUp()
# Setup mock molecule
self._mock_molecule = Molecule(dict())
self.temp = '/tmp/test_config_load_defaults_external_file.yml'
data = {
'molecule': {
'molecule_dir': '.test_molecule',
'inventory_file': 'tests/ansible_inventory'
},
'docker': {
'containers': [
{'name': 'test1',
'image': 'ubuntu',
'image_version': 'latest',
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import testtools
import pprint
from syntribos.utils import string_utils
class TestDebugLogger(testtools.TestCase):
def test_sanitize_dicts(self):
content = {"creds": {"password": "12345"}, "sl.no": 1}
sanitized_content = {"creds": {"password": "****"}, "sl.no": 1}
self.assertEqual(sanitized_content,
string_utils.sanitize_secrets(content))
def test_sanitize_strings(self):
content = "password = 12344"
sanitized_content = "password = ****"
self.assertEqual(sanitized_content,
string_utils.sanitize_secrets(content))
def test_compress(self):
content = "Sample data for compression"
encoded_content = "eJwLTswtyElVSEksSVRIyy9SSM7PLShKLS7OzM8DAIvJClY="
},
"updated_at": {
"type": "string",
"description": "Date and time of the last resource "
"type association modification "
"(READ-ONLY)",
"format": "date-time"
},
}
}
)
}
}
class TestResoureTypeController(testtools.TestCase):
def setUp(self):
super(TestResoureTypeController, self).setUp()
self.api = utils.FakeAPI(data_fixtures)
self.schema_api = utils.FakeSchemaAPI(schema_fixtures)
self.controller = metadefs.ResourceTypeController(self.api,
self.schema_api)
def test_list_resource_types(self):
resource_types = list(self.controller.list())
names = [rt.name for rt in resource_types]
self.assertEqual([RESOURCE_TYPE1, RESOURCE_TYPE2], names)
def test_get_resource_types(self):
resource_types = list(self.controller.get(NAMESPACE1))
names = [rt.name for rt in resource_types]
self.assertEqual([RESOURCE_TYPE3, RESOURCE_TYPE4], names)
def test_revert_to_default_marks_document_as_dirty(self):
document = FakeDocument()
start_revision = document.revision
fragment = DocumentFragment(
document, None, "value", None, {"default": "value"})
fragment.revert_to_default()
self.assertNotEqual(document.revision, start_revision)
def test_default_value(self):
fragment = DocumentFragment(
None, None, "other value", None, {"default": "value"})
self.assertEqual(fragment.default_value, "value")
class DocumentFragmentMiscPropertiesTests(TestCase):
"""
Miscellaneous tests
"""
def test_document_works(self):
document = object()
fragment = DocumentFragment(document, None, {})
self.assertIs(fragment._document, document)
self.assertIs(fragment.document, document)
def test_parent_works(self):
parent = object()
fragment = DocumentFragment(None, parent, None, {})
self.assertIs(fragment._parent, parent)
self.assertIs(fragment.parent, parent)
'expected_body': None,
}),
('simple', {
'test_args': {'body': "Foo bar baz."},
'expected_body': "Foo bar baz.",
}),
]
def test_has_expected_body(self):
""" Should have default `body` attribute. """
instance = version.ChangeLogEntry(**self.test_args)
self.assertEqual(self.expected_body, instance.body)
class ChangeLogEntry_as_version_info_entry_TestCase(
testscenarios.WithScenarios, testtools.TestCase):
""" Test cases for ‘ChangeLogEntry.as_version_info_entry’ attribute. """
scenarios = [
('default', {
'test_args': {},
'expected_result': collections.OrderedDict([
('release_date', version.ChangeLogEntry.default_release_date),
('version', version.ChangeLogEntry.default_version),
('maintainer', None),
('body', None),
]),
}),
]
def setUp(self):
""" Set up test fixtures. """
result = None
try:
result = lfunction()
LOG.info('No Exception in %d second',
time.time() - start_time)
return result
except exc_class as exc:
if exc_matcher is not None:
res = exc_matcher.match(exc)
if res is not None:
LOG.info(res)
raise exc
# Let the other exceptions propagate
dtime = time.time() - start_time
if dtime > CONF.boto.build_timeout:
raise testtools.TestCase\
.failureException("Wait timeout exceeded! (%ds)" % dtime)
time.sleep(CONF.boto.build_interval)