NamedPipe class splitted into NAmedPipe and NamedPipeListener; nxapush switched to...
authorVictor Kirhenshtein <victor@netxms.org>
Tue, 12 Sep 2017 11:08:16 +0000 (14:08 +0300)
committerVictor Kirhenshtein <victor@netxms.org>
Tue, 12 Sep 2017 11:08:16 +0000 (14:08 +0300)
include/nxproc.h
src/agent/core/extagent.cpp
src/agent/core/master.cpp
src/agent/core/nxagentd.h
src/agent/core/push.cpp
src/agent/tools/nxapush/nxapush.cpp
src/libnetxms/nxproc.cpp
src/libnetxms/nxproc_unix.cpp
src/libnetxms/nxproc_win32.cpp [copied from src/libnetxms/nxproc_unix.cpp with 50% similarity]

index d127cf9..e2252f9 100644 (file)
@@ -6,9 +6,32 @@
 /**
  * Max pipe name length
  */
-#define MAX_PIPE_NAME_LEN     64
+#define MAX_PIPE_NAME_LEN     128
 
-class NamedPipe;
+/**
+ * Named pipe class
+ */
+class LIBNETXMS_EXPORTABLE NamedPipe
+{
+   friend class NamedPipeListener;
+
+private:
+   TCHAR m_name[MAX_PIPE_NAME_LEN];
+   HPIPE m_handle;
+   MUTEX m_writeLock;
+
+   NamedPipe(const TCHAR *name, HPIPE handle);
+
+public:
+   ~NamedPipe();
+
+   const TCHAR *name() const { return m_name; }
+   HPIPE handle() { return m_handle; }
+
+   bool write(const void *data, size_t size);
+
+   static NamedPipe *connect(const TCHAR *name, UINT32 timeout = 1000);
+};
 
 /**
  * Client request handler
@@ -16,45 +39,32 @@ class NamedPipe;
 typedef void (*NamedPipeRequestHandler)(NamedPipe *pipe, void *userArg);
 
 /**
- * Named pipe class
+ * Named pipe listener (server)
  */
-class LIBNETXMS_EXPORTABLE NamedPipe
+class LIBNETXMS_EXPORTABLE NamedPipeListener
 {
 private:
    TCHAR m_name[MAX_PIPE_NAME_LEN];
    HPIPE m_handle;
-   bool m_isListener;
    NamedPipeRequestHandler m_reqHandler;
    void *m_userArg;
    THREAD m_serverThread;
    bool m_stop;
 
-   NamedPipe(const TCHAR *name, HPIPE handle, bool listener, NamedPipeRequestHandler reqHandler = NULL, void *userArg = NULL)
-   {
-      nx_strncpy(m_name, name, MAX_PIPE_NAME_LEN);
-      m_handle = handle;
-      m_isListener = listener;
-      m_reqHandler = reqHandler;
-      m_userArg = userArg;
-      m_serverThread = INVALID_THREAD_HANDLE;
-      m_stop = false;
-   }
-
    void serverThread();
    static THREAD_RESULT THREAD_CALL serverThreadStarter(void *arg);
 
+   NamedPipeListener(const TCHAR *name, HPIPE handle, NamedPipeRequestHandler reqHandler, void *userArg);
+
 public:
-   ~NamedPipe();
+   ~NamedPipeListener();
 
    const TCHAR *name() const { return m_name; }
-   HPIPE handle() { return m_handle; }
-   bool isListener() const { return m_isListener; }
 
-   void startServer();
-   void stopServer();
+   void start();
+   void stop();
 
-   static NamedPipe *createListener(const TCHAR *name, NamedPipeRequestHandler reqHandler, void *userArg);
-   static NamedPipe *connectTo(const TCHAR *name);
+   static NamedPipeListener *create(const TCHAR *name, NamedPipeRequestHandler reqHandler, void *userArg);
 };
 
 #endif   /* _nxproc_h_ */
index 1ce1a3a..598d669 100644 (file)
@@ -36,10 +36,9 @@ ExternalSubagent::ExternalSubagent(const TCHAR *name, const TCHAR *user)
        nx_strncpy(m_user, user, MAX_ESA_USER_NAME);
        m_connected = false;
        m_listener = NULL;
-       m_pipe = INVALID_PIPE_HANDLE;
+       m_pipe = NULL;
        m_msgQueue = new MsgWaitQueue();
        m_requestId = 1;
-       m_mutexPipeWrite = MutexCreate();
 }
 
 /**
@@ -48,7 +47,6 @@ ExternalSubagent::ExternalSubagent(const TCHAR *name, const TCHAR *user)
 ExternalSubagent::~ExternalSubagent()
 {
        delete m_msgQueue;
-       MutexDestroy(m_mutexPipeWrite);
        delete m_listener;
 }
 
@@ -57,7 +55,7 @@ ExternalSubagent::~ExternalSubagent()
  */
 static void RequestHandler(NamedPipe *pipe, void *userArg)
 {
-   static_cast<ExternalSubagent*>(userArg)->connect(pipe->handle());
+   static_cast<ExternalSubagent*>(userArg)->connect(pipe);
 }
 
 /**
@@ -67,9 +65,9 @@ void ExternalSubagent::startListener()
 {
    TCHAR name[MAX_PIPE_NAME_LEN];
    _sntprintf(name, MAX_PIPE_NAME_LEN, _T("nxagentd.subagent.%s"), m_name);
-   m_listener = NamedPipe::createListener(name, RequestHandler, this);
+   m_listener = NamedPipeListener::create(name, RequestHandler, this);
    if (m_listener != NULL)
-      m_listener->startServer();
+      m_listener->start();
 }
 
 /*
@@ -81,9 +79,7 @@ bool ExternalSubagent::sendMessage(NXCPMessage *msg)
        AgentWriteDebugLog(6, _T("ExternalSubagent::sendMessage(%s): sending message %s"), m_name, NXCPMessageCodeName(msg->getCode(), buffer));
 
        NXCP_MESSAGE *rawMsg = msg->createMessage();
-       MutexLock(m_mutexPipeWrite);
-   bool success = SendMessageToPipe(m_pipe, rawMsg);
-       MutexUnlock(m_mutexPipeWrite);
+       bool success = (m_pipe != NULL) ? m_pipe->write(rawMsg, ntohl(rawMsg->size)) : false;
        free(rawMsg);
        return success;
 }
@@ -99,15 +95,15 @@ NXCPMessage *ExternalSubagent::waitForMessage(WORD code, UINT32 id)
 /**
  * Main connection thread
  */
-void ExternalSubagent::connect(HPIPE hPipe)
+void ExternalSubagent::connect(NamedPipe *pipe)
 {
        TCHAR buffer[256];
    UINT32 i;
 
-       m_pipe = hPipe;
+       m_pipe = pipe;
        m_connected = true;
        AgentWriteDebugLog(2, _T("ExternalSubagent(%s): connection established"), m_name);
-   PipeMessageReceiver receiver(hPipe, 8192, 1048576);  // 8K initial, 1M max
+   PipeMessageReceiver receiver(pipe->handle(), 8192, 1048576);  // 8K initial, 1M max
        while(true)
        {
       MessageReceiverResult result;
@@ -144,6 +140,7 @@ void ExternalSubagent::connect(HPIPE hPipe)
        AgentWriteDebugLog(2, _T("ExternalSubagent(%s): connection closed"), m_name);
        m_connected = false;
        m_msgQueue->clear();
+       m_pipe = NULL;
 }
 
 /**
@@ -476,32 +473,6 @@ UINT32 ExternalSubagent::getList(const TCHAR *name, StringList *value)
        return rcc;
 }
 
-/*
- * Send message to external subagent
- */
-bool SendMessageToPipe(HPIPE hPipe, NXCP_MESSAGE *msg)
-{
-       bool success = false;
-
-#ifdef _WIN32
-   if (hPipe == INVALID_HANDLE_VALUE)
-      return false;
-
-       DWORD bytes = 0;
-   if (WriteFile(hPipe, msg, ntohl(msg->size), &bytes, NULL))
-       {
-               success = (bytes == ntohl(msg->size));
-       }
-#else
-   if (hPipe == -1)
-      return false;
-
-       int bytes = SendEx(hPipe, msg, ntohl(msg->size), 0, NULL); 
-       success = (bytes == ntohl(msg->size));
-#endif
-       return success;
-}
-
 /**
  * Add external subagent from config.
  * Each line in config should be in form 
index e272336..fbb840e 100644 (file)
@@ -82,12 +82,7 @@ static void H_GetList(NXCPMessage *pRequest, NXCPMessage *pMsg)
 /**
  * Pipe to master agent
  */
-#ifdef _WIN32
-static HANDLE s_pipe = INVALID_HANDLE_VALUE;
-#else
-static int s_pipe = -1;
-#endif
-static MUTEX s_mutexPipeWrite = MutexCreate();
+static NamedPipe *s_pipe = NULL;
 
 /**
  * Listener thread for master agent commands
@@ -96,39 +91,15 @@ THREAD_RESULT THREAD_CALL MasterAgentListener(void *arg)
 {
    while(!(g_dwFlags & AF_SHUTDOWN))
        {
-#ifdef _WIN32
       TCHAR pipeName[MAX_PATH];
-      _sntprintf(pipeName, MAX_PATH, _T("\\\\.\\pipe\\nxagentd.subagent.%s"), g_masterAgent);
+      _sntprintf(pipeName, MAX_PIPE_NAME_LEN, _T("nxagentd.subagent.%s"), g_masterAgent);
+      s_pipe = NamedPipe::connect(pipeName, 5000);
 
-               s_pipe = CreateFile(pipeName, GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING, 0, NULL);
-               if (s_pipe != INVALID_HANDLE_VALUE)
-               {
-                       DWORD pipeMode = PIPE_READMODE_MESSAGE;
-                       SetNamedPipeHandleState(s_pipe, &pipeMode, NULL, NULL);
-#else
-      s_pipe = socket(AF_UNIX, SOCK_STREAM, 0);
-      if (s_pipe != INVALID_SOCKET)
-      {
-             struct sockaddr_un remote;
-             remote.sun_family = AF_UNIX;
-#ifdef UNICODE
-         sprintf(remote.sun_path, "/tmp/.nxagentd.subagent.%S", g_masterAgent);
-#else
-         sprintf(remote.sun_path, "/tmp/.nxagentd.subagent.%s", g_masterAgent);
-#endif
-             if (connect(s_pipe, (struct sockaddr *)&remote, SUN_LEN(&remote)) == -1)
-         {
-            close(s_pipe);
-            s_pipe = -1;
-         }
-      }
-
-      if (s_pipe != -1)
+      if (s_pipe != NULL)
       {
-#endif
                        AgentWriteDebugLog(1, _T("Connected to master agent"));
 
-         PipeMessageReceiver receiver(s_pipe, 8192, 1048576);  // 8K initial, 1M max
+         PipeMessageReceiver receiver(s_pipe->handle(), 8192, 1048576);  // 8K initial, 1M max
                        while(!(g_dwFlags & AF_SHUTDOWN))
                        {
             MessageReceiverResult result;
@@ -186,31 +157,18 @@ THREAD_RESULT THREAD_CALL MasterAgentListener(void *arg)
 
                                // Send response to pipe
                                NXCP_MESSAGE *rawMsg = response.createMessage();
-            bool sendSuccess = SendMessageToPipe(s_pipe, rawMsg);
+            bool sendSuccess = s_pipe->write(rawMsg, ntohl(rawMsg->size));
             free(rawMsg);
             if (!sendSuccess)
                break;
                        }
-#ifdef _WIN32
-                       CloseHandle(s_pipe);
-#else
-         close(s_pipe);
-#endif
+                       delete_and_null(s_pipe);
                        AgentWriteDebugLog(1, _T("Disconnected from master agent"));
                }
                else
                {
-#ifdef _WIN32
-                       if (GetLastError() == ERROR_PIPE_BUSY)
-                       {
-                               WaitNamedPipe(pipeName, 5000);
-                       }
-                       else
-#endif
-                       {
-                               AgentWriteDebugLog(1, _T("Cannot connect to master agent, will retry in 5 seconds"));
-                               ThreadSleep(5);
-                       }
+         AgentWriteDebugLog(1, _T("Cannot connect to master agent, will retry in 5 seconds"));
+         ThreadSleep(5);
                }
        }
        AgentWriteDebugLog(1, _T("Master agent listener stopped"));
@@ -233,8 +191,7 @@ bool SendMessageToMasterAgent(NXCPMessage *msg)
  */
 bool SendRawMessageToMasterAgent(NXCP_MESSAGE *msg)
 {
-       MutexLock(s_mutexPipeWrite);
-   bool success = SendMessageToPipe(s_pipe, msg);
-       MutexUnlock(s_mutexPipeWrite);
-   return success;
+   if (s_pipe == NULL)
+      return false;
+   return s_pipe->write(msg, ntohl(msg->size));
 }
index df0f137..a3ac342 100644 (file)
@@ -225,12 +225,11 @@ class ExternalSubagent
 private:
        TCHAR m_name[MAX_SUBAGENT_NAME];
        TCHAR m_user[MAX_ESA_USER_NAME];
-       NamedPipe *m_listener;
-       HPIPE m_pipe;
+       NamedPipeListener *m_listener;
+       NamedPipe *m_pipe;
        bool m_connected;
        MsgWaitQueue *m_msgQueue;
        UINT32 m_requestId;
-       MUTEX m_mutexPipeWrite;
 
        bool sendMessage(NXCPMessage *msg);
        NXCPMessage *waitForMessage(WORD code, UINT32 id);
@@ -244,7 +243,7 @@ public:
        ~ExternalSubagent();
 
        void startListener();
-       void connect(HPIPE hPipe);
+       void connect(NamedPipe *pipe);
 
        bool isConnected() { return m_connected; }
        const TCHAR *getName() { return m_name; }
@@ -594,7 +593,6 @@ void ListListsFromExtSubagents(NXCPMessage *msg, UINT32 *baseId, UINT32 *count);
 void ListListsFromExtSubagents(StringList *list);
 void ListTablesFromExtSubagents(NXCPMessage *msg, UINT32 *baseId, UINT32 *count);
 void ListTablesFromExtSubagents(StringList *list);
-bool SendMessageToPipe(HPIPE hPipe, NXCP_MESSAGE *msg);
 bool SendMessageToMasterAgent(NXCPMessage *msg);
 bool SendRawMessageToMasterAgent(NXCP_MESSAGE *msg);
 void ShutdownExtSubagents();
index 678c2db..ff3affb 100644 (file)
@@ -1,6 +1,6 @@
 /* 
 ** NetXMS multiplatform core agent
-** Copyright (C) 2003-2013 Victor Kirhenshtein
+** Copyright (C) 2003-2017 Victor Kirhenshtein
 **
 ** This program is free software; you can redistribute it and/or modify
 ** it under the terms of the GNU General Public License as published by
@@ -67,12 +67,12 @@ bool PushData(const TCHAR *parameter, const TCHAR *value, UINT32 objectId, time_
 /**
  * Process push request
  */
-static void ProcessPushRequest(HPIPE hPipe)
+static void ProcessPushRequest(NamedPipe *pipe, void *arg)
 {
        TCHAR buffer[256];
 
        AgentWriteDebugLog(5, _T("ProcessPushRequest: connection established"));
-   PipeMessageReceiver receiver(hPipe, 8192, 1048576);  // 8K initial, 1M max
+   PipeMessageReceiver receiver(pipe->handle(), 8192, 1048576);  // 8K initial, 1M max
        while(true)
        {
       MessageReceiverResult result;
@@ -94,195 +94,22 @@ static void ProcessPushRequest(HPIPE hPipe)
                                PushData(name, value, objectId, timestamp);
                        }
                }
-               else
-               {
-               }
                delete msg;
        }
        AgentWriteDebugLog(5, _T("ProcessPushRequest: connection closed"));
 }
 
 /**
- * Connector thread for external push command
+ * Pipe listener for push requests
  */
-#ifdef _WIN32
-
-static THREAD_RESULT THREAD_CALL PushConnector(void *arg)
-{
-       SECURITY_ATTRIBUTES sa;
-       PSECURITY_DESCRIPTOR sd = NULL;
-       SID_IDENTIFIER_AUTHORITY sidAuthWorld = SECURITY_WORLD_SID_AUTHORITY;
-       EXPLICIT_ACCESS ea;
-       PSID sidEveryone = NULL;
-       ACL *acl = NULL;
-       TCHAR errorText[1024];
-
-       // Create a well-known SID for the Everyone group.
-       if(!AllocateAndInitializeSid(&sidAuthWorld, 1, SECURITY_WORLD_RID, 0, 0, 0, 0, 0, 0, 0, &sidEveryone))
-       {
-               AgentWriteDebugLog(2, _T("PushConnector: AllocateAndInitializeSid failed (%s)"), GetSystemErrorText(GetLastError(), errorText, 1024));
-               goto cleanup;
-       }
-
-       // Initialize an EXPLICIT_ACCESS structure for an ACE.
-       // The ACE will allow either Everyone or given user to access pipe
-       ZeroMemory(&ea, sizeof(EXPLICIT_ACCESS));
-       ea.grfAccessPermissions = (FILE_GENERIC_READ | FILE_GENERIC_WRITE) & ~FILE_CREATE_PIPE_INSTANCE;
-       ea.grfAccessMode = SET_ACCESS;
-       ea.grfInheritance = NO_INHERITANCE;
-       const TCHAR *user = g_config->getValue(_T("/%agent/PushUser"), _T("*"));
-       if ((user[0] == 0) || !_tcscmp(user, _T("*")))
-       {
-               ea.Trustee.TrusteeForm = TRUSTEE_IS_SID;
-               ea.Trustee.TrusteeType = TRUSTEE_IS_WELL_KNOWN_GROUP;
-               ea.Trustee.ptstrName  = (LPTSTR)sidEveryone;
-       }
-       else
-       {
-               ea.Trustee.TrusteeForm = TRUSTEE_IS_NAME;
-               ea.Trustee.TrusteeType = TRUSTEE_IS_USER;
-               ea.Trustee.ptstrName  = (LPTSTR)user;
-               AgentWriteDebugLog(2, _T("PushConnector: will allow connections only for user %s"), user);
-       }
-
-       // Create a new ACL that contains the new ACEs.
-       if (SetEntriesInAcl(1, &ea, NULL, &acl) != ERROR_SUCCESS)
-       {
-               AgentWriteDebugLog(2, _T("PushConnector: SetEntriesInAcl failed (%s)"), GetSystemErrorText(GetLastError(), errorText, 1024));
-               goto cleanup;
-       }
-
-       sd = (PSECURITY_DESCRIPTOR)LocalAlloc(LPTR, SECURITY_DESCRIPTOR_MIN_LENGTH);
-       if (sd == NULL)
-       {
-               AgentWriteDebugLog(2, _T("PushConnector: LocalAlloc failed (%s)"), GetSystemErrorText(GetLastError(), errorText, 1024));
-               goto cleanup;
-       }
-
-       if (!InitializeSecurityDescriptor(sd, SECURITY_DESCRIPTOR_REVISION))
-       {
-               AgentWriteDebugLog(2, _T("PushConnector: InitializeSecurityDescriptor failed (%s)"), GetSystemErrorText(GetLastError(), errorText, 1024));
-               goto cleanup;
-       }
-
-       // Add the ACL to the security descriptor. 
-   if (!SetSecurityDescriptorDacl(sd, TRUE, acl, FALSE))
-       {
-               AgentWriteDebugLog(2, _T("PushConnector: SetSecurityDescriptorDacl failed (%s)"), GetSystemErrorText(GetLastError(), errorText, 1024));
-               goto cleanup;
-       }
-
-       sa.nLength = sizeof(SECURITY_ATTRIBUTES);
-       sa.bInheritHandle = FALSE;
-       sa.lpSecurityDescriptor = sd;
-       HANDLE hPipe = CreateNamedPipe(_T("\\\\.\\pipe\\nxagentd.push"), PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE, 1, 8192, 8192, 0, &sa);
-       if (hPipe == INVALID_HANDLE_VALUE)
-       {
-               AgentWriteDebugLog(2, _T("PushConnector: CreateNamedPipe failed (%s)"), GetSystemErrorText(GetLastError(), errorText, 1024));
-               goto cleanup;
-       }
-
-       AgentWriteDebugLog(2, _T("PushConnector: named pipe created, waiting for connection"));
-       int connectErrors = 0;
-       while(!(g_dwFlags & AF_SHUTDOWN))
-       {
-               BOOL connected = ConnectNamedPipe(hPipe, NULL);
-               if (connected || (GetLastError() == ERROR_PIPE_CONNECTED))
-               {
-                       ProcessPushRequest(hPipe);
-                       DisconnectNamedPipe(hPipe);
-                       connectErrors = 0;
-               }
-               else
-               {
-                       AgentWriteDebugLog(2, _T("PushConnector: ConnectNamedPipe failed (%s)"), GetSystemErrorText(GetLastError(), errorText, 1024));
-                       connectErrors++;
-                       if (connectErrors > 10)
-                               break;  // Stop this connector if ConnectNamedPipe fails instantly
-               }
-       }
-
-cleanup:
-       if (hPipe != NULL)
-               CloseHandle(hPipe);
-
-       if (sd != NULL)
-               LocalFree(sd);
-
-       if (acl != NULL)
-               LocalFree(acl);
-
-       if (sidEveryone != NULL)
-               FreeSid(sidEveryone);
-
-       AgentWriteDebugLog(2, _T("PushConnector: listener thread stopped"));
-       return THREAD_OK;
-}
-
-#else
-
-static THREAD_RESULT THREAD_CALL PushConnector(void *arg)
-{
-       mode_t prevMask = 0;
-
-       SOCKET hPipe = socket(AF_UNIX, SOCK_STREAM, 0);
-       if (hPipe == INVALID_SOCKET)
-       {
-               AgentWriteDebugLog(2, _T("PushConnector: socket failed (%s)"), _tcserror(errno));
-               goto cleanup;
-       }
-       
-       struct sockaddr_un addrLocal;
-       addrLocal.sun_family = AF_UNIX;
-       strcpy(addrLocal.sun_path, "/tmp/.nxagentd.push");      
-       unlink(addrLocal.sun_path);
-       prevMask = umask(S_IWGRP | S_IWOTH);
-       if (bind(hPipe, (struct sockaddr *)&addrLocal, SUN_LEN(&addrLocal)) == -1)
-       {
-               AgentWriteDebugLog(2, _T("PushConnector: bind failed (%s)"), _tcserror(errno));
-               umask(prevMask);
-               goto cleanup;
-       }
-       umask(prevMask);
-
-       if (listen(hPipe, 5) == -1)
-       {
-               AgentWriteDebugLog(2, _T("PushConnector: listen failed (%s)"), _tcserror(errno));
-               goto cleanup;
-       }
-       
-       while(!(g_dwFlags & AF_SHUTDOWN))
-       {
-               struct sockaddr_un addrRemote;
-               socklen_t size = sizeof(struct sockaddr_un);
-               SOCKET cs = accept(hPipe, (struct sockaddr *)&addrRemote, &size);
-               if (cs > 0)
-               {
-                       ProcessPushRequest(cs);
-                       shutdown(cs, 2);
-                       close(cs);
-               }
-               else
-               {
-                       AgentWriteDebugLog(2, _T("PushConnector: accept failed (%s)"), _tcserror(errno));
-               }
-       }
-
-cleanup:
-       if (hPipe != -1)
-               close(hPipe);
-
-       AgentWriteDebugLog(2, _T("PushConnector: listener thread stopped"));
-       return THREAD_OK;
-}
-
-#endif
+static NamedPipeListener *s_listener;
 
 /**
  * Start push connector
  */
-
 void StartPushConnector()
 {
-       ThreadCreate(PushConnector, 0, NULL);
+   s_listener = NamedPipeListener::create(_T("nxagentd.push"), ProcessPushRequest, NULL);
+   if (s_listener != NULL)
+      s_listener->start();
 }
index e55a1c7..4f12218 100644 (file)
 #include <nms_agent.h>
 #include <nms_util.h>
 #include <nxcpapi.h>
+#include <nxproc.h>
 
 #if HAVE_GETOPT_H
 #include <getopt.h>
 #endif
 
-#ifndef _WIN32
-#include <sys/socket.h>
-#include <sys/un.h>
-#endif
-
-#ifndef SUN_LEN
-#define SUN_LEN(su) (sizeof(*(su)) - sizeof((su)->sun_path) + strlen((su)->sun_path))
-#endif
-
 /**
  * Pipe handle
  */
-#ifdef _WIN32
-static HANDLE s_hPipe = NULL;
-#else
-static int s_hPipe = -1;
-#endif
+static NamedPipe *s_pipe = NULL;
 
 /**
  * Data to send
@@ -95,36 +83,9 @@ static BOOL AddValue(TCHAR *pair)
  */
 static BOOL Startup()
 {
-#ifdef _WIN32
-reconnect:
-       s_hPipe = CreateFile(_T("\\\\.\\pipe\\nxagentd.push"), GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING, 0, NULL);
-       if (s_hPipe == INVALID_HANDLE_VALUE)
-       {
-               if (GetLastError() == ERROR_PIPE_BUSY)
-               {
-                       if (WaitNamedPipe(_T("\\\\.\\pipe\\nxagentd.push"), 5000))
-                               goto reconnect;
-               }
-               return FALSE;
-       }
-
-       DWORD pipeMode = PIPE_READMODE_MESSAGE;
-       SetNamedPipeHandleState(s_hPipe, &pipeMode, NULL, NULL);
-#else
-       s_hPipe = socket(AF_UNIX, SOCK_STREAM, 0);
-       if (s_hPipe == INVALID_SOCKET)
-               return FALSE;
-
-       struct sockaddr_un remote;
-       remote.sun_family = AF_UNIX;
-       strcpy(remote.sun_path, "/tmp/.nxagentd.push");
-       if (connect(s_hPipe, (struct sockaddr *)&remote, SUN_LEN(&remote)) == -1)
-       {
-               close(s_hPipe);
-               s_hPipe = -1;
-               return FALSE;
-       }
-#endif
+   s_pipe = NamedPipe::connect(_T("nxagentd.push"));
+   if (s_pipe == NULL)
+      return FALSE;
 
        if (s_optVerbose > 2)
                _tprintf(_T("Connected to NetXMS agent\n"));
@@ -135,10 +96,8 @@ reconnect:
 /**
  * Send all DCIs
  */
-static BOOL Send()
+static bool Send()
 {
-       BOOL success = FALSE;
-
        NXCPMessage msg;
        msg.setCode(CMD_PUSH_DCI_DATA);
    msg.setField(VID_OBJECT_ID, s_optObjectId);
@@ -147,21 +106,8 @@ static BOOL Send()
 
        // Send response to pipe
        NXCP_MESSAGE *rawMsg = msg.createMessage();
-#ifdef _WIN32
-       DWORD bytes;
-       if (!WriteFile(s_hPipe, rawMsg, ntohl(rawMsg->size), &bytes, NULL))
-               goto cleanup;
-       if (bytes != ntohl(rawMsg->size))
-               goto cleanup;
-#else
-       int bytes = SendEx(s_hPipe, rawMsg, ntohl(rawMsg->size), 0, NULL); 
-       if (bytes != (int)ntohl(rawMsg->size))
-               goto cleanup;
-#endif
-
-       success = TRUE;
+       bool success = s_pipe->write(rawMsg, ntohl(rawMsg->size));
 
-cleanup:
        free(rawMsg);
        return success;
 }
@@ -171,17 +117,7 @@ cleanup:
  */
 static BOOL Teardown()
 {
-#ifdef _WIN32
-       if (s_hPipe != NULL)
-       {
-               CloseHandle(s_hPipe);
-       }
-#else
-       if (s_hPipe != -1)
-       {
-               close(s_hPipe);
-       }
-#endif
+       delete s_pipe;
        delete s_data;
        return TRUE;
 }
index f2ea049..655eefd 100644 (file)
 #include <nxproc.h>
 
 /**
+ * Named pipe listener constructor
+ */
+NamedPipeListener::NamedPipeListener(const TCHAR *name, HPIPE handle, NamedPipeRequestHandler reqHandler, void *userArg)
+{
+   nx_strncpy(m_name, name, MAX_PIPE_NAME_LEN);
+   m_handle = handle;
+   m_reqHandler = reqHandler;
+   m_userArg = userArg;
+   m_serverThread = INVALID_THREAD_HANDLE;
+   m_stop = false;
+}
+
+/**
  * Start named pipe server
  */
-void NamedPipe::startServer()
+void NamedPipeListener::start()
 {
    if (m_serverThread != INVALID_THREAD_HANDLE)
       return;  // already started
 
    m_stop = false;
-   m_serverThread = ThreadCreateEx(NamedPipe::serverThreadStarter, 0, this);
+   m_serverThread = ThreadCreateEx(NamedPipeListener::serverThreadStarter, 0, this);
 }
 
 /**
  * Stop named pipe server
  */
-void NamedPipe::stopServer()
+void NamedPipeListener::stop()
 {
    m_stop = true;
    ThreadJoin(m_serverThread);
@@ -49,8 +62,18 @@ void NamedPipe::stopServer()
 /**
  * Named pipe server thread starter
  */
-THREAD_RESULT THREAD_CALL NamedPipe::serverThreadStarter(void *arg)
+THREAD_RESULT THREAD_CALL NamedPipeListener::serverThreadStarter(void *arg)
 {
-   static_cast<NamedPipe*>(arg)->serverThread();
+   static_cast<NamedPipeListener*>(arg)->serverThread();
    return THREAD_OK;
 }
+
+/**
+ * Named pipe constructor
+ */
+NamedPipe::NamedPipe(const TCHAR *name, HPIPE handle)
+{
+   nx_strncpy(m_name, name, MAX_PIPE_NAME_LEN);
+   m_handle = handle;
+   m_writeLock = MutexCreate();
+}
index ce0dc71..34102b5 100644 (file)
 /**
  * Create listener end for named pipe
  */
-NamedPipe *NamedPipe::createListener(const TCHAR *name, NamedPipeRequestHandler reqHandler, void *userArg)
+NamedPipeListener *NamedPipeListener::create(const TCHAR *name, NamedPipeRequestHandler reqHandler, void *userArg)
 {
    mode_t prevMask = 0;
 
    int s = socket(AF_UNIX, SOCK_STREAM, 0);
    if (s == INVALID_SOCKET)
    {
-      nxlog_debug(2, _T("NamedPipe(%s): socket() call failed (%s)"), name, _tcserror(errno));
+      nxlog_debug(2, _T("NamedPipeListener(%s): socket() call failed (%s)"), name, _tcserror(errno));
       return NULL;
    }
 
@@ -49,7 +49,7 @@ NamedPipe *NamedPipe::createListener(const TCHAR *name, NamedPipeRequestHandler
    prevMask = umask(S_IWGRP | S_IWOTH);
    if (bind(s, (struct sockaddr *)&addrLocal, SUN_LEN(&addrLocal)) == -1)
    {
-      nxlog_debug(2, _T("NamedPipe(%s): bind failed (%s)"), name, _tcserror(errno));
+      nxlog_debug(2, _T("NamedPipeListener(%s): bind failed (%s)"), name, _tcserror(errno));
       umask(prevMask);
       goto failure;
    }
@@ -57,11 +57,11 @@ NamedPipe *NamedPipe::createListener(const TCHAR *name, NamedPipeRequestHandler
 
    if (listen(s, 5) == -1)
    {
-      nxlog_debug(2, _T("NamedPipe(%s): listen() call failed (%s)"), name, _tcserror(errno));
+      nxlog_debug(2, _T("NamedPipeListener(%s): listen() call failed (%s)"), name, _tcserror(errno));
       goto failure;
    }
 
-   return new NamedPipe(name, s, true, reqHandler, userArg);
+   return new NamedPipeListener(name, s, reqHandler, userArg);
 
 failure:
    close(s);
@@ -70,58 +70,27 @@ failure:
 }
 
 /**
- * Create client end for named pipe
- */
-NamedPipe *NamedPipe::connectTo(const TCHAR *name)
-{
-   int s = socket(AF_UNIX, SOCK_STREAM, 0);
-   if (s == INVALID_SOCKET)
-   {
-      nxlog_debug(2, _T("NamedPipe(%s): socket() call failed (%s)"), name, _tcserror(errno));
-      return NULL;
-   }
-
-   struct sockaddr_un remote;
-   remote.sun_family = AF_UNIX;
-#ifdef UNICODE
-   sprintf(remote.sun_path, "/tmp/.%S", name);
-#else
-   sprintf(remote.sun_path, "/tmp/.%s", name);
-#endif
-   if (connect(s, (struct sockaddr *)&remote, SUN_LEN(&remote)) == -1)
-   {
-      close(s);
-      return NULL;
-   }
-
-   return new NamedPipe(name, s, false);
-}
-
-/**
  * Pipe destructor
  */
-NamedPipe::~NamedPipe()
+NamedPipeListener::~NamedPipeListener()
 {
    close(m_handle);
-   if (m_isListener)
-   {
-      stopServer();
-      char path[MAX_PATH];
+   stop();
+   char path[MAX_PATH];
 #ifdef UNICODE
-      sprintf(path, "/tmp/.%S", m_name);
+   sprintf(path, "/tmp/.%S", m_name);
 #else
-      sprintf(path, "/tmp/.%s", m_name);
+   sprintf(path, "/tmp/.%s", m_name);
 #endif
-      unlink(path);
-   }
+   unlink(path);
 }
 
 /**
  * Named pipe server thread
  */
-void NamedPipe::serverThread()
+void NamedPipeListener::serverThread()
 {
-   nxlog_debug(2, _T("NamedPipe(%s): waiting for connection"), m_name);
+   nxlog_debug(2, _T("NamedPipeListener(%s): waiting for connection"), m_name);
    while(!m_stop)
    {
       struct sockaddr_un addrRemote;
@@ -129,13 +98,58 @@ void NamedPipe::serverThread()
       SOCKET cs = accept(m_handle, (struct sockaddr *)&addrRemote, &size);
       if (cs > 0)
       {
-         NamedPipe *cp = new NamedPipe(m_name, cs, false);
+         NamedPipe *cp = new NamedPipe(m_name, cs);
          m_reqHandler(cp, m_userArg);
          delete cp;
       }
       else
       {
-         nxlog_debug(2, _T("NamedPipe(%s): accept failed (%s)"), m_name, _tcserror(errno));
+         nxlog_debug(2, _T("NamedPipeListener(%s): accept failed (%s)"), m_name, _tcserror(errno));
       }
    }
 }
+
+/**
+ * Pipe destructor
+ */
+NamedPipe::~NamedPipe()
+{
+   close(m_handle);
+   MutexDestroy(m_writeLock);
+}
+
+/**
+ * Create client end for named pipe
+ */
+NamedPipe *NamedPipe::connect(const TCHAR *name, UINT32 timeout)
+{
+   int s = socket(AF_UNIX, SOCK_STREAM, 0);
+   if (s == INVALID_SOCKET)
+   {
+      nxlog_debug(2, _T("NamedPipe(%s): socket() call failed (%s)"), name, _tcserror(errno));
+      return NULL;
+   }
+
+   struct sockaddr_un remote;
+   remote.sun_family = AF_UNIX;
+#ifdef UNICODE
+   sprintf(remote.sun_path, "/tmp/.%S", name);
+#else
+   sprintf(remote.sun_path, "/tmp/.%s", name);
+#endif
+   if (::connect(s, (struct sockaddr *)&remote, SUN_LEN(&remote)) == -1)
+   {
+      close(s);
+      return NULL;
+   }
+
+   return new NamedPipe(name, s);
+}
+
+/**
+ * Write to pipe
+ */
+bool NamedPipe::write(const void *data, size_t size)
+{
+   return SendEx(m_handle, data, size, 0, m_writeLock) == (int)size;
+}
similarity index 50%
copy from src/libnetxms/nxproc_unix.cpp
copy to src/libnetxms/nxproc_win32.cpp
index ce0dc71..ac0d12e 100644 (file)
 /**
  * Create listener end for named pipe
  */
-NamedPipe *NamedPipe::createListener(const TCHAR *name, NamedPipeRequestHandler reqHandler, void *userArg)
+NamedPipeListener *NamedPipeListener::create(const TCHAR *name, NamedPipeRequestHandler reqHandler, void *userArg)
 {
    mode_t prevMask = 0;
 
    int s = socket(AF_UNIX, SOCK_STREAM, 0);
    if (s == INVALID_SOCKET)
    {
-      nxlog_debug(2, _T("NamedPipe(%s): socket() call failed (%s)"), name, _tcserror(errno));
+      nxlog_debug(2, _T("NamedPipeListener(%s): socket() call failed (%s)"), name, _tcserror(errno));
       return NULL;
    }
 
@@ -49,7 +49,7 @@ NamedPipe *NamedPipe::createListener(const TCHAR *name, NamedPipeRequestHandler
    prevMask = umask(S_IWGRP | S_IWOTH);
    if (bind(s, (struct sockaddr *)&addrLocal, SUN_LEN(&addrLocal)) == -1)
    {
-      nxlog_debug(2, _T("NamedPipe(%s): bind failed (%s)"), name, _tcserror(errno));
+      nxlog_debug(2, _T("NamedPipeListener(%s): bind failed (%s)"), name, _tcserror(errno));
       umask(prevMask);
       goto failure;
    }
@@ -57,11 +57,11 @@ NamedPipe *NamedPipe::createListener(const TCHAR *name, NamedPipeRequestHandler
 
    if (listen(s, 5) == -1)
    {
-      nxlog_debug(2, _T("NamedPipe(%s): listen() call failed (%s)"), name, _tcserror(errno));
+      nxlog_debug(2, _T("NamedPipeListener(%s): listen() call failed (%s)"), name, _tcserror(errno));
       goto failure;
    }
 
-   return new NamedPipe(name, s, true, reqHandler, userArg);
+   return new NamedPipeListener(name, s, reqHandler, userArg);
 
 failure:
    close(s);
@@ -70,31 +70,23 @@ failure:
 }
 
 /**
- * Create client end for named pipe
+ * Pipe destructor
  */
-NamedPipe *NamedPipe::connectTo(const TCHAR *name)
+NamedPipeListener::~NamedPipeListener()
 {
-   int s = socket(AF_UNIX, SOCK_STREAM, 0);
-   if (s == INVALID_SOCKET)
-   {
-      nxlog_debug(2, _T("NamedPipe(%s): socket() call failed (%s)"), name, _tcserror(errno));
-      return NULL;
-   }
+   CloseHandle(m_handle);
+   stop();
+}
 
-   struct sockaddr_un remote;
-   remote.sun_family = AF_UNIX;
-#ifdef UNICODE
-   sprintf(remote.sun_path, "/tmp/.%S", name);
-#else
-   sprintf(remote.sun_path, "/tmp/.%s", name);
-#endif
-   if (connect(s, (struct sockaddr *)&remote, SUN_LEN(&remote)) == -1)
+/**
+ * Named pipe server thread
+ */
+void NamedPipeListener::serverThread()
+{
+   nxlog_debug(2, _T("NamedPipeListener(%s): waiting for connection"), m_name);
+   while(!m_stop)
    {
-      close(s);
-      return NULL;
    }
-
-   return new NamedPipe(name, s, false);
 }
 
 /**
@@ -102,40 +94,42 @@ NamedPipe *NamedPipe::connectTo(const TCHAR *name)
  */
 NamedPipe::~NamedPipe()
 {
-   close(m_handle);
-   if (m_isListener)
-   {
-      stopServer();
-      char path[MAX_PATH];
-#ifdef UNICODE
-      sprintf(path, "/tmp/.%S", m_name);
-#else
-      sprintf(path, "/tmp/.%s", m_name);
-#endif
-      unlink(path);
-   }
+   CloseHandle(m_handle);
+   MutexDestroy(m_writeLock);
 }
 
 /**
- * Named pipe server thread
+ * Create client end for named pipe
  */
-void NamedPipe::serverThread()
+NamedPipe *NamedPipe::connect(const TCHAR *name, UINT32 timeout)
 {
-   nxlog_debug(2, _T("NamedPipe(%s): waiting for connection"), m_name);
-   while(!m_stop)
-   {
-      struct sockaddr_un addrRemote;
-      socklen_t size = sizeof(struct sockaddr_un);
-      SOCKET cs = accept(m_handle, (struct sockaddr *)&addrRemote, &size);
-      if (cs > 0)
-      {
-         NamedPipe *cp = new NamedPipe(m_name, cs, false);
-         m_reqHandler(cp, m_userArg);
-         delete cp;
-      }
-      else
-      {
-         nxlog_debug(2, _T("NamedPipe(%s): accept failed (%s)"), m_name, _tcserror(errno));
-      }
-   }
+   TCHAR path[MAX_PATH];
+   _sntprintf(path, MAX_PATH, _T("\\\\.\\pipe\\%s"), name);
+
+reconnect:
+       HANDLE h = CreateFile(path, GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING, 0, NULL);
+       if (h == INVALID_HANDLE_VALUE)
+       {
+               if (GetLastError() == ERROR_PIPE_BUSY)
+               {
+                       if (WaitNamedPipe(path, timeout))
+                               goto reconnect;
+               }
+               return NULL;
+       }
+
+       DWORD pipeMode = PIPE_READMODE_MESSAGE;
+       SetNamedPipeHandleState(h, &pipeMode, NULL, NULL);
+   return new NamedPipe(name, h, false);
+}
+
+/**
+ * Write to pipe
+ */
+bool NamedPipe::write(const void *data, size_t size)
+{
+       DWORD bytes;
+       if (!WriteFile(s_handle, data, size, &bytes, NULL))
+               return false;
+   return bytes == (DWORD)size;
 }