You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
cbmc/specgen/util/rate_limiter.py

284 lines
8.9 KiB

#!/usr/bin/env python3
"""
Rate limiter utility for managing API request frequency.
Implements token bucket algorithm for rate limiting with thread safety.
"""
import time
import threading
from typing import Optional, Dict, Any
from enum import Enum
class RateLimitExceeded(Exception):
    """Error type signalling that a rate limit has been exceeded."""
class APIPlan(Enum):
    """
    Named rate limit presets.

    Each member's value is a dict with two keys:
    "rate" (requests per second) and "burst" (burst size).
    """

    # 1 request per second, burst of 5
    FREE = dict(rate=1, burst=5)
    # 3 requests per second, burst of 10
    PRO = dict(rate=3, burst=10)
    # 5 requests per second, burst of 20
    ENTERPRISE = dict(rate=5, burst=20)
class TokenBucketRateLimiter:
    """
    Token bucket rate limiter implementation.

    The bucket holds up to ``burst`` tokens and is topped up lazily (on every
    call) at ``rate`` tokens per second. All public methods take an internal
    lock before touching the token count. Time is measured with
    ``time.monotonic()`` so system clock adjustments cannot drain the bucket
    or distort timeouts.

    Args:
        rate: Tokens per second (refill rate); must be greater than 0
        burst: Maximum tokens (burst capacity); must be at least 1
        initial_tokens: Initial tokens in bucket (default: burst)

    Raises:
        ValueError: If ``rate`` is not positive or ``burst`` is less than 1
    """

    # Longest single sleep while waiting for tokens; waiters re-check the
    # bucket at least this often.
    _MAX_POLL_INTERVAL = 0.1

    def __init__(self, rate: float, burst: int, initial_tokens: Optional[int] = None):
        # A non-positive rate would divide by zero in the wait computations and
        # a capacity below one token could never satisfy acquire().
        if rate <= 0:
            raise ValueError(f"rate must be positive, got {rate!r}")
        if burst < 1:
            raise ValueError(f"burst must be at least 1, got {burst!r}")
        self.rate = rate  # tokens per second
        self.burst = burst  # bucket capacity
        self.tokens = initial_tokens if initial_tokens is not None else burst
        self.last_refill = time.monotonic()
        self._lock = threading.Lock()

    def _refill(self) -> None:
        """Refill tokens based on elapsed time (caller must hold the lock)."""
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
        self.last_refill = now

    def _seconds_until(self, count: float) -> float:
        """Seconds of refill needed before ``count`` tokens are present (lock held)."""
        return max(0.0, (count - self.tokens) / self.rate)

    def _acquire_blocking(self, count: int, timeout: Optional[float]) -> bool:
        """Shared wait loop behind acquire() and acquire_batch()."""
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            with self._lock:
                self._refill()
                if self.tokens >= count:
                    self.tokens -= count
                    return True
                wait = min(self._MAX_POLL_INTERVAL, self._seconds_until(count))
            # Sleep outside the lock so other threads can use the limiter.
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return False
                # Never sleep past the caller's deadline.
                wait = min(wait, remaining)
            time.sleep(wait)

    def acquire(self, timeout: Optional[float] = None) -> bool:
        """
        Acquire a token from the bucket.

        Args:
            timeout: Maximum time to wait for a token (seconds)
                None means wait indefinitely

        Returns:
            True if token was acquired, False if timeout occurred
        """
        return self._acquire_blocking(1, timeout)

    def can_acquire(self) -> bool:
        """Check if a token is available without blocking or consuming it"""
        with self._lock:
            self._refill()
            return self.tokens >= 1

    def try_acquire(self) -> bool:
        """
        Try to acquire a token without blocking.

        Returns:
            True if token was acquired, False if no token available
        """
        with self._lock:
            self._refill()
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False

    def acquire_batch(self, count: int, timeout: Optional[float] = None) -> bool:
        """
        Acquire multiple tokens at once.

        Args:
            count: Number of tokens to acquire
            timeout: Maximum time to wait for tokens (seconds)
                None means wait indefinitely

        Returns:
            True if all tokens were acquired (or count <= 0), False if the
            timeout occurred or count exceeds the burst capacity
        """
        if count <= 0:
            return True
        if count > self.burst:
            return False  # Cannot acquire more than burst capacity
        return self._acquire_blocking(count, timeout)

    def try_acquire_batch(self, count: int) -> bool:
        """
        Try to acquire multiple tokens without blocking.

        Args:
            count: Number of tokens to acquire

        Returns:
            True if all tokens were acquired, False if insufficient tokens
        """
        if count <= 0:
            return True
        if count > self.burst:
            return False
        with self._lock:
            self._refill()
            if self.tokens >= count:
                self.tokens -= count
                return True
            return False

    def get_status(self) -> Dict[str, Any]:
        """Get current limiter status"""
        with self._lock:
            self._refill()
            return {
                "tokens_available": self.tokens,
                "tokens_capacity": self.burst,
                "refill_rate": self.rate,
                "tokens_needed": 1,
                # Time until one whole token exists, counting any partial
                # token already in the bucket (same formula as the detailed
                # status).
                "wait_time": self._seconds_until(1),
            }

    def get_detailed_status(self) -> Dict[str, Any]:
        """Get detailed limiter status with additional metrics"""
        with self._lock:
            self._refill()
            # Measured right after the refill above, so this is the (tiny)
            # time elapsed since that refill.
            time_since_refill = time.monotonic() - self.last_refill
            return {
                "tokens_available": self.tokens,
                "tokens_capacity": self.burst,
                "refill_rate": self.rate,
                "tokens_needed": 1,
                "wait_time": self._seconds_until(1),
                "time_since_refill": time_since_refill,
                "next_refill_tokens": min(
                    self.burst, self.tokens + time_since_refill * self.rate
                ),
                "utilization_percent": (self.burst - self.tokens) / self.burst * 100,
            }

    def wait_for_token(self) -> None:
        """Wait until a token is available and consume it"""
        self.acquire(timeout=None)

    def wait_for_batch(self, count: int) -> None:
        """
        Wait until the specified number of tokens are available and consume them.

        Raises:
            ValueError: If count exceeds the burst capacity, since the bucket
                can never hold that many tokens
        """
        if count > self.burst:
            # Returning silently here would let the caller proceed as if the
            # tokens had been granted.
            raise ValueError(
                f"cannot wait for {count} tokens: burst capacity is {self.burst}"
            )
        self.acquire_batch(count, timeout=None)
class AsyncRateLimiter:
    """Stub reserved for an async rate limiter; it defines no behaviour yet."""
class RateLimiterFactory:
    """Static constructors that build TokenBucketRateLimiter instances from presets."""

    @staticmethod
    def create(plan: APIPlan) -> TokenBucketRateLimiter:
        """Build a limiter from the "rate" and "burst" entries of an APIPlan member."""
        settings = plan.value
        rate, burst = settings["rate"], settings["burst"]
        return TokenBucketRateLimiter(rate=rate, burst=burst)

    @staticmethod
    def create_custom(rate: float, burst: int) -> TokenBucketRateLimiter:
        """Build a limiter with a caller-supplied refill rate and burst capacity."""
        limiter = TokenBucketRateLimiter(rate=rate, burst=burst)
        return limiter

    @staticmethod
    def create_conservative() -> TokenBucketRateLimiter:
        """Build a cautious limiter: one request every 2 seconds, burst of 2."""
        limiter = TokenBucketRateLimiter(rate=0.5, burst=2)
        return limiter

    @staticmethod
    def create_for_deepseek() -> TokenBucketRateLimiter:
        """Build the preset for the DeepSeek API: 2 requests per second, burst of 8."""
        limiter = TokenBucketRateLimiter(rate=2, burst=8)
        return limiter
class RequestMonitor:
    """
    Monitor request patterns and suggest optimal rate limit settings.

    Request timestamps are kept in a list that is pruned to the last
    ``window_size`` seconds whenever a request is recorded. Access to the list
    is guarded by a non-reentrant ``threading.Lock``, so every public method
    takes the lock at most once and does the shared work in lock-free helpers.

    Args:
        window_size: Length of the sliding window in seconds (default: 60)
    """

    def __init__(self, window_size: int = 60):
        self.window_size = window_size
        self.requests = []
        self._lock = threading.Lock()

    def record_request(self, timestamp: Optional[float] = None):
        """
        Record a request at the given timestamp.

        Args:
            timestamp: Time of the request in seconds; defaults to time.time()
        """
        if timestamp is None:
            timestamp = time.time()
        with self._lock:
            self.requests.append(timestamp)
            # Keep only requests within the window ending at this timestamp
            cutoff = timestamp - self.window_size
            self.requests = [req_time for req_time in self.requests if req_time > cutoff]

    def _request_rate_unlocked(self) -> float:
        """Compute requests per second from the stored timestamps (lock held)."""
        if len(self.requests) < 2:
            return 0.0
        time_span = self.requests[-1] - self.requests[0]
        if time_span == 0:
            # Several requests recorded at the same instant
            return float('inf')
        return (len(self.requests) - 1) / time_span

    @staticmethod
    def _suggestion_for(rate: float) -> float:
        """Add a 20% buffer to ``rate`` and clamp the result to [0.1, 10.0]."""
        return max(0.1, min(10.0, rate * 1.2))

    def get_request_rate(self) -> float:
        """
        Get current request rate (requests per second).

        Returns:
            0.0 with fewer than two recorded requests, infinity when all
            recorded requests share one timestamp, otherwise the number of
            intervals divided by the time between first and last request
        """
        with self._lock:
            return self._request_rate_unlocked()

    def suggest_rate_limit(self) -> float:
        """Suggest a rate limit: current rate plus 20%, clamped to [0.1, 10.0]"""
        return self._suggestion_for(self.get_request_rate())

    def get_stats(self) -> Dict[str, Any]:
        """
        Get monitoring statistics.

        Returns:
            Dict with the number of requests inside the window ending now, the
            window size, the current request rate and the suggested rate limit
        """
        with self._lock:
            cutoff = time.time() - self.window_size
            recent_count = sum(1 for req_time in self.requests if req_time > cutoff)
            # Use the lock-free helper here: calling get_request_rate() or
            # suggest_rate_limit() while holding the lock would deadlock.
            rate = self._request_rate_unlocked()
        return {
            "total_requests": recent_count,
            "window_size_seconds": self.window_size,
            "current_rate_rps": rate,
            "suggested_rate_limit": self._suggestion_for(rate),
        }