dr.botzo/dr_botzo/ircbot/bot.py

662 lines
20 KiB
Python

"""Provide the base IRC client bot which other code can latch onto."""
import bisect
import collections
import logging
import re
import ssl
import sys
from django.conf import settings
import irc.client
from irc.connection import Factory
from irc.dict import IRCDict
import irc.modes
import ircbot.lib as ircbotlib
from ircbot.models import IrcChannel, IrcPlugin
log = logging.getLogger('ircbot.bot')
class PrioritizedRegexHandler(collections.namedtuple('Base', ('priority', 'regex', 'callback'))):
def __lt__(self, other):
"when sorting prioritized handlers, only use the priority"
return self.priority < other.priority
class DrReactor(irc.client.Reactor):
"""Customize the basic IRC library's Reactor with more features."""
def __do_nothing(*args, **kwargs):
pass
def __init__(self, on_connect=__do_nothing, on_disconnect=__do_nothing, on_schedule=__do_nothing):
"""Initialize our custom stuff."""
super(DrReactor, self).__init__(on_connect=on_connect, on_disconnect=on_disconnect,
on_schedule=on_schedule)
self.regex_handlers = {}
def add_global_regex_handler(self, event, regex, handler, priority=0):
"""Adds a global handler function for a specific event type and regex.
Arguments:
event --- Event type (a string).
handler -- Callback function taking connection and event
parameters.
priority --- A number (the lower the number, the higher priority).
The handler function is called whenever the specified event is
triggered in any of the connections and the regex matches.
See documentation for the Event class.
The handler functions are called in priority order (lowest
number is highest priority). If a handler function returns
"NO MORE", no more handlers will be called.
This is basically an extension of add_global_handler(), either may
work, though it turns out most modules probably want this one.
"""
handler = PrioritizedRegexHandler(priority, regex, handler)
with self.mutex:
log.debug(u"in add_global_regex_handler")
event_regex_handlers = self.regex_handlers.setdefault(event, [])
bisect.insort(event_regex_handlers, handler)
def remove_global_regex_handler(self, event, handler):
"""Removes a global regex handler function.
Arguments:
event -- Event type (a string).
handler -- Callback function.
Returns 1 on success, otherwise 0.
"""
with self.mutex:
if not event in self.regex_handlers:
return 0
for h in self.regex_handlers[event]:
if handler == h.callback:
self.regex_handlers[event].remove(h)
return 1
def _handle_event(self, connection, event):
"""Handle an Event event incoming on ServerConnection connection.
Also supports regex handlers.
"""
log.debug(u"in DrReactor._handle_event")
with self.mutex:
# doing regex version first as it has the potential to be more specific
log.debug(u"checking regex handlers for %s", event.type)
matching_handlers = sorted(
self.regex_handlers.get("all_events", []) +
self.regex_handlers.get(event.type, [])
)
log.debug(u"got %d", len(matching_handlers))
for handler in matching_handlers:
log.debug(u"checking %s vs. %s", handler, event.arguments)
for line in event.arguments:
match = re.search(handler.regex, line)
if match:
log.debug(u"match!")
result = handler.callback(connection, event, match)
if result == "NO MORE":
return
matching_handlers = sorted(
self.handlers.get("all_events", []) +
self.handlers.get(event.type, [])
)
for handler in matching_handlers:
result = handler.callback(connection, event)
if result == "NO MORE":
return
class IRCBot(irc.client.SimpleIRCClient):
"""A single-server IRC bot class."""
reactor_class = DrReactor
def __init__(self, reconnection_interval=60):
super(IRCBot, self).__init__()
self.channels = IRCDict()
self.plugins = []
# set up the server list
self.server_list = settings.IRCBOT_SERVER_LIST
# set reconnection interval
if not reconnection_interval or reconnection_interval < 0:
reconnection_interval = 2 ** 31
self.reconnection_interval = reconnection_interval
# set basic stuff
self._nickname = settings.IRCBOT_NICKNAME
self._realname = settings.IRCBOT_REALNAME
# handlers
for i in ['disconnect', 'join', 'kick', 'mode', 'namreply', 'nick', 'part', 'quit', 'welcome']:
self.connection.add_global_handler(i, getattr(self, '_on_' + i), -20)
self.connection.reactor.add_global_regex_handler('pubmsg', r'^!load\s+([\S]+)$',
getattr(self, 'handle_load'), -20)
self.connection.reactor.add_global_regex_handler('privmsg', r'^!load\s+([\S]+)$',
getattr(self, 'handle_load'), -20)
self.connection.reactor.add_global_regex_handler('pubmsg', r'^!unload\s+([\S]+)$',
getattr(self, 'handle_unload'), -20)
self.connection.reactor.add_global_regex_handler('privmsg', r'^!unload\s+([\S]+)$',
getattr(self, 'handle_unload'), -20)
def _connected_checker(self):
if not self.connection.is_connected():
self.connection.execute_delayed(self.reconnection_interval,
self._connected_checker)
self.jump_server()
def _connect(self):
server = self.server_list[0]
try:
# build the connection factory as determined by IPV6/SSL settings
if settings.IRCBOT_SSL:
connect_factory = Factory(wrapper=ssl.wrap_socket, ipv6=settings.IRCBOT_IPV6)
else:
connect_factory = Factory(ipv6=settings.IRCBOT_IPV6)
self.connect(server[0], server[1], self._nickname, server[2], ircname=self._realname,
connect_factory=connect_factory)
except irc.client.ServerConnectionError:
pass
def _on_disconnect(self, c, e):
self.channels = IRCDict()
self.connection.execute_delayed(self.reconnection_interval,
self._connected_checker)
def _on_join(self, c, e):
ch = e.target
nick = e.source.nick
if nick == c.get_nickname():
self.channels[ch] = Channel()
self.channels[ch].add_user(nick)
def _on_kick(self, c, e):
nick = e.arguments[0]
channel = e.target
if nick == c.get_nickname():
del self.channels[channel]
else:
self.channels[channel].remove_user(nick)
def _on_mode(self, c, e):
modes = irc.modes.parse_channel_modes(" ".join(e.arguments))
t = e.target
if irc.client.is_channel(t):
ch = self.channels[t]
for mode in modes:
if mode[0] == "+":
f = ch.set_mode
else:
f = ch.clear_mode
f(mode[1], mode[2])
else:
# Mode on self... XXX
pass
def _on_namreply(self, c, e):
"""Get the list of names in a channel.
e.arguments[0] == "@" for secret channels,
"*" for private channels,
"=" for others (public channels)
e.arguments[1] == channel
e.arguments[2] == nick list
"""
ch_type, channel, nick_list = e.arguments
if channel == '*':
# User is not in any visible channel
# http://tools.ietf.org/html/rfc2812#section-3.2.5
return
for nick in nick_list.split():
nick_modes = []
if nick[0] in self.connection.features.prefix:
nick_modes.append(self.connection.features.prefix[nick[0]])
nick = nick[1:]
for mode in nick_modes:
self.channels[channel].set_mode(mode, nick)
self.channels[channel].add_user(nick)
def _on_nick(self, c, e):
before = e.source.nick
after = e.target
for ch in self.channels.values():
if ch.has_user(before):
ch.change_nick(before, after)
def _on_part(self, c, e):
nick = e.source.nick
channel = e.target
if nick == c.get_nickname():
del self.channels[channel]
else:
self.channels[channel].remove_user(nick)
def _on_quit(self, c, e):
nick = e.source.nick
for ch in self.channels.values():
if ch.has_user(nick):
ch.remove_user(nick)
def _on_welcome(self, connection, event):
"""Initialize/run a bunch of on-connection stuff.
Set the nickmask that the ircd tells us is us, autojoin channels, etc.
Args:
connection source connection
event incoming event
"""
what = event.arguments[0]
log.debug("welcome: %s", what)
for chan in IrcChannel.objects.filter(autojoin=True):
log.info(u"autojoining %s", chan.name)
self.connection.join(chan)
for plugin in IrcPlugin.objects.filter(autoload=True):
log.info(u"autoloading %s", plugin.path)
self._load_plugin(connection, event, plugin.path, feedback=False)
match = re.search(r'(\S+!\S+@\S+)', what)
if match:
self.nickmask = match.group(1)
log.debug("setting nickmask: %s", self.nickmask)
def die(self, msg="Bye, cruel world!"):
"""Let the bot die.
Arguments:
msg -- Quit message.
"""
self.connection.disconnect(msg)
sys.exit(0)
def disconnect(self, msg="I'll be back!"):
"""Disconnect the bot.
The bot will try to reconnect after a while.
Arguments:
msg -- Quit message.
"""
self.connection.disconnect(msg)
def get_version(self):
"""Returns the bot version.
Used when answering a CTCP VERSION request.
"""
return "Python irc.bot ({version})".format(
version=irc.client.VERSION_STRING)
def jump_server(self, msg="Changing servers"):
"""Connect to a new server, potentially disconnecting from the current one."""
if self.connection.is_connected():
self.connection.disconnect(msg)
self.server_list.append(self.server_list.pop(0))
self._connect()
def on_ctcp(self, c, e):
"""Default handler for ctcp events.
Replies to VERSION and PING requests and relays DCC requests
to the on_dccchat method.
"""
nick = e.source.nick
if e.arguments[0] == "VERSION":
c.ctcp_reply(nick, "VERSION " + self.get_version())
elif e.arguments[0] == "PING":
if len(e.arguments) > 1:
c.ctcp_reply(nick, "PING " + e.arguments[1])
elif e.arguments[0] == "DCC" and e.arguments[1].split(" ", 1)[0] == "CHAT":
self.on_dccchat(c, e)
def on_dccchat(self, c, e):
pass
def handle_load(self, connection, event, match):
"""Handle IRC requests to load a plugin."""
log.debug(u"is admin?: %s", str(ircbotlib.is_admin(event.source)))
if ircbotlib.is_admin(event.source):
plugin_path = match.group(1)
log.debug(u"calling _load_plugin on %s", plugin_path)
self._load_plugin(connection, event, plugin_path)
def _load_plugin(self, connection, event, plugin_path, feedback=True):
"""Load an IRC plugin.
The general assumption here is that a plugin's init loads its hooks and handlers.
"""
log.debug(u"trying to load %s", plugin_path)
dest = None
if feedback:
if irc.client.is_channel(event.target):
dest = event.target
else:
dest = irc.client.NickMask(event.source).nick
for path, plugin in self.plugins:
if plugin_path == path:
if feedback:
self.privmsg(dest, "Plugin '{0:s}' is already loaded.".format(plugin_path))
return
# not loaded, let's get to work
try:
__import__(plugin_path)
module = sys.modules[plugin_path]
plugin = module.plugin(self, connection, event)
plugin.start()
self.plugins.append((plugin_path, plugin))
# give it a model
plugin_model, c = IrcPlugin.objects.get_or_create(path=plugin_path)
if feedback:
self.privmsg(dest, "Plugin '{0:s}' loaded.".format(plugin_path))
except ImportError as e:
log.error("Error loading '{0:s}'".format(plugin_path))
log.exception(e)
if feedback:
self.privmsg(dest, "Plugin '{0:s}' could not be loaded.".format(plugin_path))
def handle_unload(self, connection, event, match):
"""Handle IRC requests to unload a plugin."""
log.debug(u"is admin?: %s", str(ircbotlib.is_admin(event.source)))
if ircbotlib.is_admin(event.source):
plugin_path = match.group(1)
log.debug(u"calling _unload_plugin on %s", plugin_path)
self._unload_plugin(connection, event, plugin_path)
def _unload_plugin(self, connection, event, plugin_path):
"""Attempt to unload and del a module if it's loaded."""
log.debug(u"trying to unload %s", plugin_path)
if irc.client.is_channel(event.target):
dest = event.target
else:
dest = irc.client.NickMask(event.source).nick
for path, plugin in self.plugins:
if plugin_path == path:
self.plugins.remove((path, plugin))
plugin.stop()
del plugin
del sys.modules[plugin_path]
self.privmsg(dest, "Plugin '{0:s}' unloaded.".format(plugin_path))
return
# guess it was never loaded
self.privmsg(dest, "Plugin '{0:s}' is not loaded.".format(plugin_path))
def privmsg(self, target, text):
"""Send a PRIVMSG command.
Args:
target the destination nick/channel
text the message to send
"""
if not target:
return
log.debug("OUTGOING PRIVMSG: t[%s] m[%s]", target, text)
splitter = "..."
# split messages that are too long. Max length is 512.
# TODO: this does not properly handle when the nickmask has been
# masked by the ircd
# is the above still the case?
space = 512 - len('\r\n') - len(' PRIVMSG ') - len(target) - len(' :') - len(self.nickmask) - len(' :')
splitspace = space - (len(splitter) + 1)
if len(text) > space:
times = 1
while len(text) > splitspace:
splitpos = text.rfind(' ', 0, splitspace)
splittext = text[0:splitpos] + ' ' + splitter
text = splitter + ' ' + text[splitpos+1:]
self.connection.send_raw("PRIVMSG {0:s} :{1:s}".format(target, splittext))
times = times + 1
if times >= 4:
# this is stupidly long, abort
return
# done splitting
self.connection.send_raw("PRIVMSG {0:s} :{1:s}".format(target, text))
else:
self.connection.send_raw("PRIVMSG {0:s} :{1:s}".format(target, text))
def start(self):
"""Start the bot."""
self._connect()
super(IRCBot, self).start()
def sigint_handler(self, signal, frame):
"""Cleanly shutdown on SIGINT."""
log.debug(u"shutting down")
for path, plugin in self.plugins:
log.debug(u"trying to shut down %s", path)
self.plugins.remove((path, plugin))
plugin.stop()
del plugin
del sys.modules[path]
self.disconnect("Shutting down...")
sys.exit()
class Channel(object):
"""A class for keeping information about an IRC channel."""
def __init__(self):
self.userdict = IRCDict()
self.operdict = IRCDict()
self.voiceddict = IRCDict()
self.ownerdict = IRCDict()
self.halfopdict = IRCDict()
self.modes = {}
def users(self):
"""Returns an unsorted list of the channel's users."""
return self.userdict.keys()
def opers(self):
"""Returns an unsorted list of the channel's operators."""
return self.operdict.keys()
def voiced(self):
"""Returns an unsorted list of the persons that have voice
mode set in the channel."""
return self.voiceddict.keys()
def owners(self):
"""Returns an unsorted list of the channel's owners."""
return self.ownerdict.keys()
def halfops(self):
"""Returns an unsorted list of the channel's half-operators."""
return self.halfopdict.keys()
def has_user(self, nick):
"""Check whether the channel has a user."""
return nick in self.userdict
def is_oper(self, nick):
"""Check whether a user has operator status in the channel."""
return nick in self.operdict
def is_voiced(self, nick):
"""Check whether a user has voice mode set in the channel."""
return nick in self.voiceddict
def is_owner(self, nick):
"""Check whether a user has owner status in the channel."""
return nick in self.ownerdict
def is_halfop(self, nick):
"""Check whether a user has half-operator status in the channel."""
return nick in self.halfopdict
def add_user(self, nick):
self.userdict[nick] = 1
def remove_user(self, nick):
for d in self.userdict, self.operdict, self.voiceddict:
if nick in d:
del d[nick]
def change_nick(self, before, after):
self.userdict[after] = self.userdict.pop(before)
if before in self.operdict:
self.operdict[after] = self.operdict.pop(before)
if before in self.voiceddict:
self.voiceddict[after] = self.voiceddict.pop(before)
def set_userdetails(self, nick, details):
if nick in self.userdict:
self.userdict[nick] = details
def set_mode(self, mode, value=None):
"""Set mode on the channel.
Arguments:
mode -- The mode (a single-character string).
value -- Value
"""
if mode == "o":
self.operdict[value] = 1
elif mode == "v":
self.voiceddict[value] = 1
elif mode == "q":
self.ownerdict[value] = 1
elif mode == "h":
self.halfopdict[value] = 1
else:
self.modes[mode] = value
def clear_mode(self, mode, value=None):
"""Clear mode on the channel.
Arguments:
mode -- The mode (a single-character string).
value -- Value
"""
try:
if mode == "o":
del self.operdict[value]
elif mode == "v":
del self.voiceddict[value]
elif mode == "q":
del self.ownerdict[value]
elif mode == "h":
del self.halfopdict[value]
else:
del self.modes[mode]
except KeyError:
pass
def has_mode(self, mode):
return mode in self.modes
def is_moderated(self):
return self.has_mode("m")
def is_secret(self):
return self.has_mode("s")
def is_protected(self):
return self.has_mode("p")
def has_topic_lock(self):
return self.has_mode("t")
def is_invite_only(self):
return self.has_mode("i")
def has_allow_external_messages(self):
return self.has_mode("n")
def has_limit(self):
return self.has_mode("l")
def limit(self):
if self.has_limit():
return self.modes["l"]
else:
return None
def has_key(self):
return self.has_mode("k")