重构项目,结构基本完善

新增 | 命令行参数和选项的支持
新增 | 支持列出所学课程
新增 | 现在可以自动判断爬虫和客户端账户是否相同
优化 | 随机分数和时间的配置
master
Mufanc 3 years ago
parent 1db550bee8
commit 2d23b78ac8

@ -1,4 +1,4 @@
## iSmart 课程自动姬 v1.0.0
## iSmart 课程自动姬 v1.0.2
> <div align="center"><b>「不止于自动化,追求极致效率」</b></div><br/>
>
@ -39,14 +39,42 @@
### 使用方法
&emsp;&emsp;修改 `configs.yml` 中的账号和密码,保证与 iSmart 客户端中登录的账号一致,然后修改 iSmart 的启动快捷方式,增加参数 `--remote-debugging-port=9222`
&emsp;&emsp;修改 iSmart 的启动快捷方式,增加参数 `--remote-debugging-port=9222`(如下图),**然后启动 iSmart 客户端并保持登录**
![](images/edit-lnk.png)
&emsp;&emsp;此时运行 main.py启动 iSmart 客户端,进入某本书籍的教材学习页(如下图),脚本会自动提交成绩。
&emsp;&emsp;修改 `configs.yml` 中的账号和密码,保证与 iSmart 客户端中登录的账号一致,然后根据需要调整下方参数。在终端中执行 `py main.py -h` 可以查看更多帮助信息,这里列举几个常用命令
* 列出所有课程和书籍的详细信息
```shell
py main.py list -d
```
<br/>
* 根据书籍 id 执行刷课
```shell
py main.py flash -i 51627#7B6911511DB6B33638F6C58531D8FBD3
```
<br/>
* 根据当前打开的页面执行刷课
```shell
py main.py flash -c
```
注意如果打开的是「教材学习」页(如下图),只会刷打开的这一本书籍的任务
![](images/booklearn.png)
### 写在最后
而如果是在课程详情页面,则会对该课程下的所有书籍执行刷课:
![](images/current_course.png)
### 过滤器语法
&emsp;&emsp;该项目尚处于起步阶段,项目结构还没有完全确定下来,所以后续可能会经历多次重构。目前很多功能虽然存在于源码中,但还不完善或者未经测试,可能造成意料之外的结果,所以在使用前还请三思
* 待完善

@ -1,2 +0,0 @@
from .spider import Spider
from .devtools import Browser

@ -1,40 +1,29 @@
import asyncio
import ctypes
import json
import re
import urllib.parse as parser
import httpx
import websockets
from loguru import logger
from configs import configs
from .utils import ainput
_default_port = configs['browser']['port']
_executable = configs['browser']['executable']
_args = configs['browser']['args']
class Browser(object):
@classmethod
def connect(cls):
return cls(_default_port)
@classmethod
def launch(cls):
ctypes.windll.shell32.ShellExecuteW(
None, 'runas', _executable,
' '.join([f'--remote-debugging-port={_default_port}', *_args]),
None, 1
)
return cls(_default_port)
async def connect(cls):
browser = cls(configs['browser']['port'])
if configs['browser']['verify'] and not await browser._verify():
return None
return browser
def __init__(self, dev_port):
self.port = dev_port
async def verify(self): # 校验客户端和配置文件中的用户是否相同
async def _verify(self): # 校验客户端和配置文件中的用户是否相同
logger.info('正在校验账号...')
page = await self._any_http_page()
page = await self.wait_for_page(r'https?://.*')
user_info = json.loads((await page.eval('''
(function () {
var xhr = new XMLHttpRequest()
@ -47,47 +36,35 @@ class Browser(object):
spider_user = configs['user']['username']
if spider_user != user_info['mobile'] and spider_user != user_info['username']:
logger.warning('检测到 iSmart 客户端中登录的账号与配置文件中账号不符!')
choice = await ainput('继续使用可能会出现意料之外的问题,是否继续?[y/N]')
choice = input('继续使用可能会出现意料之外的问题,是否继续?[y/N]')
if choice.lower() != 'y':
return False
else:
logger.info('校验通过!')
return True
async def wait_for_book(self): # 等待「教材学习」页面
async def wait_for_page(self, regexp): # 等待符合条件的页面出现
async with httpx.AsyncClient() as client:
while True:
logger.info('等待「教材学习」页面...')
logger.info('等待可用页面...')
try:
pages = (await client.get(f'http://127.0.0.1:{self.port}/json')).json()
for page in pages:
if re.match(r'.*me.ismartlearning.cn/center/student/course/bookLearn.html.*', page['url']) and \
'webSocketDebuggerUrl' in page:
if re.fullmatch(regexp, page['url']) and 'webSocketDebuggerUrl' in page:
return Page(page['url'], page['webSocketDebuggerUrl'])
except httpx.ConnectError:
pass
await asyncio.sleep(2)
async def _any_http_page(self):
# noinspection PyTypeChecker
async def get_current(self):
async with httpx.AsyncClient() as client:
while True:
logger.info('等待可用页面...')
try:
pages = (await client.get(f'http://127.0.0.1:{self.port}/json')).json()
for page in pages:
if re.match(r'https?://.*', page['url']) and 'webSocketDebuggerUrl' in page:
return Page(page['url'], page['webSocketDebuggerUrl'])
except httpx.ConnectError:
pass
await asyncio.sleep(2)
async def submit(self, book_id, chapter_id, task_id, score, seconds, percent, user_id): # 提交任务点的得分
page = await self._any_http_page()
model = 'NetBrowser.submitTask("%s", "%s", "%s", 0, "%d", %d, %d, "%s");'
result = f'%7B%22studentid%22:{user_id},%22testInfo%22:%7B%22answerdata%22:%22%22,%22markdatao%22:%22%22%7D%7D'
return await page.eval(
model % (book_id, chapter_id, task_id, score, seconds, percent, result)
)
params = dict(parser.parse_qsl(parser.urlsplit(page['url']).query))
if 'bookId' in params:
return params['courseId'], params['bookId']
return params['courseId'], None
class Page(object):
@ -113,3 +90,10 @@ class Page(object):
}
)
return result['result']
async def submit(self, book_id, chapter_id, task_id, score, seconds, percent, user_id): # 提交任务点的得分
model = 'NetBrowser.submitTask("%s", "%s", "%s", 0, "%d", %d, %d, "%s");'
result = f'%7B%22studentid%22:{user_id},%22testInfo%22:%7B%22answerdata%22:%22%22,%22markdatao%22:%22%22%7D%7D'
return await self.eval(
model % (book_id, chapter_id, task_id, score, seconds, percent, result)
)

@ -5,14 +5,30 @@ import httpx
from loguru import logger
from .captcha import recognize
from ..utils import Tree
class Tree: # 任务树
def __init__(self, task):
self.task = task
self.child = []
def sort(self):
self.child.sort(
key=lambda node: node.task['displayOrder']
)
for ch in self.child:
ch.sort()
class Spider(httpx.AsyncClient):
def __init__(self):
super().__init__()
self.is_login = False
async def login(self, username, password):
if self.is_login:
return {}
self.cookies.clear() # 重置 cookies
logger.info('正在获取验证码...')
result = await self.get(f'http://sso.ismartlearning.cn/captcha.html?{random()}')
@ -34,6 +50,7 @@ class Spider(httpx.AsyncClient):
logger.debug(info['result'])
assert info['result']['code'] == -26 # 断言登录结果
self.is_login = True
return info['result']
async def get_courses(self): # 获取课程列表
@ -49,7 +66,13 @@ class Spider(httpx.AsyncClient):
async def get_books(self, course_id): # 获取某课程的书籍列表
logger.info('正在获取书籍列表...')
await self.get_courses() # 必须有这个请求,否则后面会报错
await self.post( # 必须有这个请求,否则后面会报错
'http://school.ismartlearning.cn/client/course/list-of-student?status=1',
data={
'pager.currentPage': 1,
'pager.pageSize': 32767
}
)
books = (await self.post(
'http://school.ismartlearning.cn/client/course/textbook/list-of-student',
data={
@ -60,6 +83,7 @@ class Spider(httpx.AsyncClient):
async def get_tasks(self, book_id, book_type, course_id): # 获取某书籍的任务树
logger.info('正在获取任务列表...')
await self.post('http://school.ismartlearning.cn/client/course/textbook/chapters')
tasks = (await self.post(
'http://school.ismartlearning.cn/client/course/textbook/chapters',
data={
@ -69,9 +93,11 @@ class Spider(httpx.AsyncClient):
}
)).json()['data']
id_record = {task['id']: Tree(task) for task in tasks}
book_name = (await self.book_info(book_id))['bookName']
root = Tree({
'book_id': tasks[0]['book_id'],
'unitStudyPercent': 0
'unitStudyPercent': 0,
'name': book_name
})
for task_id in id_record:
node = id_record[task_id]
@ -80,9 +106,10 @@ class Spider(httpx.AsyncClient):
if (parent_id := node.task['parent_id']) in id_record:
id_record[parent_id].child.append(node)
else:
logger.warning(f'任务已忽略(父节点不存在{node_name}')
logger.warning(f'父节点不存在:{node_name}')
else:
root.child.append(node)
root.sort()
return root
async def get_paper(self, paper_id): # 获取任务点信息(包括题目和答案)
@ -92,6 +119,7 @@ class Spider(httpx.AsyncClient):
'service': 'http://xot-api.ismartlearning.cn/client/textbook/paperinfo'
}
)).json()['data']['serverTicket']
logger.debug(f'Ticket: {ticket}')
paper_info = (await self.post(
'http://xot-api.ismartlearning.cn/client/textbook/paperinfo',
data={
@ -114,3 +142,30 @@ class Spider(httpx.AsyncClient):
return (await self.post(
'https://school.ismartlearning.cn/client/user/student-info')
).json()
async def book_info(self, book_id):
logger.info('正在获取书籍信息...')
ticket = (await self.post(
'http://sso.ismartlearning.cn/v1/serviceTicket',
data={
'service': 'http://book-api.ismartlearning.cn/client/v2/book/info'
}
)).json()['data']['serverTicket']
logger.debug(f'Ticket: {ticket}')
book_info = (await self.post(
'http://book-api.ismartlearning.cn/client/v2/book/info',
headers={
'Origin': 'http://me.ismartlearning.cn',
'Referer': 'http://me.ismartlearning.cn/',
'X-Requested-With': 'XMLHttpRequest',
'Accept-Encoding': 'gzip, deflate'
},
params={
'ticket': ticket
},
data={
'bookId': book_id,
'bookType': 0
}
)).json()
return book_info['data']

@ -1,33 +1,123 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
import json
import urllib.parse as parser
from random import uniform, randint
from bs4 import BeautifulSoup
from loguru import logger
from configs import configs
from .devtools import Browser
from .spider import Spider
PLACEHOLDER = '. '
_paper_config = configs['paper']
async def list_books(detail):
async with Spider() as spider:
await spider.login(**configs['user'])
courses = await spider.get_courses()
for cr in courses:
if detail:
hint = f'{cr["courseName"]} ({cr["teacherName"]})'
else:
hint = cr['courseName']
print(hint)
books = await spider.get_books(cr["courseId"])
for book in books:
if detail:
hint = f'> [{cr["courseId"]}#{book["bookId"]}] {book["bookName"]} ({book["percent"]}%)'
else:
hint = f'> {book["bookName"]}'
print(PLACEHOLDER + hint)
async def _random(spider, paper_id): # 随机的分数和学习时长
paper = BeautifulSoup((await spider.get_paper(paper_id))['paperData'], 'lxml-xml')
questions = paper.select('element[knowledge]:has(> question_type)')
if not questions:
return 100, 5
total = 0
score, time = 0, 0
for q in questions:
q_type = int(q.select_one('question_type').text)
q_score = float(q.select_one('question_score').text)
total += q_score
ranges = _paper_config['random-score']
if q_type <= len(ranges) and (r := ranges[q_type-1]) is not None:
if not isinstance(ranges[q_type-1], list):
r = [r, r]
score += q_score * uniform(*r)
else:
q_no = q.select_one('question_no').text
logger.warning(f'T{q_no}:未知的题型!')
if (r := _paper_config['defaults']) == 'pause':
score += float(input('请手动输入该题得分 [0-1]'))
else: # defaults
score += q_score * uniform(*r)
time += randint(*_paper_config['random-time'])
class Tree: # 任务树
def __init__(self, task):
self.task = task
self.child = []
return int(100 * score / total), time
async def ainput(prompt: str = ''):
with ThreadPoolExecutor(1, 'ainput') as executor:
return (
await asyncio.get_event_loop().run_in_executor(executor, input, prompt)
).rstrip()
async def _flash(course_id, book_id, spider):
async def dfs(node, depth=0):
task = node.task
print(PLACEHOLDER * depth + task['name'])
if _paper_config['skip-finished'] and task['unitStudyPercent'] == 100:
print(PLACEHOLDER * depth + '# Skipped')
return
if 'paperId' in task: # 如果有任务则提交
chapter_id = task['chapterId']
task_id = task['id']
score, time = await _random(spider, task['paperId'])
result = await page.submit(book_id, chapter_id, task_id, score, time, 100, user_id)
if result['wasThrown'] or not result['result']['value']:
logger.warning(f'任务 {task["name"]} 可能提交失败,请留意最终结果!')
for ch in node.child:
await dfs(ch, depth + 1)
browser = await Browser.connect()
page = await browser.wait_for_page(r'https?://pc\.ismartin\.com.*')
# With Spider
await spider.login(**configs['user'])
user_id = (await spider.user_info())['data']['uid']
async def flash_recent(): # 对当前书籍执行刷课
if configs['browser']['mode'] == 'launch':
browser = Browser.launch()
book_type = (await spider.book_info(book_id))['bookType']
root = await spider.get_tasks(book_id, book_type, course_id)
await dfs(root)
async def flash_by_id(identity):
async with Spider() as spider:
await _flash(*identity.split('#'), spider)
async def flash_current(): # 对当前课程或书籍执行刷课
browser = await Browser.connect()
course_id, book_id = await browser.get_current()
async with Spider() as spider:
if book_id:
await _flash(course_id, book_id, spider)
else:
browser = Browser.connect()
page = await browser.wait_for_book()
params = dict(parser.parse_qsl(parser.urlsplit(page.url).query))
# noinspection PyTypeChecker
book_id, course_id = params['bookId'], params['courseId']
await spider.login(**configs['user'])
books = await spider.get_books(course_id)
for book in books:
await _flash(course_id, book['bookId'], spider)
async def flash_all():
async with Spider() as spider:
await spider.login(**configs['user'])
courses = await spider.get_courses()
for course in courses:
course_id = course['courseId']
books = await spider.get_books(course_id)
for book in books:
await _flash(course_id, book['bookId'], spider)

@ -1,10 +1,13 @@
import yaml
with open('configs.yml', 'r', encoding='utf-8') as _fp:
configs = yaml.safe_load(_fp)
def update(conf):
configs.update(conf)
if __name__ == '__main__':
import json
print(json.dumps(configs, indent=4))

@ -1,17 +1,7 @@
# Todo: 每次 commit 之前务必清除账号密码
# 刷课配置
project:
skip-finished: true # 跳过已完成任务(暂不支持)
# iSmart 客户端配置
browser:
mode: connect # 连接模式( connect / launch
# 下面为 mode = launch 时的配置
executable: Z:\iSmart\client\iSmart.exe # 客户端可执行文件的路径
args: # 启动 iSmart 客户端时额外提供的参数
- --disable-web-security
port: 9222 # devTools 调试端口
port: 9222
verify: true
# 用户配置(务必保持账号密码与 iSmart 中已登录的相同)
user:
@ -20,8 +10,10 @@ user:
# 答题配置
paper:
skip-finished: true # 跳过已完成任务
# 不同题目类型的随机得分
ramdom-score: # Todo: 判断列表长度
random-score:
- 1 # 1.单选
- 1 # 2.多选
- 1 # 3.判断
@ -33,6 +25,8 @@ paper:
- [ 0.7, 1 ] # 9.口语跟读
- [ 0.7, 1 ] # 10.短文改错
- 1 # 11.选词填空
defaults: pause # 未知题型的处理方式
random-time: [ 90, 240 ] # 每道题的随机用时(秒)
-
defaults: pause # 未知题型的处理方式(暂停或使用默认得分)
# defaults: [ 1, 1 ]
random-time: [ 90, 180 ] # 每道题的随机用时(秒)

Binary file not shown.

After

Width:  |  Height:  |  Size: 52 KiB

@ -1,23 +1,42 @@
import asyncio
import sys
from argparse import ArgumentParser
from loguru import logger
from automaton import utils
async def main():
parser = ArgumentParser('main.py')
parser.add_argument('-v', dest='LEVEL', default='warning', help='日志过滤等级,默认为 warning')
subparsers = parser.add_subparsers(help='模式选择')
parser.add_argument('-v', dest='level', action='count', help='日志过滤等级,依次为 warning, info, debug')
subparsers = parser.add_subparsers(dest='method', help='模式选择')
method_list = subparsers.add_parser('list', help='列出所有课程和书籍')
method_list.add_argument('-d', '--detail', action='store_true', help='显示详细信息')
method_flash = subparsers.add_parser('flash', help='对选定的一个或几个课程执行刷课')
target = method_flash.add_mutually_exclusive_group()
target.add_argument('-b', '--book', action='store_true', help='对当前打开的书籍执行刷课')
target.add_argument('-c', '--course', action='store_true', help='对当前打开的课程执行刷课')
target.add_argument('-a', '--all', action='store_true', help='对所有课程和书籍执行刷课')
method_flash.add_argument('-f', '--filter', help='')
method_flash.add_argument('-i', '--invert', help='过滤器反向')
target.add_argument('-i', '--id', help='直接指定书籍 id')
target.add_argument('-c', '--current', action='store_true', help='限定当前课程或书籍')
target.add_argument('-a', '--all', action='store_true', help='选择全部')
method_flash.add_argument('-f', '--filter', help='任务过滤器,设置后只刷匹配的任务(尚未实现)') # Todo: 实现这个
args = parser.parse_args()
logger.remove()
logger.add(sys.stdout, level=['WARNING', 'INFO', 'DEBUG'][args.level or 0])
parser.parse_args()
if args.method == 'list':
await utils.list_books(detail=args.detail)
elif args.method == 'flash':
if args.id:
await utils.flash_by_id(args.id)
elif args.current:
await utils.flash_current()
elif args.all:
await utils.flash_all()
if __name__ == '__main__':
loop = asyncio.new_event_loop()

Loading…
Cancel
Save