Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 97 additions & 11 deletions cmd/logs_simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ package cmd
import (
"fmt"
"io"
"regexp"
"strings"
"time"

"github.com/DataDog/datadog-api-client-go/v2/api/datadogV2"
Expand Down Expand Up @@ -593,10 +595,10 @@ func init() {

// Helper functions

// parseTimeString converts relative or absolute time to Unix timestamp in milliseconds
// parseTimeString converts relative or absolute time to Unix timestamp in milliseconds (UTC)
func parseTimeString(timeStr string) (int64, error) {
if timeStr == "now" {
return time.Now().UnixMilli(), nil
return time.Now().UTC().UnixMilli(), nil
}

// Try parsing as relative time (1h, 30m, 7d)
Expand All @@ -621,7 +623,7 @@ func parseTimeString(timeStr string) (int64, error) {
default:
return 0, fmt.Errorf("invalid time unit: %s (use s, m, h, d, or w)", unit)
}
return time.Now().Add(-duration).UnixMilli(), nil
return time.Now().UTC().Add(-duration).UnixMilli(), nil
}
}

Expand All @@ -634,6 +636,76 @@ func parseTimeString(timeStr string) (int64, error) {
return 0, fmt.Errorf("invalid time format: %s (use relative like '1h' or Unix timestamp)", timeStr)
}

// parseComputeString parses compute strings like "count", "avg(@duration)", "percentile(@duration, 99)"
// and returns the aggregation function and metric field
func parseComputeString(compute string) (aggregation string, metric string, err error) {
compute = strings.TrimSpace(compute)

// List of valid aggregation functions (from API error message)
validFunctions := []string{
"count", "max", "min", "avg", "sum", "median",
"cardinality", "delta", "most_frequent", "earliest",
"any", "latest", "dd_sketch", "top_n",
}

// Check for simple count
if strings.ToLower(compute) == "count" {
return "count", "", nil
}

// Parse format: function(metric) or function(metric, param)
// Examples: avg(@duration), percentile(@duration, 99), cardinality(@user.id)
re := regexp.MustCompile(`^(\w+)\(([^,)]+)(?:,\s*(\d+))?\)$`)
matches := re.FindStringSubmatch(compute)

if matches == nil {
// No parentheses - treat as a simple aggregation function
funcLower := strings.ToLower(compute)
for _, valid := range validFunctions {
if funcLower == valid {
return funcLower, "", nil
}
}
return "", "", fmt.Errorf("invalid compute format: %q\n\nExpected format:\n - count\n - function(metric) e.g. avg(@duration), sum(@bytes), cardinality(@user.id)\n - percentile(metric, N) e.g. percentile(@duration, 99)\n\nSupported functions: %s",
compute, strings.Join(validFunctions, ", "))
}

aggregation = strings.ToLower(matches[1])
metric = strings.TrimSpace(matches[2])
percentileValue := ""
if len(matches) > 3 && matches[3] != "" {
percentileValue = matches[3]
}

// Handle percentile special case: convert "percentile" to "pcNN"
if aggregation == "percentile" {
if percentileValue == "" {
return "", "", fmt.Errorf("percentile requires a percentile value: e.g. percentile(@duration, 99)")
}
aggregation = "pc" + percentileValue
}

// Validate aggregation function
isValid := false
for _, valid := range validFunctions {
if aggregation == valid {
isValid = true
break
}
}
// Also allow pcNN format (e.g., pc99, pc95, pc50)
if strings.HasPrefix(aggregation, "pc") {
isValid = true
}

if !isValid {
return "", "", fmt.Errorf("unknown aggregation function: %q\n\nSupported functions: %s, percentiles (pc50, pc75, pc90, pc95, pc99)",
aggregation, strings.Join(validFunctions, ", "))
}

return aggregation, metric, nil
}

// Implementation functions

func runLogsSearch(cmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -691,9 +763,9 @@ func runLogsSearch(cmd *cobra.Command, args []string) error {
if r != nil && r.Body != nil {
bodyBytes, readErr := io.ReadAll(r.Body)
if readErr == nil && len(bodyBytes) > 0 {
fromTimeObj := time.UnixMilli(fromTime)
toTimeObj := time.UnixMilli(toTime)
return fmt.Errorf("failed to search logs: %w\nStatus: %d\nAPI Response: %s\n\nRequest Details:\n- Query: %s\n- From: %s (parsed from: %s)\n- To: %s (parsed from: %s)\n- Limit: %d\n\nTroubleshooting:\n- Verify your time range is valid\n- Check that your query syntax is correct\n- Ensure you have proper permissions",
fromTimeObj := time.UnixMilli(fromTime).UTC()
toTimeObj := time.UnixMilli(toTime).UTC()
return fmt.Errorf("failed to search logs: %w\nStatus: %d\nAPI Response: %s\n\nRequest Details:\n- Query: %s\n- From: %s UTC (parsed from: %s)\n- To: %s UTC (parsed from: %s)\n- Limit: %d\n\nTroubleshooting:\n- Verify your time range is valid\n- Check that your query syntax is correct\n- Ensure you have proper permissions",
err, r.StatusCode, string(bodyBytes),
logsQuery,
fromTimeObj.Format(time.RFC3339), logsFrom,
Expand Down Expand Up @@ -910,16 +982,21 @@ func runLogsAggregate(cmd *cobra.Command, args []string) error {
return fmt.Errorf("invalid --to time: %w", err)
}

// Parse the compute string to extract aggregation and metric
aggregation, metric, err := parseComputeString(logsCompute)
if err != nil {
return fmt.Errorf("invalid --compute value: %w", err)
}

api := datadogV2.NewLogsApi(client.V2())

// Build compute aggregation
compute := datadogV2.LogsCompute{
Aggregation: datadogV2.LogsAggregationFunction(logsCompute),
Aggregation: datadogV2.LogsAggregationFunction(aggregation),
}

// Parse compute field if present (e.g., "avg(@duration)")
if logsCompute != "count" {
metric := "*"
// Add metric field if present
if metric != "" {
compute.Metric = &metric
}

Expand Down Expand Up @@ -952,7 +1029,16 @@ func runLogsAggregate(cmd *cobra.Command, args []string) error {
if r != nil && r.Body != nil {
bodyBytes, readErr := io.ReadAll(r.Body)
if readErr == nil && len(bodyBytes) > 0 {
return fmt.Errorf("failed to aggregate logs: %w\nStatus: %d\nAPI Response: %s", err, r.StatusCode, string(bodyBytes))
fromTimeObj := time.UnixMilli(fromTime).UTC()
toTimeObj := time.UnixMilli(toTime).UTC()
return fmt.Errorf("failed to aggregate logs: %w\nStatus: %d\nAPI Response: %s\n\nRequest Details:\n- Query: %s\n- Compute: %s (parsed as: aggregation=%q, metric=%q)\n- Group By: %s\n- From: %s UTC (parsed from: %s)\n- To: %s UTC (parsed from: %s)\n- Limit: %d\n\nTroubleshooting:\n- Verify the aggregation function is supported\n- Ensure the metric field exists in your logs (e.g., @duration, @bytes)\n- Check your query syntax\n- Verify your time range is valid",
err, r.StatusCode, string(bodyBytes),
logsQuery,
logsCompute, aggregation, metric,
logsGroupBy,
fromTimeObj.Format(time.RFC3339), logsFrom,
toTimeObj.Format(time.RFC3339), logsTo,
logsLimit)
}
return fmt.Errorf("failed to aggregate logs: %w (status: %d)", err, r.StatusCode)
}
Expand Down
167 changes: 167 additions & 0 deletions cmd/logs_simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"bytes"
"fmt"
"os"
"strings"
"testing"

"github.com/DataDog/pup/pkg/client"
Expand Down Expand Up @@ -80,6 +81,172 @@ func TestParseTimeString(t *testing.T) {
}
}

func TestParseComputeString(t *testing.T) {
tests := []struct {
name string
input string
wantAggregation string
wantMetric string
wantErr bool
errContains string
}{
{
name: "count - no metric",
input: "count",
wantAggregation: "count",
wantMetric: "",
wantErr: false,
},
{
name: "count - uppercase",
input: "COUNT",
wantAggregation: "count",
wantMetric: "",
wantErr: false,
},
{
name: "avg with metric",
input: "avg(@duration)",
wantAggregation: "avg",
wantMetric: "@duration",
wantErr: false,
},
{
name: "sum with metric",
input: "sum(@bytes)",
wantAggregation: "sum",
wantMetric: "@bytes",
wantErr: false,
},
{
name: "min with metric",
input: "min(@response_time)",
wantAggregation: "min",
wantMetric: "@response_time",
wantErr: false,
},
{
name: "max with metric",
input: "max(@duration)",
wantAggregation: "max",
wantMetric: "@duration",
wantErr: false,
},
{
name: "cardinality with metric",
input: "cardinality(@user.id)",
wantAggregation: "cardinality",
wantMetric: "@user.id",
wantErr: false,
},
{
name: "percentile with metric and parameter - converts to pc99",
input: "percentile(@duration, 99)",
wantAggregation: "pc99",
wantMetric: "@duration",
wantErr: false,
},
{
name: "percentile pc95",
input: "percentile(@latency, 95)",
wantAggregation: "pc95",
wantMetric: "@latency",
wantErr: false,
},
{
name: "percentile pc50 (median)",
input: "percentile(@response_time, 50)",
wantAggregation: "pc50",
wantMetric: "@response_time",
wantErr: false,
},
{
name: "percentile without value",
input: "percentile(@duration)",
wantErr: true,
errContains: "percentile requires a percentile value",
},
{
name: "median with metric",
input: "median(@latency)",
wantAggregation: "median",
wantMetric: "@latency",
wantErr: false,
},
{
name: "metric with dots and underscores",
input: "avg(@http.response_time)",
wantAggregation: "avg",
wantMetric: "@http.response_time",
wantErr: false,
},
{
name: "whitespace handling",
input: " avg(@duration) ",
wantAggregation: "avg",
wantMetric: "@duration",
wantErr: false,
},
{
name: "invalid - unknown function",
input: "invalid(@duration)",
wantErr: true,
errContains: "unknown aggregation function",
},
{
name: "invalid - malformed",
input: "avg(@duration",
wantErr: true,
errContains: "invalid compute format",
},
{
name: "invalid - no function name",
input: "(@duration)",
wantErr: true,
errContains: "invalid compute format",
},
{
name: "invalid - empty string",
input: "",
wantErr: true,
errContains: "invalid compute format",
},
{
name: "case insensitive function",
input: "AVG(@duration)",
wantAggregation: "avg",
wantMetric: "@duration",
wantErr: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotAgg, gotMetric, err := parseComputeString(tt.input)

if (err != nil) != tt.wantErr {
t.Errorf("parseComputeString() error = %v, wantErr %v", err, tt.wantErr)
return
}

if tt.wantErr && tt.errContains != "" {
if err == nil || !strings.Contains(err.Error(), tt.errContains) {
t.Errorf("parseComputeString() error = %v, should contain %q", err, tt.errContains)
}
return
}

if gotAgg != tt.wantAggregation {
t.Errorf("parseComputeString() aggregation = %q, want %q", gotAgg, tt.wantAggregation)
}

if gotMetric != tt.wantMetric {
t.Errorf("parseComputeString() metric = %q, want %q", gotMetric, tt.wantMetric)
}
})
}
}

// Helper function to setup logs test client
func setupLogsTestClient(t *testing.T) func() {
t.Helper()
Expand Down