316 lines
13 KiB
Python
316 lines
13 KiB
Python
"""
|
|
Database Manager for Groovy-Zilean
|
|
Centralizes all database operations and provides a clean interface.
|
|
Makes future PostgreSQL migration much easier.
|
|
"""
|
|
|
|
import sqlite3
|
|
from contextlib import contextmanager
|
|
from typing import Optional, List, Tuple, Any
|
|
import config
|
|
|
|
|
|
class DatabaseManager:
|
|
"""Manages database connections and operations"""
|
|
|
|
def __init__(self):
|
|
self.db_path = config.get_db_path()
|
|
|
|
@contextmanager
|
|
def get_connection(self):
|
|
"""
|
|
Context manager for database connections.
|
|
Automatically handles commit/rollback and closing.
|
|
|
|
Usage:
|
|
with db.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(...)
|
|
"""
|
|
conn = sqlite3.connect(self.db_path)
|
|
try:
|
|
yield conn
|
|
conn.commit()
|
|
except Exception as e:
|
|
conn.rollback()
|
|
raise e
|
|
finally:
|
|
conn.close()
|
|
|
|
def initialize_tables(self):
|
|
"""Create database tables if they don't exist"""
|
|
with self.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
|
|
# Create servers table
|
|
cursor.execute('''CREATE TABLE IF NOT EXISTS servers (
|
|
server_id TEXT PRIMARY KEY,
|
|
is_playing INTEGER DEFAULT 0,
|
|
song_name TEXT,
|
|
song_url TEXT,
|
|
song_thumbnail TEXT,
|
|
loop_mode TEXT DEFAULT 'off',
|
|
volume INTEGER DEFAULT 100,
|
|
effect TEXT DEFAULT 'none',
|
|
song_start_time REAL DEFAULT 0,
|
|
song_duration INTEGER DEFAULT 0
|
|
);''')
|
|
|
|
# Set all to not playing on startup
|
|
cursor.execute("UPDATE servers SET is_playing = 0;")
|
|
|
|
# Migrations for existing databases - add columns if missing
|
|
migrations = [
|
|
("loop_mode", "TEXT DEFAULT 'off'"),
|
|
("volume", "INTEGER DEFAULT 100"),
|
|
("effect", "TEXT DEFAULT 'none'"),
|
|
("song_start_time", "REAL DEFAULT 0"),
|
|
("song_duration", "INTEGER DEFAULT 0"),
|
|
("song_thumbnail", "TEXT DEFAULT ''"),
|
|
("song_url", "TEXT DEFAULT ''")
|
|
]
|
|
|
|
for col_name, col_type in migrations:
|
|
try:
|
|
cursor.execute(f"ALTER TABLE servers ADD COLUMN {col_name} {col_type};")
|
|
except sqlite3.OperationalError:
|
|
# Column already exists, skip
|
|
pass
|
|
|
|
# Create songs/queue table
|
|
cursor.execute('''CREATE TABLE IF NOT EXISTS songs (
|
|
server_id TEXT NOT NULL,
|
|
song_link TEXT,
|
|
queued_by TEXT,
|
|
position INTEGER NOT NULL,
|
|
title TEXT,
|
|
thumbnail TEXT,
|
|
duration INTEGER,
|
|
PRIMARY KEY (position),
|
|
FOREIGN KEY (server_id) REFERENCES servers(server_id)
|
|
);''')
|
|
|
|
# Clear all songs on startup
|
|
cursor.execute("DELETE FROM songs;")
|
|
|
|
# ===================================
|
|
# Server Operations
|
|
# ===================================
|
|
|
|
def ensure_server_exists(self, server_id: str) -> None:
|
|
"""Add server to database if it doesn't exist"""
|
|
with self.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute('SELECT COUNT(*) FROM servers WHERE server_id = ?', (server_id,))
|
|
if cursor.fetchone()[0] == 0:
|
|
cursor.execute('''INSERT INTO servers (server_id, loop_mode, volume, effect, song_thumbnail, song_url)
|
|
VALUES (?, 'off', 100, 'none', '', '')''', (server_id,))
|
|
|
|
def set_server_playing(self, server_id: str, playing: bool) -> None:
|
|
"""Update server playing status"""
|
|
with self.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
self.ensure_server_exists(server_id)
|
|
val = 1 if playing else 0
|
|
cursor.execute("UPDATE servers SET is_playing = ? WHERE server_id = ?", (val, server_id))
|
|
|
|
def is_server_playing(self, server_id: str) -> bool:
|
|
"""Check if server is currently playing"""
|
|
with self.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
self.ensure_server_exists(server_id)
|
|
cursor.execute("SELECT is_playing FROM servers WHERE server_id = ?", (server_id,))
|
|
res = cursor.fetchone()
|
|
return True if res and res[0] == 1 else False
|
|
|
|
def set_current_song(self, server_id: str, title: str, url: str, thumbnail: str = "", duration: int = 0, start_time: float = 0) -> None:
|
|
"""Update currently playing song information"""
|
|
with self.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
# Ensure duration is an integer
|
|
try:
|
|
duration = int(duration)
|
|
except:
|
|
duration = 0
|
|
|
|
cursor.execute(''' UPDATE servers
|
|
SET song_name = ?, song_url = ?, song_thumbnail = ?, song_start_time = ?, song_duration = ?
|
|
WHERE server_id = ?''',
|
|
(title, url, thumbnail, start_time, duration, server_id))
|
|
|
|
def get_current_song(self, server_id: str) -> dict:
|
|
"""Get current song info"""
|
|
with self.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(''' SELECT song_name, song_thumbnail, song_url FROM servers WHERE server_id = ? LIMIT 1;''', (server_id,))
|
|
result = cursor.fetchone()
|
|
|
|
if result:
|
|
return {'title': result[0], 'thumbnail': result[1], 'url': result[2]}
|
|
return {'title': "Nothing", 'thumbnail': None, 'url': ''}
|
|
|
|
def get_current_progress(self, server_id: str) -> Tuple[int, int, float]:
|
|
"""Get playback progress (elapsed, duration, percentage)"""
|
|
import time
|
|
|
|
with self.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute('''SELECT song_start_time, song_duration, is_playing FROM servers WHERE server_id = ? LIMIT 1;''', (server_id,))
|
|
result = cursor.fetchone()
|
|
|
|
if not result or result[2] == 0:
|
|
return 0, 0, 0.0
|
|
|
|
start_time, duration, _ = result
|
|
|
|
if duration is None or duration == 0:
|
|
return 0, 0, 0.0
|
|
|
|
elapsed = int(time.time() - start_time)
|
|
elapsed = min(elapsed, duration)
|
|
percentage = (elapsed / duration) * 100 if duration > 0 else 0
|
|
|
|
return elapsed, duration, percentage
|
|
|
|
# ===================================
|
|
# Queue Operations
|
|
# ===================================
|
|
|
|
def add_song(self, server_id: str, song_link: str, queued_by: str, title: str, thumbnail: str = "", duration: int = 0, position: Optional[int] = None) -> int:
|
|
"""Add song to queue, returns position"""
|
|
with self.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
self.ensure_server_exists(server_id)
|
|
|
|
if position is None:
|
|
# Add to end
|
|
cursor.execute("SELECT MAX(position) FROM songs WHERE server_id = ?", (server_id,))
|
|
max_pos = cursor.fetchone()[0]
|
|
position = (max_pos + 1) if max_pos is not None else 0
|
|
else:
|
|
# Insert at specific position (shift others down)
|
|
cursor.execute("UPDATE songs SET position = position + 1 WHERE server_id = ? AND position >= ?",
|
|
(server_id, position))
|
|
|
|
cursor.execute("""INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)""",
|
|
(server_id, song_link, queued_by, position, title, thumbnail, duration))
|
|
|
|
return position
|
|
|
|
def get_next_song(self, server_id: str) -> Optional[Tuple]:
|
|
"""Get the next song in queue (doesn't remove it)"""
|
|
with self.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute('''SELECT * FROM songs WHERE server_id = ? ORDER BY position LIMIT 1;''', (server_id,))
|
|
return cursor.fetchone()
|
|
|
|
def remove_song(self, server_id: str, position: int) -> None:
|
|
"""Remove song at position from queue"""
|
|
with self.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute('''DELETE FROM songs WHERE server_id = ? AND position = ?''', (server_id, position))
|
|
|
|
def get_queue(self, server_id: str, limit: int = 10) -> Tuple[int, List[Tuple]]:
|
|
"""Get songs in queue (returns max_position, list of songs)"""
|
|
with self.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
self.ensure_server_exists(server_id)
|
|
|
|
cursor.execute("SELECT title, duration, queued_by FROM songs WHERE server_id = ? ORDER BY position LIMIT ?",
|
|
(server_id, limit))
|
|
songs = cursor.fetchall()
|
|
|
|
cursor.execute("SELECT MAX(position) FROM songs WHERE server_id = ?", (server_id,))
|
|
max_pos = cursor.fetchone()[0]
|
|
max_pos = max_pos if max_pos is not None else -1
|
|
|
|
return max_pos, songs
|
|
|
|
def clear_queue(self, server_id: str) -> None:
|
|
"""Clear all songs from queue"""
|
|
with self.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
self.ensure_server_exists(server_id)
|
|
cursor.execute("DELETE FROM songs WHERE server_id = ?", (server_id,))
|
|
|
|
def shuffle_queue(self, server_id: str) -> bool:
|
|
"""Shuffle the queue randomly, returns success"""
|
|
import random
|
|
|
|
with self.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
self.ensure_server_exists(server_id)
|
|
|
|
cursor.execute("SELECT position, song_link, queued_by, title, thumbnail, duration FROM songs WHERE server_id = ? ORDER BY position",
|
|
(server_id,))
|
|
songs = cursor.fetchall()
|
|
|
|
if len(songs) <= 1:
|
|
return False
|
|
|
|
random.shuffle(songs)
|
|
cursor.execute("DELETE FROM songs WHERE server_id = ?", (server_id,))
|
|
|
|
for i, s in enumerate(songs):
|
|
cursor.execute("INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)",
|
|
(server_id, s[1], s[2], i, s[3], s[4], s[5]))
|
|
|
|
return True
|
|
|
|
# ===================================
|
|
# Settings Operations
|
|
# ===================================
|
|
|
|
def get_loop_mode(self, server_id: str) -> str:
|
|
"""Get loop mode: 'off', 'song', or 'queue'"""
|
|
with self.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
self.ensure_server_exists(server_id)
|
|
cursor.execute("SELECT loop_mode FROM servers WHERE server_id = ?", (server_id,))
|
|
res = cursor.fetchone()
|
|
return res[0] if res else 'off'
|
|
|
|
def set_loop_mode(self, server_id: str, mode: str) -> None:
|
|
"""Set loop mode: 'off', 'song', or 'queue'"""
|
|
with self.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
self.ensure_server_exists(server_id)
|
|
cursor.execute("UPDATE servers SET loop_mode = ? WHERE server_id = ?", (mode, server_id))
|
|
|
|
def get_volume(self, server_id: str) -> int:
|
|
"""Get volume (0-200)"""
|
|
with self.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
self.ensure_server_exists(server_id)
|
|
cursor.execute("SELECT volume FROM servers WHERE server_id = ?", (server_id,))
|
|
res = cursor.fetchone()
|
|
return res[0] if res else 100
|
|
|
|
def set_volume(self, server_id: str, volume: int) -> int:
|
|
"""Set volume (0-200), returns the set volume"""
|
|
with self.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
self.ensure_server_exists(server_id)
|
|
cursor.execute("UPDATE servers SET volume = ? WHERE server_id = ?", (volume, server_id))
|
|
return volume
|
|
|
|
def get_effect(self, server_id: str) -> str:
|
|
"""Get current audio effect"""
|
|
with self.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
self.ensure_server_exists(server_id)
|
|
cursor.execute("SELECT effect FROM servers WHERE server_id = ?", (server_id,))
|
|
res = cursor.fetchone()
|
|
return res[0] if res else 'none'
|
|
|
|
def set_effect(self, server_id: str, effect: str) -> None:
|
|
"""Set audio effect"""
|
|
with self.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
self.ensure_server_exists(server_id)
|
|
cursor.execute("UPDATE servers SET effect = ? WHERE server_id = ?", (effect, server_id))
|
|
|
|
|
|
# Global instance
|
|
db = DatabaseManager()
|