dr.botzo/modules/Radio.py

324 lines
12 KiB
Python
Raw Normal View History

"""
Radio - query and control an MPD instance
Copyright (C) 2012 Brian S. Stephan
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
from ConfigParser import NoSectionError, NoOptionError
import re
import telnetlib
import time
from Module import Module
class NoMpdDataException(Exception):
"""Custom exception."""
def __init__(self, msg):
self.msg = msg
def __str__(self):
return self.msg
class Radio(Module):
"""
Query and control an MPD server via telnet.
Pretty straightforward and brute-force, this telnets to the MPD port.
The module attempts to reuse the one connection when possible, and does
enough sanity checking that it should be able to detect when a new
connection is necessary or not.
"""
def __init__(self, irc, config):
"""Set up the usual IRC regex-type stuff."""
# set up regexes, for replying to specific stuff
statuspattern = '^!radio\s+status$'
self.statusre = re.compile(statuspattern)
# default settings/state
self.mpd = None
self.set_mpd_hostname('localhost')
self.set_mpd_port(6600)
# read config if provided
if config is not None:
try:
self.set_mpd_hostname(config.get(self.__class__.__name__, 'mpd_hostname'))
self.set_mpd_port(config.getint(self.__class__.__name__, 'mpd_port'))
except (NoSectionError, NoOptionError):
pass
Module.__init__(self, irc, config)
def do(self, connection, event, nick, userhost, what, admin_unlocked):
"""Handle commands and inputs from IRC events."""
if self.statusre.search(what):
return self.irc.reply(event, self.get_status())
def get_status(self):
"""Get the status (playlist entries, mostly) of the MPD server."""
try:
state = self._get_mpd_player_state()
if state == 'playing' or state == 'paused':
# get the current track's info
title = self._get_mpd_current_title()
artist = self._get_mpd_current_artist()
album = self._get_mpd_current_album()
status_str = 'MPD ({0:s}) :: current: {1:s} - {2:s} from {3:s}'.format(state,
artist, title, album)
current_song_num = self._get_mpd_current_song_playlist_number()
num_tracks = self._get_mpd_playlist_length()
# if there's a next, get it
if current_song_num+1 < num_tracks:
next_title = self._get_mpd_playlist_entry_title(current_song_num+1)
next_album = self._get_mpd_playlist_entry_album(current_song_num+1)
if next_album != album:
status_str = status_str + ' :: next: {0:s} from {1:s}'.format(next_title,
next_album)
else:
status_str = status_str + ' :: next: {0:s}'.format(next_title)
# get a couple more, too, if available
if current_song_num+2 < num_tracks:
next_title = self._get_mpd_playlist_entry_title(current_song_num+2)
status_str = status_str + ', {0:s}'.format(next_title)
if current_song_num+3 < num_tracks:
next_title = self._get_mpd_playlist_entry_title(current_song_num+3)
status_str = status_str + ', {0:s}'.format(next_title)
# if there's even more, just summarize the number left
if current_song_num+4 < num_tracks:
status_str = status_str + ', {0:d} more'.format(num_tracks - (current_song_num+4))
return status_str
elif state == 'stopped':
# TODO: spiff this up a bit
return 'MPD ({0:s})'.format(state)
else:
return state
except NoMpdDataException as nmde:
return 'Error retrieving MPD status: ' + str(nmde)
def set_mpd_hostname(self, hostname):
"""Override the hostname to connect to for MPD (e.g. from a config file or directly)."""
self.mpd_hostname = hostname
def set_mpd_port(self, port):
"""Override the port to connect to for MPD (e.g. from a config file or directly)."""
self.mpd_port = port
def _get_mpd_connection(self):
"""Open the telnet connection, or check if the existing one is healthy."""
if self.mpd is not None:
try:
self.mpd.write('status\n')
(index, match, text) = self.mpd.expect(['state: ((play)|(stop)|(pause))\n'], 5)
if index >= 0:
self.mpd.read_until('\nOK\n', 5)
return self.mpd
else:
# no text was found
pass
except (EOFError):
pass
try:
self.mpd = telnetlib.Telnet(self.mpd_hostname, self.mpd_port)
(index, match, text) = self.mpd.expect(['OK MPD 0.17.0\n'], 5)
if index >= 0:
return self.mpd
else:
raise NoMpdDataException('could not connect to MPD server, or version mismatch')
except (EOFError):
raise NoMpdDataException('could not connect to MPD server, or version mismatch')
def _get_mpd_player_state(self):
"""See if player is playing, stopped, or paused."""
mpd = self._get_mpd_connection()
try:
mpd.write('status\n')
(index, match, text) = mpd.expect(['state: ((play)|(stop)|(pause))\n'], 5)
if index >= 0:
mpd.read_until('\nOK\n', 5)
match_text = match.group(1)
else:
raise NoMpdDataException('could not get current player status')
except (EOFError):
raise NoMpdDataException('could not get current player status')
if match_text == 'play':
return 'playing'
elif match_text == 'stop':
return 'stopped'
elif match_text == 'pause':
return 'paused'
def _get_mpd_current_title(self):
"""Get the title of the currently playing/paused song from MPD."""
mpd = self._get_mpd_connection()
try:
mpd.write('currentsong\n')
(index, match, text) = mpd.expect(['Title: ([^\n]+)\n'], 5)
if index >= 0:
mpd.read_until('\nOK\n', 5)
return match.group(1)
else:
raise NoMpdDataException('could not get current song title')
except (EOFError):
raise NoMpdDataException('could not get current song title')
def _get_mpd_current_artist(self):
"""Get the artist of the currently playing/paused song from MPD."""
mpd = self._get_mpd_connection()
try:
mpd.write('currentsong\n')
(index, match, text) = mpd.expect(['Artist: ([^\n]+)\n'], 5)
if index >= 0:
mpd.read_until('\nOK\n', 5)
return match.group(1)
else:
raise NoMpdDataException('could not get current song artist')
except (EOFError):
raise NoMpdDataException('could not get current song artist')
def _get_mpd_current_album(self):
"""Get the album of the currently playing/paused song from MPD."""
mpd = self._get_mpd_connection()
try:
mpd.write('currentsong\n')
(index, match, text) = mpd.expect(['Album: ([^\n]+)\n'], 5)
if index >= 0:
mpd.read_until('\nOK\n', 5)
return match.group(1)
else:
raise NoMpdDataException('could not get current song album')
except (EOFError):
raise NoMpdDataException('could not get current song album')
def _get_mpd_current_song_playlist_number(self):
"""Get the playlist entry number of the current song."""
mpd = self._get_mpd_connection()
try:
mpd.write('status\n')
(index, match, text) = mpd.expect(['song: ([^\n]+)\n'], 5)
if index >= 0:
mpd.read_until('\nOK\n', 5)
return int(match.group(1))
else:
raise NoMpdDataException('could not get current song playlist number')
except (EOFError):
raise NoMpdDataException('could not get current song playlist number')
def _get_mpd_next_song_playlist_number(self):
"""Get the playlist entry number of the next song."""
mpd = self._get_mpd_connection()
try:
mpd.write('status\n')
(index, match, text) = mpd.expect(['nextsong: ([^\n]+)\n'], 5)
if index >= 0:
mpd.read_until('\nOK\n', 5)
return int(match.group(1))
else:
raise NoMpdDataException('could not get next song playlist number')
except (EOFError):
raise NoMpdDataException('could not get next song playlist number')
def _get_mpd_playlist_length(self):
"""Get the number of songs in the playlist."""
mpd = self._get_mpd_connection()
try:
mpd.write('status\n')
(index, match, text) = mpd.expect(['playlistlength: ([^\n]+)\n'], 5)
if index >= 0:
mpd.read_until('\nOK\n', 5)
return int(match.group(1))
else:
raise NoMpdDataException('could not get playlist length')
except (EOFError):
raise NoMpdDataException('could not get playlist length')
def _get_mpd_playlist_entry_title(self, next_song_num):
"""Get the title of the song for the given playlist entry number."""
mpd = self._get_mpd_connection()
try:
mpd.write('playlistinfo ' + str(next_song_num) + '\n')
(index, match, text) = mpd.expect(['Title: ([^\n]+)\n'], 5)
if index >= 0:
mpd.read_until('\nOK\n', 5)
return match.group(1)
else:
raise NoMpdDataException('could not get song title')
except (EOFError):
raise NoMpdDataException('could not get song title')
def _get_mpd_playlist_entry_artist(self, next_song_num):
"""Get the artist of the song for the given playlist entry number."""
mpd = self._get_mpd_connection()
try:
mpd.write('playlistinfo ' + str(next_song_num) + '\n')
(index, match, text) = mpd.expect(['Artist: ([^\n]+)\n'], 5)
if index >= 0:
mpd.read_until('\nOK\n', 5)
return match.group(1)
else:
raise NoMpdDataException('could not get song artist')
except (EOFError):
raise NoMpdDataException('could not get song artist')
def _get_mpd_playlist_entry_album(self, next_song_num):
"""Get the album of the song for the given playlist entry number."""
mpd = self._get_mpd_connection()
try:
mpd.write('playlistinfo ' + str(next_song_num) + '\n')
(index, match, text) = mpd.expect(['Album: ([^\n]+)\n'], 5)
if index >= 0:
mpd.read_until('\nOK\n', 5)
return match.group(1)
else:
raise NoMpdDataException('could not get song album')
except (EOFError):
raise NoMpdDataException('could not get song album')
if __name__ == '__main__':
radio = Radio(None, None, None)
print(radio.get_status())
# vi:tabstop=4:expandtab:autoindent