""" Utility functions for Groovy-Zilean music bot Handles voice channel operations, queue display, and inactivity tracking """ from typing import Any import discord from discord.ext.commands.context import Context from discord.ext.commands.converter import CommandError from discord.ui import Button, View import config from . import queue import asyncio # Track last activity time for each server last_activity: dict[int, float] = {} # =================================== # Voice Channel Management # =================================== async def join_vc(ctx: Context) -> discord.VoiceClient: """ Join or move to the user's voice channel Args: ctx: Command context Returns: The voice client connection Raises: CommandError: If user is not in a voice channel """ # Get the user's vc author_voice = getattr(ctx.author, "voice") if author_voice is None: raise CommandError("User is not in voice channel") # Get user's vc vc = getattr(author_voice, "channel") if vc is None: raise CommandError("Unable to find voice channel") # Join or move to the user's vc if ctx.voice_client is None: vc_client = await vc.connect() else: vc_client = await ctx.voice_client.move_to(vc) # Update last activity last_activity[ctx.guild.id] = asyncio.get_event_loop().time() return vc_client async def leave_vc(ctx: Context) -> None: """ Leave the voice channel and clean up Args: ctx: Command context Raises: CommandError: If bot is not in VC or user is not in same VC """ # If the bot is not in a vc of this server if ctx.voice_client is None: raise CommandError("I am not in a voice channel") # if user is not in voice of the server author_voice = getattr(ctx.author, "voice") if author_voice is None: raise CommandError("You are not in a voice channel") # Make sure both bot and User are in same vc vc = ctx.voice_client.channel author_vc = getattr(author_voice, "channel") if author_vc is None or vc != author_vc: raise CommandError("You are not in this voice channel") # Clear the queue for this server await queue.clear(ctx.guild.id) # Stop any currently playing audio if ctx.voice_client.is_playing(): ctx.voice_client.stop() # Disconnect with force to ensure it actually leaves try: await ctx.voice_client.disconnect(force=True) except Exception as e: print(f"Error disconnecting: {e}") # If regular disconnect fails, try cleanup await ctx.voice_client.cleanup() # Remove from activity tracker if ctx.guild.id in last_activity: del last_activity[ctx.guild.id] # =================================== # Inactivity Management # =================================== async def check_inactivity(bot: discord.Client) -> None: """ Background task to check for inactive voice connections Auto-disconnects after 5 minutes of inactivity Args: bot: The Discord bot instance """ try: current_time = asyncio.get_event_loop().time() for guild_id, last_time in list(last_activity.items()): # If inactive for more than 5 minutes if current_time - last_time > 300: # 300 seconds = 5 minutes # Find the guild and voice client guild = bot.get_guild(guild_id) if guild and guild.voice_client: # Check if not playing if not guild.voice_client.is_playing(): print(f"Auto-disconnecting from {guild.name} due to inactivity") await queue.clear(guild_id) try: await guild.voice_client.disconnect(force=True) except: pass del last_activity[guild_id] except Exception as e: print(f"Error in inactivity checker: {e}") def update_activity(guild_id: int) -> None: """ Update activity timestamp when a song starts playing Args: guild_id: Discord guild/server ID """ last_activity[guild_id] = asyncio.get_event_loop().time() # =================================== # Queue Display & Controls # =================================== class QueueControls(View): """Interactive buttons for queue control""" def __init__(self, ctx: Context) -> None: super().__init__(timeout=None) # No timeout allows buttons to stay active longer self.ctx = ctx async def refresh_message(self, interaction: discord.Interaction) -> None: """ Helper to regenerate the embed and edit the message Args: interaction: Discord interaction from button press """ try: # Generate new embed embed, view = await generate_queue_ui(self.ctx) await interaction.response.edit_message(embed=embed, view=view) except Exception as e: # Fallback if edit fails if not interaction.response.is_done(): await interaction.response.send_message( "Refreshed, but something went wrong updating the display.", ephemeral=True ) @discord.ui.button(label="â­ī¸ Skip", style=discord.ButtonStyle.primary) async def skip_button(self, interaction: discord.Interaction, button: Button) -> None: if interaction.user not in self.ctx.voice_client.channel.members: await interaction.response.send_message("❌ You must be in the voice channel!", ephemeral=True) return # Loop logic check loop_mode = await queue.get_loop_mode(self.ctx.guild.id) # Perform the skip await queue.pop(self.ctx.guild.id, True, skip_mode=True) if self.ctx.voice_client: self.ctx.voice_client.stop() # Refresh UI await self.refresh_message(interaction) @discord.ui.button(label="🔀 Shuffle", style=discord.ButtonStyle.secondary) async def shuffle_button(self, interaction: discord.Interaction, button: Button) -> None: await queue.shuffle_queue(self.ctx.guild.id) await self.refresh_message(interaction) @discord.ui.button(label="🔁 Loop", style=discord.ButtonStyle.secondary) async def loop_button(self, interaction: discord.Interaction, button: Button) -> None: current_mode = await queue.get_loop_mode(self.ctx.guild.id) new_mode = 'song' if current_mode == 'off' else ('queue' if current_mode == 'song' else 'off') await queue.set_loop_mode(self.ctx.guild.id, new_mode) await self.refresh_message(interaction) @discord.ui.button(label="đŸ—‘ī¸ Clear", style=discord.ButtonStyle.danger) async def clear_button(self, interaction: discord.Interaction, button: Button) -> None: await queue.clear(self.ctx.guild.id) if self.ctx.voice_client and self.ctx.voice_client.is_playing(): self.ctx.voice_client.stop() await self.refresh_message(interaction) @discord.ui.button(label="🔄 Refresh", style=discord.ButtonStyle.gray) async def refresh_button(self, interaction: discord.Interaction, button: Button) -> None: await self.refresh_message(interaction) async def generate_queue_ui(ctx: Context) -> tuple[discord.Embed, QueueControls]: """ Generate the queue embed and controls Args: ctx: Command context Returns: Tuple of (embed, view) for displaying queue """ guild_id = ctx.guild.id server = ctx.guild # Fetch all data n, songs = await queue.grab_songs(guild_id) current = await queue.get_current_song(guild_id) loop_mode = await queue.get_loop_mode(guild_id) volume = await queue.get_volume(guild_id) effect = await queue.get_effect(guild_id) elapsed, duration, percentage = await queue.get_current_progress(guild_id) # Configs effect_emoji = queue.get_effect_emoji(effect) # Map loop mode to nicer text loop_map = { 'off': {'emoji': 'âšī¸', 'text': 'Off'}, 'song': {'emoji': '🔂', 'text': 'Song'}, 'queue': {'emoji': '🔁', 'text': 'Queue'} } loop_info = loop_map.get(loop_mode, loop_map['off']) loop_emoji = loop_info['emoji'] loop_text = loop_info['text'] # Build Embed embed = discord.Embed(color=discord.Color.from_rgb(43, 45, 49)) embed.set_author(name=f"{server.name}'s Queue", icon_url=server.icon.url if server.icon else None) # Progress Bar Logic progress_bar = "" if duration > 0: bar_length = 16 filled = int((percentage / 100) * bar_length) filled = min(filled, bar_length) bar_str = 'â–Ŧ' * filled + '🔘' + 'â–Ŧ' * (bar_length - filled) progress_bar = f"\n`{format_time(elapsed)}` {bar_str} `{format_time(duration)}`" # Now Playing Header title = current.get('title', 'Nothing Playing') thumb = current.get('thumbnail') url = current.get('url', '') if title == 'Nothing': description = "## 💤 Nothing is playing\nUse `/play` to start the party!" else: # Create Hyperlink [Title](URL) if url and url.startswith("http"): song_link = f"[{title}]({url})" else: song_link = f"**{title}**" description = ( f"## đŸ’ŋ Now Playing\n" f"### {song_link}\n" f"{loop_emoji} **Loop: {loop_text}** | {effect_emoji} **Effect: {effect}** | 🔊 **{volume}%**" f"{progress_bar}" ) embed.description = description # Queue List if len(songs) > 0: queue_text = "" for i, song in enumerate(songs[:10]): dur = '' if isinstance(song[1], str) else f" | `{format_time(song[1])}`" queue_text += f"**{i+1}.** {song[0]}{dur}\n" embed.add_field(name="âŗ Up Next", value=queue_text, inline=False) remaining = n - 9 if remaining > 0: embed.set_footer(text=f"Waitlist: {remaining} more songs...") else: embed.add_field(name="âŗ Up Next", value="*The queue is empty.*") # Set Thumbnail safely if thumb and isinstance(thumb, str) and thumb.startswith("http"): embed.set_thumbnail(url=thumb) elif server.icon: embed.set_thumbnail(url=server.icon.url) view = QueueControls(ctx) return embed, view async def display_server_queue(ctx: Context, songs: list, n: int) -> None: """ Display the server's queue with interactive controls Args: ctx: Command context songs: List of songs in queue n: Total number of songs """ embed, view = await generate_queue_ui(ctx) await ctx.send(embed=embed, view=view) async def queue_message(ctx: Context, data: dict[str, Any]) -> None: """ Display a message when a song is queued Args: ctx: Command context data: Song data dictionary """ msg = discord.Embed( title="đŸŽĩ Song Queued", description=f"**{data['title']}**", color=discord.Color.green() ) msg.set_thumbnail(url=data['thumbnail']) msg.add_field(name="âąī¸ Duration", value=format_time(data['duration']), inline=True) msg.add_field(name="📍 Position", value=f"#{data['position']}", inline=True) msg.set_footer(text=f"Queued by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url) await ctx.send(embed=msg) # =================================== # Utility Functions # =================================== def format_time(seconds: int | float) -> str: """ Convert seconds into readable time format (MM:SS or HH:MM:SS) Args: seconds: Time in seconds Returns: Formatted time string """ try: seconds = int(seconds) minutes, seconds = divmod(seconds, 60) hours, minutes = divmod(minutes, 60) if hours > 0: return f"{hours}:{minutes:02d}:{seconds:02d}" elif minutes > 0: return f"{minutes}:{seconds:02d}" else: return f"0:{seconds:02d}" except: return "Unknown"