Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_snapshot_info(self, **kwargs):
"""Execute get_snapshot_info API."""
res_details = self._get_res_details(
'/cgi-bin/disk/snapshot.cgi?',
func='extra_get',
LUNIndex=kwargs['lun_index'],
snapshot_list='1',
snap_start='0',
snap_count='100',
sid=self.sid)
root = ET.fromstring(res_details['data'])
if root.find('authPassed').text == '0':
raise exception.VolumeBackendAPIException(
data=_('Session id expired'))
if root.find('result').text < '0':
raise exception.VolumeBackendAPIException(
data=_('Unexpected response from QNAP API'))
snapshot_list = root.find('SnapshotList')
if snapshot_list is None:
return None
snapshot_tree = snapshot_list.findall('row')
for snapshot in snapshot_tree:
if (kwargs['snapshot_name'] ==
snapshot.find('snapshot_name').text):
return snapshot
def execute_login(self):
"""Login and return sid."""
params = OrderedDict(
pwd=base64.b64encode(self.password.encode('utf-8')).decode(),
serviceKey='1',
user=self.username,
)
encoded_params = urllib.parse.urlencode(params)
url = ('/cgi-bin/authLogin.cgi?')
res_details = self._execute_and_get_response_details(
self.ip, url, encoded_params)
root = ET.fromstring(res_details['data'])
LOG.debug('execute_login data: %s', res_details['data'])
session_id = root.find('authSid').text
LOG.debug('execute_login session_id: %s', session_id)
return session_id
def delete_snapshot_api(self, snapshot_id):
"""Execute CGI to delete snapshot by snapshot id."""
res_details = self._get_res_details(
'/cgi-bin/disk/snapshot.cgi?',
func='del_snapshots',
snapshotID=snapshot_id,
sid=self.sid)
root = ET.fromstring(res_details['data'])
if root.find('authPassed').text == '0':
raise exception.VolumeBackendAPIException(
data=_('Session id expired'))
# snapshot not exist
if root.find('result').text == '-206021':
return
# lun not exist
if root.find('result').text == '-200005':
return
if root.find('result').text < '0':
raise exception.VolumeBackendAPIException(
data=_('delete snapshot %s failed') % snapshot_id)
def get_one_lun_info(self, lunID):
"""Execute get_one_lun_info API."""
res_details = self._get_res_details(
'/cgi-bin/disk/iscsi_portal_setting.cgi?',
func='extra_get',
lun_info='1',
lunID=lunID,
sid=self.sid)
root = ET.fromstring(res_details['data'])
if root.find('authPassed').text == '0':
raise exception.VolumeBackendAPIException(
data=_('Session id expired'))
else:
return res_details
def convert_queue_status(queue_status_output):
def leaf_to_dict(leaf):
return [{sub_child.tag: sub_child.text for sub_child in child} for child in leaf]
tree = ETree.fromstring(queue_status_output)
df_running_jobs = pandas.DataFrame(leaf_to_dict(leaf=tree[0]))
df_pending_jobs = pandas.DataFrame(leaf_to_dict(leaf=tree[1]))
df_merge = df_running_jobs.append(df_pending_jobs, sort=True)
df_merge.state[df_merge.state == 'r'] = 'running'
df_merge.state[df_merge.state == 'qw'] = 'pending'
df_merge.state[df_merge.state == 'Eqw'] = 'error'
return pandas.DataFrame({'jobid': pandas.to_numeric(df_merge.JB_job_number),
'user': df_merge.JB_owner,
'jobname': df_merge.JB_name,
'status': df_merge.state})
def _get_portal_info(self, volume, connector, lun_slot_id, lun_owner):
"""Get portal info."""
# Cache portal info for twenty seconds
# If connectors were the same then use the portal info which was cached
LOG.debug('get into _get_portal_info')
self.initiator = connector['initiator']
ret = self.api_executor.get_iscsi_portal_info()
root = ET.fromstring(ret['data'])
iscsi_port = root.find('iSCSIPortal').find('servicePort').text
LOG.debug('iscsiPort: %s', iscsi_port)
target_iqn_prefix = root.find(
'iSCSIPortal').find('targetIQNPrefix').text
LOG.debug('targetIQNPrefix: %s', target_iqn_prefix)
internal_model_name = (self.nasInfoCache
[self.configuration.qnap_management_url][1])
LOG.debug('internal_model_name: %s', internal_model_name)
fw_version = (self.nasInfoCache
[self.configuration.qnap_management_url][2])
LOG.debug('fw_version: %s', fw_version)
target_index = ''
target_iqn = ''
def get_target_info(self, target_index):
"""Get nas target info."""
res_details = self._get_res_details(
'/cgi-bin/disk/iscsi_portal_setting.cgi?',
func='extra_get',
targetInfo=1,
targetIndex=target_index,
ha_sync='1',
sid=self.sid)
root = ET.fromstring(res_details['data'])
LOG.debug('TS get_target_info.authPassed: (%s)',
root.find('authPassed').text)
if root.find('authPassed').text == '0':
raise exception.VolumeBackendAPIException(
data=_('Session id expired'))
if root.find('result').text < '0':
raise exception.VolumeBackendAPIException(
data=_('Get target info failed'))
target_list = root.find('targetInfo')
target_tree = target_list.findall('row')
for target in target_tree:
if target_index == target.find('targetIndex').text:
return target
res_details = self._get_res_details(
'/cgi-bin/disk/iscsi_target_setting.cgi?',
func='add_init',
targetIQN=target_iqn,
initiatorIQN=init_iqn,
initiatorAlias=init_iqn,
bCHAPEnable='1' if use_chap_auth else '0',
CHAPUserName=chap_username,
CHAPPasswd=chap_password,
bMutualCHAPEnable='0',
mutualCHAPUserName='',
mutualCHAPPasswd='',
ha_sync='1',
sid=self.sid)
root = ET.fromstring(res_details['data'])
if root.find('authPassed').text == '0':
raise exception.VolumeBackendAPIException(
data=_('Session id expired'))
if root.find('result').text < '0':
raise exception.VolumeBackendAPIException(
data=_('Add target acl failed'))