all pollers converted to single thread pool
[public/netxms.git] / src / server / core / cluster.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2013 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: cluster.cpp
20 **
21 **/
22
23 #include "nxcore.h"
24
25 /**
26 * Cluster class default constructor
27 */
28 Cluster::Cluster() : DataCollectionTarget()
29 {
30 m_dwClusterType = 0;
31 m_syncNetworks = new ObjectArray<InetAddress>(8, 8, true);
32 m_dwNumResources = 0;
33 m_pResourceList = NULL;
34 m_tmLastPoll = 0;
35 m_dwFlags = 0;
36 m_zoneId = 0;
37 }
38
39 /**
40 * Cluster class new object constructor
41 */
42 Cluster::Cluster(const TCHAR *pszName, UINT32 zoneId) : DataCollectionTarget(pszName)
43 {
44 m_dwClusterType = 0;
45 m_syncNetworks = new ObjectArray<InetAddress>(8, 8, true);
46 m_dwNumResources = 0;
47 m_pResourceList = NULL;
48 m_tmLastPoll = 0;
49 m_dwFlags = 0;
50 m_zoneId = zoneId;
51 }
52
53 /**
54 * Destructor
55 */
56 Cluster::~Cluster()
57 {
58 delete m_syncNetworks;
59 safe_free(m_pResourceList);
60 }
61
62 /**
63 * Create object from database data
64 */
65 BOOL Cluster::loadFromDatabase(UINT32 dwId)
66 {
67 TCHAR szQuery[256];
68 BOOL bResult = FALSE;
69 DB_RESULT hResult;
70 UINT32 dwNodeId;
71 NetObj *pObject;
72 int i, nRows;
73
74 m_id = dwId;
75
76 if (!loadCommonProperties())
77 {
78 DbgPrintf(2, _T("Cannot load common properties for cluster object %d"), dwId);
79 return FALSE;
80 }
81
82 _sntprintf(szQuery, 256, _T("SELECT cluster_type,zone_guid FROM clusters WHERE id=%d"), (int)m_id);
83 hResult = DBSelect(g_hCoreDB, szQuery);
84 if (hResult == NULL)
85 return FALSE;
86
87 m_dwClusterType = DBGetFieldULong(hResult, 0, 0);
88 m_zoneId = DBGetFieldULong(hResult, 0, 1);
89 DBFreeResult(hResult);
90
91 // Load DCI and access list
92 loadACLFromDB();
93 loadItemsFromDB();
94 for(i = 0; i < m_dcObjects->size(); i++)
95 if (!m_dcObjects->get(i)->loadThresholdsFromDB())
96 return FALSE;
97
98 if (!m_isDeleted)
99 {
100 // Load member nodes
101 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("SELECT node_id FROM cluster_members WHERE cluster_id=%d"), m_id);
102 hResult = DBSelect(g_hCoreDB, szQuery);
103 if (hResult != NULL)
104 {
105 nRows = DBGetNumRows(hResult);
106 for(i = 0; i < nRows; i++)
107 {
108 dwNodeId = DBGetFieldULong(hResult, i, 0);
109 pObject = FindObjectById(dwNodeId);
110 if (pObject != NULL)
111 {
112 if (pObject->getObjectClass() == OBJECT_NODE)
113 {
114 AddChild(pObject);
115 pObject->AddParent(this);
116 }
117 else
118 {
119 nxlog_write(MSG_CLUSTER_MEMBER_NOT_NODE, EVENTLOG_ERROR_TYPE, "dd", m_id, dwNodeId);
120 break;
121 }
122 }
123 else
124 {
125 nxlog_write(MSG_INVALID_CLUSTER_MEMBER, EVENTLOG_ERROR_TYPE, "dd", m_id, dwNodeId);
126 break;
127 }
128 }
129 if (i == nRows)
130 bResult = TRUE;
131 DBFreeResult(hResult);
132 }
133
134 // Load sync net list
135 if (bResult)
136 {
137 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("SELECT subnet_addr,subnet_mask FROM cluster_sync_subnets WHERE cluster_id=%d"), m_id);
138 hResult = DBSelect(g_hCoreDB, szQuery);
139 if (hResult != NULL)
140 {
141 int count = DBGetNumRows(hResult);
142 for(i = 0; i < count; i++)
143 {
144 InetAddress *addr = new InetAddress(DBGetFieldInetAddr(hResult, i, 0));
145 addr->setMaskBits(DBGetFieldLong(hResult, i, 1));
146 m_syncNetworks->add(addr);
147 }
148 DBFreeResult(hResult);
149 }
150 else
151 {
152 bResult = FALSE;
153 }
154 }
155
156 // Load resources
157 if (bResult)
158 {
159 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("SELECT resource_id,resource_name,ip_addr,current_owner FROM cluster_resources WHERE cluster_id=%d"), m_id);
160 hResult = DBSelect(g_hCoreDB, szQuery);
161 if (hResult != NULL)
162 {
163 m_dwNumResources = DBGetNumRows(hResult);
164 if (m_dwNumResources > 0)
165 {
166 m_pResourceList = (CLUSTER_RESOURCE *)malloc(sizeof(CLUSTER_RESOURCE) * m_dwNumResources);
167 for(i = 0; i < (int)m_dwNumResources; i++)
168 {
169 m_pResourceList[i].dwId = DBGetFieldULong(hResult, i, 0);
170 DBGetField(hResult, i, 1, m_pResourceList[i].szName, MAX_DB_STRING);
171 m_pResourceList[i].ipAddr = DBGetFieldInetAddr(hResult, i, 2);
172 m_pResourceList[i].dwCurrOwner = DBGetFieldULong(hResult, i, 3);
173 }
174 }
175 DBFreeResult(hResult);
176 }
177 else
178 {
179 bResult = FALSE;
180 }
181 }
182 }
183 else
184 {
185 bResult = TRUE;
186 }
187
188 return bResult;
189 }
190
191 /**
192 * Save object to database
193 */
194 BOOL Cluster::saveToDatabase(DB_HANDLE hdb)
195 {
196 TCHAR szQuery[4096], szIpAddr[64];
197 BOOL bResult;
198 UINT32 i;
199
200 // Lock object's access
201 lockProperties();
202
203 saveCommonProperties(hdb);
204
205 if (IsDatabaseRecordExist(hdb, _T("clusters"), _T("id"), m_id))
206 _sntprintf(szQuery, 4096,
207 _T("UPDATE clusters SET cluster_type=%d,zone_guid=%d WHERE id=%d"),
208 (int)m_dwClusterType, (int)m_zoneId, (int)m_id);
209 else
210 _sntprintf(szQuery, 4096,
211 _T("INSERT INTO clusters (id,cluster_type,zone_guid) VALUES (%d,%d,%d)"),
212 (int)m_id, (int)m_dwClusterType, (int)m_zoneId);
213 bResult = DBQuery(hdb, szQuery);
214
215 // Save data collection items
216 if (bResult)
217 {
218 lockDciAccess(false);
219 for(i = 0; i < (UINT32)m_dcObjects->size(); i++)
220 m_dcObjects->get(i)->saveToDB(hdb);
221 unlockDciAccess();
222
223 // Save cluster members list
224 if (DBBegin(hdb))
225 {
226 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("DELETE FROM cluster_members WHERE cluster_id=%d"), m_id);
227 DBQuery(hdb, szQuery);
228 LockChildList(FALSE);
229 for(i = 0; i < m_dwChildCount; i++)
230 {
231 if (m_pChildList[i]->getObjectClass() == OBJECT_NODE)
232 {
233 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("INSERT INTO cluster_members (cluster_id,node_id) VALUES (%d,%d)"),
234 m_id, m_pChildList[i]->getId());
235 bResult = DBQuery(hdb, szQuery);
236 if (!bResult)
237 break;
238 }
239 }
240 UnlockChildList();
241 if (bResult)
242 DBCommit(hdb);
243 else
244 DBRollback(hdb);
245 }
246 else
247 {
248 bResult = FALSE;
249 }
250
251 // Save sync net list
252 if (bResult)
253 {
254 if (DBBegin(hdb))
255 {
256 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("DELETE FROM cluster_sync_subnets WHERE cluster_id=%d"), m_id);
257 DBQuery(hdb, szQuery);
258 for(int i = 0; i < m_syncNetworks->size(); i++)
259 {
260 InetAddress *net = m_syncNetworks->get(i);
261 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("INSERT INTO cluster_sync_subnets (cluster_id,subnet_addr,subnet_mask) VALUES (%d,'%s',%d)"),
262 (int)m_id, net->toString(szIpAddr), net->getMaskBits());
263 bResult = DBQuery(hdb, szQuery);
264 if (!bResult)
265 break;
266 }
267 if (bResult)
268 DBCommit(hdb);
269 else
270 DBRollback(hdb);
271 }
272 else
273 {
274 bResult = FALSE;
275 }
276 }
277
278 // Save resource list
279 if (bResult)
280 {
281 if (DBBegin(hdb))
282 {
283 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("DELETE FROM cluster_resources WHERE cluster_id=%d"), m_id);
284 DBQuery(hdb, szQuery);
285 for(i = 0; i < m_dwNumResources; i++)
286 {
287 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("INSERT INTO cluster_resources (cluster_id,resource_id,resource_name,ip_addr,current_owner) VALUES (%d,%d,%s,'%s',%d)"),
288 m_id, m_pResourceList[i].dwId, (const TCHAR *)DBPrepareString(hdb, m_pResourceList[i].szName),
289 m_pResourceList[i].ipAddr.toString(szIpAddr),
290 m_pResourceList[i].dwCurrOwner);
291 bResult = DBQuery(hdb, szQuery);
292 if (!bResult)
293 break;
294 }
295 if (bResult)
296 DBCommit(hdb);
297 else
298 DBRollback(hdb);
299 }
300 else
301 {
302 bResult = FALSE;
303 }
304 }
305 }
306
307 // Save access list
308 saveACLToDB(hdb);
309
310 // Clear modifications flag and unlock object
311 if (bResult)
312 m_isModified = false;
313 unlockProperties();
314
315 return bResult;
316 }
317
318 /**
319 * Delete object from database
320 */
321 bool Cluster::deleteFromDatabase(DB_HANDLE hdb)
322 {
323 bool success = DataCollectionTarget::deleteFromDatabase(hdb);
324 if (success)
325 {
326 success = executeQueryOnObject(hdb, _T("DELETE FROM clusters WHERE id=?"));
327 if (success)
328 success = executeQueryOnObject(hdb, _T("DELETE FROM cluster_members WHERE cluster_id=?"));
329 if (success)
330 success = executeQueryOnObject(hdb, _T("DELETE FROM cluster_sync_subnets WHERE cluster_id=?"));
331 }
332 return success;
333 }
334
335 /**
336 * Create CSCP message with object's data
337 */
338 void Cluster::fillMessageInternal(NXCPMessage *pMsg)
339 {
340 UINT32 i, dwId;
341
342 DataCollectionTarget::fillMessageInternal(pMsg);
343 pMsg->setField(VID_CLUSTER_TYPE, m_dwClusterType);
344 pMsg->setField(VID_ZONE_ID, m_zoneId);
345
346 pMsg->setField(VID_NUM_SYNC_SUBNETS, (UINT32)m_syncNetworks->size());
347 for(i = 0, dwId = VID_SYNC_SUBNETS_BASE; i < (UINT32)m_syncNetworks->size(); i++)
348 pMsg->setField(dwId++, *(m_syncNetworks->get(i)));
349
350 pMsg->setField(VID_NUM_RESOURCES, m_dwNumResources);
351 for(i = 0, dwId = VID_RESOURCE_LIST_BASE; i < m_dwNumResources; i++, dwId += 6)
352 {
353 pMsg->setField(dwId++, m_pResourceList[i].dwId);
354 pMsg->setField(dwId++, m_pResourceList[i].szName);
355 pMsg->setField(dwId++, m_pResourceList[i].ipAddr);
356 pMsg->setField(dwId++, m_pResourceList[i].dwCurrOwner);
357 }
358 }
359
360 /**
361 * Modify object from message
362 */
363 UINT32 Cluster::modifyFromMessageInternal(NXCPMessage *pRequest)
364 {
365 // Change cluster type
366 if (pRequest->isFieldExist(VID_CLUSTER_TYPE))
367 m_dwClusterType = pRequest->getFieldAsUInt32(VID_CLUSTER_TYPE);
368
369 // Change sync subnets
370 if (pRequest->isFieldExist(VID_NUM_SYNC_SUBNETS))
371 {
372 m_syncNetworks->clear();
373 int count = pRequest->getFieldAsInt32(VID_NUM_SYNC_SUBNETS);
374 UINT32 fieldId = VID_SYNC_SUBNETS_BASE;
375 for(int i = 0; i < count; i++)
376 {
377 m_syncNetworks->add(new InetAddress(pRequest->getFieldAsInetAddress(fieldId++)));
378 }
379 }
380
381 // Change resource list
382 if (pRequest->isFieldExist(VID_NUM_RESOURCES))
383 {
384 UINT32 i, j, dwId, dwCount;
385 CLUSTER_RESOURCE *pList;
386
387 dwCount = pRequest->getFieldAsUInt32(VID_NUM_RESOURCES);
388 if (dwCount > 0)
389 {
390 pList = (CLUSTER_RESOURCE *)malloc(sizeof(CLUSTER_RESOURCE) * dwCount);
391 memset(pList, 0, sizeof(CLUSTER_RESOURCE) * dwCount);
392 for(i = 0, dwId = VID_RESOURCE_LIST_BASE; i < dwCount; i++, dwId += 7)
393 {
394 pList[i].dwId = pRequest->getFieldAsUInt32(dwId++);
395 pRequest->getFieldAsString(dwId++, pList[i].szName, MAX_DB_STRING);
396 pList[i].ipAddr = pRequest->getFieldAsInetAddress(dwId++);
397 }
398
399 // Update current owner information in existing resources
400 for(i = 0; i < m_dwNumResources; i++)
401 {
402 for(j = 0; j < dwCount; j++)
403 {
404 if (m_pResourceList[i].dwId == pList[j].dwId)
405 {
406 pList[j].dwCurrOwner = m_pResourceList[i].dwCurrOwner;
407 break;
408 }
409 }
410 }
411
412 // Replace list
413 safe_free(m_pResourceList);
414 m_pResourceList = pList;
415 }
416 else
417 {
418 safe_free_and_null(m_pResourceList);
419 }
420 m_dwNumResources = dwCount;
421 }
422
423 return DataCollectionTarget::modifyFromMessageInternal(pRequest);
424 }
425
426 /**
427 * Check if given address is within sync network
428 */
429 bool Cluster::isSyncAddr(const InetAddress& addr)
430 {
431 bool bRet = false;
432
433 lockProperties();
434 for(int i = 0; i < m_syncNetworks->size(); i++)
435 {
436 if (m_syncNetworks->get(i)->contain(addr))
437 {
438 bRet = true;
439 break;
440 }
441 }
442 unlockProperties();
443 return bRet;
444 }
445
446 /**
447 * Check if given address is a resource address
448 */
449 bool Cluster::isVirtualAddr(const InetAddress& addr)
450 {
451 UINT32 i;
452 bool bRet = false;
453
454 lockProperties();
455 for(i = 0; i < m_dwNumResources; i++)
456 {
457 if (m_pResourceList[i].ipAddr.equals(addr))
458 {
459 bRet = true;
460 break;
461 }
462 }
463 unlockProperties();
464 return bRet;
465 }
466
467 /**
468 * Entry point for status poller thread
469 */
470 void Cluster::statusPoll(PollerInfo *poller)
471 {
472 poller->startExecution();
473 statusPoll(NULL, 0, poller);
474 delete poller;
475 }
476
477 /**
478 * Status poll
479 */
480 void Cluster::statusPoll(ClientSession *pSession, UINT32 dwRqId, PollerInfo *poller)
481 {
482 UINT32 i, j, k, dwPollListSize;
483 InterfaceList *pIfList;
484 BOOL bModified = FALSE, bAllDown;
485 BYTE *pbResourceFound;
486 NetObj **ppPollList;
487
488 // Create polling list
489 ppPollList = (NetObj **)malloc(sizeof(NetObj *) * m_dwChildCount);
490 LockChildList(FALSE);
491 for(i = 0, dwPollListSize = 0; i < m_dwChildCount; i++)
492 if ((m_pChildList[i]->Status() != STATUS_UNMANAGED) &&
493 (m_pChildList[i]->getObjectClass() == OBJECT_NODE))
494 {
495 m_pChildList[i]->incRefCount();
496 ((Node *)m_pChildList[i])->lockForStatusPoll();
497 ppPollList[dwPollListSize++] = m_pChildList[i];
498 }
499 UnlockChildList();
500
501 // Perform status poll on all member nodes
502 DbgPrintf(6, _T("CLUSTER STATUS POLL [%s]: Polling member nodes"), m_name);
503 for(i = 0, bAllDown = TRUE; i < dwPollListSize; i++)
504 {
505 ((Node *)ppPollList[i])->statusPoll(pSession, dwRqId, poller);
506 if (!((Node *)ppPollList[i])->isDown())
507 bAllDown = FALSE;
508 }
509
510 if (bAllDown)
511 {
512 if (!(m_dwFlags & CLF_DOWN))
513 {
514 m_dwFlags |= CLF_DOWN;
515 PostEvent(EVENT_CLUSTER_DOWN, m_id, NULL);
516 }
517 }
518 else
519 {
520 if (m_dwFlags & CLF_DOWN)
521 {
522 m_dwFlags &= ~CLF_DOWN;
523 PostEvent(EVENT_CLUSTER_UP, m_id, NULL);
524 }
525 }
526
527 // Check for cluster resource movement
528 if (!bAllDown)
529 {
530 pbResourceFound = (BYTE *)malloc(m_dwNumResources);
531 memset(pbResourceFound, 0, m_dwNumResources);
532
533 DbgPrintf(6, _T("CLUSTER STATUS POLL [%s]: Polling resources"), m_name);
534 for(i = 0; i < dwPollListSize; i++)
535 {
536 pIfList = ((Node *)ppPollList[i])->getInterfaceList();
537 if (pIfList != NULL)
538 {
539 lockProperties();
540 for(j = 0; j < (UINT32)pIfList->size(); j++)
541 {
542 for(k = 0; k < m_dwNumResources; k++)
543 {
544 if (pIfList->get(j)->hasAddress(m_pResourceList[k].ipAddr))
545 {
546 if (m_pResourceList[k].dwCurrOwner != ppPollList[i]->getId())
547 {
548 DbgPrintf(5, _T("CLUSTER STATUS POLL [%s]: Resource %s owner changed"),
549 m_name, m_pResourceList[k].szName);
550
551 // Resource moved or go up
552 if (m_pResourceList[k].dwCurrOwner == 0)
553 {
554 // Resource up
555 PostEvent(EVENT_CLUSTER_RESOURCE_UP, m_id, "dsds",
556 m_pResourceList[k].dwId, m_pResourceList[k].szName,
557 ppPollList[i]->getId(), ppPollList[i]->getName());
558 }
559 else
560 {
561 // Moved
562 NetObj *pObject;
563
564 pObject = FindObjectById(m_pResourceList[k].dwCurrOwner);
565 PostEvent(EVENT_CLUSTER_RESOURCE_MOVED, m_id, "dsdsds",
566 m_pResourceList[k].dwId, m_pResourceList[k].szName,
567 m_pResourceList[k].dwCurrOwner,
568 (pObject != NULL) ? pObject->getName() : _T("<unknown>"),
569 ppPollList[i]->getId(), ppPollList[i]->getName());
570 }
571 m_pResourceList[k].dwCurrOwner = ppPollList[i]->getId();
572 bModified = TRUE;
573 }
574 pbResourceFound[k] = 1;
575 }
576 }
577 }
578 unlockProperties();
579 delete pIfList;
580 }
581 else
582 {
583 DbgPrintf(6, _T("CLUSTER STATUS POLL [%s]: Cannot get interface list from %s"),
584 m_name, ppPollList[i]->getName());
585 }
586 }
587
588 // Check for missing virtual addresses
589 lockProperties();
590 for(i = 0; i < m_dwNumResources; i++)
591 {
592 if ((!pbResourceFound[i]) && (m_pResourceList[i].dwCurrOwner != 0))
593 {
594 NetObj *pObject;
595
596 pObject = FindObjectById(m_pResourceList[i].dwCurrOwner);
597 PostEvent(EVENT_CLUSTER_RESOURCE_DOWN, m_id, "dsds",
598 m_pResourceList[i].dwId, m_pResourceList[i].szName,
599 m_pResourceList[i].dwCurrOwner,
600 (pObject != NULL) ? pObject->getName() : _T("<unknown>"));
601 m_pResourceList[i].dwCurrOwner = 0;
602 bModified = TRUE;
603 }
604 }
605 unlockProperties();
606 safe_free(pbResourceFound);
607 }
608
609 // Cleanup
610 for(i = 0; i < dwPollListSize; i++)
611 {
612 ppPollList[i]->decRefCount();
613 }
614 safe_free(ppPollList);
615
616 lockProperties();
617 if (bModified)
618 setModified();
619 m_tmLastPoll = time(NULL);
620 m_dwFlags &= ~CLF_QUEUED_FOR_STATUS_POLL;
621 unlockProperties();
622
623 DbgPrintf(6, _T("CLUSTER STATUS POLL [%s]: Finished"), m_name);
624 }
625
626 /**
627 * Check if node is current owner of resource
628 */
629 bool Cluster::isResourceOnNode(UINT32 dwResource, UINT32 dwNode)
630 {
631 bool bRet = FALSE;
632 UINT32 i;
633
634 lockProperties();
635 for(i = 0; i < m_dwNumResources; i++)
636 {
637 if (m_pResourceList[i].dwId == dwResource)
638 {
639 if (m_pResourceList[i].dwCurrOwner == dwNode)
640 bRet = true;
641 break;
642 }
643 }
644 unlockProperties();
645 return bRet;
646 }
647
648 /**
649 * Collect aggregated data for cluster nodes - single value
650 */
651 UINT32 Cluster::collectAggregatedData(DCItem *item, TCHAR *buffer)
652 {
653 LockChildList(TRUE);
654 ItemValue **values = (ItemValue **)malloc(sizeof(ItemValue *) * m_dwChildCount);
655 int valueCount = 0;
656 for(UINT32 i = 0; i < m_dwChildCount; i++)
657 {
658 if (m_pChildList[i]->getObjectClass() != OBJECT_NODE)
659 continue;
660
661 Node *node = (Node *)m_pChildList[i];
662 DCObject *dco = node->getDCObjectByTemplateId(item->getId());
663 if ((dco != NULL) &&
664 (dco->getType() == DCO_TYPE_ITEM) &&
665 (dco->getStatus() == ITEM_STATUS_ACTIVE) &&
666 (dco->getErrorCount() == 0) &&
667 dco->matchClusterResource())
668 {
669 ItemValue *v = ((DCItem *)dco)->getInternalLastValue();
670 if (v != NULL)
671 values[valueCount++] = v;
672 }
673 }
674 UnlockChildList();
675
676 UINT32 rcc = DCE_SUCCESS;
677 if (valueCount > 0)
678 {
679 ItemValue result;
680 switch(item->getAggregationFunction())
681 {
682 case DCF_FUNCTION_AVG:
683 CalculateItemValueAverage(result, item->getDataType(), valueCount, values);
684 break;
685 case DCF_FUNCTION_MAX:
686 CalculateItemValueMax(result, item->getDataType(), valueCount, values);
687 break;
688 case DCF_FUNCTION_MIN:
689 CalculateItemValueMin(result, item->getDataType(), valueCount, values);
690 break;
691 case DCF_FUNCTION_SUM:
692 CalculateItemValueTotal(result, item->getDataType(), valueCount, values);
693 break;
694 default:
695 rcc = DCE_NOT_SUPPORTED;
696 break;
697 }
698 nx_strncpy(buffer, result.getString(), MAX_RESULT_LENGTH);
699 }
700 else
701 {
702 rcc = DCE_COMM_ERROR;
703 }
704
705 for(int i = 0; i < valueCount; i++)
706 delete values[i];
707 safe_free(values);
708
709 return rcc;
710 }
711
712 /**
713 * Collect aggregated data for cluster nodes - table
714 */
715 UINT32 Cluster::collectAggregatedData(DCTable *table, Table **result)
716 {
717 LockChildList(TRUE);
718 Table **values = (Table **)malloc(sizeof(Table *) * m_dwChildCount);
719 int valueCount = 0;
720 for(UINT32 i = 0; i < m_dwChildCount; i++)
721 {
722 if (m_pChildList[i]->getObjectClass() != OBJECT_NODE)
723 continue;
724
725 Node *node = (Node *)m_pChildList[i];
726 DCObject *dco = node->getDCObjectByTemplateId(table->getId());
727 if ((dco != NULL) &&
728 (dco->getType() == DCO_TYPE_TABLE) &&
729 (dco->getStatus() == ITEM_STATUS_ACTIVE) &&
730 (dco->getErrorCount() == 0) &&
731 dco->matchClusterResource())
732 {
733 Table *v = ((DCTable *)dco)->getLastValue();
734 if (v != NULL)
735 values[valueCount++] = v;
736 }
737 }
738 UnlockChildList();
739
740 UINT32 rcc = DCE_SUCCESS;
741 if (valueCount > 0)
742 {
743 *result = new Table(values[0]);
744 for(int i = 1; i < valueCount; i++)
745 table->mergeValues(*result, values[i], i);
746 }
747 else
748 {
749 rcc = DCE_COMM_ERROR;
750 }
751
752 for(int i = 0; i < valueCount; i++)
753 values[i]->decRefCount();
754 safe_free(values);
755
756 return rcc;
757 }
758
759 /**
760 * Unbind cluster from template
761 */
762 void Cluster::unbindFromTemplate(UINT32 dwTemplateId, bool removeDCI)
763 {
764 DataCollectionTarget::unbindFromTemplate(dwTemplateId, removeDCI);
765 queueUpdate();
766 }