Another "this is unnecessary" change, which obviously impacts every module that overrides __init__ as well as the base class. Again, modules can still use the DrBotIRC instance for anything they need; in practice that is (with one exception) only add/remove_global_handler, which I'm planning on working my way off of anyway.
324 lines
12 KiB
Python
324 lines
12 KiB
Python
"""
|
|
Radio - query and control an MPD instance
|
|
Copyright (C) 2012 Brian S. Stephan
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
"""
|
|
|
|
import re
import socket
import telnetlib
import time
from ConfigParser import NoSectionError, NoOptionError

from Module import Module
|
|
|
|
class NoMpdDataException(Exception):

    """Raised when an expected piece of data could not be obtained from MPD.

    The human-readable reason is available both as ``msg`` and via ``str()``.
    """

    def __init__(self, msg):
        """Store the reason text.

        The base class initializer is called as well so that ``args`` is
        populated; without it, ``repr()``, logging and pickling of the
        exception lose the message on Python 2.
        """

        Exception.__init__(self, msg)
        self.msg = msg

    def __str__(self):
        """Return the reason text given at construction."""

        return self.msg
|
|
|
|
class Radio(Module):

    """
    Query and control an MPD server via telnet.

    Pretty straightforward and brute-force, this telnets to the MPD port.
    The module attempts to reuse the one connection when possible, and does
    enough sanity checking that it should be able to detect when a new
    connection is necessary or not.

    Every command is sent as one line; the reply is read up to the line that
    terminates it (a bare ``OK``, or an ``ACK ...`` error line) and the
    wanted ``key: value`` field is then picked out of that complete reply.
    All failures are reported as NoMpdDataException.
    """

    # seconds to wait for MPD to answer before giving up on a command
    _MPD_TIMEOUT = 5

    # greeting line sent by the server on connect; any version is accepted
    _MPD_GREETING = r'OK MPD [^\n]+\n'

    # line that ends a reply: group 1 is 'OK' on success, 'ACK ...' on error
    _MPD_RESPONSE_END = r'(?m)^(OK|ACK [^\n]*)\n'

    # raw value of the 'state' field -> wording used in status messages
    _MPD_STATE_NAMES = {'play': 'playing', 'stop': 'stopped', 'pause': 'paused'}

    def __init__(self, irc, config):
        """Set up the usual IRC regex-type stuff.

        irc and config are handed on unchanged to Module.__init__. If config
        is not None, the options 'mpd_hostname' and 'mpd_port' are read from
        the section named after this class; each one falls back to its
        default (localhost / 6600) independently when it is absent.
        """

        # set up regexes, for replying to specific stuff
        statuspattern = r'^!radio\s+status$'
        self.statusre = re.compile(statuspattern)

        # default settings/state
        self.mpd = None
        self.set_mpd_hostname('localhost')
        self.set_mpd_port(6600)

        # read config if provided; the options are looked up separately so
        # that a missing hostname does not cause a configured port to be
        # ignored
        if config is not None:
            section = self.__class__.__name__
            try:
                self.set_mpd_hostname(config.get(section, 'mpd_hostname'))
            except (NoSectionError, NoOptionError):
                pass
            try:
                self.set_mpd_port(config.getint(section, 'mpd_port'))
            except (NoSectionError, NoOptionError):
                pass

        Module.__init__(self, irc, config)

    def do(self, connection, event, nick, userhost, what, admin_unlocked):
        """Handle commands and inputs from IRC events.

        Replies with the MPD status when what matches '!radio status';
        returns None for anything else.
        """

        if self.statusre.search(what):
            return self.irc.reply(event, self.get_status())

    def get_status(self):
        """Get the status (playlist entries, mostly) of the MPD server.

        Returns a one-line summary: the player state, the current track
        and up to three upcoming titles plus a count of the rest. Any
        NoMpdDataException is turned into an error string, never raised.
        """

        try:
            state = self._get_mpd_player_state()
            if state == 'playing' or state == 'paused':
                # get the current track's info
                title = self._get_mpd_current_title()
                artist = self._get_mpd_current_artist()
                album = self._get_mpd_current_album()
                status_str = 'MPD ({0:s}) :: current: {1:s} - {2:s} from {3:s}'.format(
                    state, artist, title, album)

                current_song_num = self._get_mpd_current_song_playlist_number()
                num_tracks = self._get_mpd_playlist_length()

                # list up to three upcoming entries; only the first one
                # gets its album mentioned, and only if it differs from
                # the album of the current track
                first_upcoming = current_song_num + 1
                last_listed = min(first_upcoming + 3, num_tracks)
                for position in range(first_upcoming, last_listed):
                    next_title = self._get_mpd_playlist_entry_title(position)
                    if position != first_upcoming:
                        status_str = status_str + ', {0:s}'.format(next_title)
                        continue

                    next_album = self._get_mpd_playlist_entry_album(position)
                    if next_album != album:
                        status_str = status_str + ' :: next: {0:s} from {1:s}'.format(
                            next_title, next_album)
                    else:
                        status_str = status_str + ' :: next: {0:s}'.format(next_title)

                # if there's even more, just summarize the number left
                remaining = num_tracks - (first_upcoming + 3)
                if remaining > 0:
                    status_str = status_str + ', {0:d} more'.format(remaining)

                return status_str
            elif state == 'stopped':
                # TODO: spiff this up a bit
                return 'MPD ({0:s})'.format(state)
            else:
                return state
        except NoMpdDataException as nmde:
            return 'Error retrieving MPD status: ' + str(nmde)

    def set_mpd_hostname(self, hostname):
        """Override the hostname to connect to for MPD (e.g. from a config file or directly)."""

        self.mpd_hostname = hostname

    def set_mpd_port(self, port):
        """Override the port to connect to for MPD (e.g. from a config file or directly)."""

        self.mpd_port = port

    def _drop_mpd_connection(self):
        """Close and forget the current connection, if there is one."""

        if self.mpd is not None:
            # close explicitly rather than leaving the socket to the
            # garbage collector
            self.mpd.close()
            self.mpd = None

    def _run_mpd_command(self, mpd, command):
        """Send one command line on mpd and read the complete reply.

        Returns the reply text (terminating OK line included) when the
        server answered OK; returns None on an ACK error or when no
        terminating line arrived within the timeout. EOFError and
        socket.error from the connection are left to the caller.
        """

        mpd.write(command + '\n')
        (index, match, text) = mpd.expect([self._MPD_RESPONSE_END], self._MPD_TIMEOUT)
        if index >= 0 and match.group(1) == 'OK':
            return text
        return None

    def _get_mpd_connection(self):
        """Open the telnet connection, or check if the existing one is healthy.

        Returns the connection object. Raises NoMpdDataException when no
        usable connection could be established.
        """

        if self.mpd is not None:
            # probe the existing connection with a cheap command
            try:
                if self._run_mpd_command(self.mpd, 'status') is not None:
                    return self.mpd
            except (EOFError, socket.error):
                pass

            # no sane answer; discard it and start over below
            self._drop_mpd_connection()

        try:
            self.mpd = telnetlib.Telnet(self.mpd_hostname, self.mpd_port)
            (index, match, text) = self.mpd.expect([self._MPD_GREETING], self._MPD_TIMEOUT)
            if index >= 0:
                return self.mpd
        except (EOFError, socket.error):
            # socket.error covers refused/unreachable hosts, which would
            # otherwise escape as a non-MPD exception
            pass

        # connected to something that did not greet us like MPD, or not
        # connected at all; do not keep a half-open connection around
        self._drop_mpd_connection()
        raise NoMpdDataException('could not connect to MPD server, or version mismatch')

    def _query_mpd_field(self, command, field, error_msg):
        """Run command and return the value of the 'field: value' reply line.

        The field name is matched at the start of a line only, so that e.g.
        'Artist' is not found inside 'AlbumArtist' nor 'song' inside
        'nextsong'. Raises NoMpdDataException(error_msg) when the command
        fails or the field is not part of the reply.
        """

        mpd = self._get_mpd_connection()
        try:
            response = self._run_mpd_command(mpd, command)
        except (EOFError, socket.error):
            self._drop_mpd_connection()
            raise NoMpdDataException(error_msg)

        if response is not None:
            found = re.search(r'(?m)^' + re.escape(field) + r': ([^\n]+)$', response)
            if found:
                return found.group(1)

        raise NoMpdDataException(error_msg)

    def _query_mpd_int_field(self, command, field, error_msg):
        """Like _query_mpd_field, but return the value converted to int.

        A non-numeric value is reported as NoMpdDataException(error_msg).
        """

        value = self._query_mpd_field(command, field, error_msg)
        try:
            return int(value)
        except ValueError:
            raise NoMpdDataException(error_msg)

    def _get_mpd_player_state(self):
        """See if player is playing, stopped, or paused."""

        error_msg = 'could not get current player status'
        raw_state = self._query_mpd_field('status', 'state', error_msg)
        try:
            return self._MPD_STATE_NAMES[raw_state]
        except KeyError:
            # a state other than play/stop/pause is treated as no data
            raise NoMpdDataException(error_msg)

    def _get_mpd_current_title(self):
        """Get the title of the currently playing/paused song from MPD."""

        return self._query_mpd_field('currentsong', 'Title',
                                     'could not get current song title')

    def _get_mpd_current_artist(self):
        """Get the artist of the currently playing/paused song from MPD."""

        return self._query_mpd_field('currentsong', 'Artist',
                                     'could not get current song artist')

    def _get_mpd_current_album(self):
        """Get the album of the currently playing/paused song from MPD."""

        return self._query_mpd_field('currentsong', 'Album',
                                     'could not get current song album')

    def _get_mpd_current_song_playlist_number(self):
        """Get the playlist entry number of the current song."""

        return self._query_mpd_int_field('status', 'song',
                                         'could not get current song playlist number')

    def _get_mpd_next_song_playlist_number(self):
        """Get the playlist entry number of the next song."""

        return self._query_mpd_int_field('status', 'nextsong',
                                         'could not get next song playlist number')

    def _get_mpd_playlist_length(self):
        """Get the number of songs in the playlist."""

        return self._query_mpd_int_field('status', 'playlistlength',
                                         'could not get playlist length')

    def _get_mpd_playlist_entry_title(self, next_song_num):
        """Get the title of the song for the given playlist entry number."""

        return self._query_mpd_field('playlistinfo ' + str(next_song_num), 'Title',
                                     'could not get song title')

    def _get_mpd_playlist_entry_artist(self, next_song_num):
        """Get the artist of the song for the given playlist entry number."""

        return self._query_mpd_field('playlistinfo ' + str(next_song_num), 'Artist',
                                     'could not get song artist')

    def _get_mpd_playlist_entry_album(self, next_song_num):
        """Get the album of the song for the given playlist entry number."""

        return self._query_mpd_field('playlistinfo ' + str(next_song_num), 'Album',
                                     'could not get song album')
|
|
|
|
if __name__ == '__main__':
    # Stand-alone smoke test: print the status of the MPD at the default
    # localhost:6600. Radio.__init__ takes exactly (irc, config); the
    # previous call passed a third argument and failed with a TypeError.
    # NOTE(review): this assumes Module.__init__ tolerates irc=None and
    # config=None -- confirm against the Module base class.
    radio = Radio(None, None)
    print(radio.get_status())
|
|
|
|
# vi:tabstop=4:expandtab:autoindent
|