implemented syslog proxy in agent (issue #568)
[public/netxms.git] / src / server / core / agent.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2016 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: agent.cpp
20 **
21 **/
22
23 #include "nxcore.h"
24
25 /**
26 * Externals
27 */
28 void ProcessTrap(SNMP_PDU *pdu, const InetAddress& srcAddr, UINT32 zoneId, int srcPort, SNMP_Transport *pTransport, SNMP_Engine *localEngine, bool isInformRq);
29 void QueueProxiedSyslogMessage(const InetAddress &addr, UINT32 zoneId, time_t timestamp, const char *msg, int msgLen);
30
31 /**
32 * Destructor for extended agent connection class
33 */
34 AgentConnectionEx::~AgentConnectionEx()
35 {
36 }
37
38 /**
39 * Trap processor
40 */
41 void AgentConnectionEx::onTrap(NXCPMessage *pMsg)
42 {
43 UINT32 dwEventCode;
44 int i, iNumArgs;
45 Node *pNode = NULL;
46 TCHAR *pszArgList[32], szBuffer[32];
47 char szFormat[] = "ssssssssssssssssssssssssssssssss";
48
49 DbgPrintf(3, _T("AgentConnectionEx::onTrap(): Received trap message from agent at %s, node ID %d"), getIpAddr().toString(szBuffer), m_nodeId);
50 if (m_nodeId != 0)
51 pNode = (Node *)FindObjectById(m_nodeId, OBJECT_NODE);
52 if (pNode == NULL)
53 pNode = FindNodeByIP(0, getIpAddr());
54 if (pNode != NULL)
55 {
56 if (pNode->getStatus() != STATUS_UNMANAGED)
57 {
58 // Check for duplicate traps - only accept traps with ID
59 // higher than last received
60 // agents prior to 1.1.6 will not send trap id
61 // we should accept trap in that case to maintain compatibility
62 bool acceptTrap;
63 QWORD trapId = pMsg->getFieldAsUInt64(VID_TRAP_ID);
64 if (trapId != 0)
65 {
66 acceptTrap = pNode->checkAgentTrapId(trapId);
67 DbgPrintf(5, _T("AgentConnectionEx::onTrap(): trapID is%s valid"), acceptTrap ? _T("") : _T(" not"));
68 }
69 else
70 {
71 acceptTrap = true;
72 DbgPrintf(5, _T("AgentConnectionEx::onTrap(): trap ID not provided"));
73 }
74
75 if (acceptTrap)
76 {
77 dwEventCode = pMsg->getFieldAsUInt32(VID_EVENT_CODE);
78 if ((dwEventCode == 0) && pMsg->isFieldExist(VID_EVENT_NAME))
79 {
80 TCHAR eventName[256];
81 pMsg->getFieldAsString(VID_EVENT_NAME, eventName, 256);
82 dwEventCode = EventCodeFromName(eventName, 0);
83 }
84 iNumArgs = (int)pMsg->getFieldAsUInt16(VID_NUM_ARGS);
85 if (iNumArgs > 32)
86 iNumArgs = 32;
87 for(i = 0; i < iNumArgs; i++)
88 pszArgList[i] = pMsg->getFieldAsString(VID_EVENT_ARG_BASE + i);
89 DbgPrintf(3, _T("Event from trap: %d"), dwEventCode);
90
91 szFormat[iNumArgs] = 0;
92 PostEvent(dwEventCode, pNode->getId(), (iNumArgs > 0) ? szFormat : NULL,
93 pszArgList[0], pszArgList[1], pszArgList[2], pszArgList[3],
94 pszArgList[4], pszArgList[5], pszArgList[6], pszArgList[7],
95 pszArgList[8], pszArgList[9], pszArgList[10], pszArgList[11],
96 pszArgList[12], pszArgList[13], pszArgList[14], pszArgList[15],
97 pszArgList[16], pszArgList[17], pszArgList[18], pszArgList[19],
98 pszArgList[20], pszArgList[21], pszArgList[22], pszArgList[23],
99 pszArgList[24], pszArgList[25], pszArgList[26], pszArgList[27],
100 pszArgList[28], pszArgList[29], pszArgList[30], pszArgList[31]);
101
102 // Cleanup
103 for(i = 0; i < iNumArgs; i++)
104 free(pszArgList[i]);
105 }
106 }
107 else
108 {
109 DbgPrintf(3, _T("AgentConnectionEx::onTrap(): node %s [%d] in in UNMANAGED state - trap ignored"), pNode->getName(), pNode->getId());
110 }
111 }
112 else
113 {
114 DbgPrintf(3, _T("AgentConnectionEx::onTrap(): Cannot find node for IP address %s"), getIpAddr().toString(szBuffer));
115 }
116 }
117
118 /**
119 * Incoming syslog message processor
120 */
121 void AgentConnectionEx::onSyslogMessage(NXCPMessage *msg)
122 {
123 TCHAR buffer[64];
124 nxlog_debug(3, _T("AgentConnectionEx::onSyslogMessage(): Received message from agent at %s, node ID %d"), getIpAddr().toString(buffer), m_nodeId);
125
126 UINT32 zoneId = msg->getFieldAsUInt32(VID_ZONE_ID);
127 Node *node = NULL;
128 if (m_nodeId != 0)
129 node = (Node *)FindObjectById(m_nodeId, OBJECT_NODE);
130 if (node == NULL)
131 node = FindNodeByIP(zoneId, getIpAddr());
132 if (node != NULL)
133 {
134 // Check for duplicate messages - only accept messages with ID
135 // higher than last received
136 if (node->checkSyslogMessageId(msg->getFieldAsUInt64(VID_REQUEST_ID)))
137 {
138 int msgLen = msg->getFieldAsInt32(VID_MESSAGE_LENGTH);
139 if (msgLen < 2048)
140 {
141 char message[2048];
142 msg->getFieldAsBinary(VID_MESSAGE, (BYTE *)message, msgLen + 1);
143 QueueProxiedSyslogMessage(msg->getFieldAsInetAddress(VID_IP_ADDRESS), msg->getFieldAsUInt32(VID_ZONE_ID),
144 msg->getFieldAsTime(VID_TIMESTAMP), message, msgLen);
145 }
146 }
147 else
148 {
149 nxlog_debug(5, _T("AgentConnectionEx::onSyslogMessage(): message ID is invalid (node %s [%d])"), node->getName(), node->getId());
150 }
151 }
152 else
153 {
154 nxlog_debug(5, _T("AgentConnectionEx::onSyslogMessage(): Cannot find node for IP address %s"), getIpAddr().toString(buffer));
155 }
156 }
157
158 /**
159 * Handler for data push
160 */
161 void AgentConnectionEx::onDataPush(NXCPMessage *msg)
162 {
163 if (g_flags & AF_SHUTDOWN)
164 return;
165
166 TCHAR name[MAX_PARAM_NAME], value[MAX_RESULT_LENGTH];
167 msg->getFieldAsString(VID_NAME, name, MAX_PARAM_NAME);
168 msg->getFieldAsString(VID_VALUE, value, MAX_RESULT_LENGTH);
169
170 Node *sender = NULL;
171 if (m_nodeId != 0)
172 sender = (Node *)FindObjectById(m_nodeId, OBJECT_NODE);
173 if (sender == NULL)
174 sender = FindNodeByIP(0, getIpAddr());
175
176 if (sender != NULL)
177 {
178 // Check for duplicate data requests - only accept requests with ID
179 // higher than last received
180 // agents prior to 1.2.10 will not send request id
181 // we should accept data in that case to maintain compatibility
182 bool acceptRequest;
183 QWORD requestId = msg->getFieldAsUInt64(VID_REQUEST_ID);
184 if (requestId != 0)
185 {
186 acceptRequest = sender->checkAgentPushRequestId(requestId);
187 DbgPrintf(5, _T("AgentConnectionEx::onDataPush(): requestId is%s valid"), acceptRequest ? _T("") : _T(" not"));
188 }
189 else
190 {
191 acceptRequest = true;
192 DbgPrintf(5, _T("AgentConnectionEx::onDataPush(): request ID not provided"));
193 }
194
195 if (acceptRequest)
196 {
197 Node *target;
198 UINT32 objectId = msg->getFieldAsUInt32(VID_OBJECT_ID);
199 if (objectId != 0)
200 {
201 // push on behalf of other node
202 target = (Node *)FindObjectById(objectId, OBJECT_NODE);
203 if (target != NULL)
204 {
205 if (target->isTrustedNode(sender->getId()))
206 {
207 DbgPrintf(5, _T("%s: agent data push: target set to %s [%d]"), sender->getName(), target->getName(), target->getId());
208 }
209 else
210 {
211 DbgPrintf(5, _T("%s: agent data push: not in trusted node list for target %s [%d]"), sender->getName(), target->getName(), target->getId());
212 target = NULL;
213 }
214 }
215 }
216 else
217 {
218 target = sender;
219 }
220
221 if (target != NULL)
222 {
223 DbgPrintf(5, _T("%s: agent data push: %s=%s"), target->getName(), name, value);
224 DCObject *dci = target->getDCObjectByName(name);
225 if ((dci != NULL) && (dci->getType() == DCO_TYPE_ITEM) && (dci->getDataSource() == DS_PUSH_AGENT) && (dci->getStatus() == ITEM_STATUS_ACTIVE))
226 {
227 DbgPrintf(5, _T("%s: agent data push: found DCI %d"), target->getName(), dci->getId());
228 time_t t = msg->getFieldAsTime(VID_TIMESTAMP);
229 if (t == 0)
230 t = time(NULL);
231 target->processNewDCValue(dci, t, value);
232 if (t > dci->getLastPollTime())
233 dci->setLastPollTime(t);
234 }
235 else
236 {
237 DbgPrintf(5, _T("%s: agent data push: DCI not found for %s"), target->getName(), name);
238 }
239 }
240 else
241 {
242 DbgPrintf(5, _T("%s: agent data push: target node not found or not accessible"), sender->getName());
243 }
244 }
245 }
246 }
247
248 /**
249 * Print message.
250 */
251 void AgentConnectionEx::printMsg(const TCHAR *format, ...)
252 {
253 va_list args;
254 va_start(args, format);
255 nxlog_debug2(6, format, args);
256 va_end(args);
257 }
258
259 /**
260 * Cancel unknown file monitoring
261 */
262 static void CancelUnknownFileMonitoring(Node *object,TCHAR *remoteFile)
263 {
264 DbgPrintf(6, _T("AgentConnectionEx::onFileMonitoringData: unknown subscription will be canceled."));
265 AgentConnection *conn = object->createAgentConnection();
266 if(conn != NULL)
267 {
268 NXCPMessage request;
269 request.setId(conn->generateRequestId());
270 request.setCode(CMD_CANCEL_FILE_MONITORING);
271 request.setField(VID_FILE_NAME, remoteFile);
272 request.setField(VID_OBJECT_ID, object->getId());
273 NXCPMessage* response = conn->customRequest(&request);
274 delete response;
275 conn->decRefCount();
276 }
277 }
278
279 /**
280 * Recieve file monitoring information and resend to all required user sessions
281 */
282 void AgentConnectionEx::onFileMonitoringData(NXCPMessage *pMsg)
283 {
284 Node *object = NULL;
285 if (m_nodeId != 0)
286 object = (Node *)FindObjectById(m_nodeId, OBJECT_NODE);
287 if (object != NULL)
288 {
289 TCHAR remoteFile[MAX_PATH];
290 pMsg->getFieldAsString(VID_FILE_NAME, remoteFile, MAX_PATH);
291 ObjectArray<ClientSession>* result = g_monitoringList.findClientByFNameAndNodeID(remoteFile, object->getId());
292 DbgPrintf(6, _T("AgentConnectionEx::onFileMonitoringData: found %d sessions for remote file %s on node %s [%d]"), result->size(), remoteFile, object->getName(), object->getId());
293 int validSessionCount = result->size();
294 for(int i = 0; i < result->size(); i++)
295 {
296 if(!result->get(i)->sendMessage(pMsg))
297 {
298 MONITORED_FILE file;
299 _tcsncpy(file.fileName, remoteFile, MAX_PATH);
300 file.nodeID = m_nodeId;
301 file.session = result->get(i);
302 g_monitoringList.removeMonitoringFile(&file);
303 validSessionCount--;
304
305 if (validSessionCount == 0)
306 CancelUnknownFileMonitoring(object, remoteFile);
307 }
308 }
309 if (result->size() == 0)
310 {
311 CancelUnknownFileMonitoring(object, remoteFile);
312 }
313 delete result;
314 }
315 else
316 {
317 g_monitoringList.removeDisconnectedNode(m_nodeId);
318 DbgPrintf(6, _T("AgentConnectionEx::onFileMonitoringData: node object not found"));
319 }
320 }
321
322 /**
323 * Ask modules if they can procress custom message
324 */
325 bool AgentConnectionEx::processCustomMessage(NXCPMessage *msg)
326 {
327 TCHAR buffer[128];
328 DbgPrintf(6, _T("AgentConnectionEx::processCustomMessage: processing message %s ID %d"),
329 NXCPMessageCodeName(msg->getCode(), buffer), msg->getId());
330
331 for(UINT32 i = 0; i < g_dwNumModules; i++)
332 {
333 if (g_pModuleList[i].pfOnAgentMessage != NULL)
334 {
335 if (g_pModuleList[i].pfOnAgentMessage(msg, m_nodeId))
336 return true; // accepted by module
337 }
338 }
339 return false;
340 }
341
342 /**
343 * Create SNMP proxy transport for sending trap response
344 */
345 static SNMP_ProxyTransport *CreateSNMPProxyTransport(AgentConnectionEx *conn, Node *originNode, const InetAddress& originAddr, UINT16 port)
346 {
347 conn->incRefCount();
348 SNMP_ProxyTransport *snmpTransport = new SNMP_ProxyTransport(conn, originAddr, port);
349 if (originNode != NULL)
350 {
351 snmpTransport->setSecurityContext(originNode->getSnmpSecurityContext());
352 }
353 return snmpTransport;
354 }
355
356 /**
357 * Recieve trap sent throught proxy agent
358 */
359 void AgentConnectionEx::onSnmpTrap(NXCPMessage *msg)
360 {
361 Node *proxyNode = NULL;
362 TCHAR ipStringBuffer[4096];
363
364 static BYTE engineId[] = { 0x80, 0x00, 0x00, 0x00, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01, 0x00 };
365 SNMP_Engine localEngine(engineId, 12);
366
367 DbgPrintf(3, _T("AgentConnectionEx::onSnmpTrap(): Received SNMP trap message from agent at %s, node ID %d"),
368 getIpAddr().toString(ipStringBuffer), m_nodeId);
369
370 if (m_nodeId != 0)
371 proxyNode = (Node *)FindObjectById(m_nodeId, OBJECT_NODE);
372 if (proxyNode != NULL)
373 {
374 // Check for duplicate traps - only accept traps with ID
375 // higher than last received
376 bool acceptTrap;
377 UINT32 trapId = msg->getId();
378 if (trapId != 0)
379 {
380 acceptTrap = proxyNode->checkSNMPTrapId(trapId);
381 DbgPrintf(5, _T("AgentConnectionEx::onSnmpTrap(): SNMP trapID is%s valid"), acceptTrap ? _T("") : _T(" not"));
382 }
383 else
384 {
385 acceptTrap = false;
386 DbgPrintf(5, _T("AgentConnectionEx::onSnmpTrap(): SNMP trap ID not provided"));
387 }
388
389 if (acceptTrap)
390 {
391 InetAddress originSenderIP = msg->getFieldAsInetAddress(VID_IP_ADDRESS);
392 UINT32 pduLenght = msg->getFieldAsUInt32(VID_PDU_SIZE);
393 BYTE *pduBytes = (BYTE*)malloc(pduLenght);
394 msg->getFieldAsBinary(VID_PDU, pduBytes, pduLenght);
395 UINT32 zoneId = IsZoningEnabled() ? msg->getFieldAsUInt32(VID_ZONE_ID) : 0;
396 Node *originNode = FindNodeByIP(zoneId, originSenderIP);
397 if ((originNode != NULL) || ConfigReadInt(_T("LogAllSNMPTraps"), FALSE))
398 {
399 SNMP_PDU *pdu = new SNMP_PDU;
400 if (pdu->parse(pduBytes, pduLenght, (originNode != NULL) ? originNode->getSnmpSecurityContext() : NULL, true))
401 {
402 DbgPrintf(6, _T("SNMPTrapReceiver: received PDU of type %d"), pdu->getCommand());
403 if ((pdu->getCommand() == SNMP_TRAP) || (pdu->getCommand() == SNMP_INFORM_REQUEST))
404 {
405 bool isInformRequest = (pdu->getCommand() == SNMP_INFORM_REQUEST);
406 SNMP_ProxyTransport *snmpTransport = isInformRequest ? CreateSNMPProxyTransport(this, originNode, originSenderIP, msg->getFieldAsUInt16(VID_PORT)) : NULL;
407 if ((pdu->getVersion() == SNMP_VERSION_3) && (pdu->getCommand() == SNMP_INFORM_REQUEST))
408 {
409 SNMP_SecurityContext *context = snmpTransport->getSecurityContext();
410 context->setAuthoritativeEngine(localEngine);
411 }
412 ProcessTrap(pdu, originSenderIP, zoneId, msg->getFieldAsUInt16(VID_PORT), snmpTransport, &localEngine, isInformRequest);
413 delete snmpTransport;
414 }
415 else if ((pdu->getVersion() == SNMP_VERSION_3) && (pdu->getCommand() == SNMP_GET_REQUEST) && (pdu->getAuthoritativeEngine().getIdLen() == 0))
416 {
417 // Engine ID discovery
418 DbgPrintf(6, _T("SNMPTrapReceiver: EngineId discovery"));
419
420 SNMP_ProxyTransport *snmpTransport = CreateSNMPProxyTransport(this, originNode, originSenderIP, msg->getFieldAsUInt16(VID_PORT));
421
422 SNMP_PDU *response = new SNMP_PDU(SNMP_REPORT, pdu->getRequestId(), pdu->getVersion());
423 response->setReportable(false);
424 response->setMessageId(pdu->getMessageId());
425 response->setContextEngineId(localEngine.getId(), localEngine.getIdLen());
426
427 SNMP_Variable *var = new SNMP_Variable(_T(".1.3.6.1.6.3.15.1.1.4.0"));
428 var->setValueFromString(ASN_INTEGER, _T("2"));
429 response->bindVariable(var);
430
431 SNMP_SecurityContext *context = new SNMP_SecurityContext();
432 localEngine.setTime((int)time(NULL));
433 context->setAuthoritativeEngine(localEngine);
434 context->setSecurityModel(SNMP_SECURITY_MODEL_USM);
435 context->setAuthMethod(SNMP_AUTH_NONE);
436 context->setPrivMethod(SNMP_ENCRYPT_NONE);
437 snmpTransport->setSecurityContext(context);
438
439 snmpTransport->setWaitForResponse(false);
440 snmpTransport->sendMessage(response);
441 delete response;
442 delete snmpTransport;
443 }
444 else if (pdu->getCommand() == SNMP_REPORT)
445 {
446 DbgPrintf(6, _T("AgentConnectionEx::onSnmpTrap(): REPORT PDU with error %s"), (const TCHAR *)pdu->getVariable(0)->getName().toString());
447 }
448 delete pdu;
449 }
450 else if (pdu->getCommand() == SNMP_REPORT)
451 {
452 DbgPrintf(6, _T("AgentConnectionEx::onSnmpTrap(): REPORT PDU with error %s"), (const TCHAR *)pdu->getVariable(0)->getName().toString());
453 }
454 }
455 else
456 {
457 DbgPrintf(3, _T("AgentConnectionEx::onSnmpTrap(): cannot find origin node with IP %s and not accepting traps from unknown sources"), originSenderIP.toString(ipStringBuffer));
458 }
459 }
460 }
461 else
462 {
463 DbgPrintf(3, _T("AgentConnectionEx::onSnmpTrap(): Cannot find node for IP address %s"), getIpAddr().toString(ipStringBuffer));
464 }
465 }
466
467 /**
468 * Deploy policy to agent
469 */
470 UINT32 AgentConnectionEx::deployPolicy(AgentPolicy *policy)
471 {
472 UINT32 rqId, rcc;
473 NXCPMessage msg(getProtocolVersion());
474
475 rqId = generateRequestId();
476 msg.setId(rqId);
477 msg.setCode(CMD_DEPLOY_AGENT_POLICY);
478 if (policy->createDeploymentMessage(&msg))
479 {
480 if (sendMessage(&msg))
481 {
482 rcc = waitForRCC(rqId, getCommandTimeout());
483 }
484 else
485 {
486 rcc = ERR_CONNECTION_BROKEN;
487 }
488 }
489 else
490 {
491 rcc = ERR_INTERNAL_ERROR;
492 }
493 return rcc;
494 }
495
496 /**
497 * Uninstall policy from agent
498 */
499 UINT32 AgentConnectionEx::uninstallPolicy(AgentPolicy *policy)
500 {
501 UINT32 rqId, rcc;
502 NXCPMessage msg(getProtocolVersion());
503
504 rqId = generateRequestId();
505 msg.setId(rqId);
506 msg.setCode(CMD_UNINSTALL_AGENT_POLICY);
507 if (policy->createUninstallMessage(&msg))
508 {
509 if (sendMessage(&msg))
510 {
511 rcc = waitForRCC(rqId, getCommandTimeout());
512 }
513 else
514 {
515 rcc = ERR_CONNECTION_BROKEN;
516 }
517 }
518 else
519 {
520 rcc = ERR_INTERNAL_ERROR;
521 }
522 return rcc;
523 }
524
525 /**
526 * Process collected data information (for DCI with agent-side cache)
527 */
528 UINT32 AgentConnectionEx::processCollectedData(NXCPMessage *msg)
529 {
530 if (g_flags & AF_SHUTDOWN)
531 return ERR_INTERNAL_ERROR;
532
533 if (m_nodeId == 0)
534 {
535 DbgPrintf(5, _T("AgentConnectionEx::processCollectedData: node ID is 0 for agent session"));
536 return ERR_INTERNAL_ERROR;
537 }
538
539 Node *node = (Node *)FindObjectById(m_nodeId, OBJECT_NODE);
540 if (node == NULL)
541 {
542 DbgPrintf(5, _T("AgentConnectionEx::processCollectedData: cannot find node object (node ID = %d)"), m_nodeId);
543 return ERR_INTERNAL_ERROR;
544 }
545
546 int origin = msg->getFieldAsInt16(VID_DCI_SOURCE_TYPE);
547 if ((origin != DS_NATIVE_AGENT) && (origin != DS_SNMP_AGENT))
548 {
549 DbgPrintf(5, _T("AgentConnectionEx::processCollectedData: unsupported data source type %d"), origin);
550 return ERR_INTERNAL_ERROR;
551 }
552
553 DataCollectionTarget *target;
554 uuid targetId = msg->getFieldAsGUID(VID_NODE_ID);
555 if (!targetId.isNull())
556 {
557 NetObj *object = FindObjectByGUID(targetId, -1);
558 if (object == NULL)
559 {
560 TCHAR buffer[64];
561 nxlog_debug(5, _T("AgentConnectionEx::processCollectedData: cannot find target node with GUID %s"), targetId.toString(buffer));
562 return ERR_INTERNAL_ERROR;
563 }
564 if (!object->isDataCollectionTarget())
565 {
566 TCHAR buffer[64];
567 nxlog_debug(5, _T("AgentConnectionEx::processCollectedData: object with GUID %s is not a data collection target"), targetId.toString(buffer));
568 return ERR_INTERNAL_ERROR;
569 }
570 target = (DataCollectionTarget *)object;
571 }
572 else
573 {
574 target = node;
575 }
576
577 UINT32 dciId = msg->getFieldAsUInt32(VID_DCI_ID);
578 DCObject *dcObject = target->getDCObjectById(dciId);
579 if (dcObject == NULL)
580 {
581 nxlog_debug(5, _T("AgentConnectionEx::processCollectedData: cannot find DCI with ID %d on object %s [%d]"),
582 dciId, target->getName(), target->getId());
583 return ERR_INTERNAL_ERROR;
584 }
585
586 int type = msg->getFieldAsInt16(VID_DCOBJECT_TYPE);
587 if ((dcObject->getType() != type) || (dcObject->getDataSource() != origin) || (dcObject->getAgentCacheMode() != AGENT_CACHE_ON))
588 {
589 nxlog_debug(5, _T("AgentConnectionEx::processCollectedData: DCI %s [%d] on object %s [%d] configuration mismatch"),
590 dcObject->getName(), dciId, target->getName(), target->getId());
591 return ERR_INTERNAL_ERROR;
592 }
593
594 time_t t = msg->getFieldAsTime(VID_TIMESTAMP);
595 UINT32 status = msg->getFieldAsUInt32(VID_STATUS);
596 bool success = true;
597
598 void *value;
599 switch(type)
600 {
601 case DCO_TYPE_ITEM:
602 value = msg->getFieldAsString(VID_VALUE);
603 break;
604 case DCO_TYPE_LIST:
605 value = new StringList();
606 break;
607 case DCO_TYPE_TABLE:
608 value = new Table(msg);
609 break;
610 default:
611 nxlog_debug(5, _T("AgentConnectionEx::processCollectedData: invalid type %d of DCI %s [%d] on object %s [%d]"),
612 type, dcObject->getName(), dciId, target->getName(), target->getId());
613 return ERR_INTERNAL_ERROR;
614 }
615 nxlog_debug(7, _T("AgentConnectionEx::processCollectedData: processing DCI %s [%d] (type=%d) (status=%d) on object %s [%d]"),
616 dcObject->getName(), dciId, type, status, target->getName(), target->getId());
617
618 switch(status)
619 {
620 case ERR_SUCCESS:
621 {
622 if (dcObject->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
623 dcObject->setStatus(ITEM_STATUS_ACTIVE, true);
624 success = target->processNewDCValue(dcObject, t, value);
625 if (t > dcObject->getLastPollTime())
626 dcObject->setLastPollTime(t);
627 break;
628 }
629 case ERR_UNKNOWN_PARAMETER:
630 if (dcObject->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
631 dcObject->setStatus(ITEM_STATUS_ACTIVE, true);
632 dcObject->processNewError(false, t);
633 break;
634 case ERR_NO_SUCH_INSTANCE:
635 dcObject->processNewError(true, t);
636 break;
637 case ERR_INTERNAL_ERROR:
638 dcObject->processNewError(true, t);
639 break;
640 }
641
642 switch(type)
643 {
644 case DCO_TYPE_ITEM:
645 free(value);
646 break;
647 case DCO_TYPE_LIST:
648 delete (StringList *)value;
649 break;
650 }
651
652 return success ? ERR_SUCCESS : ERR_INTERNAL_ERROR;
653 }
654
655 /**
656 * Process collected data information in bulk mode (for DCI with agent-side cache)
657 */
658 UINT32 AgentConnectionEx::processBulkCollectedData(NXCPMessage *request, NXCPMessage *response)
659 {
660 if (g_flags & AF_SHUTDOWN)
661 return ERR_INTERNAL_ERROR;
662
663 if (m_nodeId == 0)
664 {
665 DbgPrintf(5, _T("AgentConnectionEx::processBulkCollectedData: node ID is 0 for agent session"));
666 return ERR_INTERNAL_ERROR;
667 }
668
669 Node *node = (Node *)FindObjectById(m_nodeId, OBJECT_NODE);
670 if (node == NULL)
671 {
672 DbgPrintf(5, _T("AgentConnectionEx::processBulkCollectedData: cannot find node object (node ID = %d)"), m_nodeId);
673 return ERR_INTERNAL_ERROR;
674 }
675
676 int count = request->getFieldAsInt16(VID_NUM_ELEMENTS);
677 if (count > MAX_BULK_DATA_BLOCK_SIZE)
678 count = MAX_BULK_DATA_BLOCK_SIZE;
679 DbgPrintf(5, _T("AgentConnectionEx::processBulkCollectedData: %d elements from node %s [%d]"), count, node->getName(), node->getId());
680
681 BYTE status[MAX_BULK_DATA_BLOCK_SIZE];
682 memset(status, 0, MAX_BULK_DATA_BLOCK_SIZE);
683 UINT32 fieldId = VID_ELEMENT_LIST_BASE;
684 for(int i = 0; i < count; i++, fieldId += 10)
685 {
686 int origin = request->getFieldAsInt16(fieldId + 1);
687 if ((origin != DS_NATIVE_AGENT) && (origin != DS_SNMP_AGENT))
688 {
689 nxlog_debug(5, _T("AgentConnectionEx::processBulkCollectedData: unsupported data source type %d (element %d)"), origin, i);
690 status[i] = BULK_DATA_REC_FAILURE;
691 continue;
692 }
693
694 DataCollectionTarget *target;
695 uuid targetId = request->getFieldAsGUID(fieldId + 3);
696 if (!targetId.isNull())
697 {
698 NetObj *object = FindObjectByGUID(targetId, -1);
699 if (object == NULL)
700 {
701 TCHAR buffer[64];
702 nxlog_debug(5, _T("AgentConnectionEx::processBulkCollectedData: cannot find target object with GUID %s (element %d)"),
703 targetId.toString(buffer), i);
704 status[i] = BULK_DATA_REC_FAILURE;
705 continue;
706 }
707 if (!object->isDataCollectionTarget())
708 {
709 TCHAR buffer[64];
710 nxlog_debug(5, _T("AgentConnectionEx::processBulkCollectedData: object with GUID %s (element %d) is not a data collection target"),
711 targetId.toString(buffer), i);
712 status[i] = BULK_DATA_REC_FAILURE;
713 continue;
714 }
715 target = (DataCollectionTarget *)object;
716 }
717 else
718 {
719 target = node;
720 }
721
722 UINT32 dciId = request->getFieldAsUInt32(fieldId);
723 DCObject *dcObject = target->getDCObjectById(dciId);
724 if (dcObject == NULL)
725 {
726 nxlog_debug(5, _T("AgentConnectionEx::processBulkCollectedData: cannot find DCI with ID %d on object %s [%d] (element %d)"),
727 dciId, target->getName(), target->getId(), i);
728 status[i] = BULK_DATA_REC_FAILURE;
729 continue;
730 }
731
732 int type = request->getFieldAsInt16(fieldId + 2);
733 if ((type != DCO_TYPE_ITEM) || (dcObject->getType() != type) || (dcObject->getDataSource() != origin) || (dcObject->getAgentCacheMode() != AGENT_CACHE_ON))
734 {
735 nxlog_debug(5, _T("AgentConnectionEx::processBulkCollectedData: DCI %s [%d] on object %s [%d] configuration mismatch (element %d)"),
736 dcObject->getName(), dciId, target->getName(), target->getId(), i);
737 status[i] = BULK_DATA_REC_FAILURE;
738 continue;
739 }
740
741 void *value = request->getFieldAsString(fieldId + 5);
742 UINT32 status_code = request->getFieldAsUInt32(fieldId + 6);
743 nxlog_debug(7, _T("AgentConnectionEx::processBulkCollectedData: processing DCI %s [%d] (type=%d) (status=%d) on object %s [%d] (element %d)"),
744 dcObject->getName(), dciId, type, status, target->getName(), target->getId(), i);
745 time_t t = request->getFieldAsTime(fieldId + 4);
746 bool success = true;
747
748 switch(status_code)
749 {
750 case ERR_SUCCESS:
751 {
752 if (dcObject->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
753 dcObject->setStatus(ITEM_STATUS_ACTIVE, true);
754 success = target->processNewDCValue(dcObject, t, value);
755 if (t > dcObject->getLastPollTime())
756 dcObject->setLastPollTime(t);
757 break;
758 }
759 case ERR_UNKNOWN_PARAMETER:
760 if (dcObject->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
761 dcObject->setStatus(ITEM_STATUS_ACTIVE, true);
762 dcObject->processNewError(false, t);
763 break;
764 case ERR_NO_SUCH_INSTANCE:
765 dcObject->processNewError(true, t);
766 break;
767 case ERR_INTERNAL_ERROR:
768 dcObject->processNewError(true, t);
769 break;
770 }
771
772
773 status[i] = success ? BULK_DATA_REC_SUCCESS : BULK_DATA_REC_FAILURE;
774 free(value);
775 }
776
777 response->setField(VID_STATUS, status, count);
778 return ERR_SUCCESS;
779 }