You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
dcs/dcs/tests/database.py

210 lines
6.1 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from hashlib import *
import pymysql
import dcs.tests.config as config
import dcs.tests.cookie as cookie
def mysql_conn(host='127.0.0.1', user='root', passwd='xwdjzwy5252', db='test'):
    """Open a new MySQL connection and return the connection object.

    :param host: database server host
    :param user: database user name
    :param passwd: database password
    :param db: name of the database (schema) to use
    :return: the object returned by ``pymysql.connect``
    """
    # NOTE(review): the default credentials are hard-coded in the signature;
    # consider loading them from configuration or the environment instead.
    connect_kwargs = {'host': host, 'user': user, 'passwd': passwd, 'db': db}
    return pymysql.connect(**connect_kwargs)
def register(u_name, u_pwd):
    """Register a new user in the ``user_info`` table.

    The password is stored as the hexadecimal SHA-1 digest of ``u_pwd``.

    :param u_name: user name to register
    :param u_pwd: plain-text password (a ``str``; it is encoded before hashing)
    :return: '用户名已存在,注册失败' when the user name is already taken,
        '注册成功' after the new row has been inserted and committed, or
        ``None`` when a database error occurred (the error is printed)
    """
    sha_pwd = sha1(u_pwd.encode()).hexdigest()
    try:
        conn = mysql_conn()
        try:
            with conn.cursor() as cur:
                # Bind u_name as a query parameter instead of formatting it
                # into the SQL text, so quotes in the name cannot alter the
                # statement.
                cur.execute(
                    'select user_password from user_info where user_name = %s',
                    [u_name],
                )
                # A row means the name exists already -> registration fails.
                if cur.fetchone() is not None:
                    return '用户名已存在,注册失败'
                insert_sql = (
                    'insert into user_info '
                    '(user_name, user_password, create_time, login_state) '
                    'values (%s,%s,now(),false)'
                )
                cur.execute(insert_sql, [u_name, sha_pwd])
                # Inserts are not persisted until explicitly committed.
                conn.commit()
                return '注册成功'
        finally:
            # Always release the connection, even when a statement raised.
            conn.close()
    except Exception as e:
        print(e)
        return None
def get_now():
    """Return the database server's current time.

    Runs ``select now()`` on a fresh connection.

    :return: the first column of the result row, or ``None`` when a database
        error occurred (the error is printed)
    """
    try:
        conn = mysql_conn()
        try:
            with conn.cursor() as cur:
                cur.execute('select now()')
                row = cur.fetchone()
        finally:
            # Always release the connection, even when the query raised.
            conn.close()
        return row[0]
    except Exception as e:
        print(e)
        return None
def login(u_name, u_pwd, st):
    """Check a user's credentials and, on success, create a login cookie.

    The SHA-1 hex digest of ``u_pwd`` is compared with the value stored in
    ``user_info.user_password``. On a match, a cookie is generated with
    ``cookie.Cookie`` and the user is recorded through ``config.add_user``.

    :param u_name: user name
    :param u_pwd: plain-text password (a ``str``; it is encoded before hashing)
    :param st: passed through unchanged to ``config.add_user``
        (its meaning is defined there, not in this function)
    :return: the generated cookie on success, '用户名错误,登录失败' when the
        user name is unknown, '密码错误,登录失败' when the password does not
        match, or ``None`` when an error occurred (the error is printed)
    """
    sha_pwd = sha1(u_pwd.encode()).hexdigest()
    try:
        conn = mysql_conn()
        try:
            with conn.cursor() as cur:
                # Bind u_name as a query parameter instead of formatting it
                # into the SQL text, so quotes in the name cannot alter the
                # statement.
                cur.execute(
                    'select user_password from user_info where user_name = %s',
                    [u_name],
                )
                row = cur.fetchone()
            if row is None:
                # No row for this user name.
                info = '用户名错误,登录失败'
            elif row[0] == sha_pwd:
                login_time = str(get_now())
                info = cookie.Cookie(u_name, login_time, 'true').generate_cookie()
                config.add_user(u_name, login_time, 'true', 'busy', info, st)
            else:
                info = '密码错误,登录失败'
        finally:
            # Always release the connection, even when something above raised.
            conn.close()
        return info
    except Exception as e:
        print(e)
        return None
def cancel(u_name):
    """Delete the account named ``u_name`` from the ``user_info`` table.

    Errors are printed and swallowed; nothing is returned.

    :param u_name: user name of the account to delete
    """
    try:
        conn = mysql_conn()
        try:
            with conn.cursor() as cur:
                # Bind u_name as a query parameter instead of formatting it
                # into the SQL text.
                cur.execute('delete from user_info where user_name = %s', [u_name])
            # Without an explicit commit the delete is discarded when the
            # connection closes (the other writers in this file commit too).
            conn.commit()
        finally:
            # Always release the connection, even when the delete raised.
            conn.close()
    except Exception as e:
        print(e)
def get_last_crawl_id(table_name: str) -> int:
    """Return the ``crawl_id`` of the most recent row in a crawl-result table.

    :param table_name: crawl-result table of the target user
    :return: ``crawl_id`` of a row whose ``time`` equals the table's maximum
        ``time``; ``0`` when the query returns no row. When a database error
        occurs the error is printed and ``None`` is returned.
    """
    try:
        conn = mysql_conn()
        try:
            with conn.cursor() as cur:
                # NOTE(review): table_name is interpolated into the SQL text
                # (identifiers cannot be bound as parameters), so it must
                # come from a trusted source.
                cur.execute(
                    f'SELECT crawl_id from {table_name} '
                    f'where time = (SELECT max(time) FROM {table_name})'
                )
                row = cur.fetchone()
        finally:
            # Always release the connection, even when the query raised.
            conn.close()
        return 0 if row is None else int(row[0])
    except Exception as e:
        print(e)
        return None
def drop_table(table_name: str):
    """Drop ``table_name`` if it exists.

    Errors are printed and swallowed; nothing is returned.

    :param table_name: name of the table to drop
    """
    try:
        conn = mysql_conn()
        try:
            with conn.cursor() as cur:
                # NOTE(review): table_name is interpolated into the SQL text
                # (identifiers cannot be bound as parameters), so it must
                # come from a trusted source.
                cur.execute(f'drop table if exists {table_name}')
            conn.commit()
        finally:
            # Always release the connection, even when the statement raised.
            conn.close()
    except Exception as e:
        print(e)
def create_table(create_sql: str):
    """Execute a ``create table`` statement and commit it.

    Errors are printed and swallowed; nothing is returned.

    :param create_sql: complete SQL statement to execute
    """
    try:
        conn = mysql_conn()
        try:
            with conn.cursor() as cur:
                cur.execute(create_sql)
            conn.commit()
        finally:
            # Always release the connection, even when the statement raised.
            conn.close()
    except Exception as e:
        print(e)
def create_crawl_result_table(table_name: str):
    """Create the crawl-result table ``table_name`` if it does not exist.

    The DDL is assembled from the column definitions below and handed to
    ``create_table``.

    :param table_name: name of the table to create
    """
    column_defs = (
        'id int primary key not null auto_increment',
        'crawl_id int not null',
        'time timestamp not null',
        'name varchar(100)',
        'college varchar(200)',
        'major varchar(200)',
        'paper varchar(200)',
    )
    columns = ','.join(column_defs)
    create_table(f'create table if not exists {table_name} ({columns})')
def create_user_info(table_name: str = 'user_info'):
    """Create the user table (``user_info`` by default) if it does not exist.

    The DDL is assembled from the column definitions below and handed to
    ``create_table``.

    :param table_name: name of the table to create
    """
    column_defs = (
        'id int primary key not null auto_increment',
        'create_time timestamp not null default now()',
        'user_name varchar(100)',
        'user_password varchar(200)',
        'login_state boolean default false',
    )
    columns = ','.join(column_defs)
    create_table(f'create table if not exists {table_name} ({columns})')
def write_result2database(res: list, table_name: str, last_crawl_id: int):
    """Insert one crawl result row into ``table_name``.

    The row is stored with ``crawl_id = last_crawl_id + 1`` and
    ``time = now()``.

    :param res: sequence whose first four items are written to the
        ``name``, ``college``, ``major`` and ``paper`` columns, in that order
    :param table_name: crawl-result table to insert into
    :param last_crawl_id: the previous crawl sequence number
    :return: '插入成功' when the row was inserted and committed, '插入失败'
        when anything went wrong (the error is printed)
    """
    try:
        conn = mysql_conn()
        try:
            with conn.cursor() as cur:
                # The values are bound as query parameters so that quotes
                # inside a name/college/major/paper neither break the
                # statement nor inject SQL. Only the table name is
                # interpolated (identifiers cannot be bound).
                # NOTE(review): table_name must come from a trusted source.
                insert_sql = (
                    f'insert into {table_name} '
                    '(name,college,major,paper,crawl_id,time) '
                    'values (%s,%s,%s,%s,%s,now())'
                )
                row = (res[0], res[1], res[2], res[3], last_crawl_id + 1)
                cur.execute(insert_sql, row)
            conn.commit()
        finally:
            # Always release the connection, even when the insert raised.
            conn.close()
        return '插入成功'
    except Exception as e:
        print(e)
        return '插入失败'
# Script entry point: when the module is run directly it calls get_now()
# (one `select now()` round trip) and discards the result.
if __name__ == '__main__':
    get_now()
    # create_crawl_result_table('table_name')
    # print(write_result2database(['name', 'college', 'major', 'paper'], "table_name", last_crawl_id=0))
    pass
# The string literal below is a bare expression whose value is never used:
# it holds a disabled interactive register/login experiment and none of the
# statements inside it are executed.
'''
u_name = input('请输入用户名')
u_pwd = input('请输入密码')
# sha1加密
s1 = sha1()
s1.update(u_pwd.encode())
sha_pwd = s1.hexdigest()
print(sha_pwd)
# register()
login()
'''