diff --git a/.release-please-manifest.json b/.release-please-manifest.json index d871cd14..10f30916 100644 --- a/.release-please-manifest.json +++ b/.release-please-manifest.json @@ -1,3 +1,3 @@ { - ".": "0.2.0-alpha.94" + ".": "0.2.0" } \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index c9a1cb5e..037dd2ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,24 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## 0.2.0 (2025-09-16) + +Full Changelog: [v0.2.0-alpha.94...v0.2.0](https://github.com/openlayer-ai/openlayer-python/compare/v0.2.0-alpha.94...v0.2.0) + +### Features + +* integrated litellm for tracing ([49c1cb1](https://github.com/openlayer-ai/openlayer-python/commit/49c1cb1205f0d1e1668ef14ba8f707eb5bc79332)) + + +### Bug Fixes + +* fixed model names and OpenLayer to Openlayer. ([572a1f2](https://github.com/openlayer-ai/openlayer-python/commit/572a1f2a01a41c7ed8d3b4947c4dd8ed5f408a35)) + + +### Styles + +* lint fixes ([c138716](https://github.com/openlayer-ai/openlayer-python/commit/c1387161f2b5b3b0b7bffe78f0857e8930e3561c)) + ## 0.2.0-alpha.94 (2025-09-11) Full Changelog: [v0.2.0-alpha.93...v0.2.0-alpha.94](https://github.com/openlayer-ai/openlayer-python/compare/v0.2.0-alpha.93...v0.2.0-alpha.94) diff --git a/README.md b/README.md index 58e65627..4f32c54b 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ The REST API documentation can be found on [openlayer.com](https://openlayer.com ```sh # install from PyPI -pip install --pre openlayer +pip install openlayer ``` ## Usage @@ -109,7 +109,7 @@ You can enable this by installing `aiohttp`: ```sh # install from PyPI -pip install --pre openlayer[aiohttp] +pip install openlayer[aiohttp] ``` Then you can enable it by instantiating the client with `http_client=DefaultAioHttpClient()`: diff --git a/examples/tracing/litellm/litellm_tracing.ipynb b/examples/tracing/litellm/litellm_tracing.ipynb new file mode 100644 index 00000000..98254194 --- /dev/null +++ b/examples/tracing/litellm/litellm_tracing.ipynb @@ -0,0 +1,169 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/openlayer-ai/openlayer-python/blob/main/examples/tracing/litellm/litellm_tracing.ipynb)\n", + "\n", + "\n", + "# LiteLLM monitoring quickstart\n", + "\n", + "This notebook illustrates how to get started monitoring LiteLLM completions with Openlayer.\n", + "\n", + "LiteLLM provides a unified interface to call 100+ LLM APIs using the same input/output format. This integration allows you to trace and monitor completions across all supported providers through a single interface.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "!pip install openlayer litellm\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 1. 
Set the environment variables\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "\n", + "import litellm\n", + "\n", + "# Set your API keys for the providers you want to use\n", + "os.environ[\"OPENAI_API_KEY\"] = \"YOUR_OPENAI_API_KEY_HERE\"\n", + "os.environ[\"ANTHROPIC_API_KEY\"] = \"YOUR_ANTHROPIC_API_KEY_HERE\" # Optional\n", + "os.environ[\"GROQ_API_KEY\"] = \"YOUR_GROQ_API_KEY_HERE\" # Optional\n", + "\n", + "# Openlayer env variables\n", + "os.environ[\"OPENLAYER_API_KEY\"] = \"YOUR_OPENLAYER_API_KEY_HERE\"\n", + "os.environ[\"OPENLAYER_INFERENCE_PIPELINE_ID\"] = \"YOUR_OPENLAYER_INFERENCE_PIPELINE_ID_HERE\"\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 2. Enable LiteLLM tracing\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from openlayer.lib import trace_litellm\n", + "\n", + "# Enable tracing for all LiteLLM completions\n", + "trace_litellm()\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3. Use LiteLLM normally - tracing happens automatically!\n", + "\n", + "### Basic completion with OpenAI\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Basic completion with OpenAI GPT-4\n", + "response = litellm.completion(\n", + " model=\"gpt-4\",\n", + " messages=[\n", + " {\"role\": \"system\", \"content\": \"You are a helpful assistant.\"},\n", + " {\"role\": \"user\", \"content\": \"What is the capital of France?\"}\n", + " ],\n", + " temperature=0.7,\n", + " max_tokens=100,\n", + " inference_id=\"litellm-openai-example-1\" # Optional: custom inference ID\n", + ")\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Multi-provider comparison\n", + "\n", + "One of LiteLLM's key features is the ability to easily switch between providers. Let's trace completions from different providers:\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Test the same prompt with different models/providers\n", + "prompt = \"Explain quantum computing in simple terms.\"\n", + "messages = [{\"role\": \"user\", \"content\": prompt}]\n", + "\n", + "models_to_test = [\n", + " \"gpt-3.5-turbo\", # OpenAI\n", + " \"claude-3-haiku-20240307\", # Anthropic (if API key is set)\n", + " \"groq/llama-3.1-8b-instant\", # Groq (if API key is set)\n", + "]\n", + "\n", + "for model in models_to_test:\n", + " response = litellm.completion(\n", + " model=model,\n", + " messages=messages,\n", + " temperature=0.5,\n", + " max_tokens=150,\n", + " inference_id=f\"multi-provider-{model.replace('/', '-')}\"\n", + " )\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 4. View your traces\n", + "\n", + "Once you've run the examples above, you can:\n", + "\n", + "1. **Visit your OpenLayer dashboard** to see all the traced completions\n", + "2. **Analyze performance** across different models and providers\n", + "3. **Monitor costs** and token usage\n", + "4. **Debug issues** with detailed request/response logs\n", + "5. 
**Compare models** side-by-side\n", + "\n", + "The traces will include:\n", + "- **Request details**: Model, parameters, messages\n", + "- **Response data**: Generated content, token counts, latency\n", + "- **Provider information**: Which underlying service was used\n", + "- **Custom metadata**: Any additional context you provide\n", + "\n", + "For more information, check out:\n", + "- [OpenLayer Documentation](https://docs.openlayer.com/)\n", + "- [LiteLLM Documentation](https://docs.litellm.ai/)\n", + "- [LiteLLM Supported Models](https://docs.litellm.ai/docs/providers)\n" + ] + } + ], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/pyproject.toml b/pyproject.toml index 17f1fd53..706c7812 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "openlayer" -version = "0.2.0-alpha.94" +version = "0.2.0" description = "The official Python library for the openlayer API" dynamic = ["readme"] license = "Apache-2.0" diff --git a/src/openlayer/_version.py b/src/openlayer/_version.py index dd6548f4..bc71f4d8 100644 --- a/src/openlayer/_version.py +++ b/src/openlayer/_version.py @@ -1,4 +1,4 @@ # File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details. __title__ = "openlayer" -__version__ = "0.2.0-alpha.94" # x-release-please-version +__version__ = "0.2.0" # x-release-please-version diff --git a/src/openlayer/lib/__init__.py b/src/openlayer/lib/__init__.py index abfab729..1edfb0b8 100644 --- a/src/openlayer/lib/__init__.py +++ b/src/openlayer/lib/__init__.py @@ -13,6 +13,7 @@ "trace_bedrock", "trace_oci_genai", "trace_oci", # Alias for backward compatibility + "trace_litellm", "update_current_trace", "update_current_step", # User and session context functions @@ -156,3 +157,37 @@ def trace_oci_genai(client, estimate_tokens: bool = True): # --------------------------------- OCI GenAI -------------------------------- # # Alias for backward compatibility trace_oci = trace_oci_genai + + +# --------------------------------- LiteLLM ---------------------------------- # +def trace_litellm(): + """Enable tracing for LiteLLM completions. + + This function patches litellm.completion to automatically trace all completions + made through the LiteLLM library, which provides a unified interface to 100+ LLM APIs. + + Example: + >>> import litellm + >>> from openlayer.lib import trace_litellm + >>> + >>> # Enable tracing + >>> trace_litellm() + >>> + >>> # Use LiteLLM normally - tracing happens automatically + >>> response = litellm.completion( + ... model="gpt-3.5-turbo", + ... messages=[{"role": "user", "content": "Hello!"}], + ... inference_id="custom-id-123" # Optional Openlayer parameter + ... ) + """ + # pylint: disable=import-outside-toplevel + try: + import litellm + except ImportError: + raise ImportError( + "litellm is required for LiteLLM tracing. 
Install with: pip install litellm" + ) + + from .integrations import litellm_tracer + + return litellm_tracer.trace_litellm() diff --git a/src/openlayer/lib/integrations/litellm_tracer.py b/src/openlayer/lib/integrations/litellm_tracer.py new file mode 100644 index 00000000..71fce20b --- /dev/null +++ b/src/openlayer/lib/integrations/litellm_tracer.py @@ -0,0 +1,705 @@ +"""Module with methods used to trace LiteLLM completions.""" + +import json +import logging +import time +from functools import wraps +from typing import Any, Dict, Iterator, Optional, Union, TYPE_CHECKING + +try: + import litellm + HAVE_LITELLM = True +except ImportError: + HAVE_LITELLM = False + +if TYPE_CHECKING: + import litellm + +from ..tracing import tracer + +logger = logging.getLogger(__name__) + + +def trace_litellm() -> None: + """Patch the litellm.completion function to trace completions. + + The following information is collected for each completion: + - start_time: The time when the completion was requested. + - end_time: The time when the completion was received. + - latency: The time it took to generate the completion. + - tokens: The total number of tokens used to generate the completion. + - prompt_tokens: The number of tokens in the prompt. + - completion_tokens: The number of tokens in the completion. + - model: The model used to generate the completion. + - model_parameters: The parameters used to configure the model. + - raw_output: The raw output of the model. + - inputs: The inputs used to generate the completion. + - metadata: Additional metadata about the completion. For example, the time it + took to generate the first token, when streaming. + + Returns + ------- + None + This function patches litellm.completion in place. + + Example + ------- + >>> import litellm + >>> from openlayer.lib import trace_litellm + >>> + >>> # Enable tracing + >>> trace_litellm() + >>> + >>> # Use LiteLLM normally - tracing happens automatically + >>> response = litellm.completion( + ... model="gpt-3.5-turbo", + ... messages=[{"role": "user", "content": "Hello!"}], + ... inference_id="custom-id-123" # Optional Openlayer parameter + ... ) + """ + if not HAVE_LITELLM: + raise ImportError( + "LiteLLM library is not installed. Please install it with: pip install litellm" + ) + + original_completion = litellm.completion + + @wraps(original_completion) + def traced_completion(*args, **kwargs): + inference_id = kwargs.pop("inference_id", None) + stream = kwargs.get("stream", False) + + if stream: + return handle_streaming_completion( + *args, + **kwargs, + completion_func=original_completion, + inference_id=inference_id, + ) + return handle_non_streaming_completion( + *args, + **kwargs, + completion_func=original_completion, + inference_id=inference_id, + ) + + litellm.completion = traced_completion + + +def handle_streaming_completion( + completion_func: callable, + *args, + inference_id: Optional[str] = None, + **kwargs, +) -> Iterator[Any]: + """Handles the completion function when streaming is enabled. + + Parameters + ---------- + completion_func : callable + The completion function to handle. + inference_id : Optional[str], optional + A user-generated inference id, by default None + + Returns + ------- + Iterator[Any] + A generator that yields the chunks of the completion. 
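+
+    Notes
+    -----
+    If ``stream_options`` is not supplied, this wrapper injects
+    ``{"include_usage": True}`` so that the final chunk carries token usage
+    data (see https://docs.litellm.ai/docs/completion/usage).
+
+    Example
+    -------
+    Illustrative sketch; assumes ``trace_litellm()`` has already been called:
+
+    >>> stream = litellm.completion(
+    ...     model="gpt-3.5-turbo",
+    ...     messages=[{"role": "user", "content": "Hello!"}],
+    ...     stream=True,
+    ... )
+    >>> for chunk in stream:
+    ...     pass  # chunks are re-yielded unchanged; the trace is recorded when the stream ends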
+ """ + # Enable usage data in streaming by setting stream_options + # This ensures we get proper token usage data in the final chunk + # Reference: https://docs.litellm.ai/docs/completion/usage + if "stream_options" not in kwargs: + kwargs["stream_options"] = {"include_usage": True} + + chunks = completion_func(*args, **kwargs) + return stream_chunks( + chunks=chunks, + kwargs=kwargs, + inference_id=inference_id, + ) + + +def stream_chunks( + chunks: Iterator[Any], + kwargs: Dict[str, any], + inference_id: Optional[str] = None, +): + """Streams the chunks of the completion and traces the completion.""" + collected_output_data = [] + collected_function_call = { + "name": "", + "arguments": "", + } + raw_outputs = [] + start_time = time.time() + end_time = None + first_token_time = None + num_of_completion_tokens = None + latency = None + model_name = kwargs.get("model", "unknown") + latest_usage_data = {"total_tokens": None, "prompt_tokens": None, "completion_tokens": None} + provider = "unknown" + latest_chunk_metadata = {} + + try: + i = 0 + for i, chunk in enumerate(chunks): + raw_outputs.append(chunk.model_dump() if hasattr(chunk, 'model_dump') else str(chunk)) + + if i == 0: + first_token_time = time.time() + # Try to detect provider from the first chunk + provider = detect_provider_from_chunk(chunk, model_name) + + # Extract usage data from this chunk if available (usually in final chunks) + chunk_usage = extract_usage_from_chunk(chunk) + if any(v is not None for v in chunk_usage.values()): + latest_usage_data = chunk_usage + + # Always update metadata from latest chunk (for cost, headers, etc.) + chunk_metadata = extract_litellm_metadata(chunk, model_name) + if chunk_metadata: + latest_chunk_metadata.update(chunk_metadata) + + if i > 0: + num_of_completion_tokens = i + 1 + + # Handle different chunk formats based on provider + delta = get_delta_from_chunk(chunk) + + if delta and hasattr(delta, 'content') and delta.content: + collected_output_data.append(delta.content) + elif delta and hasattr(delta, 'function_call') and delta.function_call: + if delta.function_call.name: + collected_function_call["name"] += delta.function_call.name + if delta.function_call.arguments: + collected_function_call["arguments"] += delta.function_call.arguments + elif delta and hasattr(delta, 'tool_calls') and delta.tool_calls: + if delta.tool_calls[0].function.name: + collected_function_call["name"] += delta.tool_calls[0].function.name + if delta.tool_calls[0].function.arguments: + collected_function_call["arguments"] += delta.tool_calls[0].function.arguments + + yield chunk + + end_time = time.time() + latency = (end_time - start_time) * 1000 + + # pylint: disable=broad-except + except Exception as e: + logger.error("Failed to yield chunk. 
%s", e) + finally: + # Try to add step to the trace + try: + collected_output_data = [message for message in collected_output_data if message is not None] + if collected_output_data: + output_data = "".join(collected_output_data) + else: + if collected_function_call["arguments"]: + try: + collected_function_call["arguments"] = json.loads(collected_function_call["arguments"]) + except json.JSONDecodeError: + pass + output_data = collected_function_call + + # Post-streaming calculations (after streaming is finished) + completion_tokens_calculated, prompt_tokens_calculated, total_tokens_calculated, cost_calculated = calculate_streaming_usage_and_cost( + chunks=raw_outputs, + messages=kwargs.get("messages", []), + output_content=output_data, + model_name=model_name, + latest_usage_data=latest_usage_data, + latest_chunk_metadata=latest_chunk_metadata + ) + + # Use calculated values (fall back to extracted data if calculation fails) + usage_data = latest_usage_data if any(v is not None for v in latest_usage_data.values()) else {} + + final_prompt_tokens = prompt_tokens_calculated if prompt_tokens_calculated is not None else usage_data.get("prompt_tokens", 0) + final_completion_tokens = completion_tokens_calculated if completion_tokens_calculated is not None else usage_data.get("completion_tokens", num_of_completion_tokens) + final_total_tokens = total_tokens_calculated if total_tokens_calculated is not None else usage_data.get("total_tokens", final_prompt_tokens + final_completion_tokens) + final_cost = cost_calculated if cost_calculated is not None else latest_chunk_metadata.get('cost', None) + + trace_args = create_trace_args( + end_time=end_time, + inputs={"prompt": kwargs.get("messages", [])}, + output=output_data, + latency=latency, + tokens=final_total_tokens, + prompt_tokens=final_prompt_tokens, + completion_tokens=final_completion_tokens, + model=model_name, + model_parameters=get_model_parameters(kwargs), + raw_output=raw_outputs, + id=inference_id, + cost=final_cost, # Use calculated cost + metadata={ + "timeToFirstToken": ((first_token_time - start_time) * 1000 if first_token_time else None), + "provider": provider, + "litellm_model": model_name, + **latest_chunk_metadata, # Add all LiteLLM-specific metadata + }, + ) + add_to_trace(**trace_args) + + # pylint: disable=broad-except + except Exception as e: + logger.error( + "Failed to trace the LiteLLM completion request with Openlayer. %s", + e, + ) + + +def handle_non_streaming_completion( + completion_func: callable, + *args, + inference_id: Optional[str] = None, + **kwargs, +) -> Any: + """Handles the completion function when streaming is disabled. + + Parameters + ---------- + completion_func : callable + The completion function to handle. + inference_id : Optional[str], optional + A user-generated inference id, by default None + + Returns + ------- + Any + The completion response. 
+ """ + start_time = time.time() + response = completion_func(*args, **kwargs) + end_time = time.time() + + # Try to add step to the trace + try: + model_name = kwargs.get("model", getattr(response, 'model', 'unknown')) + provider = detect_provider_from_response(response, model_name) + output_data = parse_non_streaming_output_data(response) + usage_data = extract_usage_from_response(response) + + # Extract additional LiteLLM metadata + extra_metadata = extract_litellm_metadata(response, model_name) + + # Extract cost from metadata + cost = extra_metadata.get('cost', None) + + trace_args = create_trace_args( + end_time=end_time, + inputs={"prompt": kwargs.get("messages", [])}, + output=output_data, + latency=(end_time - start_time) * 1000, + tokens=usage_data.get("total_tokens"), + prompt_tokens=usage_data.get("prompt_tokens"), + completion_tokens=usage_data.get("completion_tokens"), + model=model_name, + model_parameters=get_model_parameters(kwargs), + raw_output=response.model_dump() if hasattr(response, 'model_dump') else str(response), + id=inference_id, + cost=cost, # Add cost as direct parameter + metadata={ + "provider": provider, + "litellm_model": model_name, + **extra_metadata, # Add all LiteLLM-specific metadata + }, + ) + + add_to_trace(**trace_args) + + # pylint: disable=broad-except + except Exception as e: + logger.error("Failed to trace the LiteLLM completion request with Openlayer. %s", e) + + return response + + +def get_model_parameters(kwargs: Dict[str, Any]) -> Dict[str, Any]: + """Gets the model parameters from the kwargs.""" + return { + "temperature": kwargs.get("temperature", 1.0), + "top_p": kwargs.get("top_p", 1.0), + "max_tokens": kwargs.get("max_tokens", None), + "n": kwargs.get("n", 1), + "stream": kwargs.get("stream", False), + "stop": kwargs.get("stop", None), + "presence_penalty": kwargs.get("presence_penalty", 0.0), + "frequency_penalty": kwargs.get("frequency_penalty", 0.0), + "logit_bias": kwargs.get("logit_bias", None), + "logprobs": kwargs.get("logprobs", False), + "top_logprobs": kwargs.get("top_logprobs", None), + "parallel_tool_calls": kwargs.get("parallel_tool_calls", True), + "seed": kwargs.get("seed", None), + "response_format": kwargs.get("response_format", None), + "timeout": kwargs.get("timeout", None), + "api_base": kwargs.get("api_base", None), + "api_version": kwargs.get("api_version", None), + } + + +def create_trace_args( + end_time: float, + inputs: Dict[str, Any], + output: str, + latency: float, + tokens: int, + prompt_tokens: int, + completion_tokens: int, + model: str, + model_parameters: Optional[Dict[str, Any]] = None, + metadata: Optional[Dict[str, Any]] = None, + raw_output: Optional[str] = None, + id: Optional[str] = None, + cost: Optional[float] = None, +) -> Dict[str, Any]: + """Returns a dictionary with the trace arguments.""" + trace_args = { + "end_time": end_time, + "inputs": inputs, + "output": output, + "latency": latency, + "tokens": tokens, + "prompt_tokens": prompt_tokens, + "completion_tokens": completion_tokens, + "model": model, + "model_parameters": model_parameters, + "raw_output": raw_output, + "metadata": metadata if metadata else {}, + } + if id: + trace_args["id"] = id + if cost is not None: + trace_args["cost"] = cost + return trace_args + + +def add_to_trace(**kwargs) -> None: + """Add a chat completion step to the trace.""" + provider = kwargs.get("metadata", {}).get("provider", "LiteLLM") + tracer.add_chat_completion_step_to_trace(**kwargs, name="LiteLLM Chat Completion", provider=provider) + + +def 
parse_non_streaming_output_data(response: Any) -> Union[str, Dict[str, Any], None]: + """Parses the output data from a non-streaming completion. + + Parameters + ---------- + response : Any + The completion response. + + Returns + ------- + Union[str, Dict[str, Any], None] + The parsed output data. + """ + try: + if hasattr(response, 'choices') and response.choices: + choice = response.choices[0] + if hasattr(choice, 'message'): + message = choice.message + if hasattr(message, 'content') and message.content: + return message.content.strip() + elif hasattr(message, 'function_call') and message.function_call: + return { + "name": message.function_call.name, + "arguments": json.loads(message.function_call.arguments) if isinstance(message.function_call.arguments, str) else message.function_call.arguments, + } + elif hasattr(message, 'tool_calls') and message.tool_calls: + return { + "name": message.tool_calls[0].function.name, + "arguments": json.loads(message.tool_calls[0].function.arguments) if isinstance(message.tool_calls[0].function.arguments, str) else message.tool_calls[0].function.arguments, + } + except Exception as e: + logger.debug("Error parsing output data: %s", e) + + return None + + +def detect_provider_from_response(response: Any, model_name: str) -> str: + """Detect the provider from the response object.""" + try: + # First try LiteLLM's built-in provider detection + if HAVE_LITELLM: + try: + provider_info = litellm.get_llm_provider(model_name) + if provider_info and len(provider_info) > 1: + return provider_info[1] # provider_info is (model, provider, dynamic_api_key, api_base) + except Exception: + pass + + # Try to get provider from response metadata/hidden params + if hasattr(response, '_hidden_params'): + hidden_params = response._hidden_params + if 'custom_llm_provider' in hidden_params: + return hidden_params['custom_llm_provider'] + if 'litellm_provider' in hidden_params: + return hidden_params['litellm_provider'] + + # Try other response attributes + if hasattr(response, 'response_metadata') and 'provider' in response.response_metadata: + return response.response_metadata['provider'] + + # Fallback to model name detection + return detect_provider_from_model_name(model_name) + except Exception: + return "unknown" + + +def detect_provider_from_chunk(chunk: Any, model_name: str) -> str: + """Detect the provider from a streaming chunk.""" + try: + # First try LiteLLM's built-in provider detection + if HAVE_LITELLM: + try: + import litellm + provider_info = litellm.get_llm_provider(model_name) + if provider_info and len(provider_info) > 1: + return provider_info[1] + except Exception: + pass + + # Try to get provider from chunk metadata/hidden params + if hasattr(chunk, '_hidden_params'): + hidden_params = chunk._hidden_params + if 'custom_llm_provider' in hidden_params: + return hidden_params['custom_llm_provider'] + if 'litellm_provider' in hidden_params: + return hidden_params['litellm_provider'] + + # Fallback to model name detection + return detect_provider_from_model_name(model_name) + except Exception: + return "unknown" + + +def detect_provider_from_model_name(model_name: str) -> str: + """Detect provider from model name patterns.""" + model_lower = model_name.lower() + + if model_lower.startswith(('gpt-', 'o1-', 'text-davinci', 'text-curie', 'text-babbage', 'text-ada')): + return "OpenAI" + elif model_lower.startswith(('claude-', 'claude')): + return "Anthropic" + elif 'gemini' in model_lower or 'palm' in model_lower: + return "Google" + elif 'llama' in 
model_lower: + return "Meta" + elif model_lower.startswith('mistral'): + return "Mistral" + elif model_lower.startswith('command'): + return "Cohere" + else: + return "unknown" + + +def get_delta_from_chunk(chunk: Any) -> Any: + """Extract delta from chunk, handling different response formats.""" + try: + if hasattr(chunk, 'choices') and chunk.choices: + choice = chunk.choices[0] + if hasattr(choice, 'delta'): + return choice.delta + except Exception: + pass + return None + + +def extract_usage_from_response(response: Any) -> Dict[str, Optional[int]]: + """Extract usage data from response.""" + try: + if hasattr(response, 'usage'): + usage = response.usage + return { + "total_tokens": getattr(usage, 'total_tokens', None), + "prompt_tokens": getattr(usage, 'prompt_tokens', None), + "completion_tokens": getattr(usage, 'completion_tokens', None), + } + except Exception: + pass + + return {"total_tokens": None, "prompt_tokens": None, "completion_tokens": None} + + +def calculate_streaming_usage_and_cost(chunks, messages, output_content, model_name, latest_usage_data, latest_chunk_metadata): + """Calculate usage and cost after streaming is finished. + + With stream_options={"include_usage": True}, LiteLLM provides accurate usage data + in the final streaming chunk. This function prioritizes that data over estimation. + + Reference: https://docs.litellm.ai/docs/completion/usage + """ + try: + # Priority 1: Use actual usage data from streaming chunks (with stream_options) + if latest_usage_data and latest_usage_data.get("total_tokens") and latest_usage_data.get("total_tokens") > 0: + logger.debug("Using actual streaming usage data from chunks") + return ( + latest_usage_data.get("completion_tokens"), + latest_usage_data.get("prompt_tokens"), + latest_usage_data.get("total_tokens"), + latest_chunk_metadata.get("cost") + ) + + # Priority 2: Look for usage data in the final chunk directly + for chunk_data in reversed(chunks): # Check from the end + if isinstance(chunk_data, dict) and "usage" in chunk_data and chunk_data["usage"]: + usage = chunk_data["usage"] + if usage.get("total_tokens", 0) > 0: + logger.debug("Found usage data in final chunk: %s", usage) + return ( + usage.get("completion_tokens"), + usage.get("prompt_tokens"), + usage.get("total_tokens"), + latest_chunk_metadata.get("cost") + ) + + # Priority 3: Manual calculation as fallback + logger.debug("Falling back to manual token calculation") + completion_tokens = None + prompt_tokens = None + total_tokens = None + cost = None + + # 1. Calculate completion tokens from output content + if isinstance(output_content, str): + # Simple token estimation: ~4 characters per token (rough approximation) + completion_tokens = max(1, len(output_content) // 4) + elif isinstance(output_content, dict): + # For function calls, estimate based on JSON content length + json_str = json.dumps(output_content) if output_content else "{}" + completion_tokens = max(1, len(json_str) // 4) + else: + # Fallback: count chunks with content + completion_tokens = len([chunk for chunk in chunks if chunk]) + + # 2. Calculate prompt tokens from input messages + if messages: + # Simple estimation: sum of message content lengths + total_chars = 0 + for message in messages: + if isinstance(message, dict) and "content" in message: + total_chars += len(str(message["content"])) + prompt_tokens = max(1, total_chars // 4) + else: + prompt_tokens = 0 + + # 3. Calculate total tokens + total_tokens = (prompt_tokens or 0) + (completion_tokens or 0) + + # 4. 
Try to get cost from metadata or estimate + cost = latest_chunk_metadata.get("cost") + if cost is None and total_tokens and model_name: + # Simple cost estimation for gpt-3.5-turbo (if we know the model) + if "gpt-3.5-turbo" in model_name.lower(): + # Approximate cost: $0.0005 per 1K prompt tokens, $0.0015 per 1K completion tokens + estimated_cost = (prompt_tokens * 0.0005 / 1000) + (completion_tokens * 0.0015 / 1000) + cost = estimated_cost + + logger.debug( + "Calculated streaming usage: prompt=%s, completion=%s, total=%s, cost=%s", + prompt_tokens, completion_tokens, total_tokens, cost + ) + + return completion_tokens, prompt_tokens, total_tokens, cost + + except Exception as e: + logger.debug("Error calculating streaming usage: %s", e) + return None, None, None, None + + +def extract_usage_from_chunk(chunk: Any) -> Dict[str, Optional[int]]: + """Extract usage data from streaming chunk.""" + try: + # Check for usage attribute + if hasattr(chunk, 'usage') and chunk.usage is not None: + usage = chunk.usage + return { + "total_tokens": getattr(usage, 'total_tokens', None), + "prompt_tokens": getattr(usage, 'prompt_tokens', None), + "completion_tokens": getattr(usage, 'completion_tokens', None), + } + + # Check for usage in _hidden_params (LiteLLM specific) + if hasattr(chunk, '_hidden_params'): + hidden_params = chunk._hidden_params + # Check if usage is a direct attribute + if hasattr(hidden_params, 'usage') and hidden_params.usage is not None: + usage = hidden_params.usage + return { + "total_tokens": getattr(usage, 'total_tokens', None), + "prompt_tokens": getattr(usage, 'prompt_tokens', None), + "completion_tokens": getattr(usage, 'completion_tokens', None), + } + # Check if usage is a dictionary key + elif isinstance(hidden_params, dict) and 'usage' in hidden_params: + usage = hidden_params['usage'] + if usage: + return { + "total_tokens": usage.get('total_tokens', None), + "prompt_tokens": usage.get('prompt_tokens', None), + "completion_tokens": usage.get('completion_tokens', None), + } + + # Check if chunk model dump has usage + if hasattr(chunk, 'model_dump'): + chunk_dict = chunk.model_dump() + if 'usage' in chunk_dict and chunk_dict['usage']: + usage = chunk_dict['usage'] + return { + "total_tokens": usage.get('total_tokens', None), + "prompt_tokens": usage.get('prompt_tokens', None), + "completion_tokens": usage.get('completion_tokens', None), + } + except Exception: + pass + + return {"total_tokens": None, "prompt_tokens": None, "completion_tokens": None} + + +def extract_litellm_metadata(response: Any, model_name: str) -> Dict[str, Any]: + """Extract LiteLLM-specific metadata from response.""" + metadata = {} + + try: + # Extract hidden parameters + if hasattr(response, '_hidden_params'): + hidden_params = response._hidden_params + + # Cost information + if 'response_cost' in hidden_params: + metadata['cost'] = hidden_params['response_cost'] + + # API information + if 'api_base' in hidden_params: + metadata['api_base'] = hidden_params['api_base'] + if 'api_version' in hidden_params: + metadata['api_version'] = hidden_params['api_version'] + + # Model information + if 'model_info' in hidden_params: + metadata['model_info'] = hidden_params['model_info'] + + # Additional provider info + if 'additional_args' in hidden_params: + metadata['additional_args'] = hidden_params['additional_args'] + + # Extract response headers if available + if 'additional_headers' in hidden_params: + headers = hidden_params['additional_headers'] + if headers: + metadata['response_headers'] = headers + + 
# Extract system fingerprint if available + if hasattr(response, 'system_fingerprint'): + metadata['system_fingerprint'] = response.system_fingerprint + + # Extract response headers if available + if hasattr(response, '_response_headers'): + metadata['response_headers'] = dict(response._response_headers) + + except Exception as e: + logger.debug("Error extracting LiteLLM metadata: %s", e) + + return metadata diff --git a/tests/test_integration_conditional_imports.py b/tests/test_integration_conditional_imports.py index 88f49997..f673b480 100644 --- a/tests/test_integration_conditional_imports.py +++ b/tests/test_integration_conditional_imports.py @@ -33,6 +33,7 @@ "groq_tracer": ["groq"], "oci_tracer": ["oci"], "langchain_callback": ["langchain", "langchain_core", "langchain_community"], + "litellm_tracer": ["litellm"], } # Expected patterns for integration modules diff --git a/tests/test_litellm_integration.py b/tests/test_litellm_integration.py new file mode 100644 index 00000000..ecebbff8 --- /dev/null +++ b/tests/test_litellm_integration.py @@ -0,0 +1,277 @@ +"""Test LiteLLM integration.""" + +import builtins +from typing import Any, Dict +from unittest.mock import Mock, patch + +import pytest # type: ignore + + +class TestLiteLLMIntegration: + """Test LiteLLM integration functionality.""" + + def test_import_without_litellm(self): + """Test that the module can be imported even when LiteLLM is not available.""" + # This should not raise an ImportError + from openlayer.lib.integrations import litellm_tracer + + # The HAVE_LITELLM flag should be set correctly + assert hasattr(litellm_tracer, 'HAVE_LITELLM') + + def test_trace_litellm_raises_import_error_without_dependency(self): + """Test that trace_litellm raises ImportError when LiteLLM is not available.""" + with patch('openlayer.lib.integrations.litellm_tracer.HAVE_LITELLM', False): + from openlayer.lib.integrations.litellm_tracer import trace_litellm + + with pytest.raises(ImportError) as exc_info: # type: ignore + trace_litellm() + + assert "LiteLLM library is not installed" in str(exc_info.value) # type: ignore + assert "pip install litellm" in str(exc_info.value) # type: ignore + + @patch('openlayer.lib.integrations.litellm_tracer.HAVE_LITELLM', True) + @patch('openlayer.lib.integrations.litellm_tracer.litellm') + def test_trace_litellm_patches_completion(self, mock_litellm: Mock) -> None: + """Test that trace_litellm successfully patches litellm.completion.""" + from openlayer.lib.integrations.litellm_tracer import trace_litellm + + # Mock the original completion function + original_completion = Mock() + mock_litellm.completion = original_completion + + # Call trace_litellm + trace_litellm() + + # Verify that litellm.completion was replaced + assert mock_litellm.completion != original_completion + assert callable(mock_litellm.completion) + + @patch('openlayer.lib.integrations.litellm_tracer.HAVE_LITELLM', True) + def test_detect_provider_from_model_name(self): + """Test provider detection from model names.""" + from openlayer.lib.integrations.litellm_tracer import detect_provider_from_model_name + + test_cases = [ + ("gpt-4", "OpenAI"), + ("gpt-3.5-turbo", "OpenAI"), + ("claude-3-opus-20240229", "Anthropic"), + ("claude-3-haiku-20240307", "Anthropic"), + ("gemini-pro", "Google"), + ("llama-2-70b", "Meta"), + ("mistral-large-latest", "Mistral"), + ("command-r-plus", "Cohere"), + ("unknown-model", "unknown"), + ] + + for model_name, expected_provider in test_cases: + assert detect_provider_from_model_name(model_name) == 
expected_provider + + @patch('openlayer.lib.integrations.litellm_tracer.HAVE_LITELLM', True) + def test_get_model_parameters(self): + """Test model parameters extraction.""" + from openlayer.lib.integrations.litellm_tracer import get_model_parameters + + kwargs = { + "temperature": 0.8, + "top_p": 0.9, + "max_tokens": 150, + "stream": True, + "custom_param": "ignored", + } + + params = get_model_parameters(kwargs) + + expected_params = { + "temperature": 0.8, + "top_p": 0.9, + "max_tokens": 150, + "n": 1, # default value + "stream": True, + "stop": None, # default value + "presence_penalty": 0.0, # default value + "frequency_penalty": 0.0, # default value + "logit_bias": None, # default value + "logprobs": False, # default value + "top_logprobs": None, # default value + "parallel_tool_calls": True, # default value + "seed": None, # default value + "response_format": None, # default value + "timeout": None, # default value + "api_base": None, # default value + "api_version": None, # default value + } + + assert params == expected_params + + @patch('openlayer.lib.integrations.litellm_tracer.HAVE_LITELLM', True) + def test_extract_usage_from_response(self): + """Test usage extraction from response.""" + from openlayer.lib.integrations.litellm_tracer import extract_usage_from_response + + # Mock response with usage + mock_response = Mock() + mock_usage = Mock() + mock_usage.total_tokens = 100 + mock_usage.prompt_tokens = 50 + mock_usage.completion_tokens = 50 + mock_response.usage = mock_usage + + usage = extract_usage_from_response(mock_response) + + expected_usage = { + "total_tokens": 100, + "prompt_tokens": 50, + "completion_tokens": 50, + } + + assert usage == expected_usage + + # Test response without usage + mock_response_no_usage = Mock(spec=[]) # No usage attribute + usage_no_data = extract_usage_from_response(mock_response_no_usage) + + expected_no_usage = { + "total_tokens": None, + "prompt_tokens": None, + "completion_tokens": None, + } + + assert usage_no_data == expected_no_usage + + @patch('openlayer.lib.integrations.litellm_tracer.HAVE_LITELLM', True) + def test_parse_non_streaming_output_data(self): + """Test parsing output data from non-streaming responses.""" + from openlayer.lib.integrations.litellm_tracer import parse_non_streaming_output_data + + # Mock response with content + mock_response = Mock() + mock_choice = Mock() + mock_message = Mock() + mock_message.content = "Hello, world!" + mock_choice.message = mock_message + mock_response.choices = [mock_choice] + + output = parse_non_streaming_output_data(mock_response) + assert output == "Hello, world!" 
+ + # Mock response with function call + mock_response_func = Mock() + mock_choice_func = Mock() + mock_message_func = Mock() + mock_message_func.content = None + mock_function_call = Mock() + mock_function_call.name = "get_weather" + mock_function_call.arguments = '{"location": "New York"}' + mock_message_func.function_call = mock_function_call + mock_choice_func.message = mock_message_func + mock_response_func.choices = [mock_choice_func] + + output_func = parse_non_streaming_output_data(mock_response_func) + expected_func_output = { + "name": "get_weather", + "arguments": {"location": "New York"} + } + assert output_func == expected_func_output + + @patch('openlayer.lib.integrations.litellm_tracer.HAVE_LITELLM', True) + def test_create_trace_args(self): + """Test trace arguments creation.""" + from openlayer.lib.integrations.litellm_tracer import create_trace_args + + args: Dict[str, Any] = create_trace_args( + end_time=1234567890.0, + inputs={"prompt": "test"}, + output="response", + latency=1500.0, + tokens=100, + prompt_tokens=50, + completion_tokens=50, + model="gpt-4", + id="test-id" + ) + + expected_args = { + "end_time": 1234567890.0, + "inputs": {"prompt": "test"}, + "output": "response", + "latency": 1500.0, + "tokens": 100, + "prompt_tokens": 50, + "completion_tokens": 50, + "model": "gpt-4", + "model_parameters": None, + "raw_output": None, + "metadata": {}, + "id": "test-id", + } + + assert args == expected_args + + def test_lib_init_trace_litellm_function_exists(self): + """Test that trace_litellm function is available in lib.__init__.""" + from openlayer.lib import trace_litellm + + assert callable(trace_litellm) + + def test_lib_init_trace_litellm_import_error(self): + """Test that lib.trace_litellm raises ImportError when litellm is not available.""" + from openlayer.lib import trace_litellm + + # Mock import to fail for litellm specifically + original_import = builtins.__import__ + def mock_import(name: str, *args: Any, **kwargs: Any) -> Any: + if name == 'litellm': + raise ImportError("No module named 'litellm'") + return original_import(name, *args, **kwargs) + + with patch('builtins.__import__', side_effect=mock_import): + with pytest.raises(ImportError) as exc_info: # type: ignore + trace_litellm() + + assert "litellm is required for LiteLLM tracing" in str(exc_info.value) # type: ignore + assert "pip install litellm" in str(exc_info.value) # type: ignore + + @patch('openlayer.lib.integrations.litellm_tracer.HAVE_LITELLM', True) + def test_extract_litellm_metadata(self): + """Test extraction of LiteLLM-specific metadata.""" + from openlayer.lib.integrations.litellm_tracer import extract_litellm_metadata + + # Mock response with hidden params + mock_response = Mock() + mock_hidden_params = { + 'response_cost': 0.002, + 'api_base': 'https://api.openai.com/v1', + 'api_version': 'v1', + 'model_info': {'provider': 'openai', 'mode': 'chat'}, + 'custom_llm_provider': 'openai' + } + mock_response._hidden_params = mock_hidden_params + mock_response.system_fingerprint = 'fp_12345' + + metadata = extract_litellm_metadata(mock_response, 'gpt-4') + + expected_metadata = { + 'cost': 0.002, + 'api_base': 'https://api.openai.com/v1', + 'api_version': 'v1', + 'model_info': {'provider': 'openai', 'mode': 'chat'}, + 'system_fingerprint': 'fp_12345' + } + + assert metadata == expected_metadata + + @patch('openlayer.lib.integrations.litellm_tracer.HAVE_LITELLM', True) + @patch('openlayer.lib.integrations.litellm_tracer.litellm') + def test_detect_provider_with_litellm_method(self, 
mock_litellm: Mock) -> None:
+        """Test provider detection using LiteLLM's get_llm_provider method."""
+        from openlayer.lib.integrations.litellm_tracer import detect_provider_from_response
+
+        # Mock LiteLLM's get_llm_provider method
+        mock_litellm.get_llm_provider.return_value = ('gpt-4', 'openai', None, None)
+
+        mock_response = Mock(spec=[])  # No special attributes
+
+        provider = detect_provider_from_response(mock_response, 'gpt-4')
+
+        assert provider == 'openai'
+        mock_litellm.get_llm_provider.assert_called_once_with('gpt-4')
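The tests above exercise the non-streaming helpers; a minimal sketch of a streaming-side check, reusing the same `Mock` conventions, might look like the following (illustrative only, not part of this patch; it targets the `extract_usage_from_chunk` helper added in `litellm_tracer.py`, and the class name is hypothetical):

```python
from unittest.mock import Mock, patch


class TestLiteLLMStreamingHelpers:
    """Illustrative streaming-side checks (not part of the patch above)."""

    @patch('openlayer.lib.integrations.litellm_tracer.HAVE_LITELLM', True)
    def test_extract_usage_from_chunk(self) -> None:
        """Usage data is read from a final streaming chunk when present."""
        from openlayer.lib.integrations.litellm_tracer import extract_usage_from_chunk

        # Chunk carrying usage, as emitted when stream_options={"include_usage": True}
        mock_usage = Mock()
        mock_usage.total_tokens = 42
        mock_usage.prompt_tokens = 30
        mock_usage.completion_tokens = 12
        mock_chunk = Mock()
        mock_chunk.usage = mock_usage

        assert extract_usage_from_chunk(mock_chunk) == {
            "total_tokens": 42,
            "prompt_tokens": 30,
            "completion_tokens": 12,
        }

        # A chunk without usage information falls back to all-None values
        assert extract_usage_from_chunk(Mock(spec=[])) == {
            "total_tokens": None,
            "prompt_tokens": None,
            "completion_tokens": None,
        }
```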