# dr.botzo/DrBotIRC.py
#
# 507 lines
# 17 KiB
# Python

"""
DrBotIRC - customizations of irclib, for dr.botzo
Copyright (C) 2011 Brian S. Stephan
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import bisect
import copy
from ConfigParser import NoOptionError, NoSectionError
import logging
import re
import signal
from SimpleXMLRPCServer import SimpleXMLRPCServer
import socket
import sys
import thread
import traceback
from extlib import irclib
class DrBotzoMethods:

    """Callables published to clients through the XML-RPC server."""

    def __init__(self, irc):
        """Remember the IRC object and grab the shared 'dr.botzo' logger."""

        self.irc = irc
        self.log = logging.getLogger('dr.botzo')

    def echo(self, message):
        """
        Hand the message straight back to the caller (connectivity test).

        Keyword arguments:
        message - the value to return unchanged
        """

        return message

    def say(self, target, message):
        """
        Deliver a PRIVMSG through the bot's server connection.

        Keyword arguments:
        target - the nick/channel that should receive the text
        message - the text to deliver

        Always returns the string "OK".
        """

        self.irc.server.privmsg(target, message)
        return "OK"

    def execute_module_method(self, modname, method, args):
        """
        Look up a loaded module by class name and call one of its methods.

        Keyword arguments:
        modname - class name of the loaded module to use
        method - name of the attribute to call on that module
        args - positional arguments for the call, unpacked with *

        Returns whatever the called method returns, or a descriptive
        string when the module or method is missing or not callable.
        """

        # distinguishes "attribute missing" from an attribute set to None
        absent = object()

        for candidate in self.irc.modlist:
            if candidate.__class__.__name__ != modname:
                continue

            func = getattr(candidate, method, absent)
            if func is absent:
                return "couldn't find " + method + " in found module " + modname
            if not hasattr(func, '__call__'):
                return method + " in found module " + modname + " is not callable"
            return func(*args)

        return "couldn't find " + modname
class DrBotServerConnection(irclib.ServerConnection):

    """Subclass irclib's ServerConnection, in order to expand privmsg.

    The connection tracks the nickmask it believes the server uses for
    us, so that privmsg() can estimate how much text fits in one IRC
    line and split longer messages into several PRIVMSGs.
    """

    # replaced per instance in __init__ and, when possible, in on_welcome()
    nickmask = None

    def __init__(self, irclibobj):
        """Set up the connection and register the welcome handler.

        Keyword arguments:
        irclibobj - passed straight through to irclib.ServerConnection
        """

        irclib.ServerConnection.__init__(self, irclibobj)

        # temporary. hopefully on_welcome() will set this
        self.nickmask = socket.getfqdn()

        self.log = logging.getLogger('dr.botzo')

        # NOTE(review): the trailing 1 is presumably the handler priority;
        # confirm against irclib's add_global_handler
        self.add_global_handler('welcome', self.on_welcome, 1)

    def on_welcome(self, connection, event):
        """Set the nickmask that the ircd tells us is us.

        The first event argument is searched for something shaped like
        nick!user@host; when nothing matches, the previous nickmask is kept.
        """

        what = event.arguments()[0]

        match = re.search(r'(\S+!\S+@\S+)', what)
        if match:
            self.nickmask = match.group(1)

    def privmsg(self, target, text):
        """Send a PRIVMSG command, splitting text that is too long.

        Keyword arguments:
        target - the nick/channel to send to
        text - the message body

        Text that fits in one line is sent as-is. Longer text is broken
        into chunks, preferably at a space; every chunk but the last ends
        in " ..." and every chunk but the first starts with "... ". Once
        three chunks have been sent the remainder is dropped.
        """

        splitter = "..."
        continuation = splitter + ' '

        # split messages that are too long. Max length is 512.
        # TODO: this does not properly handle when the nickmask has been
        # masked by the ircd
        # is the above still the case?
        space = 512 - len('\r\n') - len(' PRIVMSG ') - len(target) - len(' :') - len(self.nickmask) - len(' :')
        splitspace = space - (len(splitter) + 1)

        # send unsplit when it fits, or when the budget is so small that
        # a chunk could not carry anything beyond the "... " prefix
        if len(text) <= space or splitspace <= len(continuation):
            self.send_raw("PRIVMSG %s :%s" % (target, text))
            return

        times = 1
        # length of the "... " prefix on the remaining text (none at first)
        prefix_len = 0
        while len(text) > splitspace:
            # look for a break only past the prefix; the space inside
            # "... " itself would otherwise match and make no progress
            splitpos = text.rfind(' ', prefix_len, splitspace)
            if splitpos > prefix_len:
                head = text[:splitpos]
                tail = text[splitpos+1:]
            else:
                # no usable space in range: cut mid-word instead of
                # sending an oversized or content-free line
                head = text[:splitspace]
                tail = text[splitspace:]

            self.send_raw("PRIVMSG %s :%s" % (target, head + ' ' + splitter))
            text = continuation + tail
            prefix_len = len(continuation)

            times = times + 1
            if times >= 4:
                # cap on the number of lines; the rest is dropped
                return

        # done splitting
        self.send_raw("PRIVMSG %s :%s" % (target, text))
class DrBotIRC(irclib.IRC):

    """Implement a customized irclib IRC.

    On top of irclib.IRC this class keeps the list of loaded bot modules,
    optionally runs an XML-RPC server, and pre-processes incoming events
    (ignore list, "!alias" commands, [subcommand] recursion, alias
    expansion) before dispatching them to the registered handlers.
    """

    # NOTE: defined on the class, so the list is shared by all instances
    modlist = []
    config = None
    # shadowed by the server() method below until server() has been
    # called, after which the instance attribute holds the connection
    server = None

    # most chained alias expansions try_alias() will perform for one event
    max_alias_depth = 16

    def __init__(self, config):
        """Store the config, hook SIGINT and start XML-RPC if configured.

        Keyword arguments:
        config - ConfigParser-style object; an 'XMLRPC' section with
                 'host' and 'port' options enables the XML-RPC server
        """

        irclib.IRC.__init__(self)

        self.config = config
        self.xmlrpc = None

        self.log = logging.getLogger('dr.botzo')

        # handle SIGINT
        signal.signal(signal.SIGINT, self.sigint_handler)

        # load XML-RPC server
        try:
            if self.config.has_section('XMLRPC'):
                host = self.config.get('XMLRPC', 'host')
                port = self.config.getint('XMLRPC', 'port')
                if host and port:
                    self.funcs = DrBotzoMethods(self)

                    self.xmlrpc = SimpleXMLRPCServer((host, port))
                    self.xmlrpc.register_introspection_functions()
                    self.xmlrpc.register_instance(self.funcs)

                    thread.start_new_thread(self._xmlrpc_listen, ())
        except NoOptionError:
            # host or port missing: run without XML-RPC
            pass

    def server(self):
        """Create a DrBotServerConnection.

        The new connection replaces this method on the instance (it is
        stored as self.server), is appended to self.connections, and is
        returned.
        """

        self.server = DrBotServerConnection(self)
        self.connections.append(self.server)

        return self.server

    def _handle_event(self, connection, event):
        """Override event handler to do recursion.

        Events from nicks listed in the 'Ignore' config section are
        dropped. Otherwise the text goes through alias commands,
        [subcommand] recursion and alias expansion, and the event is then
        given to every "all_events" handler and every handler for its
        type, stopping early if one returns "NO MORE". Exceptions raised
        by a handler are logged and do not stop the remaining handlers.
        """

        try:
            nick = irclib.nm_to_n(event.source())
        except (IndexError, AttributeError):
            nick = ''

        try:
            if self.config.has_section('Ignore'):
                alias = self.config.get('Ignore', nick.lower())
                if alias:
                    self.log.debug("ignoring " + nick + " as per config file")
                    return
        except NoOptionError:
            # nick is not on the ignore list
            pass

        self.try_alias_cmds(connection, event)
        self.try_recursion(connection, event)
        self.try_alias(connection, event)

        h = self.handlers
        for handler in h.get("all_events", []) + h.get(event.eventtype(), []):
            try:
                if handler[1](connection, event) == "NO MORE":
                    return
            except Exception:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                self.log.error("exception floated up to DrBotIrc!")
                # format explicitly: a type and an exception instance
                # cannot be concatenated with str
                self.log.error("%s: %s", exc_type.__name__, exc_value)
                self.log.error(''.join(traceback.format_tb(exc_traceback)))

    def xmlrpc_register_function(self, func, name):
        """Add a method to the XML-RPC interface.

        Does nothing when func is falsy or not callable, or when the
        XML-RPC server is not running.
        """

        if func and self.xmlrpc:
            if hasattr(func, '__call__'):
                self.xmlrpc.register_function(func, name)

    def _xmlrpc_listen(self):
        """Begin listening. Hopefully this was called in a new thread."""

        self.xmlrpc.serve_forever()

    def try_to_replace_event_text_with_module_text(self, connection, event):
        """Do something very similar to _handle_event, but for recursion.

        The intent here is that we replace [text] with whatever a module
        provides to us: every truthy handler return value is collected
        and, if there are any, the event's first argument is replaced by
        them joined with newlines.
        """

        h = self.handlers
        replies = []
        for handler in h.get("all_events", []) + h.get(event.eventtype(), []):
            ret = handler[1](connection, event)
            if ret:
                replies.append(ret)

        if replies:
            event.arguments()[0] = '\n'.join(replies)

    def try_alias_cmds(self, connection, event):
        """See if there is an alias ("!command") in the text, and if so
        do alias manipulation before any other recursion or aliasing.

        Handles "!alias add NAME TEXT...", "!alias remove NAME" and
        "!alias list [NAME]". Returns the result of reply() when a
        command was answered, otherwise None.
        """

        # NOTE(review): alias changes are not restricted to the
        # 'admin_userhost' configured under 'dr.botzo' -- confirm intended
        try:
            what = event.arguments()[0]
        except (IndexError, AttributeError):
            what = ''

        # first see if the aliases are being directly manipulated via add/remove
        whats = what.split(' ')
        if len(whats) <= 1 or whats[0] != '!alias':
            return

        action = whats[1]
        try:
            if action == 'add' and len(whats) >= 4:
                if not self.config.has_section('Alias'):
                    self.config.add_section('Alias')

                self.config.set('Alias', whats[2], ' '.join(whats[3:]))
                replystr = 'Added alias ' + whats[2] + '.'
                return self.reply(connection, event, replystr)

            if action == 'remove' and len(whats) >= 3:
                if not self.config.has_section('Alias'):
                    self.config.add_section('Alias')

                if self.config.remove_option('Alias', whats[2]):
                    replystr = 'Removed alias ' + whats[2] + '.'
                    return self.reply(connection, event, replystr)
            elif action == 'list':
                if len(whats) > 2:
                    alias = self.config.get('Alias', whats[2])
                    return self.reply(connection, event, alias)

                # hide the 'debug' option; filtering (rather than
                # list.remove) is safe when it is not present
                alist = sorted(opt for opt in self.config.options('Alias')
                               if opt != 'debug')
                liststr = ', '.join(alist)
                return self.reply(connection, event, liststr)
        except (NoOptionError, NoSectionError):
            # unknown alias or no 'Alias' section: nothing to report
            pass

    def try_alias(self, connection, event, _depth=0):
        """Expand the first matching alias in the event text.

        Each option in the 'Alias' config section (except 'debug') is
        used as a case-insensitive regex; on the first match the text is
        rewritten with the alias value, re-checked for [subcommand]
        recursion, and run through alias expansion again.

        Keyword arguments:
        connection - the connection the event arrived on
        event - the event whose first argument is rewritten in place
        _depth - internal counter of chained expansions; callers should
                 leave it at its default

        Returns the (possibly modified) event.
        """

        if _depth >= self.max_alias_depth:
            # an alias whose expansion keeps matching would otherwise
            # recurse until the interpreter's recursion limit
            self.log.warning("alias expansion exceeded %d levels, giving up",
                             self.max_alias_depth)
            return event

        # try doing alias work
        try:
            what = event.arguments()[0]
            for alias in self.config.options('Alias'):
                if alias == 'debug':
                    continue

                try:
                    alias_re = re.compile(alias, re.IGNORECASE)
                    if not alias_re.search(what):
                        continue
                    # we found an alias for our given string, doing a replace
                    command = alias_re.sub(self.config.get('Alias', alias), what)
                except re.error:
                    # a malformed alias must not break event handling
                    self.log.warning("skipping invalid alias pattern: " + alias)
                    continue

                event.arguments()[0] = command

                # now we have to check it for recursions again
                self.try_recursion(connection, event)

                # i guess someone could have an alias of an alias... try again
                return self.try_alias(connection, event, _depth + 1)
        except (NoOptionError, NoSectionError, IndexError):
            pass

        # if we got here, there are no matching aliases, so return what we got
        return event

    def try_recursion(self, connection, event):
        """Scan message for subcommands to execute and use as part of this command.

        Upon seeing a line intended for this module, see if there are subcommands
        that we should do what is basically a text replacement on. The intent is to
        allow things like the following:

        command arg1 [anothercommand arg1 arg2]

        where the output of anothercommand is command's arg2..n.
        """

        try:
            # begin recursion search
            attempt = event.arguments()[0]

            # the candidate runs from the first '[' to the last ']'
            start_idx = attempt.find('[')
            subcmd = attempt[start_idx+1:]
            end_idx = subcmd.rfind(']')
            subcmd = subcmd[:end_idx]

            if start_idx != -1 and end_idx != -1 and len(subcmd) > 0:
                # found recursion candidate
                # copy the event and see if IT has recursion to do
                newevent = copy.deepcopy(event)
                newevent.arguments()[0] = subcmd
                # tells reply() to return text instead of sending it
                newevent._recursing = True

                self.try_recursion(connection, newevent)

                # recursion over, check for aliases
                self.try_alias(connection, newevent)

                # now that we have a string that has been de-aliased and
                # recursed all the way deeper into its text, see if any
                # modules can do something with it. this calls the same
                # event handlers in the same way as if this were a native
                # event.
                self.try_to_replace_event_text_with_module_text(connection, newevent)

                # we have done all we can do with the sub-event. whatever
                # the text of that event now is, we should replace the parent
                # event's [] section with it.
                oldtext = event.arguments()[0]
                newtext = oldtext.replace('['+subcmd+']', newevent.arguments()[0])
                event.arguments()[0] = newtext

                # we have now resolved the []. recursion will unfold, replacing
                # it further and further, until we eventually get back to the
                # original irc event in _handle_event, which will do one
                # last search on the text.
        except IndexError:
            # event has no text argument to scan
            pass

    def quit_irc(self, connection, msg):
        """Save and shut down every module, quit IRC, then exit.

        Keyword arguments:
        connection - the connection to send the quit on
        msg - the quit message

        Also writes the config file and stops the XML-RPC server. Does
        not return: ends with sys.exit().
        """

        for module in self.modlist:
            module.save()
            module.shutdown()

        connection.quit(msg)
        self.log.info(self.save_config())
        self._xmlrpc_shutdown()
        sys.exit()

    def reply(self, connection, event, replystr, stop_responding=False):
        """Reply over IRC to replypath or return a string with the reply.

        Keyword arguments:
        connection - the connection to reply on
        event - the event being answered
        replystr - the reply text; None means there is nothing to say
        stop_responding - when True and the reply was sent over IRC,
                          return "NO MORE" so dispatch stops

        Returns replystr when the event is a recursion sub-event,
        "NO MORE" as described above, and None otherwise.
        """

        replypath = event.target()

        # check for privmsg
        if replypath == connection.get_nickname():
            replypath = irclib.nm_to_n(event.source())

        if replystr is not None:
            # only sub-events built by try_recursion() carry _recursing,
            # so treat a missing attribute as "not recursing"
            if getattr(event, '_recursing', False):
                return replystr
            else:
                for line in replystr.split('\n'):
                    connection.privmsg(replypath, line)
                if stop_responding:
                    return "NO MORE"

    def save_modules(self):
        """Call save() on every loaded module."""

        for module in self.modlist:
            module.save()

    def _xmlrpc_shutdown(self):
        """Shut down the XML-RPC server."""

        if self.xmlrpc is not None:
            self.xmlrpc.shutdown()
            self.xmlrpc.server_close()

    def save_config(self):
        """Write the config to 'dr.botzo.cfg' and return a status string."""

        with open('dr.botzo.cfg', 'w') as cfg:
            self.config.write(cfg)

        return 'Saved config.'

    def load_module(self, modname):
        """Load a module (in both the python and dr.botzo sense) if not
        already loaded.

        Keyword arguments:
        modname - name of both the python module under 'modules.' and
                  the class inside it

        Returns a human-readable status string.
        """

        for module in self.modlist:
            if modname == module.__class__.__name__:
                return 'Module ' + modname + ' is already loaded.'

        # not loaded, let's get to work
        try:
            modstr = 'modules.'+modname
            __import__(modstr)
            module = sys.modules[modstr]
            # plain attribute lookup instead of eval() on a string built
            # from modname, which may come from an IRC user
            botmod = getattr(module, modname)(self, self.config, self.server)
            self.modlist.append(botmod)
            botmod.register_handlers()

            # might as well add it to the list
            modset = set(self.config.get('dr.botzo', 'module_list').split(','))
            modset.add(modname)
            self.config.set('dr.botzo', 'module_list', ','.join(modset))

            return 'Module ' + modname + ' loaded.'
        except ImportError:
            return 'Module ' + modname + ' not found.'

    def unload_module(self, modname):
        """Attempt to unload and del a module if it's loaded.

        Keyword arguments:
        modname - class name of the module to unload

        Returns a human-readable status string.
        """

        modstr = 'modules.'+modname
        for module in self.modlist:
            if modname == module.__class__.__name__:
                # do anything the module needs to do to clean up
                module.save()
                module.shutdown()

                # remove module references (safe while iterating only
                # because we return before the next iteration)
                self.modlist.remove(module)
                module.unregister_handlers()

                # del it; tolerate the entry already being gone
                sys.modules.pop(modstr, None)
                del(module)

                # might as well remove it from the list; discard() so a
                # module missing from the config list still unloads
                modset = set(self.config.get('dr.botzo', 'module_list').split(','))
                modset.discard(modname)
                self.config.set('dr.botzo', 'module_list', ','.join(modset))

                return 'Module ' + modname + ' unloaded.'

        # guess it was never loaded
        return 'Module ' + modname + ' is not loaded.'

    def list_modules(self):
        """List loaded modules, as a list of their class names."""

        return [module.__class__.__name__ for module in self.modlist]

    # SIGINT signal handler
    def sigint_handler(self, signal, frame):
        """Save and shut down modules, write config, stop XML-RPC, exit.

        Keyword arguments (both unused):
        signal - the signal number
        frame - the interrupted stack frame
        """

        for module in self.modlist:
            module.save()
            module.shutdown()

        self.log.info(self.save_config())
        self._xmlrpc_shutdown()
        sys.exit()
# vi:tabstop=4:expandtab:autoindent
# kate: indent-mode python;indent-width 4;replace-tabs on;