Fixed problem with file sheduled file upload. Fixed some memory leaks Fixes #1187
[public/netxms.git] / src / server / include / nxcore_jobs.h
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2009 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: nxcore_jobs.h
20 **
21 **/
22
23 #ifndef _nxcore_jobs_h_
24 #define _nxcore_jobs_h_
25
26 #include "nxcore_schedule.h"
27
28 #define JOB_RESCHEDULE_OFSET 600
29
30 /**
31 * Job status
32 */
33 enum ServerJobStatus
34 {
35 JOB_PENDING = 0,
36 JOB_ACTIVE,
37 JOB_ON_HOLD,
38 JOB_COMPLETED,
39 JOB_FAILED,
40 JOB_CANCELLED,
41 JOB_CANCEL_PENDING
42 };
43
44
45 /**
46 * Job status
47 */
48 enum ServerJobResult
49 {
50 JOB_RESULT_SUCCESS = 0,
51 JOB_RESULT_FAILED,
52 JOB_RESULT_RESCHEDULE
53 };
54
55 /**
56 * Job class
57 */
58 class ServerJobQueue;
59 class NetObj;
60
61 class NXCORE_EXPORTABLE ServerJob
62 {
63 private:
64 UINT32 m_id;
65 UINT32 m_userId;
66 TCHAR *m_type;
67 UINT32 m_remoteNode;
68 TCHAR *m_description;
69 ServerJobStatus m_status;
70 int m_progress;
71 TCHAR *m_failureMessage;
72 THREAD m_workerThread;
73 ServerJobQueue *m_owningQueue;
74 time_t m_lastStatusChange;
75 int m_autoCancelDelay; // Interval in seconds to cancel failed job automatically (0 = disabled)
76 time_t m_lastNotification;
77 NetObj *m_resolvedObject;
78 MUTEX m_notificationLock;
79 NXCPMessage m_notificationMessage;
80 bool m_blockNextJobsOnFailure;
81 bool m_isValid;
82
83 static THREAD_RESULT THREAD_CALL WorkerThreadStarter(void *);
84 static void sendNotification(ClientSession *session, void *arg);
85
86 void createHistoryRecord();
87 void updateHistoryRecord(bool onStart);
88
89 protected:
90 int m_retryCount;
91
92 virtual ServerJobResult run();
93 virtual bool onCancel();
94 virtual const TCHAR *getAdditionalInfo();
95
96 void notifyClients(bool isStatusChange);
97 void changeStatus(ServerJobStatus newStatus);
98 void markProgress(int pctCompleted);
99 void setFailureMessage(const TCHAR *msg);
100
101 void setDescription(const TCHAR *description);
102
103 public:
104 ServerJob(const TCHAR *type, const TCHAR *description, UINT32 node, UINT32 userId, bool createOnHold, int retryCount = -1);
105 ServerJob(const TCHAR* params, UINT32 node, UINT32 userId);
106 virtual ~ServerJob();
107
108 void start();
109 bool cancel();
110 bool hold();
111 bool unhold();
112
113 void setAutoCancelDelay(int delay) { m_autoCancelDelay = delay; }
114 int getAutoCancelDelay() { return m_autoCancelDelay; }
115
116 void setBlockNextJobsOnFailure(bool flag) { m_blockNextJobsOnFailure = flag; }
117 bool isBlockNextJobsOnFailure() { return m_blockNextJobsOnFailure; }
118 void setIsValid(bool valid) { m_isValid = valid; }
119 bool isValid() { return m_isValid; }
120
121 UINT32 getId() { return m_id; }
122 UINT32 getUserId() { return m_userId; }
123 const TCHAR *getType() { return m_type; }
124 const TCHAR *getDescription() { return m_description; }
125 ServerJobStatus getStatus() { return m_status; }
126 int getProgress() { return m_progress; }
127 UINT32 getRemoteNode() { return m_remoteNode; }
128 const TCHAR *getFailureMessage() { return CHECK_NULL_EX(m_failureMessage); }
129 time_t getLastStatusChange() { return m_lastStatusChange; }
130
131 void setOwningQueue(ServerJobQueue *queue);
132
133 void fillMessage(NXCPMessage *msg);
134 virtual TCHAR *serializeParameters();
135 virtual void rescheduleExecution();
136 int getNextJobExecutionTime();
137 };
138
139 /**
140 * Job queue class
141 */
142 class ServerJobQueue
143 {
144 private:
145 int m_jobCount;
146 ServerJob **m_jobList;
147 MUTEX m_accessMutex;
148
149 public:
150 ServerJobQueue();
151 ~ServerJobQueue();
152
153 void add(ServerJob *job);
154 bool cancel(UINT32 jobId);
155 bool hold(UINT32 jobId);
156 bool unhold(UINT32 jobId);
157 void runNext();
158 void cleanup();
159
160 ServerJob *findJob(UINT32 jobId);
161 int getJobCount(const TCHAR *type = NULL);
162
163 void jobCompleted(ServerJob *job);
164
165 UINT32 fillMessage(NXCPMessage *msg, UINT32 *varIdBase);
166 };
167
168 /**
169 * Job manager API
170 */
171 bool NXCORE_EXPORTABLE AddJob(ServerJob *job);
172 void GetJobList(NXCPMessage *msg);
173 UINT32 NXCORE_EXPORTABLE CancelJob(UINT32 userId, NXCPMessage *msg);
174 UINT32 NXCORE_EXPORTABLE HoldJob(UINT32 userId, NXCPMessage *msg);
175 UINT32 NXCORE_EXPORTABLE UnholdJob(UINT32 userId, NXCPMessage *msg);
176
177 /**
178 * File upload job
179 */
180 class FileUploadJob : public ServerJob
181 {
182 protected:
183 static int m_activeJobs;
184 static int m_maxActiveJobs;
185 static MUTEX m_sharedDataMutex;
186
187 Node *m_node;
188 TCHAR *m_localFile;
189 TCHAR *m_localFileFullPath;
190 TCHAR *m_remoteFile;
191 TCHAR *m_info;
192 INT64 m_fileSize;
193
194 virtual ServerJobResult run();
195 virtual const TCHAR *getAdditionalInfo();
196 static void uploadCallback(INT64 size, void *arg);
197
198 public:
199 static void init();
200
201 FileUploadJob(Node *node, const TCHAR *localFile, const TCHAR *remoteFile, UINT32 userId, bool createOnHold);
202 FileUploadJob(TCHAR* params, UINT32 node, UINT32 userId);
203 virtual ~FileUploadJob();
204
205 virtual TCHAR *serializeParameters();
206 virtual void rescheduleExecution();
207 void setLocalFileFullPath();
208 };
209
210 /**
211 * File download job
212 */
213 class FileDownloadJob : public ServerJob
214 {
215 private:
216 Node *m_node;
217 ClientSession *m_session;
218 UINT32 m_requestId;
219 TCHAR *m_localFile;
220 TCHAR *m_remoteFile;
221 TCHAR *m_info;
222 INT64 m_fileSize;
223 INT64 m_currentSize;
224 SOCKET m_socket;
225 UINT32 m_maxFileSize;
226 bool m_follow;
227
228 protected:
229 virtual ServerJobResult run();
230 virtual bool onCancel();
231 virtual const TCHAR *getAdditionalInfo();
232
233 static void progressCallback(size_t size, void *arg);
234 static void fileResendCallback(NXCP_MESSAGE *msg, void *arg);
235
236 public:
237 FileDownloadJob(Node *node, const TCHAR *remoteName, UINT32 maxFileSize, bool follow, ClientSession *session, UINT32 requestId);
238 virtual ~FileDownloadJob();
239
240 static TCHAR *buildServerFileName(UINT32 nodeId, const TCHAR *remoteFile, TCHAR *buffer, size_t bufferSize);
241 TCHAR *getLocalFileName();
242 };
243
244 /**
245 * Agent policy deployment job
246 */
247 class AgentPolicy;
248
249 class PolicyDeploymentJob : public ServerJob
250 {
251 protected:
252 Node *m_node;
253 AgentPolicy *m_policy;
254
255 virtual ServerJobResult run();
256
257 public:
258 PolicyDeploymentJob(Node *node, AgentPolicy *policy, UINT32 userId);
259 PolicyDeploymentJob(const TCHAR* params, UINT32 node, UINT32 userId);
260 virtual ~PolicyDeploymentJob();
261
262 virtual TCHAR *serializeParameters();
263 virtual void rescheduleExecution();
264 };
265
266
267 /**
268 * Agent policy uninstall job
269 */
270 class PolicyUninstallJob : public ServerJob
271 {
272 protected:
273 Node *m_node;
274 AgentPolicy *m_policy;
275
276 virtual ServerJobResult run();
277
278 public:
279 PolicyUninstallJob(Node *node, AgentPolicy *policy, UINT32 userId);
280 PolicyUninstallJob(const TCHAR* params, UINT32 node, UINT32 userId);
281 virtual ~PolicyUninstallJob();
282
283 virtual TCHAR *serializeParameters();
284 virtual void rescheduleExecution();
285 };
286
287
288 #endif /* _nxcore_jobs_h_ */