"""
DrBotIRC - customizations of irclib, for dr.botzo
Copyright (C) 2011  Brian S. Stephan

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""

from ConfigParser import NoOptionError, NoSectionError
import re
import signal
import socket
import sys

from extlib import irclib


class DrBotServerConnection(irclib.ServerConnection):

    """Subclass irclib's ServerConnection, in order to expand privmsg."""

    def privmsg(self, target, text):
        """Send a PRIVMSG command, splitting text that is too long.

        Text that does not fit in one IRC line is cut at a space into
        chunks; each cut is marked with a trailing ' ...' on the chunk
        and a leading '... ' on the following one. When a chunk contains
        no usable space it is cut at the maximum chunk length instead.
        At most three chunks are sent for one call; whatever text remains
        after that is dropped.

        Arguments:
        target -- nick or channel the message is addressed to
        text -- the message body
        """

        splitter = "..."

        # split messages that are too long. Max length is 512.
        # TODO: this does not properly handle when the hostname has been
        # masked by the ircd
        space = (512 - len('\r\n') - len('PRIVMSG ') - len(' :') - len(target)
                 - len(self.nickname) - len('!') - len(self.username)
                 - len('@') - len(socket.getfqdn()))
        splitspace = space - (len(splitter) + 1)

        if len(text) <= space:
            self.send_raw("PRIVMSG %s :%s" % (target, text))
            return

        times = 1
        # length of the '... ' continuation marker at the start of text;
        # zero for the first chunk, which has no marker yet
        prefix_len = 0

        while len(text) > splitspace:
            # look for a space to break on, but never the one inside the
            # continuation marker, which would send an empty chunk
            splitpos = text.rfind(' ', prefix_len, splitspace)
            if splitpos <= prefix_len:
                # no usable space: hard split at the maximum chunk length
                splittext = text[0:splitspace] + ' ' + splitter
                text = splitter + ' ' + text[splitspace:]
            else:
                splittext = text[0:splitpos] + ' ' + splitter
                text = splitter + ' ' + text[splitpos+1:]
            self.send_raw("PRIVMSG %s :%s" % (target, splittext))
            prefix_len = len(splitter) + 1

            times = times + 1
            if times >= 4:
                # cap on the number of lines sent for a single message
                return

        # done splitting
        self.send_raw("PRIVMSG %s :%s" % (target, text))


class DrBotIRC(irclib.IRC):

    """Implement a customized irclib IRC."""

    # loaded bot module instances; rebound per instance in __init__
    modlist = []
    # the ConfigParser-style object passed to __init__
    config = None

    def __init__(self, config):
        """Initialize the irclib IRC object and install the SIGINT handler.

        Arguments:
        config -- configuration object; this class calls get, set,
                  options, has_section, add_section, remove_option and
                  write on it
        """

        irclib.IRC.__init__(self)

        self.config = config
        # give each instance its own list rather than sharing the
        # class-level one
        self.modlist = []

        # handle SIGINT
        signal.signal(signal.SIGINT, self.sigint_handler)

    def server(self):
        """Create a DrBotServerConnection.

        The new connection is stored in self.server, which shadows this
        method on the instance, so this can only be called once per
        instance. The connection is also appended to self.connections
        and on_pubmsg is registered for 'pubmsg' and 'privmsg' events.

        Returns the new connection.
        """

        self.server = DrBotServerConnection(self)
        self.connections.append(self.server)

        # NOTE(review): the trailing 1 is presumably irclib's handler
        # priority -- confirm against irclib
        self.server.add_global_handler('pubmsg', self.on_pubmsg, 1)
        self.server.add_global_handler('privmsg', self.on_pubmsg, 1)

        return self.server

    def on_pubmsg(self, connection, event):
        """See if there is an alias ("!command") in the text, and if so,
        translate it into an internal bot command and run it.

        Also registered for 'privmsg' events; when the event target is
        the bot's own nick, replies go to the sender's nick instead.
        """

        nick = irclib.nm_to_n(event.source())
        userhost = irclib.nm_to_uh(event.source())
        replypath = event.target()
        what = event.arguments()[0]
        admin_unlocked = False

        # privmsg
        if replypath == connection.get_nickname():
            replypath = nick

        try:
            if userhost == self.config.get('dr.botzo', 'admin_userhost'):
                admin_unlocked = True
        except (NoOptionError, NoSectionError):
            # no admin configured, so nobody is unlocked
            pass

        return self.try_recursion(connection, event, nick, userhost,
                                  replypath, what, admin_unlocked)

    def try_alias(self, connection, event, nick, userhost, replypath, what,
                  admin_unlocked):
        """Handle '!alias' management commands, then apply aliases to what.

        '!alias add <name> <text...>', '!alias remove <name>' and
        '!alias list [<name>]' operate on the 'Alias' config section.
        Otherwise each option name in that section is used as a regular
        expression; for the first one that matches what, the match is
        replaced with the option's value and the result is run through
        try_recursion. If nothing matches, what is passed on unchanged.

        Returns whatever self.reply returns for the chosen text: the
        text itself when replypath is None, otherwise None after sending
        it to replypath.
        """

        # first see if the aliases are being directly manipulated via
        # add/remove
        whats = what.split(' ')
        # the length check keeps a bare '!alias' from raising IndexError
        if len(whats) >= 2 and whats[0] == '!alias':
            action = whats[1]
            try:
                if action == 'add' and len(whats) >= 4:
                    if not self.config.has_section('Alias'):
                        self.config.add_section('Alias')

                    self.config.set('Alias', whats[2], ' '.join(whats[3:]))
                    replystr = 'Added alias ' + whats[2] + '.'
                    return self.reply(connection, replypath, replystr)
                if action == 'remove' and len(whats) >= 3:
                    if not self.config.has_section('Alias'):
                        self.config.add_section('Alias')

                    if self.config.remove_option('Alias', whats[2]):
                        replystr = 'Removed alias ' + whats[2] + '.'
                        return self.reply(connection, replypath, replystr)
                elif action == 'list':
                    if len(whats) > 2:
                        alias = self.config.get('Alias', whats[2])
                        return self.reply(connection, replypath, alias)
                    else:
                        alist = self.config.options('Alias')
                        # hide the 'debug' option when it is present
                        if 'debug' in alist:
                            alist.remove('debug')
                        alist.sort()
                        liststr = ', '.join(alist)
                        return self.reply(connection, replypath, liststr)
            except (NoOptionError, NoSectionError):
                # nothing to report; fall through to normal alias handling
                pass

        # done searching for recursions in this level. we will now operate on
        # whatever recursion-satisfied string we have, checking for alias and
        # running module commands

        # now that that's over, try doing alias work
        try:
            alias_list = self.config.options('Alias')

            for alias in alias_list:
                try:
                    alias_re = re.compile(alias)
                    if not alias_re.search(what):
                        continue
                    # we found an alias for our given string, doing a
                    # replace
                    command = alias_re.sub(self.config.get('Alias', alias),
                                           what)
                except re.error:
                    # aliases can be added over IRC, so one that is not a
                    # valid pattern/replacement must not break every
                    # later message; skip it
                    continue

                # need to do another recursion scan
                ret = self.try_recursion(connection, event, nick, userhost,
                                         None, command, admin_unlocked)
                return self.reply(connection, replypath, ret)
        except (NoOptionError, NoSectionError):
            pass

        # if we got here, there are no matching aliases, so return what we got
        return self.reply(connection, replypath, what)

    def reply(self, connection, replypath, replystr, stop_responding=False):
        """Reply over IRC to replypath or return a string with the reply.

        Utility method to do the proper type of reply (either to IRC, or as a
        return to caller) depending on the target. Pretty simple, and included
        in the base class for convenience. It should be the last step for
        callers:

            return self.reply(connection, replypath, 'hello')

        When replystr is None nothing is sent and None is returned. When
        replypath is None, replystr is returned instead of being sent.
        Otherwise replystr is sent to replypath and the return value is
        "NO MORE" if stop_responding is true, else None.
        """

        if replystr is None:
            return None

        if replypath is None:
            return replystr

        connection.privmsg(replypath, replystr)
        if stop_responding:
            return "NO MORE"

    def try_recursion(self, connection, event, nick, userhost, replypath,
                      what, admin_unlocked):
        """Scan message for subcommands to execute and use as part of this
        command.

        Upon seeing a line intended for this module, see if there are
        subcommands that we should do what is basically a text replacement on.
        The intent is to allow things like the following:

            command arg1 [anothercommand arg1 arg2]

        where the output of anothercommand is command's arg2..n.

        The subcommand is the text between the first '[' and the last
        ']'. Without one, the text goes through try_alias and is then
        offered to each module's do() in modlist order; the first
        non-None result is the reply.
        """

        attempt = what

        start_idx = attempt.find('[')
        subcmd = attempt[start_idx+1:]
        end_idx = subcmd.rfind(']')
        subcmd = subcmd[:end_idx]

        if start_idx == -1 or end_idx == -1 or len(subcmd) == 0:
            # no recursion, now let's look for aliases
            command = self.try_alias(connection, event, nick, userhost, None,
                                     attempt, admin_unlocked)

            # aliases resolved. run result against each module
            for module in self.modlist:
                ret = module.do(connection, event, nick, userhost, None,
                                command, admin_unlocked)
                if ret is not None:
                    # a module had a result for us, post-alias, so return it
                    # TODO: scan all modules with compounding results
                    return self.reply(connection, replypath, ret)

            # if we got here, text isn't a command, so pass it back, but ONLY
            # if the output is different from the input
            if replypath is None or what != command:
                return self.reply(connection, replypath, command)
        else:
            ret = self.try_recursion(connection, event, nick, userhost, None,
                                     subcmd, admin_unlocked)
            if ret is not None:
                # splice the subcommand's output in place of '[subcmd]'
                return self.try_alias(connection, event, nick, userhost,
                                      replypath,
                                      attempt.replace('['+subcmd+']', ret),
                                      admin_unlocked)
            else:
                return self.try_alias(connection, event, nick, userhost,
                                      replypath, attempt, admin_unlocked)

    def quit_irc(self, connection, msg):
        """Shut down every module, quit IRC, save config, and exit.

        Arguments:
        connection -- the connection to send the quit on
        msg -- the quit message
        """

        for module in self.modlist:
            module.shutdown()

        connection.quit(msg)
        self.save_config()
        sys.exit()

    def save_config(self):
        """Write the config to 'dr.botzo.cfg' and call save() on each module.

        Returns the string 'Saved.'.
        """

        with open('dr.botzo.cfg', 'w') as cfg:
            self.config.write(cfg)
        for module in self.modlist:
            module.save()

        return 'Saved.'

    def load_module(self, modname):
        """Load a module (in both the python and dr.botzo sense) if not
        already loaded.

        Imports 'modules.<modname>', instantiates the class of the same
        name from it with (self, self.config, self.server), registers its
        handlers, and adds modname to the 'module_list' config option.

        Returns a status string describing what happened.
        """

        for module in self.modlist:
            if modname == module.__class__.__name__:
                return 'Module ' + modname + ' is already loaded.'

        # not loaded, let's get to work
        try:
            modstr = 'modules.'+modname
            __import__(modstr)
            module = sys.modules[modstr]
            # look the class up by name rather than eval()ing a string
            # built from modname
            botmod = getattr(module, modname)(self, self.config, self.server)
            self.modlist.append(botmod)
            botmod.register_handlers(self.server)

            # might as well add it to the list
            modset = set(self.config.get('dr.botzo', 'module_list').split(','))
            modset.add(modname)
            self.config.set('dr.botzo', 'module_list', ','.join(modset))

            return 'Module ' + modname + ' loaded.'
        except ImportError:
            return 'Module ' + modname + ' not found.'

    def unload_module(self, modname):
        """Attempt to unload and del a module if it's loaded.

        Returns a status string describing what happened.
        """

        modstr = 'modules.'+modname
        for module in self.modlist:
            if modname == module.__class__.__name__:
                # do anything the module needs to do to clean up
                module.shutdown()

                # remove module references; safe while iterating because
                # we return before the loop advances
                self.modlist.remove(module)
                module.unregister_handlers()

                # forget the imported python module so a later load
                # re-imports it
                sys.modules.pop(modstr, None)

                # might as well remove it from the list; discard so a
                # module missing from the list is not an error
                modset = set(
                    self.config.get('dr.botzo', 'module_list').split(','))
                modset.discard(modname)
                self.config.set('dr.botzo', 'module_list', ','.join(modset))

                return 'Module ' + modname + ' unloaded.'

        # guess it was never loaded
        return 'Module ' + modname + ' is not loaded.'

    def reload_module(self, modname):
        """Attempt to reload a module, by removing it from memory and then
        re-initializing it.

        Returns a status string; success is detected by comparing the
        status strings returned by unload_module and load_module.
        """

        ret = self.unload_module(modname)
        if ret == 'Module ' + modname + ' unloaded.':
            ret = self.load_module(modname)
            if ret == 'Module ' + modname + ' loaded.':
                return 'Module ' + modname + ' reloaded.'

        return 'Module ' + modname + ' reload failed. Check the console.'

    # SIGINT signal handler
    def sigint_handler(self, signal, frame):
        """Save the config, print the result, and exit.

        Installed as the SIGINT handler by __init__; the signal and
        frame arguments are not used.
        """

        print(self.save_config())
        sys.exit()

# vi:tabstop=4:expandtab:autoindent
# kate: indent-mode python;indent-width 4;replace-tabs on;