Accepted
Sifaka interacts with multiple external systems (LLM APIs, storage backends, web services) and processes user input, making it susceptible to a variety of failure modes.
We therefore need a comprehensive error handling strategy that reports failures with actionable context, retries transient errors automatically, and degrades gracefully rather than aborting.
We will implement a hierarchical exception system with structured error handling, automatic retry mechanisms, and graceful degradation strategies.
# Structured exceptions with suggestions
# Usage example: callers catch the specific provider error and read the
# structured fields (message, suggestion, provider, error_code) directly,
# rather than parsing an unstructured exception string.
try:
    result = await improve("text")
except ModelProviderError as e:
    print(f"LLM API error: {e.message}")
    print(f"Suggestion: {e.suggestion}")
    print(f"Provider: {e.provider}")
    print(f"Error code: {e.error_code}")
class SifakaError(Exception):
def __init__(self, message: str, suggestion: str = None):
self.message = message
self.suggestion = suggestion
super().__init__(message)
def __str__(self):
if self.suggestion:
return f"{self.message}\n💡 Suggestion: {self.suggestion}"
return self.message
@dataclass
class RetryConfig:
    """Retry policy: exponential backoff starting at ``delay`` seconds.

    ``calculate_delay`` is pure; the caller decides when to sleep.
    """

    max_attempts: int = 3
    delay: float = 1.0
    backoff: float = 2.0

    def calculate_delay(self, attempt: int) -> float:
        """Return the wait in seconds before retry number ``attempt`` (0-based)."""
        return pow(self.backoff, attempt) * self.delay
# Illustrative stub: with_retry wraps the coroutine with the retry policy
# (up to 3 attempts, 1s initial delay, exponential factor 2).
@with_retry(RetryConfig(max_attempts=3, delay=1.0, backoff=2.0))
async def call_llm_api(prompt: str) -> str:
    # API call implementation
    pass
Graceful degradation strategies are defined for each failure domain: when a critic fails, when storage fails, when validation fails, and when external tools fail.
class ErrorRecovery:
    """Maps structured provider errors to concrete recovery actions."""

    async def recover_from_api_failure(self, error: ModelProviderError):
        """Attempt recovery from a provider failure, then retry the operation.

        Raises:
            ModelProviderError: re-raised when no recovery strategy matches
                the error code (previously this fell through and silently
                returned None, indistinguishable from success).
        """
        if error.error_code == "rate_limit":
            # Honor the server-provided backoff when present; default to 60s.
            await asyncio.sleep(error.retry_after or 60)
            return await self.retry_operation()
        if error.error_code == "authentication":
            await self.refresh_api_key()
            return await self.retry_operation()
        # No known recovery: surface the failure to the caller.
        raise error
# Structured logging example: a critic failure is recorded with
# machine-readable context fields instead of an unstructured string.
logger.error(
    "Critic failure",
    extra={
        "critic": critic.name,
        "error_type": type(error).__name__,
        # error_code/retryable are optional attributes; use defaults when absent.
        "error_code": getattr(error, 'error_code', None),
        "retryable": getattr(error, 'retryable', False),
        "text_length": len(text),
        "iteration": result.iteration,
    }
)
def validate_config(config: Config):
    """Reject configuration values that fall outside supported ranges.

    Raises:
        ConfigurationError: when temperature is outside [0.0, 2.0].
    """
    temperature = config.temperature
    # Keep the two explicit comparisons (rather than a negated chained
    # comparison) so the accepted/rejected sets are unchanged.
    if temperature < 0 or temperature > 2:
        raise ConfigurationError(
            f"Temperature {config.temperature} is invalid",
            parameter="temperature",
            valid_range="0.0-2.0"
        )
async def call_openai_api(prompt: str):
    """Call the OpenAI chat API, translating provider exceptions into
    Sifaka's structured error hierarchy.

    Raises:
        ModelProviderError: with error_code="rate_limit" when the provider
            reports rate limiting; chained to the original exception.
    """
    try:
        response = await openai.ChatCompletion.acreate(...)
    except openai.RateLimitError as e:
        # `from e` preserves the original traceback via exception chaining.
        raise ModelProviderError(
            "Rate limit exceeded",
            provider="OpenAI",
            error_code="rate_limit"
        ) from e
    else:
        # Success path kept out of the try block (narrow-try idiom).
        return response
async def run_critics(text: str, critics: List[Critic]) -> List[CritiqueResult]:
    """Run every critic over ``text``, collecting the results that succeed.

    A failing critic is logged and skipped so that one bad critic cannot
    abort the whole critique pass (graceful degradation).
    """
    results = []
    for critic in critics:
        try:
            result = await critic.critique(text)
            results.append(result)
        except Exception as e:
            # Lazy %-style args: the message is only formatted if the
            # warning level is actually emitted (was an eager f-string).
            logger.warning("Critic %s failed: %s", critic.name, e)
            # Continue with other critics
    return results