337d51331ccfdd33709abc238239ac3f18352045
[public/netxms.git] / src / server / core / cluster.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2017 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: cluster.cpp
20 **
21 **/
22
23 #include "nxcore.h"
24
25 /**
26 * Cluster class default constructor
27 */
28 Cluster::Cluster() : DataCollectionTarget()
29 {
30 m_dwClusterType = 0;
31 m_syncNetworks = new ObjectArray<InetAddress>(8, 8, true);
32 m_dwNumResources = 0;
33 m_pResourceList = NULL;
34 m_zoneUIN = 0;
35 }
36
37 /**
38 * Cluster class new object constructor
39 */
40 Cluster::Cluster(const TCHAR *pszName, UINT32 zoneUIN) : DataCollectionTarget(pszName)
41 {
42 m_dwClusterType = 0;
43 m_syncNetworks = new ObjectArray<InetAddress>(8, 8, true);
44 m_dwNumResources = 0;
45 m_pResourceList = NULL;
46 m_zoneUIN = zoneUIN;
47 }
48
49 /**
50 * Destructor
51 */
52 Cluster::~Cluster()
53 {
54 delete m_syncNetworks;
55 safe_free(m_pResourceList);
56 }
57
58 /**
59 * Create object from database data
60 */
61 bool Cluster::loadFromDatabase(DB_HANDLE hdb, UINT32 dwId)
62 {
63 TCHAR szQuery[256];
64 bool bResult = false;
65 DB_RESULT hResult;
66 UINT32 dwNodeId;
67 NetObj *pObject;
68 int i, nRows;
69
70 m_id = dwId;
71 if (!loadCommonProperties(hdb))
72 {
73 nxlog_debug(2, _T("Cannot load common properties for cluster object %d"), dwId);
74 return false;
75 }
76
77 _sntprintf(szQuery, 256, _T("SELECT cluster_type,zone_guid FROM clusters WHERE id=%d"), (int)m_id);
78 hResult = DBSelect(hdb, szQuery);
79 if (hResult == NULL)
80 return false;
81
82 m_dwClusterType = DBGetFieldULong(hResult, 0, 0);
83 m_zoneUIN = DBGetFieldULong(hResult, 0, 1);
84 DBFreeResult(hResult);
85
86 // Load DCI and access list
87 loadACLFromDB(hdb);
88 loadItemsFromDB(hdb);
89 for(i = 0; i < m_dcObjects->size(); i++)
90 if (!m_dcObjects->get(i)->loadThresholdsFromDB(hdb))
91 return false;
92
93 if (!m_isDeleted)
94 {
95 // Load member nodes
96 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("SELECT node_id FROM cluster_members WHERE cluster_id=%d"), m_id);
97 hResult = DBSelect(hdb, szQuery);
98 if (hResult != NULL)
99 {
100 nRows = DBGetNumRows(hResult);
101 for(i = 0; i < nRows; i++)
102 {
103 dwNodeId = DBGetFieldULong(hResult, i, 0);
104 pObject = FindObjectById(dwNodeId);
105 if (pObject != NULL)
106 {
107 if (pObject->getObjectClass() == OBJECT_NODE)
108 {
109 addChild(pObject);
110 pObject->addParent(this);
111 }
112 else
113 {
114 nxlog_write(MSG_CLUSTER_MEMBER_NOT_NODE, EVENTLOG_ERROR_TYPE, "dd", m_id, dwNodeId);
115 break;
116 }
117 }
118 else
119 {
120 nxlog_write(MSG_INVALID_CLUSTER_MEMBER, EVENTLOG_ERROR_TYPE, "dd", m_id, dwNodeId);
121 break;
122 }
123 }
124 if (i == nRows)
125 bResult = true;
126 DBFreeResult(hResult);
127 }
128
129 // Load sync net list
130 if (bResult)
131 {
132 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("SELECT subnet_addr,subnet_mask FROM cluster_sync_subnets WHERE cluster_id=%d"), m_id);
133 hResult = DBSelect(hdb, szQuery);
134 if (hResult != NULL)
135 {
136 int count = DBGetNumRows(hResult);
137 for(i = 0; i < count; i++)
138 {
139 InetAddress *addr = new InetAddress(DBGetFieldInetAddr(hResult, i, 0));
140 addr->setMaskBits(DBGetFieldLong(hResult, i, 1));
141 m_syncNetworks->add(addr);
142 }
143 DBFreeResult(hResult);
144 }
145 else
146 {
147 bResult = false;
148 }
149 }
150
151 // Load resources
152 if (bResult)
153 {
154 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("SELECT resource_id,resource_name,ip_addr,current_owner FROM cluster_resources WHERE cluster_id=%d"), m_id);
155 hResult = DBSelect(hdb, szQuery);
156 if (hResult != NULL)
157 {
158 m_dwNumResources = DBGetNumRows(hResult);
159 if (m_dwNumResources > 0)
160 {
161 m_pResourceList = (CLUSTER_RESOURCE *)malloc(sizeof(CLUSTER_RESOURCE) * m_dwNumResources);
162 for(i = 0; i < (int)m_dwNumResources; i++)
163 {
164 m_pResourceList[i].dwId = DBGetFieldULong(hResult, i, 0);
165 DBGetField(hResult, i, 1, m_pResourceList[i].szName, MAX_DB_STRING);
166 m_pResourceList[i].ipAddr = DBGetFieldInetAddr(hResult, i, 2);
167 m_pResourceList[i].dwCurrOwner = DBGetFieldULong(hResult, i, 3);
168 }
169 }
170 DBFreeResult(hResult);
171 }
172 else
173 {
174 bResult = false;
175 }
176 }
177 }
178 else
179 {
180 bResult = true;
181 }
182
183 return bResult;
184 }
185
186 /**
187 * Called by client session handler to check if threshold summary should be shown for this object.
188 */
189 bool Cluster::showThresholdSummary()
190 {
191 return true;
192 }
193
194 /**
195 * Save object to database
196 */
197 bool Cluster::saveToDatabase(DB_HANDLE hdb)
198 {
199 lockProperties();
200 bool success = saveCommonProperties(hdb);
201 if (!success)
202 {
203 unlockProperties();
204 return false;
205 }
206
207 DB_STATEMENT hStmt;
208 if (IsDatabaseRecordExist(hdb, _T("clusters"), _T("id"), m_id))
209 hStmt = DBPrepare(hdb, _T("UPDATE clusters SET cluster_type=?,zone_guid=? WHERE id=?"));
210 else
211 hStmt = DBPrepare(hdb, _T("INSERT INTO clusters (cluster_type,zone_guid,id) VALUES (?,?,?)"));
212 if (hStmt != NULL)
213 {
214 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_dwClusterType);
215 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, m_zoneUIN);
216 DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, m_id);
217 success = DBExecute(hStmt);
218 DBFreeStatement(hStmt);
219 }
220 else
221 {
222 success = false;
223 }
224
225 if (success)
226 {
227 success = saveACLToDB(hdb);
228 }
229 unlockProperties();
230
231 if (success)
232 {
233 lockDciAccess(false);
234 for(int i = 0; (i < m_dcObjects->size()) && success; i++)
235 success = m_dcObjects->get(i)->saveToDatabase(hdb);
236 unlockDciAccess();
237 }
238
239 if (success)
240 {
241 // Save cluster members list
242 hStmt = DBPrepare(hdb, _T("DELETE FROM cluster_members WHERE cluster_id=?"));
243 if (hStmt != NULL)
244 {
245 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
246 success = DBExecute(hStmt);
247 DBFreeStatement(hStmt);
248 }
249 else
250 {
251 success = false;
252 }
253
254 if (success)
255 {
256 hStmt = DBPrepare(hdb, _T("INSERT INTO cluster_members (cluster_id,node_id) VALUES (?,?)"));
257 if (hStmt != NULL)
258 {
259 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
260 lockChildList(false);
261 for(int i = 0; (i < m_childList->size()) && success; i++)
262 {
263 if (m_childList->get(i)->getObjectClass() != OBJECT_NODE)
264 continue;
265
266 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, m_childList->get(i)->getId());
267 success = DBExecute(hStmt);
268 }
269 unlockChildList();
270 DBFreeStatement(hStmt);
271 }
272 else
273 {
274 success = false;
275 }
276 }
277 }
278
279 if (success)
280 {
281 // Save sync net list
282 hStmt = DBPrepare(hdb, _T("DELETE FROM cluster_sync_subnets WHERE cluster_id=?"));
283 if (hStmt != NULL)
284 {
285 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
286 success = DBExecute(hStmt);
287 DBFreeStatement(hStmt);
288 }
289 else
290 {
291 success = false;
292 }
293
294 if (success)
295 {
296 hStmt = DBPrepare(hdb, _T("INSERT INTO cluster_sync_subnets (cluster_id,subnet_addr,subnet_mask) VALUES (?,?,?)"));
297 if (hStmt != NULL)
298 {
299 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
300 lockProperties();
301 for(int i = 0; (i < m_syncNetworks->size()) && success; i++)
302 {
303 const InetAddress *net = m_syncNetworks->get(i);
304 DBBind(hStmt, 2, DB_SQLTYPE_VARCHAR, net->toString(), DB_BIND_TRANSIENT);
305 DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, net->getMaskBits());
306 success = DBExecute(hStmt);
307 }
308 unlockProperties();
309 DBFreeStatement(hStmt);
310 }
311 else
312 {
313 success = false;
314 }
315 }
316 }
317
318 if (success)
319 {
320 // Save resource list
321 hStmt = DBPrepare(hdb, _T("DELETE FROM cluster_resources WHERE cluster_id=?"));
322 if (hStmt != NULL)
323 {
324 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
325 success = DBExecute(hStmt);
326 DBFreeStatement(hStmt);
327 }
328 else
329 {
330 success = false;
331 }
332
333 if (success)
334 {
335 hStmt = DBPrepare(hdb, _T("INSERT INTO cluster_resources (cluster_id,resource_id,resource_name,ip_addr,current_owner) VALUES (?,?,?,?,?)"));
336 if (hStmt != NULL)
337 {
338 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
339 lockProperties();
340 for(UINT32 i = 0; (i < m_dwNumResources) && success; i++)
341 {
342 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, m_pResourceList[i].dwId);
343 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, m_pResourceList[i].szName, DB_BIND_STATIC);
344 DBBind(hStmt, 4, DB_SQLTYPE_VARCHAR, m_pResourceList[i].ipAddr.toString(), DB_BIND_TRANSIENT);
345 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, m_pResourceList[i].dwCurrOwner);
346 success = DBExecute(hStmt);
347 }
348 unlockProperties();
349 DBFreeStatement(hStmt);
350 }
351 else
352 {
353 success = false;
354 }
355 }
356 }
357
358 // Clear modifications flag
359 lockProperties();
360 m_isModified = false;
361 unlockProperties();
362 return success;
363 }
364
365 /**
366 * Delete object from database
367 */
368 bool Cluster::deleteFromDatabase(DB_HANDLE hdb)
369 {
370 bool success = DataCollectionTarget::deleteFromDatabase(hdb);
371 if (success)
372 {
373 success = executeQueryOnObject(hdb, _T("DELETE FROM clusters WHERE id=?"));
374 if (success)
375 success = executeQueryOnObject(hdb, _T("DELETE FROM cluster_members WHERE cluster_id=?"));
376 if (success)
377 success = executeQueryOnObject(hdb, _T("DELETE FROM cluster_sync_subnets WHERE cluster_id=?"));
378 }
379 return success;
380 }
381
382 /**
383 * Create CSCP message with object's data
384 */
385 void Cluster::fillMessageInternal(NXCPMessage *pMsg)
386 {
387 UINT32 i, dwId;
388
389 DataCollectionTarget::fillMessageInternal(pMsg);
390 pMsg->setField(VID_CLUSTER_TYPE, m_dwClusterType);
391 pMsg->setField(VID_ZONE_UIN, m_zoneUIN);
392
393 pMsg->setField(VID_NUM_SYNC_SUBNETS, (UINT32)m_syncNetworks->size());
394 for(i = 0, dwId = VID_SYNC_SUBNETS_BASE; i < (UINT32)m_syncNetworks->size(); i++)
395 pMsg->setField(dwId++, *(m_syncNetworks->get(i)));
396
397 pMsg->setField(VID_NUM_RESOURCES, m_dwNumResources);
398 for(i = 0, dwId = VID_RESOURCE_LIST_BASE; i < m_dwNumResources; i++, dwId += 6)
399 {
400 pMsg->setField(dwId++, m_pResourceList[i].dwId);
401 pMsg->setField(dwId++, m_pResourceList[i].szName);
402 pMsg->setField(dwId++, m_pResourceList[i].ipAddr);
403 pMsg->setField(dwId++, m_pResourceList[i].dwCurrOwner);
404 }
405 }
406
407 /**
408 * Modify object from message
409 */
410 UINT32 Cluster::modifyFromMessageInternal(NXCPMessage *pRequest)
411 {
412 if (pRequest->isFieldExist(VID_FLAGS))
413 m_flags = pRequest->getFieldAsUInt32(VID_FLAGS);
414
415 // Change cluster type
416 if (pRequest->isFieldExist(VID_CLUSTER_TYPE))
417 m_dwClusterType = pRequest->getFieldAsUInt32(VID_CLUSTER_TYPE);
418
419 // Change sync subnets
420 if (pRequest->isFieldExist(VID_NUM_SYNC_SUBNETS))
421 {
422 m_syncNetworks->clear();
423 int count = pRequest->getFieldAsInt32(VID_NUM_SYNC_SUBNETS);
424 UINT32 fieldId = VID_SYNC_SUBNETS_BASE;
425 for(int i = 0; i < count; i++)
426 {
427 m_syncNetworks->add(new InetAddress(pRequest->getFieldAsInetAddress(fieldId++)));
428 }
429 }
430
431 // Change resource list
432 if (pRequest->isFieldExist(VID_NUM_RESOURCES))
433 {
434 UINT32 i, j, dwId, dwCount;
435 CLUSTER_RESOURCE *pList;
436
437 dwCount = pRequest->getFieldAsUInt32(VID_NUM_RESOURCES);
438 if (dwCount > 0)
439 {
440 pList = (CLUSTER_RESOURCE *)malloc(sizeof(CLUSTER_RESOURCE) * dwCount);
441 memset(pList, 0, sizeof(CLUSTER_RESOURCE) * dwCount);
442 for(i = 0, dwId = VID_RESOURCE_LIST_BASE; i < dwCount; i++, dwId += 7)
443 {
444 pList[i].dwId = pRequest->getFieldAsUInt32(dwId++);
445 pRequest->getFieldAsString(dwId++, pList[i].szName, MAX_DB_STRING);
446 pList[i].ipAddr = pRequest->getFieldAsInetAddress(dwId++);
447 }
448
449 // Update current owner information in existing resources
450 for(i = 0; i < m_dwNumResources; i++)
451 {
452 for(j = 0; j < dwCount; j++)
453 {
454 if (m_pResourceList[i].dwId == pList[j].dwId)
455 {
456 pList[j].dwCurrOwner = m_pResourceList[i].dwCurrOwner;
457 break;
458 }
459 }
460 }
461
462 // Replace list
463 safe_free(m_pResourceList);
464 m_pResourceList = pList;
465 }
466 else
467 {
468 safe_free_and_null(m_pResourceList);
469 }
470 m_dwNumResources = dwCount;
471 }
472
473 return DataCollectionTarget::modifyFromMessageInternal(pRequest);
474 }
475
476 /**
477 * Check if given address is within sync network
478 */
479 bool Cluster::isSyncAddr(const InetAddress& addr)
480 {
481 bool bRet = false;
482
483 lockProperties();
484 for(int i = 0; i < m_syncNetworks->size(); i++)
485 {
486 if (m_syncNetworks->get(i)->contain(addr))
487 {
488 bRet = true;
489 break;
490 }
491 }
492 unlockProperties();
493 return bRet;
494 }
495
496 /**
497 * Check if given address is a resource address
498 */
499 bool Cluster::isVirtualAddr(const InetAddress& addr)
500 {
501 UINT32 i;
502 bool bRet = false;
503
504 lockProperties();
505 for(i = 0; i < m_dwNumResources; i++)
506 {
507 if (m_pResourceList[i].ipAddr.equals(addr))
508 {
509 bRet = true;
510 break;
511 }
512 }
513 unlockProperties();
514 return bRet;
515 }
516
517 /**
518 * Configuration poll
519 */
520 void Cluster::configurationPoll(PollerInfo *poller, ClientSession *pSession, UINT32 dwRqId)
521 {
522 if (m_runtimeFlags & DCDF_DELETE_IN_PROGRESS)
523 {
524 if (dwRqId == 0)
525 m_runtimeFlags &= ~DCDF_QUEUED_FOR_STATUS_POLL;
526 return;
527 }
528
529 poller->setStatus(_T("wait for lock"));
530 pollerLock();
531
532 if (IsShutdownInProgress())
533 {
534 pollerUnlock();
535 return;
536 }
537
538 m_pollRequestor = pSession;
539 sendPollerMsg(dwRqId, _T("CLUSTER STATUS POLL [%s]: Applying templates\r\n"), m_name);
540 DbgPrintf(6, _T("CLUSTER STATUS POLL [%s]: Applying templates"), m_name);
541 if (ConfigReadInt(_T("ClusterTemplateAutoApply"), 0))
542 applyUserTemplates();
543
544 sendPollerMsg(dwRqId, _T("CLUSTER STATUS POLL [%s]: Updating container bindings\r\n"), m_name);
545 DbgPrintf(6, _T("CLUSTER STATUS POLL [%s]: Updating container bindings"), m_name);
546 if (ConfigReadInt(_T("ClusterContainerAutoBind"), 0))
547 updateContainerMembership();
548
549 lockProperties();
550 m_lastConfigurationPoll = time(NULL);
551 m_runtimeFlags &= ~DCDF_QUEUED_FOR_CONFIGURATION_POLL;
552 unlockProperties();
553
554 poller->setStatus(_T("hook"));
555 executeHookScript(_T("ConfigurationPoll"));
556
557 sendPollerMsg(dwRqId, _T("CLUSTER CONFIGURATION POLL [%s]: Finished\r\n"), m_name);
558 DbgPrintf(6, _T("CLUSTER CONFIGURATION POLL [%s]: Finished"), m_name);
559
560 pollerUnlock();
561 m_runtimeFlags |= DCDF_CONFIGURATION_POLL_PASSED;
562 }
563
564 /**
565 * Status poll
566 */
567 void Cluster::statusPoll(PollerInfo *poller, ClientSession *pSession, UINT32 dwRqId)
568 {
569 if (m_runtimeFlags & DCDF_DELETE_IN_PROGRESS)
570 {
571 if (dwRqId == 0)
572 m_runtimeFlags &= ~DCDF_QUEUED_FOR_STATUS_POLL;
573 return;
574 }
575
576 poller->setStatus(_T("wait for lock"));
577 pollerLock();
578
579 if (IsShutdownInProgress())
580 {
581 pollerUnlock();
582 return;
583 }
584
585 BOOL bModified = FALSE, bAllDown;
586 BYTE *pbResourceFound;
587
588 // Create polling list
589 ObjectArray<DataCollectionTarget> pollList(m_childList->size(), 16, false);
590 lockChildList(false);
591 int i;
592 for(i = 0; i < m_childList->size(); i++)
593 {
594 NetObj *object = m_childList->get(i);
595 if ((object->getStatus() != STATUS_UNMANAGED) && object->isDataCollectionTarget())
596 {
597 object->incRefCount();
598 static_cast<DataCollectionTarget*>(object)->lockForStatusPoll();
599 pollList.add(static_cast<DataCollectionTarget*>(object));
600 }
601 }
602 unlockChildList();
603
604 // Perform status poll on all member nodes
605 m_pollRequestor = pSession;
606 sendPollerMsg(dwRqId, _T("CLUSTER STATUS POLL [%s]: Polling member nodes\r\n"), m_name);
607 DbgPrintf(6, _T("CLUSTER STATUS POLL [%s]: Polling member nodes"), m_name);
608 for(i = 0, bAllDown = TRUE; i < pollList.size(); i++)
609 {
610 DataCollectionTarget *object = pollList.get(i);
611 object->statusPollPollerEntry(poller, pSession, dwRqId);
612 if ((object->getObjectClass() == OBJECT_NODE) && !static_cast<Node*>(object)->isDown())
613 bAllDown = FALSE;
614 }
615
616 if (bAllDown)
617 {
618 if (!(m_state & CLSF_DOWN))
619 {
620 m_state |= CLSF_DOWN;
621 PostEvent(EVENT_CLUSTER_DOWN, m_id, NULL);
622 }
623 }
624 else
625 {
626 if (m_state & CLSF_DOWN)
627 {
628 m_state &= ~CLSF_DOWN;
629 PostEvent(EVENT_CLUSTER_UP, m_id, NULL);
630 }
631 }
632
633 // Check for cluster resource movement
634 if (!bAllDown)
635 {
636 pbResourceFound = (BYTE *)malloc(m_dwNumResources);
637 memset(pbResourceFound, 0, m_dwNumResources);
638
639 poller->setStatus(_T("resource poll"));
640 sendPollerMsg(dwRqId, _T("CLUSTER STATUS POLL [%s]: Polling resources\r\n"), m_name);
641 DbgPrintf(6, _T("CLUSTER STATUS POLL [%s]: Polling resources"), m_name);
642 for(i = 0; i < pollList.size(); i++)
643 {
644 if (pollList.get(i)->getObjectClass() != OBJECT_NODE)
645 continue;
646
647 Node *node = static_cast<Node*>(pollList.get(i));
648 InterfaceList *pIfList = node->getInterfaceList();
649 if (pIfList != NULL)
650 {
651 lockProperties();
652 for(int j = 0; j < pIfList->size(); j++)
653 {
654 for(UINT32 k = 0; k < m_dwNumResources; k++)
655 {
656 if (pIfList->get(j)->hasAddress(m_pResourceList[k].ipAddr))
657 {
658 if (m_pResourceList[k].dwCurrOwner != node->getId())
659 {
660 sendPollerMsg(dwRqId, _T("CLUSTER STATUS POLL [%s]: Resource %s owner changed\r\n"), m_name, m_pResourceList[k].szName);
661 DbgPrintf(5, _T("CLUSTER STATUS POLL [%s]: Resource %s owner changed"),
662 m_name, m_pResourceList[k].szName);
663
664 // Resource moved or go up
665 if (m_pResourceList[k].dwCurrOwner == 0)
666 {
667 // Resource up
668 PostEvent(EVENT_CLUSTER_RESOURCE_UP, m_id, "dsds",
669 m_pResourceList[k].dwId, m_pResourceList[k].szName,
670 node->getId(), node->getName());
671 }
672 else
673 {
674 // Moved
675 NetObj *pObject = FindObjectById(m_pResourceList[k].dwCurrOwner);
676 PostEvent(EVENT_CLUSTER_RESOURCE_MOVED, m_id, "dsdsds",
677 m_pResourceList[k].dwId, m_pResourceList[k].szName,
678 m_pResourceList[k].dwCurrOwner,
679 (pObject != NULL) ? pObject->getName() : _T("<unknown>"),
680 node->getId(), node->getName());
681 }
682 m_pResourceList[k].dwCurrOwner = node->getId();
683 bModified = TRUE;
684 }
685 pbResourceFound[k] = 1;
686 }
687 }
688 }
689 unlockProperties();
690 delete pIfList;
691 }
692 else
693 {
694 sendPollerMsg(dwRqId, _T("CLUSTER STATUS POLL [%s]: Cannot get interface list from %s\r\n"), m_name, node->getName());
695 DbgPrintf(6, _T("CLUSTER STATUS POLL [%s]: Cannot get interface list from %s"),
696 m_name, node->getName());
697 }
698 }
699
700 // Check for missing virtual addresses
701 lockProperties();
702 for(i = 0; i < (int)m_dwNumResources; i++)
703 {
704 if ((!pbResourceFound[i]) && (m_pResourceList[i].dwCurrOwner != 0))
705 {
706 NetObj *pObject = FindObjectById(m_pResourceList[i].dwCurrOwner);
707 PostEvent(EVENT_CLUSTER_RESOURCE_DOWN, m_id, "dsds",
708 m_pResourceList[i].dwId, m_pResourceList[i].szName,
709 m_pResourceList[i].dwCurrOwner,
710 (pObject != NULL) ? pObject->getName() : _T("<unknown>"));
711 m_pResourceList[i].dwCurrOwner = 0;
712 bModified = TRUE;
713 }
714 }
715 unlockProperties();
716 free(pbResourceFound);
717 }
718
719
720 // Execute hook script
721 poller->setStatus(_T("hook"));
722 executeHookScript(_T("StatusPoll"));
723
724 calculateCompoundStatus(true);
725 poller->setStatus(_T("cleanup"));
726
727 // Cleanup
728 for(i = 0; i < pollList.size(); i++)
729 {
730 pollList.get(i)->decRefCount();
731 }
732
733 lockProperties();
734 if (bModified)
735 setModified();
736 m_lastStatusPoll = time(NULL);
737 m_runtimeFlags &= ~DCDF_QUEUED_FOR_STATUS_POLL;
738 unlockProperties();
739
740 sendPollerMsg(dwRqId, _T("CLUSTER STATUS POLL [%s]: Finished\r\n"), m_name);
741 DbgPrintf(6, _T("CLUSTER STATUS POLL [%s]: Finished"), m_name);
742
743 pollerUnlock();
744 }
745
746 /**
747 * Check if node is current owner of resource
748 */
749 bool Cluster::isResourceOnNode(UINT32 dwResource, UINT32 dwNode)
750 {
751 bool bRet = FALSE;
752
753 lockProperties();
754 for(UINT32 i = 0; i < m_dwNumResources; i++)
755 {
756 if (m_pResourceList[i].dwId == dwResource)
757 {
758 if (m_pResourceList[i].dwCurrOwner == dwNode)
759 bRet = true;
760 break;
761 }
762 }
763 unlockProperties();
764 return bRet;
765 }
766
767 /**
768 * Get node ID of resource owner
769 */
770 UINT32 Cluster::getResourceOwnerInternal(UINT32 id, const TCHAR *name)
771 {
772 UINT32 ownerId = 0;
773 lockProperties();
774 for(UINT32 i = 0; i < m_dwNumResources; i++)
775 {
776 if (((name != NULL) && !_tcsicmp(name, m_pResourceList[i].szName)) ||
777 (m_pResourceList[i].dwId == id))
778 {
779 ownerId = m_pResourceList[i].dwCurrOwner;
780 break;
781 }
782 }
783 unlockProperties();
784 return ownerId;
785 }
786
787 /**
788 * Collect aggregated data for cluster nodes - single value
789 */
790 UINT32 Cluster::collectAggregatedData(DCItem *item, TCHAR *buffer)
791 {
792 lockChildList(false);
793 ItemValue **values = (ItemValue **)malloc(sizeof(ItemValue *) * m_childList->size());
794 int valueCount = 0;
795 for(int i = 0; i < m_childList->size(); i++)
796 {
797 if (m_childList->get(i)->getObjectClass() != OBJECT_NODE)
798 continue;
799
800 Node *node = (Node *)m_childList->get(i);
801 DCObject *dco = node->getDCObjectByTemplateId(item->getId());
802 if ((dco != NULL) &&
803 (dco->getType() == DCO_TYPE_ITEM) &&
804 (dco->getStatus() == ITEM_STATUS_ACTIVE) &&
805 ((dco->getErrorCount() == 0) || dco->isAggregateWithErrors()) &&
806 dco->matchClusterResource())
807 {
808 ItemValue *v = ((DCItem *)dco)->getInternalLastValue();
809 if (v != NULL)
810 values[valueCount++] = v;
811 }
812 }
813 unlockChildList();
814
815 UINT32 rcc = DCE_SUCCESS;
816 if (valueCount > 0)
817 {
818 ItemValue result;
819 switch(item->getAggregationFunction())
820 {
821 case DCF_FUNCTION_AVG:
822 CalculateItemValueAverage(result, item->getDataType(), valueCount, values);
823 break;
824 case DCF_FUNCTION_MAX:
825 CalculateItemValueMax(result, item->getDataType(), valueCount, values);
826 break;
827 case DCF_FUNCTION_MIN:
828 CalculateItemValueMin(result, item->getDataType(), valueCount, values);
829 break;
830 case DCF_FUNCTION_SUM:
831 CalculateItemValueTotal(result, item->getDataType(), valueCount, values);
832 break;
833 default:
834 rcc = DCE_NOT_SUPPORTED;
835 break;
836 }
837 nx_strncpy(buffer, result.getString(), MAX_RESULT_LENGTH);
838 }
839 else
840 {
841 rcc = DCE_COLLECTION_ERROR;
842 }
843
844 for(int i = 0; i < valueCount; i++)
845 delete values[i];
846 safe_free(values);
847
848 return rcc;
849 }
850
851 /**
852 * Collect aggregated data for cluster nodes - table
853 */
854 UINT32 Cluster::collectAggregatedData(DCTable *table, Table **result)
855 {
856 lockChildList(false);
857 Table **values = (Table **)malloc(sizeof(Table *) * m_childList->size());
858 int valueCount = 0;
859 for(int i = 0; i < m_childList->size(); i++)
860 {
861 if (m_childList->get(i)->getObjectClass() != OBJECT_NODE)
862 continue;
863
864 Node *node = (Node *)m_childList->get(i);
865 DCObject *dco = node->getDCObjectByTemplateId(table->getId());
866 if ((dco != NULL) &&
867 (dco->getType() == DCO_TYPE_TABLE) &&
868 (dco->getStatus() == ITEM_STATUS_ACTIVE) &&
869 ((dco->getErrorCount() == 0) || dco->isAggregateWithErrors()) &&
870 dco->matchClusterResource())
871 {
872 Table *v = ((DCTable *)dco)->getLastValue();
873 if (v != NULL)
874 values[valueCount++] = v;
875 }
876 }
877 unlockChildList();
878
879 UINT32 rcc = DCE_SUCCESS;
880 if (valueCount > 0)
881 {
882 *result = new Table(values[0]);
883 for(int i = 1; i < valueCount; i++)
884 table->mergeValues(*result, values[i], i);
885 }
886 else
887 {
888 rcc = DCE_COLLECTION_ERROR;
889 }
890
891 for(int i = 0; i < valueCount; i++)
892 values[i]->decRefCount();
893 safe_free(values);
894
895 return rcc;
896 }
897
898 /**
899 * Unbind cluster from template
900 */
901 void Cluster::unbindFromTemplate(UINT32 dwTemplateId, bool removeDCI)
902 {
903 DataCollectionTarget::unbindFromTemplate(dwTemplateId, removeDCI);
904 queueUpdate();
905 }
906
907 /**
908 * Called when data collection configuration changed
909 */
910 void Cluster::onDataCollectionChange()
911 {
912 queueUpdate();
913 }
914
915 /**
916 * Create NXSL object for this object
917 */
918 NXSL_Value *Cluster::createNXSLObject()
919 {
920 return new NXSL_Value(new NXSL_Object(&g_nxslClusterClass, this));
921 }
922
923 /**
924 * Get cluster nodes as NXSL array
925 */
926 NXSL_Array *Cluster::getNodesForNXSL()
927 {
928 NXSL_Array *nodes = new NXSL_Array();
929 int index = 0;
930
931 lockChildList(false);
932 for(int i = 0; i < m_childList->size(); i++)
933 {
934 if (m_childList->get(i)->getObjectClass() == OBJECT_NODE)
935 {
936 nodes->set(index++, new NXSL_Value(new NXSL_Object(&g_nxslNodeClass, m_childList->get(i))));
937 }
938 }
939 unlockChildList();
940
941 return nodes;
942 }
943
944 /**
945 * Serialize object to JSON
946 */
947 json_t *Cluster::toJson()
948 {
949 json_t *root = DataCollectionTarget::toJson();
950 json_object_set_new(root, "clusterType", json_integer(m_dwClusterType));
951 json_object_set_new(root, "syncNetworks", json_object_array(m_syncNetworks));
952 json_object_set_new(root, "lastStatusPoll", json_integer(m_lastStatusPoll));
953 json_object_set_new(root, "lastConfigurationPoll", json_integer(m_lastConfigurationPoll));
954 json_object_set_new(root, "zoneUIN", json_integer(m_zoneUIN));
955
956 json_t *resources = json_array();
957 for(UINT32 i = 0; i < m_dwNumResources; i++)
958 {
959 json_t *r = json_object();
960 json_object_set_new(r, "id", json_integer(m_pResourceList[i].dwId));
961 json_object_set_new(r, "name", json_string_t(m_pResourceList[i].szName));
962 json_object_set_new(r, "address", m_pResourceList[i].ipAddr.toJson());
963 json_object_set_new(r, "currentOwner", json_integer(m_pResourceList[i].dwCurrOwner));
964 json_array_append_new(resources, r);
965 }
966 json_object_set_new(root, "resources", resources);
967 return root;
968 }