''' 本模块用于定义云端读写的相关操作,包括上传、下载、删除等 ''' import oss2 import os import json from oss2.credentials import EnvironmentVariableCredentialsProvider # 云端存储的文件名称都是shixun_id.json,并且要保证都是有答案的. # 创建Bucket对象,所有Object相关的接口都可以通过Bucket对象来进行 # 判断文件是否存在 # 这些key和secret都是我自己的,如果需要使用,需要自己去阿里云申请捏(云工开物神中神) # 就一个小40G的OSS存储,里面也没啥重要数据,就是用来存储答案的,hack了也没啥用 access_key_id='LTAI5t927vdUFZa9NRnWfrL3' access_key_secret='FbXoJUqe545eZhWFvADvGcFwatsGAx' bucket_name="tasks-jsons" endpoint="oss-cn-shenzhen.aliyuncs.com" bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name) def is_exist(name:str) -> bool: return bucket.object_exists(name) # 如果文件存在,则下载到本地并命名为shixun_id_answer def download(name): # 将云端文件下载到本地并命名为shixun_id.json bucket.get_object_to_file(name, name) # 上传函数,用于获得答案后上传到云端,此步骤在获得答案后调用 # (to do)如果用户将答案认证为正确,则将本地json中的verified参数改为True后再上传并覆盖云端文件 def upload(name): # 上传前先检验文件中的每个键对应的值的answer键是否存在,以及其对应的值是否为空,如果为空,则不上传 with open(name,'r',encoding='utf-8') as f: data = json.load(f) for i,j in data.items(): if 'answer' not in j.keys() or j['answer'] == '': # 断言查询结果并抛出异常 assert False,'答案为空,不予上传' # 再检查文件是否存在,如果不存在,则上传 if not is_exist(name): bucket.put_object_from_file(name, name) def delete(name): # 先检查文件是否存在,如果存在,则删除 if is_exist(name): bucket.delete_object(name) if __name__ == '__main__': print('测试用') # 测试用 # print(is_exist('18503.json')) #download('18503.json') #检查云端所有文件 print([i for i in bucket.list_objects()])