From 0f67ff17c5664efa21acabc91f109c1e4783f121 Mon Sep 17 00:00:00 2001 From: nandan Date: Thu, 8 Aug 2024 19:15:01 +0530 Subject: [PATCH 1/2] FOGL-8285 : Added new JSONB field in streams table Signed-off-by: nandan --- C/services/north/data_load.cpp | 90 +++++++++++++++++-- C/services/north/include/data_load.h | 1 + .../plugins/storage/postgres/downgrade/74.sql | 2 + scripts/plugins/storage/postgres/init.sql | 23 ++--- .../plugins/storage/postgres/upgrade/75.sql | 1 + .../plugins/storage/sqlite/downgrade/74.sql | 2 + scripts/plugins/storage/sqlite/init.sql | 21 ++--- scripts/plugins/storage/sqlite/upgrade/75.sql | 1 + .../plugins/storage/sqlitelb/downgrade/74.sql | 2 + scripts/plugins/storage/sqlitelb/init.sql | 21 ++--- .../plugins/storage/sqlitelb/upgrade/75.sql | 1 + 11 files changed, 127 insertions(+), 38 deletions(-) create mode 100644 scripts/plugins/storage/postgres/downgrade/74.sql create mode 100644 scripts/plugins/storage/postgres/upgrade/75.sql create mode 100644 scripts/plugins/storage/sqlite/downgrade/74.sql create mode 100644 scripts/plugins/storage/sqlite/upgrade/75.sql create mode 100644 scripts/plugins/storage/sqlitelb/downgrade/74.sql create mode 100644 scripts/plugins/storage/sqlitelb/upgrade/75.sql diff --git a/C/services/north/data_load.cpp b/C/services/north/data_load.cpp index 15a6ce84ee..994597ab21 100755 --- a/C/services/north/data_load.cpp +++ b/C/services/north/data_load.cpp @@ -118,6 +118,8 @@ bool DataLoad::setDataSource(const string& source) source.c_str(), m_name.c_str()); return false; } + m_lastFetched = getLastSentId(); + m_streamSent = getLastSentId(); return true; } @@ -335,7 +337,6 @@ unsigned long DataLoad::getLastSentId() // SELECT * FROM fledge.streams WHERE id = x Query qLastId(wStreamId); - ResultSet* lastObjectId = m_storage->queryTable("streams", qLastId); if (lastObjectId != NULL && lastObjectId->rowCount()) @@ -346,10 +347,21 @@ unsigned long DataLoad::getLastSentId() ResultSet::Row* row = *it; if (row) { + ResultSet::ColumnValue* theVal = nullptr; + unsigned long rval = 0; // Get column value - ResultSet::ColumnValue* theVal = row->getColumn("last_object"); - // Set found id - unsigned long rval = (unsigned long)theVal->getInteger(); + if (m_dataSource == SourceReadings) + { + theVal = row->getColumn("last_object"); + rval = (unsigned long)theVal->getInteger(); + } + else + { + theVal = row->getColumn("audit_stats_last_object"); + const rapidjson::Value * json = theVal->getJSON(); + (m_dataSource == SourceStatistics) ? (rval = json->GetObject()["Stats"].GetInt64()) : (rval = json->GetObject()["Audit"].GetInt64()); + + } delete lastObjectId; return rval; } @@ -448,9 +460,15 @@ int DataLoad::createNewStream() { int streamId = 0; InsertValues streamValues; - streamValues.push_back(InsertValue("description", m_name)); - streamValues.push_back(InsertValue("last_object", 0)); + if (m_dataSource == SourceReadings) + { + streamValues.push_back(InsertValue("last_object", 0)); + } + else + { + streamValues.push_back(InsertValue("audit_stats_last_object", "{\\\"Stats\\\":0,\\\"Audit\\\":0}" )); + } if (m_storage->insertTable("streams", streamValues) != 1) { @@ -508,7 +526,29 @@ void DataLoad::flushLastSentId() Where where("id", condition, to_string(m_streamId)); InsertValues lastId; - lastId.push_back(InsertValue("last_object", (long)m_streamSent)); + if (m_dataSource == SourceReadings) + { + lastId.push_back(InsertValue("last_object", (long)m_streamSent)); + } + else + { + const rapidjson::Value * json = getAuditStatsValue(); + std::string audit_stats_last_object = "{}"; + if (json == nullptr) + { + audit_stats_last_object = "{\\\"Stats\\\":0,\\\"Audit\\\":0}"; + } + else if (m_dataSource == SourceStatistics) + { + audit_stats_last_object = "{\\\"Stats\\\":" + to_string(m_streamSent) +",\\\"Audit\\\":" + to_string(json->GetObject()["Audit"].GetInt64()) +"}"; + } + else + { + audit_stats_last_object = "{\\\"Stats\\\":" + to_string(json->GetObject()["Stats"].GetInt64()) +",\\\"Audit\\\":" + to_string(m_streamSent) +"}"; + } + lastId.push_back(InsertValue("audit_stats_last_object", audit_stats_last_object)); + } + m_storage->updateTable("streams", lastId, where); } @@ -718,3 +758,39 @@ void DataLoad::configChange(const string& category, const string& newConfig) } } } + +/** + * Get the Audit Stats value to be used to fetch last reading sent with this current data source + */ + +const rapidjson::Value * DataLoad::getAuditStatsValue() +{ + + const Condition conditionId(Equals); + string streamId = to_string(m_streamId); + Where* wStreamId = new Where("id", + conditionId, + streamId); + + // SELECT * FROM fledge.streams WHERE id = x + Query qLastId(wStreamId); + + ResultSet* lastObjectId = m_storage->queryTable("streams", qLastId); + + if (lastObjectId != NULL && lastObjectId->rowCount()) + { + // Get the first row only + ResultSet::RowIterator it = lastObjectId->firstRow(); + // Access the element + ResultSet::Row* row = *it; + if (row) + { + ResultSet::ColumnValue* theVal = row->getColumn("audit_stats_last_object"); + delete lastObjectId; + return theVal->getJSON(); + } + } + // Free result set + delete lastObjectId; + return nullptr; +} diff --git a/C/services/north/include/data_load.h b/C/services/north/include/data_load.h index 96d0e9f826..35352dd824 100644 --- a/C/services/north/include/data_load.h +++ b/C/services/north/include/data_load.h @@ -70,6 +70,7 @@ class DataLoad : public ServiceHandler { ReadingSet *fetchAudit(unsigned int blockSize); void bufferReadings(ReadingSet *readings); bool loadFilters(const std::string& category); + const rapidjson::Value *getAuditStatsValue(); private: const std::string& m_name; diff --git a/scripts/plugins/storage/postgres/downgrade/74.sql b/scripts/plugins/storage/postgres/downgrade/74.sql new file mode 100644 index 0000000000..b3a91e9a85 --- /dev/null +++ b/scripts/plugins/storage/postgres/downgrade/74.sql @@ -0,0 +1,2 @@ +ALTER TABLE fledge.streams DROP COLUMN audit_stats_last_object; + diff --git a/scripts/plugins/storage/postgres/init.sql b/scripts/plugins/storage/postgres/init.sql index 0337c4e872..256ea6a0df 100644 --- a/scripts/plugins/storage/postgres/init.sql +++ b/scripts/plugins/storage/postgres/init.sql @@ -423,17 +423,18 @@ CREATE INDEX readings_ix3 -- Streams table -- List of the streams to the Cloud. CREATE TABLE fledge.streams ( - id integer NOT NULL DEFAULT nextval('fledge.streams_id_seq'::regclass), -- Sequence ID - description character varying(255) NOT NULL DEFAULT ''::character varying COLLATE pg_catalog."default", -- A brief description of the stream entry - properties jsonb NOT NULL DEFAULT '{}'::jsonb, -- A generic set of properties - object_stream jsonb NOT NULL DEFAULT '{}'::jsonb, -- Definition of what must be streamed - object_block jsonb NOT NULL DEFAULT '{}'::jsonb, -- Definition of how the stream must be organised - object_filter jsonb NOT NULL DEFAULT '{}'::jsonb, -- Any filter involved in selecting the data to stream - active_window jsonb NOT NULL DEFAULT '{}'::jsonb, -- The window of operations - active boolean NOT NULL DEFAULT true, -- When false, all data to this stream stop and are inactive - last_object bigint NOT NULL DEFAULT 0, -- The ID of the last object streamed (asset or reading, depending on the object_stream) - ts timestamp(6) with time zone NOT NULL DEFAULT now(), -- Creation or last update - CONSTRAINT strerams_pkey PRIMARY KEY (id)); + id integer NOT NULL DEFAULT nextval('fledge.streams_id_seq'::regclass), -- Sequence ID + description character varying(255) NOT NULL DEFAULT ''::character varying COLLATE pg_catalog."default", -- A brief description of the stream entry + properties jsonb NOT NULL DEFAULT '{}'::jsonb, -- A generic set of properties + object_stream jsonb NOT NULL DEFAULT '{}'::jsonb, -- Definition of what must be streamed + object_block jsonb NOT NULL DEFAULT '{}'::jsonb, -- Definition of how the stream must be organised + object_filter jsonb NOT NULL DEFAULT '{}'::jsonb, -- Any filter involved in selecting the data to stream + active_window jsonb NOT NULL DEFAULT '{}'::jsonb, -- The window of operations + active boolean NOT NULL DEFAULT true, -- When false, all data to this stream stop and are inactive + last_object bigint NOT NULL DEFAULT 0, -- The ID of the last object streamed (asset or reading, depending on the object_stream) + audit_stats_last_object jsonb NOT NULL DEFAULT '{"Audit":0,"Stats":0}'::jsonb, -- The ID of the last object streamed (Audit or Stats, depending on the object_stream) + ts timestamp(6) with time zone NOT NULL DEFAULT now(), -- Creation or last update + CONSTRAINT strerams_pkey PRIMARY KEY (id)); -- Configuration table diff --git a/scripts/plugins/storage/postgres/upgrade/75.sql b/scripts/plugins/storage/postgres/upgrade/75.sql new file mode 100644 index 0000000000..baac045317 --- /dev/null +++ b/scripts/plugins/storage/postgres/upgrade/75.sql @@ -0,0 +1 @@ +ALTER TABLE fledge.streams ADD COLUMN audit_stats_last_object jsonb NOT NULL DEFAULT '{"Audit":0,"Stats":0}'::jsonb; diff --git a/scripts/plugins/storage/sqlite/downgrade/74.sql b/scripts/plugins/storage/sqlite/downgrade/74.sql new file mode 100644 index 0000000000..b3a91e9a85 --- /dev/null +++ b/scripts/plugins/storage/sqlite/downgrade/74.sql @@ -0,0 +1,2 @@ +ALTER TABLE fledge.streams DROP COLUMN audit_stats_last_object; + diff --git a/scripts/plugins/storage/sqlite/init.sql b/scripts/plugins/storage/sqlite/init.sql index cb35730643..58a0401be1 100644 --- a/scripts/plugins/storage/sqlite/init.sql +++ b/scripts/plugins/storage/sqlite/init.sql @@ -212,16 +212,17 @@ CREATE INDEX fki_asset_messages_fk2 -- Streams table -- List of the streams to the Cloud. CREATE TABLE fledge.streams ( - id INTEGER PRIMARY KEY AUTOINCREMENT, -- Sequence ID - description character varying(255) NOT NULL DEFAULT '', -- A brief description of the stream entry - properties JSON NOT NULL DEFAULT '{}', -- A generic set of properties - object_stream JSON NOT NULL DEFAULT '{}', -- Definition of what must be streamed - object_block JSON NOT NULL DEFAULT '{}', -- Definition of how the stream must be organised - object_filter JSON NOT NULL DEFAULT '{}', -- Any filter involved in selecting the data to stream - active_window JSON NOT NULL DEFAULT '{}', -- The window of operations - active boolean NOT NULL DEFAULT 't', -- When false, all data to this stream stop and are inactive - last_object bigint NOT NULL DEFAULT 0, -- The ID of the last object streamed (asset or reading, depending on the object_stream) - ts DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime'))); -- Creation or last update + id INTEGER PRIMARY KEY AUTOINCREMENT, -- Sequence ID + description character varying(255) NOT NULL DEFAULT '', -- A brief description of the stream entry + properties JSON NOT NULL DEFAULT '{}', -- A generic set of properties + object_stream JSON NOT NULL DEFAULT '{}', -- Definition of what must be streamed + object_block JSON NOT NULL DEFAULT '{}', -- Definition of how the stream must be organised + object_filter JSON NOT NULL DEFAULT '{}', -- Any filter involved in selecting the data to stream + active_window JSON NOT NULL DEFAULT '{}', -- The window of operations + active boolean NOT NULL DEFAULT 't', -- When false, all data to this stream stop and are inactive + last_object bigint NOT NULL DEFAULT 0, -- The ID of the last object streamed (asset or reading, depending on the object_stream) + audit_stats_last_object JSON NOT NULL DEFAULT '{"Audit":0,"Stats":0}', -- The ID of the last object streamed (Audit or Stats, depending on the object_stream) + ts DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime'))); -- Creation or last update -- Configuration table diff --git a/scripts/plugins/storage/sqlite/upgrade/75.sql b/scripts/plugins/storage/sqlite/upgrade/75.sql new file mode 100644 index 0000000000..72d96ee21d --- /dev/null +++ b/scripts/plugins/storage/sqlite/upgrade/75.sql @@ -0,0 +1 @@ +ALTER TABLE fledge.streams ADD COLUMN audit_stats_last_object JSON NOT NULL DEFAULT '{"Audit":0,"Stats":0}'; diff --git a/scripts/plugins/storage/sqlitelb/downgrade/74.sql b/scripts/plugins/storage/sqlitelb/downgrade/74.sql new file mode 100644 index 0000000000..b3a91e9a85 --- /dev/null +++ b/scripts/plugins/storage/sqlitelb/downgrade/74.sql @@ -0,0 +1,2 @@ +ALTER TABLE fledge.streams DROP COLUMN audit_stats_last_object; + diff --git a/scripts/plugins/storage/sqlitelb/init.sql b/scripts/plugins/storage/sqlitelb/init.sql index 48b7771f85..46ab0114dd 100644 --- a/scripts/plugins/storage/sqlitelb/init.sql +++ b/scripts/plugins/storage/sqlitelb/init.sql @@ -212,16 +212,17 @@ CREATE INDEX fki_asset_messages_fk2 -- Streams table -- List of the streams to the Cloud. CREATE TABLE fledge.streams ( - id INTEGER PRIMARY KEY AUTOINCREMENT, -- Sequence ID - description character varying(255) NOT NULL DEFAULT '', -- A brief description of the stream entry - properties JSON NOT NULL DEFAULT '{}', -- A generic set of properties - object_stream JSON NOT NULL DEFAULT '{}', -- Definition of what must be streamed - object_block JSON NOT NULL DEFAULT '{}', -- Definition of how the stream must be organised - object_filter JSON NOT NULL DEFAULT '{}', -- Any filter involved in selecting the data to stream - active_window JSON NOT NULL DEFAULT '{}', -- The window of operations - active boolean NOT NULL DEFAULT 't', -- When false, all data to this stream stop and are inactive - last_object bigint NOT NULL DEFAULT 0, -- The ID of the last object streamed (asset or reading, depending on the object_stream) - ts DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime'))); -- Creation or last update + id INTEGER PRIMARY KEY AUTOINCREMENT, -- Sequence ID + description character varying(255) NOT NULL DEFAULT '', -- A brief description of the stream entry + properties JSON NOT NULL DEFAULT '{}', -- A generic set of properties + object_stream JSON NOT NULL DEFAULT '{}', -- Definition of what must be streamed + object_block JSON NOT NULL DEFAULT '{}', -- Definition of how the stream must be organised + object_filter JSON NOT NULL DEFAULT '{}', -- Any filter involved in selecting the data to stream + active_window JSON NOT NULL DEFAULT '{}', -- The window of operations + active boolean NOT NULL DEFAULT 't', -- When false, all data to this stream stop and are inactive + last_object bigint NOT NULL DEFAULT 0, -- The ID of the last object streamed (asset or reading, depending on the object_stream) + audit_stats_last_object JSON NOT NULL DEFAULT '{"Audit":0,"Stats":0}', -- The ID of the last object streamed (Audit or Stats, depending on the object_stream) + ts DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime'))); -- Creation or last update -- Configuration table diff --git a/scripts/plugins/storage/sqlitelb/upgrade/75.sql b/scripts/plugins/storage/sqlitelb/upgrade/75.sql new file mode 100644 index 0000000000..72d96ee21d --- /dev/null +++ b/scripts/plugins/storage/sqlitelb/upgrade/75.sql @@ -0,0 +1 @@ +ALTER TABLE fledge.streams ADD COLUMN audit_stats_last_object JSON NOT NULL DEFAULT '{"Audit":0,"Stats":0}'; From 59341d5cfd0a39ffecd67711914f95d9bc1db3c5 Mon Sep 17 00:00:00 2001 From: nandan Date: Thu, 12 Sep 2024 14:27:05 +0530 Subject: [PATCH 2/2] FOGL-8285 : Added migration code for stats Signed-off-by: nandan --- C/services/north/data_load.cpp | 158 +++++++++++++----- C/services/north/include/data_load.h | 4 +- VERSION | 2 +- .../plugins/storage/postgres/downgrade/74.sql | 2 +- scripts/plugins/storage/postgres/init.sql | 2 +- .../plugins/storage/postgres/upgrade/75.sql | 2 +- .../plugins/storage/sqlite/downgrade/74.sql | 2 +- scripts/plugins/storage/sqlite/init.sql | 2 +- scripts/plugins/storage/sqlite/upgrade/75.sql | 2 +- .../plugins/storage/sqlitelb/downgrade/74.sql | 2 +- scripts/plugins/storage/sqlitelb/init.sql | 2 +- .../plugins/storage/sqlitelb/upgrade/75.sql | 2 +- 12 files changed, 130 insertions(+), 52 deletions(-) diff --git a/C/services/north/data_load.cpp b/C/services/north/data_load.cpp index f8332e2199..91740ecd8e 100755 --- a/C/services/north/data_load.cpp +++ b/C/services/north/data_load.cpp @@ -42,6 +42,7 @@ DataLoad::DataLoad(const string& name, long streamId, StorageClient *storage) : m_streamUpdate = 1; m_lastFetched = getLastSentId(); m_streamSent = getLastSentId(); + migrateStats(); m_flushRequired = false; m_thread = new thread(threadMain, this); loadFilters(name); @@ -350,21 +351,24 @@ unsigned long DataLoad::getLastSentId() ResultSet::Row* row = *it; if (row) { + // Get column value ResultSet::ColumnValue* theVal = nullptr; unsigned long rval = 0; - // Get column value - if (m_dataSource == SourceReadings) + theVal = row->getColumn("last_objects"); + const rapidjson::Value * json = theVal->getJSON(); + switch(m_dataSource) { - theVal = row->getColumn("last_object"); - rval = (unsigned long)theVal->getInteger(); + case SourceReadings: + rval = json->GetObject()["Readings"].GetInt64(); + break; + case SourceStatistics: + rval = json->GetObject()["Stats"].GetInt64(); + break; + case SourceAudit: + rval = json->GetObject()["Audit"].GetInt64(); + break; } - else - { - theVal = row->getColumn("audit_stats_last_object"); - const rapidjson::Value * json = theVal->getJSON(); - (m_dataSource == SourceStatistics) ? (rval = json->GetObject()["Stats"].GetInt64()) : (rval = json->GetObject()["Audit"].GetInt64()); - } delete lastObjectId; return rval; } @@ -469,19 +473,12 @@ int DataLoad::createNewStream() int streamId = 0; InsertValues streamValues; streamValues.push_back(InsertValue("description", m_name)); - if (m_dataSource == SourceReadings) - { - streamValues.push_back(InsertValue("last_object", 0)); - } - else - { - streamValues.push_back(InsertValue("audit_stats_last_object", "{\\\"Stats\\\":0,\\\"Audit\\\":0}" )); - } + streamValues.push_back(InsertValue("last_objects", "{\\\"Readings\\\":0,\\\"Stats\\\":0,\\\"Audit\\\":0}" )); if (m_storage->insertTable("streams", streamValues) != 1) { Logger::getLogger()->error("Failed to insert a row into the streams table"); - } + } else { // Select the row just created, having description='process name' @@ -533,31 +530,107 @@ void DataLoad::flushLastSentId() const Condition condition(Equals); Where where("id", condition, to_string(m_streamId)); InsertValues lastId; - - if (m_dataSource == SourceReadings) + + std::string last_objects = "{}"; + last_objects = "{\\\"Readings\\\":0,\\\"Stats\\\":0,\\\"Audit\\\":0}"; + unsigned long readingsSentCount = 0; + unsigned long statsSentCount = 0; + unsigned long auditSentCount= 0; + std::tie(readingsSentCount,statsSentCount,auditSentCount) = getLastObjects(); + + switch(m_dataSource) { - lastId.push_back(InsertValue("last_object", (long)m_streamSent)); + case SourceReadings: + last_objects = "{\\\"Readings\\\":" + to_string(m_streamSent) + ",\\\"Stats\\\":" + to_string(statsSentCount) + ",\\\"Audit\\\":" + to_string(auditSentCount) + "}"; + break; + case SourceStatistics: + last_objects = "{\\\"Readings\\\":" + to_string(readingsSentCount) + ",\\\"Stats\\\":" + to_string(m_streamSent) + ",\\\"Audit\\\":" + to_string(auditSentCount) + "}"; + break; + case SourceAudit: + last_objects = "{\\\"Readings\\\":" + to_string(readingsSentCount) + ",\\\"Stats\\\":" + to_string(statsSentCount) + ",\\\"Audit\\\":" + to_string(m_streamSent) + "}"; + break; } - else + lastId.push_back(InsertValue("last_objects", last_objects)); + m_storage->updateTable("streams", lastId, where); + +} + +/** + * Migrate data of last_object field of streams table to new field last_objects + */ +void DataLoad::migrateStats() +{ + // Get last_object field to check if migration of data is required to new field + const Condition conditionId(Equals); + string streamId = to_string(m_streamId); + Where* wStreamId = new Where("id", + conditionId, + streamId); + + // SELECT * FROM foglamp.streams WHERE id = x + Query qLastId(wStreamId); + + ResultSet* lastObjectId = m_storage->queryTable("streams", qLastId); + + long lastSent = 0; + if (lastObjectId != NULL && lastObjectId->rowCount()) { - const rapidjson::Value * json = getAuditStatsValue(); - std::string audit_stats_last_object = "{}"; - if (json == nullptr) - { - audit_stats_last_object = "{\\\"Stats\\\":0,\\\"Audit\\\":0}"; - } - else if (m_dataSource == SourceStatistics) - { - audit_stats_last_object = "{\\\"Stats\\\":" + to_string(m_streamSent) +",\\\"Audit\\\":" + to_string(json->GetObject()["Audit"].GetInt64()) +"}"; - } - else + // Get the first row only + ResultSet::RowIterator it = lastObjectId->firstRow(); + // Access the element + ResultSet::Row* row = *it; + if (row) { - audit_stats_last_object = "{\\\"Stats\\\":" + to_string(json->GetObject()["Stats"].GetInt64()) +",\\\"Audit\\\":" + to_string(m_streamSent) +"}"; + // Get column value + ResultSet::ColumnValue* theVal = row->getColumn("last_object"); + // Set found id + lastSent = (long)theVal->getInteger(); } - lastId.push_back(InsertValue("audit_stats_last_object", audit_stats_last_object)); + } + // Free result set + delete lastObjectId; + + if (lastSent < 0) // Already migrated + return; + + // Not already migrated, migrate It. + + const Condition condition(Equals); + Where where("id", condition, to_string(m_streamId)); + InsertValues lastId; + + std::string last_objects = "{}"; + last_objects = "{\\\"Readings\\\":0,\\\"Stats\\\":0,\\\"Audit\\\":0}"; + unsigned long readingsSentCount = 0; + unsigned long statsSentCount = 0; + unsigned long auditSentCount= 0; + + std::tie(readingsSentCount,statsSentCount,auditSentCount) = getLastObjects(); + + switch(m_dataSource) + { + case SourceReadings: + last_objects = "{\\\"Readings\\\":" + to_string(lastSent) + ",\\\"Stats\\\":0,\\\"Audit\\\":0}"; + break; + case SourceStatistics: + last_objects = "{\\\"Readings\\\":0,\\\"Stats\\\":" + to_string(lastSent) + ",\\\"Audit\\\":0}"; + break; + case SourceAudit: + last_objects = "{\\\"Readings\\\":0,\\\"Stats\\\":0,\\\"Audit\\\":" + to_string(lastSent) + "}"; + break; } + lastId.push_back(InsertValue("last_objects", last_objects)); m_storage->updateTable("streams", lastId, where); + + // Update last_object field to mark that migration is done + lastSent = -1; + const Condition resetCondition(Equals); + Where resetClause("id", resetCondition, to_string(m_streamId)); + InsertValues resetlastObject; + + resetlastObject.push_back(InsertValue("last_object", (long)lastSent)); + m_storage->updateTable("streams", resetlastObject, resetClause); } @@ -767,12 +840,13 @@ void DataLoad::configChange(const string& category, const string& newConfig) } /** - * Get the Audit Stats value to be used to fetch last reading sent with this current data source + * Get the a tuple for the last objects sent */ -const rapidjson::Value * DataLoad::getAuditStatsValue() +std::tuple DataLoad::getLastObjects() { + std::tuple t; const Condition conditionId(Equals); string streamId = to_string(m_streamId); Where* wStreamId = new Where("id", @@ -792,12 +866,14 @@ const rapidjson::Value * DataLoad::getAuditStatsValue() ResultSet::Row* row = *it; if (row) { - ResultSet::ColumnValue* theVal = row->getColumn("audit_stats_last_object"); + ResultSet::ColumnValue* theVal = row->getColumn("last_objects"); + const rapidjson::Value * json = theVal->getJSON(); + t = std::make_tuple(json->GetObject()["Readings"].GetInt64(),json->GetObject()["Stats"].GetInt64(),json->GetObject()["Audit"].GetInt64()); delete lastObjectId; - return theVal->getJSON(); + return t; } } // Free result set delete lastObjectId; - return nullptr; + return std::make_tuple(0,0,0); } diff --git a/C/services/north/include/data_load.h b/C/services/north/include/data_load.h index 35352dd824..9312f30e47 100644 --- a/C/services/north/include/data_load.h +++ b/C/services/north/include/data_load.h @@ -11,6 +11,7 @@ #include #include #include +#include #define DEFAULT_BLOCK_SIZE 100 @@ -70,7 +71,8 @@ class DataLoad : public ServiceHandler { ReadingSet *fetchAudit(unsigned int blockSize); void bufferReadings(ReadingSet *readings); bool loadFilters(const std::string& category); - const rapidjson::Value *getAuditStatsValue(); + std::tuple getLastObjects(); + void migrateStats(); private: const std::string& m_name; diff --git a/VERSION b/VERSION index dc165f3307..7108fb207f 100644 --- a/VERSION +++ b/VERSION @@ -1,2 +1,2 @@ fledge_version=2.5.0 -fledge_schema=74 +fledge_schema=75 diff --git a/scripts/plugins/storage/postgres/downgrade/74.sql b/scripts/plugins/storage/postgres/downgrade/74.sql index b3a91e9a85..9e64bf2fee 100644 --- a/scripts/plugins/storage/postgres/downgrade/74.sql +++ b/scripts/plugins/storage/postgres/downgrade/74.sql @@ -1,2 +1,2 @@ -ALTER TABLE fledge.streams DROP COLUMN audit_stats_last_object; +ALTER TABLE fledge.streams DROP COLUMN last_objects; diff --git a/scripts/plugins/storage/postgres/init.sql b/scripts/plugins/storage/postgres/init.sql index 256ea6a0df..edc6fe6112 100644 --- a/scripts/plugins/storage/postgres/init.sql +++ b/scripts/plugins/storage/postgres/init.sql @@ -432,7 +432,7 @@ CREATE TABLE fledge.streams ( active_window jsonb NOT NULL DEFAULT '{}'::jsonb, -- The window of operations active boolean NOT NULL DEFAULT true, -- When false, all data to this stream stop and are inactive last_object bigint NOT NULL DEFAULT 0, -- The ID of the last object streamed (asset or reading, depending on the object_stream) - audit_stats_last_object jsonb NOT NULL DEFAULT '{"Audit":0,"Stats":0}'::jsonb, -- The ID of the last object streamed (Audit or Stats, depending on the object_stream) + last_objects jsonb NOT NULL DEFAULT '{"Readings":0,"Stats":0,"Audit":0}'::jsonb, -- The ID of the last object streamed (Readings,Stats or Audit , depending on the object_stream) ts timestamp(6) with time zone NOT NULL DEFAULT now(), -- Creation or last update CONSTRAINT strerams_pkey PRIMARY KEY (id)); diff --git a/scripts/plugins/storage/postgres/upgrade/75.sql b/scripts/plugins/storage/postgres/upgrade/75.sql index baac045317..19e1c2a95e 100644 --- a/scripts/plugins/storage/postgres/upgrade/75.sql +++ b/scripts/plugins/storage/postgres/upgrade/75.sql @@ -1 +1 @@ -ALTER TABLE fledge.streams ADD COLUMN audit_stats_last_object jsonb NOT NULL DEFAULT '{"Audit":0,"Stats":0}'::jsonb; +ALTER TABLE fledge.streams ADD COLUMN last_objects jsonb NOT NULL DEFAULT '{"Readings":0,"Stats":0,"Audit":0}'::jsonb; diff --git a/scripts/plugins/storage/sqlite/downgrade/74.sql b/scripts/plugins/storage/sqlite/downgrade/74.sql index b3a91e9a85..9e64bf2fee 100644 --- a/scripts/plugins/storage/sqlite/downgrade/74.sql +++ b/scripts/plugins/storage/sqlite/downgrade/74.sql @@ -1,2 +1,2 @@ -ALTER TABLE fledge.streams DROP COLUMN audit_stats_last_object; +ALTER TABLE fledge.streams DROP COLUMN last_objects; diff --git a/scripts/plugins/storage/sqlite/init.sql b/scripts/plugins/storage/sqlite/init.sql index 58a0401be1..6acb742e72 100644 --- a/scripts/plugins/storage/sqlite/init.sql +++ b/scripts/plugins/storage/sqlite/init.sql @@ -221,7 +221,7 @@ CREATE TABLE fledge.streams ( active_window JSON NOT NULL DEFAULT '{}', -- The window of operations active boolean NOT NULL DEFAULT 't', -- When false, all data to this stream stop and are inactive last_object bigint NOT NULL DEFAULT 0, -- The ID of the last object streamed (asset or reading, depending on the object_stream) - audit_stats_last_object JSON NOT NULL DEFAULT '{"Audit":0,"Stats":0}', -- The ID of the last object streamed (Audit or Stats, depending on the object_stream) + last_objects JSON NOT NULL DEFAULT '{"Readings":0,"Stats":0,"Audit":0}', -- The ID of the last object streamed (Readings,Stats or Audit , depending on the object_stream) ts DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime'))); -- Creation or last update diff --git a/scripts/plugins/storage/sqlite/upgrade/75.sql b/scripts/plugins/storage/sqlite/upgrade/75.sql index 72d96ee21d..1941746634 100644 --- a/scripts/plugins/storage/sqlite/upgrade/75.sql +++ b/scripts/plugins/storage/sqlite/upgrade/75.sql @@ -1 +1 @@ -ALTER TABLE fledge.streams ADD COLUMN audit_stats_last_object JSON NOT NULL DEFAULT '{"Audit":0,"Stats":0}'; +ALTER TABLE fledge.streams ADD COLUMN last_objects JSON NOT NULL DEFAULT '{"Readings":0,"Stats":0,"Audit":0}'; diff --git a/scripts/plugins/storage/sqlitelb/downgrade/74.sql b/scripts/plugins/storage/sqlitelb/downgrade/74.sql index b3a91e9a85..9e64bf2fee 100644 --- a/scripts/plugins/storage/sqlitelb/downgrade/74.sql +++ b/scripts/plugins/storage/sqlitelb/downgrade/74.sql @@ -1,2 +1,2 @@ -ALTER TABLE fledge.streams DROP COLUMN audit_stats_last_object; +ALTER TABLE fledge.streams DROP COLUMN last_objects; diff --git a/scripts/plugins/storage/sqlitelb/init.sql b/scripts/plugins/storage/sqlitelb/init.sql index 46ab0114dd..d4176b1b99 100644 --- a/scripts/plugins/storage/sqlitelb/init.sql +++ b/scripts/plugins/storage/sqlitelb/init.sql @@ -221,7 +221,7 @@ CREATE TABLE fledge.streams ( active_window JSON NOT NULL DEFAULT '{}', -- The window of operations active boolean NOT NULL DEFAULT 't', -- When false, all data to this stream stop and are inactive last_object bigint NOT NULL DEFAULT 0, -- The ID of the last object streamed (asset or reading, depending on the object_stream) - audit_stats_last_object JSON NOT NULL DEFAULT '{"Audit":0,"Stats":0}', -- The ID of the last object streamed (Audit or Stats, depending on the object_stream) + last_objects JSON NOT NULL DEFAULT '{"Readings":0,"Stats":0,"Audit":0}', -- The ID of the last object streamed (Readings,Stats or Audit , depending on the object_stream) ts DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime'))); -- Creation or last update diff --git a/scripts/plugins/storage/sqlitelb/upgrade/75.sql b/scripts/plugins/storage/sqlitelb/upgrade/75.sql index 72d96ee21d..1941746634 100644 --- a/scripts/plugins/storage/sqlitelb/upgrade/75.sql +++ b/scripts/plugins/storage/sqlitelb/upgrade/75.sql @@ -1 +1 @@ -ALTER TABLE fledge.streams ADD COLUMN audit_stats_last_object JSON NOT NULL DEFAULT '{"Audit":0,"Stats":0}'; +ALTER TABLE fledge.streams ADD COLUMN last_objects JSON NOT NULL DEFAULT '{"Readings":0,"Stats":0,"Audit":0}';