354 lines
13 KiB
Python
354 lines
13 KiB
Python
"""
|
|
DrBotIRC - customizations of irclib, for dr.botzo
|
|
Copyright (C) 2011 Brian S. Stephan
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
"""
|
|
|
|
import bisect
|
|
from ConfigParser import NoOptionError, NoSectionError
|
|
import re
|
|
import signal
|
|
import socket
|
|
import sys
|
|
|
|
from extlib import irclib
|
|
|
|
class DrBotServerConnection(irclib.ServerConnection):
|
|
|
|
"""Subclass irclib's ServerConnection, in order to expand privmsg."""
|
|
|
|
nickmask = None
|
|
|
|
def __init__(self, irclibobj):
|
|
irclib.ServerConnection.__init__(self, irclibobj)
|
|
|
|
# temporary. hopefully on_welcome() will set this
|
|
self.nickmask = socket.getfqdn()
|
|
|
|
self.add_global_handler('welcome', self.on_welcome, 1)
|
|
|
|
def on_welcome(self, connection, event):
|
|
"""Set the nickmask that the ircd tells us is us."""
|
|
what = event.arguments()[0]
|
|
|
|
match = re.search('(\S+!\S+@\S+)', what)
|
|
if match:
|
|
self.nickmask = match.group(1)
|
|
|
|
def privmsg(self, target, text):
|
|
"""Send a PRIVMSG command."""
|
|
|
|
splitter = "..."
|
|
|
|
# split messages that are too long. Max length is 512.
|
|
# TODO: this does not properly handle when the nickmask has been
|
|
# masked by the ircd
|
|
# is the above still the case?
|
|
space = 512 - len('\r\n') - len(' PRIVMSG ') - len(target) - len(' :') - len(self.nickmask) - len(' :')
|
|
splitspace = space - (len(splitter) + 1)
|
|
|
|
if len(text) > space:
|
|
times = 1
|
|
|
|
while len(text) > splitspace:
|
|
splitpos = text.rfind(' ', 0, splitspace)
|
|
splittext = text[0:splitpos] + ' ' + splitter
|
|
text = splitter + ' ' + text[splitpos+1:]
|
|
self.send_raw("PRIVMSG %s :%s" % (target, splittext))
|
|
|
|
times = times + 1
|
|
if times >= 4:
|
|
return
|
|
|
|
# done splitting
|
|
self.send_raw("PRIVMSG %s :%s" % (target, text))
|
|
else:
|
|
self.send_raw("PRIVMSG %s :%s" % (target, text))
|
|
|
|
class DrBotIRC(irclib.IRC):
|
|
|
|
"""Implement a customized irclib IRC."""
|
|
|
|
modlist = []
|
|
internal_bus = []
|
|
config = None
|
|
server = None
|
|
|
|
def __init__(self, config):
|
|
irclib.IRC.__init__(self)
|
|
|
|
self.config = config
|
|
|
|
# handle SIGINT
|
|
signal.signal(signal.SIGINT, self.sigint_handler)
|
|
|
|
def server(self):
|
|
"""Create a DrBotServerConnection."""
|
|
self.server = DrBotServerConnection(self)
|
|
self.connections.append(self.server)
|
|
|
|
self.server.add_global_handler('pubmsg', self.on_pubmsg, 1)
|
|
self.server.add_global_handler('privmsg', self.on_pubmsg, 1)
|
|
|
|
return self.server
|
|
|
|
def on_pubmsg(self, connection, event):
|
|
"""See if there is an alias ("!command") in the text, and if so, translate it into
|
|
an internal bot command and run it.
|
|
"""
|
|
|
|
nick = irclib.nm_to_n(event.source())
|
|
userhost = irclib.nm_to_uh(event.source())
|
|
replypath = event.target()
|
|
what = event.arguments()[0]
|
|
admin_unlocked = False
|
|
|
|
# privmsg
|
|
if replypath == connection.get_nickname():
|
|
replypath = nick
|
|
|
|
try:
|
|
if userhost == self.config.get('dr.botzo', 'admin_userhost'):
|
|
admin_unlocked = True
|
|
except NoOptionError: pass
|
|
|
|
# first see if the aliases are being directly manipulated via add/remove
|
|
whats = what.split(' ')
|
|
try:
|
|
if whats[0] == '!alias' and whats[1] == 'add' and len(whats) >= 4:
|
|
if not self.config.has_section('Alias'):
|
|
self.config.add_section('Alias')
|
|
|
|
self.config.set('Alias', whats[2], ' '.join(whats[3:]))
|
|
replystr = 'Added alias ' + whats[2] + '.'
|
|
return self.reply(connection, replypath, replystr)
|
|
if whats[0] == '!alias' and whats[1] == 'remove' and len(whats) >= 3:
|
|
if not self.config.has_section('Alias'):
|
|
self.config.add_section('Alias')
|
|
|
|
if self.config.remove_option('Alias', whats[2]):
|
|
replystr = 'Removed alias ' + whats[2] + '.'
|
|
return self.reply(connection, replypath, replystr)
|
|
elif whats[0] == '!alias' and whats[1] == 'list':
|
|
try:
|
|
if len(whats) > 2:
|
|
alias = self.config.get('Alias', whats[2])
|
|
return self.reply(connection, replypath, alias)
|
|
else:
|
|
alist = self.config.options('Alias')
|
|
alist.remove('debug')
|
|
alist.sort()
|
|
liststr = ', '.join(alist)
|
|
return self.reply(connection, replypath, liststr)
|
|
except NoSectionError: pass
|
|
except NoOptionError: pass
|
|
except NoSectionError: pass
|
|
|
|
return self.reply(connection, replypath, self.try_recursion(connection, event, nick, userhost, what, admin_unlocked))
|
|
|
|
def try_alias(self, connection, event, nick, userhost, what, admin_unlocked):
|
|
# try doing alias work
|
|
try:
|
|
alias_list = self.config.options('Alias')
|
|
|
|
for alias in alias_list:
|
|
alias_re = re.compile(alias)
|
|
if alias_re.search(what):
|
|
# we found an alias for our given string, doing a replace
|
|
command = re.sub(alias, self.config.get('Alias', alias), what)
|
|
|
|
# i guess someone could have an alias of an alias... try again
|
|
command = self.try_alias(connection, event, nick, userhost, command, admin_unlocked)
|
|
return command
|
|
except NoOptionError: pass
|
|
except NoSectionError: pass
|
|
|
|
# if we got here, there are no matching aliases, so return what we got
|
|
return what
|
|
|
|
def reply(self, connection, replypath, replystr, stop_responding=False):
|
|
"""Reply over IRC to replypath or return a string with the reply."""
|
|
|
|
if replystr is not None:
|
|
if replypath is None:
|
|
return replystr
|
|
else:
|
|
replies = replystr.split('\n')
|
|
for reply in replies:
|
|
connection.privmsg(replypath, reply)
|
|
if stop_responding:
|
|
return "NO MORE"
|
|
|
|
def try_recursion(self, connection, event, nick, userhost, what, admin_unlocked):
|
|
"""Scan message for subcommands to execute and use as part of this command.
|
|
|
|
Upon seeing a line intended for this module, see if there are subcommands
|
|
that we should do what is basically a text replacement on. The intent is to
|
|
allow things like the following:
|
|
|
|
command arg1 [anothercommand arg1 arg2]
|
|
|
|
where the output of anothercommand is command's arg2..n.
|
|
"""
|
|
|
|
attempt = what
|
|
|
|
# check for aliases
|
|
attempt = self.try_alias(connection, event, nick, userhost, attempt, admin_unlocked)
|
|
|
|
# begin recursion search
|
|
|
|
start_idx = attempt.find('[')
|
|
subcmd = attempt[start_idx+1:]
|
|
end_idx = subcmd.rfind(']')
|
|
subcmd = subcmd[:end_idx]
|
|
|
|
if start_idx == -1 or end_idx == -1 or len(subcmd) == 0:
|
|
# no recursion, so see if there's a module to handle this
|
|
return self.scan_modules(connection, event, nick, userhost, attempt, admin_unlocked)
|
|
else:
|
|
# found recursion, search again
|
|
ret = self.try_recursion(connection, event, nick, userhost, subcmd, admin_unlocked)
|
|
if ret is not None:
|
|
# recursion search had a hit, replace [foo] with it and re-recurse
|
|
return self.try_recursion(connection, event, nick, userhost, attempt.replace('['+subcmd+']', ret), admin_unlocked)
|
|
else:
|
|
# recursion search didn't have a hit, so see if there's a module to handle this
|
|
return self.scan_modules(connection, event, nick, userhost, attempt, admin_unlocked)
|
|
|
|
def scan_modules(self, connection, event, nick, userhost, attempt, admin_unlocked):
|
|
"""Walk the loaded modules, see if any reply to input text."""
|
|
|
|
# aliases resolved. run result against each module
|
|
for (priority, handler) in self.internal_bus:
|
|
try:
|
|
ret = handler(connection, event, nick, userhost, attempt, admin_unlocked)
|
|
if ret is not None:
|
|
# a module had a result for us, post-alias, so return it
|
|
# TODO: scan all modules with compounding results
|
|
return ret
|
|
except Exception as e:
|
|
print('EXCEPTION: ' + str(e))
|
|
|
|
def quit_irc(self, connection, msg):
|
|
for module in self.modlist:
|
|
module.save()
|
|
module.shutdown()
|
|
|
|
connection.quit(msg)
|
|
print(self.save_config())
|
|
sys.exit()
|
|
|
|
def save_modules(self):
|
|
for module in self.modlist:
|
|
module.save()
|
|
|
|
def save_config(self):
|
|
with open('dr.botzo.cfg', 'w') as cfg:
|
|
self.config.write(cfg)
|
|
|
|
return 'Saved config.'
|
|
|
|
def load_module(self, modname):
|
|
"""Load a module (in both the python and dr.botzo sense) if not
|
|
already loaded.
|
|
"""
|
|
|
|
for module in self.modlist:
|
|
if modname == module.__class__.__name__:
|
|
return 'Module ' + modname + ' is already loaded.'
|
|
|
|
# not loaded, let's get to work
|
|
try:
|
|
modstr = 'modules.'+modname
|
|
__import__(modstr)
|
|
module = sys.modules[modstr]
|
|
botmod = eval('module.' + modname + '(self, self.config, self.server)')
|
|
self.modlist.append(botmod)
|
|
bisect.insort(self.internal_bus, (botmod.priority(), botmod.do))
|
|
botmod.register_handlers(self.server)
|
|
|
|
# might as well add it to the list
|
|
modset = set(self.config.get('dr.botzo', 'module_list').split(','))
|
|
modset.add(modname)
|
|
self.config.set('dr.botzo', 'module_list', ','.join(modset))
|
|
|
|
return 'Module ' + modname + ' loaded.'
|
|
except ImportError:
|
|
return 'Module ' + modname + ' not found.'
|
|
|
|
def unload_module(self, modname):
|
|
"""Attempt to unload and del a module if it's loaded."""
|
|
|
|
modstr = 'modules.'+modname
|
|
for module in self.modlist:
|
|
if modname == module.__class__.__name__:
|
|
# do anything the module needs to do to clean up
|
|
module.save()
|
|
module.shutdown()
|
|
|
|
# remove module references
|
|
self.modlist.remove(module)
|
|
self.internal_bus.remove((module.priority(), module.do))
|
|
module.unregister_handlers()
|
|
|
|
# del it
|
|
del(sys.modules[modstr])
|
|
del(module)
|
|
|
|
# might as well remove it from the list
|
|
modset = set(self.config.get('dr.botzo', 'module_list').split(','))
|
|
modset.remove(modname)
|
|
self.config.set('dr.botzo', 'module_list', ','.join(modset))
|
|
|
|
return 'Module ' + modname + ' unloaded.'
|
|
|
|
# guess it was never loaded
|
|
return 'Module ' + modname + ' is not loaded.'
|
|
|
|
def reload_module(self, modname):
|
|
"""Attempt to reload a module, by removing it from memory and then
|
|
re-initializing it.
|
|
"""
|
|
|
|
ret = self.unload_module(modname)
|
|
if ret == 'Module ' + modname + ' unloaded.':
|
|
ret = self.load_module(modname)
|
|
if ret == 'Module ' + modname + ' loaded.':
|
|
return 'Module ' + modname + ' reloaded.'
|
|
|
|
return 'Module ' + modname + ' reload failed. Check the console.'
|
|
|
|
def list_modules(self):
|
|
"""List loaded modules."""
|
|
|
|
modnames = []
|
|
for module in self.modlist:
|
|
modnames.append(module.__class__.__name__)
|
|
|
|
return modnames
|
|
|
|
# SIGINT signal handler
|
|
def sigint_handler(self, signal, frame):
|
|
for module in self.modlist:
|
|
module.save()
|
|
module.shutdown()
|
|
|
|
print(self.save_config())
|
|
sys.exit()
|
|
|
|
# vi:tabstop=4:expandtab:autoindent
|
|
# kate: indent-mode python;indent-width 4;replace-tabs on;
|