-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
175 lines (133 loc) · 4.59 KB
/
utils.py
File metadata and controls
175 lines (133 loc) · 4.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
"""Utility functions and shared components for api_optimizer."""
import hashlib
import json
import re
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Union

import tiktoken
def count_tokens(text: str, model: str = "gpt-4") -> int:
"""Count tokens in text using tiktoken.
Args:
text: The text to count tokens for
model: The model to use for tokenization
Returns:
Number of tokens in the text
"""
try:
# Map model names to encoding
if "gpt-4" in model or "gpt-3.5" in model:
encoding = tiktoken.encoding_for_model(model)
else:
# Default to cl100k_base for most modern models
encoding = tiktoken.get_encoding("cl100k_base")
return len(encoding.encode(text))
except Exception:
# Fallback: rough estimate of 4 chars per token
return len(text) // 4
def hash_content(content: str) -> str:
    """Return the SHA-256 hex digest of *content*.

    Args:
        content: String to hash (UTF-8 encoded before hashing).

    Returns:
        64-character lowercase hex digest.
    """
    digest = hashlib.sha256()
    digest.update(content.encode())
    return digest.hexdigest()
def normalize_prompt(prompt: str) -> str:
    """Normalize *prompt* so equivalent prompts cache consistently.

    Collapses every run of whitespace to a single space, trims both
    ends, and lowercases the result.

    Args:
        prompt: Raw prompt text.

    Returns:
        The normalized prompt string.
    """
    # str.split() with no argument splits on (and discards) any run of
    # whitespace, so join+split collapses and trims in one pass.
    collapsed = " ".join(prompt.split())
    return collapsed.lower()
def estimate_cost(
    input_tokens: int,
    output_tokens: int,
    model: str,
    pricing: Optional[Dict[str, Dict[str, float]]] = None
) -> float:
    """Estimate cost in USD based on token usage.

    Args:
        input_tokens: Number of input (prompt) tokens.
        output_tokens: Number of output (completion) tokens.
        model: Model name; matched case-insensitively against pricing keys.
        pricing: Optional custom pricing dict mapping model name to
            ``{"input": ..., "output": ...}`` prices per 1M tokens.

    Returns:
        Estimated cost in USD.
    """
    default_pricing = {
        # OpenAI models (per 1M tokens)
        "gpt-4o": {"input": 2.50, "output": 10.00},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "gpt-4-turbo": {"input": 10.00, "output": 30.00},
        "gpt-4": {"input": 30.00, "output": 60.00},
        "gpt-3.5-turbo": {"input": 0.50, "output": 1.50},
        # Anthropic models (per 1M tokens)
        "claude-3-5-sonnet-20241022": {"input": 3.00, "output": 15.00},
        "claude-3-5-haiku-20241022": {"input": 0.80, "output": 4.00},
        "claude-3-opus-20240229": {"input": 15.00, "output": 75.00},
        "claude-3-sonnet-20240229": {"input": 3.00, "output": 15.00},
        "claude-3-haiku-20240307": {"input": 0.25, "output": 1.25},
    }
    prices = pricing or default_pricing
    model_lower = model.lower()

    # Resolve pricing: an exact key match wins outright; otherwise take
    # the LONGEST key that substring-matches the model name. Longest-match
    # fixes the original first-match bug where e.g. "gpt-4o-mini" picked
    # up the "gpt-4o" entry just because it appeared earlier in the dict.
    model_prices = None
    best_key_len = -1
    for key, entry in prices.items():
        key_lower = key.lower()
        if key_lower == model_lower:
            model_prices = entry
            break
        is_match = key_lower in model_lower or model_lower in key_lower
        if is_match and len(key_lower) > best_key_len:
            model_prices = entry
            best_key_len = len(key_lower)

    if model_prices is None:
        # Unknown model: fall back to generic pricing.
        model_prices = {"input": 1.00, "output": 2.00}

    # Calculate cost (pricing is per 1M tokens).
    input_cost = (input_tokens / 1_000_000) * model_prices["input"]
    output_cost = (output_tokens / 1_000_000) * model_prices["output"]
    return input_cost + output_cost
def format_messages(messages: List[Dict[str, str]]) -> str:
    """Render chat messages as a single newline-separated string.

    Intended as a stable text representation for hashing or embedding.

    Args:
        messages: Message dicts; each may carry "role" and "content".
            A missing role defaults to "user", missing content to "".

    Returns:
        One "role: content" line per message, joined with newlines.
    """
    return "\n".join(
        f"{msg.get('role', 'user')}: {msg.get('content', '')}"
        for msg in messages
    )
def get_timestamp() -> str:
    """Get the current UTC time as an ISO-8601 string.

    Uses an aware UTC clock (``datetime.utcnow()`` is deprecated since
    Python 3.12) but strips the tzinfo before formatting, so the output
    stays byte-compatible with the previous naive-UTC behavior (no
    "+00:00" suffix) for existing callers and stored data.

    Returns:
        ISO formatted timestamp string, e.g. "2024-01-02T03:04:05.123456".
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
def parse_timestamp(ts: str) -> datetime:
    """Parse an ISO-8601 timestamp string into a datetime.

    Args:
        ts: ISO formatted timestamp string.

    Returns:
        The corresponding datetime object.
    """
    parsed = datetime.fromisoformat(ts)
    return parsed
class TokenBudget:
    """Tracks spending against a fixed token allowance.

    Plain attribute bookkeeping; no locking is performed, so callers
    needing thread-safety must synchronize externally.
    """

    def __init__(self, max_tokens: int):
        # Total allowance and the running amount consumed so far.
        self.max_tokens = max_tokens
        self.used_tokens = 0

    @property
    def remaining(self) -> int:
        """Tokens still available (clamped so it never goes negative)."""
        leftover = self.max_tokens - self.used_tokens
        return leftover if leftover > 0 else 0

    def can_afford(self, tokens: int) -> bool:
        """Return True if *tokens* fits within the remaining budget."""
        return tokens <= self.remaining

    def spend(self, tokens: int) -> bool:
        """Consume *tokens* if affordable; return whether the spend happened."""
        if not self.can_afford(tokens):
            return False
        self.used_tokens += tokens
        return True

    def reset(self):
        """Clear all recorded usage, restoring the full budget."""
        self.used_tokens = 0