From 41c1b2e9347ee2b8fde9482495ff3e75ac163132 Mon Sep 17 00:00:00 2001 From: vishwajeetpal Date: Wed, 14 Jan 2026 05:34:07 +0530 Subject: [PATCH 1/4] add: added the log to see cache inserts --- api/pkg/transformer/feast/feature_cache.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/api/pkg/transformer/feast/feature_cache.go b/api/pkg/transformer/feast/feature_cache.go index 66b9b03b2..50222365b 100644 --- a/api/pkg/transformer/feast/feature_cache.go +++ b/api/pkg/transformer/feast/feature_cache.go @@ -6,6 +6,7 @@ import ( "strings" "time" + "github.com/caraml-dev/merlin/log" "github.com/cespare/xxhash" feast "github.com/feast-dev/feast/sdk/go" feastTypes "github.com/feast-dev/feast/sdk/go/protos/feast/types" @@ -143,6 +144,7 @@ func (fc *featureCache) insertFeaturesOfEntity(entity feast.Row, columnNames []s if err != nil { return err } + log.Debugf("insert features of key %v \n value: %v", key, cacheValue) return fc.cache.Insert(keyByte, dataByte, fc.ttl) } From 107b6b1c078440bca9f1f4c4445b358832cee665 Mon Sep 17 00:00:00 2001 From: vishwajeetpal Date: Wed, 14 Jan 2026 05:36:04 +0530 Subject: [PATCH 2/4] fix: broken pyfunc-server pipeline. 
--- python/pyfunc-server/docker/local.Dockerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyfunc-server/docker/local.Dockerfile b/python/pyfunc-server/docker/local.Dockerfile index 140949472..79a733f2f 100644 --- a/python/pyfunc-server/docker/local.Dockerfile +++ b/python/pyfunc-server/docker/local.Dockerfile @@ -7,6 +7,7 @@ COPY ${MODEL_DEPENDENCIES_URL} conda.yaml ARG MERLIN_DEP_CONSTRAINT RUN process_conda_env.sh conda.yaml "merlin-pyfunc-server" "${MERLIN_DEP_CONSTRAINT}" +RUN process_conda_env.sh conda.yaml "merlin-sdk" "${MERLIN_DEP_CONSTRAINT}" RUN conda env create --name merlin-model --file conda.yaml # Download and dry-run user model artifacts and code From cd8a4d6f058f26a3f98b44304df2e449aee65224 Mon Sep 17 00:00:00 2001 From: vishwajeetpal Date: Wed, 14 Jan 2026 05:42:41 +0530 Subject: [PATCH 3/4] change: print cache inserts from insertFeatureTable instead of log.Debugf --- api/pkg/transformer/feast/feature_cache.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/api/pkg/transformer/feast/feature_cache.go b/api/pkg/transformer/feast/feature_cache.go index 50222365b..391f72a1c 100644 --- a/api/pkg/transformer/feast/feature_cache.go +++ b/api/pkg/transformer/feast/feature_cache.go @@ -6,7 +6,6 @@ import ( "strings" "time" - "github.com/caraml-dev/merlin/log" "github.com/cespare/xxhash" feast "github.com/feast-dev/feast/sdk/go" feastTypes "github.com/feast-dev/feast/sdk/go/protos/feast/types" @@ -115,6 +114,10 @@ func (fc *featureCache) insertFeatureTable(featureTable *internalFeatureTable, p for idx, entity := range featureTable.entities { if err := fc.insertFeaturesOfEntity(entity, featureTable.columnNames, project, featureTable.valueRows[idx], featureTable.columnTypes); err != nil { errorMsgs = append(errorMsgs, fmt.Sprintf("(value: %v, with message: %v)", featureTable.valueRows[idx], err.Error())) + } else { + fmt.Printf("CACHE INSERT: entity=%v, project=%s, columns=%v, values=%v\n", + entity, project, featureTable.columnNames, 
featureTable.valueRows[idx]) + } } if len(errorMsgs) > 0 { @@ -144,7 +147,6 @@ func (fc *featureCache) insertFeaturesOfEntity(entity feast.Row, columnNames []s if err != nil { return err } - log.Debugf("insert features of key %v \n value: %v", key, cacheValue) return fc.cache.Insert(keyByte, dataByte, fc.ttl) } From 67d60f322ba15fd50925f1166e4bf5983e6cbf91 Mon Sep 17 00:00:00 2001 From: vishwajeetpal Date: Wed, 14 Jan 2026 13:37:57 +0530 Subject: [PATCH 4/4] add: added the log to see table join --- api/pkg/transformer/pipeline/table_join_op.go | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/api/pkg/transformer/pipeline/table_join_op.go b/api/pkg/transformer/pipeline/table_join_op.go index eed706e80..39325b4f8 100644 --- a/api/pkg/transformer/pipeline/table_join_op.go +++ b/api/pkg/transformer/pipeline/table_join_op.go @@ -11,6 +11,7 @@ import ( "github.com/caraml-dev/merlin/pkg/transformer/types" "github.com/caraml-dev/merlin/pkg/transformer/types/table" "go.opentelemetry.io/otel/attribute" + "go.uber.org/zap" ) type TableJoinOp struct { @@ -55,6 +56,25 @@ func (t TableJoinOp) Execute(ctx context.Context, environment *Environment) erro joinColumns = t.tableJoinSpec.OnColumns } + // Log table states (shape + column names) before the join. + // This avoids dumping full data but is enough to debug mismatched schemas or empty tables. 
+ if environment != nil { + if ce := environment.logger.Check(zap.DebugLevel, "table_join input tables"); ce != nil { + ce.Write( + zap.String("left_table", t.tableJoinSpec.LeftTable), + zap.Int("left_nrow", leftTable.NRow()), + zap.Int("left_ncol", len(leftTable.ColumnNames())), + zap.Strings("left_columns", leftTable.ColumnNames()), + zap.String("right_table", t.tableJoinSpec.RightTable), + zap.Int("right_nrow", rightTable.NRow()), + zap.Int("right_ncol", len(rightTable.ColumnNames())), + zap.Strings("right_columns", rightTable.ColumnNames()), + zap.String("how", t.tableJoinSpec.How.String()), + zap.Strings("join_columns", joinColumns), + ) + } + } + switch t.tableJoinSpec.How { case spec.JoinMethod_LEFT: err := validateJoinColumns(leftTable, rightTable, joinColumns)