e070ab8b8d48c76141fb0b2d46feae1a75c91159
[public/netxms.git] / src / server / core / agent.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2016 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: agent.cpp
20 **
21 **/
22
23 #include "nxcore.h"
24
25 /**
26 * Externals
27 */
28 void ProcessTrap(SNMP_PDU *pdu, const InetAddress& srcAddr, UINT32 zoneId, int srcPort, SNMP_Transport *pTransport, SNMP_Engine *localEngine, bool isInformRq);
29
30 /**
31 * Destructor for extended agent connection class
32 */
33 AgentConnectionEx::~AgentConnectionEx()
34 {
35 }
36
37 /**
38 * Trap processor
39 */
40 void AgentConnectionEx::onTrap(NXCPMessage *pMsg)
41 {
42 UINT32 dwEventCode;
43 int i, iNumArgs;
44 Node *pNode = NULL;
45 TCHAR *pszArgList[32], szBuffer[32];
46 char szFormat[] = "ssssssssssssssssssssssssssssssss";
47
48 DbgPrintf(3, _T("AgentConnectionEx::onTrap(): Received trap message from agent at %s, node ID %d"), getIpAddr().toString(szBuffer), m_nodeId);
49 if (m_nodeId != 0)
50 pNode = (Node *)FindObjectById(m_nodeId, OBJECT_NODE);
51 if (pNode == NULL)
52 pNode = FindNodeByIP(0, getIpAddr().getAddressV4());
53 if (pNode != NULL)
54 {
55 if (pNode->getStatus() != STATUS_UNMANAGED)
56 {
57 // Check for duplicate traps - only accept traps with ID
58 // higher than last received
59 // agents prior to 1.1.6 will not send trap id
60 // we should accept trap in that case to maintain compatibility
61 bool acceptTrap;
62 QWORD trapId = pMsg->getFieldAsUInt64(VID_TRAP_ID);
63 if (trapId != 0)
64 {
65 acceptTrap = pNode->checkAgentTrapId(trapId);
66 DbgPrintf(5, _T("AgentConnectionEx::onTrap(): trapID is%s valid"), acceptTrap ? _T("") : _T(" not"));
67 }
68 else
69 {
70 acceptTrap = true;
71 DbgPrintf(5, _T("AgentConnectionEx::onTrap(): trap ID not provided"));
72 }
73
74 if (acceptTrap)
75 {
76 dwEventCode = pMsg->getFieldAsUInt32(VID_EVENT_CODE);
77 if ((dwEventCode == 0) && pMsg->isFieldExist(VID_EVENT_NAME))
78 {
79 TCHAR eventName[256];
80 pMsg->getFieldAsString(VID_EVENT_NAME, eventName, 256);
81 dwEventCode = EventCodeFromName(eventName, 0);
82 }
83 iNumArgs = (int)pMsg->getFieldAsUInt16(VID_NUM_ARGS);
84 if (iNumArgs > 32)
85 iNumArgs = 32;
86 for(i = 0; i < iNumArgs; i++)
87 pszArgList[i] = pMsg->getFieldAsString(VID_EVENT_ARG_BASE + i);
88 DbgPrintf(3, _T("Event from trap: %d"), dwEventCode);
89
90 szFormat[iNumArgs] = 0;
91 PostEvent(dwEventCode, pNode->getId(), (iNumArgs > 0) ? szFormat : NULL,
92 pszArgList[0], pszArgList[1], pszArgList[2], pszArgList[3],
93 pszArgList[4], pszArgList[5], pszArgList[6], pszArgList[7],
94 pszArgList[8], pszArgList[9], pszArgList[10], pszArgList[11],
95 pszArgList[12], pszArgList[13], pszArgList[14], pszArgList[15],
96 pszArgList[16], pszArgList[17], pszArgList[18], pszArgList[19],
97 pszArgList[20], pszArgList[21], pszArgList[22], pszArgList[23],
98 pszArgList[24], pszArgList[25], pszArgList[26], pszArgList[27],
99 pszArgList[28], pszArgList[29], pszArgList[30], pszArgList[31]);
100
101 // Cleanup
102 for(i = 0; i < iNumArgs; i++)
103 free(pszArgList[i]);
104 }
105 }
106 else
107 {
108 DbgPrintf(3, _T("AgentConnectionEx::onTrap(): node %s [%d] in in UNMANAGED state - trap ignored"), pNode->getName(), pNode->getId());
109 }
110 }
111 else
112 {
113 DbgPrintf(3, _T("AgentConnectionEx::onTrap(): Cannot find node for IP address %s"), getIpAddr().toString(szBuffer));
114 }
115 }
116
117 /**
118 * Handler for data push
119 */
120 void AgentConnectionEx::onDataPush(NXCPMessage *msg)
121 {
122 if (g_flags & AF_SHUTDOWN)
123 return;
124
125 TCHAR name[MAX_PARAM_NAME], value[MAX_RESULT_LENGTH];
126 msg->getFieldAsString(VID_NAME, name, MAX_PARAM_NAME);
127 msg->getFieldAsString(VID_VALUE, value, MAX_RESULT_LENGTH);
128
129 Node *sender = NULL;
130 if (m_nodeId != 0)
131 sender = (Node *)FindObjectById(m_nodeId, OBJECT_NODE);
132 if (sender == NULL)
133 sender = FindNodeByIP(0, getIpAddr());
134
135 if (sender != NULL)
136 {
137 // Check for duplicate data requests - only accept requests with ID
138 // higher than last received
139 // agents prior to 1.2.10 will not send request id
140 // we should accept data in that case to maintain compatibility
141 bool acceptRequest;
142 QWORD requestId = msg->getFieldAsUInt64(VID_REQUEST_ID);
143 if (requestId != 0)
144 {
145 acceptRequest = sender->checkAgentPushRequestId(requestId);
146 DbgPrintf(5, _T("AgentConnectionEx::onDataPush(): requestId is%s valid"), acceptRequest ? _T("") : _T(" not"));
147 }
148 else
149 {
150 acceptRequest = true;
151 DbgPrintf(5, _T("AgentConnectionEx::onDataPush(): request ID not provided"));
152 }
153
154 if (acceptRequest)
155 {
156 Node *target;
157 UINT32 objectId = msg->getFieldAsUInt32(VID_OBJECT_ID);
158 if (objectId != 0)
159 {
160 // push on behalf of other node
161 target = (Node *)FindObjectById(objectId, OBJECT_NODE);
162 if (target != NULL)
163 {
164 if (target->isTrustedNode(sender->getId()))
165 {
166 DbgPrintf(5, _T("%s: agent data push: target set to %s [%d]"), sender->getName(), target->getName(), target->getId());
167 }
168 else
169 {
170 DbgPrintf(5, _T("%s: agent data push: not in trusted node list for target %s [%d]"), sender->getName(), target->getName(), target->getId());
171 target = NULL;
172 }
173 }
174 }
175 else
176 {
177 target = sender;
178 }
179
180 if (target != NULL)
181 {
182 DbgPrintf(5, _T("%s: agent data push: %s=%s"), target->getName(), name, value);
183 DCObject *dci = target->getDCObjectByName(name);
184 if ((dci != NULL) && (dci->getType() == DCO_TYPE_ITEM) && (dci->getDataSource() == DS_PUSH_AGENT) && (dci->getStatus() == ITEM_STATUS_ACTIVE))
185 {
186 DbgPrintf(5, _T("%s: agent data push: found DCI %d"), target->getName(), dci->getId());
187 time_t t = msg->getFieldAsTime(VID_TIMESTAMP);
188 if (t == 0)
189 t = time(NULL);
190 target->processNewDCValue(dci, t, value);
191 if (t > dci->getLastPollTime())
192 dci->setLastPollTime(t);
193 }
194 else
195 {
196 DbgPrintf(5, _T("%s: agent data push: DCI not found for %s"), target->getName(), name);
197 }
198 }
199 else
200 {
201 DbgPrintf(5, _T("%s: agent data push: target node not found or not accessible"), sender->getName());
202 }
203 }
204 }
205 }
206
207 /**
208 * Print message.
209 */
210 void AgentConnectionEx::printMsg(const TCHAR *format, ...)
211 {
212 va_list args;
213 va_start(args, format);
214 nxlog_debug2(6, format, args);
215 va_end(args);
216 }
217
218 /**
219 * Cancel unknown file monitoring
220 */
221 static void CancelUnknownFileMonitoring(Node *object,TCHAR *remoteFile)
222 {
223 DbgPrintf(6, _T("AgentConnectionEx::onFileMonitoringData: unknown subscription will be canceled."));
224 AgentConnection *conn = object->createAgentConnection();
225 if(conn != NULL)
226 {
227 NXCPMessage request;
228 request.setId(conn->generateRequestId());
229 request.setCode(CMD_CANCEL_FILE_MONITORING);
230 request.setField(VID_FILE_NAME, remoteFile);
231 request.setField(VID_OBJECT_ID, object->getId());
232 NXCPMessage* response = conn->customRequest(&request);
233 delete response;
234 conn->decRefCount();
235 }
236 }
237
238 /**
239 * Recieve file monitoring information and resend to all required user sessions
240 */
241 void AgentConnectionEx::onFileMonitoringData(NXCPMessage *pMsg)
242 {
243 Node *object = NULL;
244 if (m_nodeId != 0)
245 object = (Node *)FindObjectById(m_nodeId, OBJECT_NODE);
246 if (object != NULL)
247 {
248 TCHAR remoteFile[MAX_PATH];
249 pMsg->getFieldAsString(VID_FILE_NAME, remoteFile, MAX_PATH);
250 ObjectArray<ClientSession>* result = g_monitoringList.findClientByFNameAndNodeID(remoteFile, object->getId());
251 DbgPrintf(6, _T("AgentConnectionEx::onFileMonitoringData: found %d sessions for remote file %s on node %s [%d]"), result->size(), remoteFile, object->getName(), object->getId());
252 int validSessionCount = result->size();
253 for(int i = 0; i < result->size(); i++)
254 {
255 if(!result->get(i)->sendMessage(pMsg))
256 {
257 MONITORED_FILE file;
258 _tcsncpy(file.fileName, remoteFile, MAX_PATH);
259 file.nodeID = m_nodeId;
260 file.session = result->get(i);
261 g_monitoringList.removeMonitoringFile(&file);
262 validSessionCount--;
263
264 if (validSessionCount == 0)
265 CancelUnknownFileMonitoring(object, remoteFile);
266 }
267 }
268 if (result->size() == 0)
269 {
270 CancelUnknownFileMonitoring(object, remoteFile);
271 }
272 delete result;
273 }
274 else
275 {
276 g_monitoringList.removeDisconnectedNode(m_nodeId);
277 DbgPrintf(6, _T("AgentConnectionEx::onFileMonitoringData: node object not found"));
278 }
279 }
280
281 /**
282 * Ask modules if they can procress custom message
283 */
284 bool AgentConnectionEx::processCustomMessage(NXCPMessage *msg)
285 {
286 TCHAR buffer[128];
287 DbgPrintf(6, _T("AgentConnectionEx::processCustomMessage: processing message %s ID %d"),
288 NXCPMessageCodeName(msg->getCode(), buffer), msg->getId());
289
290 for(UINT32 i = 0; i < g_dwNumModules; i++)
291 {
292 if (g_pModuleList[i].pfOnAgentMessage != NULL)
293 {
294 if (g_pModuleList[i].pfOnAgentMessage(msg, m_nodeId))
295 return true; // accepted by module
296 }
297 }
298 return false;
299 }
300
301 /**
302 * Create SNMP proxy transport for sending trap response
303 */
304 static SNMP_ProxyTransport *CreateSNMPProxyTransport(AgentConnectionEx *conn, Node *originNode, const InetAddress& originAddr, UINT16 port)
305 {
306 conn->incRefCount();
307 SNMP_ProxyTransport *snmpTransport = new SNMP_ProxyTransport(conn, originAddr, port);
308 if (originNode != NULL)
309 {
310 snmpTransport->setSecurityContext(originNode->getSnmpSecurityContext());
311 }
312 return snmpTransport;
313 }
314
315 /**
316 * Recieve trap sent throught proxy agent
317 */
318 void AgentConnectionEx::onSnmpTrap(NXCPMessage *msg)
319 {
320 Node *proxyNode = NULL;
321 TCHAR ipStringBuffer[4096];
322
323 static BYTE engineId[] = { 0x80, 0x00, 0x00, 0x00, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01, 0x00 };
324 SNMP_Engine localEngine(engineId, 12);
325
326 DbgPrintf(3, _T("AgentConnectionEx::onSnmpTrap(): Received SNMP trap message from agent at %s, node ID %d"),
327 getIpAddr().toString(ipStringBuffer), m_nodeId);
328
329 if (m_nodeId != 0)
330 proxyNode = (Node *)FindObjectById(m_nodeId, OBJECT_NODE);
331 if (proxyNode != NULL)
332 {
333 // Check for duplicate traps - only accept traps with ID
334 // higher than last received
335 bool acceptTrap;
336 UINT32 trapId = msg->getId();
337 if (trapId != 0)
338 {
339 acceptTrap = proxyNode->checkSNMPTrapId(trapId);
340 DbgPrintf(5, _T("AgentConnectionEx::onSnmpTrap(): SNMP trapID is%s valid"), acceptTrap ? _T("") : _T(" not"));
341 }
342 else
343 {
344 acceptTrap = false;
345 DbgPrintf(5, _T("AgentConnectionEx::onSnmpTrap(): SNMP trap ID not provided"));
346 }
347
348 if (acceptTrap)
349 {
350 InetAddress originSenderIP = msg->getFieldAsInetAddress(VID_IP_ADDRESS);
351 UINT32 pduLenght = msg->getFieldAsUInt32(VID_PDU_SIZE);
352 BYTE *pduBytes = (BYTE*)malloc(pduLenght);
353 msg->getFieldAsBinary(VID_PDU, pduBytes, pduLenght);
354 UINT32 zoneId = IsZoningEnabled() ? msg->getFieldAsUInt32(VID_ZONE_ID) : 0;
355 Node *originNode = FindNodeByIP(zoneId, originSenderIP);
356 if ((originNode != NULL) || ConfigReadInt(_T("LogAllSNMPTraps"), FALSE))
357 {
358 SNMP_PDU *pdu = new SNMP_PDU;
359 if (pdu->parse(pduBytes, pduLenght, (originNode != NULL) ? originNode->getSnmpSecurityContext() : NULL, true))
360 {
361 DbgPrintf(6, _T("SNMPTrapReceiver: received PDU of type %d"), pdu->getCommand());
362 if ((pdu->getCommand() == SNMP_TRAP) || (pdu->getCommand() == SNMP_INFORM_REQUEST))
363 {
364 bool isInformRequest = (pdu->getCommand() == SNMP_INFORM_REQUEST);
365 SNMP_ProxyTransport *snmpTransport = isInformRequest ? CreateSNMPProxyTransport(this, originNode, originSenderIP, msg->getFieldAsUInt16(VID_PORT)) : NULL;
366 if ((pdu->getVersion() == SNMP_VERSION_3) && (pdu->getCommand() == SNMP_INFORM_REQUEST))
367 {
368 SNMP_SecurityContext *context = snmpTransport->getSecurityContext();
369 context->setAuthoritativeEngine(localEngine);
370 }
371 ProcessTrap(pdu, originSenderIP, zoneId, msg->getFieldAsUInt16(VID_PORT), snmpTransport, &localEngine, isInformRequest);
372 delete snmpTransport;
373 }
374 else if ((pdu->getVersion() == SNMP_VERSION_3) && (pdu->getCommand() == SNMP_GET_REQUEST) && (pdu->getAuthoritativeEngine().getIdLen() == 0))
375 {
376 // Engine ID discovery
377 DbgPrintf(6, _T("SNMPTrapReceiver: EngineId discovery"));
378
379 SNMP_ProxyTransport *snmpTransport = CreateSNMPProxyTransport(this, originNode, originSenderIP, msg->getFieldAsUInt16(VID_PORT));
380
381 SNMP_PDU *response = new SNMP_PDU(SNMP_REPORT, pdu->getRequestId(), pdu->getVersion());
382 response->setReportable(false);
383 response->setMessageId(pdu->getMessageId());
384 response->setContextEngineId(localEngine.getId(), localEngine.getIdLen());
385
386 SNMP_Variable *var = new SNMP_Variable(_T(".1.3.6.1.6.3.15.1.1.4.0"));
387 var->setValueFromString(ASN_INTEGER, _T("2"));
388 response->bindVariable(var);
389
390 SNMP_SecurityContext *context = new SNMP_SecurityContext();
391 localEngine.setTime((int)time(NULL));
392 context->setAuthoritativeEngine(localEngine);
393 context->setSecurityModel(SNMP_SECURITY_MODEL_USM);
394 context->setAuthMethod(SNMP_AUTH_NONE);
395 context->setPrivMethod(SNMP_ENCRYPT_NONE);
396 snmpTransport->setSecurityContext(context);
397
398 snmpTransport->setWaitForResponse(false);
399 snmpTransport->sendMessage(response);
400 delete response;
401 delete snmpTransport;
402 }
403 else if (pdu->getCommand() == SNMP_REPORT)
404 {
405 DbgPrintf(6, _T("AgentConnectionEx::onSnmpTrap(): REPORT PDU with error %s"), (const TCHAR *)pdu->getVariable(0)->getName().toString());
406 }
407 delete pdu;
408 }
409 else if (pdu->getCommand() == SNMP_REPORT)
410 {
411 DbgPrintf(6, _T("AgentConnectionEx::onSnmpTrap(): REPORT PDU with error %s"), (const TCHAR *)pdu->getVariable(0)->getName().toString());
412 }
413 }
414 else
415 {
416 DbgPrintf(3, _T("AgentConnectionEx::onSnmpTrap(): cannot find origin node with IP %s and not accepting traps from unknown sources"), originSenderIP.toString(ipStringBuffer));
417 }
418 }
419 }
420 else
421 {
422 DbgPrintf(3, _T("AgentConnectionEx::onSnmpTrap(): Cannot find node for IP address %s"), getIpAddr().toString(ipStringBuffer));
423 }
424 }
425
426 /**
427 * Deploy policy to agent
428 */
429 UINT32 AgentConnectionEx::deployPolicy(AgentPolicy *policy)
430 {
431 UINT32 rqId, rcc;
432 NXCPMessage msg(getProtocolVersion());
433
434 rqId = generateRequestId();
435 msg.setId(rqId);
436 msg.setCode(CMD_DEPLOY_AGENT_POLICY);
437 if (policy->createDeploymentMessage(&msg))
438 {
439 if (sendMessage(&msg))
440 {
441 rcc = waitForRCC(rqId, getCommandTimeout());
442 }
443 else
444 {
445 rcc = ERR_CONNECTION_BROKEN;
446 }
447 }
448 else
449 {
450 rcc = ERR_INTERNAL_ERROR;
451 }
452 return rcc;
453 }
454
455 /**
456 * Uninstall policy from agent
457 */
458 UINT32 AgentConnectionEx::uninstallPolicy(AgentPolicy *policy)
459 {
460 UINT32 rqId, rcc;
461 NXCPMessage msg(getProtocolVersion());
462
463 rqId = generateRequestId();
464 msg.setId(rqId);
465 msg.setCode(CMD_UNINSTALL_AGENT_POLICY);
466 if (policy->createUninstallMessage(&msg))
467 {
468 if (sendMessage(&msg))
469 {
470 rcc = waitForRCC(rqId, getCommandTimeout());
471 }
472 else
473 {
474 rcc = ERR_CONNECTION_BROKEN;
475 }
476 }
477 else
478 {
479 rcc = ERR_INTERNAL_ERROR;
480 }
481 return rcc;
482 }
483
484 /**
485 * Process collected data information (for DCI with agent-side cache)
486 */
487 UINT32 AgentConnectionEx::processCollectedData(NXCPMessage *msg)
488 {
489 if (g_flags & AF_SHUTDOWN)
490 return ERR_INTERNAL_ERROR;
491
492 if (m_nodeId == 0)
493 {
494 DbgPrintf(5, _T("AgentConnectionEx::processCollectedData: node ID is 0 for agent session"));
495 return ERR_INTERNAL_ERROR;
496 }
497
498 Node *node = (Node *)FindObjectById(m_nodeId, OBJECT_NODE);
499 if (node == NULL)
500 {
501 DbgPrintf(5, _T("AgentConnectionEx::processCollectedData: cannot find node object (node ID = %d)"), m_nodeId);
502 return ERR_INTERNAL_ERROR;
503 }
504
505 int origin = msg->getFieldAsInt16(VID_DCI_SOURCE_TYPE);
506 if ((origin != DS_NATIVE_AGENT) && (origin != DS_SNMP_AGENT))
507 {
508 DbgPrintf(5, _T("AgentConnectionEx::processCollectedData: unsupported data source type %d"), origin);
509 return ERR_INTERNAL_ERROR;
510 }
511
512 DataCollectionTarget *target;
513 uuid targetId = msg->getFieldAsGUID(VID_NODE_ID);
514 if (!targetId.isNull())
515 {
516 NetObj *object = FindObjectByGUID(targetId, -1);
517 if (object == NULL)
518 {
519 TCHAR buffer[64];
520 nxlog_debug(5, _T("AgentConnectionEx::processCollectedData: cannot find target node with GUID %s"), targetId.toString(buffer));
521 return ERR_INTERNAL_ERROR;
522 }
523 if (!object->isDataCollectionTarget())
524 {
525 TCHAR buffer[64];
526 nxlog_debug(5, _T("AgentConnectionEx::processCollectedData: object with GUID %s is not a data collection target"), targetId.toString(buffer));
527 return ERR_INTERNAL_ERROR;
528 }
529 target = (DataCollectionTarget *)object;
530 }
531 else
532 {
533 target = node;
534 }
535
536 UINT32 dciId = msg->getFieldAsUInt32(VID_DCI_ID);
537 DCObject *dcObject = target->getDCObjectById(dciId);
538 if (dcObject == NULL)
539 {
540 nxlog_debug(5, _T("AgentConnectionEx::processCollectedData: cannot find DCI with ID %d on object %s [%d]"),
541 dciId, target->getName(), target->getId());
542 return ERR_INTERNAL_ERROR;
543 }
544
545 int type = msg->getFieldAsInt16(VID_DCOBJECT_TYPE);
546 if ((dcObject->getType() != type) || (dcObject->getDataSource() != origin) || (dcObject->getAgentCacheMode() != AGENT_CACHE_ON))
547 {
548 nxlog_debug(5, _T("AgentConnectionEx::processCollectedData: DCI %s [%d] on object %s [%d] configuration mismatch"),
549 dcObject->getName(), dciId, target->getName(), target->getId());
550 return ERR_INTERNAL_ERROR;
551 }
552
553 time_t t = msg->getFieldAsTime(VID_TIMESTAMP);
554 UINT32 status = msg->getFieldAsUInt32(VID_STATUS);
555 bool success = true;
556
557 void *value;
558 switch(type)
559 {
560 case DCO_TYPE_ITEM:
561 value = msg->getFieldAsString(VID_VALUE);
562 break;
563 case DCO_TYPE_LIST:
564 value = new StringList();
565 break;
566 case DCO_TYPE_TABLE:
567 value = new Table(msg);
568 break;
569 default:
570 nxlog_debug(5, _T("AgentConnectionEx::processCollectedData: invalid type %d of DCI %s [%d] on object %s [%d]"),
571 type, dcObject->getName(), dciId, target->getName(), target->getId());
572 return ERR_INTERNAL_ERROR;
573 }
574 nxlog_debug(7, _T("AgentConnectionEx::processCollectedData: processing DCI %s [%d] (type=%d) (status=%d) on object %s [%d]"),
575 dcObject->getName(), dciId, type, status, target->getName(), target->getId());
576
577 switch(status)
578 {
579 case ERR_SUCCESS:
580 {
581 if (dcObject->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
582 dcObject->setStatus(ITEM_STATUS_ACTIVE, true);
583 success = target->processNewDCValue(dcObject, t, value);
584 if (t > dcObject->getLastPollTime())
585 dcObject->setLastPollTime(t);
586 break;
587 }
588 case ERR_UNKNOWN_PARAMETER:
589 if (dcObject->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
590 dcObject->setStatus(ITEM_STATUS_ACTIVE, true);
591 dcObject->processNewError(false, t);
592 break;
593 case ERR_NO_SUCH_INSTANCE:
594 dcObject->processNewError(true, t);
595 break;
596 case ERR_INTERNAL_ERROR:
597 dcObject->processNewError(true, t);
598 break;
599 }
600
601 switch(type)
602 {
603 case DCO_TYPE_ITEM:
604 free(value);
605 break;
606 case DCO_TYPE_LIST:
607 delete (StringList *)value;
608 break;
609 }
610
611 return success ? ERR_SUCCESS : ERR_INTERNAL_ERROR;
612 }
613
614 /**
615 * Process collected data information in bulk mode (for DCI with agent-side cache)
616 */
617 UINT32 AgentConnectionEx::processBulkCollectedData(NXCPMessage *request, NXCPMessage *response)
618 {
619 if (g_flags & AF_SHUTDOWN)
620 return ERR_INTERNAL_ERROR;
621
622 if (m_nodeId == 0)
623 {
624 DbgPrintf(5, _T("AgentConnectionEx::processBulkCollectedData: node ID is 0 for agent session"));
625 return ERR_INTERNAL_ERROR;
626 }
627
628 Node *node = (Node *)FindObjectById(m_nodeId, OBJECT_NODE);
629 if (node == NULL)
630 {
631 DbgPrintf(5, _T("AgentConnectionEx::processBulkCollectedData: cannot find node object (node ID = %d)"), m_nodeId);
632 return ERR_INTERNAL_ERROR;
633 }
634
635 int count = request->getFieldAsInt16(VID_NUM_ELEMENTS);
636 if (count > MAX_BULK_DATA_BLOCK_SIZE)
637 count = MAX_BULK_DATA_BLOCK_SIZE;
638 DbgPrintf(5, _T("AgentConnectionEx::processBulkCollectedData: %d elements from node %s [%d]"), count, node->getName(), node->getId());
639
640 BYTE status[MAX_BULK_DATA_BLOCK_SIZE];
641 memset(status, 0, MAX_BULK_DATA_BLOCK_SIZE);
642 UINT32 fieldId = VID_ELEMENT_LIST_BASE;
643 for(int i = 0; i < count; i++, fieldId += 10)
644 {
645 int origin = request->getFieldAsInt16(fieldId + 1);
646 if ((origin != DS_NATIVE_AGENT) && (origin != DS_SNMP_AGENT))
647 {
648 nxlog_debug(5, _T("AgentConnectionEx::processBulkCollectedData: unsupported data source type %d (element %d)"), origin, i);
649 status[i] = BULK_DATA_REC_FAILURE;
650 continue;
651 }
652
653 DataCollectionTarget *target;
654 uuid targetId = request->getFieldAsGUID(fieldId + 3);
655 if (!targetId.isNull())
656 {
657 NetObj *object = FindObjectByGUID(targetId, -1);
658 if (object == NULL)
659 {
660 TCHAR buffer[64];
661 nxlog_debug(5, _T("AgentConnectionEx::processBulkCollectedData: cannot find target object with GUID %s (element %d)"),
662 targetId.toString(buffer), i);
663 status[i] = BULK_DATA_REC_FAILURE;
664 continue;
665 }
666 if (!object->isDataCollectionTarget())
667 {
668 TCHAR buffer[64];
669 nxlog_debug(5, _T("AgentConnectionEx::processBulkCollectedData: object with GUID %s (element %d) is not a data collection target"),
670 targetId.toString(buffer), i);
671 status[i] = BULK_DATA_REC_FAILURE;
672 continue;
673 }
674 target = (DataCollectionTarget *)object;
675 }
676 else
677 {
678 target = node;
679 }
680
681 UINT32 dciId = request->getFieldAsUInt32(fieldId);
682 DCObject *dcObject = target->getDCObjectById(dciId);
683 if (dcObject == NULL)
684 {
685 nxlog_debug(5, _T("AgentConnectionEx::processBulkCollectedData: cannot find DCI with ID %d on object %s [%d] (element %d)"),
686 dciId, target->getName(), target->getId(), i);
687 status[i] = BULK_DATA_REC_FAILURE;
688 continue;
689 }
690
691 int type = request->getFieldAsInt16(fieldId + 2);
692 if ((type != DCO_TYPE_ITEM) || (dcObject->getType() != type) || (dcObject->getDataSource() != origin) || (dcObject->getAgentCacheMode() != AGENT_CACHE_ON))
693 {
694 nxlog_debug(5, _T("AgentConnectionEx::processBulkCollectedData: DCI %s [%d] on object %s [%d] configuration mismatch (element %d)"),
695 dcObject->getName(), dciId, target->getName(), target->getId(), i);
696 status[i] = BULK_DATA_REC_FAILURE;
697 continue;
698 }
699
700 void *value = request->getFieldAsString(fieldId + 5);
701 UINT32 status_code = request->getFieldAsUInt32(fieldId + 6);
702 nxlog_debug(7, _T("AgentConnectionEx::processBulkCollectedData: processing DCI %s [%d] (type=%d) (status=%d) on object %s [%d] (element %d)"),
703 dcObject->getName(), dciId, type, status, target->getName(), target->getId(), i);
704 time_t t = request->getFieldAsTime(fieldId + 4);
705 bool success = true;
706
707 switch(status_code)
708 {
709 case ERR_SUCCESS:
710 {
711 if (dcObject->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
712 dcObject->setStatus(ITEM_STATUS_ACTIVE, true);
713 success = target->processNewDCValue(dcObject, t, value);
714 if (t > dcObject->getLastPollTime())
715 dcObject->setLastPollTime(t);
716 break;
717 }
718 case ERR_UNKNOWN_PARAMETER:
719 if (dcObject->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
720 dcObject->setStatus(ITEM_STATUS_ACTIVE, true);
721 dcObject->processNewError(false, t);
722 break;
723 case ERR_NO_SUCH_INSTANCE:
724 dcObject->processNewError(true, t);
725 break;
726 case ERR_INTERNAL_ERROR:
727 dcObject->processNewError(true, t);
728 break;
729 }
730
731
732 status[i] = success ? BULK_DATA_REC_SUCCESS : BULK_DATA_REC_FAILURE;
733 free(value);
734 }
735
736 response->setField(VID_STATUS, status, count);
737 return ERR_SUCCESS;
738 }