Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _withdraw_identity(conn, **kwargs):
"""Run a server operation to withdraw an identity."""
args = {k: v for k, v in kwargs.items() if v is not None}
op = Operation(SortingHatSchema.SortingHatMutation)
op.withdraw(**args)
op.withdraw.uuid()
result = conn.execute(op)
return result['data']['withdraw']['uuid']
def _remove_domain(client, **kwargs):
"""Run a server operation to remove a domain from the registry."""
args = {k: v for k, v in kwargs.items() if v is not None}
op = Operation(SortingHatSchema.SortingHatMutation)
op.delete_domain(**args)
op.delete_domain.domain.domain()
result = client.execute(op)
return result['data']['deleteDomain']['domain']['domain']
def _remove_identity(conn, **kwargs):
"""Run a server operation to remove an identity from the registry."""
args = {k: v for k, v in kwargs.items() if v is not None}
op = Operation(SortingHatSchema.SortingHatMutation)
op.delete_identity(**args)
op.delete_identity.individual().mk()
result = conn.execute(op)
individual = result['data']['deleteIdentity']['individual']
return individual is not None
def _add_organization(client, **kwargs):
"""Run a server operation to add an organization to the registry."""
args = {k: v for k, v in kwargs.items() if v is not None}
op = Operation(SortingHatSchema.SortingHatMutation)
op.add_organization(**args)
op.add_organization.organization.name()
result = client.execute(op)
return result['data']['addOrganization']['organization']['name']
def _check_organization_domain(client, organization, domain):
"""Run a server operation to check if a domain belongs to an organization."""
op = Operation(SortingHatSchema.Query)
filters = SortingHatSchema.OrganizationFilterType(name=organization)
op.organizations(filters=filters)
op.organizations.page_info.total_results()
op.organizations.entities.domains.domain()
result = client.execute(op)
data = op + result
total = data.organizations.page_info.total_results
if total == 0:
error = "{} not found in the registry".format(organization)
exc = click.ClickException(error)
exc.exit_code = 9
raise exc
def _generate_individuals_operation(page, uuid):
"""Define an operation to get the list of individuals."""
args = {
'page': page
}
if uuid:
args['filters'] = {'uuid': uuid}
op = Operation(SortingHatSchema.Query)
op.individuals(**args)
# Select page information
op.individuals().page_info.has_next()
# Select identities information
individual = op.individuals().entities()
individual.mk()
individual.is_locked()
individual.profile().__fields__('name', 'email',
'gender', 'is_bot')
individual.profile().country().__fields__('code', 'name')
individual.identities().__fields__('uuid', 'email', 'name', 'username', 'source')
individual.enrollments().__fields__('start', 'end')
def _merge_individuals(conn, **kwargs):
"""Run a server operation to merge individuals."""
args = {k: v for k, v in kwargs.items() if v is not None}
op = Operation(SortingHatSchema.SortingHatMutation)
op.merge(**args)
op.merge.uuid()
result = conn.execute(op)
return result['data']['merge']['uuid']
def _generate_orgs_operation(page):
"""Define an operation to get the list of organizations."""
op = Operation(SortingHatSchema.Query)
op.organizations(page=page)
op.organizations().page_info.has_next()
op.organizations().entities().name()
op.organizations().entities().domains().__fields__('domain', 'is_top_domain')
return op
def _update_profile(conn, uuid, **kwargs):
"""Run a server operation to update the profile of an individual."""
args = {k: v for k, v in kwargs.items() if v is not None}
pd = SortingHatSchema.ProfileInputType(**args)
op = Operation(SortingHatSchema.SortingHatMutation)
op.update_profile(uuid=uuid, data=pd)
op.update_profile().individual().mk()
op.update_profile().individual().profile().__fields__('name', 'email',
'gender', 'is_bot')
op.update_profile().individual().profile().country().__fields__('code', 'name')
result = conn.execute(op)
data = op + result
return data.update_profile.individual