Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 160 additions & 8 deletions C/services/north/data_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ DataLoad::DataLoad(const string& name, long streamId, StorageClient *storage) :
m_streamUpdate = 1;
m_lastFetched = getLastSentId();
m_streamSent = getLastSentId();
migrateStats();
m_flushRequired = false;
m_thread = new thread(threadMain, this);
loadFilters(name);
Expand Down Expand Up @@ -118,6 +119,8 @@ bool DataLoad::setDataSource(const string& source)
source.c_str(), m_name.c_str());
return false;
}
m_lastFetched = getLastSentId();
m_streamSent = getLastSentId();
return true;
}

Expand Down Expand Up @@ -337,7 +340,6 @@ unsigned long DataLoad::getLastSentId()

// SELECT * FROM fledge.streams WHERE id = x
Query qLastId(wStreamId);

ResultSet* lastObjectId = m_storage->queryTable("streams", qLastId);

if (lastObjectId != NULL && lastObjectId->rowCount())
Expand All @@ -349,9 +351,23 @@ unsigned long DataLoad::getLastSentId()
if (row)
{
// Get column value
ResultSet::ColumnValue* theVal = row->getColumn("last_object");
// Set found id
unsigned long rval = (unsigned long)theVal->getInteger();
ResultSet::ColumnValue* theVal = nullptr;
unsigned long rval = 0;
theVal = row->getColumn("last_objects");
const rapidjson::Value * json = theVal->getJSON();
switch(m_dataSource)
{
case SourceReadings:
rval = json->GetObject()["Readings"].GetInt64();
break;
case SourceStatistics:
rval = json->GetObject()["Stats"].GetInt64();
break;
case SourceAudit:
rval = json->GetObject()["Audit"].GetInt64();
break;
}

delete lastObjectId;
return rval;
}
Expand Down Expand Up @@ -455,14 +471,13 @@ int DataLoad::createNewStream()
{
int streamId = 0;
InsertValues streamValues;

streamValues.push_back(InsertValue("description", m_name));
streamValues.push_back(InsertValue("last_object", 0));
streamValues.push_back(InsertValue("last_objects", "{\\\"Readings\\\":0,\\\"Stats\\\":0,\\\"Audit\\\":0}" ));

if (m_storage->insertTable("streams", streamValues) != 1)
{
Logger::getLogger()->error("Failed to insert a row into the streams table");
}
}
else
{
// Select the row just created, having description='process name'
Expand Down Expand Up @@ -514,9 +529,107 @@ void DataLoad::flushLastSentId()
const Condition condition(Equals);
Where where("id", condition, to_string(m_streamId));
InsertValues lastId;

std::string last_objects = "{}";
last_objects = "{\\\"Readings\\\":0,\\\"Stats\\\":0,\\\"Audit\\\":0}";
unsigned long readingsSentCount = 0;
unsigned long statsSentCount = 0;
unsigned long auditSentCount= 0;
std::tie(readingsSentCount,statsSentCount,auditSentCount) = getLastObjects();

switch(m_dataSource)
{
case SourceReadings:
last_objects = "{\\\"Readings\\\":" + to_string(m_streamSent) + ",\\\"Stats\\\":" + to_string(statsSentCount) + ",\\\"Audit\\\":" + to_string(auditSentCount) + "}";
break;
case SourceStatistics:
last_objects = "{\\\"Readings\\\":" + to_string(readingsSentCount) + ",\\\"Stats\\\":" + to_string(m_streamSent) + ",\\\"Audit\\\":" + to_string(auditSentCount) + "}";
break;
case SourceAudit:
last_objects = "{\\\"Readings\\\":" + to_string(readingsSentCount) + ",\\\"Stats\\\":" + to_string(statsSentCount) + ",\\\"Audit\\\":" + to_string(m_streamSent) + "}";
break;
}
lastId.push_back(InsertValue("last_objects", last_objects));
m_storage->updateTable("streams", lastId, where);

lastId.push_back(InsertValue("last_object", (long)m_streamSent));
}

/**
* Migrate data of last_object field of streams table to new field last_objects
*/
void DataLoad::migrateStats()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is nothing to do with stats, it is the position in the stream. The object ID's in a stream are not by defintion continuous, therefore making the assumption they are is dangerous. Consider for example cases of filter pipelines in the north that remove readings fom the stream.

{
// Get last_object field to check if migration of data is required to new field
const Condition conditionId(Equals);
string streamId = to_string(m_streamId);
Where* wStreamId = new Where("id",
conditionId,
streamId);

// SELECT * FROM foglamp.streams WHERE id = x
Query qLastId(wStreamId);

ResultSet* lastObjectId = m_storage->queryTable("streams", qLastId);

long lastSent = 0;
if (lastObjectId != NULL && lastObjectId->rowCount())
{
// Get the first row only
ResultSet::RowIterator it = lastObjectId->firstRow();
// Access the element
ResultSet::Row* row = *it;
if (row)
{
// Get column value
ResultSet::ColumnValue* theVal = row->getColumn("last_object");
// Set found id
lastSent = (long)theVal->getInteger();
}
}
// Free result set
delete lastObjectId;

if (lastSent < 0) // Already migrated
return;

// Not already migrated, migrate It.

const Condition condition(Equals);
Where where("id", condition, to_string(m_streamId));
InsertValues lastId;

std::string last_objects = "{}";
last_objects = "{\\\"Readings\\\":0,\\\"Stats\\\":0,\\\"Audit\\\":0}";
unsigned long readingsSentCount = 0;
unsigned long statsSentCount = 0;
unsigned long auditSentCount= 0;

std::tie(readingsSentCount,statsSentCount,auditSentCount) = getLastObjects();

switch(m_dataSource)
{
case SourceReadings:
last_objects = "{\\\"Readings\\\":" + to_string(lastSent) + ",\\\"Stats\\\":0,\\\"Audit\\\":0}";
break;
case SourceStatistics:
last_objects = "{\\\"Readings\\\":0,\\\"Stats\\\":" + to_string(lastSent) + ",\\\"Audit\\\":0}";
break;
case SourceAudit:
last_objects = "{\\\"Readings\\\":0,\\\"Stats\\\":0,\\\"Audit\\\":" + to_string(lastSent) + "}";
break;
}

lastId.push_back(InsertValue("last_objects", last_objects));
m_storage->updateTable("streams", lastId, where);

// Update last_object field to mark that migration is done
lastSent = -1;
const Condition resetCondition(Equals);
Where resetClause("id", resetCondition, to_string(m_streamId));
InsertValues resetlastObject;

resetlastObject.push_back(InsertValue("last_object", (long)lastSent));
m_storage->updateTable("streams", resetlastObject, resetClause);
}


Expand Down Expand Up @@ -724,3 +837,42 @@ void DataLoad::configChange(const string& category, const string& newConfig)
}
}
}

/**
* Get the a tuple for the last objects sent
*/

std::tuple<unsigned long,unsigned long,unsigned long> DataLoad::getLastObjects()
{

std::tuple<unsigned long,unsigned long,unsigned long> t;
const Condition conditionId(Equals);
string streamId = to_string(m_streamId);
Where* wStreamId = new Where("id",
conditionId,
streamId);

// SELECT * FROM fledge.streams WHERE id = x
Query qLastId(wStreamId);

ResultSet* lastObjectId = m_storage->queryTable("streams", qLastId);

if (lastObjectId != NULL && lastObjectId->rowCount())
{
// Get the first row only
ResultSet::RowIterator it = lastObjectId->firstRow();
// Access the element
ResultSet::Row* row = *it;
if (row)
{
ResultSet::ColumnValue* theVal = row->getColumn("last_objects");
const rapidjson::Value * json = theVal->getJSON();
t = std::make_tuple(json->GetObject()["Readings"].GetInt64(),json->GetObject()["Stats"].GetInt64(),json->GetObject()["Audit"].GetInt64());
delete lastObjectId;
return t;
}
}
// Free result set
delete lastObjectId;
return std::make_tuple(0,0,0);
}
3 changes: 3 additions & 0 deletions C/services/north/include/data_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <filter_pipeline.h>
#include <service_handler.h>
#include <perfmonitors.h>
#include <tuple>

#define DEFAULT_BLOCK_SIZE 100

Expand Down Expand Up @@ -70,6 +71,8 @@ class DataLoad : public ServiceHandler {
ReadingSet *fetchAudit(unsigned int blockSize);
void bufferReadings(ReadingSet *readings);
bool loadFilters(const std::string& category);
std::tuple<unsigned long,unsigned long,unsigned long> getLastObjects();
void migrateStats();

private:
const std::string& m_name;
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
fledge_version=2.6.0
fledge_schema=74
fledge_schema=75
2 changes: 2 additions & 0 deletions scripts/plugins/storage/postgres/downgrade/74.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE fledge.streams DROP COLUMN last_objects;

23 changes: 12 additions & 11 deletions scripts/plugins/storage/postgres/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -423,17 +423,18 @@ CREATE INDEX readings_ix3
-- Streams table
-- List of the streams to the Cloud.
CREATE TABLE fledge.streams (
id integer NOT NULL DEFAULT nextval('fledge.streams_id_seq'::regclass), -- Sequence ID
description character varying(255) NOT NULL DEFAULT ''::character varying COLLATE pg_catalog."default", -- A brief description of the stream entry
properties jsonb NOT NULL DEFAULT '{}'::jsonb, -- A generic set of properties
object_stream jsonb NOT NULL DEFAULT '{}'::jsonb, -- Definition of what must be streamed
object_block jsonb NOT NULL DEFAULT '{}'::jsonb, -- Definition of how the stream must be organised
object_filter jsonb NOT NULL DEFAULT '{}'::jsonb, -- Any filter involved in selecting the data to stream
active_window jsonb NOT NULL DEFAULT '{}'::jsonb, -- The window of operations
active boolean NOT NULL DEFAULT true, -- When false, all data to this stream stop and are inactive
last_object bigint NOT NULL DEFAULT 0, -- The ID of the last object streamed (asset or reading, depending on the object_stream)
ts timestamp(6) with time zone NOT NULL DEFAULT now(), -- Creation or last update
CONSTRAINT strerams_pkey PRIMARY KEY (id));
id integer NOT NULL DEFAULT nextval('fledge.streams_id_seq'::regclass), -- Sequence ID
description character varying(255) NOT NULL DEFAULT ''::character varying COLLATE pg_catalog."default", -- A brief description of the stream entry
properties jsonb NOT NULL DEFAULT '{}'::jsonb, -- A generic set of properties
object_stream jsonb NOT NULL DEFAULT '{}'::jsonb, -- Definition of what must be streamed
object_block jsonb NOT NULL DEFAULT '{}'::jsonb, -- Definition of how the stream must be organised
object_filter jsonb NOT NULL DEFAULT '{}'::jsonb, -- Any filter involved in selecting the data to stream
active_window jsonb NOT NULL DEFAULT '{}'::jsonb, -- The window of operations
active boolean NOT NULL DEFAULT true, -- When false, all data to this stream stop and are inactive
last_object bigint NOT NULL DEFAULT 0, -- The ID of the last object streamed (asset or reading, depending on the object_stream)
last_objects jsonb NOT NULL DEFAULT '{"Readings":0,"Stats":0,"Audit":0}'::jsonb, -- The ID of the last object streamed (Readings,Stats or Audit , depending on the object_stream)
ts timestamp(6) with time zone NOT NULL DEFAULT now(), -- Creation or last update
CONSTRAINT strerams_pkey PRIMARY KEY (id));


-- Configuration table
Expand Down
1 change: 1 addition & 0 deletions scripts/plugins/storage/postgres/upgrade/75.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE fledge.streams ADD COLUMN last_objects jsonb NOT NULL DEFAULT '{"Readings":0,"Stats":0,"Audit":0}'::jsonb;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The is not preserving the current point reached in the readings being sent north. This will be an issue for upgrade of a running system.

2 changes: 2 additions & 0 deletions scripts/plugins/storage/sqlite/downgrade/74.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE fledge.streams DROP COLUMN last_objects;

21 changes: 11 additions & 10 deletions scripts/plugins/storage/sqlite/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -212,16 +212,17 @@ CREATE INDEX fki_asset_messages_fk2
-- Streams table
-- List of the streams to the Cloud.
CREATE TABLE fledge.streams (
id INTEGER PRIMARY KEY AUTOINCREMENT, -- Sequence ID
description character varying(255) NOT NULL DEFAULT '', -- A brief description of the stream entry
properties JSON NOT NULL DEFAULT '{}', -- A generic set of properties
object_stream JSON NOT NULL DEFAULT '{}', -- Definition of what must be streamed
object_block JSON NOT NULL DEFAULT '{}', -- Definition of how the stream must be organised
object_filter JSON NOT NULL DEFAULT '{}', -- Any filter involved in selecting the data to stream
active_window JSON NOT NULL DEFAULT '{}', -- The window of operations
active boolean NOT NULL DEFAULT 't', -- When false, all data to this stream stop and are inactive
last_object bigint NOT NULL DEFAULT 0, -- The ID of the last object streamed (asset or reading, depending on the object_stream)
ts DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime'))); -- Creation or last update
id INTEGER PRIMARY KEY AUTOINCREMENT, -- Sequence ID
description character varying(255) NOT NULL DEFAULT '', -- A brief description of the stream entry
properties JSON NOT NULL DEFAULT '{}', -- A generic set of properties
object_stream JSON NOT NULL DEFAULT '{}', -- Definition of what must be streamed
object_block JSON NOT NULL DEFAULT '{}', -- Definition of how the stream must be organised
object_filter JSON NOT NULL DEFAULT '{}', -- Any filter involved in selecting the data to stream
active_window JSON NOT NULL DEFAULT '{}', -- The window of operations
active boolean NOT NULL DEFAULT 't', -- When false, all data to this stream stop and are inactive
last_object bigint NOT NULL DEFAULT 0, -- The ID of the last object streamed (asset or reading, depending on the object_stream)
last_objects JSON NOT NULL DEFAULT '{"Readings":0,"Stats":0,"Audit":0}', -- The ID of the last object streamed (Readings,Stats or Audit , depending on the object_stream)
ts DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime'))); -- Creation or last update


-- Configuration table
Expand Down
1 change: 1 addition & 0 deletions scripts/plugins/storage/sqlite/upgrade/75.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE fledge.streams ADD COLUMN last_objects JSON NOT NULL DEFAULT '{"Readings":0,"Stats":0,"Audit":0}';
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As with other upgrader scripts, this does not preserve the location in the readings

2 changes: 2 additions & 0 deletions scripts/plugins/storage/sqlitelb/downgrade/74.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE fledge.streams DROP COLUMN last_objects;

21 changes: 11 additions & 10 deletions scripts/plugins/storage/sqlitelb/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -212,16 +212,17 @@ CREATE INDEX fki_asset_messages_fk2
-- Streams table
-- List of the streams to the Cloud.
CREATE TABLE fledge.streams (
id INTEGER PRIMARY KEY AUTOINCREMENT, -- Sequence ID
description character varying(255) NOT NULL DEFAULT '', -- A brief description of the stream entry
properties JSON NOT NULL DEFAULT '{}', -- A generic set of properties
object_stream JSON NOT NULL DEFAULT '{}', -- Definition of what must be streamed
object_block JSON NOT NULL DEFAULT '{}', -- Definition of how the stream must be organised
object_filter JSON NOT NULL DEFAULT '{}', -- Any filter involved in selecting the data to stream
active_window JSON NOT NULL DEFAULT '{}', -- The window of operations
active boolean NOT NULL DEFAULT 't', -- When false, all data to this stream stop and are inactive
last_object bigint NOT NULL DEFAULT 0, -- The ID of the last object streamed (asset or reading, depending on the object_stream)
ts DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime'))); -- Creation or last update
id INTEGER PRIMARY KEY AUTOINCREMENT, -- Sequence ID
description character varying(255) NOT NULL DEFAULT '', -- A brief description of the stream entry
properties JSON NOT NULL DEFAULT '{}', -- A generic set of properties
object_stream JSON NOT NULL DEFAULT '{}', -- Definition of what must be streamed
object_block JSON NOT NULL DEFAULT '{}', -- Definition of how the stream must be organised
object_filter JSON NOT NULL DEFAULT '{}', -- Any filter involved in selecting the data to stream
active_window JSON NOT NULL DEFAULT '{}', -- The window of operations
active boolean NOT NULL DEFAULT 't', -- When false, all data to this stream stop and are inactive
last_object bigint NOT NULL DEFAULT 0, -- The ID of the last object streamed (asset or reading, depending on the object_stream)
last_objects JSON NOT NULL DEFAULT '{"Readings":0,"Stats":0,"Audit":0}', -- The ID of the last object streamed (Readings,Stats or Audit , depending on the object_stream)
ts DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime'))); -- Creation or last update


-- Configuration table
Expand Down
1 change: 1 addition & 0 deletions scripts/plugins/storage/sqlitelb/upgrade/75.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE fledge.streams ADD COLUMN last_objects JSON NOT NULL DEFAULT '{"Readings":0,"Stats":0,"Audit":0}';
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above, the current position is lost so this will cause issues.