Added functionality to send <<ERROR>> status for cashed dci values Fixes #1092
authorzev <zev@radensolutions.com>
Thu, 9 Jun 2016 09:47:02 +0000 (12:47 +0300)
committerzev <zev@radensolutions.com>
Thu, 9 Jun 2016 09:48:33 +0000 (12:48 +0300)
src/agent/core/datacoll.cpp
src/agent/core/dbupgrade.cpp
src/agent/core/dcsnmp.cpp
src/agent/core/localdb.h
src/server/core/agent.cpp
src/server/core/dcitem.cpp
src/server/core/dcobject.cpp
src/server/core/dctable.cpp
src/server/include/nms_dcoll.h

index b778360..f223739 100644 (file)
@@ -26,7 +26,7 @@
  * Externals
  */
 void UpdateSnmpTarget(SNMPTarget *target);
-bool GetSnmpValue(const uuid& target, UINT16 port, const TCHAR *oid, TCHAR *value, int interpretRawValue);
+UINT32 GetSnmpValue(const uuid& target, UINT16 port, const TCHAR *oid, TCHAR *value, int interpretRawValue);
 
 /**
  * Data collector start indicator
@@ -259,6 +259,7 @@ private:
    time_t m_timestamp;
    int m_origin;
    int m_type;
+   UINT32 m_statusCode;
    uuid m_snmpNode;
    union
    {
@@ -268,60 +269,64 @@ private:
    } m_value;
 
 public:
-   DataElement(DataCollectionItem *dci, const TCHAR *value)
+   DataElement(DataCollectionItem *dci, const TCHAR *value, UINT32 status)
    {
       m_serverId = dci->getServerId();
       m_dciId = dci->getId();
       m_timestamp = time(NULL);
       m_origin = dci->getOrigin();
       m_type = DCO_TYPE_ITEM;
+      m_statusCode = status;
       m_snmpNode = dci->getSnmpTargetGuid();
       m_value.item = _tcsdup(value);
    }
 
-   DataElement(DataCollectionItem *dci, StringList *value)
+   DataElement(DataCollectionItem *dci, StringList *value, UINT32 status)
    {
       m_serverId = dci->getServerId();
       m_dciId = dci->getId();
       m_timestamp = time(NULL);
       m_origin = dci->getOrigin();
       m_type = DCO_TYPE_LIST;
+      m_statusCode = status;
       m_snmpNode = dci->getSnmpTargetGuid();
       m_value.list = value;
    }
 
-   DataElement(DataCollectionItem *dci, Table *value)
+   DataElement(DataCollectionItem *dci, Table *value, UINT32 status)
    {
       m_serverId = dci->getServerId();
       m_dciId = dci->getId();
       m_timestamp = time(NULL);
       m_origin = dci->getOrigin();
       m_type = DCO_TYPE_TABLE;
+      m_statusCode = status;
       m_snmpNode = dci->getSnmpTargetGuid();
       m_value.table = value;
    }
 
    /**
     * Create data element from database record
-    * Expected field order: server_id,dci_id,dci_type,dci_origin,snmp_target_guid,timestamp,value
+    * Expected field order: server_id,dci_id,dci_type,dci_origin,status_code,snmp_target_guid,timestamp,value
     */
    DataElement(DB_RESULT hResult, int row)
    {
       m_serverId = DBGetFieldUInt64(hResult, row, 0);
       m_dciId = DBGetFieldULong(hResult, row, 1);
-      m_timestamp = (time_t)DBGetFieldInt64(hResult, row, 5);
+      m_timestamp = (time_t)DBGetFieldInt64(hResult, row, 6);
       m_origin = DBGetFieldLong(hResult, row, 3);
       m_type = DBGetFieldLong(hResult, row, 2);
-      m_snmpNode = DBGetFieldGUID(hResult, row, 4);
+      m_statusCode = DBGetFieldLong(hResult, row, 4);
+      m_snmpNode = DBGetFieldGUID(hResult, row, 5);
       switch(m_type)
       {
          case DCO_TYPE_ITEM:
-            m_value.item = DBGetField(hResult, row, 6, NULL, 0);
+            m_value.item = DBGetField(hResult, row, 7, NULL, 0);
             break;
          case DCO_TYPE_LIST:
             {
                m_value.list = new StringList();
-               TCHAR *text = DBGetField(hResult, row, 6, NULL, 0);
+               TCHAR *text = DBGetField(hResult, row, 7, NULL, 0);
                if (text != NULL)
                {
                   m_value.list->splitAndAdd(text, _T("\n"));
@@ -331,7 +336,7 @@ public:
             break;
          case DCO_TYPE_TABLE:
             {
-               char *xml = DBGetFieldUTF8(hResult, row, 6, NULL, 0);
+               char *xml = DBGetFieldUTF8(hResult, row, 7, NULL, 0);
                if (xml != NULL)
                {
                   m_value.table = Table::createFromXML(xml);
@@ -370,6 +375,7 @@ public:
    UINT64 getServerId() { return m_serverId; }
    UINT32 getDciId() { return m_dciId; }
    int getType() { return m_type; }
+   UINT32 getStatusCode() { return m_statusCode; }
 
    void saveToDatabase(DB_STATEMENT hStmt);
    bool sendToServer(bool reconcillation);
@@ -385,18 +391,19 @@ void DataElement::saveToDatabase(DB_STATEMENT hStmt)
    DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_dciId);
    DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, (LONG)m_type);
    DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (LONG)m_origin);
-   DBBind(hStmt, 5, DB_SQLTYPE_VARCHAR, m_snmpNode);
-   DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (LONG)m_timestamp);
+   DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (LONG)m_statusCode);
+   DBBind(hStmt, 6, DB_SQLTYPE_VARCHAR, m_snmpNode);
+   DBBind(hStmt, 7, DB_SQLTYPE_INTEGER, (LONG)m_timestamp);
    switch(m_type)
    {
       case DCO_TYPE_ITEM:
-         DBBind(hStmt, 7, DB_SQLTYPE_TEXT, m_value.item, DB_BIND_STATIC);
+         DBBind(hStmt, 8, DB_SQLTYPE_TEXT, m_value.item, DB_BIND_STATIC);
          break;
       case DCO_TYPE_LIST:
-         DBBind(hStmt, 7, DB_SQLTYPE_TEXT, m_value.list->join(_T("\n")), DB_BIND_DYNAMIC);
+         DBBind(hStmt, 8, DB_SQLTYPE_TEXT, m_value.list->join(_T("\n")), DB_BIND_DYNAMIC);
          break;
       case DCO_TYPE_TABLE:
-         DBBind(hStmt, 7, DB_SQLTYPE_TEXT, m_value.table->createXML(), DB_BIND_DYNAMIC);
+         DBBind(hStmt, 8, DB_SQLTYPE_TEXT, m_value.table->createXML(), DB_BIND_DYNAMIC);
          break;
    }
    DBExecute(hStmt);
@@ -430,6 +437,7 @@ bool DataElement::sendToServer(bool reconciliation)
    msg.setField(VID_DCI_ID, m_dciId);
    msg.setField(VID_DCI_SOURCE_TYPE, (INT16)m_origin);
    msg.setField(VID_DCOBJECT_TYPE, (INT16)m_type);
+   msg.setField(VID_STATUS, m_statusCode);
    msg.setField(VID_NODE_ID, m_snmpNode);
    msg.setFieldFromTime(VID_TIMESTAMP, m_timestamp);
    msg.setField(VID_RECONCILIATION, (INT16)(reconciliation ? 1 : 0));
@@ -465,6 +473,7 @@ void DataElement::fillReconciliationMessage(NXCPMessage *msg, UINT32 baseId)
    msg->setField(baseId + 3, m_snmpNode);
    msg->setFieldFromTime(baseId + 4, m_timestamp);
    msg->setField(baseId + 5, m_value.item);
+   msg->setField(baseId + 6, m_statusCode);
 }
 
 /**
@@ -505,7 +514,7 @@ static THREAD_RESULT THREAD_CALL DatabaseWriter(void *arg)
       if (e == INVALID_POINTER_VALUE)
          break;
 
-      DB_STATEMENT hStmt= DBPrepare(hdb, _T("INSERT INTO dc_queue (server_id,dci_id,dci_type,dci_origin,snmp_target_guid,timestamp,value) VALUES (?,?,?,?,?,?,?)"));
+      DB_STATEMENT hStmt= DBPrepare(hdb, _T("INSERT INTO dc_queue (server_id,dci_id,dci_type,dci_origin,status_code,snmp_target_guid,timestamp,value) VALUES (?,?,?,?,?,?,?,?)"));
       if (hStmt == NULL)
       {
          delete e;
@@ -603,7 +612,7 @@ static THREAD_RESULT THREAD_CALL ReconciliationThread(void *arg)
       }
 
       TCHAR query[1024];
-      _sntprintf(query, 1024, _T("SELECT server_id,dci_id,dci_type,dci_origin,snmp_target_guid,timestamp,value FROM dc_queue WHERE server_id=") UINT64_FMT _T(" ORDER BY timestamp LIMIT %d"), session->getServerId(), BULK_DATA_BLOCK_SIZE);
+      _sntprintf(query, 1024, _T("SELECT server_id,dci_id,dci_type,dci_origin,status_code,snmp_target_guid,timestamp,value FROM dc_queue WHERE server_id=") UINT64_FMT _T(" ORDER BY timestamp LIMIT %d"), session->getServerId(), BULK_DATA_BLOCK_SIZE);
 
       TCHAR sqlError[DBDRV_MAX_ERROR_TEXT];
       DB_RESULT hResult = DBSelectEx(hdb, query, sqlError);
@@ -822,23 +831,24 @@ static DataElement *CollectDataFromAgent(DataCollectionItem *dci)
    VirtualSession session(dci->getServerId());
 
    DataElement *e = NULL;
+   UINT32 status;
    if (dci->getType() == DCO_TYPE_ITEM)
    {
       TCHAR value[MAX_RESULT_LENGTH];
-      if (GetParameterValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
-         e = new DataElement(dci, value);
+      status = GetParameterValue(INVALID_INDEX, dci->getName(), value, &session);
+      e = new DataElement(dci, (status == ERR_SUCCESS) ? value : _T(""), status);
    }
    else if (dci->getType() == DCO_TYPE_LIST)
    {
-      StringList *value = new StringList;
-      if (GetListValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
-         e = new DataElement(dci, value);
+      StringList *value = new StringList();
+      status = GetListValue(INVALID_INDEX, dci->getName(), value, &session);
+      e = new DataElement(dci, value, status);
    }
    else if (dci->getType() == DCO_TYPE_TABLE)
    {
-      Table *value = new Table;
-      if (GetTableValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
-         e = new DataElement(dci, value);
+      Table *value = new Table();
+      status = GetTableValue(INVALID_INDEX, dci->getName(), value, &session);
+      e = new DataElement(dci, value, status);
    }
 
    return e;
@@ -855,8 +865,8 @@ static DataElement *CollectDataFromSNMP(DataCollectionItem *dci)
       DebugPrintf(INVALID_INDEX, 8, _T("Read SNMP parameter %s"), dci->getName());
 
       TCHAR value[MAX_RESULT_LENGTH];
-      if (GetSnmpValue(dci->getSnmpTargetGuid(), dci->getSnmpPort(), dci->getName(), value, dci->getSnmpRawValueType()))
-         e = new DataElement(dci, value);
+      UINT32 status = GetSnmpValue(dci->getSnmpTargetGuid(), dci->getSnmpPort(), dci->getName(), value, dci->getSnmpRawValueType());
+      e = new DataElement(dci, status == ERR_SUCCESS ? value : _T(""), status);
    }
    return e;
 }
index b32807a..106493b 100644 (file)
@@ -37,6 +37,18 @@ bool g_ignoreAgentDbErrors = FALSE;
  */
 static DB_HANDLE s_db = NULL;
 
+
+/**
+ * Upgrade from V2 to V3
+ */
+static BOOL H_UpgradeFromV3(int currVersion, int newVersion)
+{
+   CHK_EXEC(Query(_T("ALTER TABLE dc_queue ADD status_code integer")));
+
+   CHK_EXEC(WriteMetadata(_T("SchemaVersion"), 4));
+   return TRUE;
+}
+
 /**
  * Upgrade from V2 to V3
  */
@@ -269,6 +281,7 @@ static struct
 {
    { 1, 2, H_UpgradeFromV1 },
    { 2, 3, H_UpgradeFromV2 },
+   { 3, 4, H_UpgradeFromV3 },
    { 0, 0, NULL }
 };
 
index ae6c7d3..1ec8dc9 100644 (file)
@@ -151,7 +151,7 @@ void UpdateSnmpTarget(SNMPTarget *target)
 /**
  * Get value from SNMP node
  */
-bool GetSnmpValue(const uuid& target, UINT16 port, const TCHAR *oid, TCHAR *value, int interpretRawValue)
+UINT32 GetSnmpValue(const uuid& target, UINT16 port, const TCHAR *oid, TCHAR *value, int interpretRawValue)
 {
    MutexLock(s_snmpTargetsLock);
    SNMPTarget *t = s_snmpTargets.get(target.getValue());
@@ -212,5 +212,6 @@ bool GetSnmpValue(const uuid& target, UINT16 port, const TCHAR *oid, TCHAR *valu
    }
 
    t->decRefCount();
-   return (rcc == SNMP_ERR_SUCCESS);
+   return (rcc == SNMP_ERR_SUCCESS) ? ERR_SUCCESS :
+      ((rcc == SNMP_ERR_NO_OBJECT) ? ERR_UNKNOWN_PARAMETER : ERR_INTERNAL_ERROR);
 }
index fd13dfd..41d2d12 100644 (file)
@@ -27,7 +27,7 @@
 /**
  * Database schema version
  */
-#define DB_SCHEMA_VERSION     3
+#define DB_SCHEMA_VERSION     4
 
 bool OpenLocalDatabase();
 void CloseLocalDatabase();
index 4b72660..68a7922 100644 (file)
@@ -536,6 +536,10 @@ UINT32 AgentConnectionEx::processCollectedData(NXCPMessage *msg)
       return ERR_INTERNAL_ERROR;
    }
 
+   time_t t = msg->getFieldAsTime(VID_TIMESTAMP);
+   UINT32 status = msg->getFieldAsUInt32(VID_STATUS);
+   bool success = true;
+
    void *value;
    switch(type)
    {
@@ -552,12 +556,31 @@ UINT32 AgentConnectionEx::processCollectedData(NXCPMessage *msg)
          DbgPrintf(5, _T("AgentConnectionEx::processCollectedData: invalid type %d of DCI %s [%d] on node %s [%d]"), type, dcObject->getName(), dciId, node->getName(), node->getId());
          return ERR_INTERNAL_ERROR;
    }
+   DbgPrintf(7, _T("AgentConnectionEx::processCollectedData: processing DCI %s [%d] (type=%d) (status=%d) on node %s [%d]"), dcObject->getName(), dciId, type, status, node->getName(), node->getId());
 
-   DbgPrintf(7, _T("AgentConnectionEx::processCollectedData: processing DCI %s [%d] (type=%d) on node %s [%d]"), dcObject->getName(), dciId, type, node->getName(), node->getId());
-   time_t t = msg->getFieldAsTime(VID_TIMESTAMP);
-   bool success = node->processNewDCValue(dcObject, t, value);
-   if (t > dcObject->getLastPollTime())
-      dcObject->setLastPollTime(t);
+   switch(status)
+   {
+      case ERR_SUCCESS:
+      {
+         if (dcObject->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
+            dcObject->setStatus(ITEM_STATUS_ACTIVE, true);
+         success = node->processNewDCValue(dcObject, t, value);
+         if (t > dcObject->getLastPollTime())
+            dcObject->setLastPollTime(t);
+         break;
+      }
+      case ERR_UNKNOWN_PARAMETER:
+         if (dcObject->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
+            dcObject->setStatus(ITEM_STATUS_ACTIVE, true);
+         dcObject->processNewError(false, t);
+         break;
+      case ERR_NO_SUCH_INSTANCE:
+         dcObject->processNewError(true, t);
+         break;
+      case ERR_INTERNAL_ERROR:
+         dcObject->processNewError(true, t);
+         break;
+   }
 
    switch(type)
    {
@@ -643,14 +666,37 @@ UINT32 AgentConnectionEx::processBulkCollectedData(NXCPMessage *request, NXCPMes
       }
 
       void *value = request->getFieldAsString(fieldId + 5);
-
-      DbgPrintf(7, _T("AgentConnectionEx::processBulkCollectedData: processing DCI %s [%d] (type=%d) on node %s [%d] (element %d)"), dcObject->getName(), dciId, type, node->getName(), node->getId(), i);
+      UINT32 status_code = request->getFieldAsUInt32(fieldId + 6);
+      DbgPrintf(7, _T("AgentConnectionEx::processBulkCollectedData: processing DCI %s [%d] (type=%d) (status=%d) on node %s [%d] (element %d)"), dcObject->getName(), dciId, type, status, node->getName(), node->getId(), i);
       time_t t = request->getFieldAsTime(fieldId + 4);
-      bool success = node->processNewDCValue(dcObject, t, value);
-      if (t > dcObject->getLastPollTime())
-         dcObject->setLastPollTime(t);
-      status[i] = success ? BULK_DATA_REC_SUCCESS : BULK_DATA_REC_FAILURE;
+      bool success = true;
+
+      switch(status_code)
+      {
+         case ERR_SUCCESS:
+         {
+            if (dcObject->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
+               dcObject->setStatus(ITEM_STATUS_ACTIVE, true);
+            success = node->processNewDCValue(dcObject, t, value);
+            if (t > dcObject->getLastPollTime())
+               dcObject->setLastPollTime(t);
+            break;
+         }
+         case ERR_UNKNOWN_PARAMETER:
+            if (dcObject->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
+               dcObject->setStatus(ITEM_STATUS_ACTIVE, true);
+            dcObject->processNewError(false, t);
+            break;
+         case ERR_NO_SUCH_INSTANCE:
+            dcObject->processNewError(true, t);
+            break;
+         case ERR_INTERNAL_ERROR:
+            dcObject->processNewError(true, t);
+            break;
+      }
 
+
+      status[i] = success ? BULK_DATA_REC_SUCCESS : BULK_DATA_REC_FAILURE;
       free(value);
    }
 
index 2553738..18a8cfc 100644 (file)
@@ -779,7 +779,7 @@ bool DCItem::processNewValue(time_t tmTimeStamp, const void *originalValue, bool
 /**
  * Process new data collection error
  */
-void DCItem::processNewError(bool noInstance)
+void DCItem::processNewError(bool noInstance, time_t now)
 {
    lock();
 
@@ -822,7 +822,6 @@ void DCItem::processNewError(bool noInstance)
          case ALREADY_ACTIVE:
             {
                                // Check if we need to re-sent threshold violation event
-               time_t now = time(NULL);
                UINT32 repeatInterval = (t->getRepeatInterval() == -1) ? g_thresholdRepeatInterval : (UINT32)t->getRepeatInterval();
                                   if ((repeatInterval != 0) && (t->getLastEventTimestamp() + (time_t)repeatInterval < now))
                                   {
index 57aaa9d..86ef0b4 100644 (file)
@@ -804,6 +804,15 @@ bool DCObject::processNewValue(time_t nTimeStamp, const void *value, bool *updat
  * Process new data collection error
  */
 void DCObject::processNewError(bool noInstance)
+{
+   time_t now = time(NULL);
+   processNewError(noInstance, now);
+}
+
+/**
+ * Process new data collection error
+ */
+void DCObject::processNewError(bool noInstance, time_t now)
 {
 }
 
index 214094e..fa7b755 100644 (file)
@@ -538,7 +538,7 @@ void DCTable::checkThresholds(Table *value)
 /**
  * Process new data collection error
  */
-void DCTable::processNewError(bool noInstance)
+void DCTable::processNewError(bool noInstance, time_t now)
 {
        m_dwErrorCount++;
 }
index 4e9e258..6e947b9 100644 (file)
@@ -240,7 +240,8 @@ public:
    virtual bool loadThresholdsFromDB(DB_HANDLE hdb);
 
    virtual bool processNewValue(time_t nTimeStamp, const void *value, bool *updateStatus);
-   virtual void processNewError(bool noInstance);
+   void processNewError(bool noInstance);
+   virtual void processNewError(bool noInstance, time_t now);
 
        virtual bool hasValue();
 
@@ -382,7 +383,7 @@ public:
        void expandInstance();
 
    virtual bool processNewValue(time_t nTimeStamp, const void *value, bool *updateStatus);
-   virtual void processNewError(bool noInstance);
+   virtual void processNewError(bool noInstance, time_t now);
 
        virtual bool hasValue();
 
@@ -577,7 +578,7 @@ public:
    virtual void deleteFromDatabase();
 
    virtual bool processNewValue(time_t nTimeStamp, const void *value, bool *updateStatus);
-   virtual void processNewError(bool noInstance);
+   virtual void processNewError(bool noInstance, time_t now);
 
    virtual bool hasValue();