You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
284 lines
8.9 KiB
284 lines
8.9 KiB
#!/usr/bin/env python3
|
|
"""
|
|
Rate limiter utility for managing API request frequency.
|
|
Implements token bucket algorithm for rate limiting with thread safety.
|
|
"""
|
|
|
|
import time
|
|
import threading
|
|
from typing import Optional, Dict, Any
|
|
from enum import Enum
|
|
|
|
|
|
class RateLimitExceeded(Exception):
    """Signal that a request was rejected because the rate limit is exhausted."""
|
|
|
|
|
|
class APIPlan(Enum):
    """Built-in rate-limit tiers.

    Each member's value is a mapping with ``rate`` (tokens per second) and
    ``burst`` (bucket capacity).
    """

    FREE = dict(rate=1, burst=5)  # sustained 1 req/s, bursts of up to 5
    PRO = dict(rate=3, burst=10)  # sustained 3 req/s, bursts of up to 10
    ENTERPRISE = dict(rate=5, burst=20)  # sustained 5 req/s, bursts of up to 20
|
|
|
|
|
|
class TokenBucketRateLimiter:
    """
    Thread-safe token bucket rate limiter.

    The bucket holds at most ``burst`` tokens and refills continuously at
    ``rate`` tokens per second. Each request consumes one token (``count``
    tokens for the batch methods). Non-blocking methods report failure
    immediately; blocking methods sleep (without holding the lock) until
    enough tokens have accumulated or the timeout expires.

    Args:
        rate: Tokens per second (refill rate). Must be positive.
        burst: Maximum tokens (burst capacity). Must be positive.
        initial_tokens: Initial tokens in bucket (default: burst).

    Raises:
        ValueError: If ``rate`` or ``burst`` is not positive.
    """

    def __init__(self, rate: float, burst: int, initial_tokens: Optional[int] = None):
        # A non-positive rate makes every wait computation divide by zero, and a
        # non-positive burst means no token can ever be acquired (acquire would
        # spin forever), so reject both up front.
        if rate <= 0:
            raise ValueError(f"rate must be positive, got {rate!r}")
        if burst <= 0:
            raise ValueError(f"burst must be positive, got {burst!r}")

        self.rate = rate  # tokens per second
        self.burst = burst  # bucket capacity
        self.tokens = initial_tokens if initial_tokens is not None else burst
        # time.monotonic() timestamp of the last refill. A monotonic clock is
        # used so wall-clock adjustments cannot drain or overfill the bucket.
        self.last_refill = time.monotonic()
        self._lock = threading.Lock()

    def _refill(self, now: Optional[float] = None) -> None:
        """Add tokens for the time elapsed since the last refill.

        Caller must hold ``self._lock``.

        Args:
            now: Current ``time.monotonic()`` reading; read fresh if omitted.
        """
        if now is None:
            now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
        self.last_refill = now

    def _seconds_until(self, count: float) -> float:
        """Return seconds until ``count`` tokens are available (0.0 if already).

        Caller must hold ``self._lock`` and should have just called ``_refill``.
        """
        return max(0.0, (count - self.tokens) / self.rate)

    def acquire(self, timeout: Optional[float] = None) -> bool:
        """
        Acquire a token from the bucket.

        Args:
            timeout: Maximum time to wait for a token (seconds)
                None means wait indefinitely

        Returns:
            True if token was acquired, False if timeout occurred
        """
        return self.acquire_batch(1, timeout)

    def can_acquire(self) -> bool:
        """Check if a token is available without blocking (nothing is consumed)."""
        with self._lock:
            self._refill()
            return self.tokens >= 1

    def try_acquire(self) -> bool:
        """
        Try to acquire a token without blocking.

        Returns:
            True if token was acquired, False if no token available
        """
        return self.try_acquire_batch(1)

    def acquire_batch(self, count: int, timeout: Optional[float] = None) -> bool:
        """
        Acquire multiple tokens at once, blocking until they are available.

        Args:
            count: Number of tokens to acquire. Non-positive counts succeed
                immediately without consuming anything.
            timeout: Maximum time to wait for tokens (seconds).
                None means wait indefinitely.

        Returns:
            True if all tokens were acquired, False if the timeout occurred
            or ``count`` exceeds the burst capacity (it could never succeed).
        """
        if count <= 0:
            return True

        if count > self.burst:
            return False  # Cannot acquire more than burst capacity

        deadline = time.monotonic() + timeout if timeout is not None else None

        while True:
            with self._lock:
                self._refill()
                if self.tokens >= count:
                    self.tokens -= count
                    return True
                # Exact time until the deficit is refilled (other threads may
                # still take tokens meanwhile; the loop re-checks).
                wait = self._seconds_until(count)

            # Sleep outside the lock so other threads can keep using the bucket.
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return False
                wait = min(wait, remaining)
            time.sleep(wait)

    def try_acquire_batch(self, count: int) -> bool:
        """
        Try to acquire multiple tokens without blocking.

        Args:
            count: Number of tokens to acquire. Non-positive counts succeed
                immediately without consuming anything.

        Returns:
            True if all tokens were acquired, False if insufficient tokens
            (including when ``count`` exceeds the burst capacity).
        """
        if count <= 0:
            return True

        if count > self.burst:
            return False

        with self._lock:
            self._refill()
            if self.tokens < count:
                return False
            self.tokens -= count
            return True

    def get_status(self) -> Dict[str, Any]:
        """Get current limiter status.

        ``wait_time`` is the number of seconds until one token is available
        (0.0 if one is available now).
        """
        with self._lock:
            self._refill()
            return {
                "tokens_available": self.tokens,
                "tokens_capacity": self.burst,
                "refill_rate": self.rate,
                "tokens_needed": 1,
                # Time to refill the actual deficit, not a full 1/rate.
                "wait_time": self._seconds_until(1),
            }

    def get_detailed_status(self) -> Dict[str, Any]:
        """Get detailed limiter status with additional metrics.

        Extra keys beyond ``get_status``:
            time_since_refill: Seconds since the bucket was last refilled,
                measured before the refill performed for this call.
            next_refill_tokens: Token count after that refill (equal to
                ``tokens_available``).
            utilization_percent: Share of capacity currently consumed.
        """
        with self._lock:
            now = time.monotonic()
            # Measure the idle gap *before* refilling; afterwards it is always ~0.
            time_since_refill = now - self.last_refill
            self._refill(now)

            return {
                "tokens_available": self.tokens,
                "tokens_capacity": self.burst,
                "refill_rate": self.rate,
                "tokens_needed": 1,
                "wait_time": self._seconds_until(1),
                "time_since_refill": time_since_refill,
                # _refill already credited the elapsed time (clamped to burst).
                "next_refill_tokens": self.tokens,
                "utilization_percent": (self.burst - self.tokens) / self.burst * 100,
            }

    def wait_for_token(self) -> bool:
        """Block until a token is acquired; returns True once it is."""
        return self.acquire(timeout=None)

    def wait_for_batch(self, count: int) -> bool:
        """Block until ``count`` tokens are acquired; returns True once they are.

        Raises:
            ValueError: If ``count`` exceeds the burst capacity, since the
                bucket can never hold that many tokens.
        """
        if count > self.burst:
            raise ValueError(
                f"cannot wait for {count} tokens: burst capacity is {self.burst}"
            )
        return self.acquire_batch(count, timeout=None)
|
|
|
|
|
|
class AsyncRateLimiter:
    """Reserved name for an async rate limiter; currently has no behavior."""
|
|
|
|
|
|
class RateLimiterFactory:
    """Convenience constructors for preconfigured ``TokenBucketRateLimiter`` instances."""

    @staticmethod
    def create(plan: APIPlan) -> TokenBucketRateLimiter:
        """Build a limiter from the ``rate``/``burst`` pair stored on *plan*."""
        settings = plan.value
        rate, burst = settings["rate"], settings["burst"]
        return TokenBucketRateLimiter(rate=rate, burst=burst)

    @staticmethod
    def create_custom(rate: float, burst: int) -> TokenBucketRateLimiter:
        """Build a limiter from a caller-supplied refill *rate* and *burst* capacity."""
        limiter = TokenBucketRateLimiter(rate=rate, burst=burst)
        return limiter

    @staticmethod
    def create_conservative() -> TokenBucketRateLimiter:
        """Build a cautious limiter: one token every two seconds, burst of 2."""
        half_token_per_second = 0.5
        return TokenBucketRateLimiter(rate=half_token_per_second, burst=2)

    @staticmethod
    def create_for_deepseek() -> TokenBucketRateLimiter:
        """Build the preset intended for the DeepSeek API: 2 tokens/s, burst of 8."""
        return TokenBucketRateLimiter(burst=8, rate=2)
|
|
|
|
|
|
class RequestMonitor:
    """
    Monitor request patterns and suggest optimal rate limit settings.

    Args:
        window_size: Sliding-window length in seconds. Whenever a request is
            recorded, stored timestamps older than ``timestamp - window_size``
            are discarded.
    """

    def __init__(self, window_size: int = 60):
        self.window_size = window_size
        self.requests = []  # recorded timestamps (seconds), pruned on each record
        # Non-reentrant lock: methods holding it must not call other locking
        # methods of this class.
        self._lock = threading.Lock()

    @staticmethod
    def _rate_of(requests) -> float:
        """Requests per second between the first and last entries of *requests*.

        Returns 0.0 for fewer than two entries and ``inf`` when they all share
        one timestamp.
        """
        if len(requests) < 2:
            return 0.0

        time_span = requests[-1] - requests[0]
        if time_span == 0:
            return float('inf')

        return (len(requests) - 1) / time_span

    @staticmethod
    def _suggest_from(rate: float) -> float:
        """Add 20% headroom to *rate* and clamp the result to [0.1, 10.0]."""
        return max(0.1, min(10.0, rate * 1.2))

    def record_request(self, timestamp: Optional[float] = None):
        """Record a request at the given timestamp (default: ``time.time()``)."""
        if timestamp is None:
            timestamp = time.time()

        with self._lock:
            self.requests.append(timestamp)

            # Keep only requests within the window ending at this timestamp
            cutoff = timestamp - self.window_size
            self.requests = [req_time for req_time in self.requests if req_time > cutoff]

    def get_request_rate(self) -> float:
        """Get current request rate (requests per second) over stored timestamps."""
        with self._lock:
            return self._rate_of(self.requests)

    def suggest_rate_limit(self) -> float:
        """Suggest a rate limit: current rate plus 20%, clamped to [0.1, 10.0]."""
        return self._suggest_from(self.get_request_rate())

    def get_stats(self) -> Dict[str, Any]:
        """Get monitoring statistics from a single consistent snapshot.

        ``total_requests`` counts stored timestamps within ``window_size``
        seconds of ``time.time()``; ``current_rate_rps`` and
        ``suggested_rate_limit`` match ``get_request_rate()`` and
        ``suggest_rate_limit()``.
        """
        # Take the lock exactly once: calling get_request_rate() or
        # suggest_rate_limit() while holding it would deadlock on the
        # non-reentrant Lock, and separate acquisitions could mix snapshots.
        with self._lock:
            cutoff = time.time() - self.window_size
            recent_requests = [req_time for req_time in self.requests if req_time > cutoff]
            current_rate = self._rate_of(self.requests)

        return {
            "total_requests": len(recent_requests),
            "window_size_seconds": self.window_size,
            "current_rate_rps": current_rate,
            "suggested_rate_limit": self._suggest_from(current_rate),
        }