#endif
INSERT INTO metadata (var_name,var_value)
- VALUES ('TDataTableCreationCommand_0','CREATE TABLE tdata_%d (item_id integer not null,tdata_timestamp integer not null,record_id ' CONCAT SQL_INT64_TEXT CONCAT ' not null,UNIQUE(record_id))');
-INSERT INTO metadata (var_name,var_value)
- VALUES ('TDataTableCreationCommand_1','CREATE TABLE tdata_records_%d (record_id ' CONCAT SQL_INT64_TEXT CONCAT ' not null,row_id ' CONCAT SQL_INT64_TEXT CONCAT ' not null,instance varchar(255) null,PRIMARY KEY(row_id),FOREIGN KEY (record_id) REFERENCES tdata_%d(record_id) ON DELETE CASCADE)');
-INSERT INTO metadata (var_name,var_value)
- VALUES ('TDataTableCreationCommand_2','CREATE TABLE tdata_rows_%d (row_id ' CONCAT SQL_INT64_TEXT CONCAT ' not null,column_id integer not null,value varchar(255) null,PRIMARY KEY(row_id,column_id),FOREIGN KEY (row_id) REFERENCES tdata_records_%d(row_id) ON DELETE CASCADE)');
-#if defined(DB_POSTGRESQL)
-INSERT INTO metadata (var_name,var_value)
- VALUES ('TDataIndexCreationCommand_0','CREATE INDEX idx_tdata_%d_timestamp_id ON tdata_%d(tdata_timestamp,item_id)');
-#elif defined(DB_MSSQL)
+ VALUES ('TDataTableCreationCommand_0','CREATE TABLE tdata_%d (item_id integer not null,tdata_timestamp integer not null,tdata_value ' CONCAT SQL_TEXT_TEXT CONCAT ' null)');
+#if defined(DB_MSSQL)
INSERT INTO metadata (var_name,var_value)
VALUES ('TDataIndexCreationCommand_0','CREATE CLUSTERED INDEX idx_tdata_%d_id_timestamp ON tdata_%d(item_id,tdata_timestamp)');
#else
INSERT INTO metadata (var_name,var_value)
VALUES ('TDataIndexCreationCommand_0','CREATE INDEX idx_tdata_%d_id_timestamp ON tdata_%d(item_id,tdata_timestamp)');
#endif
-INSERT INTO metadata (var_name,var_value)
- VALUES ('TDataIndexCreationCommand_1','CREATE INDEX idx_tdata_rec_%d_instance ON tdata_records_%d(instance)');
-INSERT INTO metadata (var_name,var_value)
- VALUES ('TDataIndexCreationCommand_2','CREATE INDEX idx_tdata_rec_%d_id ON tdata_records_%d(record_id)');
INSERT INTO metadata (var_name,var_value)
VALUES ('LocationHistory','CREATE TABLE gps_history_%d (latitude varchar(20), longitude varchar(20), accuracy integer not null, start_timestamp integer not null, end_timestamp integer not null, PRIMARY KEY(start_timestamp))');
Table *data = (Table *)value;
TCHAR query[256];
- _sntprintf(query, 256, _T("INSERT INTO tdata_%d (item_id,tdata_timestamp,record_id) VALUES (?,?,?)"), (int)nodeId);
+ _sntprintf(query, 256, _T("INSERT INTO tdata_%d (item_id,tdata_timestamp,tdata_value) VALUES (?,?,?)"), (int)nodeId);
DB_STATEMENT hStmt = DBPrepare(hdb, query);
if (hStmt != NULL)
{
DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, tableId);
DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (INT32)timestamp);
- DBBind(hStmt, 3, DB_SQLTYPE_BIGINT, recordId);
+ DBBind(hStmt, 3, DB_SQLTYPE_TEXT, DB_CTYPE_UTF8_STRING, data->createPackedXML(), DB_BIND_DYNAMIC);
success = DBExecute(hStmt);
DBFreeStatement(hStmt);
}
- if (success)
- {
- _sntprintf(query, 256, _T("INSERT INTO tdata_records_%d (record_id,row_id,instance) VALUES (?,?,?)"), (int)nodeId);
- DB_STATEMENT hStmt = DBPrepare(hdb, query);
- if (hStmt != NULL)
- {
- DBBind(hStmt, 1, DB_SQLTYPE_BIGINT, recordId);
- for(int row = 0; row < data->getNumRows(); row++)
- {
- TCHAR instance[MAX_RESULT_LENGTH];
- data->buildInstanceString(row, instance, MAX_RESULT_LENGTH);
- DBBind(hStmt, 2, DB_SQLTYPE_BIGINT, recordId | (INT64)row);
- DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, instance, DB_BIND_STATIC);
- success = DBExecute(hStmt);
- if (!success)
- break;
- }
- DBFreeStatement(hStmt);
- }
- }
-
- if (success)
- {
- _sntprintf(query, 256, _T("INSERT INTO tdata_rows_%d (row_id,column_id,value) VALUES (?,?,?)"), (int)nodeId);
- DB_STATEMENT hStmt = DBPrepare(hdb, query);
- if (hStmt != NULL)
- {
- for(int col = 0; col < data->getNumColumns(); col++)
- {
- INT32 colId = columnIdFromName(data->getColumnName(col));
- if (colId == 0)
- continue; // cannot get column ID
-
- for(int row = 0; row < data->getNumRows(); row++)
- {
- DBBind(hStmt, 1, DB_SQLTYPE_BIGINT, recordId | (INT64)row);
- DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, colId);
- const TCHAR *s = data->getAsString(row, col);
- if ((s == NULL) || (_tcslen(s) < MAX_DB_STRING))
- {
- DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, s, DB_BIND_STATIC);
- }
- else
- {
- TCHAR *sp = (TCHAR *)nx_memdup(s, MAX_DB_STRING * sizeof(TCHAR));
- sp[MAX_DB_STRING - 1] = 0;
- DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, sp, DB_BIND_DYNAMIC);
- }
- success = DBExecute(hStmt);
- if (!success)
- break;
- }
- }
- DBFreeStatement(hStmt);
- }
- }
-
if (success)
DBCommit(hdb);
else
}
/**
- * Prepare statement for reading data from idata table
+ * Prepare statement for reading data from idata/tdata table
*/
-static DB_STATEMENT PrepareIDataSelect(DB_HANDLE hdb, UINT32 nodeId, UINT32 maxRows, const TCHAR *condition)
+static DB_STATEMENT PrepareDataSelect(DB_HANDLE hdb, UINT32 nodeId, int dciType, UINT32 maxRows, const TCHAR *condition)
{
TCHAR query[512];
+ const TCHAR *tablePrefix = (dciType == DCO_TYPE_ITEM) ? _T("idata") : _T("tdata");
switch(g_dbSyntax)
{
case DB_SYNTAX_MSSQL:
- _sntprintf(query, 512, _T("SELECT TOP %d idata_timestamp,idata_value FROM idata_%d WHERE item_id=?%s ORDER BY idata_timestamp DESC"),
- (int)maxRows, (int)nodeId, condition);
+ _sntprintf(query, 512, _T("SELECT TOP %d %s_timestamp,%s_value FROM %s_%d WHERE item_id=?%s ORDER BY %s_timestamp DESC"),
+ (int)maxRows, tablePrefix, tablePrefix, tablePrefix, (int)nodeId, condition, tablePrefix);
break;
case DB_SYNTAX_ORACLE:
- _sntprintf(query, 512, _T("SELECT * FROM (SELECT idata_timestamp,idata_value FROM idata_%d WHERE item_id=?%s ORDER BY idata_timestamp DESC) WHERE ROWNUM<=%d"),
- (int)nodeId, condition, (int)maxRows);
+ _sntprintf(query, 512, _T("SELECT * FROM (SELECT %s_timestamp,%s_value FROM %s_%d WHERE item_id=?%s ORDER BY %s_timestamp DESC) WHERE ROWNUM<=%d"),
+ tablePrefix, tablePrefix, tablePrefix, (int)nodeId, condition, tablePrefix, (int)maxRows);
break;
case DB_SYNTAX_MYSQL:
case DB_SYNTAX_PGSQL:
case DB_SYNTAX_SQLITE:
- _sntprintf(query, 512, _T("SELECT idata_timestamp,idata_value FROM idata_%d WHERE item_id=?%s ORDER BY idata_timestamp DESC LIMIT %d"),
- (int)nodeId, condition, (int)maxRows);
+ _sntprintf(query, 512, _T("SELECT %s_timestamp,%s_value FROM %s_%d WHERE item_id=?%s ORDER BY %s_timestamp DESC LIMIT %d"),
+ tablePrefix, tablePrefix, tablePrefix, (int)nodeId, condition, tablePrefix, (int)maxRows);
break;
case DB_SYNTAX_DB2:
- _sntprintf(query, 512, _T("SELECT idata_timestamp,idata_value FROM idata_%d WHERE item_id=?%s ORDER BY idata_timestamp DESC FETCH FIRST %d ROWS ONLY"),
- (int)nodeId, condition, (int)maxRows);
+ _sntprintf(query, 512, _T("SELECT %s_timestamp,%s_value FROM %s_%d WHERE item_id=?%s ORDER BY %s_timestamp DESC FETCH FIRST %d ROWS ONLY"),
+ tablePrefix, tablePrefix, tablePrefix, (int)nodeId, condition, tablePrefix, (int)maxRows);
break;
default:
- DbgPrintf(1, _T(">>> INTERNAL ERROR: unsupported database in PrepareIDataSelect"));
- return NULL; // Unsupported database
- }
- return DBPrepare(hdb, query);
-}
-
-/**
- * Prepare statement for reading data from tdata table
- */
-static DB_STATEMENT PrepareTDataSelect(DB_HANDLE hdb, UINT32 nodeId, UINT32 maxRows, const TCHAR *condition)
-{
- TCHAR query[1024];
-
- switch(g_dbSyntax)
- {
- case DB_SYNTAX_MSSQL:
- _sntprintf(query, 1024,
- _T("SELECT TOP %d d.tdata_timestamp, r.value FROM tdata_%d d")
- _T(" INNER JOIN tdata_records_%d rec ON rec.record_id=d.record_id ")
- _T(" INNER JOIN tdata_rows_%d r ON r.row_id=rec.row_id ")
- _T("WHERE d.item_id=? AND rec.instance=? AND r.column_id=? %s ")
- _T("ORDER BY d.tdata_timestamp DESC"),
- (int)maxRows, (int)nodeId, (int)nodeId, (int)nodeId, condition);
- break;
- case DB_SYNTAX_ORACLE:
- _sntprintf(query, 1024,
- _T("SELECT * FROM (")
- _T(" SELECT d.tdata_timestamp, r.value FROM tdata_%d d")
- _T(" INNER JOIN tdata_records_%d rec ON rec.record_id=d.record_id")
- _T(" INNER JOIN tdata_rows_%d r ON r.row_id=rec.row_id")
- _T(" WHERE d.item_id=? AND rec.instance=? AND r.column_id=? %s")
- _T(" ORDER BY d.tdata_timestamp DESC)")
- _T("WHERE ROWNUM<=%d"),
- (int)nodeId, (int)nodeId, (int)nodeId, condition, (int)maxRows);
- break;
- case DB_SYNTAX_MYSQL:
- case DB_SYNTAX_PGSQL:
- case DB_SYNTAX_SQLITE:
- _sntprintf(query, 1024,
- _T("SELECT d.tdata_timestamp, r.value FROM tdata_%d d")
- _T(" INNER JOIN tdata_records_%d rec ON rec.record_id=d.record_id ")
- _T(" INNER JOIN tdata_rows_%d r ON r.row_id=rec.row_id ")
- _T("WHERE d.item_id=? AND rec.instance=? AND r.column_id=? %s ")
- _T("ORDER BY d.tdata_timestamp DESC LIMIT %d"),
- (int)nodeId, (int)nodeId, (int)nodeId, condition, (int)maxRows);
- break;
- case DB_SYNTAX_DB2:
- _sntprintf(query, 1024,
- _T("SELECT d.tdata_timestamp, r.value FROM tdata_%d d")
- _T(" INNER JOIN tdata_records_%d rec ON rec.record_id=d.record_id ")
- _T(" INNER JOIN tdata_rows_%d r ON r.row_id=rec.row_id ")
- _T("WHERE d.item_id=? AND rec.instance=? AND r.column_id=? %s ")
- _T("ORDER BY d.tdata_timestamp DESC FETCH FIRST %d ROWS ONLY"),
- (int)nodeId, (int)nodeId, (int)nodeId, condition, (int)maxRows);
- break;
- default:
- DbgPrintf(1, _T(">>> INTERNAL ERROR: unsupported database in PrepareTDataSelect"));
+ DbgPrintf(1, _T(">>> INTERNAL ERROR: unsupported database in PrepareDataSelect"));
return NULL; // Unsupported database
}
return DBPrepare(hdb, query);
goto read_from_db;
}
- TCHAR *instance = request->getFieldAsString(VID_INSTANCE);
+ TCHAR instance[256];
+ request->getFieldAsString(VID_INSTANCE, instance, 256);
int row = t->findRowByInstance(instance);
switch(((DCTable *)dci)->getColumnDataType(dataColumn))
TCHAR condition[256] = _T("");
if (timeFrom != 0)
- _tcscpy(condition, (dciType == DCO_TYPE_TABLE) ? _T(" AND d.tdata_timestamp>=?") : _T(" AND idata_timestamp>=?"));
+ _tcscpy(condition, (dciType == DCO_TYPE_TABLE) ? _T(" AND tdata_timestamp>=?") : _T(" AND idata_timestamp>=?"));
if (timeTo != 0)
- _tcscat(condition, (dciType == DCO_TYPE_TABLE) ? _T(" AND d.tdata_timestamp<=?") : _T(" AND idata_timestamp<=?"));
+ _tcscat(condition, (dciType == DCO_TYPE_TABLE) ? _T(" AND tdata_timestamp<=?") : _T(" AND idata_timestamp<=?"));
bool success = false;
DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
- DB_STATEMENT hStmt;
- switch(dciType)
- {
- case DCO_TYPE_ITEM:
- hStmt = PrepareIDataSelect(hdb, dcTarget->getId(), maxRows, condition);
- break;
- case DCO_TYPE_TABLE:
- hStmt = PrepareTDataSelect(hdb, dcTarget->getId(), maxRows, condition);
- break;
- default:
- hStmt = NULL;
- break;
- }
-
+ DB_STATEMENT hStmt = PrepareDataSelect(hdb, dcTarget->getId(), dciType, maxRows, condition);
if (hStmt != NULL)
{
TCHAR dataColumn[MAX_COLUMN_NAME] = _T("");
+ TCHAR instance[256];
int pos = 1;
DBBind(hStmt, pos++, DB_SQLTYPE_INTEGER, dci->getId());
if (dciType == DCO_TYPE_TABLE)
{
request->getFieldAsString(VID_DATA_COLUMN, dataColumn, MAX_COLUMN_NAME);
-
- DBBind(hStmt, pos++, DB_SQLTYPE_VARCHAR, request->getFieldAsString(VID_INSTANCE), DB_BIND_DYNAMIC);
- DBBind(hStmt, pos++, DB_SQLTYPE_INTEGER, DCTable::columnIdFromName(dataColumn));
+ request->getFieldAsString(VID_INSTANCE, instance, 256);
}
if (timeFrom != 0)
DBBind(hStmt, pos++, DB_SQLTYPE_INTEGER, timeFrom);
rows++;
pCurr->timeStamp = htonl(DBGetFieldULong(hResult, 0));
- switch(dataType)
+ if (dciType == DCO_TYPE_ITEM)
{
- case DCI_DT_INT:
- case DCI_DT_UINT:
- pCurr->value.int32 = htonl(DBGetFieldULong(hResult, 1));
- break;
- case DCI_DT_INT64:
- case DCI_DT_UINT64:
- pCurr->value.ext.v64.int64 = htonq(DBGetFieldUInt64(hResult, 1));
- break;
- case DCI_DT_FLOAT:
- pCurr->value.ext.v64.real = htond(DBGetFieldDouble(hResult, 1));
- break;
- case DCI_DT_STRING:
+ switch(dataType)
+ {
+ case DCI_DT_INT:
+ case DCI_DT_UINT:
+ pCurr->value.int32 = htonl(DBGetFieldULong(hResult, 1));
+ break;
+ case DCI_DT_INT64:
+ case DCI_DT_UINT64:
+ pCurr->value.ext.v64.int64 = htonq(DBGetFieldUInt64(hResult, 1));
+ break;
+ case DCI_DT_FLOAT:
+ pCurr->value.ext.v64.real = htond(DBGetFieldDouble(hResult, 1));
+ break;
+ case DCI_DT_STRING:
#ifdef UNICODE
#ifdef UNICODE_UCS4
- DBGetField(hResult, 1, szBuffer, MAX_DCI_STRING_VALUE);
- ucs4_to_ucs2(szBuffer, -1, pCurr->value.string, MAX_DCI_STRING_VALUE);
+ DBGetField(hResult, 1, szBuffer, MAX_DCI_STRING_VALUE);
+ ucs4_to_ucs2(szBuffer, -1, pCurr->value.string, MAX_DCI_STRING_VALUE);
#else
- DBGetField(hResult, 1, pCurr->value.string, MAX_DCI_STRING_VALUE);
+ DBGetField(hResult, 1, pCurr->value.string, MAX_DCI_STRING_VALUE);
#endif
#else
- DBGetField(hResult, 1, szBuffer, MAX_DCI_STRING_VALUE);
- mb_to_ucs2(szBuffer, -1, pCurr->value.string, MAX_DCI_STRING_VALUE);
+ DBGetField(hResult, 1, szBuffer, MAX_DCI_STRING_VALUE);
+ mb_to_ucs2(szBuffer, -1, pCurr->value.string, MAX_DCI_STRING_VALUE);
#endif
- SwapWideString(pCurr->value.string);
- break;
+ SwapWideString(pCurr->value.string);
+ break;
+ }
+ }
+ else
+ {
+ char *encodedTable = DBGetFieldUTF8(hResult, 1, NULL, 0);
+ if (encodedTable != NULL)
+ {
+ Table *table = Table::createFromPackedXML(encodedTable);
+ if (table != NULL)
+ {
+ int row = table->findRowByInstance(instance);
+ int col = table->getColumnIndex(dataColumn);
+ switch(dataType)
+ {
+ case DCI_DT_INT:
+ pCurr->value.int32 = htonl((UINT32)table->getAsInt(row, col));
+ break;
+ case DCI_DT_UINT:
+ pCurr->value.int32 = htonl(table->getAsUInt(row, col));
+ break;
+ case DCI_DT_INT64:
+ pCurr->value.ext.v64.int64 = htonq((UINT64)table->getAsInt64(row, col));
+ break;
+ case DCI_DT_UINT64:
+ pCurr->value.ext.v64.int64 = htonq(table->getAsUInt64(row, col));
+ break;
+ case DCI_DT_FLOAT:
+ pCurr->value.ext.v64.real = htond(table->getAsDouble(row, col));
+ break;
+ case DCI_DT_STRING:
+#ifdef UNICODE
+#ifdef UNICODE_UCS4
+ ucs4_to_ucs2(CHECK_NULL_EX(table->getAsString(row, col)), -1, pCurr->value.string, MAX_DCI_STRING_VALUE);
+#else
+ nx_strncpy(pCurr->value.string, CHECK_NULL_EX(table->getAsString(row, col)), MAX_DCI_STRING_VALUE);
+#endif
+#else
+ mb_to_ucs2(CHECK_NULL_EX(table->getAsString(row, col)), -1, pCurr->value.string, MAX_DCI_STRING_VALUE);
+#endif
+ SwapWideString(pCurr->value.string);
+ break;
+ }
+ delete table;
+ }
+ free(encodedTable);
+ }
}
pCurr = (DCI_DATA_ROW *)(((char *)pCurr) + s_rowSize[dataType]);
}