- Thread functions was changed
authorVictor Kirhenshtein <victor@netxms.org>
Sun, 19 Sep 2004 20:59:32 +0000 (20:59 +0000)
committerVictor Kirhenshtein <victor@netxms.org>
Sun, 19 Sep 2004 20:59:32 +0000 (20:59 +0000)
- Added functions ThreadCreateEx() and ThreadJoin()
- Minor bugfixes

27 files changed:
include/nms_threads.h
include/nxcscpapi.h
src/agent/core/comm.cpp
src/agent/core/nxagentd.cpp
src/agent/core/nxagentd.h
src/agent/core/session.cpp
src/libnetxms/config.cpp
src/libnxcl/comm.cpp
src/libnxcl/objects.cpp
src/server/core/admin.cpp
src/server/core/client.cpp
src/server/core/datacoll.cpp
src/server/core/dbwrite.cpp
src/server/core/discovery.cpp
src/server/core/email.cpp
src/server/core/evproc.cpp
src/server/core/hk.cpp
src/server/core/main.cpp
src/server/core/nms_core.h
src/server/core/node.cpp
src/server/core/np.cpp
src/server/core/session.cpp
src/server/core/status.cpp
src/server/core/syncer.cpp
src/server/core/watchdog.cpp
src/server/include/nxsrvapi.h
src/server/libnxsrv/agent.cpp

index b85ca9c..c19f5c8 100644 (file)
 
 #define INVALID_MUTEX_HANDLE        INVALID_HANDLE_VALUE
 #define INVALID_CONDITION_HANDLE    INVALID_HANDLE_VALUE
+#define INVALID_THREAD_HANDLE       (NULL)
+
+typedef unsigned int THREAD_RESULT;
+
+#define THREAD_OK       0
+#define THREAD_CALL     __stdcall
 
 
 //
@@ -53,9 +59,22 @@ inline void ThreadSleepMs(DWORD dwMilliseconds)
    Sleep(dwMilliseconds);
 }
 
-inline THREAD ThreadCreate(void (__cdecl *start_address )(void *), int stack_size, void *args)
+inline BOOL ThreadCreate(THREAD_RESULT (THREAD_CALL *start_address )(void *), int stack_size, void *args)
 {
-   return (THREAD)_beginthread(start_address, stack_size, args);
+   HANDLE hThread;
+   unsigned int dwThreadId;
+
+   hThread = (HANDLE)_beginthreadex(NULL, stack_size, start_address, args, 0, &dwThreadId);
+   if (hThread != NULL)
+      CloseHandle(hThread);
+   return (hThread != NULL);
+}
+
+inline THREAD ThreadCreateEx(THREAD_RESULT (THREAD_CALL *start_address )(void *), int stack_size, void *args)
+{
+   unsigned int dwThreadId;
+
+   return (HANDLE)_beginthreadex(NULL, stack_size, start_address, args, 0, &dwThreadId);
 }
 
 inline void ThreadExit(void)
@@ -63,6 +82,15 @@ inline void ThreadExit(void)
    _endthread();
 }
 
+inline void ThreadJoin(THREAD hThread)
+{
+   if (hThread != INVALID_THREAD_HANDLE)
+   {
+      WaitForSingleObject(hThread, INFINITE);
+      CloseHandle(hThread);
+   }
+}
+
 inline MUTEX MutexCreate(void)
 {
    return CreateMutex(NULL, FALSE, NULL);
@@ -129,11 +157,18 @@ typedef struct condition_t * CONDITION;
 
 #define INVALID_MUTEX_HANDLE        (NULL)
 #define INVALID_CONDITION_HANDLE    (NULL)
+#define INVALID_THREAD_HANDLE       0
 
 #ifndef INFINITE
 # define INFINITE 0
 #endif
 
+typedef void *THREAD_RESULT;
+
+#define THREAD_OK       ((void *)0)
+#define THREAD_CALL
+
+
 //
 // Inline functions
 //
@@ -158,18 +193,32 @@ inline void ThreadSleepMs(DWORD dwMilliseconds)
    usleep(dwMilliseconds * 1000);   // Convert to microseconds
 }
 
-inline THREAD ThreadCreate(void (*start_address )(void *), int stack_size, void *args)
+inline BOOL ThreadCreate(THREAD_RESULT (THREAD_CALL *start_address )(void *), int stack_size, void *args)
 {
        THREAD id;
 
-       if (pthread_create(&id, NULL, (void *(*)(void *))start_address, args) == 0) 
+       if (pthread_create(&id, NULL, start_address, args) == 0) 
    {
       pthread_detach(id);
+               return TRUE;
+       } 
+   else 
+   {
+               return FALSE;
+       }
+}
+
+inline THREAD ThreadCreateEx(THREAD_RESULT (THREAD_CALL *start_address )(void *), int stack_size, void *args)
+{
+       THREAD id;
+
+       if pthread_create(&id, NULL, start_address, args) == 0) 
+   {
                return id;
        } 
    else 
    {
-               return 0;
+               return INVALID_THREAD_HANDLE;
        }
 }
 
@@ -178,6 +227,12 @@ inline void ThreadExit(void)
    pthread_exit(NULL);
 }
 
+inline void ThreadJoin(THREAD hThread)
+{
+   if (hThread != INVALID_THREAD_HANDLE)
+      pthread_join(hThread);
+}
+
 inline MUTEX MutexCreate(void)
 {
    MUTEX mutex;
index 4cba326..49d3d84 100644 (file)
@@ -129,19 +129,19 @@ class LIBNXCSCP_EXPORTABLE MsgWaitQueue
 {
 private:
    MUTEX m_hMutexDataAccess;
-   MUTEX m_hMutexIsRunning;
    CONDITION m_hStopCondition;
    DWORD m_dwMsgHoldTime;
    DWORD m_dwNumElements;
    WAIT_QUEUE_ELEMENT *m_pElements;
    BOOL m_bIsRunning;
+   THREAD m_hHkThread;
 
    void Lock(void) { MutexLock(m_hMutexDataAccess, INFINITE); }
    void Unlock(void) { MutexUnlock(m_hMutexDataAccess); }
    void HousekeeperThread(void);
    void *WaitForMessageInternal(WORD wIsBinary, WORD wCode, DWORD dwId, DWORD dwTimeOut);
    
-   static void MWQThreadStarter(void *);
+   static THREAD_RESULT THREAD_CALL MWQThreadStarter(void *);
 
 public:
    MsgWaitQueue();
index c7c307f..14c8bed 100644 (file)
@@ -90,7 +90,7 @@ static BOOL RegisterSession(CommSession *pSession)
 // Unregister session
 //
 
-static void UnregisterSession(DWORD dwIndex)
+void UnregisterSession(DWORD dwIndex)
 {
    MutexLock(m_hSessionListAccess, INFINITE);
    m_pSessionList[dwIndex] = NULL;
@@ -99,46 +99,10 @@ static void UnregisterSession(DWORD dwIndex)
 
 
 //
-// Client communication read thread
-//
-
-static void ReadThread(void *pArg)
-{
-   ((CommSession *)pArg)->ReadThread();
-
-   // When CommSession::ReadThread exits, all other session
-   // threads are already stopped, so we can safely destroy
-   // session object
-   UnregisterSession(((CommSession *)pArg)->GetIndex());
-   delete (CommSession *)pArg;
-}
-
-
-//
-// Client communication write thread
-//
-
-static void WriteThread(void *pArg)
-{
-   ((CommSession *)pArg)->WriteThread();
-}
-
-
-//
-// Received message processing thread
-//
-
-static void ProcessingThread(void *pArg)
-{
-   ((CommSession *)pArg)->ProcessingThread();
-}
-
-
-//
 // TCP/IP Listener
 //
 
-void ListenerThread(void *)
+THREAD_RESULT THREAD_CALL ListenerThread(void *)
 {
    SOCKET hSocket, hClientSocket;
    struct sockaddr_in servAddr;
@@ -209,9 +173,7 @@ void ListenerThread(void *)
          }
          else
          {
-            ThreadCreate(ReadThread, 0, (void *)pSession);
-            ThreadCreate(WriteThread, 0, (void *)pSession);
-            ThreadCreate(ProcessingThread, 0, (void *)pSession);
+            pSession->Run();
          }
       }
       else     // Unauthorized connection
@@ -224,4 +186,5 @@ void ListenerThread(void *)
    }
 
    MutexDestroy(m_hSessionListAccess);
+   return THREAD_OK;
 }
index 4c322ae..fb08afe 100644 (file)
@@ -35,7 +35,7 @@
 // Externals
 //
 
-void ListenerThread(void *);
+THREAD_RESULT THREAD_CALL ListenerThread(void *);
 
 
 //
index 52266e1..63dbd90 100644 (file)
@@ -209,8 +209,8 @@ private:
    Queue *m_pSendQueue;
    Queue *m_pMessageQueue;
    CSCP_BUFFER *m_pMsgBuffer;
-   MUTEX m_mutexWriteThreadRunning;
-   MUTEX m_mutexProcessingThreadRunning;
+   THREAD m_hWriteThread;
+   THREAD m_hProcessingThread;
    DWORD m_dwHostAddr;        // IP address of connected host (network byte order)
    DWORD m_dwIndex;
    BOOL m_bIsAuthenticated;
@@ -220,13 +220,19 @@ private:
    void GetList(CSCPMessage *pRequest, CSCPMessage *pMsg);
    void Action(CSCPMessage *pRequest, CSCPMessage *pMsg);
 
+   void ReadThread(void);
+   void WriteThread(void);
+   void ProcessingThread(void);
+
+   static THREAD_RESULT THREAD_CALL ReadThreadStarter(void *);
+   static THREAD_RESULT THREAD_CALL WriteThreadStarter(void *);
+   static THREAD_RESULT THREAD_CALL ProcessingThreadStarter(void *);
+
 public:
    CommSession(SOCKET hSocket, DWORD dwHostAddr);
    ~CommSession();
 
-   void ReadThread(void);
-   void WriteThread(void);
-   void ProcessingThread(void);
+   void Run(void);
 
    void SendMessage(CSCPMessage *pMsg) { m_pSendQueue->Put(pMsg->CreateMessage()); }
 
index 9128de6..bd36d38 100644 (file)
 
 
 //
+// Externals
+//
+
+void UnregisterSession(DWORD dwIndex);
+
+
+//
 // Constants
 //
 
 
 
 //
+// Client communication read thread
+//
+
+THREAD_RESULT THREAD_CALL CommSession::ReadThreadStarter(void *pArg)
+{
+   ((CommSession *)pArg)->ReadThread();
+
+   // When CommSession::ReadThread exits, all other session
+   // threads are already stopped, so we can safely destroy
+   // session object
+   UnregisterSession(((CommSession *)pArg)->GetIndex());
+   delete (CommSession *)pArg;
+   return THREAD_OK;
+}
+
+
+//
+// Client communication write thread
+//
+
+THREAD_RESULT THREAD_CALL CommSession::WriteThreadStarter(void *pArg)
+{
+   ((CommSession *)pArg)->WriteThread();
+   return THREAD_OK;
+}
+
+
+//
+// Received message processing thread
+//
+
+THREAD_RESULT THREAD_CALL CommSession::ProcessingThreadStarter(void *pArg)
+{
+   ((CommSession *)pArg)->ProcessingThread();
+   return THREAD_OK;
+}
+
+
+//
 // Client session class constructor
 //
 
@@ -41,8 +87,8 @@ CommSession::CommSession(SOCKET hSocket, DWORD dwHostAddr)
    m_hSocket = hSocket;
    m_dwIndex = INVALID_INDEX;
    m_pMsgBuffer = (CSCP_BUFFER *)malloc(sizeof(CSCP_BUFFER));
-   m_mutexWriteThreadRunning = MutexCreate();
-   m_mutexProcessingThreadRunning = MutexCreate();
+   m_hWriteThread = INVALID_THREAD_HANDLE;
+   m_hProcessingThread = INVALID_THREAD_HANDLE;
    m_dwHostAddr = dwHostAddr;
    m_bIsAuthenticated = (g_dwFlags & AF_REQUIRE_AUTH) ? FALSE : TRUE;
 }
@@ -59,8 +105,18 @@ CommSession::~CommSession()
    delete m_pSendQueue;
    delete m_pMessageQueue;
    safe_free(m_pMsgBuffer);
-   MutexDestroy(m_mutexWriteThreadRunning);
-   MutexDestroy(m_mutexProcessingThreadRunning);
+}
+
+
+//
+// Start all threads
+//
+
+void CommSession::Run(void)
+{
+   m_hWriteThread = ThreadCreateEx(WriteThreadStarter, 0, this);
+   m_hProcessingThread = ThreadCreateEx(ProcessingThreadStarter, 0, this);
+   ThreadCreate(ReadThreadStarter, 0, this);
 }
 
 
@@ -108,11 +164,8 @@ void CommSession::ReadThread(void)
    m_pMessageQueue->Put(INVALID_POINTER_VALUE);
 
    // Wait for other threads to finish
-   MutexLock(m_mutexWriteThreadRunning, INFINITE);
-   MutexUnlock(m_mutexWriteThreadRunning);
-   MutexLock(m_mutexProcessingThreadRunning, INFINITE);
-   MutexUnlock(m_mutexProcessingThreadRunning);
+   ThreadJoin(m_hWriteThread);
+   ThreadJoin(m_hProcessingThread);
 
    DebugPrintf("Session with %s closed", IpToStr(m_dwHostAddr, szBuffer));
 }
@@ -127,7 +180,6 @@ void CommSession::WriteThread(void)
    CSCP_MESSAGE *pMsg;
    char szBuffer[128];
 
-   MutexLock(m_mutexWriteThreadRunning, INFINITE);
    while(1)
    {
       pMsg = (CSCP_MESSAGE *)m_pSendQueue->GetOrBlock();
@@ -142,7 +194,6 @@ void CommSession::WriteThread(void)
       }
       free(pMsg);
    }
-   MutexUnlock(m_mutexWriteThreadRunning);
 }
 
 
@@ -156,7 +207,6 @@ void CommSession::ProcessingThread(void)
    char szBuffer[128];
    CSCPMessage msg;
 
-   MutexLock(m_mutexProcessingThreadRunning, INFINITE);
    while(1)
    {
       pMsg = (CSCPMessage *)m_pMessageQueue->GetOrBlock();
@@ -203,7 +253,6 @@ void CommSession::ProcessingThread(void)
       SendMessage(&msg);
       msg.DeleteAllVariables();
    }
-   MutexUnlock(m_mutexProcessingThreadRunning);
 }
 
 
index b92f6a8..0aedd58 100644 (file)
@@ -64,8 +64,7 @@ DWORD LIBNETXMS_EXPORTABLE NxLoadConfig(char *pszFileName, char *pszSection,
       // Check if it's a section name
       if (szBuffer[0] == '*')
       {
-         if (!stricmp(&szBuffer[1], pszSection))
-            bActiveSection = TRUE;
+         bActiveSection = !stricmp(&szBuffer[1], pszSection);
       }
       else
       {
index 6a5a3ca..1c1a1ea 100644 (file)
@@ -72,7 +72,7 @@ BOOL SendMsg(CSCPMessage *pMsg)
 // Network receiver thread
 //
 
-static void NetReceiver(void *pArg)
+static THREAD_RESULT THREAD_CALL NetReceiver(void *pArg)
 {
    CSCPMessage *pMsg;
    CSCP_MESSAGE *pRawMsg;
@@ -201,6 +201,7 @@ static void NetReceiver(void *pArg)
    while(recv(m_hSocket, szBuffer, 128, 0) > 0);
    shutdown(m_hSocket, SHUT_RD);
    closesocket(m_hSocket);
+   return THREAD_OK;
 }
 
 
index 0fb1095..a3457cf 100644 (file)
@@ -500,7 +500,7 @@ DWORD LIBNXCL_EXPORTABLE NXCCreateObject(NXC_OBJECT_CREATE_INFO *pCreateInfo, DW
    {
       case OBJECT_NODE:
          msg.SetVariable(VID_IP_ADDRESS, pCreateInfo->cs.node.dwIpAddr);
-         msg.SetVariable(VID_IP_ADDRESS, pCreateInfo->cs.node.dwNetMask);
+         msg.SetVariable(VID_IP_NETMASK, pCreateInfo->cs.node.dwNetMask);
          break;
       case OBJECT_CONTAINER:
          msg.SetVariable(VID_CATEGORY, pCreateInfo->cs.container.dwCategory);
index bd6702a..f222925 100644 (file)
@@ -83,7 +83,7 @@ static BOOL SendString(SOCKET sock, char *szString)
 // Request processing thread
 //
 
-static void ProcessingThread(void *pArg)
+static THREAD_RESULT THREAD_CALL ProcessingThread(void *pArg)
 {
    SOCKET sock = (SOCKET)pArg;
    WORD wCmd;
@@ -171,6 +171,7 @@ static void ProcessingThread(void *pArg)
 close_connection:
    shutdown(sock, 2);
    closesocket(sock);
+   return THREAD_OK;
 }
 
 
@@ -178,7 +179,7 @@ close_connection:
 // Local administrative interface listener thread
 //
 
-void LocalAdminListener(void *pArg)
+THREAD_RESULT THREAD_CALL LocalAdminListener(void *pArg)
 {
    SOCKET sock, sockClient;
    struct sockaddr_in servAddr;
@@ -189,7 +190,7 @@ void LocalAdminListener(void *pArg)
    if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1)
    {
       WriteLog(MSG_SOCKET_FAILED, EVENTLOG_ERROR_TYPE, "s", "LocalAdminListener");
-      return;
+      return THREAD_OK;
    }
 
    // Fill in local address structure
@@ -240,4 +241,5 @@ void LocalAdminListener(void *pArg)
    }
 
    closesocket(sock);
+   return THREAD_OK;
 }
index c0a9986..7998324 100644 (file)
@@ -66,7 +66,7 @@ static BOOL RegisterSession(ClientSession *pSession)
 // Unregister session
 //
 
-static void UnregisterSession(DWORD dwIndex)
+void UnregisterSession(DWORD dwIndex)
 {
    MutexLock(m_hSessionListAccess, INFINITE);
    m_pSessionList[dwIndex] = NULL;
@@ -75,56 +75,10 @@ static void UnregisterSession(DWORD dwIndex)
 
 
 //
-// Client communication read thread
-//
-
-static void ReadThread(void *pArg)
-{
-   ((ClientSession *)pArg)->ReadThread();
-
-   // When ClientSession::ReadThread exits, all other session
-   // threads are already stopped, so we can safely destroy
-   // session object
-   UnregisterSession(((ClientSession *)pArg)->GetIndex());
-   delete (ClientSession *)pArg;
-}
-
-
-//
-// Client communication write thread
-//
-
-static void WriteThread(void *pArg)
-{
-   ((ClientSession *)pArg)->WriteThread();
-}
-
-
-//
-// Received message processing thread
-//
-
-static void ProcessingThread(void *pArg)
-{
-   ((ClientSession *)pArg)->ProcessingThread();
-}
-
-
-//
-// Information update processing thread
-//
-
-static void UpdateThread(void *pArg)
-{
-   ((ClientSession *)pArg)->UpdateThread();
-}
-
-
-//
 // Listener thread
 //
 
-void ClientListener(void *)
+THREAD_RESULT THREAD_CALL ClientListener(void *)
 {
    SOCKET sock, sockClient;
    struct sockaddr_in servAddr;
@@ -140,7 +94,7 @@ void ClientListener(void *)
    if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1)
    {
       WriteLog(MSG_SOCKET_FAILED, EVENTLOG_ERROR_TYPE, "s", "ClientListener");
-      return;
+      return THREAD_OK;
    }
 
    // Create session list access mutex
@@ -197,14 +151,12 @@ void ClientListener(void *)
       }
       else
       {
-         ThreadCreate(ReadThread, 0, (void *)pSession);
-         ThreadCreate(WriteThread, 0, (void *)pSession);
-         ThreadCreate(ProcessingThread, 0, (void *)pSession);
-         ThreadCreate(UpdateThread, 0, (void *)pSession);
+         pSession->Run();
       }
    }
 
    closesocket(sock);
+   return THREAD_OK;
 }
 
 
index dd8e889..c8374a5 100644 (file)
@@ -34,7 +34,7 @@ static Queue *m_pItemQueue = NULL;
 // Data collector
 //
 
-static void DataCollector(void *pArg)
+static THREAD_RESULT THREAD_CALL DataCollector(void *pArg)
 {
    DCItem *pItem;
    Node *pNode;
@@ -96,6 +96,7 @@ static void DataCollector(void *pArg)
 
    free(pBuffer);
    DbgPrintf(AF_DEBUG_DC, "Data collector thread terminated\n");
+   return THREAD_OK;
 }
 
 
@@ -104,7 +105,7 @@ static void DataCollector(void *pArg)
 // data collector queue when data polling required
 //
 
-static void ItemPoller(void *pArg)
+static THREAD_RESULT THREAD_CALL ItemPoller(void *pArg)
 {
    DWORD i, dwElapsed, dwWatchdogId;
    INT64 qwStart;
@@ -126,6 +127,7 @@ static void ItemPoller(void *pArg)
       dwElapsed = (DWORD)(GetCurrentTimeMs() - qwStart);
    }
    DbgPrintf(AF_DEBUG_DC, "Item poller thread terminated\n");
+   return THREAD_OK;
 }
 
 
@@ -133,7 +135,7 @@ static void ItemPoller(void *pArg)
 // Statistics collection thread
 //
 
-static void StatCollector(void *pArg)
+static THREAD_RESULT THREAD_CALL StatCollector(void *pArg)
 {
    while(!ShutdownInProgress())
    {
@@ -146,6 +148,7 @@ static void StatCollector(void *pArg)
 //         printf("*** DB Writer Queue size: %d ***\n", g_pLazyRequestQueue->Size());
       }
    }
+   return THREAD_OK;
 }
 
 
index 2345b81..3be0e63 100644 (file)
@@ -34,7 +34,7 @@ Queue *g_pLazyRequestQueue = NULL;
 // Static data
 //
 
-static MUTEX m_mutexWriteThreadRunning = NULL;
+static THREAD m_hWriteThread = INVALID_THREAD_HANDLE;
 
 
 //
@@ -51,12 +51,10 @@ void QueueSQLRequest(char *szQuery)
 // Database "lazy" write thread
 //
 
-void DBWriteThread(void *pArg)
+static THREAD_RESULT THREAD_CALL DBWriteThread(void *pArg)
 {
    char *pQuery;
 
-   m_mutexWriteThreadRunning = MutexCreate();
-   MutexLock(m_mutexWriteThreadRunning, INFINITE);
    while(1)
    {
       pQuery = (char *)g_pLazyRequestQueue->GetOrBlock();
@@ -66,7 +64,17 @@ void DBWriteThread(void *pArg)
       DBQuery(g_hCoreDB, pQuery);
       free(pQuery);
    }
-   MutexUnlock(m_mutexWriteThreadRunning);
+   return THREAD_OK;
+}
+
+
+//
+// Start writer thread
+//
+
+void StartDBWriter(void)
+{
+   m_hWriteThread = ThreadCreateEx(DBWriteThread, 0, NULL);
 }
 
 
@@ -77,10 +85,5 @@ void DBWriteThread(void *pArg)
 void StopDBWriter(void)
 {
    g_pLazyRequestQueue->Put(INVALID_POINTER_VALUE);
-   if (m_mutexWriteThreadRunning != NULL)
-       {
-      MutexLock(m_mutexWriteThreadRunning, INFINITE);
-      MutexUnlock(m_mutexWriteThreadRunning);
-      MutexDestroy(m_mutexWriteThreadRunning);
-       }
+   ThreadJoin(m_hWriteThread);
 }
index 5b4aa57..6672669 100644 (file)
@@ -75,7 +75,7 @@ void CheckForMgmtNode(void)
 // Network discovery thread
 //
 
-void DiscoveryThread(void *arg)
+THREAD_RESULT THREAD_CALL DiscoveryThread(void *arg)
 {
    DWORD dwNewNodeId = 1, dwWatchdogId;
    Node *pNode;
@@ -135,4 +135,5 @@ void DiscoveryThread(void *arg)
    }
 
    DbgPrintf(AF_DEBUG_DISCOVERY, "Discovery thread terminated\n");
+   return THREAD_OK;
 }
index 6960898..b95e07f 100644 (file)
@@ -267,7 +267,7 @@ static DWORD SendMail(char *pszRcpt, char *pszSubject, char *pszText)
 // Mailer thread
 //
 
-static void MailerThread(void *pArg)
+static THREAD_RESULT THREAD_CALL MailerThread(void *pArg)
 {
    MAIL_ENVELOPE *pEnvelope;
    DWORD dwResult;
@@ -293,6 +293,7 @@ static void MailerThread(void *pArg)
       free(pEnvelope->pszText);
       free(pEnvelope);
    }
+   return THREAD_OK;
 }
 
 
index f4a60aa..92c940f 100644 (file)
@@ -38,7 +38,7 @@ static void BroadcastEvent(ClientSession *pSession, void *pArg)
 // Event processing thread
 //
 
-void EventProcessor(void *arg)
+THREAD_RESULT THREAD_CALL EventProcessor(void *arg)
 {
    Event *pEvent;
 
@@ -83,4 +83,5 @@ void EventProcessor(void *arg)
    }
 
    DbgPrintf(AF_DEBUG_EVENTS, "Event processing thread #%d stopped\n", arg);
+   return THREAD_OK;
 }
index a944630..2ca6d2c 100644 (file)
@@ -90,7 +90,7 @@ static void DeleteEmptySubnets(void)
 // Housekeeper thread
 //
 
-void HouseKeeper(void *pArg)
+THREAD_RESULT THREAD_CALL HouseKeeper(void *pArg)
 {
    time_t currTime;
    char szQuery[256];
@@ -121,4 +121,5 @@ void HouseKeeper(void *pArg)
       // Remove deleted objects which are no longer referenced
       CleanDeletedObjects();
    }
+   return THREAD_OK;
 }
index eb7ae73..d7402ff 100644 (file)
 // Thread functions
 //
 
-void HouseKeeper(void *pArg);
-void DiscoveryThread(void *pArg);
-void Syncer(void *pArg);
-void NodePoller(void *pArg);
-void StatusPoller(void *pArg);
-void ConfigurationPoller(void *pArg);
-void EventProcessor(void *pArg);
-void WatchdogThread(void *pArg);
-void ClientListener(void *pArg);
-void LocalAdminListener(void *pArg);
-void DBWriteThread(void *pArg);
+THREAD_RESULT THREAD_CALL HouseKeeper(void *pArg);
+THREAD_RESULT THREAD_CALL DiscoveryThread(void *pArg);
+THREAD_RESULT THREAD_CALL Syncer(void *pArg);
+THREAD_RESULT THREAD_CALL NodePoller(void *pArg);
+THREAD_RESULT THREAD_CALL StatusPoller(void *pArg);
+THREAD_RESULT THREAD_CALL ConfigurationPoller(void *pArg);
+THREAD_RESULT THREAD_CALL EventProcessor(void *pArg);
+THREAD_RESULT THREAD_CALL WatchdogThread(void *pArg);
+THREAD_RESULT THREAD_CALL ClientListener(void *pArg);
+THREAD_RESULT THREAD_CALL LocalAdminListener(void *pArg);
 
 
 //
@@ -281,7 +280,7 @@ BOOL Initialize(void)
       ThreadCreate(EventProcessor, 0, (void *)(i + 1));
 
    // Start database "lazy" write thread
-   ThreadCreate(DBWriteThread, 0, NULL);
+   StartDBWriter();
 
    // Start local administartive interface listener if required
    if (ConfigReadInt("EnableAdminInterface", 1))
index 2f8ec18..c4fe665 100644 (file)
@@ -226,9 +226,9 @@ private:
    DWORD m_dwSystemAccess;    // User's system access rights
    DWORD m_dwFlags;           // Session flags
    CSCP_BUFFER *m_pMsgBuffer;
-   MUTEX m_mutexWriteThreadRunning;
-   MUTEX m_mutexProcessingThreadRunning;
-   MUTEX m_mutexUpdateThreadRunning;
+   THREAD m_hWriteThread;
+   THREAD m_hProcessingThread;
+   THREAD m_hUpdateThread;
    MUTEX m_mutexSendEvents;
    MUTEX m_mutexSendObjects;
    MUTEX m_mutexSendAlarms;
@@ -241,6 +241,16 @@ private:
    DWORD m_dwRecordsUploaded;
    EPRule **m_ppEPPRuleList;   // List of loaded EPP rules
 
+   static THREAD_RESULT THREAD_CALL ReadThreadStarter(void *);
+   static THREAD_RESULT THREAD_CALL WriteThreadStarter(void *);
+   static THREAD_RESULT THREAD_CALL ProcessingThreadStarter(void *);
+   static THREAD_RESULT THREAD_CALL UpdateThreadStarter(void *);
+
+   void ReadThread(void);
+   void WriteThread(void);
+   void ProcessingThread(void);
+   void UpdateThread(void);
+
    BOOL CheckSysAccessRights(DWORD dwRequiredAccess) 
    { 
       return m_dwUserId == 0 ? TRUE : 
@@ -290,10 +300,7 @@ public:
    ClientSession(SOCKET hSocket, DWORD dwHostAddr);
    ~ClientSession();
 
-   void ReadThread(void);
-   void WriteThread(void);
-   void ProcessingThread(void);
-   void UpdateThread(void);
+   void Run(void);
 
    void SendMessage(CSCPMessage *pMsg) { m_pSendQueue->Put(pMsg->CreateMessage()); }
 
@@ -361,6 +368,7 @@ void DBFreeAsyncResult(DB_ASYNC_RESULT hResult);
 void DBUnloadDriver(void);
 
 void QueueSQLRequest(char *szQuery);
+void StartDBWriter(void);
 void StopDBWriter(void);
 
 void SnmpInit(void);
index 936327a..9e20463 100644 (file)
@@ -723,6 +723,7 @@ void Node::ConfigurationPoll(void)
 
    m_tLastConfigurationPoll = time(NULL);
    PollerUnlock();
+   DbgPrintf(AF_DEBUG_DISCOVERY, "Finished configuration poll for node %s (ID: %d)\n", m_szName, m_dwId);
 
    if (bHasChanges)
    {
index 6e5ad53..41c3988 100644 (file)
@@ -67,7 +67,7 @@ NetObj *PollNewNode(DWORD dwIpAddr, DWORD dwNetMask, DWORD dwFlags)
 // Node poller thread (poll new nodes and put them into the database)
 //
 
-void NodePoller(void *arg)
+THREAD_RESULT THREAD_CALL NodePoller(void *arg)
 {
    DB_RESULT hResult;
    int iPollInterval;
@@ -111,4 +111,5 @@ void NodePoller(void *arg)
       }
    }
    DbgPrintf(AF_DEBUG_DISCOVERY, "Node poller thread terminated\n");
+   return THREAD_OK;
 }
index f8eb57f..2feadf8 100644 (file)
 
 
 //
+// Externals
+//
+
+void UnregisterSession(DWORD dwIndex);
+
+
+//
 // Fill CSCP message with user data
 //
 
@@ -64,6 +71,56 @@ static void FillGroupInfoMessage(CSCPMessage *pMsg, NMS_USER_GROUP *pGroup)
 
 
 //
+// Client communication read thread starter
+//
+
+THREAD_RESULT THREAD_CALL ClientSession::ReadThreadStarter(void *pArg)
+{
+   ((ClientSession *)pArg)->ReadThread();
+
+   // When ClientSession::ReadThread exits, all other session
+   // threads are already stopped, so we can safely destroy
+   // session object
+   UnregisterSession(((ClientSession *)pArg)->GetIndex());
+   delete (ClientSession *)pArg;
+   return THREAD_OK;
+}
+
+
+//
+// Client communication write thread starter
+//
+
+THREAD_RESULT THREAD_CALL ClientSession::WriteThreadStarter(void *pArg)
+{
+   ((ClientSession *)pArg)->WriteThread();
+   return THREAD_OK;
+}
+
+
+//
+// Received message processing thread starter
+//
+
+THREAD_RESULT THREAD_CALL ClientSession::ProcessingThreadStarter(void *pArg)
+{
+   ((ClientSession *)pArg)->ProcessingThread();
+   return THREAD_OK;
+}
+
+
+//
+// Information update processing thread starter
+//
+
+THREAD_RESULT THREAD_CALL ClientSession::UpdateThreadStarter(void *pArg)
+{
+   ((ClientSession *)pArg)->UpdateThread();
+   return THREAD_OK;
+}
+
+
+//
 // Client session class constructor
 //
 
@@ -76,9 +133,9 @@ ClientSession::ClientSession(SOCKET hSocket, DWORD dwHostAddr)
    m_dwIndex = INVALID_INDEX;
    m_iState = STATE_CONNECTED;
    m_pMsgBuffer = (CSCP_BUFFER *)malloc(sizeof(CSCP_BUFFER));
-   m_mutexWriteThreadRunning = MutexCreate();
-   m_mutexProcessingThreadRunning = MutexCreate();
-   m_mutexUpdateThreadRunning = MutexCreate();
+   m_hWriteThread = INVALID_THREAD_HANDLE;
+   m_hProcessingThread = INVALID_THREAD_HANDLE;
+   m_hUpdateThread = INVALID_THREAD_HANDLE;
    m_mutexSendEvents = MutexCreate();
    m_mutexSendObjects = MutexCreate();
    m_mutexSendAlarms = MutexCreate();
@@ -105,9 +162,6 @@ ClientSession::~ClientSession()
    delete m_pMessageQueue;
    delete m_pUpdateQueue;
    safe_free(m_pMsgBuffer);
-   MutexDestroy(m_mutexWriteThreadRunning);
-   MutexDestroy(m_mutexProcessingThreadRunning);
-   MutexDestroy(m_mutexUpdateThreadRunning);
    MutexDestroy(m_mutexSendEvents);
    MutexDestroy(m_mutexSendObjects);
    MutexDestroy(m_mutexSendAlarms);
@@ -126,6 +180,19 @@ ClientSession::~ClientSession()
 
 
 //
+// Start all threads
+//
+
+void ClientSession::Run(void)
+{
+   m_hWriteThread = ThreadCreateEx(WriteThreadStarter, 0, this);
+   m_hProcessingThread = ThreadCreateEx(ProcessingThreadStarter, 0, this);
+   m_hUpdateThread = ThreadCreateEx(UpdateThreadStarter, 0, this);
+   ThreadCreate(ReadThreadStarter, 0, this);
+}
+
+
+//
 // Print debug information
 //
 
@@ -192,14 +259,9 @@ void ClientSession::ReadThread(void)
    m_pUpdateQueue->Put(INVALID_POINTER_VALUE);
 
    // Wait for other threads to finish
-   MutexLock(m_mutexWriteThreadRunning, INFINITE);
-   MutexUnlock(m_mutexWriteThreadRunning);
-
-   MutexLock(m_mutexProcessingThreadRunning, INFINITE);
-   MutexUnlock(m_mutexProcessingThreadRunning);
-
-   MutexLock(m_mutexUpdateThreadRunning, INFINITE);
-   MutexUnlock(m_mutexUpdateThreadRunning);
+   ThreadJoin(m_hWriteThread);
+   ThreadJoin(m_hProcessingThread);
+   ThreadJoin(m_hUpdateThread);
 
    // Remove all locks created by this session
    RemoveAllSessionLocks(m_dwIndex);
@@ -224,7 +286,6 @@ void ClientSession::WriteThread(void)
    CSCP_MESSAGE *pMsg;
    char szBuffer[128];
 
-   MutexLock(m_mutexWriteThreadRunning, INFINITE);
    while(1)
    {
       pMsg = (CSCP_MESSAGE *)m_pSendQueue->GetOrBlock();
@@ -239,7 +300,6 @@ void ClientSession::WriteThread(void)
       }
       safe_free(pMsg);
    }
-   MutexUnlock(m_mutexWriteThreadRunning);
 }
 
 
@@ -252,7 +312,6 @@ void ClientSession::UpdateThread(void)
    UPDATE_INFO *pUpdate;
    CSCPMessage msg;
 
-   MutexLock(m_mutexUpdateThreadRunning, INFINITE);
    while(1)
    {
       pUpdate = (UPDATE_INFO *)m_pUpdateQueue->GetOrBlock();
@@ -309,7 +368,6 @@ void ClientSession::UpdateThread(void)
 
       free(pUpdate);
    }
-   MutexUnlock(m_mutexUpdateThreadRunning);
 }
 
 
@@ -322,7 +380,6 @@ void ClientSession::ProcessingThread(void)
    CSCPMessage *pMsg;
    char szBuffer[128];
 
-   MutexLock(m_mutexProcessingThreadRunning, INFINITE);
    while(1)
    {
       pMsg = (CSCPMessage *)m_pMessageQueue->GetOrBlock();
@@ -476,7 +533,6 @@ void ClientSession::ProcessingThread(void)
       }
       delete pMsg;
    }
-   MutexUnlock(m_mutexProcessingThreadRunning);
 }
 
 
@@ -2446,8 +2502,10 @@ void ClientSession::SendAllActions(DWORD dwRqId)
       msg.SetVariable(VID_RCC, RCC_SUCCESS);
       SendMessage(&msg);
       MutexLock(m_mutexSendActions, INFINITE);
+printf("Sending actions...\n");
       SendActionsToClient(this, dwRqId);
       MutexUnlock(m_mutexSendActions);
+printf("Send complete\n");
    }
    else
    {
index c95ab78..0bc260f 100644 (file)
@@ -27,7 +27,7 @@
 // Status poll thread
 //
 
-void StatusPoller(void *arg)
+THREAD_RESULT THREAD_CALL StatusPoller(void *arg)
 {
    Node *pNode;
    DWORD dwWatchdogId;
@@ -51,6 +51,7 @@ void StatusPoller(void *arg)
          }
       }
    }
+   return THREAD_OK;
 }
 
 
@@ -58,7 +59,7 @@ void StatusPoller(void *arg)
 // Configuration poll thread
 //
 
-void ConfigurationPoller(void *arg)
+THREAD_RESULT THREAD_CALL ConfigurationPoller(void *arg)
 {
    Node *pNode;
    DWORD dwWatchdogId;
@@ -82,4 +83,5 @@ void ConfigurationPoller(void *arg)
          }
       }
    }
+   return THREAD_OK;
 }
index 05af99e..09ddbee 100644 (file)
@@ -61,7 +61,7 @@ void SaveObjects(void)
 // Syncer thread
 //
 
-void Syncer(void *arg)
+THREAD_RESULT THREAD_CALL Syncer(void *arg)
 {
    int iSyncInterval;
    DWORD dwWatchdogId;
@@ -81,4 +81,5 @@ void Syncer(void *arg)
       SaveUsers();
    }
    DbgPrintf(AF_DEBUG_HOUSEKEEPER, "Syncer thread terminated\n");
+   return THREAD_OK;
 }
index 760161e..a7666dd 100644 (file)
@@ -121,7 +121,7 @@ void WatchdogPrintStatus(void)
 // Watchdog thread
 //
 
-void WatchdogThread(void *arg)
+THREAD_RESULT THREAD_CALL WatchdogThread(void *arg)
 {
    DWORD i;
    time_t currTime;
@@ -148,4 +148,5 @@ void WatchdogThread(void *arg)
    MutexDestroy(m_mutexWatchdogAccess);
    m_mutexWatchdogAccess = NULL;
    DbgPrintf(AF_DEBUG_MISC, "Watchdog thread terminated\n");
+   return THREAD_OK;
 }
index 87a36c5..cac7d78 100644 (file)
@@ -113,10 +113,10 @@ private:
    MsgWaitQueue *m_pMsgWaitQueue;
    BOOL m_bIsConnected;
    MUTEX m_mutexDataLock;
-   MUTEX m_mutexReceiverThreadRunning;
+   THREAD m_hReceiverThread;
 
    void ReceiverThread(void);
-   static void ReceiverThreadStarter(void *);
+   static THREAD_RESULT THREAD_CALL ReceiverThreadStarter(void *);
 
 protected:
    void DestroyResultData(void);
index 156c594..9f7a038 100644 (file)
 // Receiver thread starter
 //
 
-void AgentConnection::ReceiverThreadStarter(void *pArg)
+THREAD_RESULT THREAD_CALL AgentConnection::ReceiverThreadStarter(void *pArg)
 {
    ((AgentConnection *)pArg)->ReceiverThread();
+   return THREAD_OK;
 }
 
 
@@ -61,7 +62,7 @@ AgentConnection::AgentConnection()
    m_dwCommandTimeout = 10000;   // Default timeout 10 seconds
    m_bIsConnected = FALSE;
    m_mutexDataLock = MutexCreate();
-   m_mutexReceiverThreadRunning = MutexCreate();
+   m_hReceiverThread = INVALID_THREAD_HANDLE;
 }
 
 
@@ -87,7 +88,7 @@ AgentConnection::AgentConnection(DWORD dwAddr, WORD wPort, int iAuthMethod, char
    m_dwCommandTimeout = 10000;   // Default timeout 10 seconds
    m_bIsConnected = FALSE;
    m_mutexDataLock = MutexCreate();
-   m_mutexReceiverThreadRunning = MutexCreate();
+   m_hReceiverThread = INVALID_THREAD_HANDLE;
 }
 
 
@@ -102,8 +103,7 @@ AgentConnection::~AgentConnection()
       closesocket(m_hSocket);
 
    // Wait for receiver thread termination
-   MutexLock(m_mutexReceiverThreadRunning, INFINITE);
-   MutexUnlock(m_mutexReceiverThreadRunning);
+   ThreadJoin(m_hReceiverThread);
 
    Lock();
    DestroyResultData();
@@ -112,7 +112,6 @@ AgentConnection::~AgentConnection()
    delete m_pMsgWaitQueue;
 
    MutexDestroy(m_mutexDataLock);
-   MutexDestroy(m_mutexReceiverThreadRunning);
 }
 
 
@@ -144,8 +143,6 @@ void AgentConnection::ReceiverThread(void)
    int iErr;
    char szBuffer[128];
 
-   MutexLock(m_mutexReceiverThreadRunning, INFINITE);
-
    // Initialize raw message receiving function
    pMsgBuffer = (CSCP_BUFFER *)malloc(sizeof(CSCP_BUFFER));
    RecvCSCPMessage(0, NULL, pMsgBuffer, 0);
@@ -194,8 +191,6 @@ void AgentConnection::ReceiverThread(void)
 
    free(pRawMsg);
    free(pMsgBuffer);
-
-   MutexUnlock(m_mutexReceiverThreadRunning);
 }
 
 
@@ -214,6 +209,10 @@ BOOL AgentConnection::Connect(BOOL bVerbose)
    if ((m_bIsConnected) || (m_hSocket != -1))
       return FALSE;
 
+   // Wait for receiver thread from previous connection, if any
+   ThreadJoin(m_hReceiverThread);
+   m_hReceiverThread = INVALID_THREAD_HANDLE;
+
    // Create socket
    m_hSocket = socket(AF_INET, SOCK_STREAM, 0);
    if (m_hSocket == -1)
@@ -237,7 +236,7 @@ BOOL AgentConnection::Connect(BOOL bVerbose)
    }
 
    // Start receiver thread
-   ThreadCreate(ReceiverThreadStarter, 0, this);
+   m_hReceiverThread = ThreadCreateEx(ReceiverThreadStarter, 0, this);
 
    // Authenticate itself to agent
    if ((dwError = Authenticate()) != ERR_SUCCESS)