Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setup(self):
    """Run the base replication setup and locate or start the no-InnoDB slave.

    Stores server 0 in ``self.server0`` and calls ``replicate.test.setup``.
    If a server named "rep_slave_no_innodb" is already known to
    ``self.servers`` it becomes ``self.server5`` and its ``server_id``
    variable is read into ``self.s5_serverid``; otherwise a fresh id is
    requested and a new server is spawned from ``self.server0`` with
    MyISAM as the default storage engine and InnoDB skipped.

    Raises MUTLibError if the ``server_id`` of an existing slave cannot
    be read.
    """
    self.server0 = self.servers.get_server(0)
    self.server5 = None
    self.s5_serverid = None
    replicate.test.setup(self)

    slave_name = "rep_slave_no_innodb"
    position = self.servers.find_server_by_name(slave_name)
    if position < 0:
        # No such server yet: reserve an id and start one.
        self.s5_serverid = self.servers.get_next_id()
        mysqld_options = (' --mysqld="--log-bin=mysql-bin --skip-innodb '
                          '--default-storage-engine=MyISAM"')
        # NOTE(review): the spawn result is not inspected in the visible
        # code - confirm whether the rest of the method was cut off.
        spawned = self.servers.spawn_new_server(
            self.server0, self.s5_serverid, slave_name, mysqld_options)
    else:
        # Reuse the existing server and read its id back from the server.
        self.server5 = self.servers.get_server(position)
        try:
            rows = self.server5.show_server_variable("server_id")
        except MUTLibError as err:
            raise MUTLibError("Cannot get replication slave "
                              "server_id: {0}".format(err.errmsg))
        self.s5_serverid = int(rows[0][1])
def setup(self):
# Prepare the test: run the base replication setup, build the
# --master/--slave option string, clear replication state on both
# servers and start (re)creating the util_test database.
# NOTE(review): this excerpt has lost its indentation and stops inside
# the try block below; the handler and the rest of the method are not
# visible here.
self.res_fname = "result.txt"
# Give up early (returning False) when the base setup reports failure.
result = replicate.test.setup(self)
if not result:
return False
# Command-line options naming server1 as master and server2 as slave.
master_str = "--master={0}".format(
self.build_connection_string(self.server1))
# The leading space lets the two options be concatenated directly.
slave_str = " --slave={0}".format(
self.build_connection_string(self.server2))
conn_str = master_str + slave_str
# Stop and reset slave replication on both servers.
self.server2.exec_query("STOP SLAVE")
self.server2.exec_query("RESET SLAVE")
self.server1.exec_query("STOP SLAVE")
self.server1.exec_query("RESET SLAVE")
# Normalized path of the SQL data file (presumably loaded further down;
# not visible in this excerpt).
data_file = os.path.normpath("./std_data/basic_data.sql")
try:
# Drop any existing util_test database on server1.
self.server1.exec_query("DROP DATABASE IF EXISTS util_test")
def setup(self):
    """Clear ``self.server3`` and return the base replication setup result."""
    self.server3 = None
    base_result = replicate.test.setup(self)
    return base_result
def setup(self):
    """Run the base replication setup and attach the "rep_slave_table" slave.

    A server named "rep_slave_table" already known to ``self.servers`` is
    used as ``self.server3``. Otherwise a new id is requested, a server is
    spawned from ``self.server0`` with the ``_MYSQLD`` options, its first
    result element becomes ``self.server3`` and it is passed to
    ``self.servers.add_new_server``.

    Returns the base setup result when an existing slave is used, or the
    value returned by ``spawn_new_server`` when a new one is started.

    Raises MUTLibError if the new slave cannot be spawned.
    """
    outcome = replicate.test.setup(self)

    slave_name = "rep_slave_table"
    position = self.servers.find_server_by_name(slave_name)
    if position < 0:
        self.s3_serverid = self.servers.get_next_id()
        # NOTE(review): this overwrites the base setup result, so the
        # return value differs between the two branches - confirm intended.
        outcome = self.servers.spawn_new_server(
            self.server0, self.s3_serverid, slave_name, _MYSQLD)
        if not outcome:
            raise MUTLibError("Cannot spawn replication slave server.")
        self.server3 = outcome[0]
        self.servers.add_new_server(self.server3, True)
    else:
        self.server3 = self.servers.get_server(position)
    return outcome
def setup(self):
    """Delegate to ``replicate.test.setup`` and return its result unchanged."""
    base_result = replicate.test.setup(self)
    return base_result
def setup(self):
    """Run the base replication setup and locate or start the blackhole slave.

    Resets the server and server-id attributes, stores server 0 in
    ``self.server0`` and calls ``replicate.test.setup``. If a server named
    "rep_slave_missing_engines" is already known to ``self.servers`` it
    becomes ``self.server3`` and its ``server_id`` variable is read into
    ``self.s3_serverid``; otherwise a fresh id is requested and a new server
    is spawned from ``self.server0`` with ``blackhole`` as the default
    storage engine.

    Raises MUTLibError if the ``server_id`` of an existing slave cannot
    be read.
    """
    self.server0 = self.servers.get_server(0)
    self.server3 = None
    self.server2 = None
    self.server4 = None
    self.s1_serverid = None
    self.s2_serverid = None
    self.s3_serverid = None
    self.s4_serverid = None
    replicate.test.setup(self)

    slave_name = "rep_slave_missing_engines"
    position = self.servers.find_server_by_name(slave_name)
    if position < 0:
        # No such server yet: reserve an id and start one.
        self.s3_serverid = self.servers.get_next_id()
        mysqld_options = (' --mysqld="--log-bin=mysql-bin '
                          '--default_storage_engine=blackhole"')
        # NOTE(review): the spawn result is not inspected in the visible
        # code - confirm whether the rest of the method was cut off.
        spawned = self.servers.spawn_new_server(
            self.server0, self.s3_serverid, slave_name, mysqld_options)
    else:
        # Reuse the existing server and read its id back from the server.
        self.server3 = self.servers.get_server(position)
        try:
            rows = self.server3.show_server_variable("server_id")
        except MUTLibError as err:
            raise MUTLibError("Cannot get replication slave "
                              "server_id: {0}".format(err.errmsg))
        self.s3_serverid = int(rows[0][1])
def setup(self):
    """Delegate to ``replicate.test.setup`` and return its result unchanged."""
    base_result = replicate.test.setup(self)
    return base_result