dr.botzo/dr_botzo/ircbot/DrBotIRC.py

822 lines
27 KiB
Python

"""
DrBotIRC - customizations of irclib, for dr.botzo
Copyright (C) 2011 Brian S. Stephan
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import bisect
import copy
from ConfigParser import NoOptionError, NoSectionError
import logging
import re
import signal
from SimpleXMLRPCServer import SimpleXMLRPCServer
import socket
import sys
import thread
from extlib import irclib
log = logging.getLogger('drbotzo')
class DrBotzoMethods:
"""Methods to expose to the XML-RPC server."""
def __init__(self, irc):
"""Store the same stuff the core module would, since we'll probably
need it.
Args:
irc the irc instance to save a reference to
"""
self.irc = irc
def say(self, target, message):
"""Say a message in a channel/to a nick.
Args:
target the nick/channel to privmsg
message the message to send
Returns:
"OK", since there's no other feedback to give.
"""
self.irc.server.privmsg(target, message)
return "OK"
def execute_module_method(self, modname, method, args):
"""Execute the method (with arguments) of the specified module.
Args:
modname the loaded module to retrieve
method the method to call from that module
args the arguments to provide to the method as a tuple
Returns:
An error string, or the outcome of the method call.
"""
for module in self.irc.modlist:
if modname == module.__class__.__name__:
try:
func = getattr(module, method)
except AttributeError:
return ("couldn't find '{0:s}' in found module "
"'{1:s}'".format(method, modname))
if hasattr(func, '__call__'):
return func(*args)
else:
return ("'{0:s}' in found module '{1:s}' is not "
"callable".format(method, modname))
return "couldn't find module '{0:s}'".format(modname)
class DrBotServerConnection(irclib.ServerConnection):
"""Subclass irclib's ServerConnection, in order to expand privmsg."""
nickmask = None
def __init__(self, irclibobj, nickname=None, username=None):
"""Instantiate the server connection.
Also start guessing at the nickmask and get ready to do on_welcome
stuff.
Args:
irclibobj the irclib instance to connect with
nickname the nickname to use in nickmask guess
username the username to use in nickmask guess
"""
irclib.ServerConnection.__init__(self, irclibobj)
# temporary. hopefully on_welcome() will set this, but this should be
# a pretty good guess if not
nick = nickname
user = username if username is not None else nick
host = socket.getfqdn()
self.nickmask = "{0:s}!~{1:s}@{2:s}".format(nick, user, host)
log.debug("guessing at nickmask '{0:s}'".format(self.nickmask))
self.add_global_handler('welcome', self.on_welcome, 1)
def on_welcome(self, connection, event):
"""Set the nickmask that the ircd tells us is us.
Args:
connection source connection
event incoming event
"""
what = event.arguments()[0]
log.debug("welcome: {0:s}".format(what))
match = re.search('(\S+!\S+@\S+)', what)
if match:
self.nickmask = match.group(1)
log.debug("setting nickmask: {0:s}".format(self.nickmask))
def privmsg(self, target, text):
"""Send a PRIVMSG command.
Args:
target the destination nick/channel
text the message to send
"""
log.debug("OUTGOING PRIVMSG: t[{0:s}] m[{1:s}]".format(target, text))
splitter = "..."
# split messages that are too long. Max length is 512.
# TODO: this does not properly handle when the nickmask has been
# masked by the ircd
# is the above still the case?
space = 512 - len('\r\n') - len(' PRIVMSG ') - len(target) - len(' :') - len(self.nickmask) - len(' :')
splitspace = space - (len(splitter) + 1)
if len(text) > space:
times = 1
while len(text) > splitspace:
splitpos = text.rfind(' ', 0, splitspace)
splittext = text[0:splitpos] + ' ' + splitter
text = splitter + ' ' + text[splitpos+1:]
self.send_raw("PRIVMSG {0:s} :{1:s}".format(target, splittext))
times = times + 1
if times >= 4:
# this is stupidly long, abort
return
# done splitting
self.send_raw("PRIVMSG {0:s} :{1:s}".format(target, text))
else:
self.send_raw("PRIVMSG {0:s} :{1:s}".format(target, text))
class DrBotIRC(irclib.IRC):
"""Implement a customized irclib IRC."""
modlist = []
config = None
server = None
def __init__(self, config):
"""Initialize XML-RPC interface and save references.
Args:
config the config structure to load stuff from
"""
irclib.IRC.__init__(self)
self.config = config
self.xmlrpc = None
self.regex_handlers = dict()
# handle SIGINT
signal.signal(signal.SIGINT, self.sigint_handler)
# load XML-RPC server
try:
if self.config.has_section('XMLRPC'):
host = self.config.get('XMLRPC', 'host')
port = self.config.getint('XMLRPC', 'port')
if host and port:
self.funcs = DrBotzoMethods(self)
self.xmlrpc = SimpleXMLRPCServer((host, port), logRequests=False)
self.xmlrpc.register_introspection_functions()
self.xmlrpc.register_instance(self.funcs)
thread.start_new_thread(self._xmlrpc_listen, ())
except NoOptionError: pass
def server(self):
"""Create a DrBotServerConnection.
Returns:
The newly created DrBotServerConnection.
"""
# get the nick and user name so we can provide it to
# DrBotServerConnection as a hint for the nickmask
user = None
nick = None
try:
if self.config.has_section('dr.botzo'):
user = self.config.get('dr.botzo', 'user')
nick = self.config.get('dr.botzo', 'nick')
except NoOptionError: pass
self.server = DrBotServerConnection(self, user, nick)
# though we only handle one connection, the underlying library supports
# multiple. append the new one though we intend no others
self.connections.append(self.server)
return self.server
def add_global_regex_handler(self, event, regex, handler, priority=0):
"""Adds a global handler function for a specific event type and regex.
The handler function is called whenever the specified event is
triggered in any of the connections and the regex matches.
See documentation for the Event class.
The handler functions are called in priority order (lowest
number is highest priority). If a handler function returns
"NO MORE", no more handlers will be called.
This is basically an extension of add_global_handler(), either may
work, though it turns out most modules probably want this one.
The provided method should take as arguments:
* nick the nick creating the event, or None
* userhost the userhost creating the event, or None
* event list of the raw IRC events, which may be None
* from_admin whether or not the event came from an admin
* groups list of match.groups(), from re.search()
Args:
event event type (a string), as in numeric_events
regex regex string to match before doing callback invocation
handler callback function to invoke
priority integer, the lower number, the higher priority
"""
if type(event) != list:
event = [event]
for ev in event:
if not ev in self.regex_handlers:
self.regex_handlers[ev] = []
bisect.insort(self.regex_handlers[ev], ((priority, regex, handler)))
def remove_global_regex_handler(self, event, regex, handler):
"""Removes a global regex handler function.
Args:
event list of event type (a string), as in numeric_events
regex regex string that was used in matching
handler callback function to remove
Returns:
1 on success, otherwise 0.
"""
if type(event) != list:
event = [event]
for ev in event:
if not ev in self.regex_handlers:
continue
for h in self.regex_handlers[ev]:
if regex == h[1] and handler == h[2]:
self.regex_handlers[ev].remove(h)
def _handle_event(self, connection, event):
"""Override event handler to do recursion and regex checking.
Args:
connection source connection
event incoming event
"""
log.debug("EVENT: e[{0:s}] s[{1:s}] t[{2:s}] "
"a[{3:s}]".format(event.eventtype(), event.source(),
event.target(), event.arguments()))
try:
nick = irclib.nm_to_n(event.source())
except (IndexError, AttributeError):
nick = ''
try:
if self.config.has_section('Ignore'):
alias = self.config.get('Ignore', nick.lower())
if alias:
log.debug("ignoring {0:s} as per config file".format(nick))
return
except NoOptionError: pass
self.try_alias_cmds(connection, event)
self.try_recursion(connection, event)
self.try_alias(connection, event)
nick = None
userhost = None
admin = False
if event.source() is not None:
nick = irclib.nm_to_n(event.source())
try:
userhost = irclib.nm_to_uh(event.source())
except IndexError: pass
try:
if userhost == self.config.get('dr.botzo', 'admin_userhost'):
admin = True
except NoOptionError: pass
# try regex handlers first, since they're more specific
rh = self.regex_handlers
trh = sorted(rh.get('all_events', []) + rh.get(event.eventtype(), []))
for handler in trh:
try:
prio, regex, method = handler
for line in event.arguments():
match = re.search(regex, line)
if match:
log.debug("pattern matched, calling "
"{0:s}".format(method))
# pattern matched, call method with pattern groups
ret = method(nick, userhost, event, admin,
match.groups())
if ret == 'NO MORE':
return
except Exception as e:
log.error("exception floated up to DrBotIrc!")
log.exception(e)
h = self.handlers
th = sorted(h.get('all_events', []) + h.get(event.eventtype(), []))
for handler in th:
try:
prio, method = handler
ret = method(connection, event)
if ret == 'NO MORE':
return
except Exception as e:
log.error("exception floated up to DrBotIrc!")
log.exception(e)
def xmlrpc_register_function(self, func, name):
"""Add a method to the XML-RPC interface.
Args:
func the method to register
name the name to expose the method as
"""
if func and self.xmlrpc:
if hasattr(func, '__call__'):
self.xmlrpc.register_function(func, name)
def _xmlrpc_listen(self):
"""Begin listening.
Hopefully this was called in a new thread.
"""
self.xmlrpc.serve_forever()
def try_to_replace_event_text_with_module_text(self, connection, event):
"""Do something very similar to _handle_event, but for recursion.
The intent here is that we replace [text] with whatever a module
provides to us.
Args:
connection source connection
event incoming event
"""
replies = []
nick = None
userhost = None
admin = False
if event.source() is not None:
nick = irclib.nm_to_n(event.source())
try:
userhost = irclib.nm_to_uh(event.source())
except IndexError: pass
try:
if userhost == self.config.get('dr.botzo', 'admin_userhost'):
admin = True
except NoOptionError: pass
# try regex handlers first, since they're more specific
rh = self.regex_handlers
trh = sorted(rh.get('all_events', []) + rh.get(event.eventtype(), []))
for handler in trh:
try:
prio, regex, method = handler
for line in event.arguments():
match = re.search(regex, line)
if match:
log.debug("pattern matched, calling "
"{0:s}".format(method))
# pattern matched, call method with pattern groups
ret = method(nick, userhost, event, admin,
match.groups())
if ret:
replies.append(ret)
except Exception as e:
log.error("exception floated up to DrBotIrc!")
log.exception(e)
h = self.handlers
th = sorted(h.get('all_events', []) + h.get(event.eventtype(), []))
for handler in th:
try:
prio, method = handler
ret = method(connection, event)
if ret:
replies.append(ret)
except Exception as e:
log.error("exception floated up to DrBotIrc!")
log.exception(e)
if len(replies):
event.arguments()[0] = '\n'.join(replies)
def try_alias_cmds(self, connection, event):
"""See if there is an alias ("!command") in the text, and if so
do alias manipulation before any other recursion or aliasing.
Args:
connection source connection
event incoming event
Returns:
The outcome of the alias command, if the text had one.
"""
try:
nick = irclib.nm_to_n(event.source())
except (IndexError, AttributeError):
nick = ''
try:
userhost = irclib.nm_to_uh(event.source())
except (IndexError, AttributeError):
userhost = ''
replypath = event.target()
try:
what = event.arguments()[0]
except (IndexError, AttributeError):
what = ''
admin_unlocked = False
# privmsg
if replypath == connection.get_nickname():
replypath = nick
try:
if userhost == self.config.get('dr.botzo', 'admin_userhost'):
admin_unlocked = True
except NoOptionError: pass
# first see if the aliases are being directly manipulated via add/remove
whats = what.split(' ')
if len(whats) <= 1:
return
try:
if whats[0] == '!alias' and whats[1] == 'add' and len(whats) >= 4:
if not self.config.has_section('Alias'):
self.config.add_section('Alias')
self.config.set('Alias', whats[2], ' '.join(whats[3:]))
replystr = "Added alias '{0:s}'.".format(whats[2])
return self.reply(event, replystr)
if whats[0] == '!alias' and whats[1] == 'remove' and len(whats) >= 3:
if not self.config.has_section('Alias'):
self.config.add_section('Alias')
if self.config.remove_option('Alias', whats[2]):
replystr = "Removed alias '{0:s}'.".format(whats[2])
return self.reply(event, replystr)
elif whats[0] == '!alias' and whats[1] == 'list':
try:
if len(whats) > 2:
alias = self.config.get('Alias', whats[2])
return self.reply(event, alias)
else:
alist = self.config.options('Alias')
alist.remove('debug')
alist.sort()
liststr = ', '.join(alist)
return self.reply(event, liststr)
except NoSectionError: pass
except NoOptionError: pass
except NoSectionError: pass
def try_alias(self, connection, event):
"""Try turning aliases into commands.
Args:
connection source connection
event incoming event
Returns:
The de-aliased event object.
"""
try:
what = event.arguments()[0]
alias_list = self.config.options('Alias')
for alias in alias_list:
alias_re = re.compile(alias, re.IGNORECASE)
if alias_re.search(what) and alias != 'debug':
# we found an alias for our given string, doing a replace
command = re.sub(alias, self.config.get('Alias', alias), what, flags=re.IGNORECASE)
event.arguments()[0] = command
# now we have to check it for recursions again
self.try_recursion(connection, event)
# i guess someone could have an alias of an alias... try again
return self.try_alias(connection, event)
except NoOptionError: pass
except NoSectionError: pass
except IndexError: pass
# if we got here, there are no matching aliases, so return what we got
return event
def try_recursion(self, connection, event):
"""Scan message for subcommands to execute and use as part of this command.
Upon seeing a line intended for this module, see if there are subcommands
that we should do what is basically a text replacement on. The intent is to
allow things like the following:
command arg1 [anothercommand arg1 arg2]
where the output of anothercommand is command's arg2..n.
Args:
connection source connection
event incoming event
"""
try:
# begin recursion search
attempt = event.arguments()[0]
start_idx = attempt.find('[')
subcmd = attempt[start_idx+1:]
end_idx = subcmd.rfind(']')
subcmd = subcmd[:end_idx]
if start_idx != -1 and end_idx != -1 and len(subcmd) > 0:
# found recursion candidate
# copy the event and see if IT has recursion to do
newevent = copy.deepcopy(event)
newevent.arguments()[0] = subcmd
newevent._recursing = True
self.try_recursion(connection, newevent)
# recursion over, check for aliases
self.try_alias(connection, newevent)
# now that we have a string that has been de-aliased and
# recursed all the way deeper into its text, see if any
# modules can do something with it. this calls the same
# event handlers in the same way as if this were a native
# event.
self.try_to_replace_event_text_with_module_text(connection, newevent)
# we have done all we can do with the sub-event. whatever
# the text of that event now is, we should replace the parent
# event's [] section with it.
oldtext = event.arguments()[0]
newtext = oldtext.replace('['+subcmd+']', newevent.arguments()[0])
event.arguments()[0] = newtext
# we have now resolved the []. recursion will unfold, replacing
# it further and further, until we eventually get back to the
# original irc event in _handle_event, which will do one
# last search on the text.
except IndexError: pass
def quit_irc(self, connection, msg):
"""Quit IRC, disconnect, shut everything down.
Args:
connection the connection to quit
msg the message to send while quitting
"""
for module in self.modlist:
module.save()
module.shutdown()
connection.quit(msg)
log.info(self.save_config())
self._xmlrpc_shutdown()
log.info("Bot shutting down.")
sys.exit()
def reply(self, event, replystr, stop=False):
"""Reply over IRC to replypath or return a string with the reply.
The primary utility for this is to properly handle recursion. The
recursion code in DrBotIRC will set up a couple hints that this method
picks up on and will appropriately send an IRC event or return a
string.
Unless you know what you are doing, the modules you write should use
this method rather than send a privmsg reply, as failing to call this
method will certainly have recursion do odd things with your module.
Args:
event incoming event
replystr the message to reply with
stop whether or not to let other handlers see this
Returns:
The replystr if the event is inside recursion, or, potentially,
"NO MORE" to stop other event handlers from acting.
"""
replypath = event.target()
# if this is a privmsg, reply to the sender
if replypath == self.server.get_nickname():
replypath = irclib.nm_to_n(event.source())
if replystr is not None:
if event._recursing:
return replystr
else:
replies = replystr.split('\n')
for reply in replies:
self.server.privmsg(replypath, reply)
if stop:
return "NO MORE"
def save_modules(self):
"""Call each module's save(), in case they have something to do."""
for module in self.modlist:
module.save()
def _xmlrpc_shutdown(self):
"""Shut down the XML-RPC server."""
if self.xmlrpc is not None:
self.xmlrpc.shutdown()
self.xmlrpc.server_close()
def save_config(self):
"""Write the config file to disk.
Returns:
Short string indicating success.
"""
with open('dr.botzo.cfg', 'w') as cfg:
self.config.write(cfg)
return "Saved config."
def load_module(self, modname):
"""Load a module (in both the python and dr.botzo sense) if not
already loaded.
Args:
modname the module to attempt to load
Returns:
String describing the outcome of the load attempt.
"""
for module in self.modlist:
if modname == module.__class__.__name__:
return "Module '{0:s}' is already loaded.".format(modname)
# not loaded, let's get to work
try:
modstr = 'modules.'+modname
__import__(modstr)
module = sys.modules[modstr]
botmod = eval('module.' + modname + '(self, self.config)')
self.modlist.append(botmod)
botmod.register_handlers()
# might as well add it to the list
modset = set(self.config.get('dr.botzo', 'module_list').split(','))
modset.add(modname)
self.config.set('dr.botzo', 'module_list', ','.join(modset))
return "Module '{0:s}' loaded.".format(modname)
except ImportError as e:
log.error("Error loading '{0:s}'".format(modname))
log.exception(e)
return "Module '{0:s}' could not be loaded.".format(modname)
def unload_module(self, modname):
"""Attempt to unload and del a module if it's loaded.
Args:
modname the module to attempt to unload
Returns:
String describing the outcome of the unload attempt.
"""
modstr = 'modules.'+modname
for module in self.modlist:
if modname == module.__class__.__name__:
# do anything the module needs to do to clean up
module.save()
module.shutdown()
# remove module references
self.modlist.remove(module)
module.unregister_handlers()
# del it
del(sys.modules[modstr])
del(module)
# might as well remove it from the list
modset = set(self.config.get('dr.botzo', 'module_list').split(','))
modset.remove(modname)
self.config.set('dr.botzo', 'module_list', ','.join(modset))
return "Module '{0:s}' unloaded.".format(modname)
# guess it was never loaded
return "Module '{0:s}' is not loaded.".format(modname)
def list_modules(self):
"""List loaded modules.
Returns:
A list of the loaded modules' names.
"""
modnames = []
for module in self.modlist:
modnames.append(module.__class__.__name__)
return modnames
def sigint_handler(self, signal, frame):
"""Cleanly shutdown on SIGINT."""
for module in self.modlist:
module.save()
module.shutdown()
log.info(self.save_config())
self._xmlrpc_shutdown()
sys.exit()
# vi:tabstop=4:expandtab:autoindent
# kate: indent-mode python;indent-width 4;replace-tabs on;