#!/usr/bin/env python3
"""
Database Analysis Script

This script analyzes a SQLite database file for corruption and integrity
issues. It performs various checks including SQLite's built-in integrity
check, table structure validation, and attempts to identify specific errors.
When issues are found it can optionally write recovered copies of the
database next to the original file.

Usage:
    python analyze_db.py <db_path> [--recovery] [--advanced]

Example:
    python analyze_db.py /path/to/malformed.db
"""

import argparse
import logging
import os
import shutil
import sqlite3
import subprocess
import sys
import tempfile
from contextlib import closing
from datetime import datetime
from pathlib import Path

# Set up logging: every message goes to the console and to a timestamped
# log file created in the current working directory.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(f"db_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")
    ]
)
logger = logging.getLogger(__name__)

# WAL files larger than this many bytes trigger a warning.
WAL_SIZE_WARNING_BYTES = 1000000  # 1MB

# Number of rows copied per batch during table-by-table recovery.
COPY_BATCH_SIZE = 1000

# "_" is a single-character wildcard in LIKE, so it is escaped here; otherwise
# user tables such as "sqlitefoo" would be filtered out together with
# SQLite's internal "sqlite_*" tables.
_USER_TABLE_FILTER = "type='table' AND name NOT LIKE 'sqlite^_%' ESCAPE '^'"


def _quote_identifier(name):
    """Return *name* as a double-quoted SQL identifier.

    Embedded double quotes are doubled so that names containing spaces,
    quotes or reserved words can be interpolated into a statement safely.
    """
    return '"' + name.replace('"', '""') + '"'


def _confirm(prompt):
    """Ask a yes/no question on stdin and return True only for 'y'.

    A closed or non-interactive stdin (EOFError) is treated as "no" instead
    of crashing the script.
    """
    try:
        return input(prompt).strip().lower() == 'y'
    except EOFError:
        return False


def check_file_exists(db_path):
    """Check if the database file exists.

    Args:
        db_path: Path to the database file

    Returns:
        bool: True if db_path is an existing regular file, False otherwise
    """
    if not os.path.isfile(db_path):
        logger.error(f"Database file not found: {db_path}")
        return False
    return True


def connect_to_database(db_path):
    """Attempt to connect to the database.

    Args:
        db_path: Path to the database file

    Returns:
        sqlite3.Connection, or None if the connection attempt raised
        sqlite3.Error
    """
    try:
        conn = sqlite3.connect(db_path)
        logger.info(f"Successfully connected to database: {db_path}")
        return conn
    except sqlite3.Error as e:
        logger.error(f"Failed to connect to database: {e}")
        return None


def run_integrity_check(conn):
    """Run SQLite's built-in integrity check.

    Args:
        conn: Open sqlite3 connection

    Returns:
        bool: True if PRAGMA integrity_check returned the single row 'ok',
        False if it reported issues or raised sqlite3.Error
    """
    try:
        logger.info("Running SQLite integrity check...")
        cursor = conn.cursor()
        cursor.execute("PRAGMA integrity_check;")
        result = cursor.fetchall()

        if len(result) == 1 and result[0][0] == 'ok':
            logger.info("Integrity check passed: No corruption detected by SQLite")
            return True

        logger.error("Integrity check failed. Issues found:")
        for row in result:
            logger.error(f" - {row[0]}")
        return False
    except sqlite3.Error as e:
        logger.error(f"Error during integrity check: {e}")
        return False


def check_foreign_keys(conn):
    """Check for foreign key constraint violations.

    Args:
        conn: Open sqlite3 connection

    Returns:
        bool: True if PRAGMA foreign_key_check returned no rows, False if it
        returned violations or raised sqlite3.Error
    """
    try:
        logger.info("Checking foreign key constraints...")
        cursor = conn.cursor()
        cursor.execute("PRAGMA foreign_key_check;")
        result = cursor.fetchall()

        if not result:
            logger.info("Foreign key check passed: No violations found")
            return True

        logger.error("Foreign key violations found:")
        for row in result:
            logger.error(f" - Table: {row[0]}, Row ID: {row[1]}, Parent: {row[2]}, Foreign Key: {row[3]}")
        return False
    except sqlite3.Error as e:
        logger.error(f"Error during foreign key check: {e}")
        return False


def check_database_structure(conn):
    """Check the structure of the database (tables, columns, etc.).

    Logs the column count and row count of every table listed in
    sqlite_master. Per-table errors are logged and do not change the result.

    Args:
        conn: Open sqlite3 connection

    Returns:
        bool: True if at least one table was listed, False if there are no
        tables or the table list could not be read
    """
    try:
        logger.info("Analyzing database structure...")
        cursor = conn.cursor()

        # Get list of tables
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
        tables = cursor.fetchall()

        if not tables:
            logger.warning("No tables found in the database")
            return False

        logger.info(f"Found {len(tables)} tables:")
        for (table_name,) in tables:
            logger.info(f" - {table_name}")
            # Quote the name so tables with spaces or reserved words work.
            quoted = _quote_identifier(table_name)

            # Get table info
            try:
                cursor.execute(f"PRAGMA table_info({quoted});")
                columns = cursor.fetchall()
                logger.info(f" Columns: {len(columns)}")

                # Try to count rows
                try:
                    cursor.execute(f"SELECT COUNT(*) FROM {quoted};")
                    row_count = cursor.fetchone()[0]
                    logger.info(f" Rows: {row_count}")
                except sqlite3.Error as e:
                    logger.error(f" Error counting rows in {table_name}: {e}")
            except sqlite3.Error as e:
                logger.error(f" Error getting structure for {table_name}: {e}")

        return True
    except sqlite3.Error as e:
        logger.error(f"Error analyzing database structure: {e}")
        return False


def check_journal_mode(conn):
    """Check the journal mode of the database.

    In WAL mode, also reports whether the "-wal" and "-shm" companion files
    exist and warns when the WAL file exceeds WAL_SIZE_WARNING_BYTES.
    Results are only logged; nothing is returned.

    Args:
        conn: Open sqlite3 connection
    """
    try:
        cursor = conn.cursor()
        cursor.execute("PRAGMA journal_mode;")
        mode = cursor.fetchone()[0]
        logger.info(f"Journal mode: {mode}")

        if mode.lower() != 'wal':
            return

        # Check for WAL and SHM files. The third column of the first
        # database_list row is used as the database file path.
        db_path = conn.execute("PRAGMA database_list;").fetchone()[2]
        wal_path = f"{db_path}-wal"
        shm_path = f"{db_path}-shm"

        if os.path.exists(wal_path):
            logger.info(f"WAL file exists: {wal_path}")
            wal_size = os.path.getsize(wal_path)
            logger.info(f"WAL file size: {wal_size} bytes")
            if wal_size > WAL_SIZE_WARNING_BYTES:
                logger.warning("WAL file is large, which might indicate uncommitted transactions")
        else:
            logger.info("No WAL file found")

        if os.path.exists(shm_path):
            logger.info(f"SHM file exists: {shm_path}")
        else:
            logger.info("No SHM file found")
    except sqlite3.Error as e:
        logger.error(f"Error checking journal mode: {e}")


def check_for_corruption(conn):
    """Perform additional checks for common corruption issues.

    Reads PRAGMA index_info for every index and logs page size, page count,
    free-list size and auto-vacuum mode. A failure on an individual index is
    logged without changing the result.

    Args:
        conn: Open sqlite3 connection

    Returns:
        bool: True if the checks ran to completion, False if a statement
        outside the per-index loop raised sqlite3.Error
    """
    try:
        logger.info("Checking for common corruption issues...")

        # Check for malformed indexes
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='index';")
        indexes = cursor.fetchall()

        for (index_name,) in indexes:
            try:
                cursor.execute(f"PRAGMA index_info({_quote_identifier(index_name)});")
                cursor.fetchall()
                logger.info(f"Index {index_name} appears to be valid")
            except sqlite3.Error as e:
                logger.error(f"Index {index_name} may be corrupted: {e}")

        # Check for database page size and free pages
        cursor.execute("PRAGMA page_size;")
        page_size = cursor.fetchone()[0]
        logger.info(f"Page size: {page_size} bytes")

        cursor.execute("PRAGMA page_count;")
        page_count = cursor.fetchone()[0]
        logger.info(f"Page count: {page_count}")

        cursor.execute("PRAGMA freelist_count;")
        freelist_count = cursor.fetchone()[0]
        logger.info(f"Free pages: {freelist_count}")

        # Check for auto_vacuum mode
        cursor.execute("PRAGMA auto_vacuum;")
        auto_vacuum = cursor.fetchone()[0]
        logger.info(f"Auto vacuum mode: {auto_vacuum}")

        return True
    except sqlite3.Error as e:
        logger.error(f"Error during corruption check: {e}")
        return False


def attempt_recovery(db_path, advanced=False):
    """
    Attempt recovery operations on the database.

    A timestamped backup copy of the original file is made first; if that
    fails no recovery method is run. Each method writes its output to its
    own file next to the original.

    Args:
        db_path: Path to the database file
        advanced: Whether to use advanced recovery techniques

    Returns:
        bool: True if recovery was successful, False otherwise
    """
    logger.info("Attempting database recovery operations...")

    # Create a backup of the original file
    backup_path = f"{db_path}.backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    try:
        shutil.copy2(db_path, backup_path)
        logger.info(f"Created backup at: {backup_path}")
    except OSError as e:  # shutil errors are OSError subclasses
        logger.error(f"Failed to create backup: {e}")
        return False

    # Method 1: Standard recovery - create new DB and copy data
    recovery_results = [standard_recovery(db_path)]

    # If advanced recovery is requested, try additional methods
    if advanced:
        # Method 2: Try to repair with SQLite's dump and restore
        recovery_results.append(dump_and_restore_recovery(db_path))

        # Method 3: Try to recover with VACUUM
        recovery_results.append(vacuum_recovery(db_path))

        # Method 4: Try to recover individual tables
        recovery_results.append(table_by_table_recovery(db_path))

    # Check if any recovery method was successful
    if any(recovery_results):
        logger.info("At least one recovery method was successful.")
        return True

    logger.error("All recovery methods failed.")
    return False


def _open_source_database(db_path):
    """Open db_path read-only if possible, otherwise in normal mode.

    Returns the connection, or None if both attempts fail.
    """
    try:
        # as_uri() percent-encodes characters such as '?', '#' and '%' that
        # would otherwise be misread as URI syntax.
        uri = f"{Path(db_path).resolve().as_uri()}?mode=ro"
        return sqlite3.connect(uri, uri=True)
    except (sqlite3.Error, OSError, ValueError):
        logger.warning("Could not open database in read-only mode, trying normal mode")

    try:
        return sqlite3.connect(db_path)
    except sqlite3.Error as e:
        logger.error(f"Could not open source database: {e}")
        return None


def _copy_schema_and_data(src_conn, dst_conn):
    """Recreate user tables, rows and indexes from src_conn in dst_conn.

    Failures on an individual table or index are logged and skipped; a
    failure reading the schema itself propagates as sqlite3.Error.
    """
    cursor = src_conn.cursor()
    dst_cursor = dst_conn.cursor()

    # Get schema from the original database
    cursor.execute(f"SELECT name, sql FROM sqlite_master WHERE {_USER_TABLE_FILTER};")
    tables = cursor.fetchall()

    # Create tables in the new database
    for _, create_sql in tables:
        if not create_sql:  # Skip entries without a CREATE statement
            continue
        try:
            dst_cursor.execute(create_sql)
            logger.info(f"Created table using: {create_sql[:50]}...")
        except sqlite3.Error as e:
            logger.error(f"Error creating table: {e}")

    # Copy data for each table
    for table_name, _ in tables:
        quoted = _quote_identifier(table_name)
        try:
            cursor.execute(f"SELECT * FROM {quoted};")
            # One placeholder per selected column.
            placeholders = ','.join('?' * len(cursor.description))
            rows = cursor.fetchall()

            if rows:
                # Insert data into the new database
                dst_conn.executemany(
                    f"INSERT OR IGNORE INTO {quoted} VALUES ({placeholders});",
                    rows
                )
                dst_conn.commit()
                logger.info(f"Copied {len(rows)} rows from table {table_name}")
        except sqlite3.Error as e:
            logger.error(f"Error copying data from table {table_name}: {e}")

    # Create indexes
    cursor.execute("SELECT sql FROM sqlite_master WHERE type='index' AND sql IS NOT NULL;")
    for (index_sql,) in cursor.fetchall():
        if not index_sql:
            continue
        try:
            dst_cursor.execute(index_sql)
            logger.info(f"Created index using: {index_sql[:50]}...")
        except sqlite3.Error as e:
            logger.error(f"Error creating index: {e}")

    dst_conn.commit()


def standard_recovery(db_path):
    """Standard recovery method - create new DB and copy data.

    Writes the recovered database to "<db_path>.recovered", replacing any
    existing file of that name.

    Args:
        db_path: Path to the database file

    Returns:
        bool: True if the copy ran to completion (individual tables or
        indexes may still have been skipped and logged), False otherwise
    """
    try:
        recovery_path = f"{db_path}.recovered"
        logger.info(f"Method 1: Standard recovery - attempting to recover data to: {recovery_path}")

        # Connect to the original database in read-only mode if possible
        src_conn = _open_source_database(db_path)
        if src_conn is None:
            return False

        # closing() guarantees both connections are released on every path.
        with closing(src_conn):
            try:
                # Create a new database for recovery
                if os.path.exists(recovery_path):
                    os.remove(recovery_path)

                with closing(sqlite3.connect(recovery_path)) as dst_conn:
                    _copy_schema_and_data(src_conn, dst_conn)
            except sqlite3.Error as e:
                logger.error(f"Standard recovery failed: {e}")
                return False

        logger.info(f"Standard recovery completed. New database created at: {recovery_path}")
        return True
    except Exception as e:
        logger.error(f"Unexpected error during standard recovery: {e}")
        return False


def dump_and_restore_recovery(db_path):
    """Recovery using SQLite's dump and restore functionality.

    Runs the external "sqlite3" command-line tool to dump db_path to a
    temporary SQL file, then loads that file into
    "<db_path>.dump_recovered". The temporary file is always removed.

    Args:
        db_path: Path to the database file

    Returns:
        bool: True if both the dump and the restore commands succeeded,
        False otherwise (including when the sqlite3 tool cannot be run)
    """
    dump_file = None
    try:
        recovery_path = f"{db_path}.dump_recovered"
        logger.info(f"Method 2: Dump and restore recovery - attempting to recover to: {recovery_path}")

        # Create a temporary SQL file for the dump
        with tempfile.NamedTemporaryFile(suffix='.sql', delete=False) as temp_file:
            dump_file = temp_file.name

        # Try to dump the database using the sqlite3 command-line tool
        try:
            with open(dump_file, 'wb') as dump_out:
                subprocess.run(
                    ['sqlite3', db_path, '.dump'],
                    stdout=dump_out,
                    stderr=subprocess.PIPE,
                    check=True,
                    text=True
                )
            logger.info(f"Database dumped to {dump_file}")
        except (subprocess.SubprocessError, OSError) as e:
            logger.error(f"Failed to dump database: {e}")
            return False

        # Create a new database from the dump
        if os.path.exists(recovery_path):
            os.remove(recovery_path)

        try:
            # Feed the dump through stdin rather than a ".read <path>"
            # argument, which breaks when the temp path contains spaces.
            with open(dump_file, 'rb') as dump_in:
                subprocess.run(
                    ['sqlite3', recovery_path],
                    stdin=dump_in,
                    stderr=subprocess.PIPE,
                    check=True,
                    text=True
                )
            logger.info(f"Created new database from dump at {recovery_path}")
            return True
        except (subprocess.SubprocessError, OSError) as e:
            logger.error(f"Failed to restore database from dump: {e}")
            return False
    except Exception as e:
        logger.error(f"Unexpected error during dump and restore recovery: {e}")
        return False
    finally:
        # Clean up the temporary dump file on every exit path
        if dump_file and os.path.exists(dump_file):
            os.remove(dump_file)


def vacuum_recovery(db_path):
    """Recovery using SQLite's VACUUM command.

    Copies db_path to "<db_path>.vacuum_recovered" and runs VACUUM on the
    copy, leaving the original untouched.

    Args:
        db_path: Path to the database file

    Returns:
        bool: True if VACUUM completed on the copy, False otherwise
    """
    try:
        recovery_path = f"{db_path}.vacuum_recovered"
        logger.info(f"Method 3: VACUUM recovery - attempting to recover to: {recovery_path}")

        # Copy the original database to the recovery path
        shutil.copy2(db_path, recovery_path)

        # Try to run VACUUM on the copied database
        try:
            with closing(sqlite3.connect(recovery_path)) as conn:
                # This might fail if the DB is corrupted. The rows are
                # fetched so no statement is left pending when VACUUM runs.
                report = conn.execute("PRAGMA integrity_check;").fetchall()
                if report != [('ok',)]:
                    logger.warning(f"Integrity check before VACUUM reported {len(report)} issue(s)")
                conn.execute("VACUUM;")
            logger.info(f"VACUUM completed successfully on {recovery_path}")
            return True
        except sqlite3.Error as e:
            logger.error(f"VACUUM recovery failed: {e}")
            return False
    except Exception as e:
        logger.error(f"Unexpected error during VACUUM recovery: {e}")
        return False


def table_by_table_recovery(db_path):
    """Recovery by extracting each table individually.

    Writes to "<db_path>.table_recovered". Each user table is recreated and
    its rows copied in batches of COPY_BATCH_SIZE; a table that fails is
    logged and skipped so the remaining tables are still attempted.

    Args:
        db_path: Path to the database file

    Returns:
        bool: True if at least one table was copied completely, False
        otherwise
    """
    try:
        recovery_path = f"{db_path}.table_recovered"
        logger.info(f"Method 4: Table-by-table recovery - attempting to recover to: {recovery_path}")

        # Connect to the original database
        try:
            src_conn = sqlite3.connect(db_path)
        except sqlite3.Error as e:
            logger.error(f"Could not open source database: {e}")
            return False

        success = False
        with closing(src_conn):
            # Create a new database for recovery
            if os.path.exists(recovery_path):
                os.remove(recovery_path)

            with closing(sqlite3.connect(recovery_path)) as dst_conn:
                cursor = src_conn.cursor()

                # Get list of tables
                try:
                    cursor.execute(f"SELECT name FROM sqlite_master WHERE {_USER_TABLE_FILTER};")
                    tables = cursor.fetchall()
                except sqlite3.Error as e:
                    logger.error(f"Could not get table list: {e}")
                    return False

                # Process each table individually
                for (table_name,) in tables:
                    try:
                        # Get table schema (parameterized: the name is data here)
                        cursor.execute(
                            "SELECT sql FROM sqlite_master WHERE type='table' AND name=?;",
                            (table_name,)
                        )
                        schema_row = cursor.fetchone()
                        if schema_row is None or not schema_row[0]:
                            logger.error(f"Error recovering table {table_name}: schema not found")
                            continue

                        # Create table in destination database
                        dst_conn.execute(schema_row[0])

                        # Try to copy data row by row
                        quoted_table = _quote_identifier(table_name)
                        cursor.execute(f"SELECT * FROM {quoted_table};")
                        columns = [description[0] for description in cursor.description]

                        # Prepare insert statement with column names
                        column_names = ', '.join(_quote_identifier(col) for col in columns)
                        placeholders = ', '.join('?' * len(columns))
                        insert_sql = f"INSERT INTO {quoted_table} ({column_names}) VALUES ({placeholders})"

                        # Copy data in batches to handle large tables
                        total_rows = 0
                        rows = cursor.fetchmany(COPY_BATCH_SIZE)
                        while rows:
                            dst_conn.executemany(insert_sql, rows)
                            dst_conn.commit()
                            total_rows += len(rows)
                            rows = cursor.fetchmany(COPY_BATCH_SIZE)

                        logger.info(f"Recovered table {table_name} with {total_rows} rows")
                        success = True
                    except sqlite3.Error as e:
                        logger.error(f"Error recovering table {table_name}: {e}")
                        # Continue with next table

        if success:
            logger.info(f"Table-by-table recovery completed with at least one table recovered at {recovery_path}")
            return True

        logger.error("Table-by-table recovery failed to recover any tables")
        return False
    except Exception as e:
        logger.error(f"Unexpected error during table-by-table recovery: {e}")
        return False


def _offer_recovery(db_path, auto_recovery, advanced_recovery):
    """Run recovery automatically, or after the user confirms on stdin.

    Returns the result of attempt_recovery, or False if the user declines.
    """
    if auto_recovery:
        logger.info("Auto-recovery mode enabled. Attempting recovery...")
        return attempt_recovery(db_path, advanced=advanced_recovery)

    if _confirm("Would you like to attempt recovery? (y/n): "):
        return attempt_recovery(db_path, advanced=advanced_recovery)
    return False


def analyze_database(db_path, auto_recovery=False, advanced_recovery=False):
    """
    Main function to analyze the database.

    Args:
        db_path: Path to the database file
        auto_recovery: Whether to automatically attempt recovery if issues are found
        advanced_recovery: Whether to use advanced recovery techniques

    Returns:
        bool: True if analysis/recovery was successful, False otherwise
    """
    logger.info(f"Starting analysis of database: {db_path}")

    if not check_file_exists(db_path):
        return False

    conn = connect_to_database(db_path)
    if conn is None:
        logger.error("Could not connect to the database. It may be severely corrupted.")
        return _offer_recovery(db_path, auto_recovery, advanced_recovery)

    # Run various checks. The connection is closed before any recovery
    # starts so the recovery methods do not race an open handle on the file.
    with closing(conn):
        integrity_ok = run_integrity_check(conn)
        foreign_keys_ok = check_foreign_keys(conn)
        structure_ok = check_database_structure(conn)
        check_journal_mode(conn)
        corruption_check_ok = check_for_corruption(conn)

    # Summarize findings
    logger.info("\n=== Analysis Summary ===")
    logger.info(f"Integrity check: {'PASSED' if integrity_ok else 'FAILED'}")
    logger.info(f"Foreign key check: {'PASSED' if foreign_keys_ok else 'FAILED'}")
    logger.info(f"Structure check: {'PASSED' if structure_ok else 'FAILED'}")
    logger.info(f"Corruption check: {'PASSED' if corruption_check_ok else 'FAILED'}")

    # Overall assessment
    if integrity_ok and foreign_keys_ok and structure_ok and corruption_check_ok:
        logger.info("\nOVERALL ASSESSMENT: The database appears to be in good condition.")
        return True

    logger.warning("\nOVERALL ASSESSMENT: Issues were detected in the database.")

    # Attempt recovery if auto_recovery is enabled or user confirms
    return _offer_recovery(db_path, auto_recovery, advanced_recovery)


def main():
    """Parse command line arguments and run the analysis.

    Returns:
        bool: Result of analyze_database, or None if the user declined to
        continue with a file that lacks a .db extension
    """
    parser = argparse.ArgumentParser(description='Analyze a SQLite database for corruption and integrity issues.')
    parser.add_argument('db_path', help='Path to the SQLite database file (.db)')
    parser.add_argument('--recovery', action='store_true', help='Automatically attempt recovery if issues are found')
    parser.add_argument('--advanced', action='store_true', help='Use advanced recovery techniques (multiple methods)')
    args = parser.parse_args()

    if not args.db_path.endswith('.db'):
        logger.warning(f"The specified file ({args.db_path}) does not have a .db extension. It may not be a SQLite database.")
        if args.recovery:
            logger.info("Auto-recovery mode enabled. Continuing despite non-standard extension.")
        elif not _confirm("Continue anyway? (y/n): "):  # Only prompt if not in auto-recovery mode
            return None

    # Run analysis with specified recovery options
    result = analyze_database(args.db_path, auto_recovery=args.recovery, advanced_recovery=args.advanced)

    # Provide a clear summary of the result
    if result:
        logger.info("Database analysis/recovery completed successfully.")
    else:
        logger.error("Database analysis/recovery failed.")

    return result


if __name__ == "__main__":
    # Exit non-zero only when analysis/recovery reported failure; a user
    # who declines to continue (None) is not treated as an error.
    sys.exit(1 if main() is False else 0)