Compare commits

4 Commits

13 changed files with 1215 additions and 527 deletions

67
.env_example Normal file
View File

@@ -0,0 +1,67 @@
# Groovy-Zilean Configuration
# Copy this file to .env and fill in your values
# NEVER commit .env to git!
# ===================================
# Environment Selection
# ===================================
# Set to "dev" for development bot, "live" for production
ENVIRONMENT=dev
# ===================================
# Discord Bot Tokens
# ===================================
DISCORD_TOKEN_DEV=
DISCORD_TOKEN_LIVE=
# Bot settings
DISCORD_PREFIX=
# ===================================
# Spotify Integration
# ===================================
SPOTIFY_CLIENT_ID=
SPOTIFY_CLIENT_SECRET=
# ===================================
# Database
# ===================================
# For now (SQLite)
DB_PATH=./data/music.db
# For future (PostgreSQL)
# DB_HOST=localhost
# DB_PORT=5432
# DB_NAME=groovy_zilean
# DB_USER=groovy
# DB_PASSWORD=your_db_password
# ===================================
# Bot Status/Presence
# ===================================
# Types: playing, listening, watching, streaming, competing
STATUS_TYPE=listening
STATUS_TEXT==help | /help
# STATUS_URL= # Only needed for streaming type
# ===================================
# Color Scheme (hex colors)
# ===================================
COLOR_PRIMARY=#7289DA
COLOR_SUCCESS=#43B581
COLOR_ERROR=#F04747
COLOR_WARNING=#FAA61A
# ===================================
# Web Dashboard (Future)
# ===================================
# DISCORD_CLIENT_ID=your_oauth_client_id
# DISCORD_CLIENT_SECRET=your_oauth_secret
# DISCORD_REDIRECT_URI=http://localhost:8000/callback
# WEB_SECRET_KEY=random_secret_for_sessions
# ===================================
# Logging
# ===================================
LOG_LEVEL=INFO
# Options: DEBUG, INFO, WARNING, ERROR, CRITICAL

2
.gitignore vendored
View File

@@ -160,4 +160,4 @@ cython_debug/
#.idea/ #.idea/
# My stuff # My stuff
data/ data/*.db

201
SETUP.md Normal file
View File

@@ -0,0 +1,201 @@
# Groovy-Zilean Setup Guide
Quick start guide for getting groovy-zilean running locally or in production.
---
## Prerequisites
- **Python 3.11+** (3.9 deprecated by yt-dlp)
- **FFmpeg** installed on your system
- Discord Bot Token ([Discord Developer Portal](https://discord.com/developers/applications))
- Spotify API Credentials ([Spotify Developer Dashboard](https://developer.spotify.com/dashboard))
---
## 1. Clone & Setup Environment
```bash
# Clone the repository
git clone <your-repo-url>
cd groovy-zilean
# Create virtual environment (keeps dependencies isolated)
python3.12 -m venv venv
# Activate virtual environment
source venv/bin/activate # Linux/Mac
# OR
venv\Scripts\activate # Windows
# Install dependencies
pip install --upgrade pip
pip install -r requirements.txt
```
---
## 2. Configuration
### Create `.env` file
```bash
# Copy the example file
cp .env.example .env
# Edit with your favorite editor
nano .env
# OR
vim .env
# OR
code .env # VS Code
```
### Fill in Required Values
At minimum, you need:
```env
# Choose environment: "dev" or "live"
ENVIRONMENT=dev
# Discord bot tokens (get from Discord Developer Portal)
DISCORD_TOKEN_DEV=your_dev_bot_token_here
DISCORD_TOKEN_LIVE=your_live_bot_token_here
# Spotify credentials (get from Spotify Developer Dashboard)
SPOTIFY_CLIENT_ID=your_spotify_client_id
SPOTIFY_CLIENT_SECRET=your_spotify_secret
```
**Optional but recommended:**
```env
# Bot settings
DISCORD_PREFIX==
STATUS_TYPE=listening
STATUS_TEXT=Zilean's Theme
# Colors (hex format)
COLOR_PRIMARY=#7289DA
COLOR_SUCCESS=#43B581
COLOR_ERROR=#F04747
COLOR_WARNING=#FAA61A
```
---
## 3. Create Data Directory
```bash
# Create directory for database
mkdir -p data
# The database file (music.db) will be created automatically on first run
```
---
## 4. Run the Bot
### Development Mode
```bash
# Make sure .env has ENVIRONMENT=dev
source venv/bin/activate # If not already activated
python main.py
```
### Production Mode
```bash
# Change .env to ENVIRONMENT=live
source venv/bin/activate
python main.py
```
---
## 5. Switching Between Dev and Live
**Super easy!** Just change one line in `.env`:
```bash
# For development bot
ENVIRONMENT=dev
# For production bot
ENVIRONMENT=live
```
The bot will automatically use the correct token!
---
## Troubleshooting
### "Configuration Error: DISCORD_TOKEN_DEV not found"
- Make sure you copied `.env.example` to `.env`
- Check that `.env` has the token values filled in
- Token should NOT have quotes around it
### "No module named 'dotenv'"
```bash
pip install python-dotenv
```
### "FFmpeg not found"
```bash
# Debian/Ubuntu
sudo apt install ffmpeg
# macOS
brew install ffmpeg
# Arch Linux
sudo pacman -S ffmpeg
```
### Python version issues
yt-dlp requires Python 3.10+. Check your version:
```bash
python --version
```
If too old, install newer Python and recreate venv:
```bash
python3.12 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
```
---
## Project Structure
```
groovy-zilean/
├── main.py # Entry point (run this!)
├── bot.py # Bot class definition
├── config.py # Configuration management
├── .env # YOUR secrets (never commit!)
├── .env.example # Template (safe to commit)
├── requirements.txt # Python dependencies
├── cogs/
│ └── music/ # Music functionality
│ ├── main.py # Commands
│ ├── queue.py # Queue management
│ ├── util.py # Utilities
│ └── translate.py # URL/search handling
└── data/
└── music.db # SQLite database (auto-created)
```
---
## Next Steps
- Check out `PRODUCTION_ROADMAP.md` for the full development plan
- See `README.md` for feature list and usage
- Join your test server and try commands!
**Happy coding!** 🎵⏱️

25
bot.py
View File

@@ -1,20 +1,30 @@
"""
Groovy-Zilean Bot Class
Main bot implementation with cog loading and background tasks
"""
from typing import Any
from discord.ext import commands from discord.ext import commands
from discord.ext import tasks from discord.ext import tasks
from cogs.music.main import music from cogs.music.main import music
from help import GroovyHelp # Import the new Help Cog from help import GroovyHelp
cogs = [ # List of cogs to load on startup
cogs: list[type[commands.Cog]] = [
music, music,
GroovyHelp GroovyHelp
] ]
class Groovy(commands.Bot): class Groovy(commands.Bot):
def __init__(self, *args, **kwargs): """Custom bot class with automatic cog loading and inactivity checking"""
def __init__(self, *args: Any, **kwargs: Any) -> None:
# We force help_command to None because we are using a custom Cog for it # We force help_command to None because we are using a custom Cog for it
# But we pass all other args (like command_prefix) to the parent # But we pass all other args (like command_prefix) to the parent
super().__init__(*args, help_command=None, **kwargs) super().__init__(*args, help_command=None, **kwargs)
async def on_ready(self): async def on_ready(self) -> None:
import config # Imported here to avoid circular dependencies if any import config # Imported here to avoid circular dependencies if any
# Set status # Set status
@@ -47,11 +57,12 @@ class Groovy(commands.Bot):
print(f"{self.user} is ready and online!") print(f"{self.user} is ready and online!")
@tasks.loop(seconds=30) @tasks.loop(seconds=30)
async def inactivity_checker(self): async def inactivity_checker(self) -> None:
"""Check for inactive voice connections""" """Check for inactive voice connections every 30 seconds"""
from cogs.music import util from cogs.music import util
await util.check_inactivity(self) await util.check_inactivity(self)
@inactivity_checker.before_loop @inactivity_checker.before_loop
async def before_inactivity_checker(self): async def before_inactivity_checker(self) -> None:
"""Wait for bot to be ready before starting inactivity checker"""
await self.wait_until_ready() await self.wait_until_ready()

315
cogs/music/db_manager.py Normal file
View File

@@ -0,0 +1,315 @@
"""
Database Manager for Groovy-Zilean
Centralizes all database operations and provides a clean interface.
Makes future PostgreSQL migration much easier.
"""
import sqlite3
from contextlib import contextmanager
from typing import Optional, List, Tuple, Any
import config
class DatabaseManager:
"""Manages database connections and operations"""
def __init__(self):
self.db_path = config.get_db_path()
@contextmanager
def get_connection(self):
"""
Context manager for database connections.
Automatically handles commit/rollback and closing.
Usage:
with db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(...)
"""
conn = sqlite3.connect(self.db_path)
try:
yield conn
conn.commit()
except Exception as e:
conn.rollback()
raise e
finally:
conn.close()
def initialize_tables(self):
"""Create database tables if they don't exist"""
with self.get_connection() as conn:
cursor = conn.cursor()
# Create servers table
cursor.execute('''CREATE TABLE IF NOT EXISTS servers (
server_id TEXT PRIMARY KEY,
is_playing INTEGER DEFAULT 0,
song_name TEXT,
song_url TEXT,
song_thumbnail TEXT,
loop_mode TEXT DEFAULT 'off',
volume INTEGER DEFAULT 100,
effect TEXT DEFAULT 'none',
song_start_time REAL DEFAULT 0,
song_duration INTEGER DEFAULT 0
);''')
# Set all to not playing on startup
cursor.execute("UPDATE servers SET is_playing = 0;")
# Migrations for existing databases - add columns if missing
migrations = [
("loop_mode", "TEXT DEFAULT 'off'"),
("volume", "INTEGER DEFAULT 100"),
("effect", "TEXT DEFAULT 'none'"),
("song_start_time", "REAL DEFAULT 0"),
("song_duration", "INTEGER DEFAULT 0"),
("song_thumbnail", "TEXT DEFAULT ''"),
("song_url", "TEXT DEFAULT ''")
]
for col_name, col_type in migrations:
try:
cursor.execute(f"ALTER TABLE servers ADD COLUMN {col_name} {col_type};")
except sqlite3.OperationalError:
# Column already exists, skip
pass
# Create songs/queue table
cursor.execute('''CREATE TABLE IF NOT EXISTS songs (
server_id TEXT NOT NULL,
song_link TEXT,
queued_by TEXT,
position INTEGER NOT NULL,
title TEXT,
thumbnail TEXT,
duration INTEGER,
PRIMARY KEY (position),
FOREIGN KEY (server_id) REFERENCES servers(server_id)
);''')
# Clear all songs on startup
cursor.execute("DELETE FROM songs;")
# ===================================
# Server Operations
# ===================================
def ensure_server_exists(self, server_id: str) -> None:
"""Add server to database if it doesn't exist"""
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) FROM servers WHERE server_id = ?', (server_id,))
if cursor.fetchone()[0] == 0:
cursor.execute('''INSERT INTO servers (server_id, loop_mode, volume, effect, song_thumbnail, song_url)
VALUES (?, 'off', 100, 'none', '', '')''', (server_id,))
def set_server_playing(self, server_id: str, playing: bool) -> None:
"""Update server playing status"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
val = 1 if playing else 0
cursor.execute("UPDATE servers SET is_playing = ? WHERE server_id = ?", (val, server_id))
def is_server_playing(self, server_id: str) -> bool:
"""Check if server is currently playing"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("SELECT is_playing FROM servers WHERE server_id = ?", (server_id,))
res = cursor.fetchone()
return True if res and res[0] == 1 else False
def set_current_song(self, server_id: str, title: str, url: str, thumbnail: str = "", duration: int = 0, start_time: float = 0) -> None:
"""Update currently playing song information"""
with self.get_connection() as conn:
cursor = conn.cursor()
# Ensure duration is an integer
try:
duration = int(duration)
except:
duration = 0
cursor.execute(''' UPDATE servers
SET song_name = ?, song_url = ?, song_thumbnail = ?, song_start_time = ?, song_duration = ?
WHERE server_id = ?''',
(title, url, thumbnail, start_time, duration, server_id))
def get_current_song(self, server_id: str) -> dict:
"""Get current song info"""
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(''' SELECT song_name, song_thumbnail, song_url FROM servers WHERE server_id = ? LIMIT 1;''', (server_id,))
result = cursor.fetchone()
if result:
return {'title': result[0], 'thumbnail': result[1], 'url': result[2]}
return {'title': "Nothing", 'thumbnail': None, 'url': ''}
def get_current_progress(self, server_id: str) -> Tuple[int, int, float]:
"""Get playback progress (elapsed, duration, percentage)"""
import time
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('''SELECT song_start_time, song_duration, is_playing FROM servers WHERE server_id = ? LIMIT 1;''', (server_id,))
result = cursor.fetchone()
if not result or result[2] == 0:
return 0, 0, 0.0
start_time, duration, _ = result
if duration is None or duration == 0:
return 0, 0, 0.0
elapsed = int(time.time() - start_time)
elapsed = min(elapsed, duration)
percentage = (elapsed / duration) * 100 if duration > 0 else 0
return elapsed, duration, percentage
# ===================================
# Queue Operations
# ===================================
def add_song(self, server_id: str, song_link: str, queued_by: str, title: str, thumbnail: str = "", duration: int = 0, position: Optional[int] = None) -> int:
"""Add song to queue, returns position"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
if position is None:
# Add to end
cursor.execute("SELECT MAX(position) FROM songs WHERE server_id = ?", (server_id,))
max_pos = cursor.fetchone()[0]
position = (max_pos + 1) if max_pos is not None else 0
else:
# Insert at specific position (shift others down)
cursor.execute("UPDATE songs SET position = position + 1 WHERE server_id = ? AND position >= ?",
(server_id, position))
cursor.execute("""INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)""",
(server_id, song_link, queued_by, position, title, thumbnail, duration))
return position
def get_next_song(self, server_id: str) -> Optional[Tuple]:
"""Get the next song in queue (doesn't remove it)"""
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('''SELECT * FROM songs WHERE server_id = ? ORDER BY position LIMIT 1;''', (server_id,))
return cursor.fetchone()
def remove_song(self, server_id: str, position: int) -> None:
"""Remove song at position from queue"""
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('''DELETE FROM songs WHERE server_id = ? AND position = ?''', (server_id, position))
def get_queue(self, server_id: str, limit: int = 10) -> Tuple[int, List[Tuple]]:
"""Get songs in queue (returns max_position, list of songs)"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("SELECT title, duration, queued_by FROM songs WHERE server_id = ? ORDER BY position LIMIT ?",
(server_id, limit))
songs = cursor.fetchall()
cursor.execute("SELECT MAX(position) FROM songs WHERE server_id = ?", (server_id,))
max_pos = cursor.fetchone()[0]
max_pos = max_pos if max_pos is not None else -1
return max_pos, songs
def clear_queue(self, server_id: str) -> None:
"""Clear all songs from queue"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("DELETE FROM songs WHERE server_id = ?", (server_id,))
def shuffle_queue(self, server_id: str) -> bool:
"""Shuffle the queue randomly, returns success"""
import random
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("SELECT position, song_link, queued_by, title, thumbnail, duration FROM songs WHERE server_id = ? ORDER BY position",
(server_id,))
songs = cursor.fetchall()
if len(songs) <= 1:
return False
random.shuffle(songs)
cursor.execute("DELETE FROM songs WHERE server_id = ?", (server_id,))
for i, s in enumerate(songs):
cursor.execute("INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)",
(server_id, s[1], s[2], i, s[3], s[4], s[5]))
return True
# ===================================
# Settings Operations
# ===================================
def get_loop_mode(self, server_id: str) -> str:
"""Get loop mode: 'off', 'song', or 'queue'"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("SELECT loop_mode FROM servers WHERE server_id = ?", (server_id,))
res = cursor.fetchone()
return res[0] if res else 'off'
def set_loop_mode(self, server_id: str, mode: str) -> None:
"""Set loop mode: 'off', 'song', or 'queue'"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("UPDATE servers SET loop_mode = ? WHERE server_id = ?", (mode, server_id))
def get_volume(self, server_id: str) -> int:
"""Get volume (0-200)"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("SELECT volume FROM servers WHERE server_id = ?", (server_id,))
res = cursor.fetchone()
return res[0] if res else 100
def set_volume(self, server_id: str, volume: int) -> int:
"""Set volume (0-200), returns the set volume"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("UPDATE servers SET volume = ? WHERE server_id = ?", (volume, server_id))
return volume
def get_effect(self, server_id: str) -> str:
"""Get current audio effect"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("SELECT effect FROM servers WHERE server_id = ?", (server_id,))
res = cursor.fetchone()
return res[0] if res else 'none'
def set_effect(self, server_id: str, effect: str) -> None:
"""Set audio effect"""
with self.get_connection() as conn:
cursor = conn.cursor()
self.ensure_server_exists(server_id)
cursor.execute("UPDATE servers SET effect = ? WHERE server_id = ?", (effect, server_id))
# Global instance
db = DatabaseManager()

View File

@@ -12,28 +12,7 @@ from cogs.music.help import music_help
import spotipy import spotipy
from spotipy.oauth2 import SpotifyClientCredentials from spotipy.oauth2 import SpotifyClientCredentials
import config # Use centralized config
# Fix this pls
import json
#from .. import config
# Read data from JSON file in ./data/config.json
def read_data():
with open("./data/config.json", "r") as file:
return json.load(file)
raise Exception("Could not load config data")
def get_spotify_creds():
data = read_data()
data = data.get("spotify")
SCID = data.get("SCID")
secret = data.get("SECRET")
return SCID, secret
@@ -51,10 +30,13 @@ class music(commands.Cog):
help_command.cog = self help_command.cog = self
self.help_command = help_command self.help_command = help_command
SCID, secret = get_spotify_creds() # Get Spotify credentials from centralized config
spotify_id, spotify_secret = config.get_spotify_creds()
# Authentication - without user # Authentication - without user
client_credentials_manager = SpotifyClientCredentials(client_id=SCID, client_credentials_manager = SpotifyClientCredentials(
client_secret=secret) client_id=spotify_id,
client_secret=spotify_secret
)
self.sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager) self.sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
@@ -346,7 +328,7 @@ class music(commands.Cog):
app_commands.Choice(name="Song", value="song"), app_commands.Choice(name="Song", value="song"),
app_commands.Choice(name="Queue", value="queue") app_commands.Choice(name="Queue", value="queue")
]) ])
async def loop(self, ctx: Context, mode: str = None): async def loop(self, ctx: Context, mode: str | None = None):
"""Toggle between loop modes or set a specific mode""" """Toggle between loop modes or set a specific mode"""
server = ctx.guild server = ctx.guild
@@ -409,7 +391,7 @@ class music(commands.Cog):
description="Set playback volume", description="Set playback volume",
aliases=['vol', 'v']) aliases=['vol', 'v'])
@app_commands.describe(level="Volume level (0-200%, default shows current)") @app_commands.describe(level="Volume level (0-200%, default shows current)")
async def volume(self, ctx: Context, level: int = None): async def volume(self, ctx: Context, level: int | None = None):
"""Set or display the current volume""" """Set or display the current volume"""
server = ctx.guild server = ctx.guild
@@ -453,7 +435,7 @@ class music(commands.Cog):
description="Apply audio effects to playback", description="Apply audio effects to playback",
aliases=['fx', 'filter']) aliases=['fx', 'filter'])
@app_commands.describe(effect_name="The audio effect to apply (leave empty to see list)") @app_commands.describe(effect_name="The audio effect to apply (leave empty to see list)")
async def effect(self, ctx: Context, effect_name: str = None): async def effect(self, ctx: Context, effect_name: str | None = None):
"""Apply or list audio effects""" """Apply or list audio effects"""
server = ctx.guild server = ctx.guild

View File

@@ -1,14 +1,14 @@
from http import server """
import sqlite3 Queue management for Groovy-Zilean music bot
import random Now using centralized database manager for cleaner code
import time """
import discord import discord
import asyncio import asyncio
import time
from .translate import search_song from .translate import search_song
from .db_manager import db
db_path = "./data/music.db"
# Base FFmpeg options (will be modified by effects) # Base FFmpeg options (will be modified by effects)
BASE_FFMPEG_OPTS = { BASE_FFMPEG_OPTS = {
@@ -103,342 +103,224 @@ def get_effect_options(effect_name):
return effects.get(effect_name, effects['none']) return effects.get(effect_name, effects['none'])
# Creates the tables if they don't exist # ===================================
# Initialization
# ===================================
def initialize_tables(): def initialize_tables():
# Connect to the database """Initialize database tables"""
conn = sqlite3.connect(db_path) db.initialize_tables()
cursor = conn.cursor()
# Create servers table if it doesn't exist
cursor.execute('''CREATE TABLE IF NOT EXISTS servers (
server_id TEXT PRIMARY KEY,
is_playing INTEGER DEFAULT 0,
song_name TEXT,
song_url TEXT,
song_thumbnail TEXT,
loop_mode TEXT DEFAULT 'off',
volume INTEGER DEFAULT 100,
effect TEXT DEFAULT 'none',
song_start_time REAL DEFAULT 0,
song_duration INTEGER DEFAULT 0
);''')
# Set all to not playing # ===================================
cursor.execute("UPDATE servers SET is_playing = 0;") # Queue Management
# ===================================
# Add new columns if they don't exist (for existing databases) async def add_song(server_id, details, queued_by, position=None):
# Migrations for existing databases """
columns = [ Add a song to the queue
("loop_mode", "TEXT DEFAULT 'off'"),
("volume", "INTEGER DEFAULT 100"),
("effect", "TEXT DEFAULT 'none'"),
("song_start_time", "REAL DEFAULT 0"),
("song_duration", "INTEGER DEFAULT 0"),
("song_thumbnail", "TEXT DEFAULT ''"),
("song_url", "TEXT DEFAULT ''") # NEW
]
for col_name, col_type in columns: Args:
try: server_id: Discord server ID
cursor.execute(f"ALTER TABLE servers ADD COLUMN {col_name} {col_type};") details: Dictionary with song info (url, title, thumbnail, duration) or string
except sqlite3.OperationalError: queued_by: Username who queued the song
pass position: Optional position in queue (None = end of queue)
cursor.execute('''CREATE TABLE IF NOT EXISTS songs (
server_id TEXT NOT NULL,
song_link TEXT,
queued_by TEXT,
position INTEGER NOT NULL,
title TEXT,
thumbnail TEXT,
duration INTEGER,
PRIMARY KEY (position),
FOREIGN KEY (server_id) REFERENCES servers(server_id)
);''')
cursor.execute("DELETE FROM songs;")
conn.commit()
conn.close()
# Queue a song in the db
async def add_song(server_id, details, queued_by):
# Connect to db
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
await add_server(server_id, cursor, conn)
max_order_num = await get_max(server_id, cursor) + 1
Returns:
Position in queue
"""
if isinstance(details, str): if isinstance(details, str):
# Fallback for raw strings # Fallback for raw strings (legacy support)
cursor.execute("""INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)""", pos = db.add_song(
(server_id, "Not grabbed", queued_by, max_order_num, details, "Unknown", 0)) server_id=str(server_id),
song_link="Not grabbed",
queued_by=queued_by,
title=details,
thumbnail="Unknown",
duration=0,
position=position
)
else: else:
# Save exact duration and thumbnail from the start # Standard dictionary format
cursor.execute("""INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)""", pos = db.add_song(
(server_id, details['url'], queued_by, max_order_num, details['title'], details['thumbnail'], details['duration'])) server_id=str(server_id),
song_link=details['url'],
queued_by=queued_by,
title=details['title'],
thumbnail=details.get('thumbnail', ''),
duration=details.get('duration', 0),
position=position
)
conn.commit() return pos
conn.close()
return max_order_num
# Pop song from server (respects loop mode)
async def pop(server_id, ignore=False, skip_mode=False): async def pop(server_id, ignore=False, skip_mode=False):
""" """
Pop next song from queue Pop next song from queue
ignore: Skip the song without returning URL
skip_mode: True when called from skip command (affects loop song behavior) Args:
server_id: Discord server ID
ignore: Skip the song without returning URL
skip_mode: True when called from skip command (affects loop song behavior)
Returns:
Song URL or None
""" """
# Connect to db result = db.get_next_song(str(server_id))
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# JUST INCASE!
await add_server(server_id, cursor, conn)
# Fetch info: link(1), title(4), thumbnail(5), duration(6)
cursor.execute('''SELECT * FROM songs WHERE server_id = ? ORDER BY position LIMIT 1;''', (server_id,))
result = cursor.fetchone()
conn.commit()
conn.close()
if result is None: if result is None:
return None return None
elif ignore:
await mark_song_as_finished(server_id, result[3])
return None
elif result[1] == "Not grabbed":
# Lazy load logic
song_list = await search_song(result[4])
if not song_list:
return None
song = song_list[0]
await set_current_song(server_id, song['title'], song.get('thumbnail', ''), song.get('duration', 0)) # result format: (server_id, song_link, queued_by, position, title, thumbnail, duration)
server_id_str, song_link, queued_by, position, title, thumbnail, duration = result
if ignore:
db.remove_song(str(server_id), position)
return None
# Handle lazy-loaded songs (not yet fetched from YouTube)
if song_link == "Not grabbed":
song_list = await search_song(title)
if not song_list:
db.remove_song(str(server_id), position)
return None
song = song_list[0]
await set_current_song(
server_id,
song['title'],
song['url'],
song.get('thumbnail', ''),
song.get('duration', 0)
)
# Check loop mode before removing # Check loop mode before removing
loop_mode = await get_loop_mode(server_id) loop_mode = await get_loop_mode(server_id)
if loop_mode != 'song': # Only remove if not looping song if loop_mode != 'song':
await mark_song_as_finished(server_id, result[3]) db.remove_song(str(server_id), position)
return song['url'] return song['url']
# Pre-grabbed logic (Standard) # Standard pre-fetched song
# result[1] is url, result[5] is thumbnail, result[6] is duration await set_current_song(server_id, title, song_link, thumbnail, duration)
await set_current_song(server_id, result[4], result[1], result[5], result[6])
# Check loop mode before removing # Check loop mode before removing
loop_mode = await get_loop_mode(server_id) loop_mode = await get_loop_mode(server_id)
if loop_mode != 'song': # Only remove if not looping song if loop_mode != 'song':
await mark_song_as_finished(server_id, result[3]) db.remove_song(str(server_id), position)
return result[1] return song_link
# Add server to db if first time queuing
async def add_server(server_id, cursor, conn):
cursor.execute('SELECT COUNT(*) FROM servers WHERE server_id = ?', (server_id,))
if cursor.fetchone()[0] == 0:
cursor.execute('''INSERT INTO servers (server_id, loop_mode, volume, effect, song_thumbnail, song_url)
VALUES (?, 'off', 100, 'none', '', '')''', (server_id,))
conn.commit()
# set song as played and update indexes
async def mark_song_as_finished(server_id, order_num):
# Connect to the database
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Update the song as finished
cursor.execute('''DELETE FROM songs
WHERE server_id = ? AND position = ?''',
(server_id, order_num))
# Close connection
conn.commit()
conn.close()
# set the current playing song of the server
async def set_current_song(server_id, title, url, thumbnail="", duration=0):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
start_time = time.time()
# Ensure duration is an integer
try:
duration = int(duration)
except:
duration = 0
cursor.execute(''' UPDATE servers
SET song_name = ?, song_url = ?, song_thumbnail = ?, song_start_time = ?, song_duration = ?
WHERE server_id = ?''',
(title, url, thumbnail, start_time, duration, server_id))
conn.commit()
conn.close()
# Returns dictionary with title and thumbnail
async def get_current_song(server_id):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute(''' SELECT song_name, song_thumbnail, song_url FROM servers WHERE server_id = ? LIMIT 1;''', (server_id,))
result = cursor.fetchone()
conn.commit()
conn.close()
if result:
return {'title': result[0], 'thumbnail': result[1], 'url': result[2]}
return {'title': "Nothing", 'thumbnail': None, 'url': ''}
async def get_current_progress(server_id):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute('''SELECT song_start_time, song_duration, is_playing FROM servers WHERE server_id = ? LIMIT 1;''', (server_id,))
result = cursor.fetchone()
conn.close() # Close quickly
if not result or result[2] == 0:
return 0, 0, 0.0
start_time, duration, _ = result
if duration is None or duration == 0:
return 0, 0, 0.0
elapsed = int(time.time() - start_time)
elapsed = min(elapsed, duration)
percentage = (elapsed / duration) * 100 if duration > 0 else 0
return elapsed, duration, percentage
async def get_max(server_id, cursor):
cursor.execute("SELECT MAX(position) FROM songs WHERE server_id = ?", (server_id,))
result = cursor.fetchone()
return result[0] if result[0] is not None else -1
async def update_server(server_id, playing):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
await add_server(server_id, cursor, conn)
val = 1 if playing else 0
cursor.execute("UPDATE servers SET is_playing = ? WHERE server_id = ?", (val, server_id))
conn.commit()
conn.close()
async def is_server_playing(server_id):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
await add_server(server_id, cursor, conn)
cursor.execute("SELECT is_playing FROM servers WHERE server_id = ?", (server_id,))
res = cursor.fetchone()
conn.close()
return True if res[0] == 1 else False
async def clear(server_id):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
await add_server(server_id, cursor, conn)
await update_server(server_id, False)
cursor.execute("DELETE FROM songs WHERE server_id = ?", (server_id,))
conn.commit()
conn.close()
async def grab_songs(server_id): async def grab_songs(server_id):
conn = sqlite3.connect(db_path) """
cursor = conn.cursor() Get current queue
await add_server(server_id, cursor, conn)
cursor.execute("SELECT title, duration, queued_by FROM songs WHERE server_id = ? ORDER BY position LIMIT 10", (server_id,))
songs = cursor.fetchall()
max_pos = await get_max(server_id, cursor)
conn.close()
return max_pos, songs
# --- Effects/Loop/Shuffle/Volume (Simplified Paste) --- Returns:
async def get_loop_mode(server_id): Tuple of (max_position, list_of_songs)
conn = sqlite3.connect(db_path) """
cursor = conn.cursor() return db.get_queue(str(server_id), limit=10)
await add_server(server_id, cursor, conn)
cursor.execute("SELECT loop_mode FROM servers WHERE server_id = ?", (server_id,))
res = cursor.fetchone()
conn.close()
return res[0] if res else 'off'
async def set_loop_mode(server_id, mode):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
await add_server(server_id, cursor, conn)
cursor.execute("UPDATE servers SET loop_mode = ? WHERE server_id = ?", (mode, server_id))
conn.commit()
conn.close()
async def get_volume(server_id): async def clear(server_id):
conn = sqlite3.connect(db_path) """Clear the queue for a server"""
cursor = conn.cursor() db.clear_queue(str(server_id))
await add_server(server_id, cursor, conn) await update_server(server_id, False)
cursor.execute("SELECT volume FROM servers WHERE server_id = ?", (server_id,))
res = cursor.fetchone()
conn.close()
return res[0] if res else 100
async def set_volume(server_id, vol):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
await add_server(server_id, cursor, conn)
cursor.execute("UPDATE servers SET volume = ? WHERE server_id = ?", (vol, server_id))
conn.commit()
conn.close()
return vol
async def shuffle_queue(server_id): async def shuffle_queue(server_id):
conn = sqlite3.connect(db_path) """Shuffle the queue randomly"""
cursor = conn.cursor() return db.shuffle_queue(str(server_id))
await add_server(server_id, cursor, conn)
cursor.execute("SELECT position, song_link, queued_by, title, thumbnail, duration FROM songs WHERE server_id = ? ORDER BY position", (server_id,))
songs = cursor.fetchall() # ===================================
if len(songs) <= 1: # Server State Management
conn.close() # ===================================
return False
random.shuffle(songs) async def update_server(server_id, playing):
cursor.execute("DELETE FROM songs WHERE server_id = ?", (server_id,)) """Update server playing status"""
for i, s in enumerate(songs): db.set_server_playing(str(server_id), playing)
cursor.execute("INSERT INTO songs VALUES (?, ?, ?, ?, ?, ?, ?)", (server_id, s[1], s[2], i, s[3], s[4], s[5]))
conn.commit()
conn.close() async def is_server_playing(server_id):
return True """Check if server is currently playing"""
return db.is_server_playing(str(server_id))
async def set_current_song(server_id, title, url, thumbnail="", duration=0):
"""Set the currently playing song"""
db.set_current_song(
str(server_id),
title,
url,
thumbnail,
duration,
time.time() # start_time
)
async def get_current_song(server_id):
"""Get current song info"""
return db.get_current_song(str(server_id))
async def get_current_progress(server_id):
"""Get playback progress (elapsed, duration, percentage)"""
return db.get_current_progress(str(server_id))
# ===================================
# Settings Management
# ===================================
async def get_loop_mode(server_id):
"""Get loop mode: 'off', 'song', or 'queue'"""
return db.get_loop_mode(str(server_id))
async def set_loop_mode(server_id, mode):
"""Set loop mode: 'off', 'song', or 'queue'"""
db.set_loop_mode(str(server_id), mode)
async def get_volume(server_id):
"""Get volume (0-200)"""
return db.get_volume(str(server_id))
async def set_volume(server_id, vol):
"""Set volume (0-200)"""
return db.set_volume(str(server_id), vol)
async def get_effect(server_id): async def get_effect(server_id):
conn = sqlite3.connect(db_path) """Get current audio effect"""
cursor = conn.cursor() return db.get_effect(str(server_id))
await add_server(server_id, cursor, conn)
cursor.execute("SELECT effect FROM servers WHERE server_id = ?", (server_id,))
res = cursor.fetchone()
conn.close()
return res[0] if res else 'none'
async def set_effect(server_id, fx): async def set_effect(server_id, fx):
conn = sqlite3.connect(db_path) """Set audio effect"""
cursor = conn.cursor() db.set_effect(str(server_id), fx)
await add_server(server_id, cursor, conn)
cursor.execute("UPDATE servers SET effect = ? WHERE server_id = ?", (fx, server_id))
conn.commit() # ===================================
conn.close() # Effect Metadata
# ===================================
def list_all_effects(): def list_all_effects():
"""List all available audio effects"""
return [ return [
'none', 'bassboost', 'nightcore', 'slowed', 'earrape', 'deepfry', 'distortion', 'none', 'bassboost', 'nightcore', 'slowed', 'earrape', 'deepfry', 'distortion',
'reverse', 'chipmunk', 'demonic', 'underwater', 'robot', 'reverse', 'chipmunk', 'demonic', 'underwater', 'robot',
'8d', 'vibrato', 'tremolo', 'echo', 'phone', 'megaphone' '8d', 'vibrato', 'tremolo', 'echo', 'phone', 'megaphone'
] ]
def get_effect_emoji(effect_name): def get_effect_emoji(effect_name):
# Short list of emoji mappings """Get emoji for effect"""
emojis = { emojis = {
'none': '', # Changed to generic Sparkles 'none': '',
'bassboost': '💥', 'bassboost': '💥',
'nightcore': '', 'nightcore': '',
'slowed': '🐢', 'slowed': '🐢',
@@ -459,7 +341,9 @@ def get_effect_emoji(effect_name):
} }
return emojis.get(effect_name, '') return emojis.get(effect_name, '')
def get_effect_description(effect_name): def get_effect_description(effect_name):
"""Get description for effect"""
descriptions = { descriptions = {
'none': 'Normal audio', 'none': 'Normal audio',
'bassboost': 'MAXIMUM BASS 🔊', 'bassboost': 'MAXIMUM BASS 🔊',
@@ -482,43 +366,60 @@ def get_effect_description(effect_name):
} }
return descriptions.get(effect_name, 'Unknown effect') return descriptions.get(effect_name, 'Unknown effect')
# ===================================
# Playback
# ===================================
async def play(ctx): async def play(ctx):
"""
Main playback loop - plays songs from queue
"""
server_id = ctx.guild.id server_id = ctx.guild.id
voice_client = ctx.voice_client voice_client = ctx.voice_client
if voice_client is None: if voice_client is None:
await update_server(server_id, False) await update_server(server_id, False)
return return
# Wait for current song to finish
while voice_client.is_playing(): while voice_client.is_playing():
await asyncio.sleep(0.5) await asyncio.sleep(0.5)
# Get next song
url = await pop(server_id) url = await pop(server_id)
if url is None: if url is None:
await update_server(server_id, False) await update_server(server_id, False)
return return
try: try:
# Scale volume down to prevent earrape # Get volume and effect settings
# User sees 0-200%, but internally we scale by 0.25 vol = await get_volume(server_id) / 100.0 * 0.25 # Scale down by 0.25
# So user's 100% = 0.25 actual volume (25%)
vol = await get_volume(server_id) / 100.0 * 0.25
fx = await get_effect(server_id) fx = await get_effect(server_id)
opts = get_effect_options(fx) opts = get_effect_options(fx)
# Create audio source
src = discord.FFmpegPCMAudio(url, **opts) src = discord.FFmpegPCMAudio(url, **opts)
src = discord.PCMVolumeTransformer(src, volume=vol) src = discord.PCMVolumeTransformer(src, volume=vol)
# After callback - play next song
def after(e): def after(e):
if e: print(e) if e:
if voice_client and not voice_client.is_connected(): return print(f"Playback error: {e}")
if voice_client and not voice_client.is_connected():
return
# Schedule next song
coro = play(ctx) coro = play(ctx)
fut = asyncio.run_coroutine_threadsafe(coro, ctx.bot.loop) fut = asyncio.run_coroutine_threadsafe(coro, ctx.bot.loop)
try: fut.result() try:
except: pass fut.result()
except Exception as ex:
print(f"Error in after callback: {ex}")
voice_client.play(src, after=after) voice_client.play(src, after=after)
except Exception as e: except Exception as e:
print(f"Play error: {e}") print(f"Play error: {e}")
# Try next song on error
await play(ctx) await play(ctx)

View File

@@ -1,5 +1,9 @@
# Handles translating urls and search terms """
URL and search query handling for Groovy-Zilean
Translates YouTube, Spotify, SoundCloud URLs and search queries into playable audio
"""
from typing import Any
import yt_dlp as ytdlp import yt_dlp as ytdlp
import spotipy import spotipy
@@ -22,7 +26,7 @@ ydl_opts = {
}, },
} }
async def main(url, sp): async def main(url: str, sp: spotipy.Spotify) -> list[dict[str, Any] | str]:
#url = url.lower() #url = url.lower()
@@ -60,7 +64,7 @@ async def main(url, sp):
return [] return []
async def search_song(search): async def search_song(search: str) -> list[dict[str, Any]]:
with ytdlp.YoutubeDL(ydl_opts) as ydl: with ytdlp.YoutubeDL(ydl_opts) as ydl:
try: try:
info = ydl.extract_info(f"ytsearch1:{search}", download=False) info = ydl.extract_info(f"ytsearch1:{search}", download=False)
@@ -89,7 +93,7 @@ async def search_song(search):
return [data] return [data]
async def spotify_song(url, sp): async def spotify_song(url: str, sp: spotipy.Spotify) -> list[dict[str, Any]]:
track = sp.track(url.split("/")[-1].split("?")[0]) track = sp.track(url.split("/")[-1].split("?")[0])
search = "" search = ""
@@ -106,7 +110,11 @@ async def spotify_song(url, sp):
return await search_song(query) return await search_song(query)
async def spotify_playlist(url, sp): async def spotify_playlist(url: str, sp: spotipy.Spotify) -> list[str | dict[str, Any]]:
"""
Get songs from a Spotify playlist
Returns a mixed list where first item is dict, rest are search strings
"""
# Get the playlist uri code # Get the playlist uri code
code = url.split("/")[-1].split("?")[0] code = url.split("/")[-1].split("?")[0]
@@ -115,42 +123,36 @@ async def spotify_playlist(url, sp):
results = sp.playlist_tracks(code)['items'] results = sp.playlist_tracks(code)['items']
except spotipy.exceptions.SpotifyException: except spotipy.exceptions.SpotifyException:
return [] return []
# Go through the tracks # Go through the tracks and build search queries
songs = [] songs: list[str | dict[str, Any]] = [] # Explicit type for mypy
for track in results: for track in results:
search = "" search = ""
# Fetch all artists # Fetch all artists
for artist in track['track']['artists']: for artist in track['track']['artists']:
# Add all artists to search # Add all artists to search
search += f"{artist['name']}, " search += f"{artist['name']}, "
# Remove last column # Remove last comma
search = search[:-2] search = search[:-2]
search += f" - {track['track']['name']}" search += f" - {track['track']['name']}"
songs.append(search) songs.append(search)
#searched_result = search_song(search) # Fetch first song's full data
#if searched_result == []:
#continue
#songs.append(searched_result[0])
while True: while True:
search_result = await search_song(songs[0]) search_result = await search_song(songs[0]) # type: ignore
if search_result == []: if search_result == []:
songs.pop(0) songs.pop(0)
continue continue
else: else:
songs[0] = search_result[0] songs[0] = search_result[0] # Replace string with dict
break break
return songs return songs
async def song_download(url): async def song_download(url: str) -> list[dict[str, Any]]:
with ytdlp.YoutubeDL(ydl_opts) as ydl: with ytdlp.YoutubeDL(ydl_opts) as ydl:
try: try:
info = ydl.extract_info(url, download=False) info = ydl.extract_info(url, download=False)
@@ -180,7 +182,7 @@ async def song_download(url):
return [data] return [data]
async def playlist_download(url): async def playlist_download(url: str) -> list[dict[str, Any]]:
with ytdlp.YoutubeDL(ydl_opts) as ydl: with ytdlp.YoutubeDL(ydl_opts) as ydl:
try: try:
info = ydl.extract_info(url, download=False) info = ydl.extract_info(url, download=False)

View File

@@ -1,3 +1,9 @@
"""
Utility functions for Groovy-Zilean music bot
Handles voice channel operations, queue display, and inactivity tracking
"""
from typing import Any
import discord import discord
from discord.ext.commands.context import Context from discord.ext.commands.context import Context
from discord.ext.commands.converter import CommandError from discord.ext.commands.converter import CommandError
@@ -7,15 +13,29 @@ from . import queue
import asyncio import asyncio
# Track last activity time for each server # Track last activity time for each server
last_activity = {} last_activity: dict[int, float] = {}
# Joining/moving to the user's vc in a guild
async def join_vc(ctx: Context):
# ===================================
# Voice Channel Management
# ===================================
async def join_vc(ctx: Context) -> discord.VoiceClient:
"""
Join or move to the user's voice channel
Args:
ctx: Command context
Returns:
The voice client connection
Raises:
CommandError: If user is not in a voice channel
"""
# Get the user's vc # Get the user's vc
author_voice = getattr(ctx.author, "voice") author_voice = getattr(ctx.author, "voice")
if author_voice is None: if author_voice is None:
# Raise exception if user is not in vc
raise CommandError("User is not in voice channel") raise CommandError("User is not in voice channel")
# Get user's vc # Get user's vc
@@ -25,19 +45,26 @@ async def join_vc(ctx: Context):
# Join or move to the user's vc # Join or move to the user's vc
if ctx.voice_client is None: if ctx.voice_client is None:
vc = await vc.connect() vc_client = await vc.connect()
else: else:
# Safe to ignore type error for now vc_client = await ctx.voice_client.move_to(vc)
vc = await ctx.voice_client.move_to(vc)
# Update last activity # Update last activity
last_activity[ctx.guild.id] = asyncio.get_event_loop().time() last_activity[ctx.guild.id] = asyncio.get_event_loop().time()
return vc return vc_client
# Leaving the voice channel of a user async def leave_vc(ctx: Context) -> None:
async def leave_vc(ctx: Context): """
Leave the voice channel and clean up
Args:
ctx: Command context
Raises:
CommandError: If bot is not in VC or user is not in same VC
"""
# If the bot is not in a vc of this server # If the bot is not in a vc of this server
if ctx.voice_client is None: if ctx.voice_client is None:
raise CommandError("I am not in a voice channel") raise CommandError("I am not in a voice channel")
@@ -73,9 +100,18 @@ async def leave_vc(ctx: Context):
del last_activity[ctx.guild.id] del last_activity[ctx.guild.id]
# Auto-disconnect if inactive # ===================================
async def check_inactivity(bot): # Inactivity Management
"""Background task to check for inactive voice connections""" # ===================================
async def check_inactivity(bot: discord.Client) -> None:
"""
Background task to check for inactive voice connections
Auto-disconnects after 5 minutes of inactivity
Args:
bot: The Discord bot instance
"""
try: try:
current_time = asyncio.get_event_loop().time() current_time = asyncio.get_event_loop().time()
@@ -98,20 +134,34 @@ async def check_inactivity(bot):
print(f"Error in inactivity checker: {e}") print(f"Error in inactivity checker: {e}")
# Update activity timestamp when playing def update_activity(guild_id: int) -> None:
def update_activity(guild_id): """
"""Call this when a song starts playing""" Update activity timestamp when a song starts playing
Args:
guild_id: Discord guild/server ID
"""
last_activity[guild_id] = asyncio.get_event_loop().time() last_activity[guild_id] = asyncio.get_event_loop().time()
# Interactive buttons for queue control # ===================================
# Queue Display & Controls
# ===================================
class QueueControls(View): class QueueControls(View):
def __init__(self, ctx): """Interactive buttons for queue control"""
super().__init__(timeout=None) # No timeout allows buttons to stay active longer
def __init__(self, ctx: Context) -> None:
super().__init__(timeout=None) # No timeout allows buttons to stay active longer
self.ctx = ctx self.ctx = ctx
async def refresh_message(self, interaction: discord.Interaction): async def refresh_message(self, interaction: discord.Interaction) -> None:
"""Helper to regenerate the embed and edit the message""" """
Helper to regenerate the embed and edit the message
Args:
interaction: Discord interaction from button press
"""
try: try:
# Generate new embed # Generate new embed
embed, view = await generate_queue_ui(self.ctx) embed, view = await generate_queue_ui(self.ctx)
@@ -119,10 +169,13 @@ class QueueControls(View):
except Exception as e: except Exception as e:
# Fallback if edit fails # Fallback if edit fails
if not interaction.response.is_done(): if not interaction.response.is_done():
await interaction.response.send_message("Refreshed, but something went wrong updating the display.", ephemeral=True) await interaction.response.send_message(
"Refreshed, but something went wrong updating the display.",
ephemeral=True
)
@discord.ui.button(label="⏭️ Skip", style=discord.ButtonStyle.primary) @discord.ui.button(label="⏭️ Skip", style=discord.ButtonStyle.primary)
async def skip_button(self, interaction: discord.Interaction, button: Button): async def skip_button(self, interaction: discord.Interaction, button: Button) -> None:
if interaction.user not in self.ctx.voice_client.channel.members: if interaction.user not in self.ctx.voice_client.channel.members:
await interaction.response.send_message("❌ You must be in the voice channel!", ephemeral=True) await interaction.response.send_message("❌ You must be in the voice channel!", ephemeral=True)
return return
@@ -130,11 +183,6 @@ class QueueControls(View):
# Loop logic check # Loop logic check
loop_mode = await queue.get_loop_mode(self.ctx.guild.id) loop_mode = await queue.get_loop_mode(self.ctx.guild.id)
# Logic mimics the command
if loop_mode == 'song':
# Just restart current song effectively but here we assume standard skip behavior for button
pass
# Perform the skip # Perform the skip
await queue.pop(self.ctx.guild.id, True, skip_mode=True) await queue.pop(self.ctx.guild.id, True, skip_mode=True)
if self.ctx.voice_client: if self.ctx.voice_client:
@@ -144,35 +192,45 @@ class QueueControls(View):
await self.refresh_message(interaction) await self.refresh_message(interaction)
@discord.ui.button(label="🔀 Shuffle", style=discord.ButtonStyle.secondary) @discord.ui.button(label="🔀 Shuffle", style=discord.ButtonStyle.secondary)
async def shuffle_button(self, interaction: discord.Interaction, button: Button): async def shuffle_button(self, interaction: discord.Interaction, button: Button) -> None:
await queue.shuffle_queue(self.ctx.guild.id) await queue.shuffle_queue(self.ctx.guild.id)
await self.refresh_message(interaction) await self.refresh_message(interaction)
@discord.ui.button(label="🔁 Loop", style=discord.ButtonStyle.secondary) @discord.ui.button(label="🔁 Loop", style=discord.ButtonStyle.secondary)
async def loop_button(self, interaction: discord.Interaction, button: Button): async def loop_button(self, interaction: discord.Interaction, button: Button) -> None:
current_mode = await queue.get_loop_mode(self.ctx.guild.id) current_mode = await queue.get_loop_mode(self.ctx.guild.id)
new_mode = 'song' if current_mode == 'off' else ('queue' if current_mode == 'song' else 'off') new_mode = 'song' if current_mode == 'off' else ('queue' if current_mode == 'song' else 'off')
await queue.set_loop_mode(self.ctx.guild.id, new_mode) await queue.set_loop_mode(self.ctx.guild.id, new_mode)
await self.refresh_message(interaction) await self.refresh_message(interaction)
@discord.ui.button(label="🗑️ Clear", style=discord.ButtonStyle.danger) @discord.ui.button(label="🗑️ Clear", style=discord.ButtonStyle.danger)
async def clear_button(self, interaction: discord.Interaction, button: Button): async def clear_button(self, interaction: discord.Interaction, button: Button) -> None:
await queue.clear(self.ctx.guild.id) await queue.clear(self.ctx.guild.id)
if self.ctx.voice_client and self.ctx.voice_client.is_playing(): if self.ctx.voice_client and self.ctx.voice_client.is_playing():
self.ctx.voice_client.stop() self.ctx.voice_client.stop()
await self.refresh_message(interaction) await self.refresh_message(interaction)
@discord.ui.button(label="🔄 Refresh", style=discord.ButtonStyle.gray) @discord.ui.button(label="🔄 Refresh", style=discord.ButtonStyle.gray)
async def refresh_button(self, interaction: discord.Interaction, button: Button): async def refresh_button(self, interaction: discord.Interaction, button: Button) -> None:
await self.refresh_message(interaction) await self.refresh_message(interaction)
async def generate_queue_ui(ctx: Context):
async def generate_queue_ui(ctx: Context) -> tuple[discord.Embed, QueueControls]:
"""
Generate the queue embed and controls
Args:
ctx: Command context
Returns:
Tuple of (embed, view) for displaying queue
"""
guild_id = ctx.guild.id guild_id = ctx.guild.id
server = ctx.guild server = ctx.guild
# Fetch all data # Fetch all data
n, songs = await queue.grab_songs(guild_id) n, songs = await queue.grab_songs(guild_id)
current = await queue.get_current_song(guild_id) # Returns title, thumbnail, url current = await queue.get_current_song(guild_id)
loop_mode = await queue.get_loop_mode(guild_id) loop_mode = await queue.get_loop_mode(guild_id)
volume = await queue.get_volume(guild_id) volume = await queue.get_volume(guild_id)
effect = await queue.get_effect(guild_id) effect = await queue.get_effect(guild_id)
@@ -183,10 +241,10 @@ async def generate_queue_ui(ctx: Context):
# Map loop mode to nicer text # Map loop mode to nicer text
loop_map = { loop_map = {
'off': {'emoji': '⏹️', 'text': 'Off'}, 'off': {'emoji': '⏹️', 'text': 'Off'},
'song': {'emoji': '🔂', 'text': 'Song'}, 'song': {'emoji': '🔂', 'text': 'Song'},
'queue': {'emoji': '🔁', 'text': 'Queue'} 'queue': {'emoji': '🔁', 'text': 'Queue'}
} }
loop_info = loop_map.get(loop_mode, loop_map['off']) loop_info = loop_map.get(loop_mode, loop_map['off'])
loop_emoji = loop_info['emoji'] loop_emoji = loop_info['emoji']
loop_text = loop_info['text'] loop_text = loop_info['text']
@@ -197,11 +255,9 @@ async def generate_queue_ui(ctx: Context):
# Progress Bar Logic # Progress Bar Logic
progress_bar = "" progress_bar = ""
# Only show bar if duration > 0 (prevents weird 00:00 bars)
if duration > 0: if duration > 0:
bar_length = 16 bar_length = 16
filled = int((percentage / 100) * bar_length) filled = int((percentage / 100) * bar_length)
# Ensure filled isn't bigger than length
filled = min(filled, bar_length) filled = min(filled, bar_length)
bar_str = '' * filled + '🔘' + '' * (bar_length - filled) bar_str = '' * filled + '🔘' + '' * (bar_length - filled)
progress_bar = f"\n`{format_time(elapsed)}` {bar_str} `{format_time(duration)}`" progress_bar = f"\n`{format_time(elapsed)}` {bar_str} `{format_time(duration)}`"
@@ -215,14 +271,11 @@ async def generate_queue_ui(ctx: Context):
description = "## 💤 Nothing is playing\nUse `/play` to start the party!" description = "## 💤 Nothing is playing\nUse `/play` to start the party!"
else: else:
# Create Hyperlink [Title](URL) # Create Hyperlink [Title](URL)
# If no URL exists, link to Discord homepage as fallback or just bold
if url and url.startswith("http"): if url and url.startswith("http"):
song_link = f"[{title}]({url})" song_link = f"[{title}]({url})"
else: else:
song_link = f"**{title}**" song_link = f"**{title}**"
# CLEARER STATUS LINE:
# Loop: Mode | Effect: Name | Vol: %
description = ( description = (
f"## 💿 Now Playing\n" f"## 💿 Now Playing\n"
f"### {song_link}\n" f"### {song_link}\n"
@@ -241,7 +294,7 @@ async def generate_queue_ui(ctx: Context):
embed.add_field(name="⏳ Up Next", value=queue_text, inline=False) embed.add_field(name="⏳ Up Next", value=queue_text, inline=False)
remaining = (n) - 9 # Approx calculation based on your grabbing logic remaining = n - 9
if remaining > 0: if remaining > 0:
embed.set_footer(text=f"Waitlist: {remaining} more songs...") embed.set_footer(text=f"Waitlist: {remaining} more songs...")
else: else:
@@ -251,23 +304,38 @@ async def generate_queue_ui(ctx: Context):
if thumb and isinstance(thumb, str) and thumb.startswith("http"): if thumb and isinstance(thumb, str) and thumb.startswith("http"):
embed.set_thumbnail(url=thumb) embed.set_thumbnail(url=thumb)
elif server.icon: elif server.icon:
# Fallback to server icon
embed.set_thumbnail(url=server.icon.url) embed.set_thumbnail(url=server.icon.url)
view = QueueControls(ctx) view = QueueControls(ctx)
return embed, view return embed, view
# The command entry point calls this
async def display_server_queue(ctx: Context, songs, n): async def display_server_queue(ctx: Context, songs: list, n: int) -> None:
"""
Display the server's queue with interactive controls
Args:
ctx: Command context
songs: List of songs in queue
n: Total number of songs
"""
embed, view = await generate_queue_ui(ctx) embed, view = await generate_queue_ui(ctx)
await ctx.send(embed=embed, view=view) await ctx.send(embed=embed, view=view)
# Build a display message for queuing a new song
async def queue_message(ctx: Context, data: dict): async def queue_message(ctx: Context, data: dict[str, Any]) -> None:
"""
Display a message when a song is queued
Args:
ctx: Command context
data: Song data dictionary
"""
msg = discord.Embed( msg = discord.Embed(
title="🎵 Song Queued", title="🎵 Song Queued",
description=f"**{data['title']}**", description=f"**{data['title']}**",
color=discord.Color.green()) color=discord.Color.green()
)
msg.set_thumbnail(url=data['thumbnail']) msg.set_thumbnail(url=data['thumbnail'])
msg.add_field(name="⏱️ Duration", value=format_time(data['duration']), inline=True) msg.add_field(name="⏱️ Duration", value=format_time(data['duration']), inline=True)
@@ -276,9 +344,23 @@ async def queue_message(ctx: Context, data: dict):
await ctx.send(embed=msg) await ctx.send(embed=msg)
# Converts seconds into more readable format
def format_time(seconds): # ===================================
# Utility Functions
# ===================================
def format_time(seconds: int | float) -> str:
"""
Convert seconds into readable time format (MM:SS or HH:MM:SS)
Args:
seconds: Time in seconds
Returns:
Formatted time string
"""
try: try:
seconds = int(seconds)
minutes, seconds = divmod(seconds, 60) minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60) hours, minutes = divmod(minutes, 60)

297
config.py
View File

@@ -1,115 +1,198 @@
# config.py # config.py
# This file should parse all configurations within the bot # Modern configuration management using environment variables
import os
import discord import discord
from discord import Color from discord import Color
import json from dotenv import load_dotenv
from typing import Optional
# Read data from JSON file in ./data/config.json # Load environment variables from .env file
def read_data(): load_dotenv()
with open("./data/config.json", "r") as file:
return json.load(file)
raise Exception("Could not load config data") # ===================================
# Environment Detection
# ===================================
def get_spotify_creds(): ENVIRONMENT = os.getenv("ENVIRONMENT", "dev").lower()
data = read_data() IS_PRODUCTION = ENVIRONMENT == "live"
data = data.get("spotify") IS_DEVELOPMENT = ENVIRONMENT == "dev"
SCID = data.get("SCID")
secret = data.get("SECRET")
return SCID, secret
# Reading prefix
def get_prefix():
data = read_data()
prefix = data.get('prefix')
if prefix:
return prefix
raise Exception("Missing config data: prefix")
# Fetch the bot secret token
def get_login(bot):
data = read_data()
if data is False or data.get(f"{bot}bot") is False:
raise Exception(f"Missing config data: {bot}bot")
data = data.get(f"{bot}bot")
return data.get("secret")
# Read the status and text data
def get_status():
data = read_data()
if data is False or data.get('status') is False:
raise Exception("Missing config data: status")
# Find type
data = data.get('status')
return translate_status(
data.get('type'),
data.get('text'),
data.get('link')
)
# Get colors from colorscheme
def get_color(color):
data = read_data()
if data is False or data.get('status') is False:
raise Exception("Missing config data: color")
# Grab color
string_value = data.get("colorscheme").get(color)
hex_value = Color.from_str(string_value)
return hex_value
# Taking JSON variables and converting them into a presence
# Use None url incase not provided
def translate_status(status_type, status_text, status_url=None):
if status_type == "playing":
return discord.Activity(
type=discord.ActivityType.playing,
name=status_text
)
elif status_type == "streaming":
return discord.Activity(
type=discord.ActivityType.streaming,
name=status_text,
url=status_url
)
elif status_type == "listening":
return discord.Activity(
type=discord.ActivityType.listening,
name=status_text
)
elif status_type == "watching":
return discord.Activity(
type=discord.ActivityType.watching,
name=status_text
)
elif status_type == "competing":
return discord.Activity(
type=discord.ActivityType.competing,
name=status_text
)
#TODO
# Implement custom status type
# ===================================
# Discord Configuration
# ===================================
def get_discord_token() -> str:
"""Get the appropriate Discord token based on environment"""
if IS_PRODUCTION:
token = os.getenv("DISCORD_TOKEN_LIVE")
if not token:
raise ValueError("DISCORD_TOKEN_LIVE not found in environment!")
return token
else: else:
raise Exception(f"Invalid status type: {status_type}") token = os.getenv("DISCORD_TOKEN_DEV")
if not token:
raise ValueError("DISCORD_TOKEN_DEV not found in environment!")
return token
def get_prefix() -> str:
"""Get command prefix (default: =)"""
return os.getenv("DISCORD_PREFIX", "=")
# ===================================
# Spotify Configuration
# ===================================
def get_spotify_creds() -> tuple[str, str]:
"""Get Spotify API credentials"""
client_id = os.getenv("SPOTIFY_CLIENT_ID")
client_secret = os.getenv("SPOTIFY_CLIENT_SECRET")
if not client_id or not client_secret:
raise ValueError("Spotify credentials not found in environment!")
return client_id, client_secret
# ===================================
# Database Configuration
# ===================================
def get_db_path() -> str:
"""Get SQLite database path"""
return os.getenv("DB_PATH", "./data/music.db")
# Future PostgreSQL config
def get_postgres_url() -> Optional[str]:
"""Get PostgreSQL connection URL (for future migration)"""
host = os.getenv("DB_HOST")
port = os.getenv("DB_PORT", "5432")
name = os.getenv("DB_NAME")
user = os.getenv("DB_USER")
password = os.getenv("DB_PASSWORD")
if all([host, name, user, password]):
return f"postgresql://{user}:{password}@{host}:{port}/{name}"
return None
# ===================================
# Bot Status/Presence
# ===================================
def get_status() -> discord.Activity:
"""Get bot status/presence"""
status_type = os.getenv("STATUS_TYPE", "listening").lower()
status_text = os.getenv("STATUS_TEXT", "Zilean's Theme")
status_url = os.getenv("STATUS_URL")
return translate_status(status_type, status_text, status_url)
def translate_status(
status_type: str,
status_text: str,
status_url: Optional[str] = None
) -> discord.Activity:
"""Convert status type string to Discord Activity"""
status_map = {
"playing": discord.ActivityType.playing,
"streaming": discord.ActivityType.streaming,
"listening": discord.ActivityType.listening,
"watching": discord.ActivityType.watching,
"competing": discord.ActivityType.competing,
}
activity_type = status_map.get(status_type)
if not activity_type:
raise ValueError(f"Invalid status type: {status_type}")
# Streaming requires URL
if status_type == "streaming":
if not status_url:
raise ValueError("Streaming status requires STATUS_URL")
return discord.Activity(
type=activity_type,
name=status_text,
url=status_url
)
return discord.Activity(type=activity_type, name=status_text)
# ===================================
# Color Scheme
# ===================================
def get_color(color_name: str) -> Color:
"""Get color from environment (hex format)"""
color_map = {
"primary": os.getenv("COLOR_PRIMARY", "#7289DA"),
"success": os.getenv("COLOR_SUCCESS", "#43B581"),
"error": os.getenv("COLOR_ERROR", "#F04747"),
"warning": os.getenv("COLOR_WARNING", "#FAA61A"),
}
hex_value = color_map.get(color_name.lower())
if not hex_value:
# Default to Discord blurple
hex_value = "#7289DA"
return Color.from_str(hex_value)
# ===================================
# Logging Configuration
# ===================================
def get_log_level() -> str:
"""Get logging level from environment"""
return os.getenv("LOG_LEVEL", "INFO").upper()
# ===================================
# Legacy Support (for backward compatibility)
# ===================================
def get_login(bot: str) -> str:
"""Legacy function - maps to new get_discord_token()"""
# Ignore the 'bot' parameter, use ENVIRONMENT instead
return get_discord_token()
# ===================================
# Validation
# ===================================
def validate_config():
"""Validate that all required config is present"""
errors = []
# Check Discord token
try:
get_discord_token()
except ValueError as e:
errors.append(str(e))
# Check Spotify creds
try:
get_spotify_creds()
except ValueError as e:
errors.append(str(e))
if errors:
error_msg = "\n".join(errors)
raise ValueError(f"Configuration errors:\n{error_msg}")
print(f"✅ Configuration validated (Environment: {ENVIRONMENT})")
# ===================================
# Startup Info
# ===================================
def print_config_info():
"""Print configuration summary (without secrets!)"""
print("=" * 50)
print("🎵 Groovy-Zilean Configuration")
print("=" * 50)
print(f"Environment: {ENVIRONMENT.upper()}")
print(f"Prefix: {get_prefix()}")
print(f"Database: {get_db_path()}")
print(f"Log Level: {get_log_level()}")
print(f"Spotify: {'Configured ✅' if os.getenv('SPOTIFY_CLIENT_ID') else 'Not configured ❌'}")
print("=" * 50)

13
main.py
View File

@@ -3,6 +3,16 @@ from bot import Groovy
import config import config
import help import help
# Validate configuration before starting
try:
config.validate_config()
config.print_config_info()
except ValueError as e:
print(f"❌ Configuration Error:\n{e}")
print("\n💡 Tip: Copy .env.example to .env and fill in your values")
exit(1)
# Initialize bot with validated config
client = Groovy(command_prefix=config.get_prefix(), intents=discord.Intents.all()) client = Groovy(command_prefix=config.get_prefix(), intents=discord.Intents.all())
@client.event @client.event
@@ -25,4 +35,5 @@ async def on_voice_state_update(member, before, after):
except Exception as e: except Exception as e:
print(f"Error auto-disconnecting: {e}") print(f"Error auto-disconnecting: {e}")
client.run(config.get_login("live")) # Run bot with environment-appropriate token
client.run(config.get_discord_token())

31
mypy.ini Normal file
View File

@@ -0,0 +1,31 @@
# mypy configuration for groovy-zilean
# Type checking configuration that's practical for a Discord bot
[mypy]
# Python version
python_version = 3.13
# Ignore missing imports for libraries without type stubs
# Discord.py, spotipy, yt-dlp don't have complete type stubs
ignore_missing_imports = True
# Be strict about our own code
# Start lenient, can tighten later
disallow_untyped_defs = False
check_untyped_defs = True
# Too noisy with discord.py
warn_return_any = False
warn_unused_configs = True
# Exclude patterns
exclude = venv/
# Per-module overrides
[mypy-discord.*]
ignore_missing_imports = True
[mypy-spotipy.*]
ignore_missing_imports = True
[mypy-yt_dlp.*]
ignore_missing_imports = True

View File

@@ -1,12 +1,14 @@
# Core bot framework # Core bot framework
discord.py==2.6.4 discord.py>=2.6.4
aiohttp==3.8.4 aiohttp>=3.9.0
PyNaCl==1.5.0 PyNaCl>=1.5.0
spotipy==2.23.0 spotipy>=2.23.0
# Configuration management
python-dotenv>=1.0.0
# YouTube extractor # YouTube extractor
yt-dlp>=2025.10.14 yt-dlp>=2025.10.14
# System dependencies # Audio metadata (if needed by yt-dlp)
PyAudio==0.2.13 mutagen>=1.47.0
mutagen==1.46.0