diff --git a/modules/Radio.py b/modules/Radio.py deleted file mode 100644 index 535a3f5..0000000 --- a/modules/Radio.py +++ /dev/null @@ -1,323 +0,0 @@ -""" -Radio - query and control an MPD instance -Copyright (C) 2012 Brian S. Stephan - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see . -""" - -from ConfigParser import NoSectionError, NoOptionError -import re -import telnetlib -import time - -from Module import Module - -class NoMpdDataException(Exception): - - """Custom exception.""" - - def __init__(self, msg): - - self.msg = msg - - def __str__(self): - - return self.msg - -class Radio(Module): - - """ - Query and control an MPD server via telnet. - - Pretty straightforward and brute-force, this telnets to the MPD port. - The module attempts to reuse the one connection when possible, and does - enough sanity checking that it should be able to detect when a new - connection is necessary or not. - """ - - def __init__(self, irc, config): - """Set up the usual IRC regex-type stuff.""" - - # set up regexes, for replying to specific stuff - statuspattern = '^!radio\s+status$' - - self.statusre = re.compile(statuspattern) - - # default settings/state - self.mpd = None - self.set_mpd_hostname('localhost') - self.set_mpd_port(6600) - - # read config if provided - if config is not None: - try: - self.set_mpd_hostname(config.get(self.__class__.__name__, 'mpd_hostname')) - self.set_mpd_port(config.getint(self.__class__.__name__, 'mpd_port')) - except (NoSectionError, NoOptionError): - pass - - Module.__init__(self, irc, config) - - def do(self, connection, event, nick, userhost, what, admin_unlocked): - """Handle commands and inputs from IRC events.""" - - if self.statusre.search(what): - return self.irc.reply(event, self.get_status()) - - def get_status(self): - """Get the status (playlist entries, mostly) of the MPD server.""" - - try: - state = self._get_mpd_player_state() - if state == 'playing' or state == 'paused': - # get the current track's info - title = self._get_mpd_current_title() - artist = self._get_mpd_current_artist() - album = self._get_mpd_current_album() - status_str = 'MPD ({0:s}) :: current: {1:s} - {2:s} from {3:s}'.format(state, - artist, title, album) - - current_song_num = self._get_mpd_current_song_playlist_number() - num_tracks = self._get_mpd_playlist_length() - - # if there's a next, get it - if current_song_num+1 < num_tracks: - next_title = self._get_mpd_playlist_entry_title(current_song_num+1) - next_album = self._get_mpd_playlist_entry_album(current_song_num+1) - if next_album != album: - status_str = status_str + ' :: next: {0:s} from {1:s}'.format(next_title, - next_album) - else: - status_str = status_str + ' :: next: {0:s}'.format(next_title) - - # get a couple more, too, if available - if current_song_num+2 < num_tracks: - next_title = self._get_mpd_playlist_entry_title(current_song_num+2) - status_str = status_str + ', {0:s}'.format(next_title) - - if current_song_num+3 < num_tracks: - next_title = self._get_mpd_playlist_entry_title(current_song_num+3) - status_str = status_str + ', {0:s}'.format(next_title) - - # if there's even more, just summarize the number left - if current_song_num+4 < num_tracks: - status_str = status_str + ', {0:d} more'.format(num_tracks - (current_song_num+4)) - - return status_str - elif state == 'stopped': - # TODO: spiff this up a bit - return 'MPD ({0:s})'.format(state) - else: - return state - except NoMpdDataException as nmde: - return 'Error retrieving MPD status: ' + str(nmde) - - def set_mpd_hostname(self, hostname): - """Override the hostname to connect to for MPD (e.g. from a config file or directly).""" - - self.mpd_hostname = hostname - - def set_mpd_port(self, port): - """Override the port to connect to for MPD (e.g. from a config file or directly).""" - - self.mpd_port = port - - def _get_mpd_connection(self): - """Open the telnet connection, or check if the existing one is healthy.""" - - if self.mpd is not None: - try: - self.mpd.write('status\n') - (index, match, text) = self.mpd.expect(['state: ((play)|(stop)|(pause))\n'], 5) - if index >= 0: - self.mpd.read_until('\nOK\n', 5) - return self.mpd - else: - # no text was found - pass - except (EOFError): - pass - - try: - self.mpd = telnetlib.Telnet(self.mpd_hostname, self.mpd_port) - (index, match, text) = self.mpd.expect(['OK MPD 0.17.0\n'], 5) - if index >= 0: - return self.mpd - else: - raise NoMpdDataException('could not connect to MPD server, or version mismatch') - except (EOFError): - raise NoMpdDataException('could not connect to MPD server, or version mismatch') - - def _get_mpd_player_state(self): - """See if player is playing, stopped, or paused.""" - - mpd = self._get_mpd_connection() - try: - mpd.write('status\n') - (index, match, text) = mpd.expect(['state: ((play)|(stop)|(pause))\n'], 5) - if index >= 0: - mpd.read_until('\nOK\n', 5) - match_text = match.group(1) - else: - raise NoMpdDataException('could not get current player status') - except (EOFError): - raise NoMpdDataException('could not get current player status') - - if match_text == 'play': - return 'playing' - elif match_text == 'stop': - return 'stopped' - elif match_text == 'pause': - return 'paused' - - def _get_mpd_current_title(self): - """Get the title of the currently playing/paused song from MPD.""" - - mpd = self._get_mpd_connection() - try: - mpd.write('currentsong\n') - (index, match, text) = mpd.expect(['Title: ([^\n]+)\n'], 5) - if index >= 0: - mpd.read_until('\nOK\n', 5) - return match.group(1) - else: - raise NoMpdDataException('could not get current song title') - except (EOFError): - raise NoMpdDataException('could not get current song title') - - def _get_mpd_current_artist(self): - """Get the artist of the currently playing/paused song from MPD.""" - - mpd = self._get_mpd_connection() - try: - mpd.write('currentsong\n') - (index, match, text) = mpd.expect(['Artist: ([^\n]+)\n'], 5) - if index >= 0: - mpd.read_until('\nOK\n', 5) - return match.group(1) - else: - raise NoMpdDataException('could not get current song artist') - except (EOFError): - raise NoMpdDataException('could not get current song artist') - - def _get_mpd_current_album(self): - """Get the album of the currently playing/paused song from MPD.""" - - mpd = self._get_mpd_connection() - try: - mpd.write('currentsong\n') - (index, match, text) = mpd.expect(['Album: ([^\n]+)\n'], 5) - if index >= 0: - mpd.read_until('\nOK\n', 5) - return match.group(1) - else: - raise NoMpdDataException('could not get current song album') - except (EOFError): - raise NoMpdDataException('could not get current song album') - - def _get_mpd_current_song_playlist_number(self): - """Get the playlist entry number of the current song.""" - - mpd = self._get_mpd_connection() - try: - mpd.write('status\n') - (index, match, text) = mpd.expect(['song: ([^\n]+)\n'], 5) - if index >= 0: - mpd.read_until('\nOK\n', 5) - return int(match.group(1)) - else: - raise NoMpdDataException('could not get current song playlist number') - except (EOFError): - raise NoMpdDataException('could not get current song playlist number') - - def _get_mpd_next_song_playlist_number(self): - """Get the playlist entry number of the next song.""" - - mpd = self._get_mpd_connection() - try: - mpd.write('status\n') - (index, match, text) = mpd.expect(['nextsong: ([^\n]+)\n'], 5) - if index >= 0: - mpd.read_until('\nOK\n', 5) - return int(match.group(1)) - else: - raise NoMpdDataException('could not get next song playlist number') - except (EOFError): - raise NoMpdDataException('could not get next song playlist number') - - def _get_mpd_playlist_length(self): - """Get the number of songs in the playlist.""" - - mpd = self._get_mpd_connection() - try: - mpd.write('status\n') - (index, match, text) = mpd.expect(['playlistlength: ([^\n]+)\n'], 5) - if index >= 0: - mpd.read_until('\nOK\n', 5) - return int(match.group(1)) - else: - raise NoMpdDataException('could not get playlist length') - except (EOFError): - raise NoMpdDataException('could not get playlist length') - - def _get_mpd_playlist_entry_title(self, next_song_num): - """Get the title of the song for the given playlist entry number.""" - - mpd = self._get_mpd_connection() - try: - mpd.write('playlistinfo ' + str(next_song_num) + '\n') - (index, match, text) = mpd.expect(['Title: ([^\n]+)\n'], 5) - if index >= 0: - mpd.read_until('\nOK\n', 5) - return match.group(1) - else: - raise NoMpdDataException('could not get song title') - except (EOFError): - raise NoMpdDataException('could not get song title') - - def _get_mpd_playlist_entry_artist(self, next_song_num): - """Get the artist of the song for the given playlist entry number.""" - - mpd = self._get_mpd_connection() - try: - mpd.write('playlistinfo ' + str(next_song_num) + '\n') - (index, match, text) = mpd.expect(['Artist: ([^\n]+)\n'], 5) - if index >= 0: - mpd.read_until('\nOK\n', 5) - return match.group(1) - else: - raise NoMpdDataException('could not get song artist') - except (EOFError): - raise NoMpdDataException('could not get song artist') - - def _get_mpd_playlist_entry_album(self, next_song_num): - """Get the album of the song for the given playlist entry number.""" - - mpd = self._get_mpd_connection() - try: - mpd.write('playlistinfo ' + str(next_song_num) + '\n') - (index, match, text) = mpd.expect(['Album: ([^\n]+)\n'], 5) - if index >= 0: - mpd.read_until('\nOK\n', 5) - return match.group(1) - else: - raise NoMpdDataException('could not get song album') - except (EOFError): - raise NoMpdDataException('could not get song album') - -if __name__ == '__main__': - radio = Radio(None, None, None) - print(radio.get_status()) - -# vi:tabstop=4:expandtab:autoindent