Initial Commit

master
Mufanc 3 years ago
commit b85fe66bcb

2
.gitignore vendored

@ -0,0 +1,2 @@
/.cache/*
/venv/

8
.idea/.gitignore vendored

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
# Editor-based HTTP Client requests
/httpRequests/

@ -0,0 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/venv" />
<excludeFolder url="file://$MODULE_DIR$/.cache" />
</content>
<orderEntry type="jdk" jdkName="Python 3.8 (iSmartAuto2)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

@ -0,0 +1,105 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="DuplicatedCode" enabled="false" level="WEAK WARNING" enabled_by_default="false" />
<inspection_tool class="HtmlFormInputWithoutLabel" enabled="false" level="WARNING" enabled_by_default="false" />
<inspection_tool class="HtmlUnknownTag" enabled="true" level="WARNING" enabled_by_default="true">
<option name="myValues">
<value>
<list size="8">
<item index="0" class="java.lang.String" itemvalue="nobr" />
<item index="1" class="java.lang.String" itemvalue="noembed" />
<item index="2" class="java.lang.String" itemvalue="comment" />
<item index="3" class="java.lang.String" itemvalue="noscript" />
<item index="4" class="java.lang.String" itemvalue="embed" />
<item index="5" class="java.lang.String" itemvalue="script" />
<item index="6" class="java.lang.String" itemvalue="tr" />
<item index="7" class="java.lang.String" itemvalue="td" />
</list>
</value>
</option>
<option name="myCustomValuesEnabled" value="true" />
</inspection_tool>
<inspection_tool class="HtmlUnknownTarget" enabled="false" level="WARNING" enabled_by_default="false" />
<inspection_tool class="HttpUrlsUsage" enabled="false" level="WEAK WARNING" enabled_by_default="false">
<option name="ignoredUrls">
<list>
<option value="http://localhost" />
<option value="http://127.0.0.1" />
<option value="http://0.0.0.0" />
<option value="http://www.w3.org/" />
<option value="http://json-schema.org/draft" />
<option value="http://java.sun.com/" />
<option value="http://xmlns.jcp.org/" />
<option value="http://javafx.com/javafx/" />
<option value="http://javafx.com/fxml" />
<option value="http://maven.apache.org/xsd/" />
<option value="http://maven.apache.org/POM/" />
<option value="http://www.springframework.org/schema/" />
<option value="http://www.springframework.org/tags" />
<option value="http://www.springframework.org/security/tags" />
<option value="http://www.thymeleaf.org" />
<option value="http://www.jboss.org/j2ee/schema/" />
<option value="http://www.jboss.com/xml/ns/" />
<option value="http://www.ibm.com/webservices/xsd" />
<option value="http://activemq.apache.org/schema/" />
<option value="http://schema.cloudfoundry.org/spring/" />
<option value="http://schemas.xmlsoap.org/" />
<option value="http://cxf.apache.org/schemas/" />
<option value="http://primefaces.org/ui" />
<option value="http://tiles.apache.org/" />
<option value="http://csee.hnu.edu.cn" />
</list>
</option>
</inspection_tool>
<inspection_tool class="JSJQueryEfficiency" enabled="false" level="WARNING" enabled_by_default="false" />
<inspection_tool class="PyBroadExceptionInspection" enabled="false" level="WEAK WARNING" enabled_by_default="false" />
<inspection_tool class="PyDefaultArgumentInspection" enabled="false" level="WARNING" enabled_by_default="false" />
<inspection_tool class="PyMandatoryEncodingInspection" enabled="true" level="WEAK WARNING" enabled_by_default="true" />
<inspection_tool class="PyPackageRequirementsInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredPackages">
<value>
<list size="7">
<item index="0" class="java.lang.String" itemvalue="tqdm" />
<item index="1" class="java.lang.String" itemvalue="scipy" />
<item index="2" class="java.lang.String" itemvalue="colour" />
<item index="3" class="java.lang.String" itemvalue="progressbar" />
<item index="4" class="java.lang.String" itemvalue="pydub" />
<item index="5" class="java.lang.String" itemvalue="argparse" />
<item index="6" class="java.lang.String" itemvalue="win32gui" />
</list>
</value>
</option>
</inspection_tool>
<inspection_tool class="PyPep8Inspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredErrors">
<list>
<option value="E402" />
</list>
</option>
</inspection_tool>
<inspection_tool class="PyPep8NamingInspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredErrors">
<list>
<option value="N802" />
<option value="N806" />
</list>
</option>
</inspection_tool>
<inspection_tool class="PySimplifyBooleanCheckInspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoreComparisonToZero" value="false" />
</inspection_tool>
<inspection_tool class="PyUnresolvedReferencesInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredIdentifiers">
<list>
<option value="_importlib_modulespec.ModuleType.main" />
<option value="requests.models.Response.html" />
<option value="tuple.spines" />
<option value="ctypes.c_long.*" />
</list>
</option>
</inspection_tool>
<inspection_tool class="SqlDialectInspection" enabled="false" level="WARNING" enabled_by_default="false" />
<inspection_tool class="SqlNoDataSourceInspection" enabled="false" level="WARNING" enabled_by_default="false" />
</profile>
</component>

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.8 (iSmartAuto2)" project-jdk-type="Python SDK" />
</project>

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/iSmartAuto2.iml" filepath="$PROJECT_DIR$/.idea/iSmartAuto2.iml" />
</modules>
</component>
</project>

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

@ -0,0 +1,2 @@
from .ismart import finish
from .ismart import export

@ -0,0 +1 @@
from .captcha import recognize

@ -0,0 +1,38 @@
from os import path
import cv2
import numpy as np
from loguru import logger
from numpy import average, dot, linalg
base_path = path.join(path.split(__file__)[0], 'models')
def similarity(img_1, img_2):
images = [img_1, img_2]
vectors = []
norms = []
for image in images:
vector = [average(pixels) for pixels in image]
vectors.append(vector)
norms.append(linalg.norm(vector, 2))
a, b = vectors
a_norm, b_norm = norms
return dot(a / a_norm, b / b_norm)
def recognize(img_content: bytes):
img = cv2.imdecode(np.asarray(bytearray(img_content), dtype=np.uint8), cv2.IMREAD_COLOR)
img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
img = cv2.threshold(img, 200, 255, cv2.THRESH_BINARY)[1]
models = [cv2.imread(path.join(base_path, f'{i}.png')) for i in range(10)]
code = ''
for i in range(4):
code += sorted(
[(f'{j}', similarity(img[4:24, 9 + i * 15:24 + i * 15], std)) for j, std in enumerate(models)],
key=lambda x: x[1]
)[-1][0]
logger.info(f'识别结果:{code}')
if len(code) != 4:
logger.warning('验证码长度不是 4 位')
return code

Binary file not shown.

After

Width:  |  Height:  |  Size: 426 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 410 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 429 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 404 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 395 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 390 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 403 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 414 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 417 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 422 B

@ -0,0 +1,77 @@
import asyncio
import ctypes
import json
import re
import httpx
import websockets
from loguru import logger
from configs import configs
_default_port = configs['browser']['port']
_executable = configs['browser']['executable']
_args = configs['browser']['args']
class Browser(object):
@classmethod
def connect(cls):
return cls(_default_port)
@classmethod
def launch(cls):
ctypes.windll.shell32.ShellExecuteW(
None, 'runas', _executable,
' '.join([f'--remote-debugging-port={_default_port}', *_args]),
None, 1
)
return cls(_default_port)
def __init__(self, dev_port):
self.port = dev_port
async def wait_for_book(self): # 等待「教材学习」页面
async with httpx.AsyncClient() as client:
while True:
logger.info('等待「教材学习」页面...')
try:
pages = (await client.get(f'http://127.0.0.1:{self.port}/json')).json()
for page in pages:
if re.match(r'.*me.ismartlearning.cn/center/student/course/bookLearn\.html.*', page['url']):
return Page(page['url'], page['webSocketDebuggerUrl'])
await asyncio.sleep(2) # 这样写跟套 finally 有区别
except httpx.ConnectError:
await asyncio.sleep(2)
class Page(object):
def __init__(self, url, dev_url):
self.id = 0
self.url, self.dev_url = url, dev_url
async def send(self, command, params):
async with websockets.connect(self.dev_url) as devtools:
await devtools.send(json.dumps({
'id': self.id,
'method': command,
'params': params
}))
self.id += 1
return json.loads(await devtools.recv())
async def eval(self, script):
result = await self.send(
'Runtime.evaluate', {
'expression': script,
'awaitPromise': True
}
)
return result['result']
async def submit(self, book_id, chapter_id, task_id, score, seconds, percent, user_id):
model = 'NetBrowser.submitTask("%s", "%s", "%s", 0, "%d", %d, %d, "%s");'
result = f'%7B%22studentid%22:{user_id},%22testInfo%22:%7B%22answerdata%22:%22%22,%22markdatao%22:%22%22%7D%7D'
return await self.eval(
model % (book_id, chapter_id, task_id, score, seconds, percent, result)
)

@ -0,0 +1,122 @@
import json
import os
import pickle
import urllib.parse as parser
from bs4 import BeautifulSoup
from loguru import logger
from random import random, randint
from configs import configs
from .devtools import Browser
from .markdown import generate_md
from .spider import Spider
random_args = { # 不同题型对应的随机时长和分数范围
'1': { # 单选题
'time': (20, 60), # 完成时长 / 秒
'score': 1 # 得分 (归一化, 向上至满分)
},
'2': { # 多选题
'time': (40, 120),
'score': 0.9
},
'3': { # 判断题
'time': (20, 50),
'score': 1
},
'4': { # 填空题
'time': (60, 180),
'score': 1
},
'6': { # 连线题
'time': (60, 180),
'score': 0.8
},
'8': { # 匹配题
'time': (30, 90),
'score': 1
},
'9': { # 口语跟读
'time': (15, 30),
'score': 0.8
},
'10': { # 短文改错
'time': (120, 180),
'score': 0.7
},
'11': { # 选词填空
'time': (30, 90),
'score': 0.9
},
}
def _random_progress(paper):
paper = BeautifulSoup(paper, 'lxml-xml')
questions = paper.select('element[knowledge]:has(> question_type)')
if questions:
total_score = 0
my_score, my_time = 0, 0
for que in questions:
qt_type = que.select_one('question_type').text
qt_score = int(que.select_one('question_score').text)
total_score += qt_score
rate = 1 - (1 - random_args[qt_type]['score']) * random()
my_score += qt_score * rate
my_time += randint(*random_args[qt_type]['time'])
return int(100 * my_score / total_score), my_time
return 100, 5
async def export(): # 导出某书籍的答案
browser = Browser.connect()
page = await browser.wait_for_book()
params = dict(parser.parse_qsl(parser.urlsplit(page.url).query))
# noinspection PyTypeChecker
book_id, course_id = params['bookId'], params['courseId']
if not os.path.exists(f'.cache/books/{book_id}'):
async with Spider() as spider:
await spider.login(**configs['user'])
book = await spider.book_info(book_id)
book['courseId'] = course_id
tasks = await spider.get_tasks(book, tree=True)
await spider.download_tree(tasks)
with open(f'.cache/books/{book_id}/Tree.pck', 'rb') as fp:
generate_md(pickle.load(fp))
async def finish(): # 直接完成某书籍的任务
browser = Browser.connect()
page = await browser.wait_for_book()
params = dict(parser.parse_qsl(parser.urlsplit(page.url).query))
# noinspection PyTypeChecker
book_id, course_id = params['bookId'], params['courseId']
async with Spider() as spider:
await spider.login(**configs['user'])
if not os.path.exists(f'.cache/books/{book_id}'):
book = await spider.book_info(book_id)
book['courseId'] = course_id
tasks = await spider.get_tasks(book, tree=True)
await spider.download_tree(tasks)
user_id = (await spider.get_user())['data']['uid']
logger.info('正在提交任务...')
for file in os.listdir(f'.cache/books/{book_id}'):
paper_id, ext = os.path.splitext(file)
if ext != '.json':
continue
with open(f'.cache/books/{book_id}/{file}') as fp:
data = json.load(fp)
task = data['task']
paper = data['paperData']
score, time = _random_progress(paper)
result = await page.submit(book_id, task['chapterId'], task['id'], score, time, 100, user_id)
if result['wasThrown'] or not result['result']['value']:
logger.warning(f'任务 {task["name"]} [paperId: {paper_id}] 可能提交失败,请留意最终结果!')
logger.info('全部提交完成!')
async def finish_all(): # Todo: 全刷了?
pass

@ -0,0 +1 @@
from .md import generate_md

@ -0,0 +1,34 @@
import re
class Formatter:
@staticmethod
def fix_img(text): # 处理 <img/> 标签
return re.sub('<img.+?>', '「暂不支持图片显示澳」', text)
@staticmethod
def rm_lgt(text): # 处理括号对
return re.sub('<.+?>', '', text)
@staticmethod
def fix_uline(text): # 处理下划线
return re.sub('_{3,}', lambda mch: '\\_' * len(mch.group()), text)
@staticmethod
def rm_head(text): # 处理数字标号
return re.sub(r'^(?:\d+(?:\.| +\b))+\d+ ', '', text)
@staticmethod
def fix_lf(text): # 处理换行
text = re.sub('<br/?>', '\n\n', text)
return re.sub('<p>(.+?)</p>', lambda mch: mch.group(1) + '\n\n', text)
@staticmethod
def fix_space(text):
return re.sub('(?:&nbsp;)+', ' ', text)
def fix(text, func_ptrs):
for func in func_ptrs:
text = getattr(Formatter, func)(text)
return text

@ -0,0 +1,133 @@
"""
不同 question type 对应的解析方法
传入两个参数 ( question, answer, output ), 将输出行依次 append output 队列中
"""
import re
from .formatter import fix
class Generators:
@staticmethod
def type_1(que, ans, output): # 单选题
# 提取题目内容
question = que.select_one("question_text").text
question = fix(question, ('rm_lgt', 'fix_uline', 'fix_space'))
output.append(f'* **{question}**\n')
# 提取答案
ans_id = que.attrs['id']
corrects = set(ans.select_one(f'[id="{ans_id}"] > answers').text)
# 生成对应 Markdown
options = que.select('options > *')
for opt in options:
opt_id = opt.attrs['id']
answer_text = fix(opt.text, ('rm_lgt', 'fix_space'))
if opt_id in corrects: # 高亮正确答案
output.append(f'<p><font color="#2ed573">&emsp;&emsp;<b>{opt_id}.</b> {answer_text}</font></p>\n')
else:
output.append(f'&emsp;&emsp;<b>{opt_id}.</b> {answer_text}\n')
@staticmethod
def type_2(*args): # 多选题
return Generators.type_1(*args)
@staticmethod
def type_3(que, ans, output): # 判断题
question = que.select_one("question_text").text
question = fix(question, ('rm_lgt', 'fix_uline', 'fix_space'))
output.append(f'* **{question}**\n')
# 提取答案
ans_id = que.attrs['id']
correct = ans.select_one(f'[id="{ans_id}"] > answers').text
# 生成对应 Markdown
output.append(f'* 答案:「**{correct}**」\n')
@staticmethod
def type_4(que, ans, output): # 填空题
# 提取题目内容
question = que.select_one('question_text').text
question = re.sub('<br/?>', '\n', question)
question = fix(question, ('rm_lgt', 'fix_uline', 'fix_space'))
# 提取答案
ans_id = que.attrs['id']
corrects = ans.select(f'[id="{ans_id}"] answers > answer')
# 执行替换
for ans in corrects:
question = question.replace(
'{{' + ans.attrs['id'] + '}}',
f' <font color="#2ed573"><b>[{ans.text}]</b></font> '
)
output.append(question + '\n')
@staticmethod
def type_6(que, ans, output): # 连线题
# 提取题目内容
question = que.select_one('question_text').text
question = fix(question, ('rm_lgt', 'fix_uline', 'fix_space'))
output.append(f'* **{question}**\n')
# 提取答案
options = que.select('options > *')
pairs = {}
for opt in options:
opt_id = opt.attrs['id']
if opt_id not in pairs:
pairs[opt_id] = [0, 0]
flag = int(opt.attrs['flag'])
pairs[opt_id][flag - 1] = opt.text
output.append('| Part-A | Part-B |')
output.append('| :- | :- |')
for gp_id in pairs:
left = fix(pairs[gp_id][0], ('fix_img', 'rm_lgt', 'fix_uline', 'fix_space')).replace('|', '\\|')
right = fix(pairs[gp_id][1], ('fix_img', 'rm_lgt', 'fix_uline', 'fix_space')).replace('|', '\\|')
output.append(f'| {left} | {right} |')
output.append('')
@staticmethod
def type_8(que, ans, output): # 匹配题
# 提取题目内容
question = que.select_one('question_text').text
question = fix(question, ('rm_lgt', 'fix_uline'))
# 提取答案
ans_id = que.attrs['id']
corrects = ans.select(f'[id="{ans_id}"] answers > answer')
# 执行替换
question = fix(question, ('fix_lf', 'rm_lgt', 'fix_space'))
for ans in corrects:
question = question.replace(
'{{' + ans.attrs['id'] + '}}',
f' <font color="#2ed573"><b>{ans.text}</b></font> '
)
output.append(question + '\n')
@staticmethod
def type_9(que, ans, output): # 口语跟读
output.append('「口语跟读」\n')
@staticmethod
def type_10(que, ans, output): # 短文改错
output.append('* **短文改错**')
ans_id = que.attrs['id']
corrects = ans.select(f'[id="{ans_id}"] answers > answer')
for i, ans in enumerate(corrects):
desc = re.sub('(?<=[A-Za-z0-9])(?=[\u4e00-\u9fa5])', ' ', ans.attrs['desc'])
desc = re.sub('(?<=[\u4e00-\u9fa5])(?=[A-Za-z0-9])', ' ', desc)
output.append(f'{i + 1}. {desc}\n')
output.append('')
@staticmethod
def type_11(que, ans, output): # 选词填空
# 提取题目内容
question = que.select_one('question_text').text
question = fix(question, ('fix_uline', 'fix_lf', 'rm_lgt', 'fix_space'))
options = {opt.attrs['id']: opt.text for opt in que.select('options > option[flag="2"]')}
# 提取答案
ans_id = que.attrs['id']
corrects = ans.select(f'[id="{ans_id}"] answers > answer')
# 执行替换
for ans in corrects:
question = question.replace(
'{{' + ans.attrs['id'] + '}}',
f' <font color="#2ed573"><b>{options[ans.text]}</b></font> '
)
output.append(question + '\n')

@ -0,0 +1,59 @@
import json
from collections import deque
from bs4 import BeautifulSoup
from loguru import logger
from .formatter import fix
from .generator import Generators
_output = deque()
# 解码题目与答案 xml
def decode(que, ans, qt_type):
getattr(Generators, f'type_{qt_type}')(que, ans, _output)
# 生成每个 paper 的答案
def unescape(node, book_id):
paper_id = node.task['paperId']
with open(f'.cache/books/{book_id}/{paper_id}.json', 'r') as fp:
task = json.load(fp)
paper = BeautifulSoup(task['paperData'], 'lxml-xml')
answer = BeautifulSoup(task['answerData'], 'lxml-xml')
questions = paper.select('element[knowledge]:has(> question_type)')
if questions:
for que in questions:
qt_type = int(que.select_one('question_type').text)
decode(que, answer, qt_type)
return True
return False
# 深搜创建目录树
def dfs(node, book_id, depth=2):
if title := node.task['name']:
logger.info(f'{". " * (depth - 1)}{title}')
title = fix(title, ('rm_head',))
_output.append(f'{"#" * depth} {title}\n')
flag = False
if 'paperId' in node.task:
flag = unescape(node, book_id)
for ch in node.children:
if dfs(ch, book_id, depth + 1):
flag = True
if not flag:
_output.pop()
return flag
def generate_md(root): # 生成答案
book_id = root.task['book_id']
for ch in root.children:
dfs(ch, book_id)
with open('.cache/answer.md', 'w', encoding='utf-8') as file:
while len(_output):
line = _output.popleft()
file.write(line + '\n')
logger.info('Done.')

@ -0,0 +1,184 @@
import asyncio
import json
import os
import pickle
from hashlib import md5
from random import random
import httpx
from loguru import logger
from .captcha import recognize
class Tree:
def __init__(self, task):
self.task = task
self.children = []
class Spider(httpx.AsyncClient):
def __init__(self):
super().__init__()
async def login(self, username, password): # 账号密码登录
logger.info('正在获取验证码...')
result = await self.get(f'http://sso.ismartlearning.cn/captcha.html?{random()}')
code = recognize(result.content)
token = md5(password.encode()).hexdigest()
info = (await self.post(
'http://sso.ismartlearning.cn/v2/tickets-v2',
data={
'username': username,
'password': md5(token.encode() + b'fa&s*l%$k!fq$k!ld@fjlk').hexdigest(),
'captcha': code
},
headers={
'X-Requested-With': 'XMLHttpRequest',
'Origin': 'http://me.ismartlearning.cn',
'Referer': 'http://me.ismartlearning.cn/'
}
)).json()
logger.debug(info['result'])
if info['result']['code'] != -26:
raise AssertionError(f'[!] 登录失败: {info["result"]["msg"]}')
return info['result']
async def get_courses(self): # 获取用户课程列表
logger.info('正在获取课程列表...')
courses = (await self.post(
'https://school.ismartlearning.cn/client/course/list-of-student?status=1',
data={
'pager.currentPage': 1,
'pager.pageSize': 32767
}
)).json()['data']
return courses['list']
async def get_books(self, course): # 获取某课程的书籍列表
logger.info('正在获取书籍列表...')
await self.get_courses() # 必须有这个请求,否则后面会报错
books = (await self.post(
'http://school.ismartlearning.cn/client/course/textbook/list-of-student',
data={
'courseId': course['courseId']
}
)).json()['data']
return books
@staticmethod
def _merge_tasks(tasks): # 将任务列表重组成树形结构
id_record = {task['id']: Tree(task) for task in tasks}
root = Tree({
'book_id': tasks[0]['book_id'],
'unitStudyPercent': 0
})
for task_id in id_record:
node = id_record[task_id]
node_name = (f'{node.task["name"]} ' if 'name' in node.task else '') + f'[id:{node.task["id"]}]'
if 'parent_id' in node.task:
if (parent_id := node.task['parent_id']) in id_record:
id_record[parent_id].children.append(node)
else:
logger.warning(f'任务已忽略(父节点不存在):{node_name}')
else:
root.children.append(node)
return root
async def get_tasks(self, book, tree=False): # 获取某书籍的任务列表
logger.info('正在获取任务列表...')
await self.post('http://school.ismartlearning.cn/client/course/textbook/chapters')
tasks = (await self.post(
'http://school.ismartlearning.cn/client/course/textbook/chapters',
data={key: book[key] for key in ('bookId', 'bookType', 'courseId')}
)).json()['data']
if tree:
return self._merge_tasks(tasks)
else:
return tasks
async def get_paper(self, paper_id): # 获取任务点信息(包括题目和答案)
ticket = (await self.post(
'http://sso.ismartlearning.cn/v1/serviceTicket',
data={
'service': 'http://xot-api.ismartlearning.cn/client/textbook/paperinfo'
}
)).json()['data']['serverTicket']
logger.debug(f'Ticket: {ticket}')
paper_info = (await self.post(
'http://xot-api.ismartlearning.cn/client/textbook/paperinfo',
data={
'paperId': paper_id
},
headers={
'Origin': 'http://me.ismartlearning.cn',
'Referer': 'http://me.ismartlearning.cn/',
'X-Requested-With': 'XMLHttpRequest',
'Accept-Encoding': 'gzip, deflate'
},
params={
'ticket': ticket
}
)).json()['data']
return paper_info
async def download_tree(self, root):
async def download(task):
paper_id = task['paperId']
filepath = f'.cache/books/{root.task["book_id"]}/{paper_id}.json'
if os.path.exists(filepath):
return
async with limit: # 防止并发过高
result = await self.get_paper(paper_id)
result['task'] = task # 继续存入 Task
with open(filepath, 'w') as file:
json.dump(result, file)
def dfs(src):
if 'paperId' in (task := src.task):
logger.info(f'添加任务:{task["name"]}')
tasks.append(download(task))
for child in src.children:
dfs(child)
logger.info('开始下载试题及答案...')
os.makedirs(f'.cache/books/{root.task["book_id"]}', exist_ok=True)
with open(f'.cache/books/{root.task["book_id"]}/Tree.pck', 'wb') as fp:
pickle.dump(root, fp)
tasks, limit = [], asyncio.Semaphore(4)
dfs(root)
await asyncio.gather(*tasks)
logger.info('下载完成.')
async def get_user(self):
return (await self.post(
'https://school.ismartlearning.cn/client/user/student-info')
).json()
async def book_info(self, book_id):
ticket = (await self.post(
'http://sso.ismartlearning.cn/v1/serviceTicket',
data={
'service': 'http://book-api.ismartlearning.cn/client/v2/book/info'
}
)).json()['data']['serverTicket']
book_info = (await self.post(
'http://book-api.ismartlearning.cn/client/v2/book/info',
headers={
'Origin': 'http://me.ismartlearning.cn',
'Referer': 'http://me.ismartlearning.cn/',
'X-Requested-With': 'XMLHttpRequest',
'Accept-Encoding': 'gzip, deflate'
},
params={
'ticket': ticket
},
data={
'bookId': book_id,
'bookType': 0
}
)).json()
return book_info['data']

@ -0,0 +1,9 @@
import yaml
with open('configs.yml', 'r', encoding='utf-8') as _fp:
configs = yaml.safe_load(_fp)
if __name__ == '__main__':
import json
print(json.dumps(configs, indent=4))

@ -0,0 +1,13 @@
# Todo: 每次 commit 之前务必清除账号密码
# iSmart 客户端相关配置
browser:
executable: Z:\iSmart\client\iSmart.exe # 客户端可执行文件的路径
args: # 启动 iSmart 客户端时额外提供的参数
- --disable-web-security
port: 9222 # devTools 调试端口
# 用户相关配置(务必保持账号密码与 iSmart 中已登录的相同)
user:
username: <用户名> # 手机号
password: <密码> # 密码

@ -0,0 +1,14 @@
import asyncio
from automaton import finish
async def main():
await finish()
if __name__ == '__main__':
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
Loading…
Cancel
Save