NamedPipe class split into NamedPipe and NamedPipeListener; nxapush switched to...
[public/netxms.git] / src / agent / core / extagent.cpp
1 /*
2 ** NetXMS multiplatform core agent
3 ** Copyright (C) 2003-2017 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: extagent.cpp
20 **
21 **/
22
23 #include "nxagentd.h"
24
25 /**
26 * Static data: list of external subagents registered through AddExternalSubagent()
27 */
28 static ObjectArray<ExternalSubagent> s_subagents;
29
30 /**
31 * Constructor
32 */
33 ExternalSubagent::ExternalSubagent(const TCHAR *name, const TCHAR *user)
34 {
35 nx_strncpy(m_name, name, MAX_SUBAGENT_NAME);
36 nx_strncpy(m_user, user, MAX_ESA_USER_NAME);
37 m_connected = false;
38 m_listener = NULL;
39 m_pipe = NULL;
40 m_msgQueue = new MsgWaitQueue();
41 m_requestId = 1;
42 }
43
44 /**
45 * Destructor
46 */
47 ExternalSubagent::~ExternalSubagent()
48 {
49 delete m_msgQueue;
50 delete m_listener;
51 }
52
53 /**
54 * Pipe request handler
55 */
56 static void RequestHandler(NamedPipe *pipe, void *userArg)
57 {
58 static_cast<ExternalSubagent*>(userArg)->connect(pipe);
59 }
60
61 /**
62 * Start listener
63 */
64 void ExternalSubagent::startListener()
65 {
66 TCHAR name[MAX_PIPE_NAME_LEN];
67 _sntprintf(name, MAX_PIPE_NAME_LEN, _T("nxagentd.subagent.%s"), m_name);
68 m_listener = NamedPipeListener::create(name, RequestHandler, this);
69 if (m_listener != NULL)
70 m_listener->start();
71 }
72
73 /*
74 * Send message to external subagent
75 */
76 bool ExternalSubagent::sendMessage(NXCPMessage *msg)
77 {
78 TCHAR buffer[256];
79 AgentWriteDebugLog(6, _T("ExternalSubagent::sendMessage(%s): sending message %s"), m_name, NXCPMessageCodeName(msg->getCode(), buffer));
80
81 NXCP_MESSAGE *rawMsg = msg->createMessage();
82 bool success = (m_pipe != NULL) ? m_pipe->write(rawMsg, ntohl(rawMsg->size)) : false;
83 free(rawMsg);
84 return success;
85 }
86
87 /**
88 * Wait for specific message to arrive
89 */
90 NXCPMessage *ExternalSubagent::waitForMessage(WORD code, UINT32 id)
91 {
92 return m_msgQueue->waitForMessage(code, id, 5000); // 5 sec timeout
93 }
94
95 /**
96 * Main connection thread
97 */
98 void ExternalSubagent::connect(NamedPipe *pipe)
99 {
100 TCHAR buffer[256];
101 UINT32 i;
102
103 m_pipe = pipe;
104 m_connected = true;
105 AgentWriteDebugLog(2, _T("ExternalSubagent(%s): connection established"), m_name);
106 PipeMessageReceiver receiver(pipe->handle(), 8192, 1048576); // 8K initial, 1M max
107 while(true)
108 {
109 MessageReceiverResult result;
110 NXCPMessage *msg = receiver.readMessage(INFINITE, &result);
111 if (msg == NULL)
112 {
113 AgentWriteDebugLog(6, _T("ExternalSubagent(%s): receiver failure (%s)"), m_name, AbstractMessageReceiver::resultToText(result));
114 break;
115 }
116 AgentWriteDebugLog(6, _T("ExternalSubagent(%s): received message %s"), m_name, NXCPMessageCodeName(msg->getCode(), buffer));
117 switch(msg->getCode())
118 {
119 case CMD_PUSH_DCI_DATA:
120 MutexLock(g_hSessionListAccess);
121 for(i = 0; i < g_dwMaxSessions; i++)
122 if (g_pSessionList[i] != NULL)
123 if (g_pSessionList[i]->canAcceptTraps())
124 {
125 g_pSessionList[i]->sendMessage(msg);
126 }
127 MutexUnlock(g_hSessionListAccess);
128 delete msg;
129 break;
130 case CMD_TRAP:
131 ForwardTrap(msg);
132 delete msg;
133 break;
134 default:
135 m_msgQueue->put(msg);
136 break;
137 }
138 }
139
140 AgentWriteDebugLog(2, _T("ExternalSubagent(%s): connection closed"), m_name);
141 m_connected = false;
142 m_msgQueue->clear();
143 m_pipe = NULL;
144 }
145
146 /**
147 * Send shutdown command
148 */
149 void ExternalSubagent::shutdown()
150 {
151 NXCPMessage msg(CMD_SHUTDOWN, m_requestId++);
152 sendMessage(&msg);
153 }
154
155 /**
156 * Send restart command
157 */
158 void ExternalSubagent::restart()
159 {
160 NXCPMessage msg(CMD_RESTART, m_requestId++);
161 sendMessage(&msg);
162 }
163
164 /**
165 * Get list of supported parameters
166 */
167 NETXMS_SUBAGENT_PARAM *ExternalSubagent::getSupportedParameters(UINT32 *count)
168 {
169 NXCPMessage msg;
170 NETXMS_SUBAGENT_PARAM *result = NULL;
171
172 msg.setCode(CMD_GET_PARAMETER_LIST);
173 msg.setId(m_requestId++);
174 if (sendMessage(&msg))
175 {
176 NXCPMessage *response = waitForMessage(CMD_REQUEST_COMPLETED, msg.getId());
177 if (response != NULL)
178 {
179 if (response->getFieldAsUInt32(VID_RCC) == ERR_SUCCESS)
180 {
181 *count = response->getFieldAsUInt32(VID_NUM_PARAMETERS);
182 result = (NETXMS_SUBAGENT_PARAM *)malloc(*count * sizeof(NETXMS_SUBAGENT_PARAM));
183 UINT32 varId = VID_PARAM_LIST_BASE;
184 for(UINT32 i = 0; i < *count; i++)
185 {
186 response->getFieldAsString(varId++, result[i].name, MAX_PARAM_NAME);
187 response->getFieldAsString(varId++, result[i].description, MAX_DB_STRING);
188 result[i].dataType = (int)response->getFieldAsUInt16(varId++);
189 }
190 }
191 delete response;
192 }
193 }
194 return result;
195 }
196
197 /**
198 * Get list of supported lists
199 */
200 NETXMS_SUBAGENT_LIST *ExternalSubagent::getSupportedLists(UINT32 *count)
201 {
202 NXCPMessage msg;
203 NETXMS_SUBAGENT_LIST *result = NULL;
204
205 msg.setCode(CMD_GET_ENUM_LIST);
206 msg.setId(m_requestId++);
207 if (sendMessage(&msg))
208 {
209 NXCPMessage *response = waitForMessage(CMD_REQUEST_COMPLETED, msg.getId());
210 if (response != NULL)
211 {
212 if (response->getFieldAsUInt32(VID_RCC) == ERR_SUCCESS)
213 {
214 *count = response->getFieldAsUInt32(VID_NUM_ENUMS);
215 result = (NETXMS_SUBAGENT_LIST *)malloc(*count * sizeof(NETXMS_SUBAGENT_LIST));
216 UINT32 varId = VID_ENUM_LIST_BASE;
217 for(UINT32 i = 0; i < *count; i++)
218 {
219 response->getFieldAsString(varId++, result[i].name, MAX_PARAM_NAME);
220 }
221 }
222 delete response;
223 }
224 }
225 return result;
226 }
227
228 /**
229 * Get list of supported tables
230 */
231 NETXMS_SUBAGENT_TABLE *ExternalSubagent::getSupportedTables(UINT32 *count)
232 {
233 NXCPMessage msg;
234 NETXMS_SUBAGENT_TABLE *result = NULL;
235
236 msg.setCode(CMD_GET_TABLE_LIST);
237 msg.setId(m_requestId++);
238 if (sendMessage(&msg))
239 {
240 NXCPMessage *response = waitForMessage(CMD_REQUEST_COMPLETED, msg.getId());
241 if (response != NULL)
242 {
243 if (response->getFieldAsUInt32(VID_RCC) == ERR_SUCCESS)
244 {
245 *count = response->getFieldAsUInt32(VID_NUM_TABLES);
246 result = (NETXMS_SUBAGENT_TABLE *)malloc(*count * sizeof(NETXMS_SUBAGENT_TABLE));
247 UINT32 varId = VID_TABLE_LIST_BASE;
248 for(UINT32 i = 0; i < *count; i++)
249 {
250 response->getFieldAsString(varId++, result[i].name, MAX_PARAM_NAME);
251 response->getFieldAsString(varId++, result[i].instanceColumns, MAX_COLUMN_NAME * MAX_INSTANCE_COLUMNS);
252 response->getFieldAsString(varId++, result[i].description, MAX_DB_STRING);
253 }
254 }
255 delete response;
256 }
257 }
258 return result;
259 }
260
261 /**
262 * List supported parameters
263 */
264 void ExternalSubagent::listParameters(NXCPMessage *msg, UINT32 *baseId, UINT32 *count)
265 {
266 UINT32 paramCount = 0;
267 NETXMS_SUBAGENT_PARAM *list = getSupportedParameters(&paramCount);
268 if (list != NULL)
269 {
270 UINT32 id = *baseId;
271
272 for(UINT32 i = 0; i < paramCount; i++)
273 {
274 msg->setField(id++, list[i].name);
275 msg->setField(id++, list[i].description);
276 msg->setField(id++, (WORD)list[i].dataType);
277 }
278 *baseId = id;
279 *count += paramCount;
280 free(list);
281 }
282 }
283
284 /**
285 * List supported parameters
286 */
287 void ExternalSubagent::listParameters(StringList *list)
288 {
289 UINT32 paramCount = 0;
290 NETXMS_SUBAGENT_PARAM *plist = getSupportedParameters(&paramCount);
291 if (plist != NULL)
292 {
293 for(UINT32 i = 0; i < paramCount; i++)
294 list->add(plist[i].name);
295 free(plist);
296 }
297 }
298
299 /**
300 * List supported lists
301 */
302 void ExternalSubagent::listLists(NXCPMessage *msg, UINT32 *baseId, UINT32 *count)
303 {
304 UINT32 paramCount = 0;
305 NETXMS_SUBAGENT_LIST *list = getSupportedLists(&paramCount);
306 if (list != NULL)
307 {
308 UINT32 id = *baseId;
309
310 for(UINT32 i = 0; i < paramCount; i++)
311 {
312 msg->setField(id++, list[i].name);
313 }
314 *baseId = id;
315 *count += paramCount;
316 free(list);
317 }
318 }
319
320 /**
321 * List supported lists
322 */
323 void ExternalSubagent::listLists(StringList *list)
324 {
325 UINT32 paramCount = 0;
326 NETXMS_SUBAGENT_LIST *plist = getSupportedLists(&paramCount);
327 if (plist != NULL)
328 {
329 for(UINT32 i = 0; i < paramCount; i++)
330 list->add(plist[i].name);
331 free(plist);
332 }
333 }
334
335 /**
336 * List supported tables
337 */
338 void ExternalSubagent::listTables(NXCPMessage *msg, UINT32 *baseId, UINT32 *count)
339 {
340 UINT32 paramCount = 0;
341 NETXMS_SUBAGENT_TABLE *list = getSupportedTables(&paramCount);
342 if (list != NULL)
343 {
344 UINT32 id = *baseId;
345
346 for(UINT32 i = 0; i < paramCount; i++)
347 {
348 msg->setField(id++, list[i].name);
349 msg->setField(id++, list[i].instanceColumns);
350 msg->setField(id++, list[i].description);
351 }
352 *baseId = id;
353 *count += paramCount;
354 free(list);
355 }
356 }
357
358 /**
359 * List supported tables
360 */
361 void ExternalSubagent::listTables(StringList *list)
362 {
363 UINT32 paramCount = 0;
364 NETXMS_SUBAGENT_TABLE *plist = getSupportedTables(&paramCount);
365 if (plist != NULL)
366 {
367 for(UINT32 i = 0; i < paramCount; i++)
368 list->add(plist[i].name);
369 free(plist);
370 }
371 }
372
373 /**
374 * Get parameter value from external subagent
375 */
376 UINT32 ExternalSubagent::getParameter(const TCHAR *name, TCHAR *buffer)
377 {
378 NXCPMessage msg;
379 UINT32 rcc;
380
381 msg.setCode(CMD_GET_PARAMETER);
382 msg.setId(m_requestId++);
383 msg.setField(VID_PARAMETER, name);
384 if (sendMessage(&msg))
385 {
386 NXCPMessage *response = waitForMessage(CMD_REQUEST_COMPLETED, msg.getId());
387 if (response != NULL)
388 {
389 rcc = response->getFieldAsUInt32(VID_RCC);
390 if (rcc == ERR_SUCCESS)
391 response->getFieldAsString(VID_VALUE, buffer, MAX_RESULT_LENGTH);
392 delete response;
393 }
394 else
395 {
396 rcc = ERR_INTERNAL_ERROR;
397 }
398 }
399 else
400 {
401 rcc = ERR_CONNECTION_BROKEN;
402 }
403 return rcc;
404 }
405
406 /**
407 * Get table value from external subagent
408 */
409 UINT32 ExternalSubagent::getTable(const TCHAR *name, Table *value)
410 {
411 NXCPMessage msg;
412 UINT32 rcc;
413
414 msg.setCode(CMD_GET_TABLE);
415 msg.setId(m_requestId++);
416 msg.setField(VID_PARAMETER, name);
417 if (sendMessage(&msg))
418 {
419 NXCPMessage *response = waitForMessage(CMD_REQUEST_COMPLETED, msg.getId());
420 if (response != NULL)
421 {
422 rcc = response->getFieldAsUInt32(VID_RCC);
423 if (rcc == ERR_SUCCESS)
424 value->updateFromMessage(response);
425 delete response;
426 }
427 else
428 {
429 rcc = ERR_INTERNAL_ERROR;
430 }
431 }
432 else
433 {
434 rcc = ERR_CONNECTION_BROKEN;
435 }
436 return rcc;
437 }
438
439 /**
440 * Get list value from external subagent
441 */
442 UINT32 ExternalSubagent::getList(const TCHAR *name, StringList *value)
443 {
444 NXCPMessage msg;
445 UINT32 rcc;
446
447 msg.setCode(CMD_GET_LIST);
448 msg.setId(m_requestId++);
449 msg.setField(VID_PARAMETER, name);
450 if (sendMessage(&msg))
451 {
452 NXCPMessage *response = waitForMessage(CMD_REQUEST_COMPLETED, msg.getId());
453 if (response != NULL)
454 {
455 rcc = response->getFieldAsUInt32(VID_RCC);
456 if (rcc == ERR_SUCCESS)
457 {
458 UINT32 count = response->getFieldAsUInt32(VID_NUM_STRINGS);
459 for(UINT32 i = 0; i < count; i++)
460 value->addPreallocated(response->getFieldAsString(VID_ENUM_VALUE_BASE + i));
461 }
462 delete response;
463 }
464 else
465 {
466 rcc = ERR_INTERNAL_ERROR;
467 }
468 }
469 else
470 {
471 rcc = ERR_CONNECTION_BROKEN;
472 }
473 return rcc;
474 }
475
476 /**
477 * Add external subagent from config.
478 * Each line in config should be in form
479 * name:user
480 * If user part is omited or set to *, connection from any account will be accepted
481 */
482 bool AddExternalSubagent(const TCHAR *config)
483 {
484 TCHAR buffer[1024], user[256] = _T("*");
485
486 nx_strncpy(buffer, config, 1024);
487 TCHAR *ptr = _tcschr(buffer, _T(':'));
488 if (ptr != NULL)
489 {
490 *ptr = 0;
491 ptr++;
492 _tcsncpy(user, ptr, 256);
493 Trim(user);
494 }
495
496 ExternalSubagent *subagent = new ExternalSubagent(buffer, user);
497 s_subagents.add(subagent);
498 subagent->startListener();
499 return true;
500 }
501
502 /**
503 * Add parameters from external providers to NXCP message
504 */
505 void ListParametersFromExtSubagents(NXCPMessage *msg, UINT32 *baseId, UINT32 *count)
506 {
507 for(int i = 0; i < s_subagents.size(); i++)
508 {
509 if (s_subagents.get(i)->isConnected())
510 {
511 s_subagents.get(i)->listParameters(msg, baseId, count);
512 }
513 }
514 }
515
516 /**
517 * Add parameters from external subagents to string list
518 */
519 void ListParametersFromExtSubagents(StringList *list)
520 {
521 for(int i = 0; i < s_subagents.size(); i++)
522 {
523 if (s_subagents.get(i)->isConnected())
524 {
525 s_subagents.get(i)->listParameters(list);
526 }
527 }
528 }
529
530 /**
531 * Add lists from external providers to NXCP message
532 */
533 void ListListsFromExtSubagents(NXCPMessage *msg, UINT32 *baseId, UINT32 *count)
534 {
535 for(int i = 0; i < s_subagents.size(); i++)
536 {
537 if (s_subagents.get(i)->isConnected())
538 {
539 s_subagents.get(i)->listLists(msg, baseId, count);
540 }
541 }
542 }
543
544 /**
545 * Add lists from external subagents to string list
546 */
547 void ListListsFromExtSubagents(StringList *list)
548 {
549 for(int i = 0; i < s_subagents.size(); i++)
550 {
551 if (s_subagents.get(i)->isConnected())
552 {
553 s_subagents.get(i)->listLists(list);
554 }
555 }
556 }
557
558 /**
559 * Add tables from external providers to NXCP message
560 */
561 void ListTablesFromExtSubagents(NXCPMessage *msg, UINT32 *baseId, UINT32 *count)
562 {
563 for(int i = 0; i < s_subagents.size(); i++)
564 {
565 if (s_subagents.get(i)->isConnected())
566 {
567 s_subagents.get(i)->listTables(msg, baseId, count);
568 }
569 }
570 }
571
572 /**
573 * Add tables from external subagents to string list
574 */
575 void ListTablesFromExtSubagents(StringList *list)
576 {
577 for(int i = 0; i < s_subagents.size(); i++)
578 {
579 if (s_subagents.get(i)->isConnected())
580 {
581 s_subagents.get(i)->listTables(list);
582 }
583 }
584 }
585
586 /**
587 * Get value from external subagent
588 *
589 * @return agent error code
590 */
591 UINT32 GetParameterValueFromExtSubagent(const TCHAR *name, TCHAR *buffer)
592 {
593 UINT32 rc = ERR_UNKNOWN_PARAMETER;
594 for(int i = 0; i < s_subagents.size(); i++)
595 {
596 if (s_subagents.get(i)->isConnected())
597 {
598 rc = s_subagents.get(i)->getParameter(name, buffer);
599 if (rc == ERR_SUCCESS)
600 break;
601 }
602 }
603 return rc;
604 }
605
606 /**
607 * Get table from external subagent
608 *
609 * @return agent error code
610 */
611 UINT32 GetTableValueFromExtSubagent(const TCHAR *name, Table *value)
612 {
613 UINT32 rc = ERR_UNKNOWN_PARAMETER;
614 for(int i = 0; i < s_subagents.size(); i++)
615 {
616 if (s_subagents.get(i)->isConnected())
617 {
618 rc = s_subagents.get(i)->getTable(name, value);
619 if (rc == ERR_SUCCESS)
620 break;
621 }
622 }
623 return rc;
624 }
625
626 /**
627 * Get list from external subagent
628 *
629 * @return agent error code
630 */
631 UINT32 GetListValueFromExtSubagent(const TCHAR *name, StringList *value)
632 {
633 UINT32 rc = ERR_UNKNOWN_PARAMETER;
634 for(int i = 0; i < s_subagents.size(); i++)
635 {
636 if (s_subagents.get(i)->isConnected())
637 {
638 rc = s_subagents.get(i)->getList(name, value);
639 if (rc == ERR_SUCCESS)
640 break;
641 }
642 }
643 return rc;
644 }
645
646 /**
647 * Shutdown all connected external subagents
648 */
649 void ShutdownExtSubagents()
650 {
651 for(int i = 0; i < s_subagents.size(); i++)
652 {
653 if (s_subagents.get(i)->isConnected())
654 {
655 nxlog_debug(1, _T("Sending SHUTDOWN command to external subagent %s"), s_subagents.get(i)->getName());
656 s_subagents.get(i)->shutdown();
657 }
658 }
659 }
660
661 /**
662 * Restart all connected external subagents
663 */
664 void RestartExtSubagents()
665 {
666 for(int i = 0; i < s_subagents.size(); i++)
667 {
668 if (s_subagents.get(i)->isConnected())
669 {
670 nxlog_debug(1, _T("Sending RESTART command to external subagent %s"), s_subagents.get(i)->getName());
671 s_subagents.get(i)->restart();
672 }
673 }
674 }
675
676 /**
677 * Handler for Agent.IsExternalSubagentConnected
678 */
679 LONG H_IsExtSubagentConnected(const TCHAR *cmd, const TCHAR *arg, TCHAR *value, AbstractCommSession *session)
680 {
681 TCHAR name[256];
682 if (!AgentGetParameterArg(cmd, 1, name, 256))
683 return SYSINFO_RC_UNSUPPORTED;
684 LONG rc = SYSINFO_RC_NO_SUCH_INSTANCE;
685 for(int i = 0; i < s_subagents.size(); i++)
686 {
687 if (!_tcsicmp(s_subagents.get(i)->getName(), name))
688 {
689 _tprintf(_T(">>> subagent %s connected %d\n"), s_subagents.get(i)->getName(), s_subagents.get(i)->isConnected());
690 ret_int(value, s_subagents.get(i)->isConnected() ? 1 : 0);
691 rc = SYSINFO_RC_SUCCESS;
692 break;
693 }
694 }
695 return rc;
696 }