You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

433 lines
12 KiB

#!/usr/bin/env python
#
# Copyright (c) 2013 by Christian Heimes <christian@python.org>
# Licensed to PSF under a Contributor Agreement.
# See http://www.python.org/psf/license for licensing details.
#
"""ctypes interface to the Windows certificate store
Requirements:
Windows XP, Windows Server 2003 or newer
Python 2.3+
Python 3.2+
Python 2.3 and 2.4 need ctypes 1.0.2 from
http://sourceforge.net/projects/ctypes/
"""
# Public API of the module.  CertFile is documented for direct use
# (``wincertstore.CertFile()``), so it is exported alongside CertSystemStore.
__all__ = ("CertSystemStore", "CertFile")
import os
import shutil
import sys
import tempfile
from ctypes import WinDLL, FormatError, string_at, pointer
from ctypes import create_unicode_buffer, resize
from ctypes import Structure, POINTER, c_void_p
from ctypes.wintypes import LPCWSTR, LPSTR, DWORD, BOOL, BYTE, LPWSTR
try:
from ctypes import get_last_error
except ImportError:
from ctypes import GetLastError as get_last_error
USE_LAST_ERROR = False
else:
USE_LAST_ERROR = True
try:
    from base64 import b64encode
except ImportError:
    # Python 2.3: base64.b64encode cannot be imported, so build an
    # equivalent on top of binascii.
    from binascii import b2a_base64

    def b64encode(s):
        """Base64-encode *s*, dropping the final character (the newline)
        of the b2a_base64() result.
        """
        encoded = b2a_base64(s)
        return encoded[:-1]
# True when running under a Python 3 interpreter.
PY3 = sys.version_info[0] == 3

# Handle / pointer aliases used in the ctypes structures and prototypes.
# All of them are treated as opaque pointers by this module.
HCERTSTORE = c_void_p
PCCERT_INFO = c_void_p
PCCRL_INFO = c_void_p
LPTCSTR = LPCWSTR

# Encoding-type flag tested by isPKCS7().
PKCS_7_ASN_ENCODING = 0x00010000

# enhanced key usage: flags passed to CertGetEnhancedKeyUsage
CERT_FIND_EXT_ONLY_ENHKEY_USAGE_FLAG = 0x2
CERT_FIND_PROP_ONLY_ENHKEY_USAGE_FLAG = 0x4

# Error code 0x80092004 written as a signed 32 bit integer, which is the
# form it is compared against the value of get_last_error().
CRYPT_E_NOT_FOUND = -2146885628

# cert name: type and flag arguments for CertGetNameStringW
CERT_NAME_SIMPLE_DISPLAY_TYPE = 4
CERT_NAME_FRIENDLY_DISPLAY_TYPE = 5
CERT_NAME_ISSUER_FLAG = 0x1

# Enhanced key usage OIDs (1.3.6.1.5.5.7.3.x)
SERVER_AUTH = "1.3.6.1.5.5.7.3.1"
CLIENT_AUTH = "1.3.6.1.5.5.7.3.2"
CODE_SIGNING = "1.3.6.1.5.5.7.3.3"
EMAIL_PROTECTION = "1.3.6.1.5.5.7.3.4"
IPSEC_END_SYSTEM = "1.3.6.1.5.5.7.3.5"
IPSEC_TUNNEL = "1.3.6.1.5.5.7.3.6"
IPSEC_USER = "1.3.6.1.5.5.7.3.7"
TIME_STAMPING = "1.3.6.1.5.5.7.3.8"
OCSP_SIGNING = "1.3.6.1.5.5.7.3.9"
DVCS = "1.3.6.1.5.5.7.3.10"

# Maps each OID above to its symbolic name; used by
# CERT_CONTEXT.enhanced_keyusage_names() to make OIDs human readable.
TrustOIDs = {
    SERVER_AUTH: "SERVER_AUTH",
    CLIENT_AUTH: "CLIENT_AUTH",
    CODE_SIGNING: "CODE_SIGNING",
    EMAIL_PROTECTION: "EMAIL_PROTECTION",
    IPSEC_END_SYSTEM: "IPSEC_END_SYSTEM",
    IPSEC_TUNNEL: "IPSEC_TUNNEL",
    IPSEC_USER: "IPSEC_USER",
    TIME_STAMPING: "TIME_STAMPING",
    OCSP_SIGNING: "OCSP_SIGNING",
    DVCS: "DVCS",
}
def isPKCS7(value):
    """Tell whether an encoding-type bit field carries the PKCS#7 flag.

    :param value: integer encoding type (e.g. ``dwCertEncodingType``)
    :return: True if all bits of PKCS_7_ASN_ENCODING are set in *value*
    """
    masked = value & PKCS_7_ASN_ENCODING
    return masked == PKCS_7_ASN_ENCODING
class ContextStruct(Structure):
    """Common base class of the certificate and CRL context structures.

    Subclasses set ``cert_type`` (the PEM label), declare their ``_fields_``
    (which must include ``dwCertEncodingType``) and override get_encoded().
    """
    cert_type = None
    __slots__ = ()
    _fields_ = []

    def get_encoded(self):
        """Get encoded cert as byte string

        :raises NotImplementedError: always; subclasses provide the
            implementation.
        """
        # Fail loudly here instead of returning None, which would only
        # surface later as an unrelated TypeError inside get_pem().
        raise NotImplementedError(
            "%s does not implement get_encoded()" % type(self).__name__)

    def encoding_type(self):
        """Get encoding type for PEM

        :return: "PKCS7" when the PKCS#7 flag is set in dwCertEncodingType,
            otherwise the class' ``cert_type``
        """
        if isPKCS7(self.dwCertEncodingType):
            return "PKCS7"
        return self.cert_type
    encoding_type = property(encoding_type)

    def get_pem(self):
        """Get PEM encoded cert

        :return: text consisting of a BEGIN line, the base64 data wrapped
            at 64 characters per line, an END line and a trailing newline
        """
        encoding_type = self.encoding_type
        b64data = b64encode(self.get_encoded()).decode("ascii")
        lines = ["-----BEGIN %s-----" % encoding_type]
        # split up in lines of 64 chars each
        for start in range(0, len(b64data), 64):
            lines.append(b64data[start:start + 64])
        lines.append("-----END %s-----" % encoding_type)
        # the empty last element produces the trailing newline
        lines.append("")
        return "\n".join(lines)
class CERT_CONTEXT(ContextStruct):
    """Cert context

    ctypes structure for a certificate context.  Besides the C fields it
    has one Python-side slot, ``_enhkey``, that caches the result of
    enhanced_keyusage().
    """
    cert_type = "CERTIFICATE"
    # A one-element tuple; the original bare string only worked because a
    # single string is accepted as a single slot name.
    __slots__ = ("_enhkey",)
    _fields_ = [
        ("dwCertEncodingType", DWORD),
        ("pbCertEncoded", POINTER(BYTE)),
        ("cbCertEncoded", DWORD),
        ("pCertInfo", PCCERT_INFO),
        ("hCertStore", HCERTSTORE),
    ]

    def get_encoded(self):
        """Get encoded cert as byte string

        Copies cbCertEncoded bytes starting at pbCertEncoded.
        """
        return string_at(self.pbCertEncoded, self.cbCertEncoded)

    def _enhkey_error(self):
        """Translate a failed CertGetEnhancedKeyUsage call.

        :return: True when the last error is CRYPT_E_NOT_FOUND
        :raises OSError: for any other error code
        """
        err = get_last_error()
        if err == CRYPT_E_NOT_FOUND:
            return True
        errmsg = FormatError(err)
        raise OSError(err, errmsg)

    def _get_enhkey(self, flag):
        """Query the enhanced key usage OIDs selected by *flag*.

        :param flag: one of the CERT_FIND_*_ENHKEY_USAGE_FLAG constants
        :return: a set of OID strings, or True when the lookup failed with
            CRYPT_E_NOT_FOUND
        :raises OSError: when the lookup failed with another error
        """
        pCertCtx = pointer(self)
        enhkey = CERT_ENHKEY_USAGE()
        size = DWORD()
        # First call without a buffer: only ``size`` is filled in.
        res = CertGetEnhancedKeyUsage(pCertCtx, flag, None, pointer(size))
        if res == 0:
            return self._enhkey_error()
        # Grow the structure's buffer to the reported size, then fetch.
        resize(enhkey, size.value)
        res = CertGetEnhancedKeyUsage(pCertCtx, flag, pointer(enhkey),
                                      pointer(size))
        if res == 0:
            return self._enhkey_error()
        oids = set()
        for i in range(enhkey.cUsageIdentifier):
            oid = enhkey.rgpszUsageIdentifier[i]
            if oid:
                if PY3:
                    # LPSTR values arrive as bytes on Python 3
                    oid = oid.decode("ascii")
                oids.add(oid)
        return oids

    def enhanced_keyusage(self):
        """Get the enhanced key usage of the cert (cached).

        The property-only lookup is tried first; if it reports "not found"
        the extension-only lookup is used.

        :return: a frozenset of OID strings, or True when neither lookup
            found a usage restriction
        :raises OSError: when a lookup fails for another reason
        """
        enhkey = getattr(self, "_enhkey", None)
        if enhkey is not None:
            return enhkey
        keyusage = self._get_enhkey(CERT_FIND_PROP_ONLY_ENHKEY_USAGE_FLAG)
        if keyusage is True:
            keyusage = self._get_enhkey(CERT_FIND_EXT_ONLY_ENHKEY_USAGE_FLAG)
        if keyusage is not True:
            keyusage = frozenset(keyusage)
        # Cache and return the very same object, so the first call and all
        # later calls yield the same (immutable) value.
        self._enhkey = keyusage
        return keyusage

    def enhanced_keyusage_names(self):
        """Get the enhanced key usage with OIDs replaced by names.

        :return: True when enhanced_keyusage() returns True, otherwise a
            set in which every OID known to TrustOIDs is replaced by its
            symbolic name (unknown OIDs are kept as they are)
        """
        enhkey = self.enhanced_keyusage()
        if enhkey is True:
            return True
        result = set()
        for oid in enhkey:
            result.add(TrustOIDs.get(oid, oid))
        return result

    def get_name(self, typ=CERT_NAME_SIMPLE_DISPLAY_TYPE, flag=0):
        """Get a name string of the cert via CertGetNameStringW.

        :param typ: name type, e.g. CERT_NAME_SIMPLE_DISPLAY_TYPE
        :param flag: flags, e.g. CERT_NAME_ISSUER_FLAG
        :return: the name as unicode string
        """
        pCertCtx = pointer(self)
        # First call without a buffer returns the size used to allocate it.
        cbsize = CertGetNameStringW(pCertCtx, typ, flag, None, None, 0)
        buf = create_unicode_buffer(cbsize)
        CertGetNameStringW(pCertCtx, typ, flag, None, buf, cbsize)
        return buf.value
class CRL_CONTEXT(ContextStruct):
    """Cert revocation list context

    ctypes structure for a CRL context; PEM output is labelled "X509 CRL"
    unless the PKCS#7 encoding flag is set.
    """
    cert_type = "X509 CRL"
    __slots__ = ()
    _fields_ = [
        ("dwCertEncodingType", DWORD),
        ("pbCrlEncoded", POINTER(BYTE)),
        ("cbCrlEncoded", DWORD),
        ("pCrlInfo", PCCRL_INFO),
        ("hCertStore", HCERTSTORE),
    ]

    def get_encoded(self):
        """Get encoded CRL as byte string

        Copies cbCrlEncoded bytes starting at pbCrlEncoded.
        """
        address = self.pbCrlEncoded
        length = self.cbCrlEncoded
        return string_at(address, length)
class CERT_ENHKEY_USAGE(Structure):
    """Enhanced Key Usage

    Result structure of CertGetEnhancedKeyUsage: a counter followed by a
    pointer to an array of C strings (the usage OIDs).
    """
    __slots__ = ()
    _fields_ = [
        # number of entries readable through rgpszUsageIdentifier
        ("cUsageIdentifier", DWORD),
        # array of char* usage identifiers
        ("rgpszUsageIdentifier", POINTER(LPSTR)),
    ]
# Load crypt32.dll.  The use_last_error option is only passed when
# ctypes.get_last_error could be imported (see USE_LAST_ERROR).
if USE_LAST_ERROR:
    crypt32 = WinDLL("crypt32.dll", use_last_error=True)
else:
    crypt32 = WinDLL("crypt32.dll")

# Pointer types used by the prototypes below.
PCCERT_CONTEXT = POINTER(CERT_CONTEXT)
PCCRL_CONTEXT = POINTER(CRL_CONTEXT)
PCERT_ENHKEY_USAGE = POINTER(CERT_ENHKEY_USAGE)

# Open / close a system store.
CertOpenSystemStore = crypt32.CertOpenSystemStoreW
CertOpenSystemStore.restype = HCERTSTORE
CertOpenSystemStore.argtypes = [c_void_p, LPTCSTR]

CertCloseStore = crypt32.CertCloseStore
CertCloseStore.restype = BOOL
CertCloseStore.argtypes = [HCERTSTORE, DWORD]

# Enumerate certificates: takes the store and the previous context and
# returns the next context.
CertEnumCertificatesInStore = crypt32.CertEnumCertificatesInStore
CertEnumCertificatesInStore.restype = PCCERT_CONTEXT
CertEnumCertificatesInStore.argtypes = [HCERTSTORE, PCCERT_CONTEXT]

# Enumerate CRLs: same calling pattern as the certificate enumeration.
CertEnumCRLsInStore = crypt32.CertEnumCRLsInStore
CertEnumCRLsInStore.restype = PCCRL_CONTEXT
CertEnumCRLsInStore.argtypes = [HCERTSTORE, PCCRL_CONTEXT]

# Enhanced key usage of a certificate.
CertGetEnhancedKeyUsage = crypt32.CertGetEnhancedKeyUsage
CertGetEnhancedKeyUsage.restype = BOOL
CertGetEnhancedKeyUsage.argtypes = [
    PCCERT_CONTEXT,
    DWORD,
    PCERT_ENHKEY_USAGE,
    POINTER(DWORD),
]

# Name strings of a certificate.
CertGetNameStringW = crypt32.CertGetNameStringW
CertGetNameStringW.restype = DWORD
CertGetNameStringW.argtypes = [
    PCCERT_CONTEXT,
    DWORD,
    DWORD,
    c_void_p,
    LPWSTR,
    DWORD,
]
class CertSystemStore(object):
    """Wrapper for the Windows cert system store

    http://msdn.microsoft.com/en-us/library/windows/desktop/aa376560%28v=vs.85%29.aspx

    store names
    -----------
    CA:
      Certification authority certificates
    MY:
      Certs with private keys
    ROOT:
      Root certificates
    SPC:
      Software Publisher Certificate

    Instances can be used as context managers; the store is closed on exit.
    """
    __slots__ = ("_storename", "_hStore")

    def __init__(self, storename):
        """Open the system store called *storename*.

        :param storename: name of the store, e.g. "CA" or "ROOT"
        :raises OSError: when the store cannot be opened
        """
        self._storename = storename
        self._hStore = CertOpenSystemStore(None, self.storename)
        if not self._hStore:  # NULL ptr
            # Read the error code before doing anything else and report it
            # together with its message.
            err = get_last_error()
            self._hStore = None
            raise OSError(err, FormatError(err))

    def storename(self):
        """Get store name
        """
        return self._storename
    storename = property(storename)

    def close(self):
        """Close the store handle.

        Calling close() on an already closed store does nothing.
        """
        handle = self._hStore
        if handle is None:
            return
        self._hStore = None
        CertCloseStore(handle, 0)

    def __enter__(self):
        return self

    def __exit__(self, exc, value, tb):
        self.close()

    def itercerts(self, usage=SERVER_AUTH):
        """Iterate over certificates

        :param usage: enhanced key usage OID a cert must support, or None
            to yield every cert.  A cert whose enhanced_keyusage() is True
            matches any usage.
        :return: generator of CERT_CONTEXT objects

        NOTE(review): each context is handed back to
        CertEnumCertificatesInStore to obtain the next one; presumably a
        yielded object must not be used after the iteration advanced --
        confirm against the CryptoAPI documentation.
        """
        pCertCtx = CertEnumCertificatesInStore(self._hStore, None)
        while pCertCtx:
            certCtx = pCertCtx[0]
            if usage is None:
                # no filtering requested: skip the key usage lookup
                yield certCtx
            else:
                enhkey = certCtx.enhanced_keyusage()
                if enhkey is True or usage in enhkey:
                    yield certCtx
            pCertCtx = CertEnumCertificatesInStore(self._hStore, pCertCtx)

    def itercrls(self):
        """Iterate over cert revocation lists

        :return: generator of CRL_CONTEXT objects
        """
        pCrlCtx = CertEnumCRLsInStore(self._hStore, None)
        while pCrlCtx:
            crlCtx = pCrlCtx[0]
            yield crlCtx
            pCrlCtx = CertEnumCRLsInStore(self._hStore, pCrlCtx)

    def __iter__(self):
        """Iterate over the certs from itercerts() (default usage filter),
        followed by all CRLs.
        """
        for cert in self.itercerts():
            yield cert
        for crl in self.itercrls():
            yield crl
class CertFile(object):
    """Wrapper to handle a temporary file for a CA.pem

    Note: The object keeps the file in a temporary directory of its own
    because older Python versions have no means to keep a tempfile after
    it has been closed.

    Usage:
        import wincertstore
        import atexit
        certfile = wincertstore.CertFile()
        certfile.addstore("CA")
        certfile.addstore("ROOT")
        atexit.register(certfile.close)  # cleanup and remove files on shutdown
        ca_cert = certfile.name
    """

    def __init__(self, suffix="certstore"):
        """Create the temporary directory that will hold ca.pem.

        :param suffix: suffix of the temporary directory name
        """
        self._tempdir = tempfile.mkdtemp(suffix=suffix)
        self._capem = os.path.join(self._tempdir, "ca.pem")

    def name(self):
        """Path to CA.pem (None after close())
        """
        return self._capem
    name = property(name)

    def close(self):
        """Remove the temporary directory and its content.

        Calling close() more than once is harmless.
        """
        if self._tempdir is not None:
            shutil.rmtree(self._tempdir)
        self._tempdir = None
        self._capem = None

    def __enter__(self):
        return self

    def __exit__(self, exc, value, tb):
        self.close()

    def addcerts(self, certs):
        """Add certs to store

        :param certs: iterable of objects with a get_pem() method; the PEM
            text of each one is appended to the file
        """
        # append mode: new certs always go to the end of the file
        f = open(self._capem, "a")
        try:
            for cert in certs:
                f.write(cert.get_pem())
        finally:
            f.close()

    def addstore(self, store):
        """Add store to CertFile

        :param store: either a name of a store or a CertSystemStore instance
        """
        if hasattr(store, "itercerts"):
            # caller owns the store object; leave it open
            self.addcerts(store.itercerts())
        else:
            store = CertSystemStore(store)
            try:
                self.addcerts(store.itercerts())
            finally:
                store.close()

    def read(self):
        """Read CA.pem file and return content
        """
        f = open(self._capem, "r")
        try:
            return f.read()
        finally:
            f.close()
if __name__ == "__main__":
    # Script mode: print the PEM text of the certs returned by itercerts()
    # (default usage filter) for the "CA" and the "ROOT" store.
    for storename in ("CA", "ROOT"):
        store = CertSystemStore(storename)
        try:
            for cert in store.itercerts():
                pem = cert.get_pem()
                # strip the trailing newline, print() adds its own
                print(pem.strip())
        finally:
            store.close()