dr.botzo/dr.botzo.py

630 lines
25 KiB
Python

# dr.botzo - a pluggable IRC bot written in Python
# Copyright (C) 2010 Brian S. Stephan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ConfigParser import ConfigParser, NoSectionError, NoOptionError
from datetime import datetime
import os
import random
import re
import sys
from urllib2 import urlopen
from urllib import urlencode
from dateutil.parser import *
from dateutil.relativedelta import *
from dateutil.tz import *
import irclib
class Module(object):
    """Base class used for creating classes that have real functionality.

    Subclasses override register_handlers to subscribe to IRC events and do
    to act on the text of a message.
    """

    def __init__(self, config, server, modlist):
        """Constructor for a feature module. Inheritors should not do anything
        special here, instead they should implement register_handlers and do,
        or else this will be a very uneventful affair.

        Classes that are interested in allowing an indirect call to their do
        routine should add themselves to modlist inside their __init__. This
        will allow other modules to call do and see if anything can handle text
        they may have seen (such as in recursive commands).

        config  -- configuration object, stored as self.config
        server  -- object passed straight on to register_handlers
        modlist -- shared list of modules, stored as self.modlist
        """
        self.config = config
        self.modlist = modlist
        self.register_handlers(server)

    def register_handlers(self, server):
        """This is called by __init__ and sets up server.add_global_handlers.
        Classes inheriting from Module should implement this and set up the
        appropriate handlers, e.g.:

            server.add_global_handler('privmsg', self.on_privmsg)

        Module.on_pubmsg and Module.on_privmsg are defined so far, the rest,
        you're on your own.
        """
        print("looks like someone forgot to implement register_handlers!")

    def _is_admin(self, userhost):
        """Return True if userhost equals the configured admin userhost.

        A missing [admin] section or a missing userhost option both mean that
        nobody is the admin; neither is allowed to raise out of a handler.
        """
        try:
            return userhost == self.config.get('admin', 'userhost')
        except (NoSectionError, NoOptionError):
            return False

    def _strip_address(self, botnick, what):
        """Return what with the leading "<botnick>[:,] " removed, or None if
        the line does not start by addressing botnick.
        """
        # the nickname is escaped because IRC nicknames may contain characters
        # that are special in a regular expression, such as [ ] \ ^ { } |
        addressed_re = re.compile('^' + re.escape(botnick) + r'[:,]?\s+')
        if not addressed_re.match(what):
            return None
        return addressed_re.sub('', what)

    def on_pubmsg(self, connection, event):
        """Does some variable setup and initial sanity checking before calling
        Module.do, which should be implemented by subclasses and what can be
        ultimately responsible for the work. Of course, you are free to
        reimplement on_pubmsg on your own too.

        do is only called when the line starts with the bot's nickname; the
        nickname prefix is removed first and replies go to event.target().
        """
        nick = irclib.nm_to_n(event.source())
        userhost = irclib.nm_to_uh(event.source())
        replypath = event.target()
        what = event.arguments()[0]
        admin_unlocked = self._is_admin(userhost)

        # only do commands if the bot has been addressed directly
        what = self._strip_address(connection.get_nickname(), what)
        if what is None:
            return
        self.do(connection, event, nick, userhost, replypath, what, admin_unlocked)

    def on_privmsg(self, connection, event):
        """Does some variable setup and initial sanity checking before calling
        Module.do, which should be implemented by subclasses and what can be
        ultimately responsible for the work. Of course, you are free to
        reimplement on_privmsg on your own too.

        Replies go back to the sender's nick; no addressing prefix is needed.
        """
        nick = irclib.nm_to_n(event.source())
        userhost = irclib.nm_to_uh(event.source())
        replypath = nick
        what = event.arguments()[0]
        admin_unlocked = self._is_admin(userhost)
        self.do(connection, event, nick, userhost, replypath, what, admin_unlocked)

    def try_recursion(self, connection, event, nick, userhost, replypath, what, admin_unlocked):
        """Upon seeing a line intended for this module, see if there are
        subcommands that we should do what is basically a text replacement on.
        The intent is to allow things like the following:

            command arg1 [anothercommand arg1 arg2]

        where the output of anothercommand is command's arg2..n. It's mostly
        for amusement purposes, but maybe there are legitimate uses. This is
        intended to be attempted after you've determined the line should be
        handled by your module.

        replypath doubles as the recursion marker: a real value means this is
        the outermost call and only text substitution is wanted; None means we
        are inside a subcommand and should try to evaluate the text by offering
        it to every module in self.modlist.

        Returns the (possibly rewritten) text, or a module's result for it.
        """
        # the subcommand runs from the first '[' to the last ']' after it
        start_idx = what.find('[')
        subcmd = what[start_idx+1:]
        end_idx = subcmd.rfind(']')
        subcmd = subcmd[:end_idx]
        attempt = what

        if start_idx == -1 or end_idx == -1:
            # no nested commands at all if replypath is a real value, so don't
            # do a damn thing
            if replypath is not None:
                return attempt
            # no more replacements found, see if what we had is workable
            for module in self.modlist:
                ret = module.do(connection, event, nick, userhost, None, attempt, admin_unlocked)
                if ret is not None:
                    return ret
            # if we got here, it's not workable. just return what we got
            return attempt

        # we have a subcmd, see if there's another one nested
        ret = self.try_recursion(connection, event, nick, userhost, None, subcmd, admin_unlocked)
        if ret is None:
            return attempt
        blarg = attempt.replace('['+subcmd+']', ret)
        if replypath is not None:
            return blarg
        return self.try_recursion(connection, event, nick, userhost, None, blarg, admin_unlocked)

    def do(self, connection, event, nick, userhost, replypath, what, admin_unlocked):
        """Implement this method in your subclass to have a fairly-automatic
        hook into IRC functionality. This is called by the default on_pubmsg
        and on_privmsg.

        try_recursion also calls it with replypath set to None; callers there
        use the return value, and treat None as "not handled".
        """
        print("looks like someone forgot to implement do!")
class GoogleTranslate(Module):
    """Class that translates text via Google Translate.

    http://code.google.com/apis/ajaxlanguage/documentation/

    Handles lines of the form: translate <fromlang> <tolang> <text...>
    """

    _URL = 'http://ajax.googleapis.com/ajax/services/language/translate?'

    # text that surrounds the translation in the raw response body
    _START_MARKER = '"translatedText":"'
    _END_MARKER = '"}, "'

    # literal escape sequences found in the raw response body and what they
    # stand for; applied in this order
    _REPLACEMENTS = (
        ('\\u0026quot;', '"'),
        ('\\u0026amp;', '&'),
        ('\\u003c', '<'),
        ('\\u0026lt;', '<'),
        ('\\u003e', '>'),
        ('\\u0026gt;', '>'),
        ('\\u0026#39;', '\''),
    )

    def __init__(self, config, server, modlist):
        super(GoogleTranslate, self).__init__(config, server, modlist)
        # make do() reachable from other modules' nested commands
        modlist.append(self)

    def register_handlers(self, server):
        """Listen to both channel and private messages."""
        server.add_global_handler('pubmsg', self.on_pubmsg)
        server.add_global_handler('privmsg', self.on_privmsg)

    def _extract_translation(self, content):
        """Pull the translated text out of the raw response body.

        Returns None when either marker is missing, instead of slicing at a
        bogus offset and replying with garbage.
        """
        start_idx = content.find(self._START_MARKER)
        if start_idx == -1:
            return None
        translation = content[start_idx + len(self._START_MARKER):]
        end_idx = translation.find(self._END_MARKER)
        if end_idx == -1:
            return None
        translation = translation[:end_idx]

        # do some text conversion
        for escaped, plain in self._REPLACEMENTS:
            translation = translation.replace(escaped, plain)
        return translation

    def do(self, connection, event, nick, userhost, replypath, what, admin_unlocked):
        """Translate the text in a 'translate' command.

        With a real replypath the translation is sent there via
        connection.privmsg; with replypath None it is returned instead.
        Returns None when the line is not a translate command or no
        translation could be obtained.
        """
        whats = what.split(' ')
        if whats[0] != 'translate' or len(whats) < 4:
            return None

        if replypath is not None:
            what = self.try_recursion(connection, event, nick, userhost, replypath, what, admin_unlocked)
            whats = what.split(' ')
            # substituting a nested command can change the number of words
            if len(whats) < 4:
                return None

        fromlang = whats[1]
        tolang = whats[2]
        text = ' '.join(whats[3:])
        langpair = '%s|%s' % (fromlang, tolang)
        params = urlencode((('v', 1.0), ('q', text), ('langpair', langpair),))
        url = self._URL + params

        try:
            response = urlopen(url)
            try:
                content = response.read()
            finally:
                response.close()
        except IOError as e:
            # a failed request should not propagate out of an IRC handler
            print('translation request failed: ' + str(e))
            return None

        translation = self._extract_translation(content)
        if translation is None:
            return None

        if replypath is None:
            return translation
        connection.privmsg(replypath, translation)
class Countdown(Module):
    """Class that adds a countdown item to the bot

    Commands handled:
        countdown add <item> <date/time...>
        countdown remove <item>
        countdown list
        countdown <item>

    Items are kept in the 'countdown' section of the config object as UTC
    ISO-format timestamps.
    """

    def __init__(self, config, server, modlist):
        super(Countdown, self).__init__(config, server, modlist)
        # make do() reachable from other modules' nested commands
        modlist.append(self)

    def register_handlers(self, server):
        """Listen to both channel and private messages."""
        server.add_global_handler('pubmsg', self.on_pubmsg)
        server.add_global_handler('privmsg', self.on_privmsg)

    def _respond(self, connection, replypath, text):
        """Return text when called indirectly (replypath is None), otherwise
        send it to replypath.
        """
        if replypath is None:
            return text
        connection.privmsg(replypath, text)

    def _add(self, item, when):
        """Store item with the parsed date/time and return the reply text."""
        try:
            target = parse(when, default=datetime.now().replace(tzinfo=tzlocal()))
        except (ValueError, OverflowError):
            # unparseable input must not raise out of the IRC handler
            return 'could not parse date for countdown item ' + item
        if not self.config.has_section('countdown'):
            self.config.add_section('countdown')
        self.config.set('countdown', item, target.astimezone(tzutc()).isoformat())
        return 'added countdown item ' + item

    def _remove(self, item):
        """Remove item; return the reply text, or None if nothing was removed."""
        try:
            if self.config.remove_option('countdown', item):
                return 'removed countdown item ' + item
        except NoSectionError:
            pass
        return None

    def _list(self):
        """Return the sorted item names, comma separated, or None when there
        is no countdown section.
        """
        try:
            cdlist = self.config.options('countdown')
        except NoSectionError:
            return None
        # 'debug' shows up in options() when it is a config-wide default; it
        # is not a countdown item
        if 'debug' in cdlist:
            cdlist.remove('debug')
        cdlist.sort()
        return ', '.join(cdlist)

    def _describe(self, item):
        """Return the "<item> will occur in ..." text, or None if item is not
        a stored countdown.
        """
        try:
            timestr = self.config.get('countdown', item)
            target = parse(timestr)
        except (NoSectionError, NoOptionError):
            return None
        except ValueError:
            # the value is not a timestamp, e.g. an inherited default
            return None

        rdelta = relativedelta(target, datetime.now().replace(tzinfo=tzlocal()))
        relstr = item + ' will occur in '
        for unit in ('years', 'months', 'days', 'hours', 'minutes'):
            amount = getattr(rdelta, unit)
            if amount != 0:
                relstr += str(amount) + ' ' + unit + ' '
        if rdelta.seconds != 0:
            relstr += str(rdelta.seconds) + ' seconds'
        return relstr

    def do(self, connection, event, nick, userhost, replypath, what, admin_unlocked):
        """Handle a 'countdown' command.

        With a real replypath the reply is sent there via connection.privmsg;
        with replypath None it is returned instead. Returns None when the line
        is not a countdown command or there is nothing to say.
        """
        whats = what.split(' ')
        if whats[0] != 'countdown' or len(whats) < 2:
            return None

        if replypath is not None:
            what = self.try_recursion(connection, event, nick, userhost, replypath, what, admin_unlocked)
            whats = what.split(' ')
            # substituting a nested command can change the number of words
            if len(whats) < 2:
                return None

        subcommand = whats[1]
        if subcommand == 'add' and len(whats) >= 4:
            replystr = self._add(whats[2], ' '.join(whats[3:]))
        elif subcommand == 'remove':
            # "countdown remove" with no item name has nothing to remove
            replystr = self._remove(whats[2]) if len(whats) >= 3 else None
        elif subcommand == 'list':
            replystr = self._list()
        else:
            replystr = self._describe(subcommand)

        if replystr is None:
            return None
        return self._respond(connection, replypath, replystr)
class Dice(Module):
    """Rolls dice, for RPGs mostly

    Several rolls may be given, separated by ';'. Each roll looks like

        [times#][top/][dice]d<faces>[+N|-N] [comment]

    meaning: roll <dice> dice with <faces> faces, keep the <top> highest, add
    the modifier, and repeat the whole thing <times> times.
    """

    _ROLL_RE = re.compile(r'^(?:(\d+)#)?(?:(\d+)/)?(\d+)?d(\d+)(?:(\+|\-)(\d+))?(?:\s+(.*))?')

    def __init__(self, config, server, modlist):
        super(Dice, self).__init__(config, server, modlist)
        # make do() reachable from other modules' nested commands
        modlist.append(self)

    def register_handlers(self, server):
        """Listen to both channel and private messages."""
        server.add_global_handler('pubmsg', self.on_pubmsg)
        server.add_global_handler('privmsg', self.on_privmsg)

    def do(self, connection, event, nick, userhost, replypath, what, admin_unlocked):
        """Roll every dice expression found in what.

        With a real replypath one message per matching roll is sent there via
        connection.privmsg; with replypath None the result of the first
        matching roll is returned. Returns None if nothing matched.
        """
        for roll in re.split(r';\s*', what):
            matches = self._ROLL_RE.search(roll)
            if matches is None:
                continue

            # set variables including defaults for unspecified stuff
            faces = int(matches.group(4))
            if faces < 1:
                # a zero-sided die cannot be rolled (randint would raise)
                continue
            comment = matches.group(7)
            times = 1 if matches.group(1) is None else int(matches.group(1))
            dice = 1 if matches.group(3) is None else int(matches.group(3))
            top = dice if matches.group(2) is None else int(matches.group(2))
            if matches.group(5) is None or matches.group(6) is None:
                modifier = 0
            elif matches.group(5) == '-':
                modifier = -1 * int(matches.group(6))
            else:
                modifier = int(matches.group(6))

            entries = []
            for _ in range(times):
                # keep the values as integers so that sorting is numeric; as
                # strings '9' would sort above '10' and the wrong dice would
                # be kept
                thrown = [random.randint(1, faces) for _ in range(dice)]
                kept = sorted(thrown, reverse=True)[0:top]
                total = sum(kept) + modifier

                entry = str(total) + ' [' + ','.join(str(value) for value in kept)
                if modifier > 0:
                    entry += ' + ' + str(modifier)
                elif modifier < 0:
                    entry += ' - ' + str(-1 * modifier)
                entry += ']'
                entries.append(entry)

            result = ''
            if comment is not None:
                result += comment + ': '
            result += ', '.join(entries)

            if replypath is None:
                return result
            connection.privmsg(replypath, result)
class Seen(Module):
    """Keeps track of when people say things, and reports on when they last
    said something.

    Every line that reaches do() is recorded in the 'seen' section of the
    config object, keyed by nick, as "<userhost>|:|<UTC ISO time>|:|<text>".
    'seen <nick>' reports the stored entry for <nick>.
    """

    _SEPARATOR = '|:|'

    # isoformat() leaves the fractional part out when microseconds are zero,
    # so both shapes have to be accepted when reading a timestamp back
    _TIME_FORMATS = ("%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S")

    def __init__(self, config, server, modlist):
        super(Seen, self).__init__(config, server, modlist)
        # make do() reachable from other modules' nested commands
        modlist.append(self)

    def register_handlers(self, server):
        """Listen to both channel and private messages."""
        server.add_global_handler('pubmsg', self.on_pubmsg)
        server.add_global_handler('privmsg', self.on_privmsg)

    def _parse_time(self, stamp):
        """Return the naive datetime for a stored timestamp, or None if it
        matches none of the known formats.
        """
        for fmt in self._TIME_FORMATS:
            try:
                return datetime.strptime(stamp, fmt)
            except ValueError:
                continue
        return None

    def do(self, connection, event, nick, userhost, replypath, what, admin_unlocked):
        """Record what nick just said, then answer a 'seen <nick>' query.

        With a real replypath the answer is sent there via connection.privmsg;
        with replypath None it is returned instead. Returns None when the line
        is not a query or nothing is known about the nick.
        """
        # whatever it is, store it
        if not self.config.has_section('seen'):
            self.config.add_section('seen')
        entry = self._SEPARATOR.join((userhost, datetime.utcnow().isoformat(), what))
        self.config.set('seen', nick, entry)

        # also see if it's a query
        whats = what.split(' ')
        if whats[0] != 'seen' or len(whats) < 2:
            return None

        if replypath is not None:
            what = self.try_recursion(connection, event, nick, userhost, replypath, what, admin_unlocked)
            whats = what.split(' ')

        query = whats[1]
        # 'debug' is skipped: it is not a nick
        if query == 'debug':
            return None

        try:
            # raw=True: the stored text is arbitrary chat and must not be
            # treated as a %(...)s interpolation template
            stored = self.config.get('seen', query, raw=True)
        except NoOptionError:
            return None

        # split at most twice so a '|:|' inside the message itself survives
        seendata = stored.split(self._SEPARATOR, 2)
        if len(seendata) < 3:
            return None
        converted = self._parse_time(seendata[1])
        if converted is None:
            return None
        converted = converted.replace(tzinfo=tzutc())

        replystr = 'last saw ' + query + ' at ' + converted.astimezone(tzlocal()).strftime("%Y/%m/%d %H:%M:%S %Z") + ' saying \'' + seendata[2] + '\''
        if replypath is None:
            return replystr
        connection.privmsg(replypath, replystr)
class IrcAdmin(Module):
    """all kinds of miscellaneous irc stuff

    Sets user modes and joins the autojoin channels on connect, and provides
    these commands, all of which require admin_unlocked:

        join <channel>
        part <channel> [message...]
        quit [message...]
        autojoin add|remove <channel>
        save
        nick <newnick>
    """

    # file the in-memory configuration is written to by 'save' and 'quit'
    _CONFIG_FILE = 'dr.botzo.cfg'

    def __init__(self, config, server, modlist):
        super(IrcAdmin, self).__init__(config, server, modlist)
        # make do() reachable from other modules' nested commands
        modlist.append(self)

    def register_handlers(self, server):
        """Listen to the welcome event plus channel and private messages."""
        server.add_global_handler('welcome', self.on_connect)
        server.add_global_handler('pubmsg', self.on_pubmsg)
        server.add_global_handler('privmsg', self.on_privmsg)

    def on_connect(self, connection, event):
        """handler for when the bot has connected to IRC
        """
        # user modes
        try:
            usermode = self.config.get('IRC', 'usermode')
            # ask the connection for our nickname rather than relying on a
            # module-level global, which goes stale after a 'nick' command
            connection.mode(connection.get_nickname(), usermode)
        except (NoSectionError, NoOptionError):
            pass

        # join the specified channels
        try:
            autojoins = self.config.get('channels', 'autojoin').split(',')
        except (NoSectionError, NoOptionError):
            autojoins = []
        for channel in autojoins:
            if irclib.is_channel(channel):
                connection.join(channel)

    def _respond(self, connection, replypath, text):
        """Return text when called indirectly (replypath is None), otherwise
        send it to replypath.
        """
        if replypath is None:
            return text
        connection.privmsg(replypath, text)

    def _save_config(self):
        """Write the in-memory configuration to the config file."""
        with open(self._CONFIG_FILE, 'w') as cfg:
            self.config.write(cfg)

    def do(self, connection, event, nick, userhost, replypath, what, admin_unlocked):
        """Offer the line to every admin sub-command in turn.

        Each sub-command checks the first word itself. The first non-None
        result is returned so that an indirect call (replypath None) receives
        the reply text instead of having it thrown away.
        """
        # this could be more sophisticated, but i'm too lazy to do a real port
        # right now.
        # TODO: sophisticate. also, document all of these
        handlers = (
            self.sub_join_channel,
            self.sub_part_channel,
            self.sub_quit_channel,
            self.sub_autojoin_manipulate,
            self.sub_save_config,
            self.sub_change_nick,
        )
        result = None
        for handler in handlers:
            ret = handler(connection, event, nick, userhost, replypath, what, admin_unlocked)
            if result is None:
                result = ret
        return result

    def sub_join_channel(self, connection, event, nick, userhost, replypath, what, admin_unlocked):
        """'join <channel>': join the channel and report it."""
        whats = what.split(' ')
        if whats[0] != 'join' or not admin_unlocked or len(whats) < 2:
            return None
        channel = whats[1]
        if not irclib.is_channel(channel):
            return None
        connection.join(channel)
        return self._respond(connection, replypath, 'joined ' + channel)

    def sub_part_channel(self, connection, event, nick, userhost, replypath, what, admin_unlocked):
        """'part <channel> [message...]': leave the channel and report it."""
        whats = what.split(' ')
        if whats[0] != 'part' or not admin_unlocked or len(whats) < 2:
            return None
        channel = whats[1]
        if not irclib.is_channel(channel):
            return None
        connection.part(channel, ' '.join(whats[2:]))
        return self._respond(connection, replypath, 'parted ' + channel)

    def sub_quit_channel(self, connection, event, nick, userhost, replypath, what, admin_unlocked):
        """'quit [message...]': save the configuration and quit IRC."""
        whats = what.split(' ')
        if whats[0] != 'quit' or not admin_unlocked:
            return None
        if replypath is not None:
            connection.privmsg(replypath, 'quitting')
        # save first, so the configuration is on disk even if sending QUIT fails
        self._save_config()
        connection.quit(' '.join(whats[1:]))

    def sub_autojoin_manipulate(self, connection, event, nick, userhost, replypath, what, admin_unlocked):
        """'autojoin add|remove <channel>': edit the autojoin list that is
        kept as a comma separated string in the config object.
        """
        whats = what.split(' ')
        if whats[0] != 'autojoin' or not admin_unlocked or len(whats) < 3:
            return None
        action = whats[1]
        channel = whats[2]
        if action not in ('add', 'remove') or not irclib.is_channel(channel):
            return None

        try:
            # get existing list
            existing = self.config.get('channels', 'autojoin')
        except (NoSectionError, NoOptionError):
            return None
        # drop empty entries so an empty list does not turn into ',#channel'
        channelset = set(name for name in existing.split(',') if name)

        if action == 'add':
            channelset.add(channel)
            replystr = 'added ' + channel + ' to autojoin'
        else:
            channelset.discard(channel)
            replystr = 'removed ' + channel + ' from autojoin'
        self.config.set('channels', 'autojoin', ','.join(sorted(channelset)))
        return self._respond(connection, replypath, replystr)

    def sub_save_config(self, connection, event, nick, userhost, replypath, what, admin_unlocked):
        """'save': write the configuration to disk and report it."""
        whats = what.split(' ')
        if whats[0] != 'save' or not admin_unlocked:
            return None
        self._save_config()
        return self._respond(connection, replypath, 'saved config file')

    def sub_change_nick(self, connection, event, nick, userhost, replypath, what, admin_unlocked):
        """'nick <newnick>': change nickname and remember it in the config."""
        whats = what.split(' ')
        if whats[0] != 'nick' or not admin_unlocked or len(whats) < 2:
            return None
        newnick = whats[1]
        connection.nick(newnick)
        self.config.set('IRC', 'nick', newnick)
        return self._respond(connection, replypath, 'changed nickname')
class FactFile(Module):
    """Returns a random fact/quote/whatever from one or more files

    The first word of a line is looked up in the 'fact' section of the config
    object; its value is the name of a file with one fact per line.
    """

    def __init__(self, config, server, modlist):
        super(FactFile, self).__init__(config, server, modlist)
        # make do() reachable from other modules' nested commands
        modlist.append(self)

    def register_handlers(self, server):
        """Listen to both channel and private messages."""
        server.add_global_handler('pubmsg', self.on_pubmsg)
        server.add_global_handler('privmsg', self.on_privmsg)

    def do(self, connection, event, nick, userhost, replypath, what, admin_unlocked):
        """Reply with a random line from the file configured for the command.

        With a real replypath the line is sent there via connection.privmsg;
        with replypath None it is returned instead. Returns None when the
        first word is not a configured fact command or no fact is available.
        """
        whats = what.split(' ')
        try:
            filename = self.config.get('fact', whats[0])
        except (NoSectionError, NoOptionError):
            # not a fact command (or no fact section at all): nothing to do
            return None

        if replypath is not None:
            what = self.try_recursion(connection, event, nick, userhost, replypath, what, admin_unlocked)
            whats = what.split(' ')

        if not os.path.isfile(filename):
            print('filename in config file for \'' + whats[0] + '\' is wrong')
            return None

        with open(filename, 'r') as factfile:
            facts = factfile.readlines()
        if not facts:
            print('fact file for \'' + whats[0] + '\' is empty')
            return None

        # random.choice never indexes past the end; randint(0, len(facts))
        # includes len(facts) itself and did
        to_print = random.choice(facts).rstrip()

        # return text
        if replypath is None:
            return to_print
        connection.privmsg(replypath, to_print)
class DrBotServerConnection(irclib.ServerConnection):
    """Server connection that defines the bot's own privmsg sender."""

    def privmsg(self, target, text):
        """Send a PRIVMSG command carrying text to target."""
        # TODO: length limiting or splitting
        command = "PRIVMSG %s :%s" % (target, text)
        self.send_raw(command)
class DrBotIRC(irclib.IRC):
    """IRC object whose server() hands out DrBotServerConnection objects."""

    def server(self):
        """Create a DrBotServerConnection, record it in self.connections and
        return it.
        """
        connection = DrBotServerConnection(self)
        self.connections.append(connection)
        return connection
#####
# init
#####

# read config file; 'debug' defaults to false in every section
config = ConfigParser({'debug': 'false'})
config.read([os.path.expanduser('~/.dr.botzo.cfg'), 'dr.botzo.cfg'])

# load necessary options: without the connection info there is nothing to do,
# so a missing section or option ends the program
try:
    botserver = config.get('IRC', 'server')
    botport = config.getint('IRC', 'port')
    botnick = config.get('IRC', 'nick')
    botircname = config.get('IRC', 'name')
except (NoSectionError, NoOptionError) as e:
    sys.exit("Aborted due to error with necessary configuration: " + str(e))

# load additional options
irclib.DEBUG = config.getboolean('IRC', 'debug')

# start up the IRC bot

# create IRC and server objects and connect
irc = DrBotIRC()
server = irc.server().connect(botserver, botport, botnick, botircname)

# create the feature modules; each one registers its own handlers on server
# and adds itself to modlist
modlist = []
count = Countdown(config, server, modlist)
dice = Dice(config, server, modlist)
fact = FactFile(config, server, modlist)
admin = IrcAdmin(config, server, modlist)
gt = GoogleTranslate(config, server, modlist)
seen = Seen(config, server, modlist)

# loop forever
irc.process_forever()
# vi:tabstop=4:expandtab:autoindent