临时存档

master
Mufanc 3 years ago
parent e28ef98194
commit 1db550bee8

@ -1,2 +1,2 @@
from .ismart import finish from .spider import Spider
from .ismart import export from .devtools import Browser

@ -8,6 +8,7 @@ import websockets
from loguru import logger from loguru import logger
from configs import configs from configs import configs
from .utils import ainput
_default_port = configs['browser']['port'] _default_port = configs['browser']['port']
_executable = configs['browser']['executable'] _executable = configs['browser']['executable']
@ -31,6 +32,28 @@ class Browser(object):
def __init__(self, dev_port): def __init__(self, dev_port):
self.port = dev_port self.port = dev_port
async def verify(self): # 校验客户端和配置文件中的用户是否相同
logger.info('正在校验账号...')
page = await self._any_http_page()
user_info = json.loads((await page.eval('''
(function () {
var xhr = new XMLHttpRequest()
xhr.open('POST', 'https://school.ismartlearning.cn/client/user/student-info', false)
xhr.withCredentials = true
xhr.send(null)
return xhr.responseText
})()
'''))['result']['value'])['data']
spider_user = configs['user']['username']
if spider_user != user_info['mobile'] and spider_user != user_info['username']:
logger.warning('检测到 iSmart 客户端中登录的账号与配置文件中账号不符!')
choice = await ainput('继续使用可能会出现意料之外的问题,是否继续?[y/N]')
if choice.lower() != 'y':
return False
else:
logger.info('校验通过!')
return True
async def wait_for_book(self): # 等待「教材学习」页面 async def wait_for_book(self): # 等待「教材学习」页面
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
while True: while True:
@ -38,25 +61,34 @@ class Browser(object):
try: try:
pages = (await client.get(f'http://127.0.0.1:{self.port}/json')).json() pages = (await client.get(f'http://127.0.0.1:{self.port}/json')).json()
for page in pages: for page in pages:
if re.match(r'.*me.ismartlearning.cn/center/student/course/bookLearn\.html.*', page['url']): if re.match(r'.*me.ismartlearning.cn/center/student/course/bookLearn.html.*', page['url']) and \
'webSocketDebuggerUrl' in page:
return Page(page['url'], page['webSocketDebuggerUrl']) return Page(page['url'], page['webSocketDebuggerUrl'])
except httpx.ConnectError: except httpx.ConnectError:
pass pass
await asyncio.sleep(2) await asyncio.sleep(2)
async def any_http_page(self): # 等待任意 http 页面 async def _any_http_page(self):
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
while True: while True:
logger.info('等待可用页面...') logger.info('等待可用页面...')
try: try:
pages = (await client.get(f'http://127.0.0.1:{self.port}/json')).json() pages = (await client.get(f'http://127.0.0.1:{self.port}/json')).json()
for page in pages: for page in pages:
if re.match(r'https?://.*', page['url']): if re.match(r'https?://.*', page['url']) and 'webSocketDebuggerUrl' in page:
return Page(page['url'], page['webSocketDebuggerUrl']) return Page(page['url'], page['webSocketDebuggerUrl'])
except httpx.ConnectError: except httpx.ConnectError:
pass pass
await asyncio.sleep(2) await asyncio.sleep(2)
async def submit(self, book_id, chapter_id, task_id, score, seconds, percent, user_id): # 提交任务点的得分
page = await self._any_http_page()
model = 'NetBrowser.submitTask("%s", "%s", "%s", 0, "%d", %d, %d, "%s");'
result = f'%7B%22studentid%22:{user_id},%22testInfo%22:%7B%22answerdata%22:%22%22,%22markdatao%22:%22%22%7D%7D'
return await page.eval(
model % (book_id, chapter_id, task_id, score, seconds, percent, result)
)
class Page(object): class Page(object):
def __init__(self, url, dev_url): def __init__(self, url, dev_url):
@ -81,10 +113,3 @@ class Page(object):
} }
) )
return result['result'] return result['result']
async def submit(self, book_id, chapter_id, task_id, score, seconds, percent, user_id):
model = 'NetBrowser.submitTask("%s", "%s", "%s", 0, "%d", %d, %d, "%s");'
result = f'%7B%22studentid%22:{user_id},%22testInfo%22:%7B%22answerdata%22:%22%22,%22markdatao%22:%22%22%7D%7D'
return await self.eval(
model % (book_id, chapter_id, task_id, score, seconds, percent, result)
)

@ -1,122 +0,0 @@
import json
import os
import pickle
import urllib.parse as parser
from bs4 import BeautifulSoup
from loguru import logger
from random import random, randint
from configs import configs
from .devtools import Browser
from .markdown import generate_md
from .spider import Spider
random_args = { # 不同题型对应的随机时长和分数范围
'1': { # 单选题
'time': (20, 60), # 完成时长 / 秒
'score': 1 # 得分 (归一化, 向上随机取至满分)
},
'2': { # 多选题
'time': (40, 120),
'score': 0.9
},
'3': { # 判断题
'time': (20, 50),
'score': 1
},
'4': { # 填空题
'time': (60, 180),
'score': 1
},
'6': { # 连线题
'time': (60, 180),
'score': 0.8
},
'8': { # 匹配题
'time': (30, 90),
'score': 1
},
'9': { # 口语跟读
'time': (15, 30),
'score': 0.8
},
'10': { # 短文改错
'time': (120, 180),
'score': 0.7
},
'11': { # 选词填空
'time': (30, 90),
'score': 0.9
}
}
def _random_progress(paper):
paper = BeautifulSoup(paper, 'lxml-xml')
questions = paper.select('element[knowledge]:has(> question_type)')
if questions:
total_score = 0
my_score, my_time = 0, 0
for que in questions:
qt_type = que.select_one('question_type').text
qt_score = int(que.select_one('question_score').text)
total_score += qt_score
rate = 1 - (1 - random_args[qt_type]['score']) * random()
my_score += qt_score * rate
my_time += randint(*random_args[qt_type]['time'])
return int(100 * my_score / total_score), my_time
return 100, 5
async def export(): # 导出某书籍的答案
browser = Browser.connect()
page = await browser.wait_for_book()
params = dict(parser.parse_qsl(parser.urlsplit(page.url).query))
# noinspection PyTypeChecker
book_id, course_id = params['bookId'], params['courseId']
if not os.path.exists(f'.cache/books/{book_id}'):
async with Spider() as spider:
await spider.login(**configs['user'])
book = await spider.book_info(book_id)
book['courseId'] = course_id
tasks = await spider.get_tasks(book, tree=True)
await spider.download_tree(tasks)
with open(f'.cache/books/{book_id}/Tree.pck', 'rb') as fp:
generate_md(pickle.load(fp))
async def finish(): # 直接完成某书籍的任务
browser = Browser.connect()
page = await browser.wait_for_book()
params = dict(parser.parse_qsl(parser.urlsplit(page.url).query))
# noinspection PyTypeChecker
book_id, course_id = params['bookId'], params['courseId']
async with Spider() as spider:
await spider.login(**configs['user'])
if not os.path.exists(f'.cache/books/{book_id}'):
book = await spider.book_info(book_id)
book['courseId'] = course_id
tasks = await spider.get_tasks(book, tree=True)
await spider.download_tree(tasks)
user_id = (await spider.get_user())['data']['uid']
logger.info('正在提交任务...')
for file in os.listdir(f'.cache/books/{book_id}'):
paper_id, ext = os.path.splitext(file)
if ext != '.json':
continue
with open(f'.cache/books/{book_id}/{file}') as fp:
data = json.load(fp)
task = data['task']
paper = data['paperData']
score, time = _random_progress(paper)
result = await page.submit(book_id, task['chapterId'], task['id'], score, time, 100, user_id)
if result['wasThrown'] or not result['result']['value']:
logger.warning(f'任务 {task["name"]} [paperId: {paper_id}] 可能提交失败,请留意最终结果!')
logger.info('全部提交完成!')
async def finish_all(): # Todo: 全刷了?
pass

@ -1 +0,0 @@
from .md import generate_md

@ -1,34 +0,0 @@
import re
class Formatter:
@staticmethod
def fix_img(text): # 处理 <img/> 标签
return re.sub('<img.+?>', '「暂不支持图片显示澳」', text)
@staticmethod
def rm_lgt(text): # 处理括号对
return re.sub('<.+?>', '', text)
@staticmethod
def fix_uline(text): # 处理下划线
return re.sub('_{3,}', lambda mch: '\\_' * len(mch.group()), text)
@staticmethod
def rm_head(text): # 处理数字标号
return re.sub(r'^(?:\d+(?:\.| +\b))+\d+ ', '', text)
@staticmethod
def fix_lf(text): # 处理换行
text = re.sub('<br/?>', '\n\n', text)
return re.sub('<p>(.+?)</p>', lambda mch: mch.group(1) + '\n\n', text)
@staticmethod
def fix_space(text):
return re.sub('(?:&nbsp;)+', ' ', text)
def fix(text, func_ptrs):
for func in func_ptrs:
text = getattr(Formatter, func)(text)
return text

@ -1,133 +0,0 @@
"""
不同 question type 对应的解析方法
传入两个参数 ( question, answer, output ), 将输出行依次 append output 队列中
"""
import re
from .formatter import fix
class Generators:
@staticmethod
def type_1(que, ans, output): # 单选题
# 提取题目内容
question = que.select_one("question_text").text
question = fix(question, ('rm_lgt', 'fix_uline', 'fix_space'))
output.append(f'* **{question}**\n')
# 提取答案
ans_id = que.attrs['id']
corrects = set(ans.select_one(f'[id="{ans_id}"] > answers').text)
# 生成对应 Markdown
options = que.select('options > *')
for opt in options:
opt_id = opt.attrs['id']
answer_text = fix(opt.text, ('rm_lgt', 'fix_space'))
if opt_id in corrects: # 高亮正确答案
output.append(f'<p><font color="#2ed573">&emsp;&emsp;<b>{opt_id}.</b> {answer_text}</font></p>\n')
else:
output.append(f'&emsp;&emsp;<b>{opt_id}.</b> {answer_text}\n')
@staticmethod
def type_2(*args): # 多选题
return Generators.type_1(*args)
@staticmethod
def type_3(que, ans, output): # 判断题
question = que.select_one("question_text").text
question = fix(question, ('rm_lgt', 'fix_uline', 'fix_space'))
output.append(f'* **{question}**\n')
# 提取答案
ans_id = que.attrs['id']
correct = ans.select_one(f'[id="{ans_id}"] > answers').text
# 生成对应 Markdown
output.append(f'* 答案:「**{correct}**」\n')
@staticmethod
def type_4(que, ans, output): # 填空题
# 提取题目内容
question = que.select_one('question_text').text
question = re.sub('<br/?>', '\n', question)
question = fix(question, ('rm_lgt', 'fix_uline', 'fix_space'))
# 提取答案
ans_id = que.attrs['id']
corrects = ans.select(f'[id="{ans_id}"] answers > answer')
# 执行替换
for ans in corrects:
question = question.replace(
'{{' + ans.attrs['id'] + '}}',
f' <font color="#2ed573"><b>[{ans.text}]</b></font> '
)
output.append(question + '\n')
@staticmethod
def type_6(que, ans, output): # 连线题
# 提取题目内容
question = que.select_one('question_text').text
question = fix(question, ('rm_lgt', 'fix_uline', 'fix_space'))
output.append(f'* **{question}**\n')
# 提取答案
options = que.select('options > *')
pairs = {}
for opt in options:
opt_id = opt.attrs['id']
if opt_id not in pairs:
pairs[opt_id] = [0, 0]
flag = int(opt.attrs['flag'])
pairs[opt_id][flag - 1] = opt.text
output.append('| Part-A | Part-B |')
output.append('| :- | :- |')
for gp_id in pairs:
left = fix(pairs[gp_id][0], ('fix_img', 'rm_lgt', 'fix_uline', 'fix_space')).replace('|', '\\|')
right = fix(pairs[gp_id][1], ('fix_img', 'rm_lgt', 'fix_uline', 'fix_space')).replace('|', '\\|')
output.append(f'| {left} | {right} |')
output.append('')
@staticmethod
def type_8(que, ans, output): # 匹配题
# 提取题目内容
question = que.select_one('question_text').text
question = fix(question, ('rm_lgt', 'fix_uline'))
# 提取答案
ans_id = que.attrs['id']
corrects = ans.select(f'[id="{ans_id}"] answers > answer')
# 执行替换
question = fix(question, ('fix_lf', 'rm_lgt', 'fix_space'))
for ans in corrects:
question = question.replace(
'{{' + ans.attrs['id'] + '}}',
f' <font color="#2ed573"><b>{ans.text}</b></font> '
)
output.append(question + '\n')
@staticmethod
def type_9(que, ans, output): # 口语跟读
output.append('「口语跟读」\n')
@staticmethod
def type_10(que, ans, output): # 短文改错
output.append('* **短文改错**')
ans_id = que.attrs['id']
corrects = ans.select(f'[id="{ans_id}"] answers > answer')
for i, ans in enumerate(corrects):
desc = re.sub('(?<=[A-Za-z0-9])(?=[\u4e00-\u9fa5])', ' ', ans.attrs['desc'])
desc = re.sub('(?<=[\u4e00-\u9fa5])(?=[A-Za-z0-9])', ' ', desc)
output.append(f'{i + 1}. {desc}\n')
output.append('')
@staticmethod
def type_11(que, ans, output): # 选词填空
# 提取题目内容
question = que.select_one('question_text').text
question = fix(question, ('fix_uline', 'fix_lf', 'rm_lgt', 'fix_space'))
options = {opt.attrs['id']: opt.text for opt in que.select('options > option[flag="2"]')}
# 提取答案
ans_id = que.attrs['id']
corrects = ans.select(f'[id="{ans_id}"] answers > answer')
# 执行替换
for ans in corrects:
question = question.replace(
'{{' + ans.attrs['id'] + '}}',
f' <font color="#2ed573"><b>{options[ans.text]}</b></font> '
)
output.append(question + '\n')

@ -1,59 +0,0 @@
import json
from collections import deque
from bs4 import BeautifulSoup
from loguru import logger
from .formatter import fix
from .generator import Generators
_output = deque()
# 解码题目与答案 xml
def decode(que, ans, qt_type):
getattr(Generators, f'type_{qt_type}')(que, ans, _output)
# 生成每个 paper 的答案
def unescape(node, book_id):
paper_id = node.task['paperId']
with open(f'.cache/books/{book_id}/{paper_id}.json', 'r') as fp:
task = json.load(fp)
paper = BeautifulSoup(task['paperData'], 'lxml-xml')
answer = BeautifulSoup(task['answerData'], 'lxml-xml')
questions = paper.select('element[knowledge]:has(> question_type)')
if questions:
for que in questions:
qt_type = int(que.select_one('question_type').text)
decode(que, answer, qt_type)
return True
return False
# 深搜创建目录树
def dfs(node, book_id, depth=2):
if title := node.task['name']:
logger.info(f'{". " * (depth - 1)}{title}')
title = fix(title, ('rm_head',))
_output.append(f'{"#" * depth} {title}\n')
flag = False
if 'paperId' in node.task:
flag = unescape(node, book_id)
for ch in node.children:
if dfs(ch, book_id, depth + 1):
flag = True
if not flag:
_output.pop()
return flag
def generate_md(root): # 生成答案
book_id = root.task['book_id']
for ch in root.children:
dfs(ch, book_id)
with open('.cache/answer.md', 'w', encoding='utf-8') as file:
while len(_output):
line = _output.popleft()
file.write(line + '\n')
logger.info('Done.')

@ -0,0 +1 @@
from .spider import *

Before

Width:  |  Height:  |  Size: 426 B

After

Width:  |  Height:  |  Size: 426 B

Before

Width:  |  Height:  |  Size: 410 B

After

Width:  |  Height:  |  Size: 410 B

Before

Width:  |  Height:  |  Size: 429 B

After

Width:  |  Height:  |  Size: 429 B

Before

Width:  |  Height:  |  Size: 404 B

After

Width:  |  Height:  |  Size: 404 B

Before

Width:  |  Height:  |  Size: 395 B

After

Width:  |  Height:  |  Size: 395 B

Before

Width:  |  Height:  |  Size: 390 B

After

Width:  |  Height:  |  Size: 390 B

Before

Width:  |  Height:  |  Size: 403 B

After

Width:  |  Height:  |  Size: 403 B

Before

Width:  |  Height:  |  Size: 414 B

After

Width:  |  Height:  |  Size: 414 B

Before

Width:  |  Height:  |  Size: 417 B

After

Width:  |  Height:  |  Size: 417 B

Before

Width:  |  Height:  |  Size: 422 B

After

Width:  |  Height:  |  Size: 422 B

@ -1,7 +1,3 @@
import asyncio
import json
import os
import pickle
from hashlib import md5 from hashlib import md5
from random import random from random import random
@ -9,19 +5,15 @@ import httpx
from loguru import logger from loguru import logger
from .captcha import recognize from .captcha import recognize
from ..utils import Tree
class Tree:
def __init__(self, task):
self.task = task
self.children = []
class Spider(httpx.AsyncClient): class Spider(httpx.AsyncClient):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
async def login(self, username, password): # 账号密码登录 async def login(self, username, password):
self.cookies.clear() # 重置 cookies
logger.info('正在获取验证码...') logger.info('正在获取验证码...')
result = await self.get(f'http://sso.ismartlearning.cn/captcha.html?{random()}') result = await self.get(f'http://sso.ismartlearning.cn/captcha.html?{random()}')
code = recognize(result.content) code = recognize(result.content)
@ -30,7 +22,7 @@ class Spider(httpx.AsyncClient):
'http://sso.ismartlearning.cn/v2/tickets-v2', 'http://sso.ismartlearning.cn/v2/tickets-v2',
data={ data={
'username': username, 'username': username,
'password': md5(token.encode() + b'fa&s*l%$k!fq$k!ld@fjlk').hexdigest(), 'password': md5(token.encode() + b'fa&s*l%$k!fq$k!ld@fjlk').hexdigest(), # 啥时候炸了就写成动态获取的
'captcha': code 'captcha': code
}, },
headers={ headers={
@ -41,11 +33,10 @@ class Spider(httpx.AsyncClient):
)).json() )).json()
logger.debug(info['result']) logger.debug(info['result'])
if info['result']['code'] != -26: assert info['result']['code'] == -26 # 断言登录结果
raise AssertionError(f'[!] 登录失败: {info["result"]["msg"]}')
return info['result'] return info['result']
async def get_courses(self): # 获取用户课程列表 async def get_courses(self): # 获取课程列表
logger.info('正在获取课程列表...') logger.info('正在获取课程列表...')
courses = (await self.post( courses = (await self.post(
'https://school.ismartlearning.cn/client/course/list-of-student?status=1', 'https://school.ismartlearning.cn/client/course/list-of-student?status=1',
@ -56,50 +47,44 @@ class Spider(httpx.AsyncClient):
)).json()['data'] )).json()['data']
return courses['list'] return courses['list']
async def get_books(self, course): # 获取某课程的书籍列表 async def get_books(self, course_id): # 获取某课程的书籍列表
logger.info('正在获取书籍列表...') logger.info('正在获取书籍列表...')
await self.get_courses() # 必须有这个请求,否则后面会报错 await self.get_courses() # 必须有这个请求,否则后面会报错
books = (await self.post( books = (await self.post(
'http://school.ismartlearning.cn/client/course/textbook/list-of-student', 'http://school.ismartlearning.cn/client/course/textbook/list-of-student',
data={ data={
'courseId': course['courseId'] 'courseId': course_id
} }
)).json()['data'] )).json()['data']
return books return books
@staticmethod async def get_tasks(self, book_id, book_type, course_id): # 获取某书籍的任务树
def _merge_tasks(tasks): # 将任务列表重组成树形结构 logger.info('正在获取任务列表...')
tasks = (await self.post(
'http://school.ismartlearning.cn/client/course/textbook/chapters',
data={
'bookId': book_id,
'bookType': book_type,
'courseId': course_id
}
)).json()['data']
id_record = {task['id']: Tree(task) for task in tasks} id_record = {task['id']: Tree(task) for task in tasks}
root = Tree({ root = Tree({
'book_id': tasks[0]['book_id'], 'book_id': tasks[0]['book_id'],
'unitStudyPercent': 0 'unitStudyPercent': 0
}) })
for task_id in id_record: for task_id in id_record:
node = id_record[task_id] node = id_record[task_id]
node_name = (f'{node.task["name"]} ' if 'name' in node.task else '') + f'[id:{node.task["id"]}]' node_name = (f'{node.task["name"]} ' if 'name' in node.task else '') + f'[id:{node.task["id"]}]'
if 'parent_id' in node.task: if 'parent_id' in node.task:
if (parent_id := node.task['parent_id']) in id_record: if (parent_id := node.task['parent_id']) in id_record:
id_record[parent_id].children.append(node) id_record[parent_id].child.append(node)
else: else:
logger.warning(f'任务已忽略(父节点不存在):{node_name}') logger.warning(f'任务已忽略(父节点不存在):{node_name}')
else: else:
root.children.append(node) root.child.append(node)
return root return root
async def get_tasks(self, book, tree=False): # 获取某书籍的任务列表
logger.info('正在获取任务列表...')
await self.post('http://school.ismartlearning.cn/client/course/textbook/chapters')
tasks = (await self.post(
'http://school.ismartlearning.cn/client/course/textbook/chapters',
data={key: book[key] for key in ('bookId', 'bookType', 'courseId')}
)).json()['data']
if tree:
return self._merge_tasks(tasks)
else:
return tasks
async def get_paper(self, paper_id): # 获取任务点信息(包括题目和答案) async def get_paper(self, paper_id): # 获取任务点信息(包括题目和答案)
ticket = (await self.post( ticket = (await self.post(
'http://sso.ismartlearning.cn/v1/serviceTicket', 'http://sso.ismartlearning.cn/v1/serviceTicket',
@ -107,7 +92,6 @@ class Spider(httpx.AsyncClient):
'service': 'http://xot-api.ismartlearning.cn/client/textbook/paperinfo' 'service': 'http://xot-api.ismartlearning.cn/client/textbook/paperinfo'
} }
)).json()['data']['serverTicket'] )).json()['data']['serverTicket']
logger.debug(f'Ticket: {ticket}')
paper_info = (await self.post( paper_info = (await self.post(
'http://xot-api.ismartlearning.cn/client/textbook/paperinfo', 'http://xot-api.ismartlearning.cn/client/textbook/paperinfo',
data={ data={
@ -125,60 +109,8 @@ class Spider(httpx.AsyncClient):
)).json()['data'] )).json()['data']
return paper_info return paper_info
async def download_tree(self, root): async def user_info(self):
async def download(task): logger.info('正在获取用户信息...')
paper_id = task['paperId']
filepath = f'.cache/books/{root.task["book_id"]}/{paper_id}.json'
if os.path.exists(filepath):
return
async with limit: # 防止并发过高
result = await self.get_paper(paper_id)
result['task'] = task # 继续存入 Task
with open(filepath, 'w') as file:
json.dump(result, file)
def dfs(src):
if 'paperId' in (task := src.task):
logger.info(f'添加任务:{task["name"]}')
tasks.append(download(task))
for child in src.children:
dfs(child)
logger.info('开始下载试题及答案...')
os.makedirs(f'.cache/books/{root.task["book_id"]}', exist_ok=True)
with open(f'.cache/books/{root.task["book_id"]}/Tree.pck', 'wb') as fp:
pickle.dump(root, fp)
tasks, limit = [], asyncio.Semaphore(4)
dfs(root)
await asyncio.gather(*tasks)
logger.info('下载完成.')
async def get_user(self):
return (await self.post( return (await self.post(
'https://school.ismartlearning.cn/client/user/student-info') 'https://school.ismartlearning.cn/client/user/student-info')
).json() ).json()
async def book_info(self, book_id):
ticket = (await self.post(
'http://sso.ismartlearning.cn/v1/serviceTicket',
data={
'service': 'http://book-api.ismartlearning.cn/client/v2/book/info'
}
)).json()['data']['serverTicket']
book_info = (await self.post(
'http://book-api.ismartlearning.cn/client/v2/book/info',
headers={
'Origin': 'http://me.ismartlearning.cn',
'Referer': 'http://me.ismartlearning.cn/',
'X-Requested-With': 'XMLHttpRequest',
'Accept-Encoding': 'gzip, deflate'
},
params={
'ticket': ticket
},
data={
'bookId': book_id,
'bookType': 0
}
)).json()
return book_info['data']

@ -0,0 +1,33 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
import urllib.parse as parser
from configs import configs
from .devtools import Browser
from .spider import Spider
class Tree: # 任务树
def __init__(self, task):
self.task = task
self.child = []
async def ainput(prompt: str = ''):
with ThreadPoolExecutor(1, 'ainput') as executor:
return (
await asyncio.get_event_loop().run_in_executor(executor, input, prompt)
).rstrip()
async def flash_recent(): # 对当前书籍执行刷课
if configs['browser']['mode'] == 'launch':
browser = Browser.launch()
else:
browser = Browser.connect()
page = await browser.wait_for_book()
params = dict(parser.parse_qsl(parser.urlsplit(page.url).query))
# noinspection PyTypeChecker
book_id, course_id = params['bookId'], params['courseId']
async with Spider() as spider:
await spider.login(**configs['user'])

@ -4,6 +4,7 @@ import yaml
with open('configs.yml', 'r', encoding='utf-8') as _fp: with open('configs.yml', 'r', encoding='utf-8') as _fp:
configs = yaml.safe_load(_fp) configs = yaml.safe_load(_fp)
if __name__ == '__main__': if __name__ == '__main__':
import json import json
print(json.dumps(configs, indent=4)) print(json.dumps(configs, indent=4))

@ -4,9 +4,10 @@
project: project:
skip-finished: true # 跳过已完成任务(暂不支持) skip-finished: true # 跳过已完成任务(暂不支持)
# iSmart 客户端配置 # iSmart 客户端配置
browser: browser:
mode: connect # 连接模式( connect / launch
# 下面为 mode = launch 时的配置
executable: Z:\iSmart\client\iSmart.exe # 客户端可执行文件的路径 executable: Z:\iSmart\client\iSmart.exe # 客户端可执行文件的路径
args: # 启动 iSmart 客户端时额外提供的参数 args: # 启动 iSmart 客户端时额外提供的参数
- --disable-web-security - --disable-web-security
@ -14,5 +15,24 @@ browser:
# 用户配置(务必保持账号密码与 iSmart 中已登录的相同) # 用户配置(务必保持账号密码与 iSmart 中已登录的相同)
user: user:
username: <用户名> # 手机号 username: <用户名> # 用户名/手机号
password: <密码> # 密码 password: <密码> # 密码
# 答题配置
paper:
# 不同题目类型的随机得分
ramdom-score: # Todo: 判断列表长度
- 1 # 1.单选
- 1 # 2.多选
- 1 # 3.判断
- [ 0.9, 1 ] # 4.填空
-
- [ 0.9, 1 ] # 6.连线
-
- 1 # 8.匹配
- [ 0.7, 1 ] # 9.口语跟读
- [ 0.7, 1 ] # 10.短文改错
- 1 # 11.选词填空
defaults: pause # 未知题型的处理方式
random-time: [ 90, 240 ] # 每道题的随机用时(秒)

@ -1,10 +1,23 @@
import asyncio import asyncio
from automaton import finish from argparse import ArgumentParser
async def main(): async def main():
await finish() parser = ArgumentParser('main.py')
parser.add_argument('-v', dest='LEVEL', default='warning', help='日志过滤等级,默认为 warning')
subparsers = parser.add_subparsers(help='模式选择')
method_list = subparsers.add_parser('list', help='列出所有课程和书籍')
method_flash = subparsers.add_parser('flash', help='对选定的一个或几个课程执行刷课')
target = method_flash.add_mutually_exclusive_group()
target.add_argument('-b', '--book', action='store_true', help='对当前打开的书籍执行刷课')
target.add_argument('-c', '--course', action='store_true', help='对当前打开的课程执行刷课')
target.add_argument('-a', '--all', action='store_true', help='对所有课程和书籍执行刷课')
method_flash.add_argument('-f', '--filter', help='')
method_flash.add_argument('-i', '--invert', help='过滤器反向')
parser.parse_args()
if __name__ == '__main__': if __name__ == '__main__':
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()
@ -12,3 +25,4 @@ if __name__ == '__main__':
loop.run_until_complete(main()) loop.run_until_complete(main())
finally: finally:
loop.close() loop.close()

Loading…
Cancel
Save