diff --git a/C/services/north/data_load.cpp b/C/services/north/data_load.cpp index 479cb48048..540f9d722f 100755 --- a/C/services/north/data_load.cpp +++ b/C/services/north/data_load.cpp @@ -42,6 +42,7 @@ DataLoad::DataLoad(const string& name, long streamId, StorageClient *storage) : m_streamUpdate = 1; m_lastFetched = getLastSentId(); m_streamSent = getLastSentId(); + migrateStats(); m_flushRequired = false; m_thread = new thread(threadMain, this); loadFilters(name); @@ -118,6 +119,8 @@ bool DataLoad::setDataSource(const string& source) source.c_str(), m_name.c_str()); return false; } + m_lastFetched = getLastSentId(); + m_streamSent = getLastSentId(); return true; } @@ -337,7 +340,6 @@ unsigned long DataLoad::getLastSentId() // SELECT * FROM fledge.streams WHERE id = x Query qLastId(wStreamId); - ResultSet* lastObjectId = m_storage->queryTable("streams", qLastId); if (lastObjectId != NULL && lastObjectId->rowCount()) @@ -349,9 +351,23 @@ unsigned long DataLoad::getLastSentId() if (row) { // Get column value - ResultSet::ColumnValue* theVal = row->getColumn("last_object"); - // Set found id - unsigned long rval = (unsigned long)theVal->getInteger(); + ResultSet::ColumnValue* theVal = nullptr; + unsigned long rval = 0; + theVal = row->getColumn("last_objects"); + const rapidjson::Value * json = theVal->getJSON(); + switch(m_dataSource) + { + case SourceReadings: + rval = json->GetObject()["Readings"].GetInt64(); + break; + case SourceStatistics: + rval = json->GetObject()["Stats"].GetInt64(); + break; + case SourceAudit: + rval = json->GetObject()["Audit"].GetInt64(); + break; + } + delete lastObjectId; return rval; } @@ -455,14 +471,13 @@ int DataLoad::createNewStream() { int streamId = 0; InsertValues streamValues; - streamValues.push_back(InsertValue("description", m_name)); - streamValues.push_back(InsertValue("last_object", 0)); + streamValues.push_back(InsertValue("last_objects", "{\\\"Readings\\\":0,\\\"Stats\\\":0,\\\"Audit\\\":0}" )); if (m_storage->insertTable("streams", streamValues) != 1) { Logger::getLogger()->error("Failed to insert a row into the streams table"); - } + } else { // Select the row just created, having description='process name' @@ -514,9 +529,107 @@ void DataLoad::flushLastSentId() const Condition condition(Equals); Where where("id", condition, to_string(m_streamId)); InsertValues lastId; + + std::string last_objects = "{}"; + last_objects = "{\\\"Readings\\\":0,\\\"Stats\\\":0,\\\"Audit\\\":0}"; + unsigned long readingsSentCount = 0; + unsigned long statsSentCount = 0; + unsigned long auditSentCount= 0; + std::tie(readingsSentCount,statsSentCount,auditSentCount) = getLastObjects(); + + switch(m_dataSource) + { + case SourceReadings: + last_objects = "{\\\"Readings\\\":" + to_string(m_streamSent) + ",\\\"Stats\\\":" + to_string(statsSentCount) + ",\\\"Audit\\\":" + to_string(auditSentCount) + "}"; + break; + case SourceStatistics: + last_objects = "{\\\"Readings\\\":" + to_string(readingsSentCount) + ",\\\"Stats\\\":" + to_string(m_streamSent) + ",\\\"Audit\\\":" + to_string(auditSentCount) + "}"; + break; + case SourceAudit: + last_objects = "{\\\"Readings\\\":" + to_string(readingsSentCount) + ",\\\"Stats\\\":" + to_string(statsSentCount) + ",\\\"Audit\\\":" + to_string(m_streamSent) + "}"; + break; + } + lastId.push_back(InsertValue("last_objects", last_objects)); + m_storage->updateTable("streams", lastId, where); - lastId.push_back(InsertValue("last_object", (long)m_streamSent)); +} + +/** + * Migrate data of last_object field of streams table to new field last_objects + */ +void DataLoad::migrateStats() +{ + // Get last_object field to check if migration of data is required to new field + const Condition conditionId(Equals); + string streamId = to_string(m_streamId); + Where* wStreamId = new Where("id", + conditionId, + streamId); + + // SELECT * FROM foglamp.streams WHERE id = x + Query qLastId(wStreamId); + + ResultSet* lastObjectId = m_storage->queryTable("streams", qLastId); + + long lastSent = 0; + if (lastObjectId != NULL && lastObjectId->rowCount()) + { + // Get the first row only + ResultSet::RowIterator it = lastObjectId->firstRow(); + // Access the element + ResultSet::Row* row = *it; + if (row) + { + // Get column value + ResultSet::ColumnValue* theVal = row->getColumn("last_object"); + // Set found id + lastSent = (long)theVal->getInteger(); + } + } + // Free result set + delete lastObjectId; + + if (lastSent < 0) // Already migrated + return; + + // Not already migrated, migrate It. + + const Condition condition(Equals); + Where where("id", condition, to_string(m_streamId)); + InsertValues lastId; + + std::string last_objects = "{}"; + last_objects = "{\\\"Readings\\\":0,\\\"Stats\\\":0,\\\"Audit\\\":0}"; + unsigned long readingsSentCount = 0; + unsigned long statsSentCount = 0; + unsigned long auditSentCount= 0; + + std::tie(readingsSentCount,statsSentCount,auditSentCount) = getLastObjects(); + + switch(m_dataSource) + { + case SourceReadings: + last_objects = "{\\\"Readings\\\":" + to_string(lastSent) + ",\\\"Stats\\\":0,\\\"Audit\\\":0}"; + break; + case SourceStatistics: + last_objects = "{\\\"Readings\\\":0,\\\"Stats\\\":" + to_string(lastSent) + ",\\\"Audit\\\":0}"; + break; + case SourceAudit: + last_objects = "{\\\"Readings\\\":0,\\\"Stats\\\":0,\\\"Audit\\\":" + to_string(lastSent) + "}"; + break; + } + + lastId.push_back(InsertValue("last_objects", last_objects)); m_storage->updateTable("streams", lastId, where); + + // Update last_object field to mark that migration is done + lastSent = -1; + const Condition resetCondition(Equals); + Where resetClause("id", resetCondition, to_string(m_streamId)); + InsertValues resetlastObject; + + resetlastObject.push_back(InsertValue("last_object", (long)lastSent)); + m_storage->updateTable("streams", resetlastObject, resetClause); } @@ -724,3 +837,42 @@ void DataLoad::configChange(const string& category, const string& newConfig) } } } + +/** + * Get the a tuple for the last objects sent + */ + +std::tuple DataLoad::getLastObjects() +{ + + std::tuple t; + const Condition conditionId(Equals); + string streamId = to_string(m_streamId); + Where* wStreamId = new Where("id", + conditionId, + streamId); + + // SELECT * FROM fledge.streams WHERE id = x + Query qLastId(wStreamId); + + ResultSet* lastObjectId = m_storage->queryTable("streams", qLastId); + + if (lastObjectId != NULL && lastObjectId->rowCount()) + { + // Get the first row only + ResultSet::RowIterator it = lastObjectId->firstRow(); + // Access the element + ResultSet::Row* row = *it; + if (row) + { + ResultSet::ColumnValue* theVal = row->getColumn("last_objects"); + const rapidjson::Value * json = theVal->getJSON(); + t = std::make_tuple(json->GetObject()["Readings"].GetInt64(),json->GetObject()["Stats"].GetInt64(),json->GetObject()["Audit"].GetInt64()); + delete lastObjectId; + return t; + } + } + // Free result set + delete lastObjectId; + return std::make_tuple(0,0,0); +} diff --git a/C/services/north/include/data_load.h b/C/services/north/include/data_load.h index 96d0e9f826..9312f30e47 100644 --- a/C/services/north/include/data_load.h +++ b/C/services/north/include/data_load.h @@ -11,6 +11,7 @@ #include #include #include +#include #define DEFAULT_BLOCK_SIZE 100 @@ -70,6 +71,8 @@ class DataLoad : public ServiceHandler { ReadingSet *fetchAudit(unsigned int blockSize); void bufferReadings(ReadingSet *readings); bool loadFilters(const std::string& category); + std::tuple getLastObjects(); + void migrateStats(); private: const std::string& m_name; diff --git a/VERSION b/VERSION index 4ba94eff21..db6170b32c 100644 --- a/VERSION +++ b/VERSION @@ -1,2 +1,2 @@ fledge_version=2.6.0 -fledge_schema=74 +fledge_schema=75 diff --git a/scripts/plugins/storage/postgres/downgrade/74.sql b/scripts/plugins/storage/postgres/downgrade/74.sql new file mode 100644 index 0000000000..9e64bf2fee --- /dev/null +++ b/scripts/plugins/storage/postgres/downgrade/74.sql @@ -0,0 +1,2 @@ +ALTER TABLE fledge.streams DROP COLUMN last_objects; + diff --git a/scripts/plugins/storage/postgres/init.sql b/scripts/plugins/storage/postgres/init.sql index 0337c4e872..edc6fe6112 100644 --- a/scripts/plugins/storage/postgres/init.sql +++ b/scripts/plugins/storage/postgres/init.sql @@ -423,17 +423,18 @@ CREATE INDEX readings_ix3 -- Streams table -- List of the streams to the Cloud. CREATE TABLE fledge.streams ( - id integer NOT NULL DEFAULT nextval('fledge.streams_id_seq'::regclass), -- Sequence ID - description character varying(255) NOT NULL DEFAULT ''::character varying COLLATE pg_catalog."default", -- A brief description of the stream entry - properties jsonb NOT NULL DEFAULT '{}'::jsonb, -- A generic set of properties - object_stream jsonb NOT NULL DEFAULT '{}'::jsonb, -- Definition of what must be streamed - object_block jsonb NOT NULL DEFAULT '{}'::jsonb, -- Definition of how the stream must be organised - object_filter jsonb NOT NULL DEFAULT '{}'::jsonb, -- Any filter involved in selecting the data to stream - active_window jsonb NOT NULL DEFAULT '{}'::jsonb, -- The window of operations - active boolean NOT NULL DEFAULT true, -- When false, all data to this stream stop and are inactive - last_object bigint NOT NULL DEFAULT 0, -- The ID of the last object streamed (asset or reading, depending on the object_stream) - ts timestamp(6) with time zone NOT NULL DEFAULT now(), -- Creation or last update - CONSTRAINT strerams_pkey PRIMARY KEY (id)); + id integer NOT NULL DEFAULT nextval('fledge.streams_id_seq'::regclass), -- Sequence ID + description character varying(255) NOT NULL DEFAULT ''::character varying COLLATE pg_catalog."default", -- A brief description of the stream entry + properties jsonb NOT NULL DEFAULT '{}'::jsonb, -- A generic set of properties + object_stream jsonb NOT NULL DEFAULT '{}'::jsonb, -- Definition of what must be streamed + object_block jsonb NOT NULL DEFAULT '{}'::jsonb, -- Definition of how the stream must be organised + object_filter jsonb NOT NULL DEFAULT '{}'::jsonb, -- Any filter involved in selecting the data to stream + active_window jsonb NOT NULL DEFAULT '{}'::jsonb, -- The window of operations + active boolean NOT NULL DEFAULT true, -- When false, all data to this stream stop and are inactive + last_object bigint NOT NULL DEFAULT 0, -- The ID of the last object streamed (asset or reading, depending on the object_stream) + last_objects jsonb NOT NULL DEFAULT '{"Readings":0,"Stats":0,"Audit":0}'::jsonb, -- The ID of the last object streamed (Readings,Stats or Audit , depending on the object_stream) + ts timestamp(6) with time zone NOT NULL DEFAULT now(), -- Creation or last update + CONSTRAINT strerams_pkey PRIMARY KEY (id)); -- Configuration table diff --git a/scripts/plugins/storage/postgres/upgrade/75.sql b/scripts/plugins/storage/postgres/upgrade/75.sql new file mode 100644 index 0000000000..19e1c2a95e --- /dev/null +++ b/scripts/plugins/storage/postgres/upgrade/75.sql @@ -0,0 +1 @@ +ALTER TABLE fledge.streams ADD COLUMN last_objects jsonb NOT NULL DEFAULT '{"Readings":0,"Stats":0,"Audit":0}'::jsonb; diff --git a/scripts/plugins/storage/sqlite/downgrade/74.sql b/scripts/plugins/storage/sqlite/downgrade/74.sql new file mode 100644 index 0000000000..9e64bf2fee --- /dev/null +++ b/scripts/plugins/storage/sqlite/downgrade/74.sql @@ -0,0 +1,2 @@ +ALTER TABLE fledge.streams DROP COLUMN last_objects; + diff --git a/scripts/plugins/storage/sqlite/init.sql b/scripts/plugins/storage/sqlite/init.sql index cb35730643..6acb742e72 100644 --- a/scripts/plugins/storage/sqlite/init.sql +++ b/scripts/plugins/storage/sqlite/init.sql @@ -212,16 +212,17 @@ CREATE INDEX fki_asset_messages_fk2 -- Streams table -- List of the streams to the Cloud. CREATE TABLE fledge.streams ( - id INTEGER PRIMARY KEY AUTOINCREMENT, -- Sequence ID - description character varying(255) NOT NULL DEFAULT '', -- A brief description of the stream entry - properties JSON NOT NULL DEFAULT '{}', -- A generic set of properties - object_stream JSON NOT NULL DEFAULT '{}', -- Definition of what must be streamed - object_block JSON NOT NULL DEFAULT '{}', -- Definition of how the stream must be organised - object_filter JSON NOT NULL DEFAULT '{}', -- Any filter involved in selecting the data to stream - active_window JSON NOT NULL DEFAULT '{}', -- The window of operations - active boolean NOT NULL DEFAULT 't', -- When false, all data to this stream stop and are inactive - last_object bigint NOT NULL DEFAULT 0, -- The ID of the last object streamed (asset or reading, depending on the object_stream) - ts DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime'))); -- Creation or last update + id INTEGER PRIMARY KEY AUTOINCREMENT, -- Sequence ID + description character varying(255) NOT NULL DEFAULT '', -- A brief description of the stream entry + properties JSON NOT NULL DEFAULT '{}', -- A generic set of properties + object_stream JSON NOT NULL DEFAULT '{}', -- Definition of what must be streamed + object_block JSON NOT NULL DEFAULT '{}', -- Definition of how the stream must be organised + object_filter JSON NOT NULL DEFAULT '{}', -- Any filter involved in selecting the data to stream + active_window JSON NOT NULL DEFAULT '{}', -- The window of operations + active boolean NOT NULL DEFAULT 't', -- When false, all data to this stream stop and are inactive + last_object bigint NOT NULL DEFAULT 0, -- The ID of the last object streamed (asset or reading, depending on the object_stream) + last_objects JSON NOT NULL DEFAULT '{"Readings":0,"Stats":0,"Audit":0}', -- The ID of the last object streamed (Readings,Stats or Audit , depending on the object_stream) + ts DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime'))); -- Creation or last update -- Configuration table diff --git a/scripts/plugins/storage/sqlite/upgrade/75.sql b/scripts/plugins/storage/sqlite/upgrade/75.sql new file mode 100644 index 0000000000..1941746634 --- /dev/null +++ b/scripts/plugins/storage/sqlite/upgrade/75.sql @@ -0,0 +1 @@ +ALTER TABLE fledge.streams ADD COLUMN last_objects JSON NOT NULL DEFAULT '{"Readings":0,"Stats":0,"Audit":0}'; diff --git a/scripts/plugins/storage/sqlitelb/downgrade/74.sql b/scripts/plugins/storage/sqlitelb/downgrade/74.sql new file mode 100644 index 0000000000..9e64bf2fee --- /dev/null +++ b/scripts/plugins/storage/sqlitelb/downgrade/74.sql @@ -0,0 +1,2 @@ +ALTER TABLE fledge.streams DROP COLUMN last_objects; + diff --git a/scripts/plugins/storage/sqlitelb/init.sql b/scripts/plugins/storage/sqlitelb/init.sql index 48b7771f85..d4176b1b99 100644 --- a/scripts/plugins/storage/sqlitelb/init.sql +++ b/scripts/plugins/storage/sqlitelb/init.sql @@ -212,16 +212,17 @@ CREATE INDEX fki_asset_messages_fk2 -- Streams table -- List of the streams to the Cloud. CREATE TABLE fledge.streams ( - id INTEGER PRIMARY KEY AUTOINCREMENT, -- Sequence ID - description character varying(255) NOT NULL DEFAULT '', -- A brief description of the stream entry - properties JSON NOT NULL DEFAULT '{}', -- A generic set of properties - object_stream JSON NOT NULL DEFAULT '{}', -- Definition of what must be streamed - object_block JSON NOT NULL DEFAULT '{}', -- Definition of how the stream must be organised - object_filter JSON NOT NULL DEFAULT '{}', -- Any filter involved in selecting the data to stream - active_window JSON NOT NULL DEFAULT '{}', -- The window of operations - active boolean NOT NULL DEFAULT 't', -- When false, all data to this stream stop and are inactive - last_object bigint NOT NULL DEFAULT 0, -- The ID of the last object streamed (asset or reading, depending on the object_stream) - ts DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime'))); -- Creation or last update + id INTEGER PRIMARY KEY AUTOINCREMENT, -- Sequence ID + description character varying(255) NOT NULL DEFAULT '', -- A brief description of the stream entry + properties JSON NOT NULL DEFAULT '{}', -- A generic set of properties + object_stream JSON NOT NULL DEFAULT '{}', -- Definition of what must be streamed + object_block JSON NOT NULL DEFAULT '{}', -- Definition of how the stream must be organised + object_filter JSON NOT NULL DEFAULT '{}', -- Any filter involved in selecting the data to stream + active_window JSON NOT NULL DEFAULT '{}', -- The window of operations + active boolean NOT NULL DEFAULT 't', -- When false, all data to this stream stop and are inactive + last_object bigint NOT NULL DEFAULT 0, -- The ID of the last object streamed (asset or reading, depending on the object_stream) + last_objects JSON NOT NULL DEFAULT '{"Readings":0,"Stats":0,"Audit":0}', -- The ID of the last object streamed (Readings,Stats or Audit , depending on the object_stream) + ts DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime'))); -- Creation or last update -- Configuration table diff --git a/scripts/plugins/storage/sqlitelb/upgrade/75.sql b/scripts/plugins/storage/sqlitelb/upgrade/75.sql new file mode 100644 index 0000000000..1941746634 --- /dev/null +++ b/scripts/plugins/storage/sqlitelb/upgrade/75.sql @@ -0,0 +1 @@ +ALTER TABLE fledge.streams ADD COLUMN last_objects JSON NOT NULL DEFAULT '{"Readings":0,"Stats":0,"Audit":0}';