######################################################################
# Copyright (c) 2007, Petteri Aimonen
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#     * Redistributions of source code must retain the above copyright
#       notice and this list of conditions.
#     * Redistributions in binary form must reproduce the above copyright
#       notice and this list of conditions in the documentation and/or other
#       materials provided with the distribution.

'''Panel to execute SQL-queries'''

import wx
from pymisc import _
from database import db_conn, sqlite3


def _to_bytes(text):
    '''Return text as a UTF-8 encoded byte string.

    Byte strings are passed through untouched, so plain ASCII output is
    byte-for-byte what an unencoded write would have produced.
    '''
    if isinstance(text, unicode):
        return text.encode('utf-8')
    return text


class ResultList(wx.ListCtrl):
    '''List control that pulls its cell texts from a query object.

    The owner assigns the 'query' attribute. Column 0 shows a running row
    number; the remaining columns are forwarded to query.getitemtext().
    '''

    # Object providing getitemtext(row, col), or None when there is no
    # result to show. Defined on the class so the attribute always exists,
    # even before the owner has assigned it.
    query = None

    def OnGetItemText(self, row, col):
        '''Return the text to display at (row, col).'''
        if col == 0:
            # First column is the 1-based row number.
            return str(row + 1)
        elif self.query:
            # Remaining columns are shifted by one to skip the number column.
            return self.query.getitemtext(row, col - 1)
        elif col == 1:
            return _('Query did not return data')
        else:
            return ''


class SelectQuery:
    '''A SELECT statement whose rows are fetched in buffered windows.

    The statement is re-run as a subquery with LIMIT/OFFSET to fetch one
    window of rows at a time.
    '''

    def __init__(self, cursor, query):
        '''Store the cursor and the query text and reset the row buffer.

        cursor -- DB-API cursor used for every statement this object runs
        query  -- text of the SELECT statement
        '''
        self.cursor = cursor
        # The statement is embedded as "SELECT * FROM (<query>) ..." below.
        # A trailing ';' is fine when the statement runs on its own but is
        # a syntax error inside the parentheses, so drop it here.
        self.query = query.strip().rstrip('; \t\r\n')
        self.rowbuffer_start = None
        self.rowbuffer_end = None
        self.rowbuffer = []

        upper = self.query.upper()
        if 'ORDER' in upper or 'JOIN' in upper:
            # Sorting has high penalty, so buffer more
            self.rowbuffer_size = 10000
        else:
            self.rowbuffer_size = 1000

    def getrows(self, offset, limit):
        '''Get raw SQL rows starting from offset, maximum limit'''
        self.cursor.execute("SELECT * FROM (" + self.query + ") LIMIT ?,?",
                            (offset, limit))
        return self.cursor.fetchall()

    def buffered_getrow(self, row):
        '''Get row from buffer.
        If row is not in buffer, fill buffer first.

        Returns None when the row is not available, i.e. the index is
        negative or lies past the rows the query returned.
        '''
        if (not self.rowbuffer
                or row < self.rowbuffer_start
                or row > self.rowbuffer_end):
            # Centre the new window on the requested row so that scrolling
            # in either direction stays inside the buffer.
            self.rowbuffer_start = max(0, row - self.rowbuffer_size // 2)
            self.rowbuffer_end = (self.rowbuffer_start
                                  + self.rowbuffer_size - 1)
            self.rowbuffer = self.getrows(self.rowbuffer_start,
                                          self.rowbuffer_size)

        index = row - self.rowbuffer_start
        # The fetched window may hold fewer rows than requested; a plain
        # index would raise IndexError (or wrap around for negative values).
        if 0 <= index < len(self.rowbuffer):
            return self.rowbuffer[index]
        return None

    def columnlist(self):
        '''Get list of column names'''
        self.cursor.execute("SELECT * FROM (" + self.query + ") LIMIT 1")
        return [r[0] for r in self.cursor.description]

    def rowcount(self):
        '''Get rowcount'''
        self.cursor.execute("SELECT COUNT(*) FROM (" + self.query + ")")
        return self.cursor.fetchone()[0]

    def getitemtext(self, row, col):
        '''Get text for row, col of result list

        Returns an empty string when the row is not available.
        '''
        data = self.buffered_getrow(row)
        if not data:
            return ''
        return unicode(data[col])

    def savetofile(self, file):
        '''Save query data to file object

        Writes a '# '-prefixed header line with the column names followed
        by one line per result row. Fields are separated by ';' and lines
        end in '\\r\\n'. Text is written as UTF-8 encoded byte strings, so
        the file object should be opened in binary mode.
        '''
        header = ';'.join(self.columnlist())
        file.write(_to_bytes('# ' + header) + '\r\n')

        self.cursor.execute(self.query)
        # Iterate the cursor instead of fetchall() so a large result does
        # not have to be held in memory all at once.
        for row in self.cursor:
            line = ';'.join(map(unicode, row))
            # Encode explicitly: writing a unicode object to a byte file
            # falls back to ASCII and fails on any non-ASCII character.
            file.write(_to_bytes(line) + '\r\n')

        file.flush()


class QueryPanel(wx.Panel):
    '''Panel with a query editor, action buttons and a result area.

    The result area is either a virtual list (for SELECT results) or a
    read-only text box (for status and error messages); see setmode().
    '''

    # Lower bound for the width of a result column, in pixels. wx treats
    # negative widths as autosize flags, so the computed width is clamped.
    MIN_COLUMN_WIDTH = 20

    def __init__(self, *args, **kwargs):
        '''Create the child controls, lay them out and bind the events.

        All arguments are passed on to wx.Panel.
        '''
        wx.Panel.__init__(self, *args, **kwargs)

        self.cursor = db_conn.cursor()
        self.query = None

        self.querycmd = wx.TextCtrl(self, style=wx.TE_MULTILINE)
        self.execbutton = wx.Button(self, label=_("Execute"))
        self.savedbbutton = wx.Button(self, label=_("Save database"))
        self.saveresultbutton = wx.Button(self, label=_("Save result"))
        self.resultlist = ResultList(self,
                                     style=wx.LC_REPORT | wx.LC_VIRTUAL)
        self.resulttext = wx.TextCtrl(self,
                                      style=wx.TE_READONLY | wx.TE_MULTILINE)
        self.resultlist.Show(False)

        self.buttonsizer = wx.BoxSizer(wx.HORIZONTAL)
        # Empty stretchable panel that pushes the buttons to the right.
        self.buttonsizer.Add(wx.Panel(self), flag=wx.EXPAND, proportion=1)
        self.buttonsizer.Add(self.execbutton)
        self.buttonsizer.Add(self.savedbbutton)
        self.buttonsizer.Add(self.saveresultbutton)

        self.sizer = wx.BoxSizer(wx.VERTICAL)
        self.sizer.Add(self.querycmd, flag=wx.EXPAND | wx.ALL,
                       border=5, proportion=1)
        self.sizer.Add(self.buttonsizer, flag=wx.EXPAND | wx.ALL, border=5)
        self.sizer.Add(self.resultlist, flag=wx.EXPAND | wx.ALL,
                       border=5, proportion=5)
        self.sizer.Add(self.resulttext, flag=wx.EXPAND | wx.ALL,
                       border=5, proportion=5)

        self.SetAutoLayout(True)
        self.SetSizer(self.sizer)
        self.Layout()
        self.setmode('text')

        self.execbutton.Bind(wx.EVT_BUTTON, self.OnExecute)
        self.savedbbutton.Bind(wx.EVT_BUTTON, self.OnSaveDatabase)
        self.saveresultbutton.Bind(wx.EVT_BUTTON, self.OnSaveResult)
        self.querycmd.Bind(wx.EVT_CHAR, self.OnChar)

    def setmode(self, mode):
        '''Select either 'list' or 'text' result mode.

        Shows the matching result control, hides the other one and enables
        the "Save result" button only in 'list' mode.
        '''
        self.resultlist.Show(mode == 'list')
        self.resulttext.Show(mode == 'text')
        self.saveresultbutton.Enable(mode == 'list')
        self.Layout()

    def OnChar(self, evt):
        '''Run the query when Return is pressed in the query editor.'''
        if evt.GetKeyCode() == wx.WXK_RETURN:
            self.OnExecute()
        else:
            # Let the text control handle every other key normally.
            evt.Skip()

    def OnSaveDatabase(self, evt=None):
        '''Commit the database connection and report it in the text box.'''
        db_conn.commit()
        self.setmode('text')
        self.resulttext.SetValue(_("Database saved"))

    def OnSaveResult(self, evt=None):
        '''Ask for a file name and write the current result to that file.

        Does nothing when there is no current SELECT result or when the
        dialog is cancelled.
        '''
        if not self.query:
            return

        # NOTE(review): the two filters are joined with '\n' rather than
        # '|' -- confirm this is what the file dialog expects. Left
        # untouched because the string is also a translation key.
        wc = _('CSV files (*.csv)|*.csv\n' \
               'All files (*.*)|*.*')
        dlg = wx.FileDialog(self,
                            message=_('Select file to save results to'),
                            defaultFile='results.csv',
                            wildcard=wc,
                            style=wx.SAVE)
        try:
            if dlg.ShowModal() != wx.ID_OK:
                return
            path = dlg.GetPath()
        finally:
            # Dialogs are not cleaned up automatically; destroy explicitly.
            dlg.Destroy()

        # Binary mode: savetofile() writes explicit '\r\n' line ends and
        # encoded bytes, so no newline translation must take place. The
        # with-block closes the file even if the export fails halfway.
        with open(path, 'wb') as outfile:
            self.query.savetofile(outfile)

    def OnExecute(self, evt=None):
        '''Run the statement typed into the query editor.

        SELECT statements are shown in the result list, everything else is
        executed without results. Database errors are shown in the text
        box instead of being raised.
        '''
        query = self.querycmd.GetValue()

        try:
            if query.strip().upper().startswith("SELECT"):
                # Test query for validness, shouldn't be too heavy
                self.cursor.execute(query)
                self.ExecSelect(query)
            else:
                self.ExecGeneric(query)
        except (sqlite3.Error, sqlite3.Warning) as exc:
            # Warning is a separate DB-API exception class, not derived
            # from Error; the sqlite3 module uses it e.g. to reject input
            # that contains more than one statement.
            self.setmode('text')
            text = _('Error in query:\n')
            text += '\n'.join(exc.args)
            self.resulttext.SetValue(text)

    def ExecGeneric(self, query):
        '''Run query without results'''
        self.setmode('text')
        self.cursor.execute(query)

        # Drop any previous SELECT result so it can no longer be saved
        # or displayed.
        self.query = None
        self.resultlist.query = None
        self.resultlist.SetItemCount(0)
        self.resulttext.SetValue(_("Query ran successfully"))

    def ExecSelect(self, query):
        '''Run select-type query'''
        self.setmode('list')
        self.query = SelectQuery(self.cursor, query)

        self.resultlist.ClearAll()
        self.resultlist.InsertColumn(0, _("#"))

        # The row number column takes a fixed 60 pixels; the rest of the
        # width is shared evenly between the result columns.
        width = self.resultlist.GetSize()[0]
        width -= 60
        self.resultlist.SetColumnWidth(0, 60)

        columns = self.query.columnlist()
        # With many columns or a narrow window the even share drops to
        # zero or below, so enforce a minimum width.
        cwidth = max(self.MIN_COLUMN_WIDTH, width // len(columns) - 10)
        for i, column in enumerate(columns):
            self.resultlist.InsertColumn(i + 1, column)
            self.resultlist.SetColumnWidth(i + 1, cwidth)

        self.resultlist.query = self.query
        self.resultlist.SetItemCount(self.query.rowcount())


if __name__ == '__main__':
    print("Unit testing")

    class MyWindow(wx.Frame):
        '''Bare frame that hosts a single QueryPanel.'''

        def __init__(self):
            wx.Frame.__init__(self, None)
            self.q = QueryPanel(self)

    class MyApp(wx.App):
        '''Minimal application that opens one MyWindow.'''

        def OnInit(self):
            w = MyWindow()
            w.SetSize((500, 500))
            w.Show()
            return True

    app = MyApp(0)
    app.MainLoop()