-
-
Notifications
You must be signed in to change notification settings - Fork 59
fix(passthrough): track usage for non-streaming responses and extract… #332
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -311,12 +311,55 @@ func (s *passthroughService) proxyPassthroughResponse(c *echo.Context, providerT | |||||
| return nil | ||||||
| } | ||||||
|
|
||||||
| body, err := io.ReadAll(resp.Body) | ||||||
| if err != nil { | ||||||
| return handleError(c, core.NewProviderError(providerType, http.StatusBadGateway, "failed to read provider passthrough response body", err)) | ||||||
| } | ||||||
|
|
||||||
| workflow := core.GetWorkflow(c.Request().Context()) | ||||||
| if s.usageLogger != nil && s.usageLogger.Config().Enabled && (workflow == nil || workflow.UsageEnabled()) { | ||||||
| model := "" | ||||||
| if info != nil { | ||||||
| model = strings.TrimSpace(info.Model) | ||||||
| } | ||||||
| model = resolvedModelFromWorkflow(workflow, model) | ||||||
| requestID := requestIDFromContextOrHeader(c.Request()) | ||||||
| usagePath := strings.TrimSpace(c.Request().URL.Path) | ||||||
| s.logPassthroughNonStreamUsage(body, model, providerType, providerName, requestID, usagePath, c.Request().Context()) | ||||||
| } | ||||||
|
Comment on lines
+327
to
+329
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix non-stream audit path derivation to use provider endpoint, not request URL path. At Line 328, the caller passes Suggested minimal fix- usagePath := strings.TrimSpace(c.Request().URL.Path)
- s.logPassthroughNonStreamUsage(body, model, providerType, providerName, requestID, usagePath, c.Request().Context())
+ requestPath := strings.TrimSpace(c.Request().URL.Path)
+ s.logPassthroughNonStreamUsage(body, model, providerType, providerName, requestID, requestPath, endpoint, c.Request().Context())
@@
-func (s *passthroughService) logPassthroughNonStreamUsage(body []byte, model, providerType, providerName, requestID, endpoint string, ctx context.Context) {
+func (s *passthroughService) logPassthroughNonStreamUsage(body []byte, model, providerType, providerName, requestID, requestPath, providerEndpoint string, ctx context.Context) {
@@
- auditPath := passthroughStreamAuditPath(endpoint, providerType, endpoint)
+ auditPath := passthroughStreamAuditPath(requestPath, providerType, providerEndpoint)Also applies to: 341-347 🤖 Prompt for AI Agents |
||||||
|
|
||||||
| c.Response().WriteHeader(resp.StatusCode) | ||||||
| if _, err := io.Copy(c.Response(), resp.Body); err != nil { | ||||||
| if _, err := c.Response().Write(body); err != nil { | ||||||
| return err | ||||||
| } | ||||||
| if f, ok := c.Response().(http.Flusher); ok { | ||||||
| f.Flush() | ||||||
| } | ||||||
| return nil | ||||||
| } | ||||||
|
|
||||||
| func (s *passthroughService) logPassthroughNonStreamUsage(body []byte, model, providerType, providerName, requestID, endpoint string, ctx context.Context) { | ||||||
| if len(body) == 0 { | ||||||
| return | ||||||
| } | ||||||
|
|
||||||
| auditPath := passthroughStreamAuditPath(endpoint, providerType, endpoint) | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| var pricingArgs []*core.ModelPricing | ||||||
| if s.pricingResolver != nil { | ||||||
| pricingProvider := strings.TrimSpace(providerName) | ||||||
| if pricingProvider == "" { | ||||||
| pricingProvider = strings.TrimSpace(providerType) | ||||||
| } | ||||||
| if p := s.pricingResolver.ResolvePricing(model, pricingProvider); p != nil { | ||||||
| pricingArgs = append(pricingArgs, p) | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| entry := usage.ExtractFromCachedResponseBody(body, requestID, model, providerType, auditPath, "", pricingArgs...) | ||||||
| if entry == nil { | ||||||
| return | ||||||
| } | ||||||
| entry.ProviderName = strings.TrimSpace(providerName) | ||||||
| entry.UserPath = core.UserPathFromContext(ctx) | ||||||
| s.usageLogger.Write(entry) | ||||||
| } | ||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,7 +26,10 @@ func seedRequestBodySelectorHints(req *http.Request, bodyMode core.BodyMode, env | |
| } | ||
|
|
||
| hints := peekRequestBodySelectorHints(req, requestSelectorPeekLimit) | ||
| if !hints.parsed || !hints.complete { | ||
| if !hints.parsed && !hints.complete { | ||
| if bodyMode == core.BodyModeOpaque && hints.model != "" { | ||
| core.ApplyBodySelectorHints(env, hints.model, hints.provider, hints.stream) | ||
| } | ||
| return | ||
| } | ||
|
Comment on lines
+29
to
34
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The change from |
||
| core.ApplyBodySelectorHints(env, hints.model, hints.provider, hints.stream) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid unbounded buffering of non-SSE passthrough responses.
Line 314 reads the full upstream body into memory with
io.ReadAll(resp.Body)and no size guard. Large passthrough payloads can cause high memory pressure or OOM under concurrency.🤖 Prompt for AI Agents