2 ** NetXMS multiplatform core agent
3 ** Copyright (C) 2003-2016 Victor Kirhenshtein
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
33 void UnregisterSession(UINT32 dwIndex
);
34 UINT32
DeployPolicy(CommSession
*session
, NXCPMessage
*request
);
35 UINT32
UninstallPolicy(CommSession
*session
, NXCPMessage
*request
);
36 UINT32
GetPolicyInventory(CommSession
*session
, NXCPMessage
*msg
);
37 void ClearDataCollectionConfiguration();
42 #define MAX_MSG_SIZE 4194304
45 * SNMP proxy thread pool
47 ThreadPool
*g_snmpProxyThreadPool
= NULL
;
50 * Agent proxy statistics
52 static UINT64 s_proxyConnectionRequests
= 0;
53 static VolatileCounter s_activeProxySessions
= 0;
56 * Handler for agent proxy stats parameters
58 LONG
H_AgentProxyStats(const TCHAR
*cmd
, const TCHAR
*arg
, TCHAR
*value
, AbstractCommSession
*session
)
63 ret_uint(value
, (UINT32
)s_activeProxySessions
);
66 ret_uint64(value
, s_proxyConnectionRequests
);
69 return SYSINFO_RC_UNSUPPORTED
;
71 return SYSINFO_RC_SUCCESS
;
75 * Client communication read thread
77 THREAD_RESULT THREAD_CALL
CommSession::readThreadStarter(void *pArg
)
79 ((CommSession
*)pArg
)->readThread();
81 // When CommSession::ReadThread exits, all other session
82 // threads are already stopped, so we can safely destroy
84 UnregisterSession(((CommSession
*)pArg
)->getIndex());
85 ((CommSession
*)pArg
)->decRefCount();
90 * Client communication write thread
92 THREAD_RESULT THREAD_CALL
CommSession::writeThreadStarter(void *pArg
)
94 ((CommSession
*)pArg
)->writeThread();
99 * Received message processing thread
101 THREAD_RESULT THREAD_CALL
CommSession::processingThreadStarter(void *pArg
)
103 ((CommSession
*)pArg
)->processingThread();
108 * Client communication write thread
110 THREAD_RESULT THREAD_CALL
CommSession::proxyReadThreadStarter(void *pArg
)
112 ((CommSession
*)pArg
)->proxyReadThread();
117 * Client session class constructor
119 CommSession::CommSession(SOCKET hSocket
, const InetAddress
&serverAddr
, bool masterServer
, bool controlServer
)
121 m_sendQueue
= new Queue
;
122 m_processingQueue
= new Queue
;
124 m_hProxySocket
= INVALID_SOCKET
;
125 m_dwIndex
= INVALID_INDEX
;
126 m_hWriteThread
= INVALID_THREAD_HANDLE
;
127 m_hProcessingThread
= INVALID_THREAD_HANDLE
;
128 m_hProxyReadThread
= INVALID_THREAD_HANDLE
;
130 m_serverAddr
= serverAddr
;
131 m_authenticated
= (g_dwFlags
& AF_REQUIRE_AUTH
) ? false : true;
132 m_masterServer
= masterServer
;
133 m_controlServer
= controlServer
;
134 m_proxyConnection
= false;
135 m_acceptTraps
= false;
136 m_acceptData
= false;
137 m_acceptFileUpdates
= false;
139 m_bulkReconciliationSupported
= false;
145 m_socketWriteMutex
= MutexCreate();
146 m_responseQueue
= new MsgWaitQueue();
153 CommSession::~CommSession()
155 if (m_proxyConnection
)
156 InterlockedDecrement(&s_activeProxySessions
);
158 shutdown(m_hSocket
, SHUT_RDWR
);
159 closesocket(m_hSocket
);
160 if (m_hProxySocket
!= INVALID_SOCKET
)
161 closesocket(m_hProxySocket
);
164 while((p
= m_sendQueue
->get()) != NULL
)
165 if (p
!= INVALID_POINTER_VALUE
)
169 while((p
= m_processingQueue
->get()) != NULL
)
170 if (p
!= INVALID_POINTER_VALUE
)
171 delete (NXCPMessage
*)p
;
172 delete m_processingQueue
;
174 if (m_hCurrFile
!= -1)
177 if ((m_pCtx
!= NULL
) && (m_pCtx
!= PROXY_ENCRYPTION_CTX
))
178 m_pCtx
->decRefCount();
179 MutexDestroy(m_socketWriteMutex
);
180 delete m_responseQueue
;
186 void CommSession::run()
188 m_hWriteThread
= ThreadCreateEx(writeThreadStarter
, 0, this);
189 m_hProcessingThread
= ThreadCreateEx(processingThreadStarter
, 0, this);
190 ThreadCreate(readThreadStarter
, 0, this);
196 void CommSession::disconnect()
198 DebugPrintf(m_dwIndex
, 5, _T("CommSession::disconnect()"));
199 shutdown(m_hSocket
, SHUT_RDWR
);
200 if (m_hProxySocket
!= -1)
201 shutdown(m_hProxySocket
, SHUT_RDWR
);
207 void CommSession::readThread()
209 SocketMessageReceiver
receiver(m_hSocket
, 4096, MAX_MSG_SIZE
);
212 if (!m_proxyConnection
)
214 MessageReceiverResult result
;
215 NXCPMessage
*msg
= receiver
.readMessage((g_dwIdleTimeout
+ 1) * 1000, &result
);
217 // Check for decryption error
218 if (result
== MSGRECV_DECRYPTION_FAILURE
)
220 DebugPrintf(m_dwIndex
, 4, _T("Unable to decrypt received message"));
225 if (result
== MSGRECV_TIMEOUT
)
227 if (m_ts
< time(NULL
) - (time_t)g_dwIdleTimeout
)
229 DebugPrintf(m_dwIndex
, 5, _T("Session disconnected by timeout (last activity timestamp is %d)"), (int)m_ts
);
238 DebugPrintf(m_dwIndex
, 5, _T("Message receiving error (%s)"), AbstractMessageReceiver::resultToText(result
));
242 // Update activity timestamp
245 if (nxlog_get_debug_level() >= 8)
247 String msgDump
= NXCPMessage::dump(receiver
.getRawMessageBuffer(), NXCP_VERSION
);
248 DebugPrintf(m_dwIndex
, 8, _T("Message dump:\n%s"), (const TCHAR
*)msgDump
);
254 DebugPrintf(m_dwIndex
, 6, _T("Received raw message %s"), NXCPMessageCodeName(msg
->getCode(), buffer
));
256 if (msg
->getCode() == CMD_FILE_DATA
)
258 if ((m_hCurrFile
!= -1) && (m_fileRqId
== msg
->getId()))
262 if (msg
->isCompressed())
264 const BYTE
*in
= msg
->getBinaryData();
265 if (m_compressor
== NULL
)
267 NXCPCompressionMethod method
= (NXCPCompressionMethod
)(*in
);
268 m_compressor
= StreamCompressor::create(method
, false, FILE_BUFFER_SIZE
);
269 if (m_compressor
== NULL
)
271 DebugPrintf(m_dwIndex
, 5, _T("Unable to create stream compressor for method %d"), (int)method
);
277 if (m_compressor
!= NULL
)
279 dataSize
= (int)m_compressor
->decompress(in
+ 4, msg
->getBinaryDataSize() - 4, &data
);
280 if (dataSize
!= (int)ntohs(*((UINT16
*)(in
+ 2))))
282 // decompressed block size validation failed
289 data
= msg
->getBinaryData();
290 dataSize
= (int)msg
->getBinaryDataSize();
293 if ((dataSize
>= 0) && (write(m_hCurrFile
, data
, dataSize
) == dataSize
))
295 if (msg
->isEndOfFile())
297 NXCPMessage response
;
301 delete_and_null(m_compressor
);
303 response
.setCode(CMD_REQUEST_COMPLETED
);
304 response
.setId(msg
->getId());
305 response
.setField(VID_RCC
, ERR_SUCCESS
);
306 sendMessage(&response
);
312 NXCPMessage response
;
316 delete_and_null(m_compressor
);
318 response
.setCode(CMD_REQUEST_COMPLETED
);
319 response
.setId(msg
->getId());
320 response
.setField(VID_RCC
, ERR_IO_FAILURE
);
321 sendMessage(&response
);
326 else if (msg
->isControl())
329 DebugPrintf(m_dwIndex
, 6, _T("Received control message %s"), NXCPMessageCodeName(msg
->getCode(), buffer
));
331 if (msg
->getCode() == CMD_GET_NXCP_CAPS
)
333 NXCP_MESSAGE
*pMsg
= (NXCP_MESSAGE
*)malloc(NXCP_HEADER_SIZE
);
334 pMsg
->id
= htonl(msg
->getId());
335 pMsg
->code
= htons((WORD
)CMD_NXCP_CAPS
);
336 pMsg
->flags
= htons(MF_CONTROL
);
337 pMsg
->numFields
= htonl(NXCP_VERSION
<< 24);
338 pMsg
->size
= htonl(NXCP_HEADER_SIZE
);
339 sendRawMessage(pMsg
, m_pCtx
);
346 DebugPrintf(m_dwIndex
, 6, _T("Received message %s"), NXCPMessageCodeName(msg
->getCode(), buffer
));
349 switch(msg
->getCode())
351 case CMD_REQUEST_COMPLETED
:
352 m_responseQueue
->put(msg
);
354 case CMD_REQUEST_SESSION_KEY
:
357 NXCPMessage
*pResponse
;
358 SetupEncryptionContext(msg
, &m_pCtx
, &pResponse
, NULL
, NXCP_VERSION
);
359 sendMessage(pResponse
);
361 receiver
.setEncryptionContext(m_pCtx
);
365 case CMD_SETUP_PROXY_CONNECTION
:
366 s_proxyConnectionRequests
++;
367 rcc
= setupProxyConnection(msg
);
368 // When proxy session established incoming messages will
369 // not be processed locally. Acknowledgment message sent
370 // by setupProxyConnection() in case of success.
371 if (rcc
== ERR_SUCCESS
)
373 InterlockedIncrement(&s_activeProxySessions
);
374 m_processingQueue
->put(INVALID_POINTER_VALUE
);
378 NXCPMessage response
;
379 response
.setCode(CMD_REQUEST_COMPLETED
);
380 response
.setId(msg
->getId());
381 response
.setField(VID_RCC
, rcc
);
382 sendMessage(&response
);
386 case CMD_SNMP_REQUEST
:
387 if (m_masterServer
&& (g_dwFlags
& AF_ENABLE_SNMP_PROXY
))
390 ThreadPoolExecute(g_snmpProxyThreadPool
, this, &CommSession::proxySnmpRequest
, msg
);
394 NXCPMessage response
;
395 response
.setCode(CMD_REQUEST_COMPLETED
);
396 response
.setId(msg
->getId());
397 response
.setField(VID_RCC
, ERR_ACCESS_DENIED
);
398 sendMessage(&response
);
403 m_processingQueue
->put(msg
);
408 else // m_proxyConnection
415 FD_SET(m_hSocket
, &rdfs
);
416 tv
.tv_sec
= g_dwIdleTimeout
+ 1;
418 int rc
= select(SELECT_NFDS(m_hSocket
+ 1), &rdfs
, NULL
, NULL
, &tv
);
423 // Update activity timestamp
426 rc
= recv(m_hSocket
, buffer
, 32768, 0);
429 SendEx(m_hProxySocket
, buffer
, rc
, 0, NULL
);
434 // Notify other threads to exit
435 m_sendQueue
->put(INVALID_POINTER_VALUE
);
436 m_processingQueue
->put(INVALID_POINTER_VALUE
);
437 if (m_hProxySocket
!= INVALID_SOCKET
)
438 shutdown(m_hProxySocket
, SHUT_RDWR
);
440 // Wait for other threads to finish
441 ThreadJoin(m_hWriteThread
);
442 ThreadJoin(m_hProcessingThread
);
443 if (m_proxyConnection
)
444 ThreadJoin(m_hProxyReadThread
);
446 DebugPrintf(m_dwIndex
, 5, _T("Session with %s closed"), (const TCHAR
*)m_serverAddr
.toString());
450 * Send prepared raw message over the network and destroy it
452 BOOL
CommSession::sendRawMessage(NXCP_MESSAGE
*pMsg
, NXCPEncryptionContext
*pCtx
)
457 DebugPrintf(m_dwIndex
, 6, _T("Sending message %s (size %d)"), NXCPMessageCodeName(ntohs(pMsg
->code
), szBuffer
), ntohl(pMsg
->size
));
458 if (nxlog_get_debug_level() >= 8)
460 String msgDump
= NXCPMessage::dump(pMsg
, NXCP_VERSION
);
461 DebugPrintf(m_dwIndex
, 8, _T("Outgoing message dump:\n%s"), (const TCHAR
*)msgDump
);
463 if ((pCtx
!= NULL
) && (pCtx
!= PROXY_ENCRYPTION_CTX
))
465 NXCP_ENCRYPTED_MESSAGE
*enMsg
= pCtx
->encryptMessage(pMsg
);
468 if (SendEx(m_hSocket
, (const char *)enMsg
, ntohl(enMsg
->size
), 0, m_socketWriteMutex
) <= 0)
477 if (SendEx(m_hSocket
, (const char *)pMsg
, ntohl(pMsg
->size
), 0, m_socketWriteMutex
) <= 0)
483 DebugPrintf(m_dwIndex
, 6, _T("CommSession::SendRawMessage() for %s (size %d) failed"), NXCPMessageCodeName(ntohs(pMsg
->code
), szBuffer
), ntohl(pMsg
->size
));
491 void CommSession::writeThread()
497 pMsg
= (NXCP_MESSAGE
*)m_sendQueue
->getOrBlock();
498 if (pMsg
== INVALID_POINTER_VALUE
) // Session termination indicator
501 if (!sendRawMessage(pMsg
, m_pCtx
))
507 * Message processing thread
509 void CommSession::processingThread()
517 pMsg
= (NXCPMessage
*)m_processingQueue
->getOrBlock();
518 if (pMsg
== INVALID_POINTER_VALUE
) // Session termination indicator
520 dwCommand
= pMsg
->getCode();
522 // Prepare response message
523 msg
.setCode(CMD_REQUEST_COMPLETED
);
524 msg
.setId(pMsg
->getId());
526 // Check if authentication required
527 if ((!m_authenticated
) && (dwCommand
!= CMD_AUTHENTICATE
))
529 DebugPrintf(m_dwIndex
, 6, _T("Authentication required"));
530 msg
.setField(VID_RCC
, ERR_AUTH_REQUIRED
);
532 else if ((g_dwFlags
& AF_REQUIRE_ENCRYPTION
) && (m_pCtx
== NULL
))
534 DebugPrintf(m_dwIndex
, 6, _T("Encryption required"));
535 msg
.setField(VID_RCC
, ERR_ENCRYPTION_REQUIRED
);
541 case CMD_AUTHENTICATE
:
542 authenticate(pMsg
, &msg
);
544 case CMD_GET_PARAMETER
:
545 getParameter(pMsg
, &msg
);
551 getTable(pMsg
, &msg
);
554 msg
.setField(VID_RCC
, ERR_SUCCESS
);
559 case CMD_TRANSFER_FILE
:
560 recvFile(pMsg
, &msg
);
562 case CMD_UPGRADE_AGENT
:
563 msg
.setField(VID_RCC
, upgrade(pMsg
));
565 case CMD_GET_PARAMETER_LIST
:
566 msg
.setField(VID_RCC
, ERR_SUCCESS
);
567 GetParameterList(&msg
);
569 case CMD_GET_ENUM_LIST
:
570 msg
.setField(VID_RCC
, ERR_SUCCESS
);
573 case CMD_GET_TABLE_LIST
:
574 msg
.setField(VID_RCC
, ERR_SUCCESS
);
577 case CMD_GET_AGENT_CONFIG
:
580 case CMD_UPDATE_AGENT_CONFIG
:
581 updateConfig(pMsg
, &msg
);
583 case CMD_ENABLE_AGENT_TRAPS
:
586 m_acceptTraps
= true;
587 msg
.setField(VID_RCC
, ERR_SUCCESS
);
591 msg
.setField(VID_RCC
, ERR_ACCESS_DENIED
);
594 case CMD_ENABLE_FILE_UPDATES
:
597 m_acceptFileUpdates
= true;
598 msg
.setField(VID_RCC
, ERR_SUCCESS
);
602 msg
.setField(VID_RCC
, ERR_ACCESS_DENIED
);
605 case CMD_DEPLOY_AGENT_POLICY
:
608 msg
.setField(VID_RCC
, DeployPolicy(this, pMsg
));
612 msg
.setField(VID_RCC
, ERR_ACCESS_DENIED
);
615 case CMD_UNINSTALL_AGENT_POLICY
:
618 msg
.setField(VID_RCC
, UninstallPolicy(this, pMsg
));
622 msg
.setField(VID_RCC
, ERR_ACCESS_DENIED
);
625 case CMD_GET_POLICY_INVENTORY
:
628 msg
.setField(VID_RCC
, GetPolicyInventory(this, &msg
));
632 msg
.setField(VID_RCC
, ERR_ACCESS_DENIED
);
635 case CMD_TAKE_SCREENSHOT
:
638 TCHAR sessionName
[256];
639 pMsg
->getFieldAsString(VID_NAME
, sessionName
, 256);
640 DebugPrintf(m_dwIndex
, 6, _T("Take snapshot from session \"%s\""), sessionName
);
641 SessionAgentConnector
*conn
= AcquireSessionAgentConnector(sessionName
);
644 DebugPrintf(m_dwIndex
, 6, _T("Session agent connector acquired"));
645 conn
->takeScreenshot(&msg
);
650 msg
.setField(VID_RCC
, ERR_NO_SESSION_AGENT
);
655 msg
.setField(VID_RCC
, ERR_ACCESS_DENIED
);
658 case CMD_SET_SERVER_CAPABILITIES
:
659 // Servers before 2.0 use VID_ENABLED
660 m_ipv6Aware
= pMsg
->isFieldExist(VID_IPV6_SUPPORT
) ? pMsg
->getFieldAsBoolean(VID_IPV6_SUPPORT
) : pMsg
->getFieldAsBoolean(VID_ENABLED
);
661 m_bulkReconciliationSupported
= pMsg
->getFieldAsBoolean(VID_BULK_RECONCILIATION
);
662 msg
.setField(VID_RCC
, ERR_SUCCESS
);
664 case CMD_SET_SERVER_ID
:
665 m_serverId
= pMsg
->getFieldAsUInt64(VID_SERVER_ID
);
666 DebugPrintf(m_dwIndex
, 1, _T("Server ID set to ") UINT64X_FMT(_T("016")), m_serverId
);
667 msg
.setField(VID_RCC
, ERR_SUCCESS
);
669 case CMD_DATA_COLLECTION_CONFIG
:
672 ConfigureDataCollection(m_serverId
, pMsg
);
674 msg
.setField(VID_RCC
, ERR_SUCCESS
);
678 DebugPrintf(m_dwIndex
, 1, _T("Data collection configuration command received but server ID is not set"));
679 msg
.setField(VID_RCC
, ERR_SERVER_ID_UNSET
);
682 case CMD_CLEAN_AGENT_DCI_CONF
:
685 ClearDataCollectionConfiguration();
686 msg
.setField(VID_RCC
, ERR_SUCCESS
);
690 msg
.setField(VID_RCC
, ERR_ACCESS_DENIED
);
694 // Attempt to process unknown command by subagents
695 if (!ProcessCmdBySubAgent(dwCommand
, pMsg
, &msg
, this))
696 msg
.setField(VID_RCC
, ERR_UNKNOWN_COMMAND
);
704 msg
.deleteAllFields();
711 void CommSession::authenticate(NXCPMessage
*pRequest
, NXCPMessage
*pMsg
)
715 // Already authenticated
716 pMsg
->setField(VID_RCC
, (g_dwFlags
& AF_REQUIRE_AUTH
) ? ERR_ALREADY_AUTHENTICATED
: ERR_AUTH_NOT_REQUIRED
);
720 TCHAR szSecret
[MAX_SECRET_LENGTH
];
724 wAuthMethod
= pRequest
->getFieldAsUInt16(VID_AUTH_METHOD
);
728 pRequest
->getFieldAsString(VID_SHARED_SECRET
, szSecret
, MAX_SECRET_LENGTH
);
729 if (!_tcscmp(szSecret
, g_szSharedSecret
))
731 m_authenticated
= true;
732 pMsg
->setField(VID_RCC
, ERR_SUCCESS
);
736 nxlog_write(MSG_AUTH_FAILED
, EVENTLOG_WARNING_TYPE
, "Is", &m_serverAddr
, "PLAIN");
737 pMsg
->setField(VID_RCC
, ERR_AUTH_FAILED
);
741 pRequest
->getFieldAsBinary(VID_SHARED_SECRET
, (BYTE
*)szSecret
, MD5_DIGEST_SIZE
);
744 char sharedSecret
[256];
745 WideCharToMultiByte(CP_ACP
, WC_COMPOSITECHECK
| WC_DEFAULTCHAR
, g_szSharedSecret
, -1, sharedSecret
, 256, NULL
, NULL
);
746 sharedSecret
[255] = 0;
747 CalculateMD5Hash((BYTE
*)sharedSecret
, strlen(sharedSecret
), hash
);
750 CalculateMD5Hash((BYTE
*)g_szSharedSecret
, strlen(g_szSharedSecret
), hash
);
752 if (!memcmp(szSecret
, hash
, MD5_DIGEST_SIZE
))
754 m_authenticated
= true;
755 pMsg
->setField(VID_RCC
, ERR_SUCCESS
);
759 nxlog_write(MSG_AUTH_FAILED
, EVENTLOG_WARNING_TYPE
, "Is", &m_serverAddr
, _T("MD5"));
760 pMsg
->setField(VID_RCC
, ERR_AUTH_FAILED
);
764 pRequest
->getFieldAsBinary(VID_SHARED_SECRET
, (BYTE
*)szSecret
, SHA1_DIGEST_SIZE
);
767 char sharedSecret
[256];
768 WideCharToMultiByte(CP_ACP
, WC_COMPOSITECHECK
| WC_DEFAULTCHAR
, g_szSharedSecret
, -1, sharedSecret
, 256, NULL
, NULL
);
769 sharedSecret
[255] = 0;
770 CalculateSHA1Hash((BYTE
*)sharedSecret
, strlen(sharedSecret
), hash
);
773 CalculateSHA1Hash((BYTE
*)g_szSharedSecret
, strlen(g_szSharedSecret
), hash
);
775 if (!memcmp(szSecret
, hash
, SHA1_DIGEST_SIZE
))
777 m_authenticated
= true;
778 pMsg
->setField(VID_RCC
, ERR_SUCCESS
);
782 nxlog_write(MSG_AUTH_FAILED
, EVENTLOG_WARNING_TYPE
, "Is", &m_serverAddr
, _T("SHA1"));
783 pMsg
->setField(VID_RCC
, ERR_AUTH_FAILED
);
787 pMsg
->setField(VID_RCC
, ERR_NOT_IMPLEMENTED
);
794 * Get parameter's value
796 void CommSession::getParameter(NXCPMessage
*pRequest
, NXCPMessage
*pMsg
)
798 TCHAR szParameter
[MAX_PARAM_NAME
], szValue
[MAX_RESULT_LENGTH
];
801 pRequest
->getFieldAsString(VID_PARAMETER
, szParameter
, MAX_PARAM_NAME
);
802 dwErrorCode
= GetParameterValue(m_dwIndex
, szParameter
, szValue
, this);
803 pMsg
->setField(VID_RCC
, dwErrorCode
);
804 if (dwErrorCode
== ERR_SUCCESS
)
805 pMsg
->setField(VID_VALUE
, szValue
);
811 void CommSession::getList(NXCPMessage
*pRequest
, NXCPMessage
*pMsg
)
813 TCHAR szParameter
[MAX_PARAM_NAME
];
814 pRequest
->getFieldAsString(VID_PARAMETER
, szParameter
, MAX_PARAM_NAME
);
817 UINT32 dwErrorCode
= GetListValue(m_dwIndex
, szParameter
, &value
, this);
818 pMsg
->setField(VID_RCC
, dwErrorCode
);
819 if (dwErrorCode
== ERR_SUCCESS
)
821 value
.fillMessage(pMsg
, VID_ENUM_VALUE_BASE
, VID_NUM_STRINGS
);
828 void CommSession::getTable(NXCPMessage
*pRequest
, NXCPMessage
*pMsg
)
830 TCHAR szParameter
[MAX_PARAM_NAME
];
832 pRequest
->getFieldAsString(VID_PARAMETER
, szParameter
, MAX_PARAM_NAME
);
835 UINT32 dwErrorCode
= GetTableValue(m_dwIndex
, szParameter
, &value
, this);
836 pMsg
->setField(VID_RCC
, dwErrorCode
);
837 if (dwErrorCode
== ERR_SUCCESS
)
839 value
.fillMessage(*pMsg
, 0, -1); // no row limit
844 * Perform action on request
846 void CommSession::action(NXCPMessage
*pRequest
, NXCPMessage
*pMsg
)
848 if ((g_dwFlags
& AF_ENABLE_ACTIONS
) && m_controlServer
)
850 // Get action name and arguments
851 TCHAR action
[MAX_PARAM_NAME
];
852 pRequest
->getFieldAsString(VID_ACTION_NAME
, action
, MAX_PARAM_NAME
);
854 int numArgs
= pRequest
->getFieldAsInt32(VID_NUM_ARGS
);
855 StringList
*args
= new StringList
;
856 for(int i
= 0; i
< numArgs
; i
++)
857 args
->addPreallocated(pRequest
->getFieldAsString(VID_ACTION_ARG_BASE
+ i
));
860 if (pRequest
->getFieldAsBoolean(VID_RECEIVE_OUTPUT
))
862 UINT32 rcc
= ExecActionWithOutput(this, pRequest
->getId(), action
, args
);
863 pMsg
->setField(VID_RCC
, rcc
);
867 UINT32 rcc
= ExecAction(action
, args
, this);
868 pMsg
->setField(VID_RCC
, rcc
);
874 pMsg
->setField(VID_RCC
, ERR_ACCESS_DENIED
);
879 * Prepare for receiving file
881 void CommSession::recvFile(NXCPMessage
*pRequest
, NXCPMessage
*pMsg
)
883 TCHAR szFileName
[MAX_PATH
], szFullPath
[MAX_PATH
];
888 pRequest
->getFieldAsString(VID_FILE_NAME
, szFileName
, MAX_PATH
);
889 DebugPrintf(m_dwIndex
, 5, _T("CommSession::recvFile(): Preparing for receiving file \"%s\""), szFileName
);
890 BuildFullPath(szFileName
, szFullPath
);
892 // Check if for some reason we have already opened file
893 pMsg
->setField(VID_RCC
, openFile(szFullPath
, pRequest
->getId()));
897 pMsg
->setField(VID_RCC
, ERR_ACCESS_DENIED
);
902 * Open file for writing
904 UINT32
CommSession::openFile(TCHAR
*szFullPath
, UINT32 requestId
)
906 if (m_hCurrFile
!= -1)
908 return ERR_RESOURCE_BUSY
;
912 DebugPrintf(m_dwIndex
, 5, _T("CommSession::recvFile(): Writing to local file \"%s\""), szFullPath
);
913 m_hCurrFile
= _topen(szFullPath
, O_CREAT
| O_TRUNC
| O_WRONLY
| O_BINARY
, 0600);
914 if (m_hCurrFile
== -1)
916 DebugPrintf(m_dwIndex
, 2, _T("CommSession::recvFile(): Error opening file \"%s\" for writing (%s)"), szFullPath
, _tcserror(errno
));
917 return ERR_IO_FAILURE
;
921 m_fileRqId
= requestId
;
928 * Progress callback for file sending
930 static void SendFileProgressCallback(INT64 bytesTransferred
, void *cbArg
)
932 ((CommSession
*)cbArg
)->updateTimeStamp();
936 * Send file to server
938 bool CommSession::sendFile(UINT32 requestId
, const TCHAR
*file
, long offset
)
940 return SendFileOverNXCP(m_hSocket
, requestId
, file
, m_pCtx
, offset
, SendFileProgressCallback
, this, m_socketWriteMutex
) ? true : false;
944 * Upgrade agent from package in the file store
946 UINT32
CommSession::upgrade(NXCPMessage
*pRequest
)
950 TCHAR szPkgName
[MAX_PATH
], szFullPath
[MAX_PATH
];
953 pRequest
->getFieldAsString(VID_FILE_NAME
, szPkgName
, MAX_PATH
);
954 BuildFullPath(szPkgName
, szFullPath
);
956 //Create line in registry file with upgrade file name to delete it after system start
957 DB_HANDLE hdb
= GetLocalDatabaseHandle();
960 TCHAR upgradeFileInsert
[256];
961 _sntprintf(upgradeFileInsert
, 256, _T("INSERT INTO registry (attribute,value) VALUES ('upgrade.file',%s)"), szPkgName
);
962 DBQuery(hdb
, upgradeFileInsert
);
965 return UpgradeAgent(szFullPath
);
969 return ERR_ACCESS_DENIED
;
974 * Get agent's configuration file
976 void CommSession::getConfig(NXCPMessage
*pMsg
)
980 pMsg
->setField(VID_RCC
,
981 pMsg
->setFieldFromFile(VID_CONFIG_FILE
, g_szConfigFile
) ? ERR_SUCCESS
: ERR_IO_FAILURE
);
985 pMsg
->setField(VID_RCC
, ERR_ACCESS_DENIED
);
990 * Update agent's configuration file
992 void CommSession::updateConfig(NXCPMessage
*pRequest
, NXCPMessage
*pMsg
)
1000 if (pRequest
->isFieldExist(VID_CONFIG_FILE
))
1002 size
= pRequest
->getFieldAsBinary(VID_CONFIG_FILE
, NULL
, 0);
1003 pConfig
= (BYTE
*)malloc(size
);
1004 pRequest
->getFieldAsBinary(VID_CONFIG_FILE
, pConfig
, size
);
1005 hFile
= _topen(g_szConfigFile
, O_CREAT
| O_TRUNC
| O_WRONLY
, 0644);
1010 for(UINT32 i
= 0; i
< size
- 1; i
++)
1011 if (pConfig
[i
] == 0x0D)
1014 memmove(&pConfig
[i
], &pConfig
[i
+ 1], size
- i
);
1018 if (write(hFile
, pConfig
, size
) == size
)
1019 pMsg
->setField(VID_RCC
, ERR_SUCCESS
);
1021 pMsg
->setField(VID_RCC
, ERR_IO_FAILURE
);
1026 DebugPrintf(m_dwIndex
, 2, _T("CommSession::updateConfig(): Error opening file \"%s\" for writing (%s)"),
1027 g_szConfigFile
, _tcserror(errno
));
1028 pMsg
->setField(VID_RCC
, ERR_FILE_OPEN_ERROR
);
1034 pMsg
->setField(VID_RCC
, ERR_MALFORMED_COMMAND
);
1039 pMsg
->setField(VID_RCC
, ERR_ACCESS_DENIED
);
1044 * Setup proxy connection
1046 UINT32
CommSession::setupProxyConnection(NXCPMessage
*pRequest
)
1048 UINT32 dwResult
, dwAddr
;
1050 struct sockaddr_in sa
;
1051 NXCPEncryptionContext
*pSavedCtx
;
1054 if (m_masterServer
&& (g_dwFlags
& AF_ENABLE_PROXY
))
1056 dwAddr
= pRequest
->getFieldAsUInt32(VID_IP_ADDRESS
);
1057 wPort
= pRequest
->getFieldAsUInt16(VID_AGENT_PORT
);
1058 m_hProxySocket
= socket(AF_INET
, SOCK_STREAM
, 0);
1059 if (m_hProxySocket
!= INVALID_SOCKET
)
1061 // Fill in address structure
1062 memset(&sa
, 0, sizeof(sa
));
1063 sa
.sin_addr
.s_addr
= htonl(dwAddr
);
1064 sa
.sin_family
= AF_INET
;
1065 sa
.sin_port
= htons(wPort
);
1066 if (connect(m_hProxySocket
, (struct sockaddr
*)&sa
, sizeof(sa
)) != -1)
1069 NXCP_MESSAGE
*pRawMsg
;
1071 // Stop writing thread
1072 m_sendQueue
->put(INVALID_POINTER_VALUE
);
1074 // Wait while all queued messages will be sent
1075 while(m_sendQueue
->size() > 0)
1078 // Finish proxy connection setup
1080 m_pCtx
= PROXY_ENCRYPTION_CTX
;
1081 m_proxyConnection
= true;
1082 dwResult
= ERR_SUCCESS
;
1083 m_hProxyReadThread
= ThreadCreateEx(proxyReadThreadStarter
, 0, this);
1085 // Send confirmation message
1086 // We cannot use sendMessage() and writing thread, because
1087 // encryption context already overriden, and writing thread
1089 msg
.setCode(CMD_REQUEST_COMPLETED
);
1090 msg
.setId(pRequest
->getId());
1091 msg
.setField(VID_RCC
, RCC_SUCCESS
);
1092 pRawMsg
= msg
.createMessage();
1093 sendRawMessage(pRawMsg
, pSavedCtx
);
1094 if (pSavedCtx
!= NULL
)
1095 pSavedCtx
->decRefCount();
1097 DebugPrintf(m_dwIndex
, 5, _T("Established proxy connection to %s:%d"), IpToStr(dwAddr
, szBuffer
), wPort
);
1101 dwResult
= ERR_CONNECT_FAILED
;
1106 dwResult
= ERR_SOCKET_ERROR
;
1111 dwResult
= ERR_ACCESS_DENIED
;
1117 * Proxy reading thread
1119 void CommSession::proxyReadThread()
1129 FD_SET(m_hProxySocket
, &rdfs
);
1131 tv
.tv_usec
= 5000000; // Half-second timeout
1132 nRet
= select(SELECT_NFDS(m_hProxySocket
+ 1), &rdfs
, NULL
, NULL
, &tv
);
1137 nRet
= recv(m_hProxySocket
, buffer
, 32768, 0);
1140 SendEx(m_hSocket
, buffer
, nRet
, 0, m_socketWriteMutex
);
1147 * Wait for request completion
1149 UINT32
CommSession::doRequest(NXCPMessage
*msg
, UINT32 timeout
)
1152 NXCPMessage
*response
= m_responseQueue
->waitForMessage(CMD_REQUEST_COMPLETED
, msg
->getId(), timeout
);
1154 if (response
!= NULL
)
1156 rcc
= response
->getFieldAsUInt32(VID_RCC
);
1161 rcc
= ERR_REQUEST_TIMEOUT
;
1167 * Wait for request completion
1169 NXCPMessage
*CommSession::doRequestEx(NXCPMessage
*msg
, UINT32 timeout
)
1172 return m_responseQueue
->waitForMessage(CMD_REQUEST_COMPLETED
, msg
->getId(), timeout
);
1176 * Generate new request ID
1178 UINT32
CommSession::generateRequestId()
1180 return (UINT32
)InterlockedIncrement(&m_requestId
);