Minor changes
[public/netxms.git] / src / agent / core / session.cpp
CommitLineData
e9580fef
VK
1/*
2** NetXMS multiplatform core agent
3** Copyright (C) 2003, 2004 Victor Kirhenshtein
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18**
19** $module: session.cpp
20**
21**/
22
23#include "nxagentd.h"
24
25
0a147f4b
VK
26//
27// Constants
28//
29
30#define RAW_MSG_SIZE 262144
31
32
e9580fef
VK
33//
34// Client session class constructor
35//
36
37CommSession::CommSession(SOCKET hSocket, DWORD dwHostAddr)
38{
39 m_pSendQueue = new Queue;
40 m_pMessageQueue = new Queue;
41 m_hSocket = hSocket;
42 m_dwIndex = INVALID_INDEX;
43 m_pMsgBuffer = (CSCP_BUFFER *)malloc(sizeof(CSCP_BUFFER));
44 m_hCondWriteThreadStopped = ConditionCreate(FALSE);
45 m_hCondProcessingThreadStopped = ConditionCreate(FALSE);
46 m_dwHostAddr = dwHostAddr;
47}
48
49
50//
51// Destructor
52//
53
54CommSession::~CommSession()
55{
56 shutdown(m_hSocket, 2);
57 closesocket(m_hSocket);
58 delete m_pSendQueue;
59 delete m_pMessageQueue;
60 safe_free(m_pMsgBuffer);
61 ConditionDestroy(m_hCondWriteThreadStopped);
62 ConditionDestroy(m_hCondProcessingThreadStopped);
63}
0a147f4b
VK
64
65
66//
67// Reading thread
68//
69
70void CommSession::ReadThread(void)
71{
72 CSCP_MESSAGE *pRawMsg;
73 CSCPMessage *pMsg;
74 int iErr;
75
76 // Initialize raw message receiving function
77 RecvCSCPMessage(0, NULL, m_pMsgBuffer, 0);
78
79 pRawMsg = (CSCP_MESSAGE *)malloc(RAW_MSG_SIZE);
80 while(1)
81 {
82 if ((iErr = RecvCSCPMessage(m_hSocket, pRawMsg, m_pMsgBuffer, RAW_MSG_SIZE)) <= 0)
83 break;
84
85 // Check if message is too large
86 if (iErr == 1)
87 continue;
88
89 // Check that actual received packet size is equal to encoded in packet
90 if ((int)ntohl(pRawMsg->dwSize) != iErr)
91 {
92 DebugPrintf("Actual message size doesn't match wSize value (%d,%d)\n", iErr, ntohl(pRawMsg->dwSize));
93 continue; // Bad packet, wait for next
94 }
95
96 // Create message object from raw message
97 pMsg = new CSCPMessage(pRawMsg);
98 m_pMessageQueue->Put(pMsg);
99 }
100 if (iErr < 0)
101 WriteLog(MSG_SESSION_BROKEN, EVENTLOG_WARNING_TYPE, "e", WSAGetLastError());
102 free(pRawMsg);
103
104 // Notify other threads to exit
105 m_pSendQueue->Put(INVALID_POINTER_VALUE);
106 m_pMessageQueue->Put(INVALID_POINTER_VALUE);
107
108 // Wait for other threads to finish
109 ConditionWait(m_hCondWriteThreadStopped, INFINITE);
110 ConditionWait(m_hCondProcessingThreadStopped, INFINITE);
111}
112
113
114//
115// Writing thread
116//
117
118void CommSession::WriteThread(void)
119{
120 CSCP_MESSAGE *pMsg;
121 char szBuffer[128];
122
123 while(1)
124 {
125 pMsg = (CSCP_MESSAGE *)m_pSendQueue->GetOrBlock();
126 if (pMsg == INVALID_POINTER_VALUE) // Session termination indicator
127 break;
128
129 DebugPrintf("Sending message %s\n", CSCPMessageCodeName(ntohs(pMsg->wCode), szBuffer));
130 if (send(m_hSocket, (const char *)pMsg, ntohl(pMsg->dwSize), 0) <= 0)
131 {
132 MemFree(pMsg);
133 break;
134 }
135 MemFree(pMsg);
136 }
137 ConditionSet(m_hCondWriteThreadStopped);
138}
139
140
141//
142// Message processing thread
143//
144
145void CommSession::ProcessingThread(void)
146{
147 CSCPMessage *pMsg;
148 char szBuffer[128];
149
150 while(1)
151 {
152 pMsg = (CSCPMessage *)m_pMessageQueue->GetOrBlock();
153 if (pMsg == INVALID_POINTER_VALUE) // Session termination indicator
154 break;
155
156 DebugPrintf("Received message %s\n", CSCPMessageCodeName(pMsg->GetCode(), szBuffer));
157 delete pMsg;
158 }
159 ConditionSet(m_hCondProcessingThreadStopped);
160}