435 lines
14 KiB
Python
435 lines
14 KiB
Python
from discord.ext import commands
|
|
from discord.ext.commands.context import Context
|
|
from discord import app_commands
|
|
import discord
|
|
|
|
import cogs.music.util as util
|
|
import cogs.music.queue as queue
|
|
import cogs.music.translate as translate
|
|
|
|
|
|
from cogs.music.help import music_help
|
|
|
|
import spotipy
|
|
from spotipy.oauth2 import SpotifyClientCredentials
|
|
|
|
|
|
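# TODO: tidy up the config loading below (presumably by using the shared config module instead of reading the JSON file here).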
|
|
|
|
import json
|
|
#from .. import config
|
|
# Read data from JSON file in ./data/config.json
|
|
def read_data():
    """Load and return the parsed contents of ``./data/config.json``.

    The path is relative to the current working directory.

    Returns:
        The decoded JSON document (whatever ``json.load`` produces).

    Raises:
        OSError: if the file cannot be opened (e.g. ``FileNotFoundError``).
        json.JSONDecodeError: if the file does not contain valid JSON.
    """
    # JSON is UTF-8 by specification; do not depend on the locale's default
    # encoding when decoding the file.
    with open("./data/config.json", "r", encoding="utf-8") as file:
        return json.load(file)
|
|
|
|
|
|
def get_spotify_creds():
    """Return the Spotify ``(client_id, client_secret)`` pair from the config.

    Reads the ``"SCID"`` and ``"SECRET"`` entries of the ``"spotify"`` section
    of the mapping returned by ``read_data()``.

    Returns:
        A 2-tuple ``(SCID, SECRET)``. Either element is ``None`` when the
        corresponding entry -- or the whole ``"spotify"`` section -- is absent.
    """
    # `or {}` keeps a missing/null "spotify" section from blowing up with an
    # AttributeError on the lookups below.
    spotify_cfg = read_data().get("spotify") or {}

    return spotify_cfg.get("SCID"), spotify_cfg.get("SECRET")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class music(commands.Cog):
    """Cog exposing the bot's music commands.

    Queue, loop-mode, volume and effect state are delegated to the
    ``cogs.music.queue`` module, voice-channel handling and message rendering
    to ``cogs.music.util``, and query resolution to ``cogs.music.translate``.

    NOTE: commands declared without an explicit ``help=`` use their docstring
    as help text in discord.py, so those docstrings are intentionally kept
    short and unchanged; implementation notes live in ``#`` comments instead.
    """

    # Reply sent by the hybrid commands when they are invoked outside a guild.
    _GUILD_ONLY_MSG = "❌ This command must be used in a server!"

    def __init__(self, client):
        """Store the client, attach help metadata and build the Spotify client.

        Also calls ``queue.initialize_tables()``.
        """
        self.client = client

        self.name = "🎶 Music"
        self.emoji = "🎶"

        help_command = music_help()
        help_command.cog = self
        self.help_command = help_command

        # Authentication - without user
        client_id, client_secret = get_spotify_creds()
        client_credentials_manager = SpotifyClientCredentials(
            client_id=client_id,
            client_secret=client_secret)

        self.sp = spotipy.Spotify(
            client_credentials_manager=client_credentials_manager)

        queue.initialize_tables()

    # ------------------------------------------------------------------
    # Private helpers (not registered as commands)
    # ------------------------------------------------------------------

    async def _require_guild(self, ctx: Context):
        """Return ``ctx.guild``; if it is ``None``, notify the user first."""
        guild = ctx.guild
        if guild is None:
            await ctx.send(self._GUILD_ONLY_MSG, ephemeral=True)
        return guild

    async def _start_if_idle(self, ctx: Context, server: int):
        """Flag the server as playing and start playback unless already playing."""
        if await queue.is_server_playing(server):
            return

        await queue.update_server(server, True)
        await queue.play(ctx)

    # ------------------------------------------------------------------
    # Commands
    # ------------------------------------------------------------------

    @commands.command(
        help="Connects to your current voice channel",
        aliases=['connect'])
    async def join(self, ctx: Context):
        # Join via the shared util helper, then acknowledge with a reaction.
        await util.join_vc(ctx)
        await ctx.message.add_reaction('👍')

    @commands.command(
        help="Leaves the voice chat if the bot is present",
        aliases=['disconnect'])
    async def leave(self, ctx: Context):
        # Leave via the shared util helper, then acknowledge with a reaction.
        await util.leave_vc(ctx)
        await ctx.message.add_reaction('👍')

    # HYBRID COMMAND - works as both =play and /play
    @commands.hybrid_command(
        name="play",
        description="Queue a song to play",
        aliases=['p'])
    @app_commands.describe(query="YouTube URL, Spotify link, or search query")
    async def play(self, ctx: Context, *, query: str):
        """Queues a song into the bot"""
        guild = await self._require_guild(ctx)
        if guild is None:
            return

        server = guild.id

        # For slash commands, defer the response since fetching takes time
        if ctx.interaction:
            await ctx.defer()

        await util.join_vc(ctx)

        # Reactions need a message, which only prefix invocations have
        if not ctx.interaction:
            await ctx.message.add_reaction('👍')

        msg = await ctx.send("Fetching song(s)...")

        # TODO potentially save requests before getting stream link
        # Resolve the query into a list of song entries
        audio = await translate.main(query, self.sp)

        await msg.delete()

        # Truthiness check also covers a `None` result, not just an empty list
        if not audio:
            if not ctx.interaction:
                await ctx.message.add_reaction('🚫')
            await ctx.send("❌ Failed to find song!")
            return

        requester = ctx.author.display_name

        # The first song's queue position is kept for the confirmation message
        audio[0]['position'] = await queue.add_song(
            server,
            audio[0],
            requester)

        # Add any other songs
        for song in audio[1:]:
            await queue.add_song(
                server,
                song,
                requester)

        await util.queue_message(ctx, audio[0])

        await self._start_if_idle(ctx, server)

    @commands.command(
        help="Queue a song to play next (top of queue)",
        aliases=['pt', 'pn', 'playnext'])
    async def playtop(self, ctx: Context, *, url=None):
        """Queue a song at the top of the queue (plays next)"""
        if url is None:
            raise commands.CommandError("Must provide a link or search query")
        elif ctx.guild is None:
            raise commands.CommandError("Command must be issued in a server")

        server = ctx.guild.id

        await util.join_vc(ctx)
        await ctx.message.add_reaction('⏭️')

        msg = await ctx.send("Fetching song(s) to play next...")
        async with ctx.typing():
            audio = await translate.main(url, self.sp)

        await msg.delete()

        # Truthiness check also covers a `None` result, not just an empty list
        if not audio:
            await ctx.message.add_reaction('🚫')
            await ctx.send("Failed to find song!")
            return

        requester = ctx.author.display_name

        # Add songs to TOP of queue (position 0, then 1, then 2...)
        for i, song in enumerate(audio):
            await queue.add_song(
                server,
                song,
                requester,
                position=i)  # Insert at top

        # Show first song that was added
        audio[0]['position'] = 0
        await util.queue_message(ctx, audio[0])

        await self._start_if_idle(ctx, server)

    @commands.hybrid_command(
        name="queue",
        description="Display the current music queue",
        aliases=['q', 'songs'])
    async def queue_cmd(self, ctx: Context):
        """Display the current music queue"""
        server = await self._require_guild(ctx)
        if server is None:
            return

        # Grab all songs from this server
        n, songs = await queue.grab_songs(server.id)

        # Nothing queued and nothing flagged as playing -> nothing to show
        if not songs and not await queue.is_server_playing(server.id):
            await ctx.send("🚫 This server has no queue currently. Start the party by queuing up a song!")
            return

        # Display songs
        await util.display_server_queue(ctx, songs, n)

    @commands.hybrid_command(
        name="skip",
        description="Skip the current song",
        aliases=['s'])
    @app_commands.describe(count="Number of songs to skip (default: 1)")
    async def skip(self, ctx: Context, count: int = 1):
        """Skips the current song that is playing"""
        server = await self._require_guild(ctx)
        if server is None:
            return

        if ctx.voice_client is None:
            await ctx.send("❌ I'm not in a voice channel!", ephemeral=True)
            return

        if count <= 0:
            await ctx.send("❌ Please enter a positive number!", ephemeral=True)
            return

        # Check loop mode
        loop_mode = await queue.get_loop_mode(server.id)

        if loop_mode == 'song' and count == 1:
            # When looping song and skipping once, just restart it
            ctx.voice_client.stop()
            await ctx.send("⏭️ Restarting song (loop mode active)")
            return

        # Pop all but the last of the skipped songs unconditionally
        for _ in range(count - 1):
            await queue.pop(server.id, True, skip_mode=True)

        # The final pop is skipped while a single song is being looped
        if loop_mode != 'song':
            await queue.pop(server.id, True, skip_mode=True)

        ctx.voice_client.stop()
        await ctx.send(f"⏭️ Skipped {count} song(s)")

    @commands.hybrid_command(
        name="loop",
        description="Toggle loop mode",
        aliases=['l', 'repeat'])
    @app_commands.describe(mode="Loop mode: off, song, or queue")
    @app_commands.choices(mode=[
        app_commands.Choice(name="Off", value="off"),
        app_commands.Choice(name="Song", value="song"),
        app_commands.Choice(name="Queue", value="queue")
    ])
    async def loop(self, ctx: Context, mode: str = None):
        """Toggle between loop modes or set a specific mode"""
        server = await self._require_guild(ctx)
        if server is None:
            return

        current_mode = await queue.get_loop_mode(server.id)

        if mode is None:
            # No mode given: cycle off -> song -> queue -> off
            # (any other stored value also resets to 'off')
            next_mode = {'off': 'song', 'song': 'queue'}
            new_mode = next_mode.get(current_mode, 'off')
        else:
            # Set specific mode
            mode = mode.lower()
            if mode not in ('off', 'song', 'queue'):
                await ctx.send("❌ Loop mode must be: off, song, or queue", ephemeral=True)
                return
            new_mode = mode

        await queue.set_loop_mode(server.id, new_mode)

        # Response messages
        emojis = {'off': '⏹️', 'song': '🔂', 'queue': '🔁'}
        messages = {
            'off': 'Loop disabled',
            'song': 'Looping current song 🔂 (skip to restart)',
            'queue': 'Looping entire queue 🔁'
        }

        await ctx.send(f"{emojis[new_mode]} {messages[new_mode]}")

    @commands.command(
        help="Shuffle the queue randomly",
        aliases=['mix', 'randomize'])
    async def shuffle(self, ctx: Context):
        """Shuffle all songs in the queue"""
        server = ctx.guild

        if server is None:
            raise commands.CommandError("Command must be issued in a server")

        success = await queue.shuffle_queue(server.id)

        if not success:
            await ctx.send("🚫 Not enough songs in the queue to shuffle!")
        else:
            await ctx.message.add_reaction('🔀')
            await ctx.send("🔀 Queue shuffled!")

    @commands.hybrid_command(
        name="volume",
        description="Set playback volume",
        aliases=['vol', 'v'])
    @app_commands.describe(level="Volume level (0-200%, default shows current)")
    async def volume(self, ctx: Context, level: int = None):
        """Set or display the current volume"""
        server = await self._require_guild(ctx)
        if server is None:
            return

        if level is None:
            # Display current volume
            current_vol = await queue.get_volume(server.id)
            await ctx.send(f"🔊 Current volume: {current_vol}%")
            return

        # Set volume
        if level < 0 or level > 200:
            await ctx.send("❌ Volume must be between 0 and 200!", ephemeral=True)
            return

        new_vol = await queue.set_volume(server.id, level)

        # Update the current playing song's volume if something is playing
        # (stored value is a percentage, the source expects a 0.0-2.0 factor)
        if ctx.voice_client and ctx.voice_client.source:
            ctx.voice_client.source.volume = new_vol / 100.0

        # Pick an emoji based on volume
        if new_vol == 0:
            emoji = '🔇'
        elif new_vol < 50:
            emoji = '🔉'
        elif new_vol < 100:
            emoji = '🔊'
        else:
            emoji = '📢'

        await ctx.send(f"{emoji} Volume set to {new_vol}%")

    @commands.hybrid_command(
        name="effect",
        description="Apply audio effects to playback",
        aliases=['fx', 'filter'])
    @app_commands.describe(effect_name="The audio effect to apply (leave empty to see list)")
    async def effect(self, ctx: Context, effect_name: str = None):
        """Apply or list audio effects"""
        server = await self._require_guild(ctx)
        if server is None:
            return

        # If no effect specified, show current effect and list options
        if effect_name is None:
            current = await queue.get_effect(server.id)
            emoji = queue.get_effect_emoji(current)
            desc = queue.get_effect_description(current)

            # Fetch the effect list once and split it into two sections
            all_effects = queue.list_all_effects()
            effects_list = '\n'.join(
                f"`{e}` - {queue.get_effect_description(e)}"
                for e in all_effects[:9]  # Show first 9
            )
            more_effects = '\n'.join(
                f"`{e}` - {queue.get_effect_description(e)}"
                for e in all_effects[9:]  # Show rest
            )

            await ctx.send(
                f"{emoji} **Current effect:** {current} - {desc}\n\n"
                f"**Available effects:**\n{effects_list}\n\n"
                f"**More effects:**\n{more_effects}\n\n"
                "Use `=effect <name>` or `/effect <name>` to apply!"
            )
            return

        # Set effect
        effect_name = effect_name.lower()

        if effect_name not in queue.list_all_effects():
            await ctx.send(
                "❌ Unknown effect! Use `/effect` to see available effects.",
                ephemeral=True
            )
            return

        await queue.set_effect(server.id, effect_name)

        emoji = queue.get_effect_emoji(effect_name)
        desc = queue.get_effect_description(effect_name)

        # Special warning for earrape/deepfry
        if effect_name in ('earrape', 'deepfry'):
            await ctx.send(
                f"⚠️ **{effect_name.upper()} MODE ACTIVATED** ⚠️\n"
                f"{desc}\n"
                "Effect will apply to next song.\n"
                "Use `/effect none` to disable."
            )
        else:
            await ctx.send(
                f"{emoji} Effect set to **{effect_name}**\n"
                f"{desc}\n"
                "Effect will apply to next song!"
            )

        # If something is currently playing, notify about skip
        if ctx.voice_client and ctx.voice_client.is_playing():
            await ctx.send("💡 Tip: Use `/skip` to apply effect immediately!")
|