some DB schema manipulation functions moved to libnxdb from nxdbmgr
[public/netxms.git] / src / server / tools / nxdbmgr / tdata_convert.cpp
1 /*
2 ** nxdbmgr - NetXMS database manager
3 ** Copyright (C) 2003-2016 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 **/
20
21 #include "nxdbmgr.h"
22
23 /**
24 * Read table object from tdata_* tables
25 */
26 static Table *ReadTable(UINT64 recordId, UINT32 tableId, UINT32 objectId)
27 {
28 TCHAR query[1024];
29 _sntprintf(query, 1024,
30 _T("SELECT r.row_id,w.column_id,n.column_name,c.flags,c.display_name,w.value FROM tdata_records_%d r ")
31 _T("INNER JOIN tdata_rows_%d w ON w.row_id = r.row_id ")
32 _T("INNER JOIN dct_column_names n ON n.column_id=w.column_id ")
33 _T("LEFT OUTER JOIN dc_table_columns c ON c.table_id=%d AND c.column_name=n.column_name ")
34 _T("WHERE r.record_id=") UINT64_FMT _T(" ")
35 _T("ORDER BY r.row_id"), objectId, objectId, tableId, recordId);
36 DB_RESULT hResult = DBSelect(g_hCoreDB, query);
37 if (hResult == NULL)
38 return NULL;
39
40 int count = DBGetNumRows(hResult);
41 Table *table = NULL;
42 if (count > 0)
43 {
44 table = new Table();
45 UINT64 currRowId = 0;
46 for(int i = 0; i < count; i++)
47 {
48 TCHAR columnName[MAX_COLUMN_NAME];
49 DBGetField(hResult, i, 2, columnName, MAX_COLUMN_NAME);
50 int columnIndex = table->getColumnIndex(columnName);
51 if (columnIndex == -1)
52 {
53 TCHAR displayName[256];
54 DBGetField(hResult, i, 4, displayName, 256);
55 UINT16 flags = (UINT16)DBGetFieldULong(hResult, i, 3);
56 columnIndex = table->addColumn(columnName, TCF_GET_DATA_TYPE(flags), displayName, (flags & TCF_INSTANCE_COLUMN) ? true : false);
57 }
58
59 UINT64 rowId = DBGetFieldUInt64(hResult, i, 0);
60 if (rowId != currRowId)
61 {
62 currRowId = rowId;
63 table->addRow();
64 }
65
66 TCHAR value[MAX_RESULT_LENGTH];
67 DBGetField(hResult, i, 5, value, MAX_RESULT_LENGTH);
68 table->set(columnIndex, value);
69 }
70 }
71 DBFreeResult(hResult);
72 return table;
73 }
74
75 /**
76 * Convert tdata table for given object
77 */
78 static bool ConvertTData(UINT32 id, int *skippedRecords)
79 {
80 TCHAR oldName[64], newName[64];
81 _sntprintf(oldName, 64, _T("tdata_%d"), id);
82 _sntprintf(newName, 64, _T("tdata_temp_%d"), id);
83 if (!DBRenameTable(g_hCoreDB, oldName, newName))
84 return false;
85
86 bool success = false;
87 if (CreateTDataTable(id))
88 {
89 int total = 0x07FFFFFF;
90 TCHAR query[256];
91 _sntprintf(query, 256, _T("SELECT count(*) FROM tdata_temp_%d"), id);
92 DB_RESULT hCountResult = DBSelect(g_hCoreDB, query);
93 if (hCountResult != NULL)
94 {
95 total = DBGetFieldLong(hCountResult, 0, 0);
96 if (total <= 0)
97 total = 0x07FFFFFF;
98 DBFreeResult(hCountResult);
99 }
100
101 // Open second connection to database to allow unbuffered query in parallel with inserts
102 DB_HANDLE hdb = ConnectToDatabase();
103 if (hdb != NULL)
104 {
105 _sntprintf(query, 256, _T("SELECT item_id,tdata_timestamp,record_id FROM tdata_temp_%d"), id);
106 DB_UNBUFFERED_RESULT hResult = DBSelectUnbuffered(hdb, query);
107 if (hResult != NULL)
108 {
109 _sntprintf(query, 256, _T("INSERT INTO tdata_%d (item_id,tdata_timestamp,tdata_value) VALUES (?,?,?)"), id);
110 DB_STATEMENT hStmt = DBPrepare(g_hCoreDB, query);
111 if (hStmt != NULL)
112 {
113 success = true;
114 int converted = 0;
115 int skipped = 0;
116 DBBegin(g_hCoreDB);
117 while(DBFetch(hResult))
118 {
119 UINT32 tableId = DBGetFieldULong(hResult, 0);
120 UINT32 timestamp = DBGetFieldULong(hResult, 1);
121 UINT64 recordId = DBGetFieldUInt64(hResult, 2);
122 Table *value = ReadTable(recordId, tableId, id);
123 if (value != NULL)
124 {
125 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, tableId);
126 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, timestamp);
127 DBBind(hStmt, 3, DB_SQLTYPE_TEXT, DB_CTYPE_UTF8_STRING, value->createPackedXML(), DB_BIND_DYNAMIC);
128 if (!SQLExecute(hStmt))
129 {
130 delete value;
131 success = false;
132 break;
133 }
134 delete value;
135 }
136 else
137 {
138 skipped++;
139 }
140
141 converted++;
142 if (converted % 100 == 0)
143 {
144 int pct = (converted * 100) / total;
145 if (pct > 100)
146 pct = 100;
147 WriteToTerminalEx(_T("\b\b\b\b%3d%%"), pct);
148 fflush(stdout);
149 DBCommit(g_hCoreDB);
150 DBBegin(g_hCoreDB);
151 }
152 }
153 DBCommit(g_hCoreDB);
154 DBFreeStatement(hStmt);
155 *skippedRecords = skipped;
156 }
157 DBFreeResult(hResult);
158 }
159 DBDisconnect(hdb);
160 }
161 }
162
163 if (success)
164 {
165 TCHAR query[256];
166 _sntprintf(query, 256, _T("DROP TABLE tdata_rows_%d"), id);
167 SQLQuery(query);
168 _sntprintf(query, 256, _T("DROP TABLE tdata_records_%d"), id);
169 SQLQuery(query);
170 _sntprintf(query, 256, _T("DROP TABLE tdata_temp_%d"), id);
171 SQLQuery(query);
172 }
173 else
174 {
175 TCHAR query[256];
176 _sntprintf(query, 256, _T("DROP TABLE tdata_%d"), id);
177 SQLQuery(query);
178
179 _sntprintf(oldName, 64, _T("tdata_temp_%d"), id);
180 _sntprintf(newName, 64, _T("tdata_%d"), id);
181 DBRenameTable(g_hCoreDB, oldName, newName);
182 }
183
184 return success;
185 }
186
187 /**
188 * Check data tables for given o bject class
189 */
190 static bool ConvertTDataForClass(const TCHAR *className)
191 {
192 bool success = false;
193 TCHAR query[1024];
194 _sntprintf(query, 256, _T("SELECT id FROM %s"), className);
195 DB_RESULT hResult = SQLSelect(query);
196 if (hResult != NULL)
197 {
198 success = true;
199 int count = DBGetNumRows(hResult);
200 for(int i = 0; i < count; i++)
201 {
202 UINT32 id = DBGetFieldULong(hResult, i, 0);
203 if (IsDataTableExist(_T("tdata_%d"), id))
204 {
205 WriteToTerminalEx(_T("Converting table \x1b[1mtdata_%d\x1b[0m: 0%%"), id);
206 fflush(stdout);
207 int skippedRecords = 0;
208 if (ConvertTData(id, &skippedRecords))
209 {
210 if (skippedRecords == 0)
211 WriteToTerminalEx(_T("\b\b\b\b\x1b[32;1mdone\x1b[0m\n"));
212 else
213 WriteToTerminalEx(_T("\b\b\b\b\x1b[33;1mdone with %d records skipped\x1b[0m\n"), skippedRecords);
214 }
215 else
216 {
217 WriteToTerminalEx(_T("\b\b\b\b\x1b[31;1mfailed\x1b[0m\n"));
218 success = false;
219 if(g_bIgnoreErrors)
220 continue;
221 else
222 break;
223 }
224 }
225 else
226 {
227 CreateTDataTable(id);
228 WriteToTerminalEx(_T("Created empty table \x1b[1mtdata_%d\x1b[0m\n"), id);
229 }
230 }
231 DBFreeResult(hResult);
232 }
233 return success;
234 }
235
236 /**
237 * Convert tdata tables into new format
238 */
239 bool ConvertTDataTables()
240 {
241 CHK_EXEC(ConvertTDataForClass(_T("nodes")));
242 CHK_EXEC(ConvertTDataForClass(_T("clusters")));
243 CHK_EXEC(ConvertTDataForClass(_T("mobile_devices")));
244 CHK_EXEC(ConvertTDataForClass(_T("access_points")));
245 CHK_EXEC(ConvertTDataForClass(_T("chassis")));
246 return true;
247 }