import ast import csv import json import logging import re from playhouse.sqlite_ext import JSONField from .models import Archive, Autotests, Log, Recipes, Session, Steps, Users, db models_reference = { "archive": Archive, "autotests": Autotests, "log": Log, "recipes": Recipes, "steps": Steps, "users": Users, } db.connect() db.create_tables(list(models_reference.values())) log = logging.getLogger("db") def init_db(): tables = db.get_tables() tables.sort() for table in tables: count = 0 try: with open(f"src/lib/db/imports/{table}.csv", "r") as f: table = models_reference[table] fields = list(table._meta.fields) log.info(f"importing {table._meta.table_name}") reader = csv.DictReader(f) for row in reader: obj = {} for field in fields: if type(table._meta.fields[field]) is JSONField: obj[field] = json.loads(row[field]) else: try: obj[field] = ast.literal_eval(row[field]) except (SyntaxError, ValueError): obj[field] = row[field] table.insert(**obj).on_conflict_replace().execute() count += 1 log.info(f"{table._meta.table_name}: imported {count} rows.") except FileNotFoundError: pass # try: # with open("src/lib/db/imports/connectors.csv", "r") as f: # log.info("importing connectors") # reader = csv.DictReader(f) # count = 0 # for row in reader: # steps = {} # for step_type, step_spec in { # "connector": {"connector": row["connector"]}, # "barcodes": {}, # "leak_1": {"pre_filling_time": 1, "pre_filling_pressure": 1000, "filling_time": 1, "settling_time": 1, "settling_pressure_min_percent": 5, "settling_pressure_max_percent": 5, "test_time": 5, "test_pressure_min_delta": 100, "test_pressure": 1000, "test_pressure_max_delta": 100, "flush_time": 1, "flush_pressure": 100}, # "leak_2": {"pre_filling_time": 1, "pre_filling_pressure": 1000, "filling_time": 1, "settling_time": 1, "settling_pressure_min_percent": 5, "settling_pressure_max_percent": 5, "test_time": 5, "test_pressure_min_delta": 100, "test_pressure": 1000, "test_pressure_max_delta": 100, "flush_time": 1, "flush_pressure": 100}, # "resistance": {"scale": 500, "expected": row["resistance_expected"], "tolerance": row["resistance_tolerance"]}, # "vision": {}, # "print": {}, # }.items(): # steps[step_type] = Steps( # type=re.sub(r"^(.*)_[0-9]+$", r"\1", step_type), # spec=step_spec, # ) # steps[step_type].save() # recipe = Recipes( # name=row["name"], # client=row["client"], # part_number=row["part_number"], # spec={ # "connector": True, # "barcodes": True, # "leak_1": False, # "leak_2": False, # "resistance": True, # "vision": False, # "print": True, # "steps": [steps["connector"].id, steps["barcodes"].id, steps["resistance"].id, steps["print"].id, ], # ids of the enabled steps # "available_steps": { # "connector": steps["connector"].id, # "barcodes": steps["barcodes"].id, # "leak_1": steps["leak_1"].id, # "leak_2": steps["leak_2"].id, # "resistance": steps["resistance"].id, # "vision": steps["vision"].id, # "print": steps["print"].id, # }, # }, # ) # recipe.save() # count += 1 # log.info(f"connectors: imported {count} rows.") # except FileNotFoundError: # pass init_db() # register or reset admin admin = Users.get_user("ADMIN") if admin is None or not admin.is_admin: Users.register(username="ADMIN", password="123123", roles=["admin"]) # register or reset user Users.register(username="USER", password="user") if True: # crud_db must be imported after db and models_reference are available from .crud_db import Crud_DB