st-ten-1/src/ui/crud/crud.py
matteo porta 90250d622e wip
2022-09-26 12:35:02 +02:00

558 lines
24 KiB
Python
Executable File

import ast
import json
import logging
import traceback
import weakref
from datetime import datetime

from peewee import TextField
from PyQt5.QtCore import Qt, QTimer, pyqtSignal
from PyQt5.QtWidgets import (QAbstractItemView, QComboBox, QDialog,
                             QGridLayout, QHeaderView, QLineEdit, QMessageBox,
                             QPlainTextEdit, QPushButton)

from lib.db import Crud_DB
from ui.dialog import Dialog
from ui.editor import Editor
from ui.widget import Widget
def to_str(data):
    """Return the text shown in a cell for ``data``.

    ``None`` is passed through unchanged.  Exact ``bytes`` objects are decoded
    as UTF-8 (undecodable bytes are replaced), exact ``datetime`` objects are
    formatted as ``YYYY-MM-DD HH:MM:SS`` and anything else goes through
    ``str()``.
    """
    if data is None:
        return None
    kind = type(data)
    if kind is bytes:
        text = data.decode("UTF-8", errors="replace")
    elif kind is datetime:
        text = data.strftime("%Y-%m-%d %H:%M:%S")
    else:
        text = data
    return str(text)
def from_str(data, field=None):
    """Convert the text typed in a cell back into a Python value.

    Args:
        data: the text to convert; ``None`` or an empty value yields ``None``.
        field: optional model field the value belongs to.  When it is exactly
            a ``TextField`` the text is returned untouched.

    Returns:
        ``None`` for empty input, the raw text for ``TextField`` columns,
        otherwise the literal parsed by ``ast.literal_eval`` or, when the text
        is not a valid Python literal, the text itself.
    """
    if data is None or not len(data):
        return None
    # Only look at the field type when a field was actually supplied.
    if field is not None and type(field) is TextField:
        return data
    try:
        return ast.literal_eval(data)
    except (SyntaxError, ValueError, TypeError, MemoryError, RecursionError):
        # literal_eval documents all of these for malformed input (for
        # instance "{[1]: 2}" raises TypeError); such text is kept as a string.
        return data
class Cell:
    """Mixin holding the state and change tracking shared by table cell widgets.

    It is meant to be combined with a Qt widget class (the subclasses in this
    file pair it with ``QLineEdit``, ``QComboBox`` and ``QPushButton``), which
    is what makes the ``modified`` signal usable.

    Subclasses must implement ``set_readonly``, ``connect_modified``,
    ``render`` and ``parse``.
    """

    # Emitted by check_modified() with the current "differs from rendered value" flag.
    modified = pyqtSignal(bool)

    def __init__(self, readonly=True, autocomplete=None, field_name=None, field_alias=None, field=None, row_number=None, crud=None):
        """Store the cell description and apply read-only state and autocomplete.

        Args:
            readonly: whether the widget must refuse edits.
            autocomplete: value handed to ``do_autocomplete``.
            field_name: name of the column this cell belongs to.
            field_alias: display name of the column.
            field: model field object, forwarded to the parsing helpers.
            row_number: table row the cell is placed in.
            crud: owning table, either an object or a weak reference to it;
                it is always stored as a weak reference (or ``None``).
        """
        self.readonly = readonly
        self.autocomplete = autocomplete
        self.field_name = field_name
        self.field_alias = field_alias
        self.field = field
        self.row_number = row_number
        if crud is None:
            self.crud = None
        elif isinstance(crud, weakref.ReferenceType):
            self.crud = crud
        else:
            self.crud = weakref.ref(crud)
        self.value = None
        self.is_modified = False
        self.connected_modified = False
        self.set_readonly(self.readonly)
        self.do_autocomplete(self.autocomplete)

    def set_readonly(self, readonly=True):
        """Enable or disable editing of the underlying widget."""
        raise NotImplementedError()

    def do_autocomplete(self, autocomplete):
        """Apply the autocomplete value; by default it is rendered as content."""
        self.render(autocomplete)

    def connect_modified(self):
        """Connect the widget's change signal to ``check_modified``."""
        raise NotImplementedError()

    def _render(self, data, *args, **kwargs):
        """Record ``data`` as the reference value and display it."""
        self.value = data
        self.is_modified = False
        self.render(data, *args, **kwargs)
        if not self.connected_modified:
            # only connect after the first render
            # to avoid false modified signals triggered by autocomplete
            self.connect_modified()
            self.connected_modified = True

    def render(self, data, field_name=None, row_number=None, crud=None):
        """Display ``data`` in the widget."""
        raise NotImplementedError()

    def check_modified(self, *args, **kwargs):
        """Recompute, emit and return whether the content differs from ``value``.

        Content that cannot be parsed counts as modified.
        """
        try:
            value = self.parse()
            fail = False
        except Exception:
            # This mixin does not define ``log`` itself; use the host object's
            # logger when it has one and a module logger otherwise, so that a
            # parse error never turns into an AttributeError inside a Qt slot.
            log = getattr(self, "log", None) or logging.getLogger(__name__)
            log.exception(traceback.format_exc())
            value = None
            fail = True
        self.is_modified = bool(fail or value != self.value)
        self.modified.emit(self.is_modified)
        return self.is_modified

    def _parse(self, *args, **kwargs):
        """Return the stored value when untouched, otherwise parse the widget."""
        if self.connected_modified and not self.is_modified:
            return self.value
        return self.parse(*args, **kwargs)

    def parse(self, row_number=None, crud=None):
        """Return the Python value currently held by the widget."""
        raise NotImplementedError()
class Line_Edit_Cell_Widget(QLineEdit, Cell):
    """Cell shown as a single-line text editor."""

    def __init__(self, readonly=True, autocomplete=None, field_name=None, field_alias=None, field=None, row_number=None, crud=None):
        """Build the line edit, then initialise the shared cell state."""
        super().__init__()
        cell_options = dict(
            readonly=readonly,
            autocomplete=autocomplete,
            field_name=field_name,
            field_alias=field_alias,
            field=field,
            row_number=row_number,
            crud=crud,
        )
        Cell.__init__(self, **cell_options)

    def set_readonly(self, readonly=True):
        """Toggle the line edit's read-only flag."""
        self.setReadOnly(readonly)

    def connect_modified(self):
        """Run the modification check on every text change."""
        self.textChanged.connect(self.check_modified)

    def render(self, data, field_name=None, row_number=None, crud=None):
        """Remember ``data`` and show its string form."""
        self.value = data
        text = to_str(data)
        self.setText(text)

    def parse(self, row_number=None, crud=None):
        """Convert the current text back into a value for this cell's field."""
        raw = self.text()
        return from_str(raw, self.field)
class Combo_Box_Cell_Widget(QComboBox, Cell):
    """Cell shown as a combo box whose entries come from ``autocomplete``."""

    def __init__(self, readonly=True, autocomplete=None, field_name=None, field_alias=None, field=None, row_number=None, crud=None):
        """Build the combo box, then initialise the shared cell state."""
        super().__init__()
        Cell.__init__(self, readonly=readonly, autocomplete=autocomplete, field_name=field_name, field_alias=field_alias, field=field, row_number=row_number, crud=crud)

    def set_readonly(self, readonly=True):
        """A read-only combo box is simply made non-editable."""
        self.setEditable(not readonly)

    def do_autocomplete(self, autocomplete):
        """Add the autocomplete entries (as strings) to the drop-down list.

        The method is called from ``Cell.__init__`` and again by the table
        when it creates the widget, so it has to be idempotent: entries that
        are already listed are skipped instead of being appended a second
        time.
        """
        if autocomplete is None:
            return
        for entry in map(str, autocomplete):
            # findText() returns -1 when no item has exactly this text
            if self.findText(entry) < 0:
                self.addItem(entry)

    def connect_modified(self):
        """Run the modification check whenever the edit text changes."""
        self.editTextChanged.connect(self.check_modified)

    def render(self, data, field_name=None, row_number=None, crud=None):
        """Show the string form of ``data`` as the current text."""
        self.setCurrentText(to_str(data))

    def parse(self, row_number=None, crud=None):
        """Convert the current text back into a value for this cell's field."""
        return from_str(self.currentText(), self.field)
class External_Dialog_Cell_Widget(QPushButton, Cell):
    """Cell shown as a button that opens a plain-text editor in a dialog."""

    def __init__(self, readonly=True, autocomplete=None, field_name=None, field_alias=None, field=None, row_number=None, crud=None):
        """Create the dialog and editor, then the button and the cell state.

        The button is labelled "visualizza" when the owning table makes this
        field read-only (or when there is no owning table) and "modifica"
        otherwise.
        """
        # The editor must exist before Cell.__init__ runs, because that calls
        # set_readonly() and do_autocomplete(), which both use it.
        self.dialog = QDialog()
        self.editor = QPlainTextEdit()
        layout = QGridLayout()
        self.dialog.setLayout(layout)
        layout.setSpacing(0)
        layout.setContentsMargins(0, 0, 0, 0)
        layout.addWidget(self.editor, 0, 0, -1, -1)
        if External_Dialog_Cell_Widget._is_view_only(crud, field_name):
            super().__init__(u"\u238B visualizza")
        else:
            super().__init__(u"\u238B modifica")
        Cell.__init__(self, readonly=readonly, autocomplete=autocomplete, field_name=field_name, field_alias=field_alias, field=field, row_number=row_number, crud=crud)
        self.dialog.setWindowTitle(self.field_alias)
        self.clicked.connect(self.show_dialog)

    @staticmethod
    def _is_view_only(crud, field_name):
        """Tell whether the owning table allows only viewing ``field_name``.

        ``crud`` may be ``None``, the table itself or a weak reference to it.
        The table's ``readonly`` attribute is read the way the table reads it:
        ``None``/``True`` mean everything is read-only, ``False`` means nothing
        is, and any other value is a container of read-only field names.
        """
        owner = crud() if isinstance(crud, weakref.ReferenceType) else crud
        if owner is None:
            return True
        spec = owner.readonly
        if spec is None or spec is True:
            return True
        if spec is False:
            return False
        return field_name in spec

    def set_readonly(self, readonly=True):
        """Toggle the read-only flag of the text editor."""
        self.editor.setReadOnly(readonly)

    def show_dialog(self):
        """Show the dialog that contains the editor."""
        self.dialog.show()

    def connect_modified(self):
        """Run the modification check on every change of the editor text."""
        self.editor.textChanged.connect(self.check_modified)

    def render(self, data, field_name=None, row_number=None, crud=None):
        """Put the string form of ``data`` into the editor."""
        self.editor.setPlainText(to_str(data))

    def parse(self, row_number=None, crud=None):
        """Convert the editor text back into a value for this cell's field."""
        return from_str(self.editor.toPlainText(), self.field)
class Json_External_Dialog_Cell_Widget(External_Dialog_Cell_Widget):
    """Dialog-based cell whose editor text is the JSON form of the value."""

    def render(self, data, field_name=None, row_number=None, crud=None):
        """Show ``data`` serialised as tab-indented JSON."""
        serialized = json.dumps(data, indent="\t")
        self.editor.setPlainText(serialized)

    def parse(self, row_number=None, crud=None):
        """Decode the editor text as JSON; invalid JSON raises."""
        raw = self.editor.toPlainText()
        return json.loads(raw)
class Json_External_Dialog_Editor_Cell_Widget(QPushButton, Cell):
    """Cell shown as a button that opens a custom ``Editor`` widget in a dialog.

    Rendering, parsing, read-only handling, autocomplete and change
    notification are all delegated to the editor instance.
    """

    def __init__(self, editor_widget_class, *args, readonly=True, autocomplete=None, field_name=None, field_alias=None, field=None, row_number=None, crud=None, **kwargs):
        """Instantiate the editor, then the button, the cell state and the dialog.

        Args:
            editor_widget_class: subclass of ``Editor`` to instantiate; it
                receives ``*args``, ``**kwargs`` and a weak reference to this
                widget as ``cell_widget``.

        Raises:
            AssertionError: if ``editor_widget_class`` is not an ``Editor``
                subclass.
        """
        if not issubclass(editor_widget_class, Editor):
            raise AssertionError(f"editor_widget_class {editor_widget_class!r} must be a subclass of {Editor!r}")
        # The editor must exist before Cell.__init__ runs, because that calls
        # set_readonly() and do_autocomplete(), which both delegate to it.
        self.editor = editor_widget_class(*args, cell_widget=weakref.ref(self), **kwargs)
        if Json_External_Dialog_Editor_Cell_Widget._is_view_only(crud, field_name):
            super().__init__(u"\u238B visualizza")
        else:
            super().__init__(u"\u238B modifica")
        Cell.__init__(self, readonly=readonly, autocomplete=autocomplete, field_name=field_name, field_alias=field_alias, field=field, row_number=row_number, crud=crud)
        self.dialog = Dialog()
        # keep the dialog (and the editor inside it) alive after it is closed
        self.dialog.setAttribute(Qt.WA_DeleteOnClose, on=False)
        self.dialog.setCentralWidget(self.editor)
        self.dialog.setWindowTitle(self.field_alias)
        self.clicked.connect(self.show_dialog)
        if hasattr(self.editor, "init"):
            self.editor.init()

    @staticmethod
    def _is_view_only(crud, field_name):
        """Tell whether the owning table allows only viewing ``field_name``.

        ``crud`` may be ``None``, the table itself or a weak reference to it.
        The table's ``readonly`` attribute is read the way the table reads it:
        ``None``/``True`` mean everything is read-only, ``False`` means nothing
        is, and any other value is a container of read-only field names.
        """
        owner = crud() if isinstance(crud, weakref.ReferenceType) else crud
        if owner is None:
            return True
        spec = owner.readonly
        if spec is None or spec is True:
            return True
        if spec is False:
            return False
        return field_name in spec

    def set_readonly(self, readonly=True):
        """Forward the read-only flag to the editor."""
        self.editor.set_readonly(readonly=readonly)

    def do_autocomplete(self, autocomplete):
        """Forward the autocomplete value to the editor."""
        self.editor.do_autocomplete(autocomplete)

    def show_dialog(self):
        """Notify the editor (when it wants to know) and show the dialog."""
        if hasattr(self.editor, "showing_dialog"):
            self.editor.showing_dialog()
        self.dialog.show()

    def connect_modified(self):
        """Ask the editor to call ``check_modified`` when its content changes."""
        self.editor.connect_modified(self.check_modified)

    def render(self, data, field_name=None, row_number=None, crud=None):
        """Let the editor display ``data``."""
        self.editor.render(data, field_name=field_name, row_number=row_number, crud=crud)

    def parse(self, row_number=None, crud=None):
        """Return the value produced by the editor."""
        return self.editor.parse(row_number=row_number, crud=crud)
class Crud(Widget):
    """Table editor for one database table, backed by ``Crud_DB``.

    Row 0 of the table widget holds one filter box per column; data rows start
    at row 1.  ``data_index`` stores, for every data row, the primary key of
    the record shown there (``None`` for rows added but not yet committed).
    """

    # Emitted with the "there are uncommitted changes" flag.
    modified = pyqtSignal(bool)
    # Emitted with the sorted list of selected data row numbers.
    selected = pyqtSignal(object)

    # A column's sort state is an index into these two lists: the value passed
    # to the DB layer as ``is_ascending`` and the marker appended to the header.
    sort_cycle = [True, False, None]
    sort_symbol = [u" \u25B4", u" \u25BE", u""]

    def __init__(self, table_name, readonly=False, select=None, filters=None, fields_aliases=None, autocomplete=None, sort=None, pagination=250, display_name=None, row_upgrader=None, widget_classes=None, row_filter=None):
        """Build the table for ``table_name`` and load the first page.

        Args:
            table_name: name of the table handed to ``Crud_DB``.
            readonly: ``True``/``None`` for a fully read-only table, ``False``
                for a fully editable one, or a container of read-only field
                names.
            select: field names to show; all table fields when empty.
            filters: passed through to ``Crud_DB``.
            fields_aliases: mapping field name -> header text.
            autocomplete: mapping field name -> value given to the cell widget.
            sort: mapping field name -> ``True`` (ascending) / ``False``
                (descending) used as default sorting.
            pagination: passed to ``Crud_DB``; ``False`` hides the page controls.
            display_name: group box title (defaults to ``table_name``).
            row_upgrader: callable applied to every row before it is shown.
            widget_classes: mapping field name -> cell widget class.
            row_filter: callable applied to every row before it is committed.

        Raises:
            AssertionError: for an unknown sort column or a sort value that is
                neither ``True`` nor ``False``.
        """
        super().__init__()
        self.table_name = table_name
        self.readonly = readonly
        self.db_gb.setTitle(display_name if display_name is not None else self.table_name)
        self.db = Crud_DB(self.table_name, filters=filters, pagination=pagination)
        if select is not None and len(select):
            self.select = select
        else:
            self.select = self.db.table_fields
        self.select_index = {f: i for i, f in enumerate(self.select)}
        if fields_aliases is not None:
            self.fields_aliases = {fn: fields_aliases.get(fn, fn) for fn in self.select}
        else:
            self.fields_aliases = {fn: fn for fn in self.select}
        self.autocomplete = autocomplete if autocomplete is not None else {}
        self.default_sorting = {}
        if sort is not None:
            for fn, s in sort.items():
                if fn not in self.db.table_fields:
                    raise AssertionError(f"sorting column {fn!r} does not exist")
                if s is True:
                    s = 0
                elif s is False:
                    s = 1
                else:
                    raise AssertionError(f"bad sorting spec for column {fn!r}: {s!r}, must be True or False")
                self.default_sorting[fn] = s
        self.sorting = self.default_sorting.copy()
        self.pagination = pagination
        self.row_upgrader = row_upgrader if row_upgrader is not None else self.default_row_upgrader
        self.default_widget_class = Line_Edit_Cell_Widget
        self.widget_classes = widget_classes if widget_classes is not None else {}
        self.row_filter = row_filter if row_filter is not None else self.default_row_filter
        self.page = 0
        self.deleted_rows = set()
        self.filters = {}
        self.filter_cn = None
        # Typing in a filter box only applies the filter after a short pause.
        self.filter_delay = QTimer()
        self.filter_delay.setSingleShot(True)
        self.filter_delay.setInterval(500)
        self.filter_delay.timeout.connect(lambda self=weakref.ref(self): self().do_filter(self().filter_cn))
        self.db_tw.crud = self
        self.refresh("init")
        self.db_tw.horizontalHeader().sectionClicked.connect(self.toggle_sort)
        # self.db_tw.horizontalHeader().setStretchLastSection(True)
        self.db_tw.horizontalHeader().setSectionResizeMode(QHeaderView.Interactive)
        # self.db_tw.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeToContents)
        self.db_tw.setSelectionBehavior(QAbstractItemView.SelectRows)
        if self._is_fully_readonly():
            self.db_tw.setSelectionMode(QAbstractItemView.SingleSelection)
        else:
            self.db_tw.setSelectionMode(QAbstractItemView.ExtendedSelection)
        self.db_tw.itemSelectionChanged.connect(self.show_selection)
        self.revert_b.clicked.connect(lambda checked, self=weakref.ref(self): self().refresh("revert"))
        if self.pagination is not False:
            self.start_b.clicked.connect(lambda checked, self=weakref.ref(self): self().refresh("pagination", page=0))
            self.previous_b.clicked.connect(lambda checked, self=weakref.ref(self): self().refresh("pagination", page=max(self().page - 1, 0)))
            self.page_n_sb.valueChanged.connect(lambda page, self=weakref.ref(self): self().refresh("pagination", page=page - 1))
            self.next_b.clicked.connect(lambda checked, self=weakref.ref(self): self().refresh("pagination", page=self().page + 1))
            self.end_b.clicked.connect(lambda checked, self=weakref.ref(self): self().refresh("pagination", page=-1))
        else:
            self.next_b.setHidden(True)
            self.previous_b.setHidden(True)
            self.start_b.setHidden(True)
            self.end_b.setHidden(True)
            self.page_n_sb.setHidden(True)
        if self._is_fully_readonly():
            self.cancel_b.setEnabled(False)
            self.cancel_b.setHidden(True)
            self.commit_b.setEnabled(False)
            self.commit_b.setHidden(True)
            self.add_b.setEnabled(False)
            self.add_b.setHidden(True)
            self.delete_b.setEnabled(False)
            self.delete_b.setHidden(True)
        else:
            self.cancel_b.clicked.connect(self.cancel)
            self.commit_b.clicked.connect(self.commit)
            self.add_b.clicked.connect(self.add)
            self.delete_b.clicked.connect(self.delete)

    def _is_fully_readonly(self):
        """Tell whether the whole table is read-only (``readonly`` is None/True)."""
        return self.readonly is None or self.readonly is True

    def _is_field_readonly(self, fn):
        """Tell whether field ``fn`` is read-only, globally or by name."""
        return self._is_fully_readonly() or (self.readonly is not False and fn in self.readonly)

    def toggle_sort(self, cn=None, refresh=True):
        """Advance the sort state of column ``cn`` or restore the default sorting.

        Args:
            cn: column number that was clicked, or ``None`` to go back to the
                default sorting.
            refresh: reload the table afterwards.
        """
        if cn is not None:
            fn = self.select[cn]
            if fn not in self.db.table_fields:
                return
            self.db_tw.setHorizontalHeaderLabels(self.fields_aliases.values())
            current = self.sorting.get(fn, -1)
            current = (current + 1) % len(self.sort_cycle)
            self.sorting.clear()  # for single column sorting
            self.db.sort(None)  # for single column sorting
            self.sorting[fn] = current
            current = self.sort_cycle[current]
            self.db.sort(fn, is_ascending=current)
            self.db_tw.horizontalHeaderItem(cn).setText(f"{self.fields_aliases[fn]}{self.sort_symbol[self.sorting[fn]]}")
        else:
            self.db_tw.setHorizontalHeaderLabels(self.fields_aliases.values())
            self.sorting = self.default_sorting.copy()
            # self.sorting.clear() # for single column sorting
            if len(self.sorting):
                for fn, s in reversed(self.sorting.items()):
                    self.db.sort(fn, is_ascending=self.sort_cycle[s])
                fn = list(self.sorting)[0]
                # A default sort column only has to exist in the table, not in
                # ``select``; when it is not displayed there is no header to mark.
                cn = self.select_index.get(fn)
                if cn is not None:
                    self.db_tw.horizontalHeaderItem(cn).setText(f"{self.fields_aliases[fn]}{self.sort_symbol[self.sorting[fn]]}")
            else:
                self.db.sort(None)  # for single column sorting
        if refresh:
            self.refresh()

    def filter_edited(self, cn):
        """Remember the edited filter column and (re)start the delay timer."""
        # self.do_filter(cn)
        self.filter_cn = cn
        self.filter_delay.start()

    def do_filter(self, cn):
        """Apply the text of filter box ``cn`` when it differs from the active filter."""
        self.filter_delay.stop()  # avoid delayed textChanged filter updates after editingFinished filter updates
        if cn is None:
            return
        text = self.db_tw.cellWidget(0, cn).text()
        if not len(text):
            text = None
        fn = self.select[cn]
        if text != self.filters.get(fn, None):
            self.filters[fn] = text
            self.db.filter(self.select[cn], text)
            self.refresh()

    @staticmethod
    def default_row_upgrader(row, row_number, crud):
        """should return the edited_row"""
        return row

    @staticmethod
    def default_row_filter(row, row_number, crud):
        """should return a tuple: commit_this_row_bool, edited_row, fail_current_commit_bool"""
        return True, row, False

    def set_modified(self, is_modified=True):
        """Raise the modified flag (it is only lowered by ``refresh``) and emit it."""
        self._modified = self._modified or is_modified
        self.modified.emit(self._modified)

    def parse_row(self, rn, fail=False):
        """Read back every cell of row ``rn``.

        Returns:
            ``(row_dict, fail, messages)``: the parsed values by field name,
            the incoming ``fail`` flag turned to ``True`` when any cell could
            not be parsed, and the error messages of those cells.  A failing
            row is coloured red.
        """
        r = {}
        fails = []
        for fn, cn in self.select_index.items():
            w = self.db_tw.cellWidget(rn, cn)
            try:
                r[fn] = w._parse(row_number=rn, crud=weakref.ref(self))
            except Exception as e:
                self.log.exception(traceback.format_exc())
                self.set_row_color(rn, "red")
                fail = True
                fails.append(str(e))
        return r, fail, fails

    def refresh(self, action=None, page=0):
        """Run ``action`` and reload the table from the database.

        Args:
            action: ``"init"``, ``"cancel"``, ``"revert"``, ``"commit"`` or
                anything else for a plain reload.
            page: page number to load.

        Returns:
            ``True`` when the table was reloaded, ``False`` when a commit was
            refused or failed; in that case the table content and the modified
            flag are left untouched so the user can fix the data and retry.
        """
        # IGNORE SIGNALS WHILE UPDATING
        self.db_tw.blockSignals(True)
        try:
            if not self._apply_action(action):
                return False
            self._modified = False  # force reset modified status
            self.set_modified(self._modified)
            self._reload(page)
        finally:
            # REENABLE EVENTS AFTER UPDATE, even on an early return or an error
            self.db_tw.blockSignals(False)
        self.show_selection()
        return True

    def _apply_action(self, action):
        """Do the action-specific part of ``refresh``; return False to abort it."""
        if action == "init":
            self.previous_selection = []
            self.db_tw.setColumnCount(len(self.select))
            self.db_tw.setHorizontalHeaderLabels(self.fields_aliases.values())
            self.db_tw.setRowCount(1)
            self.db_tw.verticalHeader().setHidden(self._is_fully_readonly())
            self.toggle_sort(None, refresh=False)
        elif action == "revert":
            # REVERT SORTING AND FILTERS
            self.toggle_sort(None, refresh=False)
            self.filters.clear()
            self.db.revert()
        elif action == "commit":
            return self._commit_rows(action)
        # "cancel" reverts table changes simply by reloading
        return True

    def _commit_rows(self, action):
        """Parse all data rows and write them to the database; return success."""
        if self._is_fully_readonly():
            self.log.error(f"tried to {action!r} with readonly = {self.readonly}")
            return False
        pk_name = self.db.table_pk.name
        attach_pk = pk_name not in self.select
        fail = False
        fails = []
        data = []
        for rn in range(1, self.db_tw.rowCount()):  # start=1 because rn starts from 1 (filters line)
            r, fail, r_fails = self.parse_row(rn, fail=fail)
            fails += r_fails
            add_row, r, filter_fail = self.row_filter(r, rn, self)
            if filter_fail:
                fail = True
                fails.append(str("filter fail"))
            if add_row:
                if attach_pk:
                    # INDEX DATA WITH PK: take the key of this very table row,
                    # so rows dropped by row_filter cannot shift the keys.
                    r[pk_name] = self.data_index[rn - 1]  # - 1 because rn starts from 1 (filters line)
                data.append(r)
        if fail:
            QMessageBox.critical(None, "Errore Salvataggio DB", "\n".join(fails))
            return False
        try:
            self.db.commit(data, deleted_rows=self.deleted_rows)
        except Exception as e:
            self.log.exception(traceback.format_exc())
            QMessageBox.critical(None, "Errore Salvataggio DB", str(e))
            return False
        return True

    def _reload(self, page):
        """Fetch ``page`` from the database and rebuild every table widget."""
        # GET DATA
        data, self.data_total_count, self.page, self.last_page = self.db.get(page)
        self.data_index = [r[self.db.table_pk.name] for r in data]
        self.page_n_sb.blockSignals(True)
        self.page_n_sb.setRange(1, self.last_page + 1)
        self.page_n_sb.setValue(self.page + 1)
        self.page_n_sb.setSuffix(" / {}".format(self.last_page + 1))
        self.page_n_sb.blockSignals(False)
        # RESET DELETED ROWS
        self.deleted_rows.clear()
        # CLEAR CURRENT VALUES
        self.db_tw.clearContents()
        # SET TABLE ROW COUNT ACCORDINGLY
        self.db_tw.setRowCount(len(data) + 1)  # + 1 because rn starts from 1 (filters line)
        # RESTORE FILTERS
        for cn, fn in enumerate(self.select):
            w = QLineEdit()
            if fn in self.db.table_fields:
                w.setPlaceholderText("Filtro")
                w.setText(self.filters.get(fn, None))
                w.editingFinished.connect(lambda cn=cn, self=weakref.ref(self): self().do_filter(cn))
                w.textChanged.connect(lambda text, cn=cn, self=weakref.ref(self): self().filter_edited(cn))
            else:
                w.setPlaceholderText("Non filtrabile")
                w.setEnabled(False)
            self.db_tw.setCellWidget(0, cn, w)
        # INSERT UPDATED DATA
        for rn, r in enumerate(data, start=1):  # start=1 because rn starts from 1 (filters line)
            r = self.row_upgrader(r, rn, self)
            for fn, cn in self.select_index.items():
                w = self._make_cell(fn, rn)
                if fn in r:
                    w._render(data=r[fn], row_number=rn, crud=weakref.ref(self))
                self.db_tw.setCellWidget(rn, cn, w)

    def _make_cell(self, fn, rn):
        """Create and wire the cell widget for field ``fn`` in row ``rn``."""
        widget_class = self.widget_classes.get(fn, self.default_widget_class)
        w = widget_class(
            readonly=self._is_field_readonly(fn),
            autocomplete=self.autocomplete.get(fn, None),
            field_name=fn,
            field_alias=self.fields_aliases[fn],
            field=self.db.table_model._meta.fields.get(fn, None),
            row_number=rn,
            crud=weakref.ref(self),
        )
        # Applied explicitly as well; the Cell constructor in this file already
        # does both, so widget classes should tolerate a repeated call.
        w.set_readonly(readonly=w.readonly)
        w.do_autocomplete(w.autocomplete)
        w.modified.connect(self.set_modified)
        return w

    def cancel(self):
        """Discard the edits by reloading the table."""
        return self.refresh("cancel")

    def commit(self):
        """Write the edits to the database and reload the table."""
        return self.refresh("commit")

    def add(self):
        """Append an empty row (committed later) and scroll to it."""
        self.set_modified(True)
        self.data_index.append(None)  # no primary key yet
        rn = self.db_tw.rowCount()
        self.db_tw.setRowCount(rn + 1)
        for fn, cn in self.select_index.items():
            self.db_tw.setCellWidget(rn, cn, self._make_cell(fn, rn))
        self.db_tw.scrollToBottom()

    def delete(self):
        """Remove the selected rows and remember their keys for the next commit."""
        selected = self.get_selected_rows()
        if len(selected) == 0:
            # nothing to delete, so nothing has been modified either
            return
        self.set_modified(True)
        # ret = QMessageBox.warning(
        #     None,
        #     u"Conferma rimozione linee",
        #     u"Si \u00E8 sicuri di voler eliminare le linee selezionate?",
        #     buttons=QMessageBox.Ok | QMessageBox.Cancel,
        #     defaultButton=QMessageBox.Cancel
        # )
        # if ret == QMessageBox.Ok:
        # Go bottom-up so the remaining row numbers stay valid while removing.
        for rn in reversed(selected):
            pk = self.data_index.pop(rn - 1)  # - 1 because rn starts from 1 (filters line)
            if pk is not None:
                self.deleted_rows.add(pk)
            self.db_tw.removeRow(rn)

    def get_selected_rows(self):
        """Return the sorted numbers of the selected data rows."""
        selected = self.db_tw.selectedRanges()
        rows = set()
        for s in selected:
            rows.update(range(s.topRow(), s.bottomRow() + 1))
        rows.discard(0)  # skip filters
        return sorted(rows)

    def show_selection(self):
        """Emit the current selection and move the highlight to it."""
        selected = self.get_selected_rows()
        self.selected.emit(selected)
        if self.previous_selection == selected:
            return
        for row_number in self.previous_selection:
            self.set_row_color(row_number)
        for row_number in selected:
            self.set_row_color(row_number, "cyan")
        self.previous_selection = selected

    def set_row_color(self, row_number, color=None):
        """Set (or with ``color=None`` clear) the background of a row's widgets."""
        for i in range(self.db_tw.columnCount()):
            w = self.db_tw.cellWidget(row_number, i)
            if w is not None:
                if color is not None:
                    w.setStyleSheet(f"background-color: {color};")
                else:
                    w.setStyleSheet("")

    def emit(self):
        """Re-emit the current modified flag and selection."""
        self.set_modified(self._modified)
        self.show_selection()