Files
groovy-zilean/cogs/music/util.py

375 lines
12 KiB
Python

"""
Utility functions for Groovy-Zilean music bot
Handles voice channel operations, queue display, and inactivity tracking
"""
from typing import Any
import discord
from discord.ext.commands.context import Context
from discord.ext.commands.converter import CommandError
from discord.ui import Button, View
import config
from . import queue
import asyncio
# Track last activity time for each server
last_activity: dict[int, float] = {}
# ===================================
# Voice Channel Management
# ===================================
async def join_vc(ctx: Context) -> discord.VoiceClient:
"""
Join or move to the user's voice channel
Args:
ctx: Command context
Returns:
The voice client connection
Raises:
CommandError: If user is not in a voice channel
"""
# Get the user's vc
author_voice = getattr(ctx.author, "voice")
if author_voice is None:
raise CommandError("User is not in voice channel")
# Get user's vc
vc = getattr(author_voice, "channel")
if vc is None:
raise CommandError("Unable to find voice channel")
# Join or move to the user's vc
if ctx.voice_client is None:
vc_client = await vc.connect()
else:
vc_client = await ctx.voice_client.move_to(vc)
# Update last activity
last_activity[ctx.guild.id] = asyncio.get_event_loop().time()
return vc_client
async def leave_vc(ctx: Context) -> None:
"""
Leave the voice channel and clean up
Args:
ctx: Command context
Raises:
CommandError: If bot is not in VC or user is not in same VC
"""
# If the bot is not in a vc of this server
if ctx.voice_client is None:
raise CommandError("I am not in a voice channel")
# if user is not in voice of the server
author_voice = getattr(ctx.author, "voice")
if author_voice is None:
raise CommandError("You are not in a voice channel")
# Make sure both bot and User are in same vc
vc = ctx.voice_client.channel
author_vc = getattr(author_voice, "channel")
if author_vc is None or vc != author_vc:
raise CommandError("You are not in this voice channel")
# Clear the queue for this server
await queue.clear(ctx.guild.id)
# Stop any currently playing audio
if ctx.voice_client.is_playing():
ctx.voice_client.stop()
# Disconnect with force to ensure it actually leaves
try:
await ctx.voice_client.disconnect(force=True)
except Exception as e:
print(f"Error disconnecting: {e}")
# If regular disconnect fails, try cleanup
await ctx.voice_client.cleanup()
# Remove from activity tracker
if ctx.guild.id in last_activity:
del last_activity[ctx.guild.id]
# ===================================
# Inactivity Management
# ===================================
async def check_inactivity(bot: discord.Client) -> None:
"""
Background task to check for inactive voice connections
Auto-disconnects after 5 minutes of inactivity
Args:
bot: The Discord bot instance
"""
try:
current_time = asyncio.get_event_loop().time()
for guild_id, last_time in list(last_activity.items()):
# If inactive for more than 5 minutes
if current_time - last_time > 300: # 300 seconds = 5 minutes
# Find the guild and voice client
guild = bot.get_guild(guild_id)
if guild and guild.voice_client:
# Check if not playing
if not guild.voice_client.is_playing():
print(f"Auto-disconnecting from {guild.name} due to inactivity")
await queue.clear(guild_id)
try:
await guild.voice_client.disconnect(force=True)
except:
pass
del last_activity[guild_id]
except Exception as e:
print(f"Error in inactivity checker: {e}")
def update_activity(guild_id: int) -> None:
"""
Update activity timestamp when a song starts playing
Args:
guild_id: Discord guild/server ID
"""
last_activity[guild_id] = asyncio.get_event_loop().time()
# ===================================
# Queue Display & Controls
# ===================================
class QueueControls(View):
"""Interactive buttons for queue control"""
def __init__(self, ctx: Context) -> None:
super().__init__(timeout=None) # No timeout allows buttons to stay active longer
self.ctx = ctx
async def refresh_message(self, interaction: discord.Interaction) -> None:
"""
Helper to regenerate the embed and edit the message
Args:
interaction: Discord interaction from button press
"""
try:
# Generate new embed
embed, view = await generate_queue_ui(self.ctx)
await interaction.response.edit_message(embed=embed, view=view)
except Exception as e:
# Fallback if edit fails
if not interaction.response.is_done():
await interaction.response.send_message(
"Refreshed, but something went wrong updating the display.",
ephemeral=True
)
@discord.ui.button(label="⏭️ Skip", style=discord.ButtonStyle.primary)
async def skip_button(self, interaction: discord.Interaction, button: Button) -> None:
if interaction.user not in self.ctx.voice_client.channel.members:
await interaction.response.send_message("❌ You must be in the voice channel!", ephemeral=True)
return
# Loop logic check
loop_mode = await queue.get_loop_mode(self.ctx.guild.id)
# Perform the skip
await queue.pop(self.ctx.guild.id, True, skip_mode=True)
if self.ctx.voice_client:
self.ctx.voice_client.stop()
# Refresh UI
await self.refresh_message(interaction)
@discord.ui.button(label="🔀 Shuffle", style=discord.ButtonStyle.secondary)
async def shuffle_button(self, interaction: discord.Interaction, button: Button) -> None:
await queue.shuffle_queue(self.ctx.guild.id)
await self.refresh_message(interaction)
@discord.ui.button(label="🔁 Loop", style=discord.ButtonStyle.secondary)
async def loop_button(self, interaction: discord.Interaction, button: Button) -> None:
current_mode = await queue.get_loop_mode(self.ctx.guild.id)
new_mode = 'song' if current_mode == 'off' else ('queue' if current_mode == 'song' else 'off')
await queue.set_loop_mode(self.ctx.guild.id, new_mode)
await self.refresh_message(interaction)
@discord.ui.button(label="🗑️ Clear", style=discord.ButtonStyle.danger)
async def clear_button(self, interaction: discord.Interaction, button: Button) -> None:
await queue.clear(self.ctx.guild.id)
if self.ctx.voice_client and self.ctx.voice_client.is_playing():
self.ctx.voice_client.stop()
await self.refresh_message(interaction)
@discord.ui.button(label="🔄 Refresh", style=discord.ButtonStyle.gray)
async def refresh_button(self, interaction: discord.Interaction, button: Button) -> None:
await self.refresh_message(interaction)
async def generate_queue_ui(ctx: Context) -> tuple[discord.Embed, QueueControls]:
"""
Generate the queue embed and controls
Args:
ctx: Command context
Returns:
Tuple of (embed, view) for displaying queue
"""
guild_id = ctx.guild.id
server = ctx.guild
# Fetch all data
n, songs = await queue.grab_songs(guild_id)
current = await queue.get_current_song(guild_id)
loop_mode = await queue.get_loop_mode(guild_id)
volume = await queue.get_volume(guild_id)
effect = await queue.get_effect(guild_id)
elapsed, duration, percentage = await queue.get_current_progress(guild_id)
# Configs
effect_emoji = queue.get_effect_emoji(effect)
# Map loop mode to nicer text
loop_map = {
'off': {'emoji': '⏹️', 'text': 'Off'},
'song': {'emoji': '🔂', 'text': 'Song'},
'queue': {'emoji': '🔁', 'text': 'Queue'}
}
loop_info = loop_map.get(loop_mode, loop_map['off'])
loop_emoji = loop_info['emoji']
loop_text = loop_info['text']
# Build Embed
embed = discord.Embed(color=discord.Color.from_rgb(43, 45, 49))
embed.set_author(name=f"{server.name}'s Queue", icon_url=server.icon.url if server.icon else None)
# Progress Bar Logic
progress_bar = ""
if duration > 0:
bar_length = 16
filled = int((percentage / 100) * bar_length)
filled = min(filled, bar_length)
bar_str = '' * filled + '🔘' + '' * (bar_length - filled)
progress_bar = f"\n`{format_time(elapsed)}` {bar_str} `{format_time(duration)}`"
# Now Playing Header
title = current.get('title', 'Nothing Playing')
thumb = current.get('thumbnail')
url = current.get('url', '')
if title == 'Nothing':
description = "## 💤 Nothing is playing\nUse `/play` to start the party!"
else:
# Create Hyperlink [Title](URL)
if url and url.startswith("http"):
song_link = f"[{title}]({url})"
else:
song_link = f"**{title}**"
description = (
f"## 💿 Now Playing\n"
f"### {song_link}\n"
f"{loop_emoji} **Loop: {loop_text}** | {effect_emoji} **Effect: {effect}** | 🔊 **{volume}%**"
f"{progress_bar}"
)
embed.description = description
# Queue List
if len(songs) > 0:
queue_text = ""
for i, song in enumerate(songs[:10]):
dur = '' if isinstance(song[1], str) else f" | `{format_time(song[1])}`"
queue_text += f"**{i+1}.** {song[0]}{dur}\n"
embed.add_field(name="⏳ Up Next", value=queue_text, inline=False)
remaining = n - 9
if remaining > 0:
embed.set_footer(text=f"Waitlist: {remaining} more songs...")
else:
embed.add_field(name="⏳ Up Next", value="*The queue is empty.*")
# Set Thumbnail safely
if thumb and isinstance(thumb, str) and thumb.startswith("http"):
embed.set_thumbnail(url=thumb)
elif server.icon:
embed.set_thumbnail(url=server.icon.url)
view = QueueControls(ctx)
return embed, view
async def display_server_queue(ctx: Context, songs: list, n: int) -> None:
"""
Display the server's queue with interactive controls
Args:
ctx: Command context
songs: List of songs in queue
n: Total number of songs
"""
embed, view = await generate_queue_ui(ctx)
await ctx.send(embed=embed, view=view)
async def queue_message(ctx: Context, data: dict[str, Any]) -> None:
"""
Display a message when a song is queued
Args:
ctx: Command context
data: Song data dictionary
"""
msg = discord.Embed(
title="🎵 Song Queued",
description=f"**{data['title']}**",
color=discord.Color.green()
)
msg.set_thumbnail(url=data['thumbnail'])
msg.add_field(name="⏱️ Duration", value=format_time(data['duration']), inline=True)
msg.add_field(name="📍 Position", value=f"#{data['position']}", inline=True)
msg.set_footer(text=f"Queued by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url)
await ctx.send(embed=msg)
# ===================================
# Utility Functions
# ===================================
def format_time(seconds: int | float) -> str:
"""
Convert seconds into readable time format (MM:SS or HH:MM:SS)
Args:
seconds: Time in seconds
Returns:
Formatted time string
"""
try:
seconds = int(seconds)
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
if hours > 0:
return f"{hours}:{minutes:02d}:{seconds:02d}"
elif minutes > 0:
return f"{minutes}:{seconds:02d}"
else:
return f"0:{seconds:02d}"
except:
return "Unknown"