object save optimization - object properties divided into groups and anly modified...
[public/netxms.git] / src / server / core / cluster.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2017 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: cluster.cpp
20 **
21 **/
22
23 #include "nxcore.h"
24
25 /**
26 * Cluster class default constructor
27 */
28 Cluster::Cluster() : DataCollectionTarget()
29 {
30 m_dwClusterType = 0;
31 m_syncNetworks = new ObjectArray<InetAddress>(8, 8, true);
32 m_dwNumResources = 0;
33 m_pResourceList = NULL;
34 m_zoneUIN = 0;
35 }
36
37 /**
38 * Cluster class new object constructor
39 */
40 Cluster::Cluster(const TCHAR *pszName, UINT32 zoneUIN) : DataCollectionTarget(pszName)
41 {
42 m_dwClusterType = 0;
43 m_syncNetworks = new ObjectArray<InetAddress>(8, 8, true);
44 m_dwNumResources = 0;
45 m_pResourceList = NULL;
46 m_zoneUIN = zoneUIN;
47 }
48
49 /**
50 * Destructor
51 */
52 Cluster::~Cluster()
53 {
54 delete m_syncNetworks;
55 safe_free(m_pResourceList);
56 }
57
58 /**
59 * Create object from database data
60 */
61 bool Cluster::loadFromDatabase(DB_HANDLE hdb, UINT32 dwId)
62 {
63 TCHAR szQuery[256];
64 bool bResult = false;
65 DB_RESULT hResult;
66 UINT32 dwNodeId;
67 NetObj *pObject;
68 int i, nRows;
69
70 m_id = dwId;
71 if (!loadCommonProperties(hdb))
72 {
73 nxlog_debug(2, _T("Cannot load common properties for cluster object %d"), dwId);
74 return false;
75 }
76
77 _sntprintf(szQuery, 256, _T("SELECT cluster_type,zone_guid FROM clusters WHERE id=%d"), (int)m_id);
78 hResult = DBSelect(hdb, szQuery);
79 if (hResult == NULL)
80 return false;
81
82 m_dwClusterType = DBGetFieldULong(hResult, 0, 0);
83 m_zoneUIN = DBGetFieldULong(hResult, 0, 1);
84 DBFreeResult(hResult);
85
86 // Load DCI and access list
87 loadACLFromDB(hdb);
88 loadItemsFromDB(hdb);
89 for(i = 0; i < m_dcObjects->size(); i++)
90 if (!m_dcObjects->get(i)->loadThresholdsFromDB(hdb))
91 return false;
92
93 if (!m_isDeleted)
94 {
95 // Load member nodes
96 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("SELECT node_id FROM cluster_members WHERE cluster_id=%d"), m_id);
97 hResult = DBSelect(hdb, szQuery);
98 if (hResult != NULL)
99 {
100 nRows = DBGetNumRows(hResult);
101 for(i = 0; i < nRows; i++)
102 {
103 dwNodeId = DBGetFieldULong(hResult, i, 0);
104 pObject = FindObjectById(dwNodeId);
105 if (pObject != NULL)
106 {
107 if (pObject->getObjectClass() == OBJECT_NODE)
108 {
109 addChild(pObject);
110 pObject->addParent(this);
111 }
112 else
113 {
114 nxlog_write(MSG_CLUSTER_MEMBER_NOT_NODE, EVENTLOG_ERROR_TYPE, "dd", m_id, dwNodeId);
115 break;
116 }
117 }
118 else
119 {
120 nxlog_write(MSG_INVALID_CLUSTER_MEMBER, EVENTLOG_ERROR_TYPE, "dd", m_id, dwNodeId);
121 break;
122 }
123 }
124 if (i == nRows)
125 bResult = true;
126 DBFreeResult(hResult);
127 }
128
129 // Load sync net list
130 if (bResult)
131 {
132 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("SELECT subnet_addr,subnet_mask FROM cluster_sync_subnets WHERE cluster_id=%d"), m_id);
133 hResult = DBSelect(hdb, szQuery);
134 if (hResult != NULL)
135 {
136 int count = DBGetNumRows(hResult);
137 for(i = 0; i < count; i++)
138 {
139 InetAddress *addr = new InetAddress(DBGetFieldInetAddr(hResult, i, 0));
140 addr->setMaskBits(DBGetFieldLong(hResult, i, 1));
141 m_syncNetworks->add(addr);
142 }
143 DBFreeResult(hResult);
144 }
145 else
146 {
147 bResult = false;
148 }
149 }
150
151 // Load resources
152 if (bResult)
153 {
154 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("SELECT resource_id,resource_name,ip_addr,current_owner FROM cluster_resources WHERE cluster_id=%d"), m_id);
155 hResult = DBSelect(hdb, szQuery);
156 if (hResult != NULL)
157 {
158 m_dwNumResources = DBGetNumRows(hResult);
159 if (m_dwNumResources > 0)
160 {
161 m_pResourceList = (CLUSTER_RESOURCE *)malloc(sizeof(CLUSTER_RESOURCE) * m_dwNumResources);
162 for(i = 0; i < (int)m_dwNumResources; i++)
163 {
164 m_pResourceList[i].dwId = DBGetFieldULong(hResult, i, 0);
165 DBGetField(hResult, i, 1, m_pResourceList[i].szName, MAX_DB_STRING);
166 m_pResourceList[i].ipAddr = DBGetFieldInetAddr(hResult, i, 2);
167 m_pResourceList[i].dwCurrOwner = DBGetFieldULong(hResult, i, 3);
168 }
169 }
170 DBFreeResult(hResult);
171 }
172 else
173 {
174 bResult = false;
175 }
176 }
177 }
178 else
179 {
180 bResult = true;
181 }
182
183 return bResult;
184 }
185
186 /**
187 * Called by client session handler to check if threshold summary should be shown for this object.
188 */
189 bool Cluster::showThresholdSummary()
190 {
191 return true;
192 }
193
194 /**
195 * Save object to database
196 */
197 bool Cluster::saveToDatabase(DB_HANDLE hdb)
198 {
199 lockProperties();
200 bool success = saveCommonProperties(hdb);
201 if (!success)
202 {
203 unlockProperties();
204 return false;
205 }
206
207 DB_STATEMENT hStmt;
208 if (IsDatabaseRecordExist(hdb, _T("clusters"), _T("id"), m_id))
209 hStmt = DBPrepare(hdb, _T("UPDATE clusters SET cluster_type=?,zone_guid=? WHERE id=?"));
210 else
211 hStmt = DBPrepare(hdb, _T("INSERT INTO clusters (cluster_type,zone_guid,id) VALUES (?,?,?)"));
212 if (hStmt != NULL)
213 {
214 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_dwClusterType);
215 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, m_zoneUIN);
216 DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, m_id);
217 success = DBExecute(hStmt);
218 DBFreeStatement(hStmt);
219 }
220 else
221 {
222 success = false;
223 }
224
225 if (success)
226 {
227 success = saveACLToDB(hdb);
228 }
229 unlockProperties();
230
231 if (success && (m_modified & MODIFY_DATA_COLLECTION))
232 {
233 lockDciAccess(false);
234 for(int i = 0; (i < m_dcObjects->size()) && success; i++)
235 success = m_dcObjects->get(i)->saveToDatabase(hdb);
236 unlockDciAccess();
237 }
238
239 if (success)
240 {
241 // Save cluster members list
242 hStmt = DBPrepare(hdb, _T("DELETE FROM cluster_members WHERE cluster_id=?"));
243 if (hStmt != NULL)
244 {
245 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
246 success = DBExecute(hStmt);
247 DBFreeStatement(hStmt);
248 }
249 else
250 {
251 success = false;
252 }
253
254 if (success)
255 {
256 hStmt = DBPrepare(hdb, _T("INSERT INTO cluster_members (cluster_id,node_id) VALUES (?,?)"));
257 if (hStmt != NULL)
258 {
259 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
260 lockChildList(false);
261 for(int i = 0; (i < m_childList->size()) && success; i++)
262 {
263 if (m_childList->get(i)->getObjectClass() != OBJECT_NODE)
264 continue;
265
266 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, m_childList->get(i)->getId());
267 success = DBExecute(hStmt);
268 }
269 unlockChildList();
270 DBFreeStatement(hStmt);
271 }
272 else
273 {
274 success = false;
275 }
276 }
277 }
278
279 if (success)
280 {
281 // Save sync net list
282 hStmt = DBPrepare(hdb, _T("DELETE FROM cluster_sync_subnets WHERE cluster_id=?"));
283 if (hStmt != NULL)
284 {
285 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
286 success = DBExecute(hStmt);
287 DBFreeStatement(hStmt);
288 }
289 else
290 {
291 success = false;
292 }
293
294 if (success)
295 {
296 hStmt = DBPrepare(hdb, _T("INSERT INTO cluster_sync_subnets (cluster_id,subnet_addr,subnet_mask) VALUES (?,?,?)"));
297 if (hStmt != NULL)
298 {
299 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
300 lockProperties();
301 for(int i = 0; (i < m_syncNetworks->size()) && success; i++)
302 {
303 const InetAddress *net = m_syncNetworks->get(i);
304 DBBind(hStmt, 2, DB_SQLTYPE_VARCHAR, net->toString(), DB_BIND_TRANSIENT);
305 DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, net->getMaskBits());
306 success = DBExecute(hStmt);
307 }
308 unlockProperties();
309 DBFreeStatement(hStmt);
310 }
311 else
312 {
313 success = false;
314 }
315 }
316 }
317
318 if (success)
319 {
320 // Save resource list
321 hStmt = DBPrepare(hdb, _T("DELETE FROM cluster_resources WHERE cluster_id=?"));
322 if (hStmt != NULL)
323 {
324 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
325 success = DBExecute(hStmt);
326 DBFreeStatement(hStmt);
327 }
328 else
329 {
330 success = false;
331 }
332
333 if (success)
334 {
335 hStmt = DBPrepare(hdb, _T("INSERT INTO cluster_resources (cluster_id,resource_id,resource_name,ip_addr,current_owner) VALUES (?,?,?,?,?)"));
336 if (hStmt != NULL)
337 {
338 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
339 lockProperties();
340 for(UINT32 i = 0; (i < m_dwNumResources) && success; i++)
341 {
342 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, m_pResourceList[i].dwId);
343 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, m_pResourceList[i].szName, DB_BIND_STATIC);
344 DBBind(hStmt, 4, DB_SQLTYPE_VARCHAR, m_pResourceList[i].ipAddr.toString(), DB_BIND_TRANSIENT);
345 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, m_pResourceList[i].dwCurrOwner);
346 success = DBExecute(hStmt);
347 }
348 unlockProperties();
349 DBFreeStatement(hStmt);
350 }
351 else
352 {
353 success = false;
354 }
355 }
356 }
357
358 // Clear modifications flag
359 lockProperties();
360 m_modified = 0;
361 unlockProperties();
362 return success;
363 }
364
365 /**
366 * Delete object from database
367 */
368 bool Cluster::deleteFromDatabase(DB_HANDLE hdb)
369 {
370 bool success = DataCollectionTarget::deleteFromDatabase(hdb);
371 if (success)
372 {
373 success = executeQueryOnObject(hdb, _T("DELETE FROM clusters WHERE id=?"));
374 if (success)
375 success = executeQueryOnObject(hdb, _T("DELETE FROM cluster_members WHERE cluster_id=?"));
376 if (success)
377 success = executeQueryOnObject(hdb, _T("DELETE FROM cluster_sync_subnets WHERE cluster_id=?"));
378 }
379 return success;
380 }
381
382 /**
383 * Create CSCP message with object's data
384 */
385 void Cluster::fillMessageInternal(NXCPMessage *pMsg)
386 {
387 UINT32 i, dwId;
388
389 DataCollectionTarget::fillMessageInternal(pMsg);
390 pMsg->setField(VID_CLUSTER_TYPE, m_dwClusterType);
391 pMsg->setField(VID_ZONE_UIN, m_zoneUIN);
392
393 pMsg->setField(VID_NUM_SYNC_SUBNETS, (UINT32)m_syncNetworks->size());
394 for(i = 0, dwId = VID_SYNC_SUBNETS_BASE; i < (UINT32)m_syncNetworks->size(); i++)
395 pMsg->setField(dwId++, *(m_syncNetworks->get(i)));
396
397 pMsg->setField(VID_NUM_RESOURCES, m_dwNumResources);
398 for(i = 0, dwId = VID_RESOURCE_LIST_BASE; i < m_dwNumResources; i++, dwId += 6)
399 {
400 pMsg->setField(dwId++, m_pResourceList[i].dwId);
401 pMsg->setField(dwId++, m_pResourceList[i].szName);
402 pMsg->setField(dwId++, m_pResourceList[i].ipAddr);
403 pMsg->setField(dwId++, m_pResourceList[i].dwCurrOwner);
404 }
405 }
406
407 /**
408 * Modify object from message
409 */
410 UINT32 Cluster::modifyFromMessageInternal(NXCPMessage *pRequest)
411 {
412 if (pRequest->isFieldExist(VID_FLAGS))
413 m_flags = pRequest->getFieldAsUInt32(VID_FLAGS);
414
415 // Change cluster type
416 if (pRequest->isFieldExist(VID_CLUSTER_TYPE))
417 m_dwClusterType = pRequest->getFieldAsUInt32(VID_CLUSTER_TYPE);
418
419 // Change sync subnets
420 if (pRequest->isFieldExist(VID_NUM_SYNC_SUBNETS))
421 {
422 m_syncNetworks->clear();
423 int count = pRequest->getFieldAsInt32(VID_NUM_SYNC_SUBNETS);
424 UINT32 fieldId = VID_SYNC_SUBNETS_BASE;
425 for(int i = 0; i < count; i++)
426 {
427 m_syncNetworks->add(new InetAddress(pRequest->getFieldAsInetAddress(fieldId++)));
428 }
429 }
430
431 // Change resource list
432 if (pRequest->isFieldExist(VID_NUM_RESOURCES))
433 {
434 UINT32 i, j, dwId, dwCount;
435 CLUSTER_RESOURCE *pList;
436
437 dwCount = pRequest->getFieldAsUInt32(VID_NUM_RESOURCES);
438 if (dwCount > 0)
439 {
440 pList = (CLUSTER_RESOURCE *)malloc(sizeof(CLUSTER_RESOURCE) * dwCount);
441 memset(pList, 0, sizeof(CLUSTER_RESOURCE) * dwCount);
442 for(i = 0, dwId = VID_RESOURCE_LIST_BASE; i < dwCount; i++, dwId += 7)
443 {
444 pList[i].dwId = pRequest->getFieldAsUInt32(dwId++);
445 pRequest->getFieldAsString(dwId++, pList[i].szName, MAX_DB_STRING);
446 pList[i].ipAddr = pRequest->getFieldAsInetAddress(dwId++);
447 }
448
449 // Update current owner information in existing resources
450 for(i = 0; i < m_dwNumResources; i++)
451 {
452 for(j = 0; j < dwCount; j++)
453 {
454 if (m_pResourceList[i].dwId == pList[j].dwId)
455 {
456 pList[j].dwCurrOwner = m_pResourceList[i].dwCurrOwner;
457 break;
458 }
459 }
460 }
461
462 // Replace list
463 safe_free(m_pResourceList);
464 m_pResourceList = pList;
465 }
466 else
467 {
468 safe_free_and_null(m_pResourceList);
469 }
470 m_dwNumResources = dwCount;
471 }
472
473 return DataCollectionTarget::modifyFromMessageInternal(pRequest);
474 }
475
476 /**
477 * Check if given address is within sync network
478 */
479 bool Cluster::isSyncAddr(const InetAddress& addr)
480 {
481 bool bRet = false;
482
483 lockProperties();
484 for(int i = 0; i < m_syncNetworks->size(); i++)
485 {
486 if (m_syncNetworks->get(i)->contain(addr))
487 {
488 bRet = true;
489 break;
490 }
491 }
492 unlockProperties();
493 return bRet;
494 }
495
496 /**
497 * Check if given address is a resource address
498 */
499 bool Cluster::isVirtualAddr(const InetAddress& addr)
500 {
501 UINT32 i;
502 bool bRet = false;
503
504 lockProperties();
505 for(i = 0; i < m_dwNumResources; i++)
506 {
507 if (m_pResourceList[i].ipAddr.equals(addr))
508 {
509 bRet = true;
510 break;
511 }
512 }
513 unlockProperties();
514 return bRet;
515 }
516
517 /**
518 * Configuration poll
519 */
520 void Cluster::configurationPoll(PollerInfo *poller, ClientSession *pSession, UINT32 dwRqId)
521 {
522 if (m_runtimeFlags & DCDF_DELETE_IN_PROGRESS)
523 {
524 if (dwRqId == 0)
525 m_runtimeFlags &= ~DCDF_QUEUED_FOR_STATUS_POLL;
526 return;
527 }
528
529 poller->setStatus(_T("wait for lock"));
530 pollerLock();
531
532 if (IsShutdownInProgress())
533 {
534 pollerUnlock();
535 return;
536 }
537
538 m_pollRequestor = pSession;
539 sendPollerMsg(dwRqId, _T("CLUSTER STATUS POLL [%s]: Applying templates\r\n"), m_name);
540 DbgPrintf(6, _T("CLUSTER STATUS POLL [%s]: Applying templates"), m_name);
541 if (ConfigReadInt(_T("ClusterTemplateAutoApply"), 0))
542 applyUserTemplates();
543
544 sendPollerMsg(dwRqId, _T("CLUSTER STATUS POLL [%s]: Updating container bindings\r\n"), m_name);
545 DbgPrintf(6, _T("CLUSTER STATUS POLL [%s]: Updating container bindings"), m_name);
546 if (ConfigReadInt(_T("ClusterContainerAutoBind"), 0))
547 updateContainerMembership();
548
549 lockProperties();
550 m_lastConfigurationPoll = time(NULL);
551 m_runtimeFlags &= ~DCDF_QUEUED_FOR_CONFIGURATION_POLL;
552 unlockProperties();
553
554 poller->setStatus(_T("hook"));
555 executeHookScript(_T("ConfigurationPoll"));
556
557 sendPollerMsg(dwRqId, _T("CLUSTER CONFIGURATION POLL [%s]: Finished\r\n"), m_name);
558 DbgPrintf(6, _T("CLUSTER CONFIGURATION POLL [%s]: Finished"), m_name);
559
560 pollerUnlock();
561 m_runtimeFlags |= DCDF_CONFIGURATION_POLL_PASSED;
562 }
563
564 /**
565 * Status poll
566 */
567 void Cluster::statusPoll(PollerInfo *poller, ClientSession *pSession, UINT32 dwRqId)
568 {
569 if (m_runtimeFlags & DCDF_DELETE_IN_PROGRESS)
570 {
571 if (dwRqId == 0)
572 m_runtimeFlags &= ~DCDF_QUEUED_FOR_STATUS_POLL;
573 return;
574 }
575
576 poller->setStatus(_T("wait for lock"));
577 pollerLock();
578
579 if (IsShutdownInProgress())
580 {
581 pollerUnlock();
582 return;
583 }
584
585 UINT32 modified = 0;
586
587 // Create polling list
588 ObjectArray<DataCollectionTarget> pollList(m_childList->size(), 16, false);
589 lockChildList(false);
590 int i;
591 for(i = 0; i < m_childList->size(); i++)
592 {
593 NetObj *object = m_childList->get(i);
594 if ((object->getStatus() != STATUS_UNMANAGED) && object->isDataCollectionTarget())
595 {
596 object->incRefCount();
597 static_cast<DataCollectionTarget*>(object)->lockForStatusPoll();
598 pollList.add(static_cast<DataCollectionTarget*>(object));
599 }
600 }
601 unlockChildList();
602
603 // Perform status poll on all member nodes
604 m_pollRequestor = pSession;
605 sendPollerMsg(dwRqId, _T("CLUSTER STATUS POLL [%s]: Polling member nodes\r\n"), m_name);
606 DbgPrintf(6, _T("CLUSTER STATUS POLL [%s]: Polling member nodes"), m_name);
607 bool allDown = true;
608 for(i = 0; i < pollList.size(); i++)
609 {
610 DataCollectionTarget *object = pollList.get(i);
611 object->statusPollPollerEntry(poller, pSession, dwRqId);
612 if ((object->getObjectClass() == OBJECT_NODE) && !static_cast<Node*>(object)->isDown())
613 allDown = false;
614 }
615
616 if (allDown)
617 {
618 if (!(m_state & CLSF_DOWN))
619 {
620 m_state |= CLSF_DOWN;
621 PostEvent(EVENT_CLUSTER_DOWN, m_id, NULL);
622 }
623 }
624 else
625 {
626 if (m_state & CLSF_DOWN)
627 {
628 m_state &= ~CLSF_DOWN;
629 PostEvent(EVENT_CLUSTER_UP, m_id, NULL);
630 }
631 }
632
633 // Check for cluster resource movement
634 if (!allDown)
635 {
636 BYTE *resourceFound = (BYTE *)calloc(m_dwNumResources, 1);
637
638 poller->setStatus(_T("resource poll"));
639 sendPollerMsg(dwRqId, _T("CLUSTER STATUS POLL [%s]: Polling resources\r\n"), m_name);
640 DbgPrintf(6, _T("CLUSTER STATUS POLL [%s]: Polling resources"), m_name);
641 for(i = 0; i < pollList.size(); i++)
642 {
643 if (pollList.get(i)->getObjectClass() != OBJECT_NODE)
644 continue;
645
646 Node *node = static_cast<Node*>(pollList.get(i));
647 InterfaceList *pIfList = node->getInterfaceList();
648 if (pIfList != NULL)
649 {
650 lockProperties();
651 for(int j = 0; j < pIfList->size(); j++)
652 {
653 for(UINT32 k = 0; k < m_dwNumResources; k++)
654 {
655 if (pIfList->get(j)->hasAddress(m_pResourceList[k].ipAddr))
656 {
657 if (m_pResourceList[k].dwCurrOwner != node->getId())
658 {
659 sendPollerMsg(dwRqId, _T("CLUSTER STATUS POLL [%s]: Resource %s owner changed\r\n"), m_name, m_pResourceList[k].szName);
660 DbgPrintf(5, _T("CLUSTER STATUS POLL [%s]: Resource %s owner changed"),
661 m_name, m_pResourceList[k].szName);
662
663 // Resource moved or go up
664 if (m_pResourceList[k].dwCurrOwner == 0)
665 {
666 // Resource up
667 PostEvent(EVENT_CLUSTER_RESOURCE_UP, m_id, "dsds",
668 m_pResourceList[k].dwId, m_pResourceList[k].szName,
669 node->getId(), node->getName());
670 }
671 else
672 {
673 // Moved
674 NetObj *pObject = FindObjectById(m_pResourceList[k].dwCurrOwner);
675 PostEvent(EVENT_CLUSTER_RESOURCE_MOVED, m_id, "dsdsds",
676 m_pResourceList[k].dwId, m_pResourceList[k].szName,
677 m_pResourceList[k].dwCurrOwner,
678 (pObject != NULL) ? pObject->getName() : _T("<unknown>"),
679 node->getId(), node->getName());
680 }
681 m_pResourceList[k].dwCurrOwner = node->getId();
682 modified |= MODIFIED_CLUSTER_RESOURCES;
683 }
684 resourceFound[k] = 1;
685 }
686 }
687 }
688 unlockProperties();
689 delete pIfList;
690 }
691 else
692 {
693 sendPollerMsg(dwRqId, _T("CLUSTER STATUS POLL [%s]: Cannot get interface list from %s\r\n"), m_name, node->getName());
694 DbgPrintf(6, _T("CLUSTER STATUS POLL [%s]: Cannot get interface list from %s"),
695 m_name, node->getName());
696 }
697 }
698
699 // Check for missing virtual addresses
700 lockProperties();
701 for(i = 0; i < (int)m_dwNumResources; i++)
702 {
703 if ((!resourceFound[i]) && (m_pResourceList[i].dwCurrOwner != 0))
704 {
705 NetObj *pObject = FindObjectById(m_pResourceList[i].dwCurrOwner);
706 PostEvent(EVENT_CLUSTER_RESOURCE_DOWN, m_id, "dsds",
707 m_pResourceList[i].dwId, m_pResourceList[i].szName,
708 m_pResourceList[i].dwCurrOwner,
709 (pObject != NULL) ? pObject->getName() : _T("<unknown>"));
710 m_pResourceList[i].dwCurrOwner = 0;
711 modified |= MODIFIED_CLUSTER_RESOURCES;
712 }
713 }
714 unlockProperties();
715 free(resourceFound);
716 }
717
718 // Execute hook script
719 poller->setStatus(_T("hook"));
720 executeHookScript(_T("StatusPoll"));
721
722 calculateCompoundStatus(true);
723 poller->setStatus(_T("cleanup"));
724
725 // Cleanup
726 for(i = 0; i < pollList.size(); i++)
727 {
728 pollList.get(i)->decRefCount();
729 }
730
731 lockProperties();
732 if (modified != 0)
733 setModified(modified);
734 m_lastStatusPoll = time(NULL);
735 m_runtimeFlags &= ~DCDF_QUEUED_FOR_STATUS_POLL;
736 unlockProperties();
737
738 sendPollerMsg(dwRqId, _T("CLUSTER STATUS POLL [%s]: Finished\r\n"), m_name);
739 DbgPrintf(6, _T("CLUSTER STATUS POLL [%s]: Finished"), m_name);
740
741 pollerUnlock();
742 }
743
744 /**
745 * Check if node is current owner of resource
746 */
747 bool Cluster::isResourceOnNode(UINT32 dwResource, UINT32 dwNode)
748 {
749 bool bRet = FALSE;
750
751 lockProperties();
752 for(UINT32 i = 0; i < m_dwNumResources; i++)
753 {
754 if (m_pResourceList[i].dwId == dwResource)
755 {
756 if (m_pResourceList[i].dwCurrOwner == dwNode)
757 bRet = true;
758 break;
759 }
760 }
761 unlockProperties();
762 return bRet;
763 }
764
765 /**
766 * Get node ID of resource owner
767 */
768 UINT32 Cluster::getResourceOwnerInternal(UINT32 id, const TCHAR *name)
769 {
770 UINT32 ownerId = 0;
771 lockProperties();
772 for(UINT32 i = 0; i < m_dwNumResources; i++)
773 {
774 if (((name != NULL) && !_tcsicmp(name, m_pResourceList[i].szName)) ||
775 (m_pResourceList[i].dwId == id))
776 {
777 ownerId = m_pResourceList[i].dwCurrOwner;
778 break;
779 }
780 }
781 unlockProperties();
782 return ownerId;
783 }
784
785 /**
786 * Collect aggregated data for cluster nodes - single value
787 */
788 UINT32 Cluster::collectAggregatedData(DCItem *item, TCHAR *buffer)
789 {
790 lockChildList(false);
791 ItemValue **values = (ItemValue **)malloc(sizeof(ItemValue *) * m_childList->size());
792 int valueCount = 0;
793 for(int i = 0; i < m_childList->size(); i++)
794 {
795 if (m_childList->get(i)->getObjectClass() != OBJECT_NODE)
796 continue;
797
798 Node *node = (Node *)m_childList->get(i);
799 DCObject *dco = node->getDCObjectByTemplateId(item->getId());
800 if ((dco != NULL) &&
801 (dco->getType() == DCO_TYPE_ITEM) &&
802 (dco->getStatus() == ITEM_STATUS_ACTIVE) &&
803 ((dco->getErrorCount() == 0) || dco->isAggregateWithErrors()) &&
804 dco->matchClusterResource())
805 {
806 ItemValue *v = ((DCItem *)dco)->getInternalLastValue();
807 if (v != NULL)
808 values[valueCount++] = v;
809 }
810 }
811 unlockChildList();
812
813 UINT32 rcc = DCE_SUCCESS;
814 if (valueCount > 0)
815 {
816 ItemValue result;
817 switch(item->getAggregationFunction())
818 {
819 case DCF_FUNCTION_AVG:
820 CalculateItemValueAverage(result, item->getDataType(), valueCount, values);
821 break;
822 case DCF_FUNCTION_MAX:
823 CalculateItemValueMax(result, item->getDataType(), valueCount, values);
824 break;
825 case DCF_FUNCTION_MIN:
826 CalculateItemValueMin(result, item->getDataType(), valueCount, values);
827 break;
828 case DCF_FUNCTION_SUM:
829 CalculateItemValueTotal(result, item->getDataType(), valueCount, values);
830 break;
831 default:
832 rcc = DCE_NOT_SUPPORTED;
833 break;
834 }
835 nx_strncpy(buffer, result.getString(), MAX_RESULT_LENGTH);
836 }
837 else
838 {
839 rcc = DCE_COLLECTION_ERROR;
840 }
841
842 for(int i = 0; i < valueCount; i++)
843 delete values[i];
844 safe_free(values);
845
846 return rcc;
847 }
848
849 /**
850 * Collect aggregated data for cluster nodes - table
851 */
852 UINT32 Cluster::collectAggregatedData(DCTable *table, Table **result)
853 {
854 lockChildList(false);
855 Table **values = (Table **)malloc(sizeof(Table *) * m_childList->size());
856 int valueCount = 0;
857 for(int i = 0; i < m_childList->size(); i++)
858 {
859 if (m_childList->get(i)->getObjectClass() != OBJECT_NODE)
860 continue;
861
862 Node *node = (Node *)m_childList->get(i);
863 DCObject *dco = node->getDCObjectByTemplateId(table->getId());
864 if ((dco != NULL) &&
865 (dco->getType() == DCO_TYPE_TABLE) &&
866 (dco->getStatus() == ITEM_STATUS_ACTIVE) &&
867 ((dco->getErrorCount() == 0) || dco->isAggregateWithErrors()) &&
868 dco->matchClusterResource())
869 {
870 Table *v = ((DCTable *)dco)->getLastValue();
871 if (v != NULL)
872 values[valueCount++] = v;
873 }
874 }
875 unlockChildList();
876
877 UINT32 rcc = DCE_SUCCESS;
878 if (valueCount > 0)
879 {
880 *result = new Table(values[0]);
881 for(int i = 1; i < valueCount; i++)
882 table->mergeValues(*result, values[i], i);
883 }
884 else
885 {
886 rcc = DCE_COLLECTION_ERROR;
887 }
888
889 for(int i = 0; i < valueCount; i++)
890 values[i]->decRefCount();
891 safe_free(values);
892
893 return rcc;
894 }
895
896 /**
897 * Unbind cluster from template
898 */
899 void Cluster::unbindFromTemplate(UINT32 dwTemplateId, bool removeDCI)
900 {
901 DataCollectionTarget::unbindFromTemplate(dwTemplateId, removeDCI);
902 queueUpdate();
903 }
904
905 /**
906 * Called when data collection configuration changed
907 */
908 void Cluster::onDataCollectionChange()
909 {
910 queueUpdate();
911 }
912
913 /**
914 * Create NXSL object for this object
915 */
916 NXSL_Value *Cluster::createNXSLObject()
917 {
918 return new NXSL_Value(new NXSL_Object(&g_nxslClusterClass, this));
919 }
920
921 /**
922 * Get cluster nodes as NXSL array
923 */
924 NXSL_Array *Cluster::getNodesForNXSL()
925 {
926 NXSL_Array *nodes = new NXSL_Array();
927 int index = 0;
928
929 lockChildList(false);
930 for(int i = 0; i < m_childList->size(); i++)
931 {
932 if (m_childList->get(i)->getObjectClass() == OBJECT_NODE)
933 {
934 nodes->set(index++, new NXSL_Value(new NXSL_Object(&g_nxslNodeClass, m_childList->get(i))));
935 }
936 }
937 unlockChildList();
938
939 return nodes;
940 }
941
942 /**
943 * Serialize object to JSON
944 */
945 json_t *Cluster::toJson()
946 {
947 json_t *root = DataCollectionTarget::toJson();
948 json_object_set_new(root, "clusterType", json_integer(m_dwClusterType));
949 json_object_set_new(root, "syncNetworks", json_object_array(m_syncNetworks));
950 json_object_set_new(root, "lastStatusPoll", json_integer(m_lastStatusPoll));
951 json_object_set_new(root, "lastConfigurationPoll", json_integer(m_lastConfigurationPoll));
952 json_object_set_new(root, "zoneUIN", json_integer(m_zoneUIN));
953
954 json_t *resources = json_array();
955 for(UINT32 i = 0; i < m_dwNumResources; i++)
956 {
957 json_t *r = json_object();
958 json_object_set_new(r, "id", json_integer(m_pResourceList[i].dwId));
959 json_object_set_new(r, "name", json_string_t(m_pResourceList[i].szName));
960 json_object_set_new(r, "address", m_pResourceList[i].ipAddr.toJson());
961 json_object_set_new(r, "currentOwner", json_integer(m_pResourceList[i].dwCurrOwner));
962 json_array_append_new(resources, r);
963 }
964 json_object_set_new(root, "resources", resources);
965 return root;
966 }