Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 4ef4bdf309 | |||
| 7c6249b120 | |||
| 09fa7988f1 | |||
| b3d618b337 |
67
.env_example
Normal file
67
.env_example
Normal file
@@ -0,0 +1,67 @@
|
||||
# Groovy-Zilean Configuration
|
||||
# Copy this file to .env and fill in your values
|
||||
# NEVER commit .env to git!
|
||||
|
||||
# ===================================
|
||||
# Environment Selection
|
||||
# ===================================
|
||||
# Set to "dev" for development bot, "live" for production
|
||||
ENVIRONMENT=dev
|
||||
|
||||
# ===================================
|
||||
# Discord Bot Tokens
|
||||
# ===================================
|
||||
DISCORD_TOKEN_DEV=
|
||||
DISCORD_TOKEN_LIVE=
|
||||
|
||||
# Bot settings
|
||||
DISCORD_PREFIX=
|
||||
|
||||
# ===================================
|
||||
# Spotify Integration
|
||||
# ===================================
|
||||
SPOTIFY_CLIENT_ID=
|
||||
SPOTIFY_CLIENT_SECRET=
|
||||
|
||||
# ===================================
|
||||
# Database
|
||||
# ===================================
|
||||
# For now (SQLite)
|
||||
DB_PATH=./data/music.db
|
||||
|
||||
# For future (PostgreSQL)
|
||||
# DB_HOST=localhost
|
||||
# DB_PORT=5432
|
||||
# DB_NAME=groovy_zilean
|
||||
# DB_USER=groovy
|
||||
# DB_PASSWORD=your_db_password
|
||||
|
||||
# ===================================
|
||||
# Bot Status/Presence
|
||||
# ===================================
|
||||
# Types: playing, listening, watching, streaming, competing
|
||||
STATUS_TYPE=listening
|
||||
STATUS_TEXT==help | /help
|
||||
# STATUS_URL= # Only needed for streaming type
|
||||
|
||||
# ===================================
|
||||
# Color Scheme (hex colors)
|
||||
# ===================================
|
||||
COLOR_PRIMARY=#7289DA
|
||||
COLOR_SUCCESS=#43B581
|
||||
COLOR_ERROR=#F04747
|
||||
COLOR_WARNING=#FAA61A
|
||||
|
||||
# ===================================
|
||||
# Web Dashboard (Future)
|
||||
# ===================================
|
||||
# DISCORD_CLIENT_ID=your_oauth_client_id
|
||||
# DISCORD_CLIENT_SECRET=your_oauth_secret
|
||||
# DISCORD_REDIRECT_URI=http://localhost:8000/callback
|
||||
# WEB_SECRET_KEY=random_secret_for_sessions
|
||||
|
||||
# ===================================
|
||||
# Logging
|
||||
# ===================================
|
||||
LOG_LEVEL=INFO
|
||||
# Options: DEBUG, INFO, WARNING, ERROR, CRITICAL
|
||||
2
.gitignore
vendored
2
.gitignore
vendored
@@ -160,4 +160,4 @@ cython_debug/
|
||||
#.idea/
|
||||
|
||||
# My stuff
|
||||
data/
|
||||
data/*.db
|
||||
|
||||
201
SETUP.md
Normal file
201
SETUP.md
Normal file
@@ -0,0 +1,201 @@
|
||||
# Groovy-Zilean Setup Guide
|
||||
|
||||
Quick start guide for getting groovy-zilean running locally or in production.
|
||||
|
||||
---
|
||||
|
||||
## Prerequisites
|
||||
|
||||
- **Python 3.11+** (3.9 deprecated by yt-dlp)
|
||||
- **FFmpeg** installed on your system
|
||||
- Discord Bot Token ([Discord Developer Portal](https://discord.com/developers/applications))
|
||||
- Spotify API Credentials ([Spotify Developer Dashboard](https://developer.spotify.com/dashboard))
|
||||
|
||||
---
|
||||
|
||||
## 1. Clone & Setup Environment
|
||||
|
||||
```bash
|
||||
# Clone the repository
|
||||
git clone <your-repo-url>
|
||||
cd groovy-zilean
|
||||
|
||||
# Create virtual environment (keeps dependencies isolated)
|
||||
python3.12 -m venv venv
|
||||
|
||||
# Activate virtual environment
|
||||
source venv/bin/activate # Linux/Mac
|
||||
# OR
|
||||
venv\Scripts\activate # Windows
|
||||
|
||||
# Install dependencies
|
||||
pip install --upgrade pip
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 2. Configuration
|
||||
|
||||
### Create `.env` file
|
||||
|
||||
```bash
|
||||
# Copy the example file
|
||||
cp .env.example .env
|
||||
|
||||
# Edit with your favorite editor
|
||||
nano .env
|
||||
# OR
|
||||
vim .env
|
||||
# OR
|
||||
code .env # VS Code
|
||||
```
|
||||
|
||||
### Fill in Required Values
|
||||
|
||||
At minimum, you need:
|
||||
|
||||
```env
|
||||
# Choose environment: "dev" or "live"
|
||||
ENVIRONMENT=dev
|
||||
|
||||
# Discord bot tokens (get from Discord Developer Portal)
|
||||
DISCORD_TOKEN_DEV=your_dev_bot_token_here
|
||||
DISCORD_TOKEN_LIVE=your_live_bot_token_here
|
||||
|
||||
# Spotify credentials (get from Spotify Developer Dashboard)
|
||||
SPOTIFY_CLIENT_ID=your_spotify_client_id
|
||||
SPOTIFY_CLIENT_SECRET=your_spotify_secret
|
||||
```
|
||||
|
||||
**Optional but recommended:**
|
||||
```env
|
||||
# Bot settings
|
||||
DISCORD_PREFIX==
|
||||
STATUS_TYPE=listening
|
||||
STATUS_TEXT=Zilean's Theme
|
||||
|
||||
# Colors (hex format)
|
||||
COLOR_PRIMARY=#7289DA
|
||||
COLOR_SUCCESS=#43B581
|
||||
COLOR_ERROR=#F04747
|
||||
COLOR_WARNING=#FAA61A
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 3. Create Data Directory
|
||||
|
||||
```bash
|
||||
# Create directory for database
|
||||
mkdir -p data
|
||||
|
||||
# The database file (music.db) will be created automatically on first run
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 4. Run the Bot
|
||||
|
||||
### Development Mode
|
||||
|
||||
```bash
|
||||
# Make sure .env has ENVIRONMENT=dev
|
||||
source venv/bin/activate # If not already activated
|
||||
python main.py
|
||||
```
|
||||
|
||||
### Production Mode
|
||||
|
||||
```bash
|
||||
# Change .env to ENVIRONMENT=live
|
||||
source venv/bin/activate
|
||||
python main.py
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 5. Switching Between Dev and Live
|
||||
|
||||
**Super easy!** Just change one line in `.env`:
|
||||
|
||||
```bash
|
||||
# For development bot
|
||||
ENVIRONMENT=dev
|
||||
|
||||
# For production bot
|
||||
ENVIRONMENT=live
|
||||
```
|
||||
|
||||
The bot will automatically use the correct token!
|
||||
|
||||
---
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
### "Configuration Error: DISCORD_TOKEN_DEV not found"
|
||||
- Make sure you copied `.env.example` to `.env`
|
||||
- Check that `.env` has the token values filled in
|
||||
- Token should NOT have quotes around it
|
||||
|
||||
### "No module named 'dotenv'"
|
||||
```bash
|
||||
pip install python-dotenv
|
||||
```
|
||||
|
||||
### "FFmpeg not found"
|
||||
```bash
|
||||
# Debian/Ubuntu
|
||||
sudo apt install ffmpeg
|
||||
|
||||
# macOS
|
||||
brew install ffmpeg
|
||||
|
||||
# Arch Linux
|
||||
sudo pacman -S ffmpeg
|
||||
```
|
||||
|
||||
### Python version issues
|
||||
yt-dlp requires Python 3.10+. Check your version:
|
||||
```bash
|
||||
python --version
|
||||
```
|
||||
|
||||
If too old, install newer Python and recreate venv:
|
||||
```bash
|
||||
python3.12 -m venv venv
|
||||
source venv/bin/activate
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Project Structure
|
||||
|
||||
```
|
||||
groovy-zilean/
|
||||
├── main.py # Entry point (run this!)
|
||||
├── bot.py # Bot class definition
|
||||
├── config.py # Configuration management
|
||||
├── .env # YOUR secrets (never commit!)
|
||||
├── .env.example # Template (safe to commit)
|
||||
├── requirements.txt # Python dependencies
|
||||
├── cogs/
|
||||
│ └── music/ # Music functionality
|
||||
│ ├── main.py # Commands
|
||||
│ ├── queue.py # Queue management
|
||||
│ ├── util.py # Utilities
|
||||
│ └── translate.py # URL/search handling
|
||||
└── data/
|
||||
└── music.db # SQLite database (auto-created)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Next Steps
|
||||
|
||||
- Check out `PRODUCTION_ROADMAP.md` for the full development plan
|
||||
- See `README.md` for feature list and usage
|
||||
- Join your test server and try commands!
|
||||
|
||||
**Happy coding!** 🎵⏱️
|
||||
25
bot.py
25
bot.py
@@ -1,20 +1,30 @@
|
||||
"""
|
||||
Groovy-Zilean Bot Class
|
||||
Main bot implementation with cog loading and background tasks
|
||||
"""
|
||||
|
||||
from typing import Any
|
||||
from discord.ext import commands
|
||||
from discord.ext import tasks
|
||||
from cogs.music.main import music
|
||||
from help import GroovyHelp # Import the new Help Cog
|
||||
from help import GroovyHelp
|
||||
|
||||
cogs = [
|
||||
# List of cogs to load on startup
|
||||
cogs: list[type[commands.Cog]] = [
|
||||
music,
|
||||
GroovyHelp
|
||||
]
|
||||
|
||||
|
||||
class Groovy(commands.Bot):
|
||||
def __init__(self, *args, **kwargs):
|
||||
"""Custom bot class with automatic cog loading and inactivity checking"""
|
||||
|
||||
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||
# We force help_command to None because we are using a custom Cog for it
|
||||
# But we pass all other args (like command_prefix) to the parent
|
||||
super().__init__(*args, help_command=None, **kwargs)
|
||||
|
||||
async def on_ready(self):
|
||||
async def on_ready(self) -> None:
|
||||
import config # Imported here to avoid circular dependencies if any
|
||||
|
||||
# Set status
|
||||
@@ -47,11 +57,12 @@ class Groovy(commands.Bot):
|
||||
print(f"✅ {self.user} is ready and online!")
|
||||
|
||||
@tasks.loop(seconds=30)
|
||||
async def inactivity_checker(self):
|
||||
"""Check for inactive voice connections"""
|
||||
async def inactivity_checker(self) -> None:
|
||||
"""Check for inactive voice connections every 30 seconds"""
|
||||
from cogs.music import util
|
||||
await util.check_inactivity(self)
|
||||
|
||||
@inactivity_checker.before_loop
|
||||
async def before_inactivity_checker(self):
|
||||
async def before_inactivity_checker(self) -> None:
|
||||
"""Wait for bot to be ready before starting inactivity checker"""
|
||||
await self.wait_until_ready()
|
||||
|
||||
315
cogs/music/db_manager.py
Normal file
315
cogs/music/db_manager.py
Normal file
@@ -0,0 +1,315 @@
|
||||
"""
|
||||
Database Manager for Groovy-Zilean
|
||||
Centralizes all database operations and provides a clean interface.
|
||||
Makes future PostgreSQL migration much easier.
|
||||
"""
|
||||
|
||||
import sqlite3
|
||||
from contextlib import contextmanager
|
||||
from typing import Optional, List, Tuple, Any
|
||||
import config
|
||||
|
||||
|
||||
class DatabaseManager:
|
||||
"""Manages database connections and operations"""
|
||||
|
||||
def __init__(self):
|
||||
self.db_path = config.get_db_path()
|
||||
|
||||
@contextmanager
|
||||
def get_connection(self):
|
||||
"""
|
||||
Context manager for database connections.
|
||||
Automatically handles commit/rollback and closing.
|
||||
|
||||
Usage:
|
||||
with db.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(...)
|
||||
"""
|
||||
conn = sqlite3.connect(self.db_path)
|
||||
try:
|
||||
yield conn
|
||||
conn.commit()
|
||||
except Exception as e:
|
||||
conn.rollback()
|
||||
raise e
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
def initialize_tables(self):
|
||||
"""Create database tables if they don't exist"""
|
||||
with self.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Create servers table
|
||||
cursor.execute('''CREATE TABLE IF NOT EXISTS servers (
|
||||
server_id TEXT PRIMARY KEY,
|
||||
is_playing INTEGER DEFAULT 0,
|
||||
song_name TEXT,
|
||||
song_url TEXT,
|
||||
song_thumbnail TEXT,
|
||||
loop_mode TEXT DEFAULT 'off',
|
||||
volume INTEGER DEFAULT 100,
|
||||
effect TEXT DEFAULT 'none',
|
||||
song_start_time REAL DEFAULT 0,
|
||||
song_duration INTEGER DEFAULT 0
|
||||
);''')
|
||||
|
||||
# Set all to not playing on startup
|
||||
cursor.execute("UPDATE servers SET is_playing = 0;")
|
||||
|
||||
# Migrations for existing databases - add columns if missing
|
||||
migrations = [
|
||||
("loop_mode", "TEXT DEFAULT 'off'"),
|
||||
("volume", "INTEGER DEFAULT 100"),
|
||||
("effect", "TEXT DEFAULT 'none'"),
|
||||
("song_start_time", "REAL DEFAULT 0"),
|
||||
("song_duration", "INTEGER DEFAULT 0"),
|
||||
("song_thumbnail", "TEXT DEFAULT ''"),
|
||||
("song_url", "TEXT DEFAULT ''")
|
||||
]
|
||||
|
||||
for col_name, col_type in migrations:
|
||||
try:
|
||||
cursor.execute(f"ALTER TABLE servers ADD COLUMN {col_name} {col_type};")
|
||||
except sqlite3.OperationalError:
|
||||
# Column already exists, skip
|
||||
pass
|
||||
|
||||
# Create songs/queue table
|
||||
cursor.execute('''CREATE TABLE IF NOT EXISTS songs (
|
||||
server_id TEXT NOT NULL,
|
||||
song_link TEXT,
|
||||
queued_by TEXT,
|
||||
position INTEGER NOT NULL,
|
||||
title TEXT,
|
||||
thumbnail TEXT,
|
||||
duration INTEGER,
|
||||
PRIMARY KEY (position),
|
||||
FOREIGN KEY (server_id) REFERENCES servers(server_id)
|
||||
);''')
|
||||
|
||||
# Clear all songs on startup
|
||||
cursor.execute("DELETE FROM songs;")
|
||||
|
||||
# ===================================
|
||||
# Server Operations
|
||||
# ===================================
|
||||
|
||||
def ensure_server_exists(self, server_id: str) -> None:
|
||||
"""Add server to database if it doesn't exist"""
|
||||
with self.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute('SELECT COUNT(*) FROM servers WHERE server_id = ?', (server_id,))
|
||||
if cursor.fetchone()[0] == 0:
|
||||
cursor.execute('''INSERT INTO servers (server_id, loop_mode, volume, effect, song_thumbnail, song_url)
|
||||
VALUES (?, 'off', 100, 'none', '', '')''', (server_id,))
|
||||
|
||||
def set_server_playing(self, server_id: str, playing: bool) -> None:
|
||||
"""Update server playing status"""
|
||||
with self.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
self.ensure_server_exists(server_id)
|
||||
val = 1 if playing else 0
|
||||
cursor.execute("UPDATE servers SET is_playing = ? WHERE server_id = ?", (val, server_id))
|
||||
|
||||
def is_server_playing(self, server_id: str) -> bool:
|
||||
"""Check if server is currently playing"""
|
||||
with self.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
self.ensure_server_exists(server_id)
|
||||
cursor.execute("SELECT is_playing FROM servers WHERE server_id = ?", (server_id,))
|
||||
res = cursor.fetchone()
|
||||
return True if res and res[0] == 1 else False
|
||||
|
||||
def set_current_song(self, server_id: str, title: str, url: str, thumbnail: str = "", duration: int = 0, start_time: float = 0) -> None:
|
||||
"""Update currently playing song information"""
|
||||
with self.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
# Ensure duration is an integer
|
||||
try:
|
||||
duration = int(duration)
|
||||
except:
|
||||
duration = 0
|
||||
|
||||
cursor.execute(''' UPDATE servers
|
||||
SET song_name = ?, song_url = ?, song_thumbnail = ?, song_start_time = ?, song_duration = ?
|
||||
WHERE server_id = ?''',
|
||||
(title, url, thumbnail, start_time, duration, server_id))
|
||||
|
||||
def get_current_song(self, server_id: str) -> dict:
|
||||
"""Get current song info"""
|
||||
with self.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(''' SELECT song_name, song_thumbnail, song_url FROM servers WHERE server_id = ? LIMIT 1;''', (server_id,))
|
||||
result = cursor.fetchone()
|
||||
|
||||
if result:
|
||||
return {'title': result[0], 'thumbnail': result[1], 'url': result[2]}
|
||||
return {'title': "Nothing", 'thumbnail': None, 'url': ''}
|
||||
|
||||
def get_current_progress(self, server_id: str) -> Tuple[int, int, float]:
|
||||
"""Get playback progress (elapsed, duration, percentage)"""
|
||||
import time
|
||||
|
||||
with self.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute('''SELECT song_start_time, song_duration, is_playing FROM servers WHERE server_id = ? LIMIT 1;''', (server_id,))
|
||||
result = cursor.fetchone()
|
||||
|
||||
if not result or result[2] == 0:
|
||||
return 0, 0, 0.0
|
||||
|
||||
start_time, duration, _ = result
|
||||
|
||||
if duration is None or duration == 0:
|
||||
return 0, 0, 0.0
|
||||
|
||||
elapsed = int(time.time() - start_time)
|
||||
elapsed = min(elapsed, duration)
|
||||
percentage = (elapsed / duration) * 100 if duration > 0 else 0
|
||||
|
||||
return elapsed, duration, percentage
|
||||
|
||||
# ===================================
|
||||
# Queue Operations
|
||||
# ===================================
|
||||
|
||||
def add_song(self, server_id: str, song_link: str, queued_by: str, title: str, thumbnail: str = "", duration: int = 0, position: Optional[int] = None) -> int:
|
||||
"""Add song to queue, returns position"""
|
||||
with self.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
self.ensure_server_exists(server_id)
|
||||
|
||||
if position is None:
|
||||
# Add to end
|
||||
cursor.execute("SELECT MAX(position) FROM songs WHERE server_id = ?", (server_id,))
|
||||
max_pos = cursor.fetchone()[0]
|
||||
position = (max_pos + 1) if max_pos is not None else 0
|
||||
else:
|
||||
# Insert at specific position (shift others down)
|
||||
cursor.execute("UPDATE songs SET position = position + 1 WHERE server_id = ? AND position >= ?",
|
||||
(server_id, position))
|
||||
|
||||
cursor.execute("""INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)""",
|
||||
(server_id, song_link, queued_by, position, title, thumbnail, duration))
|
||||
|
||||
return position
|
||||
|
||||
def get_next_song(self, server_id: str) -> Optional[Tuple]:
|
||||
"""Get the next song in queue (doesn't remove it)"""
|
||||
with self.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute('''SELECT * FROM songs WHERE server_id = ? ORDER BY position LIMIT 1;''', (server_id,))
|
||||
return cursor.fetchone()
|
||||
|
||||
def remove_song(self, server_id: str, position: int) -> None:
|
||||
"""Remove song at position from queue"""
|
||||
with self.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute('''DELETE FROM songs WHERE server_id = ? AND position = ?''', (server_id, position))
|
||||
|
||||
def get_queue(self, server_id: str, limit: int = 10) -> Tuple[int, List[Tuple]]:
|
||||
"""Get songs in queue (returns max_position, list of songs)"""
|
||||
with self.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
self.ensure_server_exists(server_id)
|
||||
|
||||
cursor.execute("SELECT title, duration, queued_by FROM songs WHERE server_id = ? ORDER BY position LIMIT ?",
|
||||
(server_id, limit))
|
||||
songs = cursor.fetchall()
|
||||
|
||||
cursor.execute("SELECT MAX(position) FROM songs WHERE server_id = ?", (server_id,))
|
||||
max_pos = cursor.fetchone()[0]
|
||||
max_pos = max_pos if max_pos is not None else -1
|
||||
|
||||
return max_pos, songs
|
||||
|
||||
def clear_queue(self, server_id: str) -> None:
|
||||
"""Clear all songs from queue"""
|
||||
with self.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
self.ensure_server_exists(server_id)
|
||||
cursor.execute("DELETE FROM songs WHERE server_id = ?", (server_id,))
|
||||
|
||||
def shuffle_queue(self, server_id: str) -> bool:
|
||||
"""Shuffle the queue randomly, returns success"""
|
||||
import random
|
||||
|
||||
with self.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
self.ensure_server_exists(server_id)
|
||||
|
||||
cursor.execute("SELECT position, song_link, queued_by, title, thumbnail, duration FROM songs WHERE server_id = ? ORDER BY position",
|
||||
(server_id,))
|
||||
songs = cursor.fetchall()
|
||||
|
||||
if len(songs) <= 1:
|
||||
return False
|
||||
|
||||
random.shuffle(songs)
|
||||
cursor.execute("DELETE FROM songs WHERE server_id = ?", (server_id,))
|
||||
|
||||
for i, s in enumerate(songs):
|
||||
cursor.execute("INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
(server_id, s[1], s[2], i, s[3], s[4], s[5]))
|
||||
|
||||
return True
|
||||
|
||||
# ===================================
|
||||
# Settings Operations
|
||||
# ===================================
|
||||
|
||||
def get_loop_mode(self, server_id: str) -> str:
|
||||
"""Get loop mode: 'off', 'song', or 'queue'"""
|
||||
with self.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
self.ensure_server_exists(server_id)
|
||||
cursor.execute("SELECT loop_mode FROM servers WHERE server_id = ?", (server_id,))
|
||||
res = cursor.fetchone()
|
||||
return res[0] if res else 'off'
|
||||
|
||||
def set_loop_mode(self, server_id: str, mode: str) -> None:
|
||||
"""Set loop mode: 'off', 'song', or 'queue'"""
|
||||
with self.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
self.ensure_server_exists(server_id)
|
||||
cursor.execute("UPDATE servers SET loop_mode = ? WHERE server_id = ?", (mode, server_id))
|
||||
|
||||
def get_volume(self, server_id: str) -> int:
|
||||
"""Get volume (0-200)"""
|
||||
with self.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
self.ensure_server_exists(server_id)
|
||||
cursor.execute("SELECT volume FROM servers WHERE server_id = ?", (server_id,))
|
||||
res = cursor.fetchone()
|
||||
return res[0] if res else 100
|
||||
|
||||
def set_volume(self, server_id: str, volume: int) -> int:
|
||||
"""Set volume (0-200), returns the set volume"""
|
||||
with self.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
self.ensure_server_exists(server_id)
|
||||
cursor.execute("UPDATE servers SET volume = ? WHERE server_id = ?", (volume, server_id))
|
||||
return volume
|
||||
|
||||
def get_effect(self, server_id: str) -> str:
|
||||
"""Get current audio effect"""
|
||||
with self.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
self.ensure_server_exists(server_id)
|
||||
cursor.execute("SELECT effect FROM servers WHERE server_id = ?", (server_id,))
|
||||
res = cursor.fetchone()
|
||||
return res[0] if res else 'none'
|
||||
|
||||
def set_effect(self, server_id: str, effect: str) -> None:
|
||||
"""Set audio effect"""
|
||||
with self.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
self.ensure_server_exists(server_id)
|
||||
cursor.execute("UPDATE servers SET effect = ? WHERE server_id = ?", (effect, server_id))
|
||||
|
||||
|
||||
# Global instance
|
||||
db = DatabaseManager()
|
||||
@@ -12,28 +12,7 @@ from cogs.music.help import music_help
|
||||
|
||||
import spotipy
|
||||
from spotipy.oauth2 import SpotifyClientCredentials
|
||||
|
||||
|
||||
# Fix this pls
|
||||
|
||||
import json
|
||||
#from .. import config
|
||||
# Read data from JSON file in ./data/config.json
|
||||
def read_data():
|
||||
with open("./data/config.json", "r") as file:
|
||||
return json.load(file)
|
||||
|
||||
raise Exception("Could not load config data")
|
||||
|
||||
|
||||
def get_spotify_creds():
|
||||
data = read_data()
|
||||
data = data.get("spotify")
|
||||
|
||||
SCID = data.get("SCID")
|
||||
secret = data.get("SECRET")
|
||||
|
||||
return SCID, secret
|
||||
import config # Use centralized config
|
||||
|
||||
|
||||
|
||||
@@ -51,10 +30,13 @@ class music(commands.Cog):
|
||||
help_command.cog = self
|
||||
self.help_command = help_command
|
||||
|
||||
SCID, secret = get_spotify_creds()
|
||||
# Get Spotify credentials from centralized config
|
||||
spotify_id, spotify_secret = config.get_spotify_creds()
|
||||
# Authentication - without user
|
||||
client_credentials_manager = SpotifyClientCredentials(client_id=SCID,
|
||||
client_secret=secret)
|
||||
client_credentials_manager = SpotifyClientCredentials(
|
||||
client_id=spotify_id,
|
||||
client_secret=spotify_secret
|
||||
)
|
||||
|
||||
self.sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
|
||||
|
||||
@@ -346,7 +328,7 @@ class music(commands.Cog):
|
||||
app_commands.Choice(name="Song", value="song"),
|
||||
app_commands.Choice(name="Queue", value="queue")
|
||||
])
|
||||
async def loop(self, ctx: Context, mode: str = None):
|
||||
async def loop(self, ctx: Context, mode: str | None = None):
|
||||
"""Toggle between loop modes or set a specific mode"""
|
||||
server = ctx.guild
|
||||
|
||||
@@ -409,7 +391,7 @@ class music(commands.Cog):
|
||||
description="Set playback volume",
|
||||
aliases=['vol', 'v'])
|
||||
@app_commands.describe(level="Volume level (0-200%, default shows current)")
|
||||
async def volume(self, ctx: Context, level: int = None):
|
||||
async def volume(self, ctx: Context, level: int | None = None):
|
||||
"""Set or display the current volume"""
|
||||
server = ctx.guild
|
||||
|
||||
@@ -453,7 +435,7 @@ class music(commands.Cog):
|
||||
description="Apply audio effects to playback",
|
||||
aliases=['fx', 'filter'])
|
||||
@app_commands.describe(effect_name="The audio effect to apply (leave empty to see list)")
|
||||
async def effect(self, ctx: Context, effect_name: str = None):
|
||||
async def effect(self, ctx: Context, effect_name: str | None = None):
|
||||
"""Apply or list audio effects"""
|
||||
server = ctx.guild
|
||||
|
||||
|
||||
@@ -1,14 +1,14 @@
|
||||
from http import server
|
||||
import sqlite3
|
||||
import random
|
||||
import time
|
||||
"""
|
||||
Queue management for Groovy-Zilean music bot
|
||||
Now using centralized database manager for cleaner code
|
||||
"""
|
||||
|
||||
import discord
|
||||
import asyncio
|
||||
import time
|
||||
|
||||
from .translate import search_song
|
||||
|
||||
db_path = "./data/music.db"
|
||||
from .db_manager import db
|
||||
|
||||
# Base FFmpeg options (will be modified by effects)
|
||||
BASE_FFMPEG_OPTS = {
|
||||
@@ -103,342 +103,224 @@ def get_effect_options(effect_name):
|
||||
return effects.get(effect_name, effects['none'])
|
||||
|
||||
|
||||
# Creates the tables if they don't exist
|
||||
# ===================================
|
||||
# Initialization
|
||||
# ===================================
|
||||
|
||||
def initialize_tables():
|
||||
# Connect to the database
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
"""Initialize database tables"""
|
||||
db.initialize_tables()
|
||||
|
||||
# Create servers table if it doesn't exist
|
||||
cursor.execute('''CREATE TABLE IF NOT EXISTS servers (
|
||||
server_id TEXT PRIMARY KEY,
|
||||
is_playing INTEGER DEFAULT 0,
|
||||
song_name TEXT,
|
||||
song_url TEXT,
|
||||
song_thumbnail TEXT,
|
||||
loop_mode TEXT DEFAULT 'off',
|
||||
volume INTEGER DEFAULT 100,
|
||||
effect TEXT DEFAULT 'none',
|
||||
song_start_time REAL DEFAULT 0,
|
||||
song_duration INTEGER DEFAULT 0
|
||||
);''')
|
||||
|
||||
# Set all to not playing
|
||||
cursor.execute("UPDATE servers SET is_playing = 0;")
|
||||
# ===================================
|
||||
# Queue Management
|
||||
# ===================================
|
||||
|
||||
# Add new columns if they don't exist (for existing databases)
|
||||
# Migrations for existing databases
|
||||
columns = [
|
||||
("loop_mode", "TEXT DEFAULT 'off'"),
|
||||
("volume", "INTEGER DEFAULT 100"),
|
||||
("effect", "TEXT DEFAULT 'none'"),
|
||||
("song_start_time", "REAL DEFAULT 0"),
|
||||
("song_duration", "INTEGER DEFAULT 0"),
|
||||
("song_thumbnail", "TEXT DEFAULT ''"),
|
||||
("song_url", "TEXT DEFAULT ''") # NEW
|
||||
]
|
||||
async def add_song(server_id, details, queued_by, position=None):
|
||||
"""
|
||||
Add a song to the queue
|
||||
|
||||
for col_name, col_type in columns:
|
||||
try:
|
||||
cursor.execute(f"ALTER TABLE servers ADD COLUMN {col_name} {col_type};")
|
||||
except sqlite3.OperationalError:
|
||||
pass
|
||||
|
||||
cursor.execute('''CREATE TABLE IF NOT EXISTS songs (
|
||||
server_id TEXT NOT NULL,
|
||||
song_link TEXT,
|
||||
queued_by TEXT,
|
||||
position INTEGER NOT NULL,
|
||||
title TEXT,
|
||||
thumbnail TEXT,
|
||||
duration INTEGER,
|
||||
PRIMARY KEY (position),
|
||||
FOREIGN KEY (server_id) REFERENCES servers(server_id)
|
||||
);''')
|
||||
|
||||
cursor.execute("DELETE FROM songs;")
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
# Queue a song in the db
|
||||
async def add_song(server_id, details, queued_by):
|
||||
# Connect to db
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
await add_server(server_id, cursor, conn)
|
||||
|
||||
max_order_num = await get_max(server_id, cursor) + 1
|
||||
Args:
|
||||
server_id: Discord server ID
|
||||
details: Dictionary with song info (url, title, thumbnail, duration) or string
|
||||
queued_by: Username who queued the song
|
||||
position: Optional position in queue (None = end of queue)
|
||||
|
||||
Returns:
|
||||
Position in queue
|
||||
"""
|
||||
if isinstance(details, str):
|
||||
# Fallback for raw strings
|
||||
cursor.execute("""INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)""",
|
||||
(server_id, "Not grabbed", queued_by, max_order_num, details, "Unknown", 0))
|
||||
# Fallback for raw strings (legacy support)
|
||||
pos = db.add_song(
|
||||
server_id=str(server_id),
|
||||
song_link="Not grabbed",
|
||||
queued_by=queued_by,
|
||||
title=details,
|
||||
thumbnail="Unknown",
|
||||
duration=0,
|
||||
position=position
|
||||
)
|
||||
else:
|
||||
# Save exact duration and thumbnail from the start
|
||||
cursor.execute("""INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)""",
|
||||
(server_id, details['url'], queued_by, max_order_num, details['title'], details['thumbnail'], details['duration']))
|
||||
# Standard dictionary format
|
||||
pos = db.add_song(
|
||||
server_id=str(server_id),
|
||||
song_link=details['url'],
|
||||
queued_by=queued_by,
|
||||
title=details['title'],
|
||||
thumbnail=details.get('thumbnail', ''),
|
||||
duration=details.get('duration', 0),
|
||||
position=position
|
||||
)
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
return max_order_num
|
||||
return pos
|
||||
|
||||
|
||||
# Pop song from server (respects loop mode)
|
||||
async def pop(server_id, ignore=False, skip_mode=False):
|
||||
"""
|
||||
Pop next song from queue
|
||||
ignore: Skip the song without returning URL
|
||||
skip_mode: True when called from skip command (affects loop song behavior)
|
||||
|
||||
Args:
|
||||
server_id: Discord server ID
|
||||
ignore: Skip the song without returning URL
|
||||
skip_mode: True when called from skip command (affects loop song behavior)
|
||||
|
||||
Returns:
|
||||
Song URL or None
|
||||
"""
|
||||
# Connect to db
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
# JUST INCASE!
|
||||
await add_server(server_id, cursor, conn)
|
||||
|
||||
# Fetch info: link(1), title(4), thumbnail(5), duration(6)
|
||||
cursor.execute('''SELECT * FROM songs WHERE server_id = ? ORDER BY position LIMIT 1;''', (server_id,))
|
||||
result = cursor.fetchone()
|
||||
conn.commit()
|
||||
conn.close()
|
||||
result = db.get_next_song(str(server_id))
|
||||
|
||||
if result is None:
|
||||
return None
|
||||
elif ignore:
|
||||
await mark_song_as_finished(server_id, result[3])
|
||||
return None
|
||||
elif result[1] == "Not grabbed":
|
||||
# Lazy load logic
|
||||
song_list = await search_song(result[4])
|
||||
if not song_list:
|
||||
return None
|
||||
song = song_list[0]
|
||||
|
||||
await set_current_song(server_id, song['title'], song.get('thumbnail', ''), song.get('duration', 0))
|
||||
# result format: (server_id, song_link, queued_by, position, title, thumbnail, duration)
|
||||
server_id_str, song_link, queued_by, position, title, thumbnail, duration = result
|
||||
|
||||
if ignore:
|
||||
db.remove_song(str(server_id), position)
|
||||
return None
|
||||
|
||||
# Handle lazy-loaded songs (not yet fetched from YouTube)
|
||||
if song_link == "Not grabbed":
|
||||
song_list = await search_song(title)
|
||||
if not song_list:
|
||||
db.remove_song(str(server_id), position)
|
||||
return None
|
||||
|
||||
song = song_list[0]
|
||||
await set_current_song(
|
||||
server_id,
|
||||
song['title'],
|
||||
song['url'],
|
||||
song.get('thumbnail', ''),
|
||||
song.get('duration', 0)
|
||||
)
|
||||
|
||||
# Check loop mode before removing
|
||||
loop_mode = await get_loop_mode(server_id)
|
||||
if loop_mode != 'song': # Only remove if not looping song
|
||||
await mark_song_as_finished(server_id, result[3])
|
||||
if loop_mode != 'song':
|
||||
db.remove_song(str(server_id), position)
|
||||
|
||||
return song['url']
|
||||
|
||||
# Pre-grabbed logic (Standard)
|
||||
# result[1] is url, result[5] is thumbnail, result[6] is duration
|
||||
await set_current_song(server_id, result[4], result[1], result[5], result[6])
|
||||
# Standard pre-fetched song
|
||||
await set_current_song(server_id, title, song_link, thumbnail, duration)
|
||||
|
||||
# Check loop mode before removing
|
||||
loop_mode = await get_loop_mode(server_id)
|
||||
if loop_mode != 'song': # Only remove if not looping song
|
||||
await mark_song_as_finished(server_id, result[3])
|
||||
if loop_mode != 'song':
|
||||
db.remove_song(str(server_id), position)
|
||||
|
||||
return result[1]
|
||||
return song_link
|
||||
|
||||
# Add server to db if first time queuing
|
||||
async def add_server(server_id, cursor, conn):
|
||||
cursor.execute('SELECT COUNT(*) FROM servers WHERE server_id = ?', (server_id,))
|
||||
if cursor.fetchone()[0] == 0:
|
||||
cursor.execute('''INSERT INTO servers (server_id, loop_mode, volume, effect, song_thumbnail, song_url)
|
||||
VALUES (?, 'off', 100, 'none', '', '')''', (server_id,))
|
||||
conn.commit()
|
||||
|
||||
|
||||
# set song as played and update indexes
|
||||
async def mark_song_as_finished(server_id, order_num):
|
||||
# Connect to the database
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Update the song as finished
|
||||
cursor.execute('''DELETE FROM songs
|
||||
WHERE server_id = ? AND position = ?''',
|
||||
(server_id, order_num))
|
||||
|
||||
# Close connection
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
# set the current playing song of the server
|
||||
async def set_current_song(server_id, title, url, thumbnail="", duration=0):
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
start_time = time.time()
|
||||
|
||||
# Ensure duration is an integer
|
||||
try:
|
||||
duration = int(duration)
|
||||
except:
|
||||
duration = 0
|
||||
|
||||
cursor.execute(''' UPDATE servers
|
||||
SET song_name = ?, song_url = ?, song_thumbnail = ?, song_start_time = ?, song_duration = ?
|
||||
WHERE server_id = ?''',
|
||||
(title, url, thumbnail, start_time, duration, server_id))
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
# Returns dictionary with title and thumbnail
|
||||
async def get_current_song(server_id):
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute(''' SELECT song_name, song_thumbnail, song_url FROM servers WHERE server_id = ? LIMIT 1;''', (server_id,))
|
||||
result = cursor.fetchone()
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
if result:
|
||||
return {'title': result[0], 'thumbnail': result[1], 'url': result[2]}
|
||||
return {'title': "Nothing", 'thumbnail': None, 'url': ''}
|
||||
|
||||
async def get_current_progress(server_id):
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
cursor.execute('''SELECT song_start_time, song_duration, is_playing FROM servers WHERE server_id = ? LIMIT 1;''', (server_id,))
|
||||
result = cursor.fetchone()
|
||||
conn.close() # Close quickly
|
||||
|
||||
if not result or result[2] == 0:
|
||||
return 0, 0, 0.0
|
||||
|
||||
start_time, duration, _ = result
|
||||
|
||||
if duration is None or duration == 0:
|
||||
return 0, 0, 0.0
|
||||
|
||||
elapsed = int(time.time() - start_time)
|
||||
elapsed = min(elapsed, duration)
|
||||
percentage = (elapsed / duration) * 100 if duration > 0 else 0
|
||||
|
||||
return elapsed, duration, percentage
|
||||
|
||||
async def get_max(server_id, cursor):
|
||||
cursor.execute("SELECT MAX(position) FROM songs WHERE server_id = ?", (server_id,))
|
||||
result = cursor.fetchone()
|
||||
return result[0] if result[0] is not None else -1
|
||||
|
||||
async def update_server(server_id, playing):
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
await add_server(server_id, cursor, conn)
|
||||
val = 1 if playing else 0
|
||||
cursor.execute("UPDATE servers SET is_playing = ? WHERE server_id = ?", (val, server_id))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
async def is_server_playing(server_id):
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
await add_server(server_id, cursor, conn)
|
||||
cursor.execute("SELECT is_playing FROM servers WHERE server_id = ?", (server_id,))
|
||||
res = cursor.fetchone()
|
||||
conn.close()
|
||||
return True if res[0] == 1 else False
|
||||
|
||||
async def clear(server_id):
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
await add_server(server_id, cursor, conn)
|
||||
await update_server(server_id, False)
|
||||
cursor.execute("DELETE FROM songs WHERE server_id = ?", (server_id,))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
async def grab_songs(server_id):
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
await add_server(server_id, cursor, conn)
|
||||
cursor.execute("SELECT title, duration, queued_by FROM songs WHERE server_id = ? ORDER BY position LIMIT 10", (server_id,))
|
||||
songs = cursor.fetchall()
|
||||
max_pos = await get_max(server_id, cursor)
|
||||
conn.close()
|
||||
return max_pos, songs
|
||||
"""
|
||||
Get current queue
|
||||
|
||||
# --- Effects/Loop/Shuffle/Volume (Simplified Paste) ---
|
||||
async def get_loop_mode(server_id):
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
await add_server(server_id, cursor, conn)
|
||||
cursor.execute("SELECT loop_mode FROM servers WHERE server_id = ?", (server_id,))
|
||||
res = cursor.fetchone()
|
||||
conn.close()
|
||||
return res[0] if res else 'off'
|
||||
Returns:
|
||||
Tuple of (max_position, list_of_songs)
|
||||
"""
|
||||
return db.get_queue(str(server_id), limit=10)
|
||||
|
||||
async def set_loop_mode(server_id, mode):
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
await add_server(server_id, cursor, conn)
|
||||
cursor.execute("UPDATE servers SET loop_mode = ? WHERE server_id = ?", (mode, server_id))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
async def get_volume(server_id):
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
await add_server(server_id, cursor, conn)
|
||||
cursor.execute("SELECT volume FROM servers WHERE server_id = ?", (server_id,))
|
||||
res = cursor.fetchone()
|
||||
conn.close()
|
||||
return res[0] if res else 100
|
||||
async def clear(server_id):
|
||||
"""Clear the queue for a server"""
|
||||
db.clear_queue(str(server_id))
|
||||
await update_server(server_id, False)
|
||||
|
||||
async def set_volume(server_id, vol):
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
await add_server(server_id, cursor, conn)
|
||||
cursor.execute("UPDATE servers SET volume = ? WHERE server_id = ?", (vol, server_id))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
return vol
|
||||
|
||||
async def shuffle_queue(server_id):
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
await add_server(server_id, cursor, conn)
|
||||
cursor.execute("SELECT position, song_link, queued_by, title, thumbnail, duration FROM songs WHERE server_id = ? ORDER BY position", (server_id,))
|
||||
songs = cursor.fetchall()
|
||||
if len(songs) <= 1:
|
||||
conn.close()
|
||||
return False
|
||||
random.shuffle(songs)
|
||||
cursor.execute("DELETE FROM songs WHERE server_id = ?", (server_id,))
|
||||
for i, s in enumerate(songs):
|
||||
cursor.execute("INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)", (server_id, s[1], s[2], i, s[3], s[4], s[5]))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
return True
|
||||
"""Shuffle the queue randomly"""
|
||||
return db.shuffle_queue(str(server_id))
|
||||
|
||||
|
||||
# ===================================
|
||||
# Server State Management
|
||||
# ===================================
|
||||
|
||||
async def update_server(server_id, playing):
|
||||
"""Update server playing status"""
|
||||
db.set_server_playing(str(server_id), playing)
|
||||
|
||||
|
||||
async def is_server_playing(server_id):
|
||||
"""Check if server is currently playing"""
|
||||
return db.is_server_playing(str(server_id))
|
||||
|
||||
|
||||
async def set_current_song(server_id, title, url, thumbnail="", duration=0):
|
||||
"""Set the currently playing song"""
|
||||
db.set_current_song(
|
||||
str(server_id),
|
||||
title,
|
||||
url,
|
||||
thumbnail,
|
||||
duration,
|
||||
time.time() # start_time
|
||||
)
|
||||
|
||||
|
||||
async def get_current_song(server_id):
|
||||
"""Get current song info"""
|
||||
return db.get_current_song(str(server_id))
|
||||
|
||||
|
||||
async def get_current_progress(server_id):
|
||||
"""Get playback progress (elapsed, duration, percentage)"""
|
||||
return db.get_current_progress(str(server_id))
|
||||
|
||||
|
||||
# ===================================
|
||||
# Settings Management
|
||||
# ===================================
|
||||
|
||||
async def get_loop_mode(server_id):
|
||||
"""Get loop mode: 'off', 'song', or 'queue'"""
|
||||
return db.get_loop_mode(str(server_id))
|
||||
|
||||
|
||||
async def set_loop_mode(server_id, mode):
|
||||
"""Set loop mode: 'off', 'song', or 'queue'"""
|
||||
db.set_loop_mode(str(server_id), mode)
|
||||
|
||||
|
||||
async def get_volume(server_id):
|
||||
"""Get volume (0-200)"""
|
||||
return db.get_volume(str(server_id))
|
||||
|
||||
|
||||
async def set_volume(server_id, vol):
|
||||
"""Set volume (0-200)"""
|
||||
return db.set_volume(str(server_id), vol)
|
||||
|
||||
|
||||
async def get_effect(server_id):
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
await add_server(server_id, cursor, conn)
|
||||
cursor.execute("SELECT effect FROM servers WHERE server_id = ?", (server_id,))
|
||||
res = cursor.fetchone()
|
||||
conn.close()
|
||||
return res[0] if res else 'none'
|
||||
"""Get current audio effect"""
|
||||
return db.get_effect(str(server_id))
|
||||
|
||||
|
||||
async def set_effect(server_id, fx):
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
await add_server(server_id, cursor, conn)
|
||||
cursor.execute("UPDATE servers SET effect = ? WHERE server_id = ?", (fx, server_id))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
"""Set audio effect"""
|
||||
db.set_effect(str(server_id), fx)
|
||||
|
||||
|
||||
# ===================================
|
||||
# Effect Metadata
|
||||
# ===================================
|
||||
|
||||
def list_all_effects():
|
||||
"""List all available audio effects"""
|
||||
return [
|
||||
'none', 'bassboost', 'nightcore', 'slowed', 'earrape', 'deepfry', 'distortion',
|
||||
'reverse', 'chipmunk', 'demonic', 'underwater', 'robot',
|
||||
'8d', 'vibrato', 'tremolo', 'echo', 'phone', 'megaphone'
|
||||
]
|
||||
|
||||
|
||||
def get_effect_emoji(effect_name):
|
||||
# Short list of emoji mappings
|
||||
"""Get emoji for effect"""
|
||||
emojis = {
|
||||
'none': '✨', # Changed to generic Sparkles
|
||||
'none': '✨',
|
||||
'bassboost': '💥',
|
||||
'nightcore': '⚡',
|
||||
'slowed': '🐢',
|
||||
@@ -459,7 +341,9 @@ def get_effect_emoji(effect_name):
|
||||
}
|
||||
return emojis.get(effect_name, '✨')
|
||||
|
||||
|
||||
def get_effect_description(effect_name):
|
||||
"""Get description for effect"""
|
||||
descriptions = {
|
||||
'none': 'Normal audio',
|
||||
'bassboost': 'MAXIMUM BASS 🔊',
|
||||
@@ -482,43 +366,60 @@ def get_effect_description(effect_name):
|
||||
}
|
||||
return descriptions.get(effect_name, 'Unknown effect')
|
||||
|
||||
|
||||
# ===================================
|
||||
# Playback
|
||||
# ===================================
|
||||
|
||||
async def play(ctx):
|
||||
"""
|
||||
Main playback loop - plays songs from queue
|
||||
"""
|
||||
server_id = ctx.guild.id
|
||||
voice_client = ctx.voice_client
|
||||
|
||||
if voice_client is None:
|
||||
await update_server(server_id, False)
|
||||
return
|
||||
|
||||
# Wait for current song to finish
|
||||
while voice_client.is_playing():
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
# Get next song
|
||||
url = await pop(server_id)
|
||||
if url is None:
|
||||
await update_server(server_id, False)
|
||||
return
|
||||
|
||||
try:
|
||||
# Scale volume down to prevent earrape
|
||||
# User sees 0-200%, but internally we scale by 0.25
|
||||
# So user's 100% = 0.25 actual volume (25%)
|
||||
vol = await get_volume(server_id) / 100.0 * 0.25
|
||||
# Get volume and effect settings
|
||||
vol = await get_volume(server_id) / 100.0 * 0.25 # Scale down by 0.25
|
||||
fx = await get_effect(server_id)
|
||||
opts = get_effect_options(fx)
|
||||
|
||||
# Create audio source
|
||||
src = discord.FFmpegPCMAudio(url, **opts)
|
||||
src = discord.PCMVolumeTransformer(src, volume=vol)
|
||||
|
||||
# After callback - play next song
|
||||
def after(e):
|
||||
if e: print(e)
|
||||
if voice_client and not voice_client.is_connected(): return
|
||||
if e:
|
||||
print(f"Playback error: {e}")
|
||||
if voice_client and not voice_client.is_connected():
|
||||
return
|
||||
|
||||
# Schedule next song
|
||||
coro = play(ctx)
|
||||
fut = asyncio.run_coroutine_threadsafe(coro, ctx.bot.loop)
|
||||
try: fut.result()
|
||||
except: pass
|
||||
try:
|
||||
fut.result()
|
||||
except Exception as ex:
|
||||
print(f"Error in after callback: {ex}")
|
||||
|
||||
voice_client.play(src, after=after)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Play error: {e}")
|
||||
# Try next song on error
|
||||
await play(ctx)
|
||||
|
||||
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
# Handles translating urls and search terms
|
||||
"""
|
||||
URL and search query handling for Groovy-Zilean
|
||||
Translates YouTube, Spotify, SoundCloud URLs and search queries into playable audio
|
||||
"""
|
||||
|
||||
from typing import Any
|
||||
import yt_dlp as ytdlp
|
||||
import spotipy
|
||||
|
||||
@@ -22,7 +26,7 @@ ydl_opts = {
|
||||
},
|
||||
}
|
||||
|
||||
async def main(url, sp):
|
||||
async def main(url: str, sp: spotipy.Spotify) -> list[dict[str, Any] | str]:
|
||||
|
||||
#url = url.lower()
|
||||
|
||||
@@ -60,7 +64,7 @@ async def main(url, sp):
|
||||
return []
|
||||
|
||||
|
||||
async def search_song(search):
|
||||
async def search_song(search: str) -> list[dict[str, Any]]:
|
||||
with ytdlp.YoutubeDL(ydl_opts) as ydl:
|
||||
try:
|
||||
info = ydl.extract_info(f"ytsearch1:{search}", download=False)
|
||||
@@ -89,7 +93,7 @@ async def search_song(search):
|
||||
return [data]
|
||||
|
||||
|
||||
async def spotify_song(url, sp):
|
||||
async def spotify_song(url: str, sp: spotipy.Spotify) -> list[dict[str, Any]]:
|
||||
track = sp.track(url.split("/")[-1].split("?")[0])
|
||||
search = ""
|
||||
|
||||
@@ -106,7 +110,11 @@ async def spotify_song(url, sp):
|
||||
return await search_song(query)
|
||||
|
||||
|
||||
async def spotify_playlist(url, sp):
|
||||
async def spotify_playlist(url: str, sp: spotipy.Spotify) -> list[str | dict[str, Any]]:
|
||||
"""
|
||||
Get songs from a Spotify playlist
|
||||
Returns a mixed list where first item is dict, rest are search strings
|
||||
"""
|
||||
# Get the playlist uri code
|
||||
code = url.split("/")[-1].split("?")[0]
|
||||
|
||||
@@ -116,41 +124,35 @@ async def spotify_playlist(url, sp):
|
||||
except spotipy.exceptions.SpotifyException:
|
||||
return []
|
||||
|
||||
# Go through the tracks
|
||||
songs = []
|
||||
# Go through the tracks and build search queries
|
||||
songs: list[str | dict[str, Any]] = [] # Explicit type for mypy
|
||||
for track in results:
|
||||
search = ""
|
||||
|
||||
# Fetch all artists
|
||||
for artist in track['track']['artists']:
|
||||
|
||||
# Add all artists to search
|
||||
search += f"{artist['name']}, "
|
||||
|
||||
# Remove last column
|
||||
# Remove last comma
|
||||
search = search[:-2]
|
||||
search += f" - {track['track']['name']}"
|
||||
songs.append(search)
|
||||
|
||||
#searched_result = search_song(search)
|
||||
#if searched_result == []:
|
||||
#continue
|
||||
|
||||
#songs.append(searched_result[0])
|
||||
|
||||
# Fetch first song's full data
|
||||
while True:
|
||||
search_result = await search_song(songs[0])
|
||||
search_result = await search_song(songs[0]) # type: ignore
|
||||
if search_result == []:
|
||||
songs.pop(0)
|
||||
continue
|
||||
else:
|
||||
songs[0] = search_result[0]
|
||||
songs[0] = search_result[0] # Replace string with dict
|
||||
break
|
||||
|
||||
return songs
|
||||
|
||||
|
||||
async def song_download(url):
|
||||
async def song_download(url: str) -> list[dict[str, Any]]:
|
||||
with ytdlp.YoutubeDL(ydl_opts) as ydl:
|
||||
try:
|
||||
info = ydl.extract_info(url, download=False)
|
||||
@@ -180,7 +182,7 @@ async def song_download(url):
|
||||
return [data]
|
||||
|
||||
|
||||
async def playlist_download(url):
|
||||
async def playlist_download(url: str) -> list[dict[str, Any]]:
|
||||
with ytdlp.YoutubeDL(ydl_opts) as ydl:
|
||||
try:
|
||||
info = ydl.extract_info(url, download=False)
|
||||
|
||||
@@ -1,3 +1,9 @@
|
||||
"""
|
||||
Utility functions for Groovy-Zilean music bot
|
||||
Handles voice channel operations, queue display, and inactivity tracking
|
||||
"""
|
||||
|
||||
from typing import Any
|
||||
import discord
|
||||
from discord.ext.commands.context import Context
|
||||
from discord.ext.commands.converter import CommandError
|
||||
@@ -7,15 +13,29 @@ from . import queue
|
||||
import asyncio
|
||||
|
||||
# Track last activity time for each server
|
||||
last_activity = {}
|
||||
last_activity: dict[int, float] = {}
|
||||
|
||||
# Joining/moving to the user's vc in a guild
|
||||
async def join_vc(ctx: Context):
|
||||
|
||||
# ===================================
|
||||
# Voice Channel Management
|
||||
# ===================================
|
||||
|
||||
async def join_vc(ctx: Context) -> discord.VoiceClient:
|
||||
"""
|
||||
Join or move to the user's voice channel
|
||||
|
||||
Args:
|
||||
ctx: Command context
|
||||
|
||||
Returns:
|
||||
The voice client connection
|
||||
|
||||
Raises:
|
||||
CommandError: If user is not in a voice channel
|
||||
"""
|
||||
# Get the user's vc
|
||||
author_voice = getattr(ctx.author, "voice")
|
||||
if author_voice is None:
|
||||
# Raise exception if user is not in vc
|
||||
raise CommandError("User is not in voice channel")
|
||||
|
||||
# Get user's vc
|
||||
@@ -25,19 +45,26 @@ async def join_vc(ctx: Context):
|
||||
|
||||
# Join or move to the user's vc
|
||||
if ctx.voice_client is None:
|
||||
vc = await vc.connect()
|
||||
vc_client = await vc.connect()
|
||||
else:
|
||||
# Safe to ignore type error for now
|
||||
vc = await ctx.voice_client.move_to(vc)
|
||||
vc_client = await ctx.voice_client.move_to(vc)
|
||||
|
||||
# Update last activity
|
||||
last_activity[ctx.guild.id] = asyncio.get_event_loop().time()
|
||||
|
||||
return vc
|
||||
return vc_client
|
||||
|
||||
|
||||
# Leaving the voice channel of a user
|
||||
async def leave_vc(ctx: Context):
|
||||
async def leave_vc(ctx: Context) -> None:
|
||||
"""
|
||||
Leave the voice channel and clean up
|
||||
|
||||
Args:
|
||||
ctx: Command context
|
||||
|
||||
Raises:
|
||||
CommandError: If bot is not in VC or user is not in same VC
|
||||
"""
|
||||
# If the bot is not in a vc of this server
|
||||
if ctx.voice_client is None:
|
||||
raise CommandError("I am not in a voice channel")
|
||||
@@ -73,9 +100,18 @@ async def leave_vc(ctx: Context):
|
||||
del last_activity[ctx.guild.id]
|
||||
|
||||
|
||||
# Auto-disconnect if inactive
|
||||
async def check_inactivity(bot):
|
||||
"""Background task to check for inactive voice connections"""
|
||||
# ===================================
|
||||
# Inactivity Management
|
||||
# ===================================
|
||||
|
||||
async def check_inactivity(bot: discord.Client) -> None:
|
||||
"""
|
||||
Background task to check for inactive voice connections
|
||||
Auto-disconnects after 5 minutes of inactivity
|
||||
|
||||
Args:
|
||||
bot: The Discord bot instance
|
||||
"""
|
||||
try:
|
||||
current_time = asyncio.get_event_loop().time()
|
||||
|
||||
@@ -98,20 +134,34 @@ async def check_inactivity(bot):
|
||||
print(f"Error in inactivity checker: {e}")
|
||||
|
||||
|
||||
# Update activity timestamp when playing
|
||||
def update_activity(guild_id):
|
||||
"""Call this when a song starts playing"""
|
||||
def update_activity(guild_id: int) -> None:
|
||||
"""
|
||||
Update activity timestamp when a song starts playing
|
||||
|
||||
Args:
|
||||
guild_id: Discord guild/server ID
|
||||
"""
|
||||
last_activity[guild_id] = asyncio.get_event_loop().time()
|
||||
|
||||
|
||||
# Interactive buttons for queue control
|
||||
# ===================================
|
||||
# Queue Display & Controls
|
||||
# ===================================
|
||||
|
||||
class QueueControls(View):
|
||||
def __init__(self, ctx):
|
||||
super().__init__(timeout=None) # No timeout allows buttons to stay active longer
|
||||
"""Interactive buttons for queue control"""
|
||||
|
||||
def __init__(self, ctx: Context) -> None:
|
||||
super().__init__(timeout=None) # No timeout allows buttons to stay active longer
|
||||
self.ctx = ctx
|
||||
|
||||
async def refresh_message(self, interaction: discord.Interaction):
|
||||
"""Helper to regenerate the embed and edit the message"""
|
||||
async def refresh_message(self, interaction: discord.Interaction) -> None:
|
||||
"""
|
||||
Helper to regenerate the embed and edit the message
|
||||
|
||||
Args:
|
||||
interaction: Discord interaction from button press
|
||||
"""
|
||||
try:
|
||||
# Generate new embed
|
||||
embed, view = await generate_queue_ui(self.ctx)
|
||||
@@ -119,10 +169,13 @@ class QueueControls(View):
|
||||
except Exception as e:
|
||||
# Fallback if edit fails
|
||||
if not interaction.response.is_done():
|
||||
await interaction.response.send_message("Refreshed, but something went wrong updating the display.", ephemeral=True)
|
||||
await interaction.response.send_message(
|
||||
"Refreshed, but something went wrong updating the display.",
|
||||
ephemeral=True
|
||||
)
|
||||
|
||||
@discord.ui.button(label="⏭️ Skip", style=discord.ButtonStyle.primary)
|
||||
async def skip_button(self, interaction: discord.Interaction, button: Button):
|
||||
async def skip_button(self, interaction: discord.Interaction, button: Button) -> None:
|
||||
if interaction.user not in self.ctx.voice_client.channel.members:
|
||||
await interaction.response.send_message("❌ You must be in the voice channel!", ephemeral=True)
|
||||
return
|
||||
@@ -130,11 +183,6 @@ class QueueControls(View):
|
||||
# Loop logic check
|
||||
loop_mode = await queue.get_loop_mode(self.ctx.guild.id)
|
||||
|
||||
# Logic mimics the command
|
||||
if loop_mode == 'song':
|
||||
# Just restart current song effectively but here we assume standard skip behavior for button
|
||||
pass
|
||||
|
||||
# Perform the skip
|
||||
await queue.pop(self.ctx.guild.id, True, skip_mode=True)
|
||||
if self.ctx.voice_client:
|
||||
@@ -144,35 +192,45 @@ class QueueControls(View):
|
||||
await self.refresh_message(interaction)
|
||||
|
||||
@discord.ui.button(label="🔀 Shuffle", style=discord.ButtonStyle.secondary)
|
||||
async def shuffle_button(self, interaction: discord.Interaction, button: Button):
|
||||
async def shuffle_button(self, interaction: discord.Interaction, button: Button) -> None:
|
||||
await queue.shuffle_queue(self.ctx.guild.id)
|
||||
await self.refresh_message(interaction)
|
||||
|
||||
@discord.ui.button(label="🔁 Loop", style=discord.ButtonStyle.secondary)
|
||||
async def loop_button(self, interaction: discord.Interaction, button: Button):
|
||||
async def loop_button(self, interaction: discord.Interaction, button: Button) -> None:
|
||||
current_mode = await queue.get_loop_mode(self.ctx.guild.id)
|
||||
new_mode = 'song' if current_mode == 'off' else ('queue' if current_mode == 'song' else 'off')
|
||||
await queue.set_loop_mode(self.ctx.guild.id, new_mode)
|
||||
await self.refresh_message(interaction)
|
||||
|
||||
@discord.ui.button(label="🗑️ Clear", style=discord.ButtonStyle.danger)
|
||||
async def clear_button(self, interaction: discord.Interaction, button: Button):
|
||||
async def clear_button(self, interaction: discord.Interaction, button: Button) -> None:
|
||||
await queue.clear(self.ctx.guild.id)
|
||||
if self.ctx.voice_client and self.ctx.voice_client.is_playing():
|
||||
self.ctx.voice_client.stop()
|
||||
await self.refresh_message(interaction)
|
||||
|
||||
@discord.ui.button(label="🔄 Refresh", style=discord.ButtonStyle.gray)
|
||||
async def refresh_button(self, interaction: discord.Interaction, button: Button):
|
||||
async def refresh_button(self, interaction: discord.Interaction, button: Button) -> None:
|
||||
await self.refresh_message(interaction)
|
||||
|
||||
async def generate_queue_ui(ctx: Context):
|
||||
|
||||
async def generate_queue_ui(ctx: Context) -> tuple[discord.Embed, QueueControls]:
|
||||
"""
|
||||
Generate the queue embed and controls
|
||||
|
||||
Args:
|
||||
ctx: Command context
|
||||
|
||||
Returns:
|
||||
Tuple of (embed, view) for displaying queue
|
||||
"""
|
||||
guild_id = ctx.guild.id
|
||||
server = ctx.guild
|
||||
|
||||
# Fetch all data
|
||||
n, songs = await queue.grab_songs(guild_id)
|
||||
current = await queue.get_current_song(guild_id) # Returns title, thumbnail, url
|
||||
current = await queue.get_current_song(guild_id)
|
||||
loop_mode = await queue.get_loop_mode(guild_id)
|
||||
volume = await queue.get_volume(guild_id)
|
||||
effect = await queue.get_effect(guild_id)
|
||||
@@ -183,10 +241,10 @@ async def generate_queue_ui(ctx: Context):
|
||||
|
||||
# Map loop mode to nicer text
|
||||
loop_map = {
|
||||
'off': {'emoji': '⏹️', 'text': 'Off'},
|
||||
'song': {'emoji': '🔂', 'text': 'Song'},
|
||||
'queue': {'emoji': '🔁', 'text': 'Queue'}
|
||||
}
|
||||
'off': {'emoji': '⏹️', 'text': 'Off'},
|
||||
'song': {'emoji': '🔂', 'text': 'Song'},
|
||||
'queue': {'emoji': '🔁', 'text': 'Queue'}
|
||||
}
|
||||
loop_info = loop_map.get(loop_mode, loop_map['off'])
|
||||
loop_emoji = loop_info['emoji']
|
||||
loop_text = loop_info['text']
|
||||
@@ -197,11 +255,9 @@ async def generate_queue_ui(ctx: Context):
|
||||
|
||||
# Progress Bar Logic
|
||||
progress_bar = ""
|
||||
# Only show bar if duration > 0 (prevents weird 00:00 bars)
|
||||
if duration > 0:
|
||||
bar_length = 16
|
||||
filled = int((percentage / 100) * bar_length)
|
||||
# Ensure filled isn't bigger than length
|
||||
filled = min(filled, bar_length)
|
||||
bar_str = '▬' * filled + '🔘' + '▬' * (bar_length - filled)
|
||||
progress_bar = f"\n`{format_time(elapsed)}` {bar_str} `{format_time(duration)}`"
|
||||
@@ -215,14 +271,11 @@ async def generate_queue_ui(ctx: Context):
|
||||
description = "## 💤 Nothing is playing\nUse `/play` to start the party!"
|
||||
else:
|
||||
# Create Hyperlink [Title](URL)
|
||||
# If no URL exists, link to Discord homepage as fallback or just bold
|
||||
if url and url.startswith("http"):
|
||||
song_link = f"[{title}]({url})"
|
||||
else:
|
||||
song_link = f"**{title}**"
|
||||
|
||||
# CLEARER STATUS LINE:
|
||||
# Loop: Mode | Effect: Name | Vol: %
|
||||
description = (
|
||||
f"## 💿 Now Playing\n"
|
||||
f"### {song_link}\n"
|
||||
@@ -241,7 +294,7 @@ async def generate_queue_ui(ctx: Context):
|
||||
|
||||
embed.add_field(name="⏳ Up Next", value=queue_text, inline=False)
|
||||
|
||||
remaining = (n) - 9 # Approx calculation based on your grabbing logic
|
||||
remaining = n - 9
|
||||
if remaining > 0:
|
||||
embed.set_footer(text=f"Waitlist: {remaining} more songs...")
|
||||
else:
|
||||
@@ -251,23 +304,38 @@ async def generate_queue_ui(ctx: Context):
|
||||
if thumb and isinstance(thumb, str) and thumb.startswith("http"):
|
||||
embed.set_thumbnail(url=thumb)
|
||||
elif server.icon:
|
||||
# Fallback to server icon
|
||||
embed.set_thumbnail(url=server.icon.url)
|
||||
|
||||
view = QueueControls(ctx)
|
||||
return embed, view
|
||||
|
||||
# The command entry point calls this
|
||||
async def display_server_queue(ctx: Context, songs, n):
|
||||
|
||||
async def display_server_queue(ctx: Context, songs: list, n: int) -> None:
|
||||
"""
|
||||
Display the server's queue with interactive controls
|
||||
|
||||
Args:
|
||||
ctx: Command context
|
||||
songs: List of songs in queue
|
||||
n: Total number of songs
|
||||
"""
|
||||
embed, view = await generate_queue_ui(ctx)
|
||||
await ctx.send(embed=embed, view=view)
|
||||
|
||||
# Build a display message for queuing a new song
|
||||
async def queue_message(ctx: Context, data: dict):
|
||||
|
||||
async def queue_message(ctx: Context, data: dict[str, Any]) -> None:
|
||||
"""
|
||||
Display a message when a song is queued
|
||||
|
||||
Args:
|
||||
ctx: Command context
|
||||
data: Song data dictionary
|
||||
"""
|
||||
msg = discord.Embed(
|
||||
title="🎵 Song Queued",
|
||||
description=f"**{data['title']}**",
|
||||
color=discord.Color.green())
|
||||
title="🎵 Song Queued",
|
||||
description=f"**{data['title']}**",
|
||||
color=discord.Color.green()
|
||||
)
|
||||
|
||||
msg.set_thumbnail(url=data['thumbnail'])
|
||||
msg.add_field(name="⏱️ Duration", value=format_time(data['duration']), inline=True)
|
||||
@@ -276,9 +344,23 @@ async def queue_message(ctx: Context, data: dict):
|
||||
|
||||
await ctx.send(embed=msg)
|
||||
|
||||
# Converts seconds into more readable format
|
||||
def format_time(seconds):
|
||||
|
||||
# ===================================
|
||||
# Utility Functions
|
||||
# ===================================
|
||||
|
||||
def format_time(seconds: int | float) -> str:
|
||||
"""
|
||||
Convert seconds into readable time format (MM:SS or HH:MM:SS)
|
||||
|
||||
Args:
|
||||
seconds: Time in seconds
|
||||
|
||||
Returns:
|
||||
Formatted time string
|
||||
"""
|
||||
try:
|
||||
seconds = int(seconds)
|
||||
minutes, seconds = divmod(seconds, 60)
|
||||
hours, minutes = divmod(minutes, 60)
|
||||
|
||||
|
||||
297
config.py
297
config.py
@@ -1,115 +1,198 @@
|
||||
# config.py
|
||||
# This file should parse all configurations within the bot
|
||||
# Modern configuration management using environment variables
|
||||
|
||||
import os
|
||||
import discord
|
||||
from discord import Color
|
||||
import json
|
||||
from dotenv import load_dotenv
|
||||
from typing import Optional
|
||||
|
||||
# Read data from JSON file in ./data/config.json
|
||||
def read_data():
|
||||
with open("./data/config.json", "r") as file:
|
||||
return json.load(file)
|
||||
# Load environment variables from .env file
|
||||
load_dotenv()
|
||||
|
||||
raise Exception("Could not load config data")
|
||||
|
||||
|
||||
def get_spotify_creds():
|
||||
data = read_data()
|
||||
data = data.get("spotify")
|
||||
|
||||
SCID = data.get("SCID")
|
||||
secret = data.get("SECRET")
|
||||
|
||||
return SCID, secret
|
||||
|
||||
|
||||
# Reading prefix
|
||||
def get_prefix():
|
||||
data = read_data()
|
||||
|
||||
prefix = data.get('prefix')
|
||||
if prefix:
|
||||
return prefix
|
||||
|
||||
raise Exception("Missing config data: prefix")
|
||||
|
||||
|
||||
# Fetch the bot secret token
|
||||
def get_login(bot):
|
||||
data = read_data()
|
||||
if data is False or data.get(f"{bot}bot") is False:
|
||||
raise Exception(f"Missing config data: {bot}bot")
|
||||
|
||||
data = data.get(f"{bot}bot")
|
||||
return data.get("secret")
|
||||
|
||||
|
||||
# Read the status and text data
|
||||
def get_status():
|
||||
data = read_data()
|
||||
|
||||
if data is False or data.get('status') is False:
|
||||
raise Exception("Missing config data: status")
|
||||
|
||||
# Find type
|
||||
data = data.get('status')
|
||||
return translate_status(
|
||||
data.get('type'),
|
||||
data.get('text'),
|
||||
data.get('link')
|
||||
)
|
||||
|
||||
# Get colors from colorscheme
|
||||
def get_color(color):
|
||||
data = read_data()
|
||||
|
||||
if data is False or data.get('status') is False:
|
||||
raise Exception("Missing config data: color")
|
||||
|
||||
# Grab color
|
||||
string_value = data.get("colorscheme").get(color)
|
||||
hex_value = Color.from_str(string_value)
|
||||
return hex_value
|
||||
|
||||
|
||||
# Taking JSON variables and converting them into a presence
|
||||
# Use None url incase not provided
|
||||
def translate_status(status_type, status_text, status_url=None):
|
||||
if status_type == "playing":
|
||||
return discord.Activity(
|
||||
type=discord.ActivityType.playing,
|
||||
name=status_text
|
||||
)
|
||||
|
||||
|
||||
elif status_type == "streaming":
|
||||
return discord.Activity(
|
||||
type=discord.ActivityType.streaming,
|
||||
name=status_text,
|
||||
url=status_url
|
||||
)
|
||||
|
||||
elif status_type == "listening":
|
||||
return discord.Activity(
|
||||
type=discord.ActivityType.listening,
|
||||
name=status_text
|
||||
)
|
||||
|
||||
|
||||
elif status_type == "watching":
|
||||
return discord.Activity(
|
||||
type=discord.ActivityType.watching,
|
||||
name=status_text
|
||||
)
|
||||
|
||||
elif status_type == "competing":
|
||||
return discord.Activity(
|
||||
type=discord.ActivityType.competing,
|
||||
name=status_text
|
||||
)
|
||||
|
||||
#TODO
|
||||
# Implement custom status type
|
||||
# ===================================
|
||||
# Environment Detection
|
||||
# ===================================
|
||||
ENVIRONMENT = os.getenv("ENVIRONMENT", "dev").lower()
|
||||
IS_PRODUCTION = ENVIRONMENT == "live"
|
||||
IS_DEVELOPMENT = ENVIRONMENT == "dev"
|
||||
|
||||
# ===================================
|
||||
# Discord Configuration
|
||||
# ===================================
|
||||
def get_discord_token() -> str:
|
||||
"""Get the appropriate Discord token based on environment"""
|
||||
if IS_PRODUCTION:
|
||||
token = os.getenv("DISCORD_TOKEN_LIVE")
|
||||
if not token:
|
||||
raise ValueError("DISCORD_TOKEN_LIVE not found in environment!")
|
||||
return token
|
||||
else:
|
||||
raise Exception(f"Invalid status type: {status_type}")
|
||||
token = os.getenv("DISCORD_TOKEN_DEV")
|
||||
if not token:
|
||||
raise ValueError("DISCORD_TOKEN_DEV not found in environment!")
|
||||
return token
|
||||
|
||||
|
||||
def get_prefix() -> str:
|
||||
"""Get command prefix (default: =)"""
|
||||
return os.getenv("DISCORD_PREFIX", "=")
|
||||
|
||||
|
||||
# ===================================
|
||||
# Spotify Configuration
|
||||
# ===================================
|
||||
def get_spotify_creds() -> tuple[str, str]:
|
||||
"""Get Spotify API credentials"""
|
||||
client_id = os.getenv("SPOTIFY_CLIENT_ID")
|
||||
client_secret = os.getenv("SPOTIFY_CLIENT_SECRET")
|
||||
|
||||
if not client_id or not client_secret:
|
||||
raise ValueError("Spotify credentials not found in environment!")
|
||||
|
||||
return client_id, client_secret
|
||||
|
||||
|
||||
# ===================================
|
||||
# Database Configuration
|
||||
# ===================================
|
||||
def get_db_path() -> str:
|
||||
"""Get SQLite database path"""
|
||||
return os.getenv("DB_PATH", "./data/music.db")
|
||||
|
||||
|
||||
# Future PostgreSQL config
|
||||
def get_postgres_url() -> Optional[str]:
|
||||
"""Get PostgreSQL connection URL (for future migration)"""
|
||||
host = os.getenv("DB_HOST")
|
||||
port = os.getenv("DB_PORT", "5432")
|
||||
name = os.getenv("DB_NAME")
|
||||
user = os.getenv("DB_USER")
|
||||
password = os.getenv("DB_PASSWORD")
|
||||
|
||||
if all([host, name, user, password]):
|
||||
return f"postgresql://{user}:{password}@{host}:{port}/{name}"
|
||||
return None
|
||||
|
||||
|
||||
# ===================================
|
||||
# Bot Status/Presence
|
||||
# ===================================
|
||||
def get_status() -> discord.Activity:
|
||||
"""Get bot status/presence"""
|
||||
status_type = os.getenv("STATUS_TYPE", "listening").lower()
|
||||
status_text = os.getenv("STATUS_TEXT", "Zilean's Theme")
|
||||
status_url = os.getenv("STATUS_URL")
|
||||
|
||||
return translate_status(status_type, status_text, status_url)
|
||||
|
||||
|
||||
def translate_status(
|
||||
status_type: str,
|
||||
status_text: str,
|
||||
status_url: Optional[str] = None
|
||||
) -> discord.Activity:
|
||||
"""Convert status type string to Discord Activity"""
|
||||
|
||||
status_map = {
|
||||
"playing": discord.ActivityType.playing,
|
||||
"streaming": discord.ActivityType.streaming,
|
||||
"listening": discord.ActivityType.listening,
|
||||
"watching": discord.ActivityType.watching,
|
||||
"competing": discord.ActivityType.competing,
|
||||
}
|
||||
|
||||
activity_type = status_map.get(status_type)
|
||||
if not activity_type:
|
||||
raise ValueError(f"Invalid status type: {status_type}")
|
||||
|
||||
# Streaming requires URL
|
||||
if status_type == "streaming":
|
||||
if not status_url:
|
||||
raise ValueError("Streaming status requires STATUS_URL")
|
||||
return discord.Activity(
|
||||
type=activity_type,
|
||||
name=status_text,
|
||||
url=status_url
|
||||
)
|
||||
|
||||
return discord.Activity(type=activity_type, name=status_text)
|
||||
|
||||
|
||||
# ===================================
|
||||
# Color Scheme
|
||||
# ===================================
|
||||
def get_color(color_name: str) -> Color:
|
||||
"""Get color from environment (hex format)"""
|
||||
color_map = {
|
||||
"primary": os.getenv("COLOR_PRIMARY", "#7289DA"),
|
||||
"success": os.getenv("COLOR_SUCCESS", "#43B581"),
|
||||
"error": os.getenv("COLOR_ERROR", "#F04747"),
|
||||
"warning": os.getenv("COLOR_WARNING", "#FAA61A"),
|
||||
}
|
||||
|
||||
hex_value = color_map.get(color_name.lower())
|
||||
if not hex_value:
|
||||
# Default to Discord blurple
|
||||
hex_value = "#7289DA"
|
||||
|
||||
return Color.from_str(hex_value)
|
||||
|
||||
|
||||
# ===================================
|
||||
# Logging Configuration
|
||||
# ===================================
|
||||
def get_log_level() -> str:
|
||||
"""Get logging level from environment"""
|
||||
return os.getenv("LOG_LEVEL", "INFO").upper()
|
||||
|
||||
|
||||
# ===================================
|
||||
# Legacy Support (for backward compatibility)
|
||||
# ===================================
|
||||
def get_login(bot: str) -> str:
|
||||
"""Legacy function - maps to new get_discord_token()"""
|
||||
# Ignore the 'bot' parameter, use ENVIRONMENT instead
|
||||
return get_discord_token()
|
||||
|
||||
|
||||
# ===================================
|
||||
# Validation
|
||||
# ===================================
|
||||
def validate_config():
|
||||
"""Validate that all required config is present"""
|
||||
errors = []
|
||||
|
||||
# Check Discord token
|
||||
try:
|
||||
get_discord_token()
|
||||
except ValueError as e:
|
||||
errors.append(str(e))
|
||||
|
||||
# Check Spotify creds
|
||||
try:
|
||||
get_spotify_creds()
|
||||
except ValueError as e:
|
||||
errors.append(str(e))
|
||||
|
||||
if errors:
|
||||
error_msg = "\n".join(errors)
|
||||
raise ValueError(f"Configuration errors:\n{error_msg}")
|
||||
|
||||
print(f"✅ Configuration validated (Environment: {ENVIRONMENT})")
|
||||
|
||||
|
||||
# ===================================
|
||||
# Startup Info
|
||||
# ===================================
|
||||
def print_config_info():
|
||||
"""Print configuration summary (without secrets!)"""
|
||||
print("=" * 50)
|
||||
print("🎵 Groovy-Zilean Configuration")
|
||||
print("=" * 50)
|
||||
print(f"Environment: {ENVIRONMENT.upper()}")
|
||||
print(f"Prefix: {get_prefix()}")
|
||||
print(f"Database: {get_db_path()}")
|
||||
print(f"Log Level: {get_log_level()}")
|
||||
print(f"Spotify: {'Configured ✅' if os.getenv('SPOTIFY_CLIENT_ID') else 'Not configured ❌'}")
|
||||
print("=" * 50)
|
||||
|
||||
13
main.py
13
main.py
@@ -3,6 +3,16 @@ from bot import Groovy
|
||||
import config
|
||||
import help
|
||||
|
||||
# Validate configuration before starting
|
||||
try:
|
||||
config.validate_config()
|
||||
config.print_config_info()
|
||||
except ValueError as e:
|
||||
print(f"❌ Configuration Error:\n{e}")
|
||||
print("\n💡 Tip: Copy .env.example to .env and fill in your values")
|
||||
exit(1)
|
||||
|
||||
# Initialize bot with validated config
|
||||
client = Groovy(command_prefix=config.get_prefix(), intents=discord.Intents.all())
|
||||
|
||||
@client.event
|
||||
@@ -25,4 +35,5 @@ async def on_voice_state_update(member, before, after):
|
||||
except Exception as e:
|
||||
print(f"Error auto-disconnecting: {e}")
|
||||
|
||||
client.run(config.get_login("live"))
|
||||
# Run bot with environment-appropriate token
|
||||
client.run(config.get_discord_token())
|
||||
|
||||
31
mypy.ini
Normal file
31
mypy.ini
Normal file
@@ -0,0 +1,31 @@
|
||||
# mypy configuration for groovy-zilean
|
||||
# Type checking configuration that's practical for a Discord bot
|
||||
|
||||
[mypy]
|
||||
# Python version
|
||||
python_version = 3.13
|
||||
|
||||
# Ignore missing imports for libraries without type stubs
|
||||
# Discord.py, spotipy, yt-dlp don't have complete type stubs
|
||||
ignore_missing_imports = True
|
||||
|
||||
# Be strict about our own code
|
||||
# Start lenient, can tighten later
|
||||
disallow_untyped_defs = False
|
||||
check_untyped_defs = True
|
||||
# Too noisy with discord.py
|
||||
warn_return_any = False
|
||||
warn_unused_configs = True
|
||||
|
||||
# Exclude patterns
|
||||
exclude = venv/
|
||||
|
||||
# Per-module overrides
|
||||
[mypy-discord.*]
|
||||
ignore_missing_imports = True
|
||||
|
||||
[mypy-spotipy.*]
|
||||
ignore_missing_imports = True
|
||||
|
||||
[mypy-yt_dlp.*]
|
||||
ignore_missing_imports = True
|
||||
@@ -1,12 +1,14 @@
|
||||
# Core bot framework
|
||||
discord.py==2.6.4
|
||||
aiohttp==3.8.4
|
||||
PyNaCl==1.5.0
|
||||
spotipy==2.23.0
|
||||
discord.py>=2.6.4
|
||||
aiohttp>=3.9.0
|
||||
PyNaCl>=1.5.0
|
||||
spotipy>=2.23.0
|
||||
|
||||
# Configuration management
|
||||
python-dotenv>=1.0.0
|
||||
|
||||
# YouTube extractor
|
||||
yt-dlp>=2025.10.14
|
||||
|
||||
# System dependencies
|
||||
PyAudio==0.2.13
|
||||
mutagen==1.46.0
|
||||
# Audio metadata (if needed by yt-dlp)
|
||||
mutagen>=1.47.0
|
||||
|
||||
Reference in New Issue
Block a user