diff --git a/lib/constants.go b/lib/constants.go index fa6a19b1..d2fb7d0f 100644 --- a/lib/constants.go +++ b/lib/constants.go @@ -45,6 +45,8 @@ const ( EvtNameWhitelist = "db_whitelist" EvtNameShardKeyAutodisc = "shard_key_auto_discovery" EvtNameBadMapping = "bad_mapping" + EvtNameScuttleIdMismatch = "scuttle_id_mismatch" + EvtNameBadScuttleId = "bad_scuttle_id" ) // Shard map configuration @@ -75,6 +77,7 @@ var ( ErrNoShardValue, ErrAutodiscoverWhileSetShardID, ErrNoScuttleIdPredicate, + ErrScuttleIDMismatch, ErrCrossKeysDML, ErrQueryBindBlocker, ErrOther, @@ -106,6 +109,7 @@ func MkErr(prefix string) { ErrNoShardValue = errors.New(prefix + "-375: no shard value or wrong sharKey array binding") ErrCrossKeysDML = errors.New(prefix + "-206: cross key dml") ErrQueryBindBlocker = errors.New(prefix + "-207: dba query bind blocker") + ErrScuttleIDMismatch = errors.New(prefix + "-208: scuttle_id mismatch") ErrOther = errors.New(prefix + "-1000: unknown error") ErrReqParseFail = errors.New("Request error") } diff --git a/lib/coordinatorsharding.go b/lib/coordinatorsharding.go index 26f4c840..60ba2114 100644 --- a/lib/coordinatorsharding.go +++ b/lib/coordinatorsharding.go @@ -75,6 +75,7 @@ func (crd *Coordinator) getShardRec(key0 interface{}) *ShardMapRecord { keyNum := key0.(uint64) //keyNum, ok := key0.(uint64) for i := 0; i < 8; i++ { + //In this case, keyNum & 0xFF extracts the least significant 8 bits (one byte) from the keyNum variable. bytes[i] = byte(keyNum & 0xFF) keyNum >>= 8 } @@ -203,6 +204,7 @@ func (crd *Coordinator) computeLogicalShards() { break } // filter only the numeric part of the ShardValue + //Based on shard-key type, cast value to specific type. var key interface{} if GetConfig().ShardKeyValueTypeIsString { key = rec @@ -290,6 +292,25 @@ func (crd *Coordinator) isShardKey(bind string) bool { return true } +//Compare bind-name with scuttle ID column name from configuration. This doesn't consider multiple ScutttleID present +//via IN CLAUSE as part of request. This implementation provided based on assumption that request will have +// single bind value for scuttle_id column. +func (crd *Coordinator) isScuttleID(bindName string) bool { + if len(bindName) == 0 { + return false + } + if bindName[0] == ':' { + bindName = bindName[1:] + } + bindName = strings.ToLower(bindName) + scuttleIDColumn := strings.ToLower(GetConfig().ScuttleColName) + + if scuttleIDColumn == bindName { + return true + } + return false +} + // PreprocessSharding is doing shard info calculation and validation checks (by calling verifyValidShard) // before determining if the current request should continue, returning nil error if the request should be allowed. // If error is not nil, the second parameter says if the coordinator should hangup the client connection. @@ -314,12 +335,14 @@ func (crd *Coordinator) PreprocessSharding(requests []*netstring.Netstring) (boo } sz := len(requests) + var scuttleID int = -1 //Default value if request has bindname without bind value + scuttleColumnPresent := false autodisc := false /* ShardKey can overwrite the autodiscovery */ for i := 0; i < sz; i++ { if requests[i].Cmd == common.CmdPrepare { - lowerSql := strings.ToLower(string(requests[i].Payload)) - scuttle_idx := strings.LastIndex(lowerSql, strings.ToLower(GetConfig().ScuttleColName)) - if scuttle_idx < 0 || scuttle_idx > strings.Index(lowerSql, " from ") { + lowerSQL := strings.ToLower(string(requests[i].Payload)) + scuttleIdx := strings.LastIndex(lowerSQL, strings.ToLower(GetConfig().ScuttleColName)) + if scuttleIdx < 0 || scuttleIdx > strings.Index(lowerSQL, " from ") { continue } evt := cal.NewCalEvent(EvtTypeSharding, "RM_SCUTTLE_ID_FETCH_COL", cal.TransOK, "") @@ -329,6 +352,27 @@ func (crd *Coordinator) PreprocessSharding(requests []*netstring.Netstring) (boo crd.respond(ns.Serialized) return true, ErrNoScuttleIdPredicate } + //Capture ScuttleID column data in-case if it provided as part of query. + if (requests[i].Cmd == common.CmdBindName) && crd.isScuttleID(string(requests[i].Payload)) { + //To avoid repeated binds for scuttleId column + if !scuttleColumnPresent { + scuttleColumnPresent = true + if i < (sz - 1) { + if requests[i+1].Cmd == common.CmdBindNum && requests[i+2].Cmd == common.CmdBindValueMaxSize { + scuttleID, _ = strconv.Atoi(string(requests[i+3].Payload)) + } else if requests[i+1].Cmd == common.CmdBindValue { + scuttleID, _ = strconv.Atoi(string(requests[i+1].Payload)) + } else { + if logger.GetLogger().V(logger.Verbose) { + logger.GetLogger().Log(logger.Verbose, crd.id, fmt.Sprintf("Bind value for scuttleID column: %s not present in Query.", GetConfig().ScuttleColName)) + } + evt := cal.NewCalEvent(EvtTypeSharding, EvtNameBadScuttleId, cal.TransOK, fmt.Sprintf("Bind value for scuttleID column: %s not present in Query.", GetConfig().ScuttleColName)) + evt.AddDataInt("sql", int64(uint32(crd.sqlhash))) + evt.Completed() + } + } + } + } if (requests[i].Cmd == common.CmdBindName) && crd.isShardKey(string(requests[i].Payload)) { if crd.shard.sessionShardID != -1 { evt := cal.NewCalEvent(EvtTypeSharding, EvtNameAutodiscSetShardID, cal.TransOK, "") @@ -370,7 +414,7 @@ func (crd *Coordinator) PreprocessSharding(requests []*netstring.Netstring) (boo evt := cal.NewCalEvent(EvtTypeSharding, EvtNameShardIDAndKey, cal.TransOK, "") evt.AddDataInt("sql", int64(uint32(crd.sqlhash))) evt.Completed() - return true, errors.New("Unsupported both HERA_SET_SHARD_ID and ShardKey") + return true, errors.New("unsupported both HERA_SET_SHARD_ID and ShardKey") } key, vals := crd.parseShardKey(requests[i].Payload) @@ -452,6 +496,14 @@ func (crd *Coordinator) PreprocessSharding(requests []*netstring.Netstring) (boo evt.Completed() } } + + //This will handle scuttleID verification as part of this it compares scuttleID value with bucket value in shardRec + if scuttleColumnPresent { + hangup, err := crd.verifyScuttleID(scuttleID) + if err != nil { + return hangup, err + } + } } if (len(crd.shard.shardValues) == 0) && (crd.shard.sessionShardID == -1) { @@ -468,6 +520,7 @@ func (crd *Coordinator) PreprocessSharding(requests []*netstring.Netstring) (boo } } + //Verify whether shard information is valid or not hangup, err := crd.verifyValidShard() if err != nil { return hangup, err @@ -604,3 +657,20 @@ func (crd *Coordinator) verifyXShard(oldShardValues []string, oldShardID int, ol } return nil } + +//This validates the scuttleId provided as part of request command matching with scuttleID computed from shardKey. +//If both are not matching then it throws scuttle ID mismatch +func (crd *Coordinator) verifyScuttleID(scuttleID int) (bool, error) { + if scuttleID != crd.shard.shardRecs[0].bin { + if logger.GetLogger().V(logger.Debug) { + logger.GetLogger().Log(logger.Debug, fmt.Sprintf("ScuttleID comparison failed, scuttleID: %d captured from request didn't match with computed value: %d using shardKey: %s", scuttleID, crd.shard.shardRecs[0].bin, crd.shard.shardValues[0])) + } + evt := cal.NewCalEvent(EvtTypeSharding, EvtNameScuttleIdMismatch, cal.TransOK, "") + evt.AddDataInt("sql", int64(uint32(crd.sqlhash))) + evt.Completed() + ns := netstring.NewNetstringFrom(common.RcError, []byte(ErrScuttleIDMismatch.Error())) + crd.respond(ns.Serialized) + return true /*don't hangup*/, ErrScuttleIDMismatch + } + return false, nil +} diff --git a/tests/unittest/coordinator_sharding/main_test.go b/tests/unittest/coordinator_sharding/main_test.go index 713c05f2..c8ef730f 100644 --- a/tests/unittest/coordinator_sharding/main_test.go +++ b/tests/unittest/coordinator_sharding/main_test.go @@ -459,9 +459,6 @@ func TestShardingSetShardKey(t *testing.T) { err = nil t.Fatalf("Expected 1 Unsupported both HERA_SET_SHARD_ID and ShardKey true, %v %v", err, len(out)) } - if out[0] != '1' { - t.Fatalf("Expected 1 instance of 'Unsupported both HERA_SET_SHARD_ID and ShardKey', instead got %d", int(out[0]-'0')) - } conn, err = db.Conn(ctx) if err != nil { diff --git a/tests/unittest/coordinator_sharding_with_scuttleid/main_test.go b/tests/unittest/coordinator_sharding_with_scuttleid/main_test.go new file mode 100755 index 00000000..17671841 --- /dev/null +++ b/tests/unittest/coordinator_sharding_with_scuttleid/main_test.go @@ -0,0 +1,348 @@ +package main + +import ( + "context" + "database/sql" + + "fmt" + "os" + "strings" + "testing" + "time" + + "github.com/paypal/hera/client/gosqldriver" + _ "github.com/paypal/hera/client/gosqldriver/tcp" + "github.com/paypal/hera/tests/unittest/testutil" + "github.com/paypal/hera/utility/logger" +) + +var tableName string + +func cfg() (map[string]string, map[string]string, testutil.WorkerType) { + + appcfg := make(map[string]string) + // best to chose an "unique" port in case golang runs tests in paralel + appcfg["bind_port"] = "31003" + appcfg["log_level"] = "5" + appcfg["log_file"] = "hera.log" + appcfg["enable_sharding"] = "true" + appcfg["num_shards"] = "3" + appcfg["max_scuttle"] = "9" + appcfg["scuttle_col_name"] = "scuttle_id" + appcfg["shard_key_name"] = "id" + appcfg["error_code_prefix"] = "HERA" + pfx := os.Getenv("MGMT_TABLE_PREFIX") + if pfx != "" { + appcfg["management_table_prefix"] = pfx + } + appcfg["sharding_cfg_reload_interval"] = "3600" + appcfg["rac_sql_interval"] = "0" + + opscfg := make(map[string]string) + opscfg["opscfg.default.server.max_connections"] = "3" + opscfg["opscfg.default.server.log_level"] = "5" + + return appcfg, opscfg, testutil.MySQLWorker +} + +func setupShardMap(t *testing.T) { + twoTask := os.Getenv("TWO_TASK") + if !strings.HasPrefix(twoTask, "tcp") { + // not mysql + return + } + shard := 0 + db, err := sql.Open("heraloop", fmt.Sprintf("%d:0:0", shard)) + if err != nil { + t.Fatal("Error starting Mux:", err) + return + } + db.SetMaxIdleConns(0) + defer releaseDB(db) + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + conn, err := db.Conn(ctx) + if err != nil { + t.Fatalf("Error getting connection %s\n", err.Error()) + } + defer func(conn *sql.Conn) { + err := conn.Close() + if err != nil { + logger.GetLogger().Log(logger.Info, "Error while closing Conn: ", err) + } + }(conn) + + testutil.RunDML("create table hera_shard_map ( scuttle_id smallint not null, shard_id tinyint not null, status char(1) , read_status char(1), write_status char(1), remarks varchar(500))") + + for i := 0; i < 1024; i++ { + shard := 0 + if i <= 9 { + shard = i % 3 + } + testutil.RunDML(fmt.Sprintf("insert into hera_shard_map ( scuttle_id, shard_id, status, read_status, write_status ) values ( %d, %d, 'Y', 'Y', 'Y' )", i, shard)) + } +} + +func before() error { + tableName = "jdbc_hera_scuttle_id_test" + if strings.HasPrefix(os.Getenv("TWO_TASK"), "tcp") { + // mysql + err := testutil.RunDML("create table " + tableName + " ( SCUTTLE_ID smallint not null, ID BIGINT, INT_VAL BIGINT, STR_VAL VARCHAR(500))") + if err != nil { + logger.GetLogger().Log(logger.Info, "Error while creating table") + } + } + return nil +} + +func TestMain(m *testing.M) { + os.Exit(testutil.UtilMain(m, cfg, before)) +} + +func TestShardingWithScuttleIDBasic(t *testing.T) { + logger.GetLogger().Log(logger.Debug, "TestShardingBasicWithScuttleID setup") + setupShardMap(t) + logger.GetLogger().Log(logger.Debug, "TestShardingBasicWithScuttleID begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") + hostname, _ := os.Hostname() + appCfg, _, _ := cfg() + db, err := sql.Open("hera", hostname+":31003") + if err != nil { + t.Fatal("Error starting Mux:", err) + return + } + db.SetMaxIdleConns(0) + defer releaseDB(db) + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + conn, err := db.Conn(ctx) + if err != nil { + t.Fatalf("Error getting connection %s\n", err.Error()) + } + + // insert one row in the table + tx, _ := conn.BeginTx(ctx, nil) + shardKey := 1 + scuttleID, err := testutil.ComputeScuttleId(shardKey, appCfg["max_scuttle"]) + if err != nil { + t.Fatalf("Error generating scuttle ID %s\n", err.Error()) + } + stmt, _ := tx.PrepareContext(ctx, "/*TestShardingBasicWithScuttleID*/insert into "+tableName+" (scuttle_id, id, int_val, str_val) VALUES(:scuttle_id, :id, :int_val, :str_val)") + _, err = stmt.Exec(sql.Named("scuttle_id", scuttleID), sql.Named("id", shardKey), sql.Named("int_val", time.Now().Unix()), sql.Named("str_val", "val 1")) + if err != nil { + t.Fatalf("Error preparing test (create row in table) %s\n", err.Error()) + } + err = tx.Commit() + if err != nil { + t.Fatalf("Error commit %s\n", err.Error()) + } + + stmt, _ = conn.PrepareContext(ctx, "/*TestShardingBasicWithScuttleID*/Select id, int_val, str_val from "+tableName+" where id=:id and scuttle_id=:scuttle_id") + rows, err := stmt.Query(sql.Named("id", 1), sql.Named("scuttle_id", scuttleID)) + + if err != nil { + t.Fatalf("Error Selecting results %s\n", err.Error()) + } + + if !rows.Next() { + t.Fatalf("Expected 1 row") + } + var id, intVal uint64 + var strVal sql.NullString + err = rows.Scan(&id, &intVal, &strVal) + if err != nil { + t.Fatalf("Expected values %s", err.Error()) + } + if strVal.String != "val 1" { + t.Fatalf("Expected val 1 , got: %s", strVal.String) + } + err = rows.Close() + if err != nil { + logger.GetLogger().Log(logger.Info, "Error while closing rows: ", err) + } + + //Change Scuttle ID value to in correct scuttle ID + shardKey = 2 + scuttleID = 3 + // insert one row in the table + tx, _ = conn.BeginTx(ctx, nil) + stmt, _ = tx.PrepareContext(ctx, "/*TestShardingBasicWithScuttleIDIncorrectVal*/insert into "+tableName+" (scuttle_id, id, int_val, str_val) VALUES(:scuttle_id, :id, :int_val, :str_val)") + _, err = stmt.Exec(sql.Named("scuttle_id", scuttleID), sql.Named("id", shardKey), sql.Named("int_val", time.Now().Unix()), sql.Named("str_val", "val 2")) + if err == nil { + t.Fatal("Expected to fail because, mismatch between computed bucket and scuttleId.") + } + if !strings.Contains(err.Error(), "HERA-208: scuttle_id mismatch") { + t.Fatal("Expected error HERA-208: scuttle_id mismatch") + } + err = tx.Commit() + + //Add record2 + // insert one row in the table + conn, err = db.Conn(ctx) + tx, _ = conn.BeginTx(ctx, nil) + shardKey = 2 + scuttleID, err = testutil.ComputeScuttleId(shardKey, appCfg["max_scuttle"]) + if err != nil { + t.Fatalf("Error generating scuttle ID %s\n", err.Error()) + } + stmt, _ = tx.PrepareContext(ctx, "/*TestShardingBasicWithScuttleID*/insert into "+tableName+" (scuttle_id, id, int_val, str_val) VALUES(:scuttle_id, :id, :int_val, :str_val)") + _, err = stmt.Exec(sql.Named("scuttle_id", scuttleID), sql.Named("id", shardKey), sql.Named("int_val", time.Now().Unix()), sql.Named("str_val", "val 2")) + + if err != nil { + t.Fatalf("Error preparing test (create row in table) %s\n", err.Error()) + } + err = tx.Commit() + err = stmt.Close() + if err != nil { + logger.GetLogger().Log(logger.Info, "Error while closing statement: ", err) + } + err = conn.Close() + if err != nil { + logger.GetLogger().Log(logger.Info, "Error while closing connection: ", err) + } + cancel() + logger.GetLogger().Log(logger.Debug, "TestShardingBasicWithScuttleID done -------------------------------------------------------------") +} + +func TestShardingWithScuttleIDAndSetShard(t *testing.T) { + logger.GetLogger().Log(logger.Debug, "TestShardingWithScuttleIDAndSetShard begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") + hostname, _ := os.Hostname() + db, err := sql.Open("hera", hostname+":31003") + if err != nil { + t.Fatal("Error starting Mux:", err) + return + } + db.SetMaxIdleConns(0) + defer releaseDB(db) + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + conn, err := db.Conn(ctx) + if err != nil { + t.Fatalf("Error getting connection %s\n", err.Error()) + } + + mux := gosqldriver.InnerConn(conn) + err = mux.SetShardID(1) + if err != nil { + logger.GetLogger().Log(logger.Info, "Failed to set shardId: ", err) + } + stmt, _ := conn.PrepareContext(ctx, "/*TestShardingWithScuttleIDAndSetShard*/Select scuttle_id, id, int_val, str_val from "+tableName+" where id=1 and scuttle_id=:scuttle_id") + rows, _ := stmt.Query(sql.Named("scuttle_id", 2)) + if rows != nil { + err = rows.Close() + if err != nil { + logger.GetLogger().Log(logger.Info, "Error while closing rows: ", err) + } + } + if stmt != nil { + err = stmt.Close() + if err != nil { + logger.GetLogger().Log(logger.Info, "Error while closing statement: ", err) + } + } + out, err := testutil.BashCmd("grep 'Preparing: /\\*TestShardingWithScuttleIDAndSetShard\\*/' hera.log | grep 'WORKER shd1' | wc -l") + if (err != nil) || (len(out) == 0) { + err = nil + t.Fatalf("Request did not run on shard 1. err = %v, len(out) = %d", err, len(out)) + } + + err = mux.SetShardID(2) + if err != nil { + logger.GetLogger().Log(logger.Info, "Failed to set shardId: ", err) + } + stmt, _ = conn.PrepareContext(ctx, "/*TestShardingWithScuttleIDAndSetShard*/Select scuttle_id, id, int_val, str_val from "+tableName+" where id=2 and scuttle_id=:scuttle_id") + rows, _ = stmt.Query(sql.Named("scuttle_id", 1)) + if rows != nil { + err = rows.Close() + if err != nil { + logger.GetLogger().Log(logger.Info, "Error while closing rows: ", err) + } + } + err = stmt.Close() + if err != nil { + logger.GetLogger().Log(logger.Info, "Error while closing Statement: ", err) + } + out, err = testutil.BashCmd("grep 'Preparing: /\\*TestShardingWithScuttleIDAndSetShard\\*/' hera.log | grep 'WORKER shd2' | wc -l") + if (err != nil) || (len(out) == 0) { + err = nil + t.Fatalf("Request did not run on shard 2. err = %v, len(out) = %d", err, len(out)) + } + err = conn.Close() + if err != nil { + logger.GetLogger().Log(logger.Info, "Error while closing connection: ", err) + } + cancel() + logger.GetLogger().Log(logger.Debug, "TestShardingWithScuttleIDAndSetShard done -------------------------------------------------------------") +} + +func TestShardingWithScuttleIDAndInvalidBindValue(t *testing.T) { + logger.GetLogger().Log(logger.Debug, "TestShardingWithScuttleIDAndInvalidBindValue begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") + hostname, _ := os.Hostname() + db, err := sql.Open("hera", hostname+":31003") + if err != nil { + t.Fatal("Error starting Mux:", err) + return + } + db.SetMaxIdleConns(0) + defer releaseDB(db) + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + conn, err := db.Conn(ctx) + if err != nil { + t.Fatalf("Error getting connection %s\n", err.Error()) + } + + //Test 1 provide scuttle_id as empty or nil + tx, _ := conn.BeginTx(ctx, nil) + shardKey := 2 + if err != nil { + t.Fatalf("Error generating scuttle ID %s\n", err.Error()) + } + stmt, _ := tx.PrepareContext(ctx, "/*TestShardingWithScuttleIDAndWithoutBindValue*/insert into "+tableName+" (scuttle_id, id, int_val, str_val) VALUES(:scuttle_id, :id, :int_val, :str_val)") + _, err = stmt.Exec(sql.Named("scuttle_id", ""), sql.Named("id", shardKey), sql.Named("int_val", time.Now().Unix()), sql.Named("str_val", "val 2")) + if err == nil { + t.Fatal("Expected to fail because, mismatch between computed bucket and scuttleId.") + } + if !strings.Contains(err.Error(), "HERA-208") { + t.Fatal("Expected error HERA-208: scuttle_id mismatch") + } + tx.Commit() + err = conn.Close() + if err != nil { + logger.GetLogger().Log(logger.Info, "Error while closing connection: ", err) + } + + conn, err = db.Conn(ctx) + //Test 2 select with no bind value scuttle_id + stmt, _ = conn.PrepareContext(ctx, "/*TestShardingWithScuttleIDAndWithoutBindValue*/Select id, int_val, str_val from "+tableName+" where id=:id and scuttle_id=:scuttle_id") + rows, err := stmt.Query(sql.Named("scuttle_id", ""), sql.Named("id", shardKey)) + + if err == nil { + t.Fatal("Expected to fail because, mismatch between computed bucket and scuttleId.") + } + if !strings.Contains(err.Error(), "HERA-208") { + t.Fatal("Expected error HERA-208: scuttle_id mismatch") + } + if rows != nil { + err := rows.Close() + if err != nil { + logger.GetLogger().Log(logger.Info, "Error while closing rows object: ", err) + } + } + if stmt != nil { + err = stmt.Close() + if err != nil { + logger.GetLogger().Log(logger.Info, "Error while closing statement: ", err) + } + } + err = conn.Close() + if err != nil { + logger.GetLogger().Log(logger.Info, "Error while closing connection: ", err) + } + cancel() + logger.GetLogger().Log(logger.Debug, "TestShardingWithScuttleIDAndWithoutBindValue done -------------------------------------------------------------") +} + +func releaseDB(db *sql.DB) { + err := db.Close() + if err != nil { + logger.GetLogger().Log(logger.Info, "Error while closing DB: ", err) + } +} diff --git a/tests/unittest/testutil/util.go b/tests/unittest/testutil/util.go index 979ee4cb..77317e64 100644 --- a/tests/unittest/testutil/util.go +++ b/tests/unittest/testutil/util.go @@ -7,9 +7,11 @@ import ( "database/sql" "errors" "fmt" + "github.com/paypal/hera/lib" "os" "os/exec" "regexp" + "strconv" "strings" "time" @@ -152,6 +154,7 @@ func RunDML(dml string) error { func RegexCount(regex string) int { return RegexCountFile(regex, "hera.log") } + func RegexCountFile(regex string, filename string) int { time.Sleep(10 * time.Millisecond) fa, err := regexp.Compile(regex) @@ -185,10 +188,47 @@ func RegexCountFile(regex string, filename string) int { return count } +func ComputeScuttleId(shardKey interface{}, maxScuttles string) (uint64, error) { + maxScuttlesVal, _ := strconv.ParseUint(maxScuttles, 10, 64) + logger.GetLogger().Log(logger.Info, fmt.Sprintf("ComputeScuttleId: Max scuttles value: %d", maxScuttlesVal)) + switch keyType := shardKey.(type) { + case string: + keyStr := shardKey.(string) + //keyStr, ok := key0.(string) + key := uint64(lib.Murmur3([]byte(keyStr))) + return key % maxScuttlesVal, nil + case int: + bytes := make([]byte, 8) + keyNum, _ := shardKey.(int) + keyNumValue := uint64(keyNum) + for i := 0; i < 8; i++ { + bytes[i] = byte(keyNumValue & 0xFF) + keyNumValue >>= 8 + } + key := uint64(lib.Murmur3(bytes)) + return key % maxScuttlesVal, nil + case int64: + bytes := make([]byte, 8) + keyNum := shardKey.(uint64) + //keyNum, ok := key0.(uint64) + for i := 0; i < 8; i++ { + bytes[i] = byte(keyNum & 0xFF) + keyNum >>= 8 + } + key := uint64(lib.Murmur3(bytes)) + return key % maxScuttlesVal, nil + default: + logger.GetLogger().Log(logger.Info, fmt.Sprintf("Provided incorrect shardkey type: %v for tests for shard-key: %v", keyType, shardKey)) + return 0, errors.New(fmt.Sprintf("Provided incorrect shardkey type: %v for tests for shard-key: %v", keyType, shardKey)) + } + return 0, errors.New(fmt.Sprintf("Failed to compute scuttle ID for shardKey: %v", shardKey)) +} + func Fatal(msg ...interface{}) { fmt.Println(msg...) os.Exit(2) } + func Fatalf(str string, msg ...interface{}) { fmt.Printf(str, msg...) os.Exit(1)