added mutex around XMPP calls
[public/netxms.git] / src / libstrophe / event.c
CommitLineData
244c65ef
VK
1/* event.c
2** strophe XMPP client library -- event loop and management
3**
4** Copyright (C) 2005-2009 Collecta, Inc.
5**
6** This software is provided AS-IS with no warranty, either express
7** or implied.
8**
9** This software is distributed under license and may not be copied,
10** modified or distributed except as expressly authorized under the
11** terms of the license contained in the file LICENSE.txt in this
12** distribution.
13*/
14
15/** @file
16 * Event loop and management.
17 */
18
19/** @defgroup EventLoop Event loop
20 * These functions manage the Strophe event loop.
21 *
22 * Simple tools can use xmpp_run() and xmpp_stop() to manage the life
23 * cycle of the program. A common idiom is to set up a few initial
24 * event handlers, call xmpp_run(), and then respond and react to
25 * events as they come in. At some point, one of the handlers will
26 * call xmpp_stop() to quit the event loop which leads to the program
27 * terminating.
28 *
29 * More complex programs will have their own event loops, and should
30 * ensure that xmpp_run_once() is called regularly from there. For
31 * example, a GUI program will already include an event loop to
32 * process UI events from users, and xmpp_run_once() would be called
33 * from an idle function.
34 */
35
36#include <nms_common.h>
37
38#ifdef _WIN32
39#define ETIMEDOUT WSAETIMEDOUT
40#define ECONNRESET WSAECONNRESET
41#define ECONNABORTED WSAECONNABORTED
42#endif
43
44#include <strophe.h>
45#include "common.h"
46#include "parser.h"
47
48#ifndef DEFAULT_TIMEOUT
49/** @def DEFAULT_TIMEOUT
50 * The default timeout in milliseconds for the event loop.
51 * This is set to 1 millisecond.
52 */
53#define DEFAULT_TIMEOUT 1
54#endif
55
56/** Run the event loop once.
57 * This function will run send any data that has been queued by
58 * xmpp_send and related functions and run through the Strophe even
59 * loop a single time, and will not wait more than timeout
60 * milliseconds for events. This is provided to support integration
61 * with event loops outside the library, and if used, should be
62 * called regularly to achieve low latency event handling.
63 *
64 * @param ctx a Strophe context object
65 * @param timeout time to wait for events in milliseconds
66 *
67 * @ingroup EventLoop
68 */
69void xmpp_run_once(xmpp_ctx_t *ctx, const unsigned long timeout)
70{
71 xmpp_connlist_t *connitem;
72 xmpp_conn_t *conn;
73 fd_set rfds, wfds;
74 sock_t max = 0;
75 int ret;
76 struct timeval tv;
77 xmpp_send_queue_t *sq, *tsq;
78 int towrite;
79 char buf[4096];
80 uint64_t next;
81 uint64_t usec;
82 int tls_read_bytes = 0;
83
84 if (ctx->loop_status == XMPP_LOOP_QUIT) return;
85 ctx->loop_status = XMPP_LOOP_RUNNING;
86
87 /* send queued data */
88 connitem = ctx->connlist;
89 while (connitem) {
90 conn = connitem->conn;
91 if (conn->state != XMPP_STATE_CONNECTED) {
92 connitem = connitem->next;
93 continue;
94 }
95
96 /* if we're running tls, there may be some remaining data waiting to
97 * be sent, so push that out */
98 if (conn->tls) {
99 ret = tls_clear_pending_write(conn->tls);
100
101 if (ret < 0 && !tls_is_recoverable(tls_error(conn->tls))) {
102 /* an error occured */
103 xmpp_debug(ctx, "xmpp", "Send error occured, disconnecting.");
104 conn->error = ECONNABORTED;
105 conn_disconnect(conn);
106 }
107 }
108
109 /* write all data from the send queue to the socket */
110 sq = conn->send_queue_head;
111 while (sq) {
112 towrite = (int)(sq->len - sq->written);
113
114 if (conn->tls) {
115 ret = tls_write(conn->tls, &sq->data[sq->written], towrite);
116
117 if (ret < 0 && !tls_is_recoverable(tls_error(conn->tls))) {
118 /* an error occured */
119 conn->error = tls_error(conn->tls);
120 break;
121 } else if (ret < towrite) {
122 /* not all data could be sent now */
123 if (ret >= 0) sq->written += ret;
124 break;
125 }
126
127 } else {
128 ret = sock_write(conn->sock, &sq->data[sq->written], towrite);
129
130 if (ret < 0 && !sock_is_recoverable(sock_error())) {
131 /* an error occured */
132 conn->error = sock_error();
133 break;
134 } else if (ret < towrite) {
135 /* not all data could be sent now */
136 if (ret >= 0) sq->written += ret;
137 break;
138 }
139 }
140
141 /* all data for this queue item written, delete and move on */
142 xmpp_free(ctx, sq->data);
143 tsq = sq;
144 sq = sq->next;
145 xmpp_free(ctx, tsq);
146
147 /* pop the top item */
148 conn->send_queue_head = sq;
149 /* if we've sent everything update the tail */
150 if (!sq) conn->send_queue_tail = NULL;
151 }
152
153 /* tear down connection on error */
154 if (conn->error) {
155 /* FIXME: need to tear down send queues and random other things
156 * maybe this should be abstracted */
157 xmpp_debug(ctx, "xmpp", "Send error occured, disconnecting.");
158 conn->error = ECONNABORTED;
159 conn_disconnect(conn);
160 }
161
162 connitem = connitem->next;
163 }
164
165 /* reset parsers if needed */
166 for (connitem = ctx->connlist; connitem; connitem = connitem->next) {
167 if (connitem->conn->reset_parser)
168 conn_parser_reset(connitem->conn);
169 }
170
171
172 /* fire any ready timed handlers, then
173 make sure we don't wait past the time when timed handlers need
174 to be called */
175 next = handler_fire_timed(ctx);
176
177 usec = ((next < timeout) ? next : timeout) * 1000;
178 tv.tv_sec = (long)(usec / 1000000);
179 tv.tv_usec = (long)(usec % 1000000);
180
181 FD_ZERO(&rfds);
182 FD_ZERO(&wfds);
183
184 /* find events to watch */
185 connitem = ctx->connlist;
186 while (connitem) {
187 conn = connitem->conn;
188
189 switch (conn->state) {
190 case XMPP_STATE_CONNECTING:
191 /* connect has been called and we're waiting for it to complete */
192 /* connection will give us write or error events */
193
194 /* make sure the timeout hasn't expired */
195 if (time_elapsed(conn->timeout_stamp, time_stamp()) <=
196 conn->connect_timeout)
197 FD_SET(conn->sock, &wfds);
198 else {
199 conn->error = ETIMEDOUT;
200 xmpp_info(ctx, "xmpp", "Connection attempt timed out.");
201 conn_disconnect(conn);
202 }
203 break;
204 case XMPP_STATE_CONNECTED:
205 FD_SET(conn->sock, &rfds);
206 break;
207 case XMPP_STATE_DISCONNECTED:
208 /* do nothing */
209 default:
210 break;
211 }
212
213 /* Check if there is something in the SSL buffer. */
214 if (conn->tls) {
215 tls_read_bytes += tls_pending(conn->tls);
216 }
217
218 if (conn->sock > max) max = conn->sock;
219
220 connitem = connitem->next;
221 }
222
223 /* check for events */
224 ret = select(SELECT_NFDS(max + 1), &rfds, &wfds, NULL, &tv);
225
226 /* select errored */
227 if (ret < 0)
228 {
229 if (!sock_is_recoverable(sock_error()))
230 {
231// xmpp_error(ctx, "xmpp", "unrecoverable socket error %d", sock_error());
232#ifdef _WIN32
233 Sleep(1000);
234#else
235 sleep(1);
236#endif
237 }
238 return;
239 }
240
241 /* no events happened */
242 if (ret == 0 && tls_read_bytes == 0) return;
243
244 /* process events */
245 connitem = ctx->connlist;
246 while (connitem) {
247 conn = connitem->conn;
248
249 switch (conn->state) {
250 case XMPP_STATE_CONNECTING:
251 if (FD_ISSET(conn->sock, &wfds)) {
252 /* connection complete */
253
254 /* check for error */
255 if (sock_connect_error(conn->sock) != 0) {
256 /* connection failed */
257 xmpp_debug(ctx, "xmpp", "connection failed");
258 conn_disconnect(conn);
259 break;
260 }
261
262 conn->state = XMPP_STATE_CONNECTED;
263 xmpp_debug(ctx, "xmpp", "connection successful");
264
265
266 /* send stream init */
267 conn_open_stream(conn);
268 }
269
270 break;
271 case XMPP_STATE_CONNECTED:
272 if (FD_ISSET(conn->sock, &rfds) || (conn->tls && tls_pending(conn->tls))) {
273 if (conn->tls) {
274 ret = tls_read(conn->tls, buf, 4096);
275 } else {
276 ret = sock_read(conn->sock, buf, 4096);
277 }
278
279 if (ret > 0) {
280 ret = parser_feed(conn->parser, buf, ret);
281 if (!ret) {
282 /* parse error, we need to shut down */
283 /* FIXME */
284 xmpp_debug(ctx, "xmpp", "parse error, disconnecting");
285 conn_disconnect(conn);
286 }
287 } else {
288 if (conn->tls) {
289 if (!tls_is_recoverable(tls_error(conn->tls)))
290 {
291 xmpp_debug(ctx, "xmpp", "Unrecoverable TLS error, %d.", tls_error(conn->tls));
292 conn->error = tls_error(conn->tls);
293 conn_disconnect(conn);
294 }
295 } else {
296 /* return of 0 means socket closed by server */
297 xmpp_debug(ctx, "xmpp", "Socket closed by remote host.");
298 conn->error = ECONNRESET;
299 conn_disconnect(conn);
300 }
301 }
302 }
303
304 break;
305 case XMPP_STATE_DISCONNECTED:
306 /* do nothing */
307 default:
308 break;
309 }
310
311 connitem = connitem->next;
312 }
313
314 /* fire any ready handlers */
315 handler_fire_timed(ctx);
316}
317
318/** Start the event loop.
319 * This function continuously calls xmpp_run_once and does not return
320 * until xmpp_stop has been called.
321 *
322 * @param ctx a Strophe context object
323 *
324 * @ingroup EventLoop
325 */
326void xmpp_run(xmpp_ctx_t *ctx)
327{
328 if (ctx->loop_status != XMPP_LOOP_NOTSTARTED) return;
329
330 ctx->loop_status = XMPP_LOOP_RUNNING;
331 while (ctx->loop_status == XMPP_LOOP_RUNNING) {
332 xmpp_run_once(ctx, DEFAULT_TIMEOUT);
333 }
334
335 xmpp_debug(ctx, "event", "Event loop completed.");
336}
337
338/** Stop the event loop.
339 * This will stop the event loop after the current iteration and cause
340 * xmpp_run to exit.
341 *
342 * @param ctx a Strophe context object
343 *
344 * @ingroup EventLoop
345 */
346void xmpp_stop(xmpp_ctx_t *ctx)
347{
348 xmpp_debug(ctx, "event", "Stopping event loop.");
349
350 if (ctx->loop_status == XMPP_LOOP_RUNNING)
351 ctx->loop_status = XMPP_LOOP_QUIT;
352}
f479c6bc
VK
353
354/**
355 * Set loop status
356 */
357void xmpp_set_loop_status(xmpp_ctx_t *ctx, xmpp_loop_status_t status)
358{
359 ctx->loop_status = status;
360}
361
362/**
363 * Get current loop status
364 */
365xmpp_loop_status_t xmpp_get_loop_status(xmpp_ctx_t *ctx)
366{
367 return ctx->loop_status;
368}