st-ten-1/src/lib/db/__init__.py

119 lines
4.8 KiB
Python
Raw Normal View History

2022-06-01 16:37:19 +00:00
import ast
import csv
import json
import logging
import re
2022-06-01 16:37:19 +00:00
from playhouse.sqlite_ext import JSONField
2022-07-19 09:59:00 +00:00
from .models import Archive, Autotests, Log, Recipes, Session, Steps, Users, db
2022-06-01 16:37:19 +00:00
models_reference = {
"archive": Archive,
"autotests": Autotests,
"log": Log,
"recipes": Recipes,
2022-07-19 09:59:00 +00:00
"steps": Steps,
2022-06-01 16:37:19 +00:00
"users": Users,
}
db.connect()
db.create_tables(list(models_reference.values()))
log = logging.getLogger("db")
def init_db():
tables = db.get_tables()
tables.sort()
for table in tables:
count = 0
try:
with open(f"src/lib/db/imports/{table}.csv", "r") as f:
2022-06-01 16:37:19 +00:00
table = models_reference[table]
fields = list(table._meta.fields)
log.info(f"importing {table._meta.table_name}")
reader = csv.DictReader(f)
for row in reader:
obj = {}
for field in fields:
if type(table._meta.fields[field]) is JSONField:
obj[field] = json.loads(row[field])
else:
try:
obj[field] = ast.literal_eval(row[field])
except (SyntaxError, ValueError):
obj[field] = row[field]
table.insert(**obj).on_conflict_replace().execute()
count += 1
log.info(f"{table._meta.table_name}: imported {count} rows.")
except FileNotFoundError:
pass
# try:
# with open("src/lib/db/imports/connectors.csv", "r") as f:
# log.info("importing connectors")
# reader = csv.DictReader(f)
# count = 0
# for row in reader:
# steps = {}
# for step_type, step_spec in {
# "connector": {"connector": row["connector"]},
# "barcodes": {},
2022-09-06 15:35:49 +00:00
# "resistance": {"scale": 500, "expected": float(row["resistance_expected"]), "tolerance": float(row["resistance_tolerance"])},
# "leak_1": {"pre_filling_time": 1, "pre_filling_pressure": 1000, "filling_time": 1, "settling_time": 1, "settling_pressure_min_percent": 5, "settling_pressure_max_percent": 5, "test_time": 5, "test_pressure_min_delta": 100, "test_pressure": 1000, "test_pressure_max_delta": 100, "flush_time": 1, "flush_pressure": 100},
# "leak_2": {"pre_filling_time": 1, "pre_filling_pressure": 1000, "filling_time": 1, "settling_time": 1, "settling_pressure_min_percent": 5, "settling_pressure_max_percent": 5, "test_time": 5, "test_pressure_min_delta": 100, "test_pressure": 1000, "test_pressure_max_delta": 100, "flush_time": 1, "flush_pressure": 100},
# "vision": {},
2022-09-06 13:15:01 +00:00
# "print": {"template": "EtichettaR5", },
# }.items():
# steps[step_type] = Steps(
# type=re.sub(r"^(.*)_[0-9]+$", r"\1", step_type),
# spec=step_spec,
# )
# steps[step_type].save()
# recipe = Recipes(
# name=row["name"],
# client=row["client"],
# part_number=row["part_number"],
# spec={
# "connector": True,
# "barcodes": True,
2022-09-06 13:15:01 +00:00
# "resistance": True,
# "leak_1": False,
# "leak_2": False,
# "vision": False,
# "print": True,
2022-09-06 15:06:56 +00:00
# "steps": [steps["connector"].id, steps["resistance"].id, steps["print"].id, ], # ids of the enabled steps
# "available_steps": {
# "connector": steps["connector"].id,
# "barcodes": steps["barcodes"].id,
2022-09-06 13:15:01 +00:00
# "resistance": steps["resistance"].id,
# "leak_1": steps["leak_1"].id,
# "leak_2": steps["leak_2"].id,
# "vision": steps["vision"].id,
# "print": steps["print"].id,
# },
# },
# )
# recipe.save()
# count += 1
# log.info(f"connectors: imported {count} rows.")
# except FileNotFoundError:
# pass
2022-06-01 16:37:19 +00:00
init_db()
# register or reset admin
admin = Users.get_user("ADMIN")
if admin is None or not admin.is_admin:
Users.register(username="ADMIN", password="123123", roles=["admin"])
# register or reset user
Users.register(username="USER", password="user")
2022-07-19 09:59:00 +00:00
2022-06-01 16:37:19 +00:00
if True:
# crud_db must be imported after db and models_reference are available
from .crud_db import Crud_DB