160 lines
5.8 KiB
Python

"""Module to check for the database with unified backend support."""
import os
from typing import Dict, Any
import yaml
from custom_functions.database.unified_db_ops import (
db_ops,
get_backend_info,
key_count,
)
from custom_functions.database.user_db import create_user_database
def get_database_config() -> Dict[str, Any]:
    """Load the database configuration from ``configs/config.yaml``.

    The path is resolved relative to the current working directory.

    Returns:
        The parsed top-level mapping from the YAML file. When the file
        cannot be read, cannot be parsed, or does not contain a mapping
        at the top level (for example an empty file, which parses to
        ``None``), a warning is printed and the default
        ``{"database_type": "sqlite"}`` is returned instead.
    """
    fallback: Dict[str, Any] = {"database_type": "sqlite"}  # Default fallback
    config_path = os.path.join(os.getcwd(), "configs", "config.yaml")

    try:
        with open(config_path, "r", encoding="utf-8") as file:
            config = yaml.safe_load(file)
    except (OSError, UnicodeDecodeError) as e:
        # Any read failure (missing file, permissions, path is a directory,
        # invalid UTF-8) gets the same fallback as a missing file.
        print(f"Warning: Could not load config.yaml: {e}")
        return fallback
    except yaml.YAMLError as e:
        print(f"Warning: Could not load config.yaml: {e}")
        return fallback

    # Callers use ``config.get(...)``, so anything other than a mapping
    # (None for an empty file, a list, a bare scalar) would crash them.
    if not isinstance(config, dict):
        print(
            "Warning: Could not load config.yaml: expected a mapping at the "
            f"top level, got {type(config).__name__}"
        )
        return fallback

    return config
def check_for_sqlite_database() -> None:
    """Make sure the SQLite key-cache file exists when SQLite is configured.

    Reads the configured ``database_type``; for any backend other than
    ``"sqlite"`` nothing is done. Otherwise the file
    ``databases/sql/key_cache.db`` under the current working directory is
    looked up, and ``db_ops.create_database()`` is called if it is missing.
    """
    settings = get_database_config()
    backend = settings.get("database_type", "sqlite").lower()

    # Nothing to verify on disk unless SQLite is the selected backend.
    if backend != "sqlite":
        return

    db_file = os.path.join(os.getcwd(), "databases", "sql", "key_cache.db")
    if os.path.exists(db_file):
        print(f"SQLite database found at: {db_file}")
        return

    print("SQLite database not found, creating...")
    # Make sure the parent directory is present before creating the database.
    os.makedirs(os.path.dirname(db_file), exist_ok=True)
    db_ops.create_database()
    print(f"SQLite database created at: {db_file}")
def check_for_mariadb_database() -> None:
    """Verify the MariaDB database, falling back to SQLite on failure.

    Does nothing unless the configured ``database_type`` is ``"mariadb"``.
    When ``db_ops.create_database()`` raises, ``configs/config.yaml`` is
    rewritten with ``database_type`` set to ``"sqlite"`` and the SQLite
    check is run instead. Errors during the fallback are printed, not
    raised.
    """
    settings = get_database_config()
    backend = settings.get("database_type", "sqlite").lower()

    # Only relevant when MariaDB is the selected backend.
    if backend != "mariadb":
        return

    try:
        print("Checking MariaDB connection and creating database if needed...")
        db_ops.create_database()
        print("MariaDB database check completed successfully")
    except Exception as exc:
        print(f"Error checking/creating MariaDB database: {exc}")
        print("Falling back to SQLite...")

        # Persist the switch to SQLite in the config file, then run the
        # SQLite check against the updated configuration.
        config_file = os.path.join(os.getcwd(), "configs", "config.yaml")
        try:
            with open(config_file, "r", encoding="utf-8") as reader:
                on_disk = yaml.safe_load(reader)
            on_disk["database_type"] = "sqlite"
            with open(config_file, "w", encoding="utf-8") as writer:
                yaml.safe_dump(on_disk, writer)
            check_for_sqlite_database()
        except Exception as fallback_error:
            print(f"Error during fallback to SQLite: {fallback_error}")
def check_for_user_database() -> None:
    """Make sure the user database file exists, creating it when missing.

    The file is expected at ``databases/users.db`` under the current
    working directory. If it is absent, the parent directory is created
    and ``create_user_database()`` is called.
    """
    db_file = os.path.join(os.getcwd(), "databases", "users.db")

    if os.path.exists(db_file):
        print(f"User database found at: {db_file}")
        return

    print("User database not found, creating...")
    # Make sure the parent directory is present before creating the database.
    os.makedirs(os.path.dirname(db_file), exist_ok=True)
    create_user_database()
    print(f"User database created at: {db_file}")
def check_for_sql_database() -> None:
    """Run the database checks that match the configured backend.

    Prints the active backend information, makes sure the ``databases``
    and ``databases/sql`` directories exist under the current working
    directory, runs the MariaDB or SQLite check depending on
    ``database_type``, and finally runs the user database check.
    """
    print("=== Database Check Starting ===")

    # Report which backend and module are in use.
    backend_details = get_backend_info()
    print(f"Database backend: {backend_details['backend']}")
    print(f"Using module: {backend_details['module']}")

    settings = get_database_config()
    backend = settings.get("database_type", "sqlite").lower()

    # Create the storage directories up front.
    databases_dir = os.path.join(os.getcwd(), "databases")
    for directory in (databases_dir, os.path.join(databases_dir, "sql")):
        os.makedirs(directory, exist_ok=True)

    # Anything other than "mariadb" is treated as SQLite.
    run_main_check = (
        check_for_mariadb_database
        if backend == "mariadb"
        else check_for_sqlite_database
    )
    run_main_check()

    # The user database is checked regardless of the main backend.
    check_for_user_database()

    print("=== Database Check Completed ===")
def get_database_status() -> Dict[str, Any]:
    """Collect a snapshot of the database configuration and health.

    Returns:
        A dict with the configured backend, the active backend and module
        reported by ``get_backend_info()``, whether the SQLite and user
        database files exist, and the result of a ``key_count()`` probe.
        When the probe raises, ``key_count`` is ``"Error"``,
        ``database_operational`` is ``False`` and ``error`` holds the
        exception text.
    """
    settings = get_database_config()
    backend_details = get_backend_info()

    configured = settings.get("database_type", "sqlite").lower()
    active = backend_details["backend"]
    module_name = backend_details["module"]
    key_cache_file = os.path.join(os.getcwd(), "databases", "sql", "key_cache.db")
    users_file = os.path.join(os.getcwd(), "databases", "users.db")

    status = {
        "configured_backend": configured,
        "active_backend": active,
        "module_in_use": module_name,
        "sqlite_file_exists": os.path.exists(key_cache_file),
        "user_db_exists": os.path.exists(users_file),
    }

    # Counting keys doubles as a check that the database responds.
    try:
        total_keys = key_count()
    except Exception as exc:
        status["key_count"] = "Error"
        status["database_operational"] = False
        status["error"] = str(exc)
    else:
        status["key_count"] = total_keys
        status["database_operational"] = True

    return status
def print_database_status() -> None:
    """Print the result of ``get_database_status()`` as a readable report.

    Each status field is printed on its own labelled line; the error text
    is added only when the database is reported as not operational.
    """
    status = get_database_status()

    # (label, status key) pairs in the order they appear in the report.
    report_fields = (
        ("Configured Backend", "configured_backend"),
        ("Active Backend", "active_backend"),
        ("Module in Use", "module_in_use"),
        ("SQLite File Exists", "sqlite_file_exists"),
        ("User DB Exists", "user_db_exists"),
        ("Database Operational", "database_operational"),
        ("Key Count", "key_count"),
    )

    print("\n=== Database Status Report ===")
    for label, field in report_fields:
        print(f"{label}: {status[field]}")
    if not status["database_operational"]:
        print(f"Error: {status.get('error', 'Unknown error')}")
    print("==============================\n")