agent data reconciliation block size and timeout can be configured
[public/netxms.git] / src / server / core / agent.cpp
CommitLineData
9fa031cd 1/*
5039dede 2** NetXMS - Network Management System
2df047f4 3** Copyright (C) 2003-2016 Victor Kirhenshtein
5039dede
AK
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18**
19** File: agent.cpp
20**
21**/
22
23#include "nxcore.h"
24
489b117b 25/**
26 * Externals
27 */
c75e9ee4 28void ProcessTrap(SNMP_PDU *pdu, const InetAddress& srcAddr, int srcPort, SNMP_Transport *pTransport, SNMP_Engine *localEngine, bool isInformRq);
489b117b 29
d1c1c522
VK
30/**
31 * Destructor for extended agent connection class
32 */
5039dede
AK
33AgentConnectionEx::~AgentConnectionEx()
34{
35}
36
d1c1c522
VK
37/**
38 * Trap processor
39 */
b368969c 40void AgentConnectionEx::onTrap(NXCPMessage *pMsg)
5039dede 41{
967893bb 42 UINT32 dwEventCode;
5039dede 43 int i, iNumArgs;
d1c1c522 44 Node *pNode = NULL;
5039dede 45 TCHAR *pszArgList[32], szBuffer[32];
35f836fe 46 char szFormat[] = "ssssssssssssssssssssssssssssssss";
5039dede 47
c11eee9b 48 DbgPrintf(3, _T("AgentConnectionEx::onTrap(): Received trap message from agent at %s, node ID %d"), getIpAddr().toString(szBuffer), m_nodeId);
d1c1c522
VK
49 if (m_nodeId != 0)
50 pNode = (Node *)FindObjectById(m_nodeId, OBJECT_NODE);
51 if (pNode == NULL)
c11eee9b 52 pNode = FindNodeByIP(0, getIpAddr().getAddressV4());
5039dede
AK
53 if (pNode != NULL)
54 {
5c5c7111
VK
55 if (pNode->Status() != STATUS_UNMANAGED)
56 {
57 // Check for duplicate traps - only accept traps with ID
58 // higher than last received
59 // agents prior to 1.1.6 will not send trap id
60 // we should accept trap in that case to maintain compatibility
61 bool acceptTrap;
b368969c 62 QWORD trapId = pMsg->getFieldAsUInt64(VID_TRAP_ID);
5c5c7111
VK
63 if (trapId != 0)
64 {
65 acceptTrap = pNode->checkAgentTrapId(trapId);
66 DbgPrintf(5, _T("AgentConnectionEx::onTrap(): trapID is%s valid"), acceptTrap ? _T("") : _T(" not"));
67 }
68 else
69 {
70 acceptTrap = true;
71 DbgPrintf(5, _T("AgentConnectionEx::onTrap(): trap ID not provided"));
72 }
fed33789 73
5c5c7111
VK
74 if (acceptTrap)
75 {
b368969c 76 dwEventCode = pMsg->getFieldAsUInt32(VID_EVENT_CODE);
5c5c7111
VK
77 if ((dwEventCode == 0) && pMsg->isFieldExist(VID_EVENT_NAME))
78 {
79 TCHAR eventName[256];
b368969c 80 pMsg->getFieldAsString(VID_EVENT_NAME, eventName, 256);
5c5c7111
VK
81 dwEventCode = EventCodeFromName(eventName, 0);
82 }
b368969c 83 iNumArgs = (int)pMsg->getFieldAsUInt16(VID_NUM_ARGS);
5c5c7111
VK
84 if (iNumArgs > 32)
85 iNumArgs = 32;
86 for(i = 0; i < iNumArgs; i++)
b368969c 87 pszArgList[i] = pMsg->getFieldAsString(VID_EVENT_ARG_BASE + i);
5c5c7111 88 DbgPrintf(3, _T("Event from trap: %d"), dwEventCode);
5039dede 89
5c5c7111 90 szFormat[iNumArgs] = 0;
c42b4551 91 PostEvent(dwEventCode, pNode->getId(), (iNumArgs > 0) ? szFormat : NULL,
5c5c7111
VK
92 pszArgList[0], pszArgList[1], pszArgList[2], pszArgList[3],
93 pszArgList[4], pszArgList[5], pszArgList[6], pszArgList[7],
94 pszArgList[8], pszArgList[9], pszArgList[10], pszArgList[11],
95 pszArgList[12], pszArgList[13], pszArgList[14], pszArgList[15],
96 pszArgList[16], pszArgList[17], pszArgList[18], pszArgList[19],
97 pszArgList[20], pszArgList[21], pszArgList[22], pszArgList[23],
98 pszArgList[24], pszArgList[25], pszArgList[26], pszArgList[27],
99 pszArgList[28], pszArgList[29], pszArgList[30], pszArgList[31]);
9fa031cd 100
5c5c7111
VK
101 // Cleanup
102 for(i = 0; i < iNumArgs; i++)
103 free(pszArgList[i]);
104 }
105 }
106 else
107 {
c42b4551 108 DbgPrintf(3, _T("AgentConnectionEx::onTrap(): node %s [%d] in in UNMANAGED state - trap ignored"), pNode->getName(), pNode->getId());
5c5c7111 109 }
5039dede
AK
110 }
111 else
112 {
c11eee9b 113 DbgPrintf(3, _T("AgentConnectionEx::onTrap(): Cannot find node for IP address %s"), getIpAddr().toString(szBuffer));
5039dede
AK
114 }
115}
45d84f8a 116
d1c1c522
VK
117/**
118 * Handler for data push
119 */
b368969c 120void AgentConnectionEx::onDataPush(NXCPMessage *msg)
f480bdd4 121{
1693f955
VK
122 if (g_flags & AF_SHUTDOWN)
123 return;
f480bdd4 124
1693f955 125 TCHAR name[MAX_PARAM_NAME], value[MAX_RESULT_LENGTH];
b368969c
VK
126 msg->getFieldAsString(VID_NAME, name, MAX_PARAM_NAME);
127 msg->getFieldAsString(VID_VALUE, value, MAX_RESULT_LENGTH);
f480bdd4 128
e3ee1478 129 Node *sender = NULL;
42a3be4f
VK
130 if (m_nodeId != 0)
131 sender = (Node *)FindObjectById(m_nodeId, OBJECT_NODE);
132 if (sender == NULL)
6fbaa926 133 sender = FindNodeByIP(0, getIpAddr());
42a3be4f 134
4e46505f 135 if (sender != NULL)
f480bdd4 136 {
42a3be4f
VK
137 // Check for duplicate data requests - only accept requests with ID
138 // higher than last received
139 // agents prior to 1.2.10 will not send request id
140 // we should accept data in that case to maintain compatibility
141 bool acceptRequest;
b368969c 142 QWORD requestId = msg->getFieldAsUInt64(VID_REQUEST_ID);
42a3be4f
VK
143 if (requestId != 0)
144 {
145 acceptRequest = sender->checkAgentPushRequestId(requestId);
146 DbgPrintf(5, _T("AgentConnectionEx::onDataPush(): requestId is%s valid"), acceptRequest ? _T("") : _T(" not"));
147 }
148 else
149 {
150 acceptRequest = true;
151 DbgPrintf(5, _T("AgentConnectionEx::onDataPush(): request ID not provided"));
152 }
153
154 if (acceptRequest)
155 {
156 Node *target;
b368969c 157 UINT32 objectId = msg->getFieldAsUInt32(VID_OBJECT_ID);
42a3be4f 158 if (objectId != 0)
4e46505f 159 {
42a3be4f
VK
160 // push on behalf of other node
161 target = (Node *)FindObjectById(objectId, OBJECT_NODE);
162 if (target != NULL)
4e46505f 163 {
c42b4551 164 if (target->isTrustedNode(sender->getId()))
42a3be4f 165 {
c42b4551 166 DbgPrintf(5, _T("%s: agent data push: target set to %s [%d]"), sender->getName(), target->getName(), target->getId());
42a3be4f
VK
167 }
168 else
169 {
c42b4551 170 DbgPrintf(5, _T("%s: agent data push: not in trusted node list for target %s [%d]"), sender->getName(), target->getName(), target->getId());
42a3be4f
VK
171 target = NULL;
172 }
4e46505f
VK
173 }
174 }
42a3be4f
VK
175 else
176 {
177 target = sender;
178 }
4e46505f 179
42a3be4f
VK
180 if (target != NULL)
181 {
c42b4551 182 DbgPrintf(5, _T("%s: agent data push: %s=%s"), target->getName(), name, value);
42a3be4f
VK
183 DCObject *dci = target->getDCObjectByName(name);
184 if ((dci != NULL) && (dci->getType() == DCO_TYPE_ITEM) && (dci->getDataSource() == DS_PUSH_AGENT) && (dci->getStatus() == ITEM_STATUS_ACTIVE))
185 {
c42b4551 186 DbgPrintf(5, _T("%s: agent data push: found DCI %d"), target->getName(), dci->getId());
b60584cf
VK
187 time_t t = msg->getFieldAsTime(VID_TIMESTAMP);
188 if (t == 0)
189 t = time(NULL);
42a3be4f 190 target->processNewDCValue(dci, t, value);
b60584cf
VK
191 if (t > dci->getLastPollTime())
192 dci->setLastPollTime(t);
42a3be4f
VK
193 }
194 else
195 {
c42b4551 196 DbgPrintf(5, _T("%s: agent data push: DCI not found for %s"), target->getName(), name);
42a3be4f
VK
197 }
198 }
199 else
200 {
c42b4551 201 DbgPrintf(5, _T("%s: agent data push: target node not found or not accessible"), sender->getName());
42a3be4f 202 }
4e46505f 203 }
f480bdd4
VK
204 }
205}
206
af21affe
VK
207/**
208 * Print message.
209 */
210void AgentConnectionEx::printMsg(const TCHAR *format, ...)
211{
212 va_list args;
af21affe 213 va_start(args, format);
2df047f4 214 nxlog_debug2(6, format, args);
af21affe
VK
215 va_end(args);
216}
217
1693f955
VK
218/**
219 * Cancel unknown file monitoring
220 */
221static void CancelUnknownFileMonitoring(Node *object,TCHAR *remoteFile)
a07511b5 222{
223 DbgPrintf(6, _T("AgentConnectionEx::onFileMonitoringData: unknown subscription will be canceled."));
224 AgentConnection *conn = object->createAgentConnection();
225 if(conn != NULL)
226 {
227 NXCPMessage request;
228 request.setId(conn->generateRequestId());
229 request.setCode(CMD_CANCEL_FILE_MONITORING);
230 request.setField(VID_FILE_NAME, remoteFile);
231 request.setField(VID_OBJECT_ID, object->getId());
232 NXCPMessage* response = conn->customRequest(&request);
233 delete response;
1693f955 234 conn->decRefCount();
a07511b5 235 }
a07511b5 236}
237
76b4edb5 238/**
9fa031cd 239 * Recieve file monitoring information and resend to all required user sessions
240 */
b368969c 241void AgentConnectionEx::onFileMonitoringData(NXCPMessage *pMsg)
9fa031cd 242{
489b117b 243 Node *object = NULL;
9fa031cd 244 if (m_nodeId != 0)
245 object = (Node *)FindObjectById(m_nodeId, OBJECT_NODE);
246 if (object != NULL)
247 {
18b237f7 248 TCHAR remoteFile[MAX_PATH];
b368969c 249 pMsg->getFieldAsString(VID_FILE_NAME, remoteFile, MAX_PATH);
c42b4551
VK
250 ObjectArray<ClientSession>* result = g_monitoringList.findClientByFNameAndNodeID(remoteFile, object->getId());
251 DbgPrintf(6, _T("AgentConnectionEx::onFileMonitoringData: found %d sessions for remote file %s on node %s [%d]"), result->size(), remoteFile, object->getName(), object->getId());
a07511b5 252 int validSessionCount = result->size();
9fa031cd 253 for(int i = 0; i < result->size(); i++)
254 {
a07511b5 255 if(!result->get(i)->sendMessage(pMsg))
256 {
257 MONITORED_FILE file;
258 _tcsncpy(file.fileName, remoteFile, MAX_PATH);
259 file.nodeID = m_nodeId;
260 file.session = result->get(i);
261 g_monitoringList.removeMonitoringFile(&file);
262 validSessionCount--;
263
1693f955
VK
264 if (validSessionCount == 0)
265 CancelUnknownFileMonitoring(object, remoteFile);
a07511b5 266 }
05e3ba32 267 }
1693f955 268 if (result->size() == 0)
05e3ba32 269 {
1693f955 270 CancelUnknownFileMonitoring(object, remoteFile);
9fa031cd 271 }
ece85e30 272 delete result;
9fa031cd 273 }
274 else
275 {
f9f79a93 276 g_monitoringList.removeDisconnectedNode(m_nodeId);
18b237f7 277 DbgPrintf(6, _T("AgentConnectionEx::onFileMonitoringData: node object not found"));
9fa031cd 278 }
279}
280
5bbb1485 281/**
282 * Ask modules if they can procress custom message
283 */
284bool AgentConnectionEx::processCustomMessage(NXCPMessage *msg)
285{
ac1a1de4 286 TCHAR buffer[128];
d7af297b 287 DbgPrintf(6, _T("AgentConnectionEx::processCustomMessage: processing message %s ID %d"),
ac1a1de4 288 NXCPMessageCodeName(msg->getCode(), buffer), msg->getId());
5bbb1485 289
ac1a1de4
VK
290 for(UINT32 i = 0; i < g_dwNumModules; i++)
291 {
292 if (g_pModuleList[i].pfOnAgentMessage != NULL)
293 {
294 if (g_pModuleList[i].pfOnAgentMessage(msg, m_nodeId))
295 return true; // accepted by module
296 }
297 }
5bbb1485 298 return false;
299}
300
cce2f2ef
VK
301/**
302 * Create SNMP proxy transport for sending trap response
303 */
304static SNMP_ProxyTransport *CreateSNMPProxyTransport(AgentConnectionEx *conn, Node *originNode, const InetAddress& originAddr, UINT16 port)
305{
306 conn->incRefCount();
307 SNMP_ProxyTransport *snmpTransport = new SNMP_ProxyTransport(conn, originAddr, port);
308 if (originNode != NULL)
309 {
310 snmpTransport->setSecurityContext(originNode->getSnmpSecurityContext());
311 }
312 return snmpTransport;
313}
314
489b117b 315/**
316 * Recieve trap sent throught proxy agent
317 */
b368969c 318void AgentConnectionEx::onSnmpTrap(NXCPMessage *msg)
489b117b 319{
320 Node *proxyNode = NULL;
321 TCHAR ipStringBuffer[4096];
322
323 static BYTE engineId[] = { 0x80, 0x00, 0x00, 0x00, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01, 0x00 };
324 SNMP_Engine localEngine(engineId, 12);
325
d7af297b 326 DbgPrintf(3, _T("AgentConnectionEx::onSnmpTrap(): Received SNMP trap message from agent at %s, node ID %d"),
ac1a1de4
VK
327 getIpAddr().toString(ipStringBuffer), m_nodeId);
328
489b117b 329 if (m_nodeId != 0)
330 proxyNode = (Node *)FindObjectById(m_nodeId, OBJECT_NODE);
331 if (proxyNode != NULL)
332 {
333 // Check for duplicate traps - only accept traps with ID
334 // higher than last received
335 bool acceptTrap;
b368969c 336 UINT32 trapId = msg->getId();
489b117b 337 if (trapId != 0)
338 {
339 acceptTrap = proxyNode->checkSNMPTrapId(trapId);
340 DbgPrintf(5, _T("AgentConnectionEx::onSnmpTrap(): SNMP trapID is%s valid"), acceptTrap ? _T("") : _T(" not"));
341 }
342 else
343 {
344 acceptTrap = false;
345 DbgPrintf(5, _T("AgentConnectionEx::onSnmpTrap(): SNMP trap ID not provided"));
346 }
347
348 if (acceptTrap)
349 {
c75e9ee4 350 InetAddress originSenderIP = msg->getFieldAsInetAddress(VID_IP_ADDRESS);
b368969c 351 UINT32 pduLenght = msg->getFieldAsUInt32(VID_PDU_SIZE);
489b117b 352 BYTE *pduBytes = (BYTE*)malloc(pduLenght);
b368969c 353 msg->getFieldAsBinary(VID_PDU, pduBytes, pduLenght);
cce2f2ef
VK
354 Node *originNode = FindNodeByIP(0, originSenderIP);
355 if ((originNode != NULL) || ConfigReadInt(_T("LogAllSNMPTraps"), FALSE))
489b117b 356 {
489b117b 357 SNMP_PDU *pdu = new SNMP_PDU;
aef85265 358 if (pdu->parse(pduBytes, pduLenght, (originNode != NULL) ? originNode->getSnmpSecurityContext() : NULL, true))
489b117b 359 {
360 DbgPrintf(6, _T("SNMPTrapReceiver: received PDU of type %d"), pdu->getCommand());
361 if ((pdu->getCommand() == SNMP_TRAP) || (pdu->getCommand() == SNMP_INFORM_REQUEST))
362 {
cce2f2ef
VK
363 bool isInformRequest = (pdu->getCommand() == SNMP_INFORM_REQUEST);
364 SNMP_ProxyTransport *snmpTransport = isInformRequest ? CreateSNMPProxyTransport(this, originNode, originSenderIP, msg->getFieldAsUInt16(VID_PORT)) : NULL;
489b117b 365 if ((pdu->getVersion() == SNMP_VERSION_3) && (pdu->getCommand() == SNMP_INFORM_REQUEST))
366 {
cce2f2ef 367 SNMP_SecurityContext *context = snmpTransport->getSecurityContext();
489b117b 368 context->setAuthoritativeEngine(localEngine);
369 }
cce2f2ef
VK
370 ProcessTrap(pdu, originSenderIP, msg->getFieldAsUInt16(VID_PORT), snmpTransport, &localEngine, isInformRequest);
371 delete snmpTransport;
489b117b 372 }
373 else if ((pdu->getVersion() == SNMP_VERSION_3) && (pdu->getCommand() == SNMP_GET_REQUEST) && (pdu->getAuthoritativeEngine().getIdLen() == 0))
374 {
375 // Engine ID discovery
376 DbgPrintf(6, _T("SNMPTrapReceiver: EngineId discovery"));
377
cce2f2ef
VK
378 SNMP_ProxyTransport *snmpTransport = CreateSNMPProxyTransport(this, originNode, originSenderIP, msg->getFieldAsUInt16(VID_PORT));
379
489b117b 380 SNMP_PDU *response = new SNMP_PDU(SNMP_REPORT, pdu->getRequestId(), pdu->getVersion());
381 response->setReportable(false);
382 response->setMessageId(pdu->getMessageId());
383 response->setContextEngineId(localEngine.getId(), localEngine.getIdLen());
384
385 SNMP_Variable *var = new SNMP_Variable(_T(".1.3.6.1.6.3.15.1.1.4.0"));
386 var->setValueFromString(ASN_INTEGER, _T("2"));
387 response->bindVariable(var);
388
389 SNMP_SecurityContext *context = new SNMP_SecurityContext();
390 localEngine.setTime((int)time(NULL));
391 context->setAuthoritativeEngine(localEngine);
392 context->setSecurityModel(SNMP_SECURITY_MODEL_USM);
393 context->setAuthMethod(SNMP_AUTH_NONE);
394 context->setPrivMethod(SNMP_ENCRYPT_NONE);
cce2f2ef 395 snmpTransport->setSecurityContext(context);
489b117b 396
cce2f2ef
VK
397 snmpTransport->setWaitForResponse(false);
398 snmpTransport->sendMessage(response);
489b117b 399 delete response;
cce2f2ef 400 delete snmpTransport;
489b117b 401 }
402 else if (pdu->getCommand() == SNMP_REPORT)
403 {
9ceab287 404 DbgPrintf(6, _T("AgentConnectionEx::onSnmpTrap(): REPORT PDU with error %s"), (const TCHAR *)pdu->getVariable(0)->getName().toString());
489b117b 405 }
406 delete pdu;
407 }
408 else if (pdu->getCommand() == SNMP_REPORT)
409 {
9ceab287 410 DbgPrintf(6, _T("AgentConnectionEx::onSnmpTrap(): REPORT PDU with error %s"), (const TCHAR *)pdu->getVariable(0)->getName().toString());
489b117b 411 }
412 }
413 else
414 {
c75e9ee4 415 DbgPrintf(3, _T("AgentConnectionEx::onSnmpTrap(): cannot find origin node with IP %s and not accepting traps from unknown sources"), originSenderIP.toString(ipStringBuffer));
489b117b 416 }
417 }
418 }
419 else
420 {
c11eee9b 421 DbgPrintf(3, _T("AgentConnectionEx::onSnmpTrap(): Cannot find node for IP address %s"), getIpAddr().toString(ipStringBuffer));
489b117b 422 }
423}
424
d1c1c522
VK
425/**
426 * Deploy policy to agent
427 */
967893bb 428UINT32 AgentConnectionEx::deployPolicy(AgentPolicy *policy)
45d84f8a 429{
967893bb 430 UINT32 rqId, rcc;
b368969c 431 NXCPMessage msg(getProtocolVersion());
45d84f8a
VK
432
433 rqId = generateRequestId();
b368969c
VK
434 msg.setId(rqId);
435 msg.setCode(CMD_DEPLOY_AGENT_POLICY);
8051849b
VK
436 if (policy->createDeploymentMessage(&msg))
437 {
7c521895 438 if (sendMessage(&msg))
8051849b 439 {
7c521895 440 rcc = waitForRCC(rqId, getCommandTimeout());
8051849b
VK
441 }
442 else
443 {
444 rcc = ERR_CONNECTION_BROKEN;
445 }
446 }
447 else
448 {
449 rcc = ERR_INTERNAL_ERROR;
450 }
45d84f8a
VK
451 return rcc;
452}
93599cfd 453
d1c1c522
VK
454/**
455 * Uninstall policy from agent
456 */
967893bb 457UINT32 AgentConnectionEx::uninstallPolicy(AgentPolicy *policy)
93599cfd 458{
967893bb 459 UINT32 rqId, rcc;
b368969c 460 NXCPMessage msg(getProtocolVersion());
93599cfd
VK
461
462 rqId = generateRequestId();
b368969c
VK
463 msg.setId(rqId);
464 msg.setCode(CMD_UNINSTALL_AGENT_POLICY);
93599cfd
VK
465 if (policy->createUninstallMessage(&msg))
466 {
7c521895 467 if (sendMessage(&msg))
93599cfd 468 {
7c521895 469 rcc = waitForRCC(rqId, getCommandTimeout());
93599cfd
VK
470 }
471 else
472 {
473 rcc = ERR_CONNECTION_BROKEN;
474 }
475 }
476 else
477 {
478 rcc = ERR_INTERNAL_ERROR;
479 }
480 return rcc;
481}
6fbaa926
VK
482
483/**
484 * Process collected data information (for DCI with agent-side cache)
485 */
02d936bd 486UINT32 AgentConnectionEx::processCollectedData(NXCPMessage *msg)
6fbaa926 487{
1693f955
VK
488 if (g_flags & AF_SHUTDOWN)
489 return ERR_INTERNAL_ERROR;
490
02d936bd
VK
491 if (m_nodeId == 0)
492 {
493 DbgPrintf(5, _T("AgentConnectionEx::processCollectedData: node ID is 0 for agent session"));
494 return ERR_INTERNAL_ERROR;
495 }
496
497 Node *node = (Node *)FindObjectById(m_nodeId, OBJECT_NODE);
498 if (node == NULL)
499 {
500 DbgPrintf(5, _T("AgentConnectionEx::processCollectedData: cannot find node object (node ID = %d)"), m_nodeId);
501 return ERR_INTERNAL_ERROR;
502 }
503
504 int origin = msg->getFieldAsInt16(VID_DCI_SOURCE_TYPE);
505 if ((origin != DS_NATIVE_AGENT) && (origin != DS_SNMP_AGENT))
506 {
507 DbgPrintf(5, _T("AgentConnectionEx::processCollectedData: unsupported data source type %d"), origin);
508 return ERR_INTERNAL_ERROR;
509 }
510
86ef6701
VK
511 uuid nodeId = msg->getFieldAsGUID(VID_NODE_ID);
512 if (!nodeId.isNull())
02d936bd 513 {
86ef6701
VK
514 Node *targetNode = (Node *)FindObjectByGUID(nodeId, OBJECT_NODE);
515 if (targetNode == NULL)
02d936bd
VK
516 {
517 TCHAR buffer[64];
86ef6701 518 DbgPrintf(5, _T("AgentConnectionEx::processCollectedData: cannot find target node with GUID %s"), nodeId.toString(buffer));
02d936bd
VK
519 return ERR_INTERNAL_ERROR;
520 }
86ef6701 521 node = targetNode;
02d936bd
VK
522 }
523
524 UINT32 dciId = msg->getFieldAsUInt32(VID_DCI_ID);
525 DCObject *dcObject = node->getDCObjectById(dciId);
526 if (dcObject == NULL)
527 {
528 DbgPrintf(5, _T("AgentConnectionEx::processCollectedData: cannot find DCI with ID %d on node %s [%d]"), dciId, node->getName(), node->getId());
529 return ERR_INTERNAL_ERROR;
530 }
531
532 int type = msg->getFieldAsInt16(VID_DCOBJECT_TYPE);
533 if ((dcObject->getType() != type) || (dcObject->getDataSource() != origin) || (dcObject->getAgentCacheMode() != AGENT_CACHE_ON))
534 {
535 DbgPrintf(5, _T("AgentConnectionEx::processCollectedData: DCI %s [%d] on node %s [%d] configuration mismatch"), dcObject->getName(), dciId, node->getName(), node->getId());
536 return ERR_INTERNAL_ERROR;
537 }
538
df94243f 539 time_t t = msg->getFieldAsTime(VID_TIMESTAMP);
540 UINT32 status = msg->getFieldAsUInt32(VID_STATUS);
541 bool success = true;
542
02d936bd
VK
543 void *value;
544 switch(type)
545 {
546 case DCO_TYPE_ITEM:
547 value = msg->getFieldAsString(VID_VALUE);
548 break;
549 case DCO_TYPE_LIST:
550 value = new StringList();
551 break;
552 case DCO_TYPE_TABLE:
553 value = new Table(msg);
554 break;
555 default:
556 DbgPrintf(5, _T("AgentConnectionEx::processCollectedData: invalid type %d of DCI %s [%d] on node %s [%d]"), type, dcObject->getName(), dciId, node->getName(), node->getId());
557 return ERR_INTERNAL_ERROR;
558 }
df94243f 559 DbgPrintf(7, _T("AgentConnectionEx::processCollectedData: processing DCI %s [%d] (type=%d) (status=%d) on node %s [%d]"), dcObject->getName(), dciId, type, status, node->getName(), node->getId());
02d936bd 560
df94243f 561 switch(status)
562 {
563 case ERR_SUCCESS:
564 {
565 if (dcObject->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
566 dcObject->setStatus(ITEM_STATUS_ACTIVE, true);
567 success = node->processNewDCValue(dcObject, t, value);
568 if (t > dcObject->getLastPollTime())
569 dcObject->setLastPollTime(t);
570 break;
571 }
572 case ERR_UNKNOWN_PARAMETER:
573 if (dcObject->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
574 dcObject->setStatus(ITEM_STATUS_ACTIVE, true);
575 dcObject->processNewError(false, t);
576 break;
577 case ERR_NO_SUCH_INSTANCE:
578 dcObject->processNewError(true, t);
579 break;
580 case ERR_INTERNAL_ERROR:
581 dcObject->processNewError(true, t);
582 break;
583 }
02d936bd
VK
584
585 switch(type)
586 {
587 case DCO_TYPE_ITEM:
588 free(value);
589 break;
590 case DCO_TYPE_LIST:
591 delete (StringList *)value;
592 break;
02d936bd 593 }
366440e8 594
02d936bd 595 return success ? ERR_SUCCESS : ERR_INTERNAL_ERROR;
6fbaa926 596}
a1273b42
VK
597
598/**
599 * Process collected data information in bulk mode (for DCI with agent-side cache)
600 */
601UINT32 AgentConnectionEx::processBulkCollectedData(NXCPMessage *request, NXCPMessage *response)
602{
603 if (g_flags & AF_SHUTDOWN)
604 return ERR_INTERNAL_ERROR;
605
606 if (m_nodeId == 0)
607 {
608 DbgPrintf(5, _T("AgentConnectionEx::processBulkCollectedData: node ID is 0 for agent session"));
609 return ERR_INTERNAL_ERROR;
610 }
611
612 Node *node = (Node *)FindObjectById(m_nodeId, OBJECT_NODE);
613 if (node == NULL)
614 {
615 DbgPrintf(5, _T("AgentConnectionEx::processBulkCollectedData: cannot find node object (node ID = %d)"), m_nodeId);
616 return ERR_INTERNAL_ERROR;
617 }
618
619 int count = request->getFieldAsInt16(VID_NUM_ELEMENTS);
9cf1b53c
VK
620 if (count > MAX_BULK_DATA_BLOCK_SIZE)
621 count = MAX_BULK_DATA_BLOCK_SIZE;
a1273b42
VK
622 DbgPrintf(5, _T("AgentConnectionEx::processBulkCollectedData: %d elements from node %s [%d]"), count, node->getName(), node->getId());
623
9cf1b53c
VK
624 BYTE status[MAX_BULK_DATA_BLOCK_SIZE];
625 memset(status, 0, MAX_BULK_DATA_BLOCK_SIZE);
a1273b42
VK
626 UINT32 fieldId = VID_ELEMENT_LIST_BASE;
627 for(int i = 0; i < count; i++, fieldId += 10)
628 {
629 int origin = request->getFieldAsInt16(fieldId + 1);
630 if ((origin != DS_NATIVE_AGENT) && (origin != DS_SNMP_AGENT))
631 {
632 DbgPrintf(5, _T("AgentConnectionEx::processBulkCollectedData: unsupported data source type %d (element %d)"), origin, i);
633 status[i] = BULK_DATA_REC_FAILURE;
634 continue;
635 }
636
637 uuid nodeId = request->getFieldAsGUID(fieldId + 3);
638 if (!nodeId.isNull())
639 {
640 Node *targetNode = (Node *)FindObjectByGUID(nodeId, OBJECT_NODE);
641 if (targetNode == NULL)
642 {
643 TCHAR buffer[64];
644 DbgPrintf(5, _T("AgentConnectionEx::processBulkCollectedData: cannot find target node with GUID %s (element %d)"), nodeId.toString(buffer), i);
645 status[i] = BULK_DATA_REC_FAILURE;
646 continue;
647 }
648 node = targetNode;
649 }
650
651 UINT32 dciId = request->getFieldAsUInt32(fieldId);
652 DCObject *dcObject = node->getDCObjectById(dciId);
653 if (dcObject == NULL)
654 {
655 DbgPrintf(5, _T("AgentConnectionEx::processBulkCollectedData: cannot find DCI with ID %d on node %s [%d] (element %d)"), dciId, node->getName(), node->getId(), i);
656 status[i] = BULK_DATA_REC_FAILURE;
657 continue;
658 }
659
660 int type = request->getFieldAsInt16(fieldId + 2);
661 if ((type != DCO_TYPE_ITEM) || (dcObject->getType() != type) || (dcObject->getDataSource() != origin) || (dcObject->getAgentCacheMode() != AGENT_CACHE_ON))
662 {
663 DbgPrintf(5, _T("AgentConnectionEx::processBulkCollectedData: DCI %s [%d] on node %s [%d] configuration mismatch (element %d)"), dcObject->getName(), dciId, node->getName(), node->getId(), i);
664 status[i] = BULK_DATA_REC_FAILURE;
665 continue;
666 }
667
668 void *value = request->getFieldAsString(fieldId + 5);
df94243f 669 UINT32 status_code = request->getFieldAsUInt32(fieldId + 6);
670 DbgPrintf(7, _T("AgentConnectionEx::processBulkCollectedData: processing DCI %s [%d] (type=%d) (status=%d) on node %s [%d] (element %d)"), dcObject->getName(), dciId, type, status, node->getName(), node->getId(), i);
a1273b42 671 time_t t = request->getFieldAsTime(fieldId + 4);
df94243f 672 bool success = true;
673
674 switch(status_code)
675 {
676 case ERR_SUCCESS:
677 {
678 if (dcObject->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
679 dcObject->setStatus(ITEM_STATUS_ACTIVE, true);
680 success = node->processNewDCValue(dcObject, t, value);
681 if (t > dcObject->getLastPollTime())
682 dcObject->setLastPollTime(t);
683 break;
684 }
685 case ERR_UNKNOWN_PARAMETER:
686 if (dcObject->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
687 dcObject->setStatus(ITEM_STATUS_ACTIVE, true);
688 dcObject->processNewError(false, t);
689 break;
690 case ERR_NO_SUCH_INSTANCE:
691 dcObject->processNewError(true, t);
692 break;
693 case ERR_INTERNAL_ERROR:
694 dcObject->processNewError(true, t);
695 break;
696 }
a1273b42 697
df94243f 698
699 status[i] = success ? BULK_DATA_REC_SUCCESS : BULK_DATA_REC_FAILURE;
a1273b42
VK
700 free(value);
701 }
702
9cf1b53c 703 response->setField(VID_STATUS, status, count);
a1273b42
VK
704 return ERR_SUCCESS;
705}