implemented DB driver call DrvGetFieldUnbufferedUTF8 (for databases with native UTF...
[public/netxms.git] / src / db / dbdrv / pgsql / pgsql.cpp
1 /*
2 ** PostgreSQL Database Driver
3 ** Copyright (C) 2003 - 2016 Victor Kirhenshtein and Alex Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: pgsql.cpp
20 **
21 **/
22
23 #include "pgsqldrv.h"
24
25 #ifndef _WIN32
26 #include <dlfcn.h>
27 #endif
28
29 #ifdef _WIN32
30 #pragma warning(disable : 4996)
31 #endif
32
33 DECLARE_DRIVER_HEADER("PGSQL")
34
35 extern "C" void EXPORT DrvDisconnect(DBDRV_CONNECTION pConn);
36 static bool UnsafeDrvQuery(PG_CONN *pConn, const char *szQuery, WCHAR *errorText);
37
38 #ifndef _WIN32
39 static void *s_libpq = NULL;
40 static int (*s_PQsetSingleRowMode)(PGconn *) = NULL;
41 #endif
42
43 #if !HAVE_DECL_PGRES_SINGLE_TUPLE
44 #define PGRES_SINGLE_TUPLE 9
45 #endif
46
47 /**
48 * Statement ID
49 */
50 static VolatileCounter s_statementId = 0;
51
52 /**
53 * Prepare string for using in SQL query - enclose in quotes and escape as needed
54 */
55 extern "C" WCHAR EXPORT *DrvPrepareStringW(const WCHAR *str)
56 {
57 int len = (int)wcslen(str) + 3; // + two quotes and \0 at the end
58 int bufferSize = len + 128;
59 WCHAR *out = (WCHAR *)malloc(bufferSize * sizeof(WCHAR));
60 out[0] = L'\'';
61
62 const WCHAR *src = str;
63 int outPos;
64 for(outPos = 1; *src != 0; src++)
65 {
66 UINT32 chval = *src;
67 if (chval < 32)
68 {
69 WCHAR buffer[8];
70
71 swprintf(buffer, 8, L"\\%03o", chval);
72 len += 4;
73 if (len >= bufferSize)
74 {
75 bufferSize += 128;
76 out = (WCHAR *)realloc(out, bufferSize * sizeof(WCHAR));
77 }
78 memcpy(&out[outPos], buffer, 4 * sizeof(WCHAR));
79 outPos += 4;
80 }
81 else if (*src == L'\'')
82 {
83 len++;
84 if (len >= bufferSize)
85 {
86 bufferSize += 128;
87 out = (WCHAR *)realloc(out, bufferSize * sizeof(WCHAR));
88 }
89 out[outPos++] = L'\'';
90 out[outPos++] = L'\'';
91 }
92 else if (*src == L'\\')
93 {
94 len++;
95 if (len >= bufferSize)
96 {
97 bufferSize += 128;
98 out = (WCHAR *)realloc(out, bufferSize * sizeof(WCHAR));
99 }
100 out[outPos++] = L'\\';
101 out[outPos++] = L'\\';
102 }
103 else
104 {
105 out[outPos++] = *src;
106 }
107 }
108 out[outPos++] = L'\'';
109 out[outPos++] = 0;
110
111 return out;
112 }
113
114 /**
115 * Prepare string for using in SQL query - enclose in quotes and escape as needed
116 */
117 extern "C" char EXPORT *DrvPrepareStringA(const char *str)
118 {
119 int len = (int)strlen(str) + 3; // + two quotes and \0 at the end
120 int bufferSize = len + 128;
121 char *out = (char *)malloc(bufferSize);
122 out[0] = '\'';
123
124 const char *src = str;
125 int outPos;
126 for(outPos = 1; *src != 0; src++)
127 {
128 UINT32 chval = (UINT32)(*((unsigned char *)src));
129 if (chval < 32)
130 {
131 char buffer[8];
132
133 snprintf(buffer, 8, "\\%03o", chval);
134 len += 4;
135 if (len >= bufferSize)
136 {
137 bufferSize += 128;
138 out = (char *)realloc(out, bufferSize);
139 }
140 memcpy(&out[outPos], buffer, 4);
141 outPos += 4;
142 }
143 else if (*src == '\'')
144 {
145 len++;
146 if (len >= bufferSize)
147 {
148 bufferSize += 128;
149 out = (char *)realloc(out, bufferSize);
150 }
151 out[outPos++] = '\'';
152 out[outPos++] = '\'';
153 }
154 else if (*src == '\\')
155 {
156 len++;
157 if (len >= bufferSize)
158 {
159 bufferSize += 128;
160 out = (char *)realloc(out, bufferSize);
161 }
162 out[outPos++] = '\\';
163 out[outPos++] = '\\';
164 }
165 else
166 {
167 out[outPos++] = *src;
168 }
169 }
170 out[outPos++] = '\'';
171 out[outPos++] = 0;
172
173 return out;
174 }
175
176 /**
177 * Initialize driver
178 */
179 extern "C" bool EXPORT DrvInit(const char *cmdLine)
180 {
181 #ifndef _WIN32
182 s_libpq = dlopen("libpq.so.5", RTLD_NOW);
183 if (s_libpq != NULL)
184 s_PQsetSingleRowMode = (int (*)(PGconn *))dlsym(s_libpq, "PQsetSingleRowMode");
185 nxlog_debug(2, _T("PostgreSQL driver: single row mode %s"), (s_PQsetSingleRowMode != NULL) ? _T("enabled") : _T("disabled"));
186 #endif
187 return true;
188 }
189
190 /**
191 * Unload handler
192 */
193 extern "C" void EXPORT DrvUnload()
194 {
195 #ifndef _WIN32
196 if (s_libpq != NULL)
197 dlclose(s_libpq);
198 #endif
199 }
200
201 /**
202 * Connect to database
203 */
204 extern "C" DBDRV_CONNECTION EXPORT DrvConnect(const char *szHost, const char *szLogin, const char *szPassword,
205 const char *szDatabase, const char *schema, WCHAR *errorText)
206 {
207 PG_CONN *pConn;
208 char *port = NULL;
209
210 if (szDatabase == NULL || *szDatabase == 0)
211 {
212 wcscpy(errorText, L"Database name is empty");
213 return NULL;
214 }
215 if((port = (char *)strchr(szHost, ':'))!=NULL)
216 {
217 port[0]=0;
218 port++;
219 }
220
221 pConn = (PG_CONN *)malloc(sizeof(PG_CONN));
222
223 if (pConn != NULL)
224 {
225 // should be replaced with PQconnectdb();
226 pConn->handle = PQsetdbLogin(szHost, port, NULL, NULL, szDatabase, szLogin, szPassword);
227
228 if (PQstatus(pConn->handle) == CONNECTION_BAD)
229 {
230 MultiByteToWideChar(CP_UTF8, 0, PQerrorMessage(pConn->handle), -1, errorText, DBDRV_MAX_ERROR_TEXT);
231 errorText[DBDRV_MAX_ERROR_TEXT - 1] = 0;
232 RemoveTrailingCRLFW(errorText);
233 PQfinish(pConn->handle);
234 free(pConn);
235 pConn = NULL;
236 }
237 else
238 {
239 PGresult *pResult;
240
241 pResult = PQexec(pConn->handle, "SET standard_conforming_strings TO off");
242 PQclear(pResult);
243
244 pResult = PQexec(pConn->handle, "SET escape_string_warning TO off");
245 PQclear(pResult);
246
247 PQsetClientEncoding(pConn->handle, "UTF8");
248
249 pConn->mutexQueryLock = MutexCreate();
250
251 if ((schema != NULL) && (schema[0] != 0))
252 {
253 char query[256];
254 snprintf(query, 256, "SET search_path=%s", schema);
255 if (!UnsafeDrvQuery(pConn, query, errorText))
256 {
257 DrvDisconnect(pConn);
258 pConn = NULL;
259 }
260 }
261 }
262 }
263 else
264 {
265 wcscpy(errorText, L"Memory allocation error");
266 }
267
268 return (DBDRV_CONNECTION)pConn;
269 }
270
271 /**
272 * Disconnect from database
273 */
274 extern "C" void EXPORT DrvDisconnect(DBDRV_CONNECTION pConn)
275 {
276 if (pConn != NULL)
277 {
278 PQfinish(((PG_CONN *)pConn)->handle);
279 MutexDestroy(((PG_CONN *)pConn)->mutexQueryLock);
280 free(pConn);
281 }
282 }
283
284 /**
285 * Convert query from NetXMS portable format to native PostgreSQL format
286 */
287 static char *ConvertQuery(WCHAR *query)
288 {
289 char *srcQuery = UTF8StringFromWideString(query);
290 int count = NumCharsA(srcQuery, '?');
291 if (count == 0)
292 return srcQuery;
293
294 char *dstQuery = (char *)malloc(strlen(srcQuery) + count * 3 + 1);
295 bool inString = false;
296 int pos = 1;
297 char *src, *dst;
298 for(src = srcQuery, dst = dstQuery; *src != 0; src++)
299 {
300 switch(*src)
301 {
302 case '\'':
303 *dst++ = *src;
304 inString = !inString;
305 break;
306 case '\\':
307 *dst++ = *src++;
308 *dst++ = *src;
309 break;
310 case '?':
311 if (inString)
312 {
313 *dst++ = '?';
314 }
315 else
316 {
317 *dst++ = '$';
318 if (pos < 10)
319 {
320 *dst++ = pos + '0';
321 }
322 else if (pos < 100)
323 {
324 *dst++ = pos / 10 + '0';
325 *dst++ = pos % 10 + '0';
326 }
327 else
328 {
329 *dst++ = pos / 100 + '0';
330 *dst++ = (pos % 100) / 10 + '0';
331 *dst++ = pos % 10 + '0';
332 }
333 pos++;
334 }
335 break;
336 default:
337 *dst++ = *src;
338 break;
339 }
340 }
341 *dst = 0;
342 free(srcQuery);
343 return dstQuery;
344 }
345
346 /**
347 * Prepare statement
348 */
349 extern "C" DBDRV_STATEMENT EXPORT DrvPrepare(PG_CONN *pConn, WCHAR *pwszQuery, DWORD *pdwError, WCHAR *errorText)
350 {
351 char *pszQueryUTF8 = ConvertQuery(pwszQuery);
352 PG_STATEMENT *hStmt = (PG_STATEMENT *)malloc(sizeof(PG_STATEMENT));
353 hStmt->connection = pConn;
354 snprintf(hStmt->name, 64, "netxms_stmt_%p_%d", hStmt, (int)InterlockedIncrement(&s_statementId));
355
356 MutexLock(pConn->mutexQueryLock);
357 PGresult *pResult = PQprepare(pConn->handle, hStmt->name, pszQueryUTF8, 0, NULL);
358 if ((pResult == NULL) || (PQresultStatus(pResult) != PGRES_COMMAND_OK))
359 {
360 free(hStmt);
361 hStmt = NULL;
362
363 *pdwError = (PQstatus(pConn->handle) == CONNECTION_BAD) ? DBERR_CONNECTION_LOST : DBERR_OTHER_ERROR;
364
365 if (errorText != NULL)
366 {
367 MultiByteToWideChar(CP_UTF8, 0, PQerrorMessage(pConn->handle), -1, errorText, DBDRV_MAX_ERROR_TEXT);
368 errorText[DBDRV_MAX_ERROR_TEXT - 1] = 0;
369 RemoveTrailingCRLFW(errorText);
370 }
371 }
372 else
373 {
374 hStmt->allocated = 0;
375 hStmt->pcount = 0;
376 hStmt->buffers = NULL;
377 *pdwError = DBERR_SUCCESS;
378 }
379 MutexUnlock(pConn->mutexQueryLock);
380 if (pResult != NULL)
381 PQclear(pResult);
382 free(pszQueryUTF8);
383 return hStmt;
384 }
385
386 /**
387 * Bind parameter to prepared statement
388 */
389 extern "C" void EXPORT DrvBind(PG_STATEMENT *hStmt, int pos, int sqlType, int cType, void *buffer, int allocType)
390 {
391 if (pos <= 0)
392 return;
393
394 if (hStmt->allocated < pos)
395 {
396 int newAllocated = max(hStmt->allocated + 16, pos);
397 hStmt->buffers = (char **)realloc(hStmt->buffers, sizeof(char *) * newAllocated);
398 for(int i = hStmt->allocated; i < newAllocated; i++)
399 hStmt->buffers[i] = NULL;
400 hStmt->allocated = newAllocated;
401 }
402 if (hStmt->pcount < pos)
403 hStmt->pcount = pos;
404
405 free(hStmt->buffers[pos - 1]);
406
407 switch(cType)
408 {
409 case DB_CTYPE_STRING:
410 hStmt->buffers[pos - 1] = UTF8StringFromWideString((WCHAR *)buffer);
411 break;
412 case DB_CTYPE_UTF8_STRING:
413 if (allocType == DB_BIND_DYNAMIC)
414 {
415 hStmt->buffers[pos - 1] = (char *)buffer;
416 buffer = NULL; // prevent deallocation
417 }
418 else
419 {
420 hStmt->buffers[pos - 1] = strdup((char *)buffer);
421 }
422 break;
423 case DB_CTYPE_INT32:
424 hStmt->buffers[pos - 1] = (char *)malloc(16);
425 sprintf(hStmt->buffers[pos - 1], "%d", *((int *)buffer));
426 break;
427 case DB_CTYPE_UINT32:
428 hStmt->buffers[pos - 1] = (char *)malloc(16);
429 sprintf(hStmt->buffers[pos - 1], "%u", *((unsigned int *)buffer));
430 break;
431 case DB_CTYPE_INT64:
432 hStmt->buffers[pos - 1] = (char *)malloc(32);
433 sprintf(hStmt->buffers[pos - 1], INT64_FMTA, *((INT64 *)buffer));
434 break;
435 case DB_CTYPE_UINT64:
436 hStmt->buffers[pos - 1] = (char *)malloc(32);
437 sprintf(hStmt->buffers[pos - 1], UINT64_FMTA, *((QWORD *)buffer));
438 break;
439 case DB_CTYPE_DOUBLE:
440 hStmt->buffers[pos - 1] = (char *)malloc(32);
441 sprintf(hStmt->buffers[pos - 1], "%f", *((double *)buffer));
442 break;
443 default:
444 hStmt->buffers[pos - 1] = strdup("");
445 break;
446 }
447
448 if (allocType == DB_BIND_DYNAMIC)
449 free(buffer);
450 }
451
452 /**
453 * Execute prepared statement
454 */
455 extern "C" DWORD EXPORT DrvExecute(PG_CONN *pConn, PG_STATEMENT *hStmt, WCHAR *errorText)
456 {
457 DWORD rc;
458
459 MutexLock(pConn->mutexQueryLock);
460 bool retry;
461 int retryCount = 60;
462 do
463 {
464 retry = false;
465 PGresult *pResult = PQexecPrepared(pConn->handle, hStmt->name, hStmt->pcount, hStmt->buffers, NULL, NULL, 0);
466 if (pResult != NULL)
467 {
468 if (PQresultStatus(pResult) == PGRES_COMMAND_OK)
469 {
470 if (errorText != NULL)
471 *errorText = 0;
472 rc = DBERR_SUCCESS;
473 }
474 else
475 {
476 const char *sqlState = PQresultErrorField(pResult, PG_DIAG_SQLSTATE);
477 if ((PQstatus(pConn->handle) != CONNECTION_BAD) &&
478 (sqlState != NULL) && (!strcmp(sqlState, "53000") || !strcmp(sqlState, "53200")) && (retryCount > 0))
479 {
480 ThreadSleep(500);
481 retry = true;
482 retryCount--;
483 }
484 else
485 {
486 if (errorText != NULL)
487 {
488 MultiByteToWideChar(CP_UTF8, 0, CHECK_NULL_EX_A(sqlState), -1, errorText, DBDRV_MAX_ERROR_TEXT);
489 int len = (int)wcslen(errorText);
490 if (len > 0)
491 {
492 errorText[len] = L' ';
493 len++;
494 }
495 MultiByteToWideChar(CP_UTF8, 0, PQerrorMessage(pConn->handle), -1, &errorText[len], DBDRV_MAX_ERROR_TEXT - len);
496 errorText[DBDRV_MAX_ERROR_TEXT - 1] = 0;
497 RemoveTrailingCRLFW(errorText);
498 }
499 rc = (PQstatus(pConn->handle) == CONNECTION_BAD) ? DBERR_CONNECTION_LOST : DBERR_OTHER_ERROR;
500 }
501 }
502
503 PQclear(pResult);
504 }
505 else
506 {
507 if (errorText != NULL)
508 wcsncpy(errorText, L"Internal error (pResult is NULL in DrvExecute)", DBDRV_MAX_ERROR_TEXT);
509 rc = DBERR_OTHER_ERROR;
510 }
511 }
512 while(retry);
513 MutexUnlock(pConn->mutexQueryLock);
514 return rc;
515 }
516
517 /**
518 * Destroy prepared statement
519 */
520 extern "C" void EXPORT DrvFreeStatement(PG_STATEMENT *hStmt)
521 {
522 if (hStmt == NULL)
523 return;
524
525 char query[256];
526 snprintf(query, 256, "DEALLOCATE \"%s\"", hStmt->name);
527
528 MutexLock(hStmt->connection->mutexQueryLock);
529 UnsafeDrvQuery(hStmt->connection, query, NULL);
530 MutexUnlock(hStmt->connection->mutexQueryLock);
531
532 for(int i = 0; i < hStmt->allocated; i++)
533 safe_free(hStmt->buffers[i]);
534 safe_free(hStmt->buffers);
535
536 free(hStmt);
537 }
538
539 /**
540 * Perform non-SELECT query - internal implementation
541 */
542 static bool UnsafeDrvQuery(PG_CONN *pConn, const char *szQuery, WCHAR *errorText)
543 {
544 int retryCount = 60;
545
546 retry:
547 PGresult *pResult = PQexec(pConn->handle, szQuery);
548
549 if (pResult == NULL)
550 {
551 if (errorText != NULL)
552 wcsncpy(errorText, L"Internal error (pResult is NULL in UnsafeDrvQuery)", DBDRV_MAX_ERROR_TEXT);
553 return false;
554 }
555
556 if (PQresultStatus(pResult) != PGRES_COMMAND_OK)
557 {
558 const char *sqlState = PQresultErrorField(pResult, PG_DIAG_SQLSTATE);
559 if ((PQstatus(pConn->handle) != CONNECTION_BAD) &&
560 (sqlState != NULL) && (!strcmp(sqlState, "53000") || !strcmp(sqlState, "53200")) && (retryCount > 0))
561 {
562 ThreadSleep(500);
563 retryCount--;
564 PQclear(pResult);
565 goto retry;
566 }
567 else
568 {
569 if (errorText != NULL)
570 {
571 MultiByteToWideChar(CP_UTF8, 0, CHECK_NULL_EX_A(sqlState), -1, errorText, DBDRV_MAX_ERROR_TEXT);
572 int len = (int)wcslen(errorText);
573 if (len > 0)
574 {
575 errorText[len] = L' ';
576 len++;
577 }
578 MultiByteToWideChar(CP_UTF8, 0, PQerrorMessage(pConn->handle), -1, &errorText[len], DBDRV_MAX_ERROR_TEXT - len);
579 errorText[DBDRV_MAX_ERROR_TEXT - 1] = 0;
580 RemoveTrailingCRLFW(errorText);
581 }
582 }
583 PQclear(pResult);
584 return false;
585 }
586
587 PQclear(pResult);
588 if (errorText != NULL)
589 *errorText = 0;
590 return true;
591 }
592
593 /**
594 * Perform non-SELECT query
595 */
596 extern "C" DWORD EXPORT DrvQuery(PG_CONN *pConn, WCHAR *pwszQuery, WCHAR *errorText)
597 {
598 DWORD dwRet;
599
600 char *pszQueryUTF8 = UTF8StringFromWideString(pwszQuery);
601 MutexLock(pConn->mutexQueryLock);
602 if (UnsafeDrvQuery(pConn, pszQueryUTF8, errorText))
603 {
604 dwRet = DBERR_SUCCESS;
605 }
606 else
607 {
608 dwRet = (PQstatus(pConn->handle) == CONNECTION_BAD) ? DBERR_CONNECTION_LOST : DBERR_OTHER_ERROR;
609 }
610 MutexUnlock(pConn->mutexQueryLock);
611 free(pszQueryUTF8);
612
613 return dwRet;
614 }
615
616 /**
617 * Perform SELECT query - internal implementation
618 */
619 static DBDRV_RESULT UnsafeDrvSelect(PG_CONN *pConn, const char *szQuery, WCHAR *errorText)
620 {
621 int retryCount = 60;
622
623 retry:
624 PGresult *pResult = PQexec(((PG_CONN *)pConn)->handle, szQuery);
625
626 if (pResult == NULL)
627 {
628 if (errorText != NULL)
629 wcsncpy(errorText, L"Internal error (pResult is NULL in UnsafeDrvSelect)", DBDRV_MAX_ERROR_TEXT);
630 return NULL;
631 }
632
633 if ((PQresultStatus(pResult) != PGRES_COMMAND_OK) &&
634 (PQresultStatus(pResult) != PGRES_TUPLES_OK))
635 {
636 const char *sqlState = PQresultErrorField(pResult, PG_DIAG_SQLSTATE);
637 if ((PQstatus(pConn->handle) != CONNECTION_BAD) &&
638 (sqlState != NULL) && (!strcmp(sqlState, "53000") || !strcmp(sqlState, "53200")) && (retryCount > 0))
639 {
640 ThreadSleep(500);
641 retryCount--;
642 PQclear(pResult);
643 goto retry;
644 }
645 else
646 {
647 if (errorText != NULL)
648 {
649 MultiByteToWideChar(CP_UTF8, 0, CHECK_NULL_EX_A(sqlState), -1, errorText, DBDRV_MAX_ERROR_TEXT);
650 int len = (int)wcslen(errorText);
651 if (len > 0)
652 {
653 errorText[len] = L' ';
654 len++;
655 }
656 MultiByteToWideChar(CP_UTF8, 0, PQerrorMessage(pConn->handle), -1, &errorText[len], DBDRV_MAX_ERROR_TEXT - len);
657 errorText[DBDRV_MAX_ERROR_TEXT - 1] = 0;
658 RemoveTrailingCRLFW(errorText);
659 }
660 }
661 PQclear(pResult);
662 return NULL;
663 }
664
665 if (errorText != NULL)
666 *errorText = 0;
667 return (DBDRV_RESULT)pResult;
668 }
669
670 /**
671 * Perform SELECT query
672 */
673 extern "C" DBDRV_RESULT EXPORT DrvSelect(PG_CONN *pConn, WCHAR *pwszQuery, DWORD *pdwError, WCHAR *errorText)
674 {
675 DBDRV_RESULT pResult;
676 char *pszQueryUTF8;
677
678 pszQueryUTF8 = UTF8StringFromWideString(pwszQuery);
679 MutexLock(pConn->mutexQueryLock);
680 pResult = UnsafeDrvSelect(pConn, pszQueryUTF8, errorText);
681 if (pResult != NULL)
682 {
683 *pdwError = DBERR_SUCCESS;
684 }
685 else
686 {
687 *pdwError = (PQstatus(pConn->handle) == CONNECTION_BAD) ? DBERR_CONNECTION_LOST : DBERR_OTHER_ERROR;
688 }
689 MutexUnlock(pConn->mutexQueryLock);
690 free(pszQueryUTF8);
691
692 return pResult;
693 }
694
695 /**
696 * Perform SELECT query using prepared statement
697 */
698 extern "C" DBDRV_RESULT EXPORT DrvSelectPrepared(PG_CONN *pConn, PG_STATEMENT *hStmt, DWORD *pdwError, WCHAR *errorText)
699 {
700 PGresult *pResult = NULL;
701 bool retry;
702 int retryCount = 60;
703
704 MutexLock(pConn->mutexQueryLock);
705 do
706 {
707 retry = false;
708 pResult = PQexecPrepared(pConn->handle, hStmt->name, hStmt->pcount, hStmt->buffers, NULL, NULL, 0);
709 if (pResult != NULL)
710 {
711 if ((PQresultStatus(pResult) == PGRES_COMMAND_OK) ||
712 (PQresultStatus(pResult) == PGRES_TUPLES_OK))
713 {
714 if (errorText != NULL)
715 *errorText = 0;
716 *pdwError = DBERR_SUCCESS;
717 }
718 else
719 {
720 const char *sqlState = PQresultErrorField(pResult, PG_DIAG_SQLSTATE);
721 if ((PQstatus(pConn->handle) != CONNECTION_BAD) &&
722 (sqlState != NULL) && (!strcmp(sqlState, "53000") || !strcmp(sqlState, "53200")) && (retryCount > 0))
723 {
724 ThreadSleep(500);
725 retry = true;
726 retryCount--;
727 }
728 else
729 {
730 if (errorText != NULL)
731 {
732 MultiByteToWideChar(CP_UTF8, 0, CHECK_NULL_EX_A(sqlState), -1, errorText, DBDRV_MAX_ERROR_TEXT);
733 int len = (int)wcslen(errorText);
734 if (len > 0)
735 {
736 errorText[len] = L' ';
737 len++;
738 }
739 MultiByteToWideChar(CP_UTF8, 0, PQerrorMessage(pConn->handle), -1, &errorText[len], DBDRV_MAX_ERROR_TEXT - len);
740 errorText[DBDRV_MAX_ERROR_TEXT - 1] = 0;
741 RemoveTrailingCRLFW(errorText);
742 }
743 }
744 PQclear(pResult);
745 pResult = NULL;
746 *pdwError = (PQstatus(pConn->handle) == CONNECTION_BAD) ? DBERR_CONNECTION_LOST : DBERR_OTHER_ERROR;
747 }
748 }
749 else
750 {
751 if (errorText != NULL)
752 wcsncpy(errorText, L"Internal error (pResult is NULL in UnsafeDrvSelect)", DBDRV_MAX_ERROR_TEXT);
753 *pdwError = (PQstatus(pConn->handle) == CONNECTION_BAD) ? DBERR_CONNECTION_LOST : DBERR_OTHER_ERROR;
754 }
755 }
756 while(retry);
757 MutexUnlock(pConn->mutexQueryLock);
758
759 return (DBDRV_RESULT)pResult;
760 }
761
762 /**
763 * Get field length from result
764 */
765 extern "C" LONG EXPORT DrvGetFieldLength(DBDRV_RESULT pResult, int nRow, int nColumn)
766 {
767 if (pResult == NULL)
768 return -1;
769
770 const char *value = PQgetvalue((PGresult *)pResult, nRow, nColumn);
771 return (value != NULL) ? (LONG)strlen(value) : (LONG)-1;
772 }
773
774 /**
775 * Get field value from result
776 */
777 extern "C" WCHAR EXPORT *DrvGetField(DBDRV_RESULT pResult, int nRow, int nColumn, WCHAR *pBuffer, int nBufLen)
778 {
779 if (pResult == NULL)
780 return NULL;
781
782 if (PQfformat((PGresult *)pResult, nColumn) != 0)
783 return NULL;
784
785 const char *value = PQgetvalue((PGresult *)pResult, nRow, nColumn);
786 if (value == NULL)
787 return NULL;
788
789 MultiByteToWideChar(CP_UTF8, 0, value, -1, pBuffer, nBufLen);
790 pBuffer[nBufLen - 1] = 0;
791 return pBuffer;
792 }
793
794 /**
795 * Get field value from result as UTF8 string
796 */
797 extern "C" char EXPORT *DrvGetFieldUTF8(DBDRV_RESULT pResult, int nRow, int nColumn, char *pBuffer, int nBufLen)
798 {
799 if (pResult == NULL)
800 return NULL;
801
802 const char *value = PQgetvalue((PGresult *)pResult, nRow, nColumn);
803 if (value == NULL)
804 return NULL;
805
806 strncpy(pBuffer, value, nBufLen);
807 pBuffer[nBufLen - 1] = 0;
808 return pBuffer;
809 }
810
811 /**
812 * Get number of rows in result
813 */
814 extern "C" int EXPORT DrvGetNumRows(DBDRV_RESULT pResult)
815 {
816 return (pResult != NULL) ? PQntuples((PGresult *)pResult) : 0;
817 }
818
819 /**
820 * Get column count in query result
821 */
822 extern "C" int EXPORT DrvGetColumnCount(DBDRV_RESULT hResult)
823 {
824 return (hResult != NULL) ? PQnfields((PGresult *)hResult) : 0;
825 }
826
827 /**
828 * Get column name in query result
829 */
830 extern "C" const char EXPORT *DrvGetColumnName(DBDRV_RESULT hResult, int column)
831 {
832 return (hResult != NULL) ? PQfname((PGresult *)hResult, column) : NULL;
833 }
834
835 /**
836 * Free SELECT results
837 */
838 extern "C" void EXPORT DrvFreeResult(DBDRV_RESULT pResult)
839 {
840 if (pResult != NULL)
841 {
842 PQclear((PGresult *)pResult);
843 }
844 }
845
846 /**
847 * Perform unbuffered SELECT query
848 */
849 extern "C" DBDRV_UNBUFFERED_RESULT EXPORT DrvSelectUnbuffered(PG_CONN *pConn, WCHAR *pwszQuery, DWORD *pdwError, WCHAR *errorText)
850 {
851 if (pConn == NULL)
852 return NULL;
853
854 PG_UNBUFFERED_RESULT *result = (PG_UNBUFFERED_RESULT *)malloc(sizeof(PG_UNBUFFERED_RESULT));
855 result->conn = pConn;
856 result->fetchBuffer = NULL;
857 result->keepFetchBuffer = true;
858
859 MutexLock(pConn->mutexQueryLock);
860
861 bool success = false;
862 bool retry;
863 int retryCount = 60;
864 char *queryUTF8 = UTF8StringFromWideString(pwszQuery);
865 do
866 {
867 retry = false;
868 if (PQsendQuery(pConn->handle, queryUTF8))
869 {
870 #ifdef _WIN32
871 if (PQsetSingleRowMode(pConn->handle))
872 {
873 result->singleRowMode = true;
874 #else
875 if ((s_PQsetSingleRowMode == NULL) || s_PQsetSingleRowMode(pConn->handle))
876 {
877 result->singleRowMode = (s_PQsetSingleRowMode != NULL);
878 #endif
879 result->currRow = 0;
880
881 // Fetch first row (to check for errors in Select instead of Fetch call)
882 result->fetchBuffer = PQgetResult(pConn->handle);
883 if ((PQresultStatus(result->fetchBuffer) == PGRES_COMMAND_OK) ||
884 (PQresultStatus(result->fetchBuffer) == PGRES_TUPLES_OK) ||
885 (PQresultStatus(result->fetchBuffer) == PGRES_SINGLE_TUPLE))
886 {
887 if (errorText != NULL)
888 *errorText = 0;
889 *pdwError = DBERR_SUCCESS;
890 success = true;
891 }
892 else
893 {
894 const char *sqlState = PQresultErrorField(result->fetchBuffer, PG_DIAG_SQLSTATE);
895 if ((PQstatus(pConn->handle) != CONNECTION_BAD) &&
896 (sqlState != NULL) && (!strcmp(sqlState, "53000") || !strcmp(sqlState, "53200")) && (retryCount > 0))
897 {
898 ThreadSleep(500);
899 retry = true;
900 retryCount--;
901 }
902 else
903 {
904 if (errorText != NULL)
905 {
906 MultiByteToWideChar(CP_UTF8, 0, CHECK_NULL_EX_A(sqlState), -1, errorText, DBDRV_MAX_ERROR_TEXT);
907 int len = (int)wcslen(errorText);
908 if (len > 0)
909 {
910 errorText[len] = L' ';
911 len++;
912 }
913 MultiByteToWideChar(CP_UTF8, 0, PQerrorMessage(pConn->handle), -1, &errorText[len], DBDRV_MAX_ERROR_TEXT - len);
914 errorText[DBDRV_MAX_ERROR_TEXT - 1] = 0;
915 RemoveTrailingCRLFW(errorText);
916 }
917 }
918 PQclear(result->fetchBuffer);
919 result->fetchBuffer = NULL;
920 *pdwError = (PQstatus(pConn->handle) == CONNECTION_BAD) ? DBERR_CONNECTION_LOST : DBERR_OTHER_ERROR;
921 }
922 }
923 else
924 {
925 if (errorText != NULL)
926 wcsncpy(errorText, L"Internal error (call to PQsetSingleRowMode failed)", DBDRV_MAX_ERROR_TEXT);
927 *pdwError = (PQstatus(pConn->handle) == CONNECTION_BAD) ? DBERR_CONNECTION_LOST : DBERR_OTHER_ERROR;
928 }
929 }
930 else
931 {
932 if (errorText != NULL)
933 wcsncpy(errorText, L"Internal error (call to PQsendQuery failed)", DBDRV_MAX_ERROR_TEXT);
934 *pdwError = (PQstatus(pConn->handle) == CONNECTION_BAD) ? DBERR_CONNECTION_LOST : DBERR_OTHER_ERROR;
935 }
936 }
937 while(retry);
938 free(queryUTF8);
939
940 if (!success)
941 {
942 free(result);
943 result = NULL;
944 }
945 return (DBDRV_UNBUFFERED_RESULT)result;
946 }
947
948 /**
949 * Perform unbuffered SELECT query using prepared statement
950 */
951 extern "C" DBDRV_UNBUFFERED_RESULT EXPORT DrvSelectPreparedUnbuffered(PG_CONN *pConn, PG_STATEMENT *hStmt, DWORD *pdwError, WCHAR *errorText)
952 {
953 if (pConn == NULL)
954 return NULL;
955
956 PG_UNBUFFERED_RESULT *result = (PG_UNBUFFERED_RESULT *)malloc(sizeof(PG_UNBUFFERED_RESULT));
957 result->conn = pConn;
958 result->fetchBuffer = NULL;
959 result->keepFetchBuffer = true;
960
961 MutexLock(pConn->mutexQueryLock);
962
963 bool success = false;
964 bool retry;
965 int retryCount = 60;
966 do
967 {
968 retry = false;
969 if (PQsendQueryPrepared(pConn->handle, hStmt->name, hStmt->pcount, hStmt->buffers, NULL, NULL, 0))
970 {
971 #ifdef _WIN32
972 if (PQsetSingleRowMode(pConn->handle))
973 {
974 result->singleRowMode = true;
975 #else
976 if ((s_PQsetSingleRowMode == NULL) || s_PQsetSingleRowMode(pConn->handle))
977 {
978 result->singleRowMode = (s_PQsetSingleRowMode != NULL);
979 #endif
980 result->currRow = 0;
981
982 // Fetch first row (to check for errors in Select instead of Fetch call)
983 result->fetchBuffer = PQgetResult(pConn->handle);
984 if ((PQresultStatus(result->fetchBuffer) == PGRES_COMMAND_OK) ||
985 (PQresultStatus(result->fetchBuffer) == PGRES_TUPLES_OK) ||
986 (PQresultStatus(result->fetchBuffer) == PGRES_SINGLE_TUPLE))
987 {
988 if (errorText != NULL)
989 *errorText = 0;
990 *pdwError = DBERR_SUCCESS;
991 success = true;
992 }
993 else
994 {
995 const char *sqlState = PQresultErrorField(result->fetchBuffer, PG_DIAG_SQLSTATE);
996 if ((PQstatus(pConn->handle) != CONNECTION_BAD) &&
997 (sqlState != NULL) && (!strcmp(sqlState, "53000") || !strcmp(sqlState, "53200")) && (retryCount > 0))
998 {
999 ThreadSleep(500);
1000 retry = true;
1001 retryCount--;
1002 }
1003 else
1004 {
1005 if (errorText != NULL)
1006 {
1007 MultiByteToWideChar(CP_UTF8, 0, CHECK_NULL_EX_A(sqlState), -1, errorText, DBDRV_MAX_ERROR_TEXT);
1008 int len = (int)wcslen(errorText);
1009 if (len > 0)
1010 {
1011 errorText[len] = L' ';
1012 len++;
1013 }
1014 MultiByteToWideChar(CP_UTF8, 0, PQerrorMessage(pConn->handle), -1, &errorText[len], DBDRV_MAX_ERROR_TEXT - len);
1015 errorText[DBDRV_MAX_ERROR_TEXT - 1] = 0;
1016 RemoveTrailingCRLFW(errorText);
1017 }
1018 }
1019 PQclear(result->fetchBuffer);
1020 result->fetchBuffer = NULL;
1021 *pdwError = (PQstatus(pConn->handle) == CONNECTION_BAD) ? DBERR_CONNECTION_LOST : DBERR_OTHER_ERROR;
1022 }
1023 }
1024 else
1025 {
1026 if (errorText != NULL)
1027 wcsncpy(errorText, L"Internal error (call to PQsetSingleRowMode failed)", DBDRV_MAX_ERROR_TEXT);
1028 *pdwError = (PQstatus(pConn->handle) == CONNECTION_BAD) ? DBERR_CONNECTION_LOST : DBERR_OTHER_ERROR;
1029 }
1030 }
1031 else
1032 {
1033 if (errorText != NULL)
1034 wcsncpy(errorText, L"Internal error (call to PQsendQueryPrepared failed)", DBDRV_MAX_ERROR_TEXT);
1035 *pdwError = (PQstatus(pConn->handle) == CONNECTION_BAD) ? DBERR_CONNECTION_LOST : DBERR_OTHER_ERROR;
1036 }
1037 }
1038 while(retry);
1039
1040 if (!success)
1041 {
1042 free(result);
1043 result = NULL;
1044 }
1045 return (DBDRV_UNBUFFERED_RESULT)result;
1046 }
1047
1048 /**
1049 * Fetch next result line from asynchronous SELECT results
1050 */
1051 extern "C" bool EXPORT DrvFetch(PG_UNBUFFERED_RESULT *result)
1052 {
1053 if (result == NULL)
1054 return false;
1055
1056 if (!result->keepFetchBuffer)
1057 {
1058 if (result->singleRowMode)
1059 {
1060 if (result->fetchBuffer != NULL)
1061 PQclear(result->fetchBuffer);
1062 result->fetchBuffer = PQgetResult(result->conn->handle);
1063 }
1064 else
1065 {
1066 if (result->fetchBuffer != NULL)
1067 {
1068 result->currRow++;
1069 if (result->currRow >= PQntuples(result->fetchBuffer))
1070 {
1071 PQclear(result->fetchBuffer);
1072 result->fetchBuffer = PQgetResult(result->conn->handle);
1073 result->currRow = 0;
1074 }
1075 }
1076 else
1077 {
1078 result->currRow = 0;
1079 }
1080 }
1081 }
1082 else
1083 {
1084 result->keepFetchBuffer = false;
1085 }
1086
1087 if (result->fetchBuffer == NULL)
1088 return false;
1089
1090 bool success;
1091 if ((PQresultStatus(result->fetchBuffer) == PGRES_SINGLE_TUPLE) || (PQresultStatus(result->fetchBuffer) == PGRES_TUPLES_OK))
1092 {
1093 success = (PQntuples(result->fetchBuffer) > 0);
1094 }
1095 else
1096 {
1097 PQclear(result->fetchBuffer);
1098 result->fetchBuffer = NULL;
1099 success = false;
1100 }
1101
1102 return success;
1103 }
1104
1105 /**
1106 * Get field length from async quety result
1107 */
1108 extern "C" LONG EXPORT DrvGetFieldLengthUnbuffered(PG_UNBUFFERED_RESULT *result, int nColumn)
1109 {
1110 if ((result == NULL) || (result->fetchBuffer == NULL))
1111 return 0;
1112
1113 // validate column index
1114 if (nColumn >= PQnfields(result->fetchBuffer))
1115 return 0;
1116
1117 char *value = PQgetvalue(result->fetchBuffer, result->currRow, nColumn);
1118 if (value == NULL)
1119 return 0;
1120
1121 return (LONG)strlen(value);
1122 }
1123
1124 /**
1125 * Get field from current row in async query result
1126 */
1127 extern "C" WCHAR EXPORT *DrvGetFieldUnbuffered(PG_UNBUFFERED_RESULT *result, int nColumn, WCHAR *pBuffer, int nBufSize)
1128 {
1129 if ((result == NULL) || (result->fetchBuffer == NULL))
1130 return NULL;
1131
1132 // validate column index
1133 if (nColumn >= PQnfields(result->fetchBuffer))
1134 return NULL;
1135
1136 if (PQfformat(result->fetchBuffer, nColumn) != 0)
1137 return NULL;
1138
1139 char *value = PQgetvalue(result->fetchBuffer, result->currRow, nColumn);
1140 if (value == NULL)
1141 return NULL;
1142
1143 MultiByteToWideChar(CP_UTF8, 0, value, -1, pBuffer, nBufSize);
1144 pBuffer[nBufSize - 1] = 0;
1145
1146 return pBuffer;
1147 }
1148
1149 /**
1150 * Get field from current row in async query result as UTF-8 string
1151 */
1152 extern "C" char EXPORT *DrvGetFieldUnbufferedUTF8(PG_UNBUFFERED_RESULT *result, int nColumn, char *pBuffer, int nBufSize)
1153 {
1154 if ((result == NULL) || (result->fetchBuffer == NULL))
1155 return NULL;
1156
1157 // validate column index
1158 if (nColumn >= PQnfields(result->fetchBuffer))
1159 return NULL;
1160
1161 if (PQfformat(result->fetchBuffer, nColumn) != 0)
1162 return NULL;
1163
1164 char *value = PQgetvalue(result->fetchBuffer, result->currRow, nColumn);
1165 if (value == NULL)
1166 return NULL;
1167
1168 strncpy(pBuffer, value, nBufSize);
1169 pBuffer[nBufSize - 1] = 0;
1170
1171 return pBuffer;
1172 }
1173
1174 /**
1175 * Get column count in async query result
1176 */
1177 extern "C" int EXPORT DrvGetColumnCountUnbuffered(PG_UNBUFFERED_RESULT *result)
1178 {
1179 return ((result != NULL) && (result->fetchBuffer != NULL)) ? PQnfields(result->fetchBuffer) : 0;
1180 }
1181
1182 /**
1183 * Get column name in async query result
1184 */
1185 extern "C" const char EXPORT *DrvGetColumnNameUnbuffered(PG_UNBUFFERED_RESULT *result, int column)
1186 {
1187 return ((result != NULL) && (result->fetchBuffer != NULL))? PQfname(result->fetchBuffer, column) : NULL;
1188 }
1189
1190 /**
1191 * Destroy result of async query
1192 */
1193 extern "C" void EXPORT DrvFreeUnbufferedResult(PG_UNBUFFERED_RESULT *result)
1194 {
1195 if (result == NULL)
1196 return;
1197
1198 if (result->fetchBuffer != NULL)
1199 PQclear(result->fetchBuffer);
1200
1201 // read all outstanding results
1202 while(true)
1203 {
1204 result->fetchBuffer = PQgetResult(result->conn->handle);
1205 if (result->fetchBuffer == NULL)
1206 break;
1207 PQclear(result->fetchBuffer);
1208 }
1209
1210 MutexUnlock(result->conn->mutexQueryLock);
1211 free(result);
1212 }
1213
1214 /**
1215 * Begin transaction
1216 */
1217 extern "C" DWORD EXPORT DrvBegin(PG_CONN *pConn)
1218 {
1219 DWORD dwResult;
1220
1221 if (pConn == NULL)
1222 return DBERR_INVALID_HANDLE;
1223
1224 MutexLock(pConn->mutexQueryLock);
1225 if (UnsafeDrvQuery(pConn, "BEGIN", NULL))
1226 {
1227 dwResult = DBERR_SUCCESS;
1228 }
1229 else
1230 {
1231 dwResult = (PQstatus(pConn->handle) == CONNECTION_BAD) ? DBERR_CONNECTION_LOST : DBERR_OTHER_ERROR;
1232 }
1233 MutexUnlock(pConn->mutexQueryLock);
1234 return dwResult;
1235 }
1236
1237 /**
1238 * Commit transaction
1239 */
1240 extern "C" DWORD EXPORT DrvCommit(PG_CONN *pConn)
1241 {
1242 bool bRet;
1243
1244 if (pConn == NULL)
1245 return DBERR_INVALID_HANDLE;
1246
1247 MutexLock(pConn->mutexQueryLock);
1248 bRet = UnsafeDrvQuery(pConn, "COMMIT", NULL);
1249 MutexUnlock(pConn->mutexQueryLock);
1250 return bRet ? DBERR_SUCCESS : DBERR_OTHER_ERROR;
1251 }
1252
1253 /**
1254 * Rollback transaction
1255 */
1256 extern "C" DWORD EXPORT DrvRollback(PG_CONN *pConn)
1257 {
1258 bool bRet;
1259
1260 if (pConn == NULL)
1261 return DBERR_INVALID_HANDLE;
1262
1263 MutexLock(pConn->mutexQueryLock);
1264 bRet = UnsafeDrvQuery(pConn, "ROLLBACK", NULL);
1265 MutexUnlock(pConn->mutexQueryLock);
1266 return bRet ? DBERR_SUCCESS : DBERR_OTHER_ERROR;
1267 }
1268
1269 /**
1270 * Check if table exist
1271 */
1272 extern "C" int EXPORT DrvIsTableExist(PG_CONN *pConn, const WCHAR *name)
1273 {
1274 WCHAR query[256];
1275 swprintf(query, 256, L"SELECT count(*) FROM information_schema.tables WHERE table_catalog=current_database() AND table_schema=current_schema() AND lower(table_name)=lower('%ls')", name);
1276 DWORD error;
1277 WCHAR errorText[DBDRV_MAX_ERROR_TEXT];
1278 int rc = DBIsTableExist_Failure;
1279 DBDRV_RESULT hResult = DrvSelect(pConn, query, &error, errorText);
1280 if (hResult != NULL)
1281 {
1282 WCHAR buffer[64] = L"";
1283 DrvGetField(hResult, 0, 0, buffer, 64);
1284 rc = (wcstol(buffer, NULL, 10) > 0) ? DBIsTableExist_Found : DBIsTableExist_NotFound;
1285 DrvFreeResult(hResult);
1286 }
1287 return rc;
1288 }
1289
1290 #ifdef _WIN32
1291
1292 /**
1293 * DLL Entry point
1294 */
1295 bool WINAPI DllMain(HINSTANCE hInstance, DWORD dwReason, LPVOID lpReserved)
1296 {
1297 if (dwReason == DLL_PROCESS_ATTACH)
1298 DisableThreadLibraryCalls(hInstance);
1299 return true;
1300 }
1301
1302 #endif