# dr.botzo - a pluggable IRC bot written in Python # Copyright (C) 2010 Brian S. Stephan # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . from ConfigParser import ConfigParser, NoSectionError, NoOptionError from datetime import datetime import os import random import re import sys from urllib2 import urlopen from urllib import urlencode from dateutil.parser import * from dateutil.relativedelta import * from dateutil.tz import * from irclib import irclib class Module(object): # Base class used for creating classes that have real functionality. def __init__(self, config, server, modlist): # Constructor for a feature module. Inheritors should not do anything special # here, instead they should implement register_handlers and do, or else this will # be a very uneventful affair. # # Classes that are interested in allowing an indirect call to their do routine # should add themselves to modlist inside their __init__. This will allow other # modules to call do and see if anything can handle text they may have seen (such # as in recursive commands). self.config = config self.modlist = modlist self.register_handlers(server) def register_handlers(self, server): # This is called by __init__ and sets up server.add_global_handlers. Classes # inheriting from Module should implement this and set up the appropriate handlers, # e.g.: # # server.add_global_handler('privmsg', self.on_privmsg) # # Module.on_pubmsg and Module.on_privmsg are defined so far, the rest, you're on your # own. print "looks like someone forgot to implement register_handlers!" def on_pubmsg(self, connection, event): # Does some variable setup and initial sanity checking before calling Module.do, # which should be implemented by subclasses and what can be ultimately responsible # for the work. Of course, you are free to reimplement on_pubmsg on your own too. nick = irclib.nm_to_n(event.source()) userhost = irclib.nm_to_uh(event.source()) replypath = event.target() what = event.arguments()[0] admin_unlocked = False try: if userhost == self.config.get('admin', 'userhost'): admin_unlocked = True except NoOptionError: pass # only do commands if the bot has been addressed directly addressed_pattern = '^' + connection.get_nickname() + '[:,]?\s+' addressed_re = re.compile(addressed_pattern) if not addressed_re.match(what): return else: what = addressed_re.sub('', what) self.do(connection, event, nick, userhost, replypath, what, admin_unlocked) def on_privmsg(self, connection, event): # Does some variable setup and initial sanity checking before calling Module.do, # which should be implemented by subclasses and what can be ultimately responsible # for the work. Of course, you are free to reimplement on_privmsg on your own too. nick = irclib.nm_to_n(event.source()) userhost = irclib.nm_to_uh(event.source()) replypath = nick what = event.arguments()[0] admin_unlocked = False try: if userhost == self.config.get('admin', 'userhost'): admin_unlocked = True except NoOptionError: pass if replypath is not None: what = self.try_recursion(connection, event, nick, userhost, replypath, what, admin_unlocked) self.do(connection, event, nick, userhost, replypath, what, admin_unlocked) def try_recursion(self, connection, event, nick, userhost, replypath, what, admin_unlocked): # Upon seeing a line intended for this module, see if there are subcommands # that we should do what is basically a text replacement on. The intent is to # allow things like the following: # # command arg1 [anothercommand arg1 arg2] # # where the output of anothercommand is command's arg2..n. It's mostly for # amusement purposes, but maybe there are legitimate uses. This is intended to # be attempted after you've determined the line should be handled by your module. start_idx = what.find('[') subcmd = what[start_idx+1:] end_idx = subcmd.rfind(']') subcmd = subcmd[:end_idx] attempt = what if start_idx == -1 or end_idx == -1: # no nested commands at all if replypath is a real value, so don't do a damn thing if replypath is not None: return attempt # no more replacements found, see if what we had is workable else: for module in self.modlist: ret = module.do(connection, event, nick, userhost, None, attempt, admin_unlocked) if ret is not None: return ret # if we got here, it's not workable. just return what we got return attempt else: # we have a subcmd, see if there's another one nested ret = self.try_recursion(connection, event, nick, userhost, None, subcmd, admin_unlocked) if ret is not None: blarg = attempt.replace('['+subcmd+']', ret) if replypath is not None: return blarg else: return self.try_recursion(connection, event, nick, userhost, None, blarg, admin_unlocked) else: return attempt def do(self, connection, event, nick, userhost, replypath, what, admin_unlocked): # Implement this method in your subclass to have a fairly-automatic hook into # IRC functionality. This is called by the default on_pubmsg and on_privmsg print "looks like someone forgot to implement do!" class GoogleTranslate(Module): # Class that translates text via Google Translate. # # http://code.google.com/apis/ajaxlanguage/documentation/ def __init__(self, config, server, modlist): super(GoogleTranslate, self).__init__(config, server, modlist) modlist.append(self) def register_handlers(self, server): server.add_global_handler('pubmsg', self.on_pubmsg) server.add_global_handler('privmsg', self.on_privmsg) def do(self, connection, event, nick, userhost, replypath, what, admin_unlocked): whats = what.split(' ') if whats[0] == 'translate' and len(whats) >= 4: fromlang = whats[1] tolang = whats[2] text = ' '.join(whats[3:]) langpair = '%s|%s' % (fromlang, tolang) gt_url = 'http://ajax.googleapis.com/ajax/services/language/translate?' params = urlencode( (('v', 1.0), ('q', text), ('langpair', langpair),) ) url = gt_url + params content = urlopen(url).read() start_idx = content.find('"translatedText":"')+18 translation = content[start_idx:] end_idx = translation.find('"}, "') translation = translation[:end_idx] # do some text conversion translation = translation.replace('\\u0026quot;', '"') translation = translation.replace('\\u0026amp;', '&') translation = translation.replace('\\u003c', '<') translation = translation.replace('\\u0026lt;', '<') translation = translation.replace('\\u003e', '>') translation = translation.replace('\\u0026gt;', '>') translation = translation.replace('\\u0026#39;', '\'') if replypath is None: return translation else: connection.privmsg(replypath, translation) class Countdown(Module): # Class that adds a countdown item to the bot def __init__(self, config, server, modlist): super(Countdown, self).__init__(config, server, modlist) modlist.append(self) def register_handlers(self, server): server.add_global_handler('pubmsg', self.on_pubmsg) server.add_global_handler('privmsg', self.on_privmsg) def do(self, connection, event, nick, userhost, replypath, what, admin_unlocked): whats = what.split(' ') if whats[0] == 'countdown' and len(whats) >= 2: if whats[1] == 'add' and len(whats) >= 4: item = whats[2] target = parse(' '.join(whats[3:]), default=datetime.now().replace(tzinfo=tzlocal())) if not self.config.has_section('countdown'): self.config.add_section('countdown') self.config.set('countdown', item, target.astimezone(tzutc()).isoformat()) replystr = 'added countdown item ' + whats[2] if replypath is None: return replystr else: connection.privmsg(replypath, replystr) elif whats[1] == 'remove': try: if self.config.remove_option('countdown', whats[2]): replystr = 'removed countdown item ' + whats[2] if replypath is None: return replystr else: connection.privmsg(replypath, replystr) except NoSectionError: pass elif whats[1] == 'list': try: cdlist = self.config.options('countdown') cdlist.remove('debug') cdlist.sort() liststr = ', '.join(cdlist) if replypath is None: return liststr else: connection.privmsg(replypath, liststr) except NoSectionError: pass else: try: timestr = self.config.get('countdown', whats[1]) time = parse(timestr) rdelta = relativedelta(time, datetime.now().replace(tzinfo=tzlocal())) relstr = whats[1] + ' will occur in ' if rdelta.years != 0: relstr += str(rdelta.years) + ' years ' if rdelta.months != 0: relstr += str(rdelta.months) + ' months ' if rdelta.days != 0: relstr += str(rdelta.days) + ' days ' if rdelta.hours != 0: relstr += str(rdelta.hours) + ' hours ' if rdelta.minutes != 0: relstr += str(rdelta.minutes) + ' minutes ' if rdelta.seconds != 0: relstr += str(rdelta.seconds) + ' seconds' #relstr += ' (' + timestr + ')' if replypath is None: return relstr else: connection.privmsg(replypath, relstr) except NoOptionError: pass class Dice(Module): # Rolls dice, for RPGs mostly def __init__(self, config, server, modlist): super(Dice, self).__init__(config, server, modlist) modlist.append(self) def register_handlers(self, server): server.add_global_handler('pubmsg', self.on_pubmsg) server.add_global_handler('privmsg', self.on_privmsg) def do(self, connection, event, nick, userhost, replypath, what, admin_unlocked): overallroll = what rolls = re.split(';\s*', overallroll) for roll in rolls: pattern = '^(?:(\d+)#)?(?:(\d+)/)?(\d+)?d(\d+)(?:(\+|\-)(\d+))?(?:\s+(.*))?' regex = re.compile(pattern) matches = regex.search(roll) if matches is not None: # set variables including defaults for unspecified stuff faces = int(matches.group(4)) comment = matches.group(7) if matches.group(1) is None: times = 1 else: times = int(matches.group(1)) if matches.group(3) is None: dice = 1 else: dice = int(matches.group(3)) if matches.group(2) is None: top = dice else: top = int(matches.group(2)) if matches.group(5) is None or matches.group(6) is None: modifier = 0 else: if str(matches.group(5)) == '-': modifier = -1 * int(matches.group(6)) else: modifier = int(matches.group(6)) result = '' if comment is not None: result += comment + ': ' for t in range(times): ressubstr = "" rolls = [] for d in range(dice): rolls.append(str(random.randint(1, faces))) rolls.sort() rolls.reverse() ressubstr = ','.join(rolls[0:top]) sum = 0 for r in rolls[0:top]: sum += int(r) sumplus = sum + modifier result += str(sumplus) + ' [' + ressubstr if modifier != 0: if modifier > 0: result += ' + ' + str(modifier) else: result += ' - ' + str(-1 * modifier) result += ']' if t != times-1: result += ', ' if replypath is None: return result else: connection.privmsg(replypath, result) class Seen(Module): # Keeps track of when people say things in public channels, and reports on when # they last said something. def __init__(self, config, server, modlist): super(Seen, self).__init__(config, server, modlist) modlist.append(self) def register_handlers(self, server): server.add_global_handler('pubmsg', self.on_pubmsg) server.add_global_handler('privmsg', self.on_privmsg) def do(self, connection, event, nick, userhost, replypath, what, admin_unlocked): # whatever it is, store it if not self.config.has_section('seen'): self.config.add_section('seen') self.config.set('seen', nick, userhost + '|:|' + datetime.utcnow().isoformat() + '|:|' + what) # also see if it's a query whats = what.split(' ') if whats[0] == 'seen' and len(whats) >= 2: query = whats[1] if query != 'debug': try: seendata = self.config.get('seen', query).split('|:|') converted = datetime.strptime(seendata[1], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzutc()) replystr = 'last saw ' + query + ' at ' + converted.astimezone(tzlocal()).strftime("%Y/%m/%d %H:%M:%S %Z") + ' saying \'' + seendata[2] + '\'' if replypath is None: return replystr else: connection.privmsg(replypath, replystr) except NoOptionError: pass class IrcAdmin(Module): # All kinds of miscellaneous irc stuff def __init__(self, config, server, modlist): super(IrcAdmin, self).__init__(config, server, modlist) modlist.append(self) def register_handlers(self, server): server.add_global_handler('welcome', self.on_connect) server.add_global_handler('pubmsg', self.on_pubmsg) server.add_global_handler('privmsg', self.on_privmsg) def on_connect(self, connection, event): """handler for when the bot has connected to IRC """ # user modes try: usermode = self.config.get('IRC', 'usermode') connection.mode(botnick, usermode) except NoOptionError: pass # join the specified channels try: autojoins = self.config.get('channels', 'autojoin').split(',') for channel in autojoins: if irclib.is_channel(channel): connection.join(channel) except NoOptionError: pass def do(self, connection, event, nick, userhost, replypath, what, admin_unlocked): # this could be more sophisticated, but i'm too lazy to do a real port # right now. # TODO: sophisticate. also, document all of these self.sub_join_channel(connection, event, nick, userhost, replypath, what, admin_unlocked) self.sub_part_channel(connection, event, nick, userhost, replypath, what, admin_unlocked) self.sub_quit_channel(connection, event, nick, userhost, replypath, what, admin_unlocked) self.sub_autojoin_manipulate(connection, event, nick, userhost, replypath, what, admin_unlocked) self.sub_save_config(connection, event, nick, userhost, replypath, what, admin_unlocked) self.sub_change_nick(connection, event, nick, userhost, replypath, what, admin_unlocked) def sub_join_channel(self, connection, event, nick, userhost, replypath, what, admin_unlocked): whats = what.split(' ') if whats[0] == 'join' and admin_unlocked and len(whats) >= 2: channel = whats[1] if irclib.is_channel(channel): connection.join(channel) replystr = 'joined ' + channel if replypath is None: return replystr else: connection.privmsg(replypath, replystr) def sub_part_channel(self, connection, event, nick, userhost, replypath, what, admin_unlocked): whats = what.split(' ') if whats[0] == 'part' and admin_unlocked and len(whats) >= 2: channel = whats[1] if irclib.is_channel(channel): connection.part(channel, ' '.join(whats[2:])) replystr = 'parted ' + channel if replypath is None: return replystr else: connection.privmsg(replypath, replystr) def sub_quit_channel(self, connection, event, nick, userhost, replypath, what, admin_unlocked): whats = what.split(' ') if whats[0] == 'quit' and admin_unlocked: if replypath is not None: connection.privmsg(replypath, 'quitting') connection.quit(' '.join(whats[1:])) with open('dr.botzo.cfg', 'w') as cfg: self.config.write(cfg) def sub_autojoin_manipulate(self, connection, event, nick, userhost, replypath, what, admin_unlocked): whats = what.split(' ') if whats[0] == 'autojoin' and admin_unlocked and len(whats) >= 3: if whats[1] == 'add': try: # get existing list channel = whats[2] if irclib.is_channel(channel): channelset = set(self.config.get('channels', 'autojoin').split(',')) channelset.add(channel) self.config.set('channels', 'autojoin', ','.join(channelset)) replystr = 'added ' + channel + ' to autojoin' if replypath is None: return replystr else: connection.privmsg(replypath, replystr) except NoOptionError: pass elif whats[1] == 'remove': try: # get existing list channel = whats[2] if irclib.is_channel(channel): channelset = set(self.config.get('channels', 'autojoin').split(',')) channelset.discard(channel) self.config.set('channels', 'autojoin', ','.join(channelset)) replystr = 'removed ' + channel + ' from autojoin' if replypath is None: return replystr else: connection.privmsg(replypath, replystr) except NoOptionError: pass def sub_save_config(self, connection, event, nick, userhost, replypath, what, admin_unlocked): whats = what.split(' ') if whats[0] == 'save' and admin_unlocked: with open('dr.botzo.cfg', 'w') as cfg: self.config.write(cfg) replystr = 'saved config file' if replypath is None: return replystr else: connection.privmsg(replypath, replystr) def sub_change_nick(self, connection, event, nick, userhost, replypath, what, admin_unlocked): whats = what.split(' ') if whats[0] == 'nick' and admin_unlocked and len(whats) >= 2: newnick = whats[1] connection.nick(newnick) self.config.set('IRC', 'nick', newnick) replystr = 'changed nickname' if replypath is None: return replystr else: connection.privmsg(replypath, replystr) class FactFile(Module): # Returns a random fact/quote/whatever from one or more files def __init__(self, config, server, modlist): super(FactFile, self).__init__(config, server, modlist) modlist.append(self) def register_handlers(self, server): server.add_global_handler('pubmsg', self.on_pubmsg) server.add_global_handler('privmsg', self.on_privmsg) def do(self, connection, event, nick, userhost, replypath, what, admin_unlocked): whats = what.split(' ') try: filename = self.config.get('fact', whats[0]) # open file if os.path.isfile(filename): # http://www.regexprn.com/2008/11/read-random-line-in-large-file-in.html with open(filename, 'r') as file: facts = file.readlines() to_print = facts[random.randint(0, len(facts))] # return text if replypath is None: return to_print.rstrip() else: connection.privmsg(replypath, to_print.rstrip()) else: print('filename in config file for \'' + whats[0] + '\' is wrong') except NoOptionError: pass class DrBotServerConnection(irclib.ServerConnection): def privmsg(self, target, text): # Send a PRIVMSG command. # TODO: length limiting or splitting self.send_raw("PRIVMSG %s :%s" % (target, text)) class DrBotIRC(irclib.IRC): def server(self): c = DrBotServerConnection(self) self.connections.append(c) return c # read config file config = ConfigParser({'debug': 'false'}) config.read([os.path.expanduser('~/.dr.botzo.cfg'), 'dr.botzo.cfg']) # load necessary options try: # load connection info botserver = config.get('IRC', 'server') botport = config.getint('IRC', 'port') botnick = config.get('IRC', 'nick') botircname = config.get('IRC', 'name') except NoSectionError as e: sys.exit("Aborted due to error with necessary configuration: " + str(e)) except NoOptionError as e: sys.exit("Aborted due to error with necessary configuration: " + str(e)) # load additional options irclib.DEBUG = config.getboolean('IRC', 'debug') # start up the IRC bot # create IRC and server objects and connect irc = DrBotIRC() server = irc.server().connect(botserver, botport, botnick, botircname) modlist = [] moduleList = [ "Countdown", "Dice", "FactFile", "IrcAdmin", "GoogleTranslate", "Seen" ] modObjs = [] for mod in moduleList: modObjs.append(mod(config, server modlist)) count = Countdown(config, server, modlist) dice = Dice(config, server, modlist) fact = FactFile(config, server, modlist) admin = IrcAdmin(config, server, modlist) gt = GoogleTranslate(config, server, modlist) seen = Seen(config, server, modlist) # loop forever irc.process_forever() # vi:tabstop=4:expandtab:autoindent