|
|
"""
|
|
|
核心加密逻辑模块
|
|
|
支持 AES 对称加密和 RSA 非对称加密
|
|
|
"""
|
|
|
import os
|
|
|
import base64
|
|
|
from typing import Tuple, Optional
|
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
|
from cryptography.hazmat.primitives.asymmetric import rsa, padding
|
|
|
from cryptography.hazmat.backends import default_backend
|
|
|
|
|
|
|
|
|
def generate_aes_key(key_size: int = 256) -> bytes:
|
|
|
"""
|
|
|
生成随机AES密钥
|
|
|
|
|
|
Args:
|
|
|
key_size: 密钥长度(位),支持128/192/256
|
|
|
|
|
|
Returns:
|
|
|
bytes: 随机密钥
|
|
|
"""
|
|
|
return os.urandom(key_size // 8)
|
|
|
|
|
|
|
|
|
def generate_iv(iv_size: int = 16) -> bytes:
|
|
|
"""
|
|
|
生成随机IV(初始化向量)
|
|
|
|
|
|
Args:
|
|
|
iv_size: IV长度(字节),AES通常用16
|
|
|
|
|
|
Returns:
|
|
|
bytes: 随机IV
|
|
|
"""
|
|
|
return os.urandom(iv_size)
|
|
|
|
|
|
|
|
|
def aes_encrypt_cbc(plaintext: bytes, key: bytes, iv: bytes) -> bytes:
|
|
|
"""
|
|
|
使用AES-CBC模式加密
|
|
|
|
|
|
Args:
|
|
|
plaintext: 明文字节
|
|
|
key: AES密钥
|
|
|
iv: 初始化向量
|
|
|
|
|
|
Returns:
|
|
|
bytes: 密文
|
|
|
"""
|
|
|
# PKCS7 padding
|
|
|
pad_len = 16 - (len(plaintext) % 16)
|
|
|
plaintext = plaintext + bytes([pad_len]) * pad_len
|
|
|
|
|
|
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
|
|
|
encryptor = cipher.encryptor()
|
|
|
ciphertext = encryptor.update(plaintext) + encryptor.finalize()
|
|
|
return ciphertext
|
|
|
|
|
|
|
|
|
def aes_decrypt_cbc(ciphertext: bytes, key: bytes, iv: bytes) -> bytes:
|
|
|
"""
|
|
|
使用AES-CBC模式解密
|
|
|
|
|
|
Args:
|
|
|
ciphertext: 密文字节
|
|
|
key: AES密钥
|
|
|
iv: 初始化向量
|
|
|
|
|
|
Returns:
|
|
|
bytes: 明文
|
|
|
"""
|
|
|
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
|
|
|
decryptor = cipher.decryptor()
|
|
|
plaintext = decryptor.update(ciphertext) + decryptor.finalize()
|
|
|
|
|
|
# 去除PKCS7 padding
|
|
|
pad_len = plaintext[-1]
|
|
|
plaintext = plaintext[:-pad_len]
|
|
|
return plaintext
|
|
|
|
|
|
|
|
|
def aes_encrypt_gcm(plaintext: bytes, key: bytes, iv: bytes) -> Tuple[bytes, bytes]:
|
|
|
"""
|
|
|
使用AES-GCM模式加密(带认证)
|
|
|
|
|
|
Args:
|
|
|
plaintext: 明文字节
|
|
|
key: AES密钥
|
|
|
iv: 初始化向量(nonce)
|
|
|
|
|
|
Returns:
|
|
|
Tuple[bytes, bytes]: (密文, 认证标签)
|
|
|
"""
|
|
|
cipher = Cipher(algorithms.AES(key), modes.GCM(iv), backend=default_backend())
|
|
|
encryptor = cipher.encryptor()
|
|
|
ciphertext = encryptor.update(plaintext) + encryptor.finalize()
|
|
|
return ciphertext, encryptor.tag
|
|
|
|
|
|
|
|
|
def aes_decrypt_gcm(ciphertext: bytes, key: bytes, iv: bytes, tag: bytes) -> bytes:
|
|
|
"""
|
|
|
使用AES-GCM模式解密(带认证)
|
|
|
|
|
|
Args:
|
|
|
ciphertext: 密文字节
|
|
|
key: AES密钥
|
|
|
iv: 初始化向量(nonce)
|
|
|
tag: 认证标签
|
|
|
|
|
|
Returns:
|
|
|
bytes: 明文
|
|
|
|
|
|
Raises:
|
|
|
ValueError: 认证失败
|
|
|
"""
|
|
|
cipher = Cipher(algorithms.AES(key), modes.GCM(iv, tag), backend=default_backend())
|
|
|
decryptor = cipher.decryptor()
|
|
|
plaintext = decryptor.update(ciphertext) + decryptor.finalize()
|
|
|
return plaintext
|
|
|
|
|
|
|
|
|
def aes_encrypt_ctr(plaintext: bytes, key: bytes, iv: bytes) -> bytes:
|
|
|
"""
|
|
|
使用AES-CTR模式加密(流密码模式,无需padding)
|
|
|
|
|
|
Args:
|
|
|
plaintext: 明文字节
|
|
|
key: AES密钥
|
|
|
iv: 初始化向量(nonce)
|
|
|
|
|
|
Returns:
|
|
|
bytes: 密文
|
|
|
"""
|
|
|
cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=default_backend())
|
|
|
encryptor = cipher.encryptor()
|
|
|
ciphertext = encryptor.update(plaintext) + encryptor.finalize()
|
|
|
return ciphertext
|
|
|
|
|
|
|
|
|
def aes_decrypt_ctr(ciphertext: bytes, key: bytes, iv: bytes) -> bytes:
|
|
|
"""
|
|
|
使用AES-CTR模式解密(流密码模式,无需unpadding)
|
|
|
|
|
|
Args:
|
|
|
ciphertext: 密文字节
|
|
|
key: AES密钥
|
|
|
iv: 初始化向量(nonce)
|
|
|
|
|
|
Returns:
|
|
|
bytes: 明文
|
|
|
"""
|
|
|
cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=default_backend())
|
|
|
decryptor = cipher.decryptor()
|
|
|
plaintext = decryptor.update(ciphertext) + decryptor.finalize()
|
|
|
return plaintext
|
|
|
|
|
|
|
|
|
def generate_rsa_keypair(key_size: int = 2048) -> Tuple[rsa.RSAPrivateKey, rsa.RSAPublicKey]:
|
|
|
"""
|
|
|
生成RSA密钥对
|
|
|
|
|
|
Args:
|
|
|
key_size: 密钥长度(位),推荐2048或4096
|
|
|
|
|
|
Returns:
|
|
|
Tuple: (私钥, 公钥)
|
|
|
"""
|
|
|
private_key = rsa.generate_private_key(
|
|
|
public_exponent=65537,
|
|
|
key_size=key_size,
|
|
|
backend=default_backend()
|
|
|
)
|
|
|
public_key = private_key.public_key()
|
|
|
return private_key, public_key
|
|
|
|
|
|
|
|
|
def rsa_encrypt(plaintext: bytes, public_key: rsa.RSAPublicKey) -> bytes:
|
|
|
"""
|
|
|
使用RSA公钥加密(OAEP填充)
|
|
|
|
|
|
Args:
|
|
|
plaintext: 明文字节(长度需小于密钥长度-padding开销)
|
|
|
public_key: RSA公钥
|
|
|
|
|
|
Returns:
|
|
|
bytes: 密文
|
|
|
"""
|
|
|
ciphertext = public_key.encrypt(
|
|
|
plaintext,
|
|
|
padding.OAEP(
|
|
|
mgf=padding.MGF1(algorithm=hashes.SHA256()),
|
|
|
algorithm=hashes.SHA256(),
|
|
|
label=None
|
|
|
)
|
|
|
)
|
|
|
return ciphertext
|
|
|
|
|
|
|
|
|
def rsa_decrypt(ciphertext: bytes, private_key: rsa.RSAPrivateKey) -> bytes:
|
|
|
"""
|
|
|
使用RSA私钥解密(OAEP填充)
|
|
|
|
|
|
Args:
|
|
|
ciphertext: 密文字节
|
|
|
private_key: RSA私钥
|
|
|
|
|
|
Returns:
|
|
|
bytes: 明文
|
|
|
"""
|
|
|
plaintext = private_key.decrypt(
|
|
|
ciphertext,
|
|
|
padding.OAEP(
|
|
|
mgf=padding.MGF1(algorithm=hashes.SHA256()),
|
|
|
algorithm=hashes.SHA256(),
|
|
|
label=None
|
|
|
)
|
|
|
)
|
|
|
return plaintext
|
|
|
|
|
|
|
|
|
def load_public_key_from_pem(pem_path: str) -> rsa.RSAPublicKey:
|
|
|
"""
|
|
|
从PEM文件加载RSA公钥
|
|
|
|
|
|
Args:
|
|
|
pem_path: PEM文件路径
|
|
|
|
|
|
Returns:
|
|
|
RSAPublicKey: 公钥对象
|
|
|
"""
|
|
|
with open(pem_path, 'rb') as f:
|
|
|
pem_data = f.read()
|
|
|
|
|
|
public_key = serialization.load_pem_public_key(pem_data, backend=default_backend())
|
|
|
return public_key
|
|
|
|
|
|
|
|
|
def load_public_key_from_pem_string(pem_string: str) -> rsa.RSAPublicKey:
|
|
|
"""
|
|
|
从PEM格式字符串加载RSA公钥
|
|
|
|
|
|
Args:
|
|
|
pem_string: PEM格式的公钥字符串
|
|
|
|
|
|
Returns:
|
|
|
RSAPublicKey: 公钥对象
|
|
|
"""
|
|
|
pem_data = pem_string.encode('utf-8') if isinstance(pem_string, str) else pem_string
|
|
|
public_key = serialization.load_pem_public_key(pem_data, backend=default_backend())
|
|
|
return public_key
|
|
|
|
|
|
|
|
|
def load_private_key_from_pem(pem_path: str, password: Optional[bytes] = None) -> rsa.RSAPrivateKey:
|
|
|
"""
|
|
|
从PEM文件加载RSA私钥
|
|
|
|
|
|
Args:
|
|
|
pem_path: PEM文件路径
|
|
|
password: 私钥密码(如果加密)
|
|
|
|
|
|
Returns:
|
|
|
RSAPrivateKey: 私钥对象
|
|
|
"""
|
|
|
with open(pem_path, 'rb') as f:
|
|
|
pem_data = f.read()
|
|
|
|
|
|
private_key = serialization.load_pem_private_key(
|
|
|
pem_data,
|
|
|
password=password,
|
|
|
backend=default_backend()
|
|
|
)
|
|
|
return private_key
|
|
|
|
|
|
|
|
|
def save_private_key_to_pem(private_key: rsa.RSAPrivateKey, pem_path: str, password: Optional[bytes] = None):
|
|
|
"""
|
|
|
保存RSA私钥到PEM文件
|
|
|
|
|
|
Args:
|
|
|
private_key: RSA私钥
|
|
|
pem_path: 保存路径
|
|
|
password: 加密密码(可选)
|
|
|
"""
|
|
|
encryption_algorithm = serialization.BestAvailableEncryption(password) if password else serialization.NoEncryption()
|
|
|
|
|
|
pem_data = private_key.private_bytes(
|
|
|
encoding=serialization.Encoding.PEM,
|
|
|
format=serialization.PrivateFormat.PKCS8,
|
|
|
encryption_algorithm=encryption_algorithm
|
|
|
)
|
|
|
|
|
|
with open(pem_path, 'wb') as f:
|
|
|
f.write(pem_data)
|
|
|
|
|
|
|
|
|
def save_public_key_to_pem(public_key: rsa.RSAPublicKey, pem_path: str):
|
|
|
"""
|
|
|
保存RSA公钥到PEM文件
|
|
|
|
|
|
Args:
|
|
|
public_key: RSA公钥
|
|
|
pem_path: 保存路径
|
|
|
"""
|
|
|
pem_data = public_key.public_bytes(
|
|
|
encoding=serialization.Encoding.PEM,
|
|
|
format=serialization.PublicFormat.SubjectPublicKeyInfo
|
|
|
)
|
|
|
|
|
|
with open(pem_path, 'wb') as f:
|
|
|
f.write(pem_data)
|
|
|
|
|
|
|
|
|
def rsa_sign(data: bytes, private_key: rsa.RSAPrivateKey) -> bytes:
|
|
|
"""
|
|
|
使用RSA私钥对数据进行签名(PSS填充)
|
|
|
|
|
|
Args:
|
|
|
data: 要签名的数据
|
|
|
private_key: RSA私钥
|
|
|
|
|
|
Returns:
|
|
|
bytes: 数字签名
|
|
|
"""
|
|
|
signature = private_key.sign(
|
|
|
data,
|
|
|
padding.PSS(
|
|
|
mgf=padding.MGF1(hashes.SHA256()),
|
|
|
salt_length=padding.PSS.MAX_LENGTH
|
|
|
),
|
|
|
hashes.SHA256()
|
|
|
)
|
|
|
return signature
|
|
|
|
|
|
|
|
|
def rsa_verify(data: bytes, signature: bytes, public_key: rsa.RSAPublicKey) -> bool:
|
|
|
"""
|
|
|
使用RSA公钥验证签名
|
|
|
|
|
|
Args:
|
|
|
data: 原始数据
|
|
|
signature: 数字签名
|
|
|
public_key: RSA公钥
|
|
|
|
|
|
Returns:
|
|
|
bool: 签名是否有效
|
|
|
"""
|
|
|
try:
|
|
|
public_key.verify(
|
|
|
signature,
|
|
|
data,
|
|
|
padding.PSS(
|
|
|
mgf=padding.MGF1(hashes.SHA256()),
|
|
|
salt_length=padding.PSS.MAX_LENGTH
|
|
|
),
|
|
|
hashes.SHA256()
|
|
|
)
|
|
|
return True
|
|
|
except Exception:
|
|
|
return False
|