You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

362 lines
9.4 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
核心加密逻辑模块
支持 AES 对称加密和 RSA 非对称加密
"""
import os
import base64
from typing import Tuple, Optional
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.backends import default_backend
def generate_aes_key(key_size: int = 256) -> bytes:
    """
    Generate a random AES key.

    Args:
        key_size: Key length in bits; must be 128, 192 or 256.

    Returns:
        bytes: ``key_size // 8`` random bytes read from ``os.urandom``.

    Raises:
        ValueError: If ``key_size`` is not one of the supported AES key lengths.
    """
    # Reject unsupported sizes up front: without this check a value such as
    # 100 would be floored to a 12-byte key, which is not a length AES defines.
    if key_size not in (128, 192, 256):
        raise ValueError(
            f"Unsupported AES key size: {key_size!r} (expected 128, 192 or 256)"
        )
    return os.urandom(key_size // 8)
def generate_iv(iv_size: int = 16) -> bytes:
    """
    Produce a random initialization vector (IV).

    Args:
        iv_size: IV length in bytes; defaults to 16.

    Returns:
        bytes: ``iv_size`` random bytes read from ``os.urandom``.
    """
    random_iv = os.urandom(iv_size)
    return random_iv
def aes_encrypt_cbc(plaintext: bytes, key: bytes, iv: bytes) -> bytes:
    """
    Encrypt data with AES in CBC mode.

    The plaintext is extended to a multiple of 16 bytes before encryption by
    appending N bytes of value N (PKCS7 style); input that is already
    block-aligned gains a full extra block of padding.

    Args:
        plaintext: Data to encrypt.
        key: AES key.
        iv: Initialization vector.

    Returns:
        bytes: The ciphertext of the padded plaintext.
    """
    block_size = 16
    # Always between 1 and 16, so the padding can be removed unambiguously.
    fill = block_size - len(plaintext) % block_size
    padded = plaintext + bytes([fill] * fill)

    encryptor = Cipher(
        algorithms.AES(key), modes.CBC(iv), backend=default_backend()
    ).encryptor()
    return encryptor.update(padded) + encryptor.finalize()
def aes_decrypt_cbc(ciphertext: bytes, key: bytes, iv: bytes) -> bytes:
    """
    Decrypt AES-CBC data and strip its PKCS7 padding.

    Args:
        ciphertext: Data to decrypt.
        key: AES key.
        iv: Initialization vector.

    Returns:
        bytes: The plaintext with the padding removed.

    Raises:
        ValueError: If the decrypted data is empty or does not end in valid
            PKCS7 padding (pad byte outside 1..16, or trailing bytes that do
            not all equal the pad byte).
    """
    cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
    decryptor = cipher.decryptor()
    padded = decryptor.update(ciphertext) + decryptor.finalize()

    # An empty result has no pad byte to read; fail explicitly rather than
    # with an IndexError from padded[-1].
    if not padded:
        raise ValueError("Ciphertext is empty; no PKCS7 padding to remove")

    pad_len = padded[-1]
    # A pad byte of 0 would turn padded[:-0] into b"" and a value above the
    # block size cannot be real padding, so both are rejected instead of
    # silently returning truncated or garbage data.
    if not 1 <= pad_len <= 16:
        raise ValueError("Invalid PKCS7 padding")
    if padded[-pad_len:] != bytes([pad_len]) * pad_len:
        raise ValueError("Invalid PKCS7 padding")

    # NOTE(review): CBC alone does not authenticate the ciphertext; callers
    # that expose this error to an attacker should confirm that is acceptable.
    return padded[:-pad_len]
def aes_encrypt_gcm(plaintext: bytes, key: bytes, iv: bytes) -> Tuple[bytes, bytes]:
    """
    Encrypt data with AES in GCM mode (authenticated encryption).

    Args:
        plaintext: Data to encrypt.
        key: AES key.
        iv: Initialization vector (nonce).

    Returns:
        Tuple[bytes, bytes]: ``(ciphertext, authentication tag)``.
    """
    gcm_cipher = Cipher(algorithms.AES(key), modes.GCM(iv), backend=default_backend())
    enc = gcm_cipher.encryptor()
    body = enc.update(plaintext)
    body += enc.finalize()
    # The tag is read from the encryptor only after finalize() has run.
    return body, enc.tag
def aes_decrypt_gcm(ciphertext: bytes, key: bytes, iv: bytes, tag: bytes) -> bytes:
    """
    Decrypt AES-GCM data and check its authentication tag.

    Args:
        ciphertext: Data to decrypt.
        key: AES key.
        iv: Initialization vector (nonce) used for encryption.
        tag: Authentication tag produced during encryption.

    Returns:
        bytes: The plaintext.

    Raises:
        Whatever the ``cryptography`` decryptor raises on failure; nothing is
        caught here. NOTE(review): the previous docstring listed ValueError
        for authentication failure, but the library documents
        ``cryptography.exceptions.InvalidTag`` for a tag mismatch at
        ``finalize()`` -- confirm which exception callers handle.
    """
    gcm_cipher = Cipher(
        algorithms.AES(key), modes.GCM(iv, tag), backend=default_backend()
    )
    dec = gcm_cipher.decryptor()
    recovered = dec.update(ciphertext)
    recovered += dec.finalize()
    return recovered
def aes_encrypt_ctr(plaintext: bytes, key: bytes, iv: bytes) -> bytes:
    """
    Encrypt data with AES in CTR mode.

    CTR is used here as a stream-style mode, so no padding is applied.

    Args:
        plaintext: Data to encrypt.
        key: AES key.
        iv: Initialization vector (nonce).

    Returns:
        bytes: The ciphertext.
    """
    enc = Cipher(
        algorithms.AES(key), modes.CTR(iv), backend=default_backend()
    ).encryptor()
    return enc.update(plaintext) + enc.finalize()
def aes_decrypt_ctr(ciphertext: bytes, key: bytes, iv: bytes) -> bytes:
    """
    Decrypt data that was encrypted with AES in CTR mode.

    No padding is removed, since none is added on the encryption side.

    Args:
        ciphertext: Data to decrypt.
        key: AES key.
        iv: Initialization vector (nonce) used for encryption.

    Returns:
        bytes: The plaintext.
    """
    dec = Cipher(
        algorithms.AES(key), modes.CTR(iv), backend=default_backend()
    ).decryptor()
    return dec.update(ciphertext) + dec.finalize()
def generate_rsa_keypair(key_size: int = 2048) -> Tuple[rsa.RSAPrivateKey, rsa.RSAPublicKey]:
    """
    Generate an RSA key pair with public exponent 65537.

    Args:
        key_size: Key length in bits; 2048 or 4096 is recommended.

    Returns:
        Tuple: ``(private key, public key)``.
    """
    key_params = {
        "public_exponent": 65537,
        "key_size": key_size,
        "backend": default_backend(),
    }
    private_key = rsa.generate_private_key(**key_params)
    # The public half is derived from the freshly generated private key.
    return private_key, private_key.public_key()
def rsa_encrypt(plaintext: bytes, public_key: rsa.RSAPublicKey) -> bytes:
    """
    Encrypt data with an RSA public key using OAEP padding.

    OAEP is configured with SHA-256 for both the hash and MGF1, and no label.

    Args:
        plaintext: Data to encrypt; its length must be smaller than the key
            length minus the padding overhead.
        public_key: RSA public key.

    Returns:
        bytes: The ciphertext.
    """
    oaep = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None,
    )
    return public_key.encrypt(plaintext, oaep)
def rsa_decrypt(ciphertext: bytes, private_key: rsa.RSAPrivateKey) -> bytes:
    """
    Decrypt data with an RSA private key using OAEP padding.

    OAEP is configured with SHA-256 for both the hash and MGF1, and no label.

    Args:
        ciphertext: Data to decrypt.
        private_key: RSA private key.

    Returns:
        bytes: The plaintext.
    """
    oaep = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None,
    )
    return private_key.decrypt(ciphertext, oaep)
def load_public_key_from_pem(pem_path: str) -> rsa.RSAPublicKey:
    """
    Load an RSA public key from a PEM file.

    Args:
        pem_path: Path of the PEM file.

    Returns:
        RSAPublicKey: The public key object.
    """
    with open(pem_path, 'rb') as handle:
        raw_pem = handle.read()
    return serialization.load_pem_public_key(raw_pem, backend=default_backend())
def load_public_key_from_pem_string(pem_string: str) -> rsa.RSAPublicKey:
    """
    Load an RSA public key from PEM-formatted text.

    Args:
        pem_string: The public key in PEM format. A ``str`` is encoded as
            UTF-8 first; any other value is handed to the loader unchanged.

    Returns:
        RSAPublicKey: The public key object.
    """
    raw_pem = pem_string
    if isinstance(pem_string, str):
        raw_pem = pem_string.encode('utf-8')
    return serialization.load_pem_public_key(raw_pem, backend=default_backend())
def load_private_key_from_pem(pem_path: str, password: Optional[bytes] = None) -> rsa.RSAPrivateKey:
    """
    Load an RSA private key from a PEM file.

    Args:
        pem_path: Path of the PEM file.
        password: Password of the private key, if it is encrypted.

    Returns:
        RSAPrivateKey: The private key object.
    """
    with open(pem_path, 'rb') as handle:
        raw_pem = handle.read()
    return serialization.load_pem_private_key(
        raw_pem, password=password, backend=default_backend()
    )
def save_private_key_to_pem(private_key: rsa.RSAPrivateKey, pem_path: str, password: Optional[bytes] = None):
    """
    Write an RSA private key to a PEM file in PKCS8 format.

    Args:
        private_key: RSA private key.
        pem_path: Destination path.
        password: Optional encryption password. A falsy value (``None`` or
            empty bytes) stores the key unencrypted.
    """
    if password:
        protection = serialization.BestAvailableEncryption(password)
    else:
        protection = serialization.NoEncryption()

    serialized = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=protection,
    )
    # NOTE(review): the file is created with the process's default
    # permissions; confirm that is acceptable for private key material.
    with open(pem_path, 'wb') as handle:
        handle.write(serialized)
def save_public_key_to_pem(public_key: rsa.RSAPublicKey, pem_path: str):
    """
    Write an RSA public key to a PEM file in SubjectPublicKeyInfo format.

    Args:
        public_key: RSA public key.
        pem_path: Destination path.
    """
    serialized = public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    with open(pem_path, 'wb') as handle:
        handle.write(serialized)
def rsa_sign(data: bytes, private_key: rsa.RSAPrivateKey) -> bytes:
    """
    Sign data with an RSA private key using PSS padding and SHA-256.

    PSS is configured with MGF1-SHA256 and the maximum salt length.

    Args:
        data: Data to sign.
        private_key: RSA private key.

    Returns:
        bytes: The digital signature.
    """
    pss = padding.PSS(
        mgf=padding.MGF1(hashes.SHA256()),
        salt_length=padding.PSS.MAX_LENGTH,
    )
    return private_key.sign(data, pss, hashes.SHA256())
def rsa_verify(data: bytes, signature: bytes, public_key: rsa.RSAPublicKey) -> bool:
    """
    Verify an RSA signature made with PSS padding and SHA-256.

    PSS is configured with MGF1-SHA256 and the maximum salt length.

    Args:
        data: The original data that was signed.
        signature: The digital signature to check.
        public_key: RSA public key.

    Returns:
        bool: True if the signature is valid, False if the library reports
        it as invalid.

    Raises:
        Any error other than an invalid signature (for example wrong argument
        types or an unsuitable key object) propagates to the caller instead
        of being reported as a failed verification.
    """
    # Function-scope import keeps this block self-contained; the package is
    # the same one the module already depends on.
    from cryptography.exceptions import InvalidSignature

    pss = padding.PSS(
        mgf=padding.MGF1(hashes.SHA256()),
        salt_length=padding.PSS.MAX_LENGTH,
    )
    try:
        public_key.verify(signature, data, pss, hashes.SHA256())
    except InvalidSignature:
        # Only a genuine verification failure maps to False; a blanket
        # ``except Exception`` would disguise programming errors as
        # "signature invalid".
        return False
    return True