diff --git a/pkg/inserter/inserter.go b/pkg/inserter/inserter.go index 8a5155a..c3764a9 100644 --- a/pkg/inserter/inserter.go +++ b/pkg/inserter/inserter.go @@ -94,6 +94,7 @@ func NewInserter(clickhouseOptions ClickHouseOptions, runtimeInfo RuntimeInfo, n w := worker{ waitAsyncInsert: clickhouseOptions.WaitForAsyncInsert, conn: conn, + database: clickhouseOptions.Database, batch: make([]string, 0, clickhouseOptions.BatchSize), batchExpirationTimer: time.NewTimer(clickhouseOptions.BatchSendTimeout), batchSendTimeout: clickhouseOptions.BatchSendTimeout, diff --git a/pkg/inserter/worker.go b/pkg/inserter/worker.go index 5925cc2..89cee92 100644 --- a/pkg/inserter/worker.go +++ b/pkg/inserter/worker.go @@ -72,6 +72,7 @@ type worker struct { waitAsyncInsert bool conn driver.Conn + database string batch []string batchCreationTimestamp time.Time batchExpirationTimer *time.Timer @@ -246,44 +247,44 @@ func (w *worker) flush() error { }() ctx := clickhouse.Context(context.Background(), clickhouse.WithSettings(clickhouse.Settings{"insert_deduplication_token": uuid.New().String()})) - q := ` -INSERT INTO default.network_flows_0 + q := fmt.Sprintf(` +INSERT INTO %s.network_flows_0 ( - date, - intervalStartTime, - intervalSeconds, - environment, - proto, - connectionClass, - connectionFlags, - direction, - localCloud, - localRegion, - localCluster, - localAvailabilityZone, - localNode, - localInstanceID, - localNamespace, - localPod, - localIPv4, - localPort, - localApp, - remoteCloud, - remoteRegion, - remoteCluster, - remoteAvailabilityZone, - remoteNode, - remoteInstanceID, - remoteNamespace, - remotePod, - remoteIPv4, - remotePort, - remoteApp, - remoteCloudService, - bytes, - packets +date, +intervalStartTime, +intervalSeconds, +environment, +proto, +connectionClass, +connectionFlags, +direction, +localCloud, +localRegion, +localCluster, +localAvailabilityZone, +localNode, +localInstanceID, +localNamespace, +localPod, +localIPv4, +localPort, +localApp, +remoteCloud, +remoteRegion, +remoteCluster, +remoteAvailabilityZone, +remoteNode, +remoteInstanceID, +remoteNamespace, +remotePod, +remoteIPv4, +remotePort, +remoteApp, +remoteCloudService, +bytes, +packets ) -VALUES ` + strings.Join(w.batch, ", ") +VALUES %s`, w.database, strings.Join(w.batch, ", ")) if err1 := w.conn.AsyncInsert(ctx, q, w.waitAsyncInsert); err1 != nil { insertRetryCounter.Inc() log.Err(err1).Msg("failed to insert, going to retry")