Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
super(DataCheckTest, self).setUp()
self.conn = pymysql.Connect(**db_save.PLUGIN_PARAM)
self.cursor = self.conn.cursor()
self.cursor.execute("DROP TABLE IF EXISTS test_data_check")
self.cursor.execute("""
CREATE TABLE test_data_check(
id INT AUTO_INCREMENT PRIMARY KEY,
device_id VARCHAR(32),
term_id VARCHAR(32),
item_id VARCHAR(32),
time DATETIME,
value FLOAT,
warn_msg VARCHAR(1000)
) ENGINE=MyISAM DEFAULT CHARSET=utf8
""")
self.conn.commit()
self.redis_client = redis.StrictRedis(db=config.getint('REDIS', 'db', fallback=1), decode_responses=True)
self.redis_client.flushdb()
def saveJokesToDatabase(jokes):
"""
将爬取的数据放入MySQL数据库
:param jokes:
:return:
"""
try:
print('total:%d' % len(jokes))
connection = pymysql.Connect(host="localhost", user='root', password='root', database='wechat', charset='utf8')
connection.autocommit(True)
with connection.cursor() as cursor:
# 保存之前先过滤重复()
filter_sql = "SELECT joke_id from jokes WHERE create_time > date_sub(current_timestamp(),INTERVAL 7 day) "
cursor.execute(filter_sql)
ids = []
for row in cursor.fetchall():
ids.append(row[0])
jokes = [joke for joke in jokes if int(joke[0]) not in ids]
sql = "INSERT INTO jokes(joke_id,joke_content) VALUES (%s,%s)"
cursor.executemany(sql, jokes)
if song_name_obj.group():
song_name = song_name_obj.group(1)
if comments_info.get((singer_name, song_name)): # 若在含有comment_num的列表中,则添加到数据库,否则直接忽略
with open(os.path.join(song_file_path, song_file), 'r', encoding='utf8') as f:
content = f.read()
get_song_vector(content, model, singer_name, song_name, vocabulary, comments_info[(singer_name, song_name)][0], comments_info[(singer_name, song_name)][1], idfs)
f.close()
print(singer_name + "处理完毕")
save_song_to_mysql(connection)
if __name__ == '__main__':
model_path = './word2vec_res.model' # word2vec模型结果存储地址
model, vocabulary, vec_string_res = load_model(model_path)
save_vocabulary_to_redis(vec_string_res)
connection = pymysql.Connect(
host='localhost',
port=3306,
user='root',
passwd='root',
db="personal_practice",
charset='utf8'
)
main(model, vocabulary, connection)
connection.close()
def __init__(self):
"""
链接数据库
"""
self.conn = pymysql.Connect(
host=MYSQL_HOST,
port=MYSQL_PORT,
user=MYSQL_USER,
password=MYSQL_PASSWORD,
db=MYSQL_DB_NAME,
)
def get_connect():
connect = pymysql.Connect(
host='localhost',
port=3306,
user='root',
passwd='root',
db='toutiao',
charset='utf8'
)
return connect
def ConnectData(host,sql):
connect = pymysql.Connect(
host=host,
port=8066,
user='mala',
passwd='Mala123456$',
db='boss',
charset='utf8'
)
# 获取游标
cursor = connect.cursor()
# 查询数据
sql = sql
cursor.execute(sql ) #执行sql
print('共查找出', cursor.rowcount, '条数据')
if cursor.rowcount ==0:
print("数据库中没有找到你要的值")
return 0
def get_connection(parsed_url, options, defer_connect=False):
host = parsed_url.hostname
port = parsed_url.port or MySQLLock.MYSQL_DEFAULT_PORT
dbname = parsed_url.path[1:]
username = parsed_url.username
password = parsed_url.password
unix_socket = options.get("unix_socket")
try:
if unix_socket:
return pymysql.Connect(unix_socket=unix_socket,
port=port,
user=username,
passwd=password,
database=dbname,
defer_connect=defer_connect)
else:
return pymysql.Connect(host=host,
port=port,
user=username,
passwd=password,
database=dbname,
defer_connect=defer_connect)
except (pymysql.err.OperationalError, pymysql.err.InternalError) as e:
utils.raise_with_cause(coordination.ToozConnectionError,
encodeutils.exception_to_unicode(e),
cause=e)
def open_spider(self, spider):
print('开始爬虫,链接数据库')
self.conn = pymysql.Connect(
host=MYSQL_HOST,
port=MYSQL_PORT,
user=MYSQL_USER,
password=MYSQL_PASSWORD,
db=MYSQL_DB_NAME,
)