You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
psrGROUP/env/Lib/site-packages/python_ipware/python_ipware.py

336 lines
12 KiB

import ipaddress
import logging
from typing import Dict, List, Optional, Tuple, Union
IpAddressType = Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
OptionalIpAddressType = Optional[IpAddressType]
logger = logging.getLogger(__name__)
class IpWareMeta:
"""
A class to manage metadata extracted from HTTP request headers, primarily for identifying
the client's IP address behind load balancers and proxies.
"""
def __init__(
self, precedence: Optional[Tuple[str, ...]] = None, leftmost: bool = True
) -> None:
"""
Initializes the metadata handler with a precedence list of HTTP headers and the position of the client IP.
:param precedence: A tuple of header names that define the order to check for a valid IP.
If not provided, a default list of commonly used headers is applied.
:param leftmost: A boolean that indicates whether to use the left-most IP in the header
when multiple IPs are listed (typically the case with proxies).
"""
# Default precedence list of headers if none is provided
if precedence is None:
precedence = (
"X_FORWARDED_FOR", # Common header for proxies, load balancers, like AWS ELB. Default to the left-most IP.
"HTTP_X_FORWARDED_FOR", # Alternative header similar to `X_FORWARDED_FOR`.
"HTTP_CLIENT_IP", # Header used by some providers like Amazon EC2, Heroku.
"HTTP_X_REAL_IP", # Header used by some providers like Amazon EC2, Heroku.
"HTTP_X_FORWARDED", # Used by Squid and similar software.
"HTTP_X_CLUSTER_CLIENT_IP", # Used by Rackspace LB, Riverbed Stingray.
"HTTP_FORWARDED_FOR", # Standard header defined by RFC 7239.
"HTTP_FORWARDED", # Standard header defined by RFC 7239.
"HTTP_CF_CONNECTING_IP", # Used by CloudFlare.
"X-CLIENT-IP", # Used by Microsoft Azure.
"X-REAL-IP", # Commonly used by NGINX.
"X-CLUSTER-CLIENT-IP", # Used by Rackspace Cloud Load Balancers.
"X_FORWARDED", # Used by Squid.
"FORWARDED_FOR", # Standard header defined by RFC 7239.
"CF-CONNECTING-IP", # Used by CloudFlare.
"TRUE-CLIENT-IP", # Header for CloudFlare Enterprise.
"FASTLY-CLIENT-IP", # Used by Fastly, Firebase.
"FORWARDED", # Standard header defined by RFC 7239.
"CLIENT-IP", # Used by Akamai, Cloudflare's True-Client-IP, and Fastly's Fastly-Client-IP.
"REMOTE_ADDR", # The default IP address header (direct connection).
)
self.precedence = precedence
self.leftmost = leftmost
class IpWareIpAddress:
"""
A class for handling and parsing IP address data from HTTP requests.
"""
def extract_ipv4(self, ip_address: str) -> str:
"""
Extracts the IPv4 address from a given string that may include a port number.
Args:
ip_address (str): An IPv4 address possibly including a port (e.g., "192.168.1.1:8080").
Returns:
str: The IPv4 address without the port. Returns an empty string if input is None.
"""
if ip_address:
return ip_address.split(":")[0].strip()
return ""
def extract_ipv6(self, ip_address: str) -> str:
"""
Extracts the IPv6 address from a given string that may include a port number.
Args:
ip_address (str): An IPv6 address possibly including a port (e.g., "[2001:db8::1]:8080").
Returns:
str: The IPv6 address without brackets or port. Returns an empty string if input is None.
"""
if ip_address and "]:" in ip_address:
return ip_address.split("]:")[0].replace("[", "").strip()
return ip_address.strip() if ip_address else ""
def parse_ip_address(self, ip_str: str):
"""
Parses the given IP address string to an IPv4Address or IPv6Address object.
Args:
ip_str (str): An IP address possibly including a port.
Returns:
ipaddress.IPv4Address|ipaddress.IPv6Address|None: The parsed IP object or None if the address is invalid.
"""
try:
# First, try to parse as IPv6.
ipv6 = self.extract_ipv6(ip_str)
ip = ipaddress.IPv6Address(ipv6)
return ip.ipv4_mapped or ip
except ipaddress.AddressValueError:
try:
# If IPv6 parsing fails, try to parse as IPv4.
ipv4 = self.extract_ipv4(ip_str)
return ipaddress.IPv4Address(ipv4)
except ipaddress.AddressValueError:
# Log error if IP is invalid
# print(f"Invalid IP address: {ip_str}")
return None
def get_ips_from_string(
self,
ip_str: str,
strict: bool = False,
) -> Optional[List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]]:
"""
Parses a comma-separated list of IP addresses, each possibly including a port.
Args:
ip_str (str): A comma-separated list of IP addresses or address:port entries.
strict (bool): True to bail out on first invalid ip, False to skip invalid ip
Returns:
List[ipaddress.IPv4Address|ipaddress.IPv6Address]|None: A list of IP address objects or None if any IP is invalid.
"""
ip_list = []
for ip_address in ip_str.split(","):
ip = self.parse_ip_address(ip_address.strip())
if ip:
ip_list.append(ip)
else:
if strict:
return None
if not self.leftmost:
ip_list.reverse()
return ip_list
class IpWareProxy:
"""
A class to handle proxy data from an HTTP request, including validating
proxy counts and trusted proxy lists.
"""
def __init__(
self, proxy_count: Optional[int] = None, proxy_list: Optional[List[str]] = None
) -> None:
"""
Initialize the IpWareProxy class with optional proxy count and proxy list.
Args:
proxy_count: The expected number of proxies, can be None if no specific count is enforced.
proxy_list: A list of partial IP addresses as trusted proxies, can be None.
"""
if proxy_count is not None and proxy_count < 0:
raise ValueError("proxy_count must be non-negative")
self.proxy_count = proxy_count
self.proxy_list = self._validate_proxy_list(proxy_list or [])
def _validate_proxy_list(self, proxy_list: List[str]) -> List[str]:
"""
Validates that the proxy list contains only strings.
Args:
proxy_list: A list of proxies.
Returns:
The proxy list if all items are valid strings.
Raises:
ValueError: If the proxy list is not a list of strings.
"""
if not all(isinstance(ip, str) for ip in proxy_list):
raise ValueError("All elements in the proxy list must be strings.")
return proxy_list
def is_proxy_count_valid(self, ip_list: List[str], strict: bool = False) -> bool:
"""
Validates the proxy count against a list of IP addresses.
Args:
ip_list: A list of IP addresses from the request headers.
strict: If True, the number of proxies must exactly match the proxy_count.
Returns:
True if the proxy count is valid, False otherwise.
"""
if self.proxy_count is None:
return True # No proxy count specified, so always valid.
ip_count = len(ip_list)
if strict:
return ip_count - 1 == self.proxy_count
# Exact match required, excluding client's own IP.
return ip_count - 1 >= self.proxy_count
# Allow more proxies than the count, excluding client's IP.
def is_proxy_trusted_list_valid(
self, ip_list: List[str], strict: bool = False
) -> bool:
"""
Checks if all proxies in the incoming list are trusted based on the proxy_list.
Args:
ip_list: A list of IP addresses from the request headers.
strict: If True, the number of proxies must exactly match the length of proxy_list.
Returns:
True if all proxies are trusted, False otherwise.
"""
if not self.proxy_list:
return True # No specific proxies to trust, so always valid.
ip_count = len(ip_list)
proxy_list_count = len(self.proxy_list)
if strict and ip_count - 1 != proxy_list_count:
return False # Strict mode: Exact count match required.
if ip_count - 1 < proxy_list_count:
return False # Not enough IPs to match the trusted proxies.
# Verify each proxy against the trusted list by comparing each corresponding element.
return all(
str(ip).startswith(trusted_proxy_pattern)
for ip, trusted_proxy_pattern in zip(
ip_list[-proxy_list_count:], self.proxy_list
)
)
class IpWare(IpWareMeta, IpWareProxy, IpWareIpAddress):
"""
A class that makes a best effort to determine the client's IP address.
"""
def __init__(
self,
precedence: Optional[Tuple[str, ...]] = None,
leftmost: bool = True,
proxy_count: Optional[int] = None,
proxy_list: Optional[List[str]] = None,
) -> None:
IpWareMeta.__init__(self, precedence, leftmost)
IpWareProxy.__init__(self, proxy_count, proxy_list)
def get_meta_value(self, meta: Dict[str, str], key: str) -> str:
"""
Returns a cleaned up version of the value for a given key.
@param key: The key to look up.
@return: The value of the key or an empty string if the key is not found.
"""
meta = meta or {}
return meta.get(key, meta.get(key.replace("_", "-"), "")).strip()
def get_meta_values(self, meta: Dict[str, str]) -> List[str]:
"""
Returns a list of cleaned up values for the keys defined in 'precedence'.
@return: A list of values.
"""
meta_list: List[str] = []
for key in self.precedence:
value = self.get_meta_value(meta, key).strip()
if value:
meta_list.append(value)
return meta_list
def get_client_ip(
self,
meta: Dict[str, str],
strict: bool = False,
) -> Tuple[OptionalIpAddressType, bool]:
"""
Returns the client's IP address.
"""
loopback_list: List[IpAddressType] = []
private_list: List[OptionalIpAddressType] = []
for ip_str in self.get_meta_values(meta):
ip_list = self.get_ips_from_string(ip_str, strict)
if not ip_list:
continue
proxy_count_validated = self.is_proxy_count_valid(ip_list, strict)
if not proxy_count_validated:
continue
proxy_list_validated = self.is_proxy_trusted_list_valid(ip_list, strict)
if not proxy_list_validated:
continue
client_ip, trusted_route = self.get_best_ip(ip_list)
if client_ip is not None:
if client_ip.is_global:
return client_ip, trusted_route
if client_ip.is_loopback:
loopback_list.append(client_ip)
else:
private_list.append(client_ip)
if private_list:
return private_list[0], False
if loopback_list:
return loopback_list[0], False
return None, False
def get_best_ip(
self,
ip_list: List[IpAddressType],
) -> Tuple[OptionalIpAddressType, bool]:
"""
Determines the best possible IP address for the client from a list of IP addresses.
"""
if not ip_list:
logger.warning("Invalid IP list provided.")
return None, False
if self.proxy_list and len(self.proxy_list) > 0:
best_client_ip_index = len(self.proxy_list) + 1
best_client_ip = ip_list[-best_client_ip_index]
return best_client_ip, True
if self.proxy_count is not None:
best_client_ip_index = self.proxy_count + 1
best_client_ip = ip_list[-best_client_ip_index]
return best_client_ip, True
return ip_list[0], False