implemented ad-hoc summary tables with aggregation functions
authorVictor Kirhenshtein <victor@netxms.org>
Mon, 10 Nov 2014 16:47:35 +0000 (18:47 +0200)
committerVictor Kirhenshtein <victor@netxms.org>
Mon, 10 Nov 2014 16:47:35 +0000 (18:47 +0200)
12 files changed:
include/nms_cscp.h
include/nxclapi.h
src/java/netxms-base/src/main/java/org/netxms/base/NXCPCodes.java
src/java/netxms-client/src/main/java/org/netxms/client/NXCSession.java
src/java/netxms-client/src/main/java/org/netxms/client/constants/AggregationFunction.java [new file with mode: 0644]
src/java/netxms-client/src/test/java/org/netxms/client/DataCollectionTest.java
src/server/core/dc_nxsl.cpp
src/server/core/dcitem.cpp
src/server/core/dcst.cpp
src/server/core/dctarget.cpp
src/server/include/nms_dcoll.h
src/server/include/nms_objects.h

index e1fa52a..cd0f3fc 100644 (file)
@@ -1004,6 +1004,7 @@ typedef struct
 #define VID_CITY                    ((UINT32)487)
 #define VID_STREET_ADDRESS          ((UINT32)488)
 #define VID_POSTCODE                ((UINT32)489)
+#define VID_FUNCTION                ((UINT32)490)
 
 // Base variabe for single threshold in message
 #define VID_THRESHOLD_BASE          ((UINT32)0x00800000)
index 621994c..0a0f7fe 100644 (file)
@@ -826,6 +826,18 @@ enum
 #define F_SCRIPT     6
 
 /**
+ * DCI aggregation functions
+ */
+enum AggregationFunction
+{
+   DCI_AGG_LAST = 0,
+   DCI_AGG_MIN = 1,
+   DCI_AGG_MAX = 2,
+   DCI_AGG_AVG = 3,
+   DCI_AGG_SUM = 4
+};
+
+/**
  * Threshold operations
  */
 #define OP_LE        0
index b5327df..9ed6154 100644 (file)
@@ -830,6 +830,7 @@ public class NXCPCodes
    public static final long VID_CITY = 487;
    public static final long VID_STREET_ADDRESS = 488;
    public static final long VID_POSTCODE = 489;
+   public static final long VID_FUNCTION = 490;
 
        public static final long VID_ACL_USER_BASE = 0x00001000L;
        public static final long VID_ACL_USER_LAST = 0x00001FFFL;
index 560cbaa..305e5b4 100644 (file)
@@ -90,6 +90,7 @@ import org.netxms.base.NXCPMsgWaitQueue;
 import org.netxms.base.NXCommon;
 import org.netxms.client.agent.config.ConfigContent;
 import org.netxms.client.agent.config.ConfigListElement;
+import org.netxms.client.constants.AggregationFunction;
 import org.netxms.client.constants.RCC;
 import org.netxms.client.constants.ObjectStatus;
 import org.netxms.client.dashboards.DashboardElement;
@@ -7628,15 +7629,22 @@ public class NXCSession implements Session, ScriptLibraryManager, UserManager, S
     * Query ad-hoc DCI summary table.
     *
     * @param baseObjectId base container object ID
+    * @param columns columns for resulting table
+    * @param function data aggregation function
+    * @param periodStart start of query period
+    * @param periodEnd end of query period
     * @return table with last values data for all nodes under given base container
     * @throws IOException  if socket I/O error occurs
     * @throws NetXMSClientException if NetXMS server returns an error or operation was timed out
     */
-   public Table queryAdHocDciSummaryTable(long baseObjectId, List<DciSummaryTableColumn> columns) throws IOException, NetXMSClientException
+   public Table queryAdHocDciSummaryTable(long baseObjectId, List<DciSummaryTableColumn> columns, AggregationFunction function, Date periodStart, Date periodEnd) throws IOException, NetXMSClientException
    {
       final NXCPMessage msg = newMessage(NXCPCodes.CMD_QUERY_ADHOC_SUMMARY_TABLE);
       msg.setVariableInt32(NXCPCodes.VID_OBJECT_ID, (int) baseObjectId);
       msg.setVariableInt32(NXCPCodes.VID_NUM_COLUMNS, columns.size());
+      msg.setVariableInt16(NXCPCodes.VID_FUNCTION, (function != null) ? function.getValue() : AggregationFunction.LAST.getValue());
+      msg.setVariableInt64(NXCPCodes.VID_TIME_FROM, (periodStart != null) ? periodStart.getTime() / 1000 : 0);
+      msg.setVariableInt64(NXCPCodes.VID_TIME_TO, (periodEnd != null) ? periodEnd.getTime() / 1000 : 0);
       long id = NXCPCodes.VID_COLUMN_INFO_BASE;
       for(DciSummaryTableColumn c : columns)
       {
diff --git a/src/java/netxms-client/src/main/java/org/netxms/client/constants/AggregationFunction.java b/src/java/netxms-client/src/main/java/org/netxms/client/constants/AggregationFunction.java
new file mode 100644 (file)
index 0000000..03d04b9
--- /dev/null
@@ -0,0 +1,77 @@
+/**
+ * NetXMS - open source network management system
+ * Copyright (C) 2003-2014 Victor Kirhenshtein
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+package org.netxms.client.constants;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.netxms.base.Logger;
+
+/**
+ * Data aggregation function
+ */
+public enum AggregationFunction
+{
+   LAST(0),
+   MIN(1),
+   MAX(2),
+   AVERAGE(3),
+   SUM(4);
+
+   private int value;
+   private static Map<Integer, AggregationFunction> lookupTable = new HashMap<Integer, AggregationFunction>();
+
+   static
+   {
+      for(AggregationFunction element : AggregationFunction.values())
+      {
+         lookupTable.put(element.value, element);
+      }
+   }
+
+   /**
+    * @param value
+    */
+   private AggregationFunction(int value)
+   {
+      this.value = value;
+   }
+
+   /**
+    * @return
+    */
+   public int getValue()
+   {
+      return value;
+   }
+
+   /**
+    * @param value
+    * @return
+    */
+   public static AggregationFunction getByValue(int value)
+   {
+      final AggregationFunction element = lookupTable.get(value);
+      if (element == null)
+      {
+         Logger.warning(AggregationFunction.class.getName(), "Unknown element " + value);
+         return LAST; // fallback
+      }
+      return element;
+   }
+}
index d14e5ba..25e512c 100644 (file)
@@ -19,7 +19,9 @@
 package org.netxms.client;
 
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
+import org.netxms.client.constants.AggregationFunction;
 import org.netxms.client.constants.RCC;
 import org.netxms.client.datacollection.DataCollectionConfiguration;
 import org.netxms.client.datacollection.DataCollectionItem;
@@ -184,7 +186,7 @@ public class DataCollectionTest extends SessionTest
       columns.add(new DciSummaryTableColumn("Usage", "System.CPU.Usage", 0));
       columns.add(new DciSummaryTableColumn("I/O Wait", "System.CPU.IOWait", 0));
 
-      Table result = session.queryAdHocDciSummaryTable(2, columns);
+      Table result = session.queryAdHocDciSummaryTable(2, columns, AggregationFunction.AVERAGE, new Date(System.currentTimeMillis() - 86400000), new Date());
       System.out.println(result.getRowCount() + " rows in result set");
       for(int i = 0; i < result.getColumnCount(); i++)
          System.out.print(String.format(" | %-20s", result.getColumnDisplayName(i)));
index 3ee2d86..688e366 100644 (file)
@@ -261,9 +261,7 @@ static int F_FindAllDCIs(int argc, NXSL_Value **argv, NXSL_Value **ppResult, NXS
 /**
  * Get min, max or average of DCI values for a period
  */
-typedef enum { DCI_MIN = 0, DCI_MAX = 1, DCI_AVG = 2, DCI_SUM = 3 } DciSqlFunc_t;
-
-static int F_GetDCIValueStat(int argc, NXSL_Value **argv, NXSL_Value **ppResult, NXSL_VM *vm, DciSqlFunc_t sqlFunc)
+static int F_GetDCIValueStat(int argc, NXSL_Value **argv, NXSL_Value **ppResult, NXSL_VM *vm, AggregationFunction func)
 {
        if (!argv[0]->isObject())
                return NXSL_ERR_NOT_OBJECT;
@@ -279,64 +277,16 @@ static int F_GetDCIValueStat(int argc, NXSL_Value **argv, NXSL_Value **ppResult,
        DCObject *dci = node->getDCObjectById(argv[1]->getValueAsUInt32());
        if ((dci != NULL) && (dci->getType() == DCO_TYPE_ITEM))
        {
-               double result = 0;
-               DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
-               TCHAR query[1024];
-      static const TCHAR *functions[] = { _T("min"), _T("max"), _T("avg"), _T("sum") };
-
-               if (g_dbSyntax == DB_SYNTAX_ORACLE)
-               {
-                       _sntprintf(query, 1024, _T("SELECT %s(coalesce(to_number(idata_value),0)) FROM idata_%u ")
-                               _T("WHERE item_id=? AND idata_timestamp BETWEEN ? AND ?"), 
-                               functions[sqlFunc], node->getId());
-               }
-               else if (g_dbSyntax == DB_SYNTAX_MSSQL)
-               {
-                       _sntprintf(query, 1024, _T("SELECT %s(coalesce(cast(idata_value as float),0)) FROM idata_%u ")
-                               _T("WHERE item_id=? AND (idata_timestamp BETWEEN ? AND ?) AND isnumeric(idata_value)=1"), 
-                               functions[sqlFunc], node->getId());
-               }
-               else if (g_dbSyntax == DB_SYNTAX_PGSQL)
-               {
-                       _sntprintf(query, 1024, _T("SELECT %s(coalesce(idata_value::double precision,0)) FROM idata_%u ")
-                               _T("WHERE item_id=? AND idata_timestamp BETWEEN ? AND ?"), 
-                               functions[sqlFunc],     node->getId());
-               }
-               else
-               {
-                       _sntprintf(query, 1024, _T("SELECT %s(coalesce(idata_value,0)) FROM idata_%u ")
-                               _T("WHERE item_id=? and idata_timestamp between ? and ?"), 
-                               functions[sqlFunc],     node->getId());
-               }
-
-               DB_STATEMENT hStmt = DBPrepare(hdb, query);
-               if (hStmt != NULL)
-               {
-                       DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, argv[1]->getValueAsUInt32());
-                       DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, argv[2]->getValueAsInt32());
-                       DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, argv[3]->getValueAsInt32());
-                       DB_RESULT hResult = DBSelectPrepared(hStmt);
-                       if (hResult != NULL)
-                       {
-                               if (DBGetNumRows(hResult) == 1)
-                               {
-                                       result = DBGetFieldDouble(hResult, 0, 0);
-                               }
-                               *ppResult = new NXSL_Value(result);
-                               DBFreeResult(hResult);
-                       }
-                       else
-                       {
-                               *ppResult = new NXSL_Value;     // Return NULL if prepared select failed
-                       }
-                       DBFreeStatement(hStmt);
-               }
-               else
-               {
-                       *ppResult = new NXSL_Value;     // Return NULL if prepare failed
-               }
-
-               DBConnectionPoolReleaseConnection(hdb);
+      TCHAR *result = ((DCItem *)dci)->getAggregateValue(func, argv[2]->getValueAsInt32(), argv[3]->getValueAsInt32());
+      if (result != NULL)
+      {
+         *ppResult = new NXSL_Value(result);
+         free(result);
+      }
+      else
+      {
+                  *ppResult = new NXSL_Value;  // Return NULL if query failed
+      }
        }
        else
        {
@@ -351,7 +301,7 @@ static int F_GetDCIValueStat(int argc, NXSL_Value **argv, NXSL_Value **ppResult,
  */
 static int F_GetMinDCIValue(int argc, NXSL_Value **argv, NXSL_Value **ppResult, NXSL_VM *vm)
 {
-       return F_GetDCIValueStat(argc, argv, ppResult, vm, DCI_MIN);
+       return F_GetDCIValueStat(argc, argv, ppResult, vm, DCI_AGG_MIN);
 }
 
 /**
@@ -359,7 +309,7 @@ static int F_GetMinDCIValue(int argc, NXSL_Value **argv, NXSL_Value **ppResult,
  */
 static int F_GetMaxDCIValue(int argc, NXSL_Value **argv, NXSL_Value **ppResult, NXSL_VM *vm)
 {
-       return F_GetDCIValueStat(argc, argv, ppResult, vm, DCI_MAX);
+       return F_GetDCIValueStat(argc, argv, ppResult, vm, DCI_AGG_MAX);
 }
 
 /**
@@ -367,7 +317,7 @@ static int F_GetMaxDCIValue(int argc, NXSL_Value **argv, NXSL_Value **ppResult,
  */
 static int F_GetAvgDCIValue(int argc, NXSL_Value **argv, NXSL_Value **ppResult, NXSL_VM *vm)
 {
-       return F_GetDCIValueStat(argc, argv, ppResult, vm, DCI_AVG);
+       return F_GetDCIValueStat(argc, argv, ppResult, vm, DCI_AGG_AVG);
 }
 
 /**
@@ -375,7 +325,7 @@ static int F_GetAvgDCIValue(int argc, NXSL_Value **argv, NXSL_Value **ppResult,
  */
 static int F_GetSumDCIValue(int argc, NXSL_Value **argv, NXSL_Value **ppResult, NXSL_VM *vm)
 {
-       return F_GetDCIValueStat(argc, argv, ppResult, vm, DCI_SUM);
+       return F_GetDCIValueStat(argc, argv, ppResult, vm, DCI_AGG_SUM);
 }
 
 /**
index e76106a..b4f2fe4 100644 (file)
@@ -1234,6 +1234,66 @@ ItemValue *DCItem::getInternalLastValue()
 }
 
 /**
+ * Get aggregate value. Returned value must be deallocated by caller.
+ *
+ * @return dynamically allocated value or NULL on error
+ */
+TCHAR *DCItem::getAggregateValue(AggregationFunction func, time_t periodStart, time_t periodEnd)
+{
+       DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
+       TCHAR query[1024];
+   TCHAR *result = NULL;
+   
+   static const TCHAR *functions[] = { _T(""), _T("min"), _T("max"), _T("avg"), _T("sum") };
+
+       if (g_dbSyntax == DB_SYNTAX_ORACLE)
+       {
+               _sntprintf(query, 1024, _T("SELECT %s(coalesce(to_number(idata_value),0)) FROM idata_%u ")
+                       _T("WHERE item_id=? AND idata_timestamp BETWEEN ? AND ?"), 
+                       functions[func], m_pNode->getId());
+       }
+       else if (g_dbSyntax == DB_SYNTAX_MSSQL)
+       {
+               _sntprintf(query, 1024, _T("SELECT %s(coalesce(cast(idata_value as float),0)) FROM idata_%u ")
+                       _T("WHERE item_id=? AND (idata_timestamp BETWEEN ? AND ?) AND isnumeric(idata_value)=1"), 
+                       functions[func], m_pNode->getId());
+       }
+       else if (g_dbSyntax == DB_SYNTAX_PGSQL)
+       {
+               _sntprintf(query, 1024, _T("SELECT %s(coalesce(idata_value::double precision,0)) FROM idata_%u ")
+                       _T("WHERE item_id=? AND idata_timestamp BETWEEN ? AND ?"), 
+                       functions[func], m_pNode->getId());
+       }
+       else
+       {
+               _sntprintf(query, 1024, _T("SELECT %s(coalesce(idata_value,0)) FROM idata_%u ")
+                       _T("WHERE item_id=? and idata_timestamp between ? and ?"), 
+                       functions[func], m_pNode->getId());
+       }
+
+       DB_STATEMENT hStmt = DBPrepare(hdb, query);
+       if (hStmt != NULL)
+       {
+               DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
+               DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (INT32)periodStart);
+               DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, (INT32)periodEnd);
+               DB_RESULT hResult = DBSelectPrepared(hStmt);
+               if (hResult != NULL)
+               {
+                       if (DBGetNumRows(hResult) == 1)
+                       {
+                               result = DBGetField(hResult, 0, 0, NULL, 0);
+                       }
+                       DBFreeResult(hResult);
+               }
+               DBFreeStatement(hStmt);
+       }
+
+       DBConnectionPoolReleaseConnection(hdb);
+   return result;
+}
+
+/**
  * Clean expired data
  */
 void DCItem::deleteExpiredData()
index c74eb9a..9b8394f 100644 (file)
@@ -153,6 +153,9 @@ SummaryTable::SummaryTable(CSCPMessage *msg)
    m_title[0] = 0;
    m_flags = 0;
    m_filter = NULL;
+   m_aggregationFunction = (AggregationFunction)msg->getFieldAsInt16(VID_FUNCTION);
+   m_periodStart = msg->getFieldAsTime(VID_TIME_FROM);
+   m_periodEnd = msg->getFieldAsTime(VID_TIME_TO);
 
    int count = msg->getFieldAsInt32(VID_NUM_COLUMNS);
    m_columns = new ObjectArray<SummaryTableColumn>(count, 16, true);
@@ -173,6 +176,10 @@ SummaryTable::SummaryTable(DB_RESULT hResult)
    DBGetField(hResult, 0, 0, m_title, MAX_DB_STRING);
    m_flags = DBGetFieldULong(hResult, 0, 1);
 
+   m_aggregationFunction = DCI_AGG_LAST;
+   m_periodStart = 0;
+   m_periodEnd = 0;
+
    // Filter script
    TCHAR *filterSource = DBGetField(hResult, 0, 2, NULL, 0);
    if (filterSource != NULL)
@@ -346,7 +353,7 @@ Table *QuerySummaryTable(LONG tableId, SummaryTable *adHocDefinition, UINT32 bas
 
       if (tableDefinition->filter((DataCollectionTarget *)obj))
       {
-         ((DataCollectionTarget *)obj)->getLastValuesSummary(tableDefinition, tableData);
+         ((DataCollectionTarget *)obj)->getDciValuesSummary(tableDefinition, tableData);
       }
       obj->decRefCount();
    }
index 4872c31..8d1ba5d 100644 (file)
@@ -638,7 +638,7 @@ UINT32 DataCollectionTarget::getScriptItem(const TCHAR *param, size_t bufSize, T
 /**
  * Get last (current) DCI values for summary table.
  */
-void DataCollectionTarget::getLastValuesSummary(SummaryTable *tableDefinition, Table *tableData)
+void DataCollectionTarget::getDciValuesSummary(SummaryTable *tableDefinition, Table *tableData)
 {
    bool rowAdded = false;
    lockDciAccess(false);
@@ -662,9 +662,20 @@ void DataCollectionTarget::getLastValuesSummary(SummaryTable *tableDefinition, T
                tableData->setObjectId(tableData->getNumRows() - 1, m_id);
                rowAdded = true;
             }
-            tableData->set(i + 1, ((DCItem *)object)->getLastValue());
             tableData->setStatus(i + 1, ((DCItem *)object)->getThresholdSeverity());
             tableData->getColumnDefinitions()->get(i + 1)->setDataType(((DCItem *)object)->getDataType());
+            if (tableDefinition->getAggregationFunction() == F_LAST)
+            {
+               tableData->set(i + 1, ((DCItem *)object)->getLastValue());
+            }
+            else
+            {
+               tableData->set(i + 1, 
+                  ((DCItem *)object)->getAggregateValue(
+                     tableDefinition->getAggregationFunction(), 
+                     tableDefinition->getPeriodStart(), 
+                     tableDefinition->getPeriodEnd()));
+            }
          }
       }
    }
index 7115177..180c6a3 100644 (file)
@@ -375,6 +375,7 @@ public:
    NXSL_Value *getRawValueForNXSL();
    const TCHAR *getLastValue();
    ItemValue *getInternalLastValue();
+   TCHAR *getAggregateValue(AggregationFunction func, time_t periodStart, time_t periodEnd);
 
    virtual void createMessage(CSCPMessage *pMsg);
 #if defined(__SUNPRO_CC) || defined(__HP_aCC)
index c9109d9..07a23b7 100644 (file)
@@ -281,6 +281,9 @@ private:
    UINT32 m_flags;
    ObjectArray<SummaryTableColumn> *m_columns;
    NXSL_VM *m_filter;
+   AggregationFunction m_aggregationFunction;
+   time_t m_periodStart;
+   time_t m_periodEnd;
 
    SummaryTable(DB_RESULT hResult);
 
@@ -295,6 +298,9 @@ public:
 
    int getNumColumns() { return m_columns->size(); }
    SummaryTableColumn *getColumn(int index) { return m_columns->get(index); }
+   AggregationFunction getAggregationFunction() { return m_aggregationFunction; }
+   time_t getPeriodStart() { return m_periodStart; }
+   time_t getPeriodEnd() { return m_periodEnd; }
 };
 
 /**
@@ -811,7 +817,7 @@ public:
    UINT32 getTableLastValues(UINT32 dciId, CSCPMessage *msg);
        UINT32 getThresholdSummary(CSCPMessage *msg, UINT32 baseId);
        UINT32 getPerfTabDCIList(CSCPMessage *pMsg);
-   void getLastValuesSummary(SummaryTable *tableDefinition, Table *tableData);
+   void getDciValuesSummary(SummaryTable *tableDefinition, Table *tableData);
 
    void updateDciCache();
    void cleanDCIData();