dr.botzo/dr_botzo/ircbot/bot.py

384 lines
10 KiB
Python
Raw Normal View History

2015-05-09 18:56:26 -05:00
"""Provide the base IRC client bot which other code can latch onto."""
import logging
2015-05-09 18:56:26 -05:00
import ssl
import sys
from django.conf import settings
import irc.client
from irc.connection import Factory
from irc.dict import IRCDict
import irc.modes
from ircbot.models import IrcChannel
log = logging.getLogger('ircbot.bot')
2015-05-09 18:56:26 -05:00
class IRCBot(irc.client.SimpleIRCClient):
    """A single-server IRC bot class.

    Keeps a per-channel ``Channel`` record in ``self.channels`` up to date from
    join/part/kick/quit/nick/mode/names events, autojoins the channels flagged
    in the database once the MOTD ends, and rotates through the configured
    server list when the connection drops.
    """

    def __init__(self, reconnection_interval=60):
        """Set up state from Django settings and register the event handlers.

        Arguments:
        reconnection_interval -- Seconds to wait before checking whether a
            reconnect is needed. A falsy or negative value is replaced with
            2 ** 31 seconds.
        """
        super(IRCBot, self).__init__()

        self.channels = IRCDict()

        # Copy the configured list: jump_server() rotates it in place, and that
        # must not mutate the shared settings object (nor fail if it is a tuple).
        self.server_list = list(settings.IRCBOT_SERVER_LIST)

        # set reconnection interval
        if not reconnection_interval or reconnection_interval < 0:
            reconnection_interval = 2 ** 31
        self.reconnection_interval = reconnection_interval

        # set basic stuff
        self._nickname = settings.IRCBOT_NICKNAME
        self._realname = settings.IRCBOT_REALNAME

        # Register each _on_<event> method as a global handler with priority -20.
        # NOTE(review): presumably the negative priority makes this bookkeeping run
        # ahead of default-priority handlers; confirm against the irc.client docs.
        for event in ('disconnect', 'join', 'kick', 'mode', 'namreply', 'nick', 'part', 'quit', 'endofmotd'):
            self.connection.add_global_handler(event, getattr(self, '_on_' + event), -20)

    def _connected_checker(self):
        """Reschedule itself and jump to the next server if currently disconnected."""
        if not self.connection.is_connected():
            # schedule the next check first, so a failed jump is retried later
            self.connection.execute_delayed(self.reconnection_interval,
                                            self._connected_checker)
            self.jump_server()

    def _connect(self):
        """Try to connect to the first entry of the server list.

        Each entry is indexed as (host, port, third value); the third value is
        passed to connect() right after the nickname.
        NOTE(review): presumably that third value is the server password; confirm.

        A ServerConnectionError is logged and otherwise ignored, so a failed
        attempt never raises out of this method.
        """
        server = self.server_list[0]
        try:
            # build the connection factory as determined by IPV6/SSL settings
            if settings.IRCBOT_SSL:
                # NOTE(review): ssl.wrap_socket is deprecated and absent from newer
                # Python releases; moving to SSLContext.wrap_socket changes
                # certificate-verification behavior, so it is left for a deliberate
                # follow-up.
                connect_factory = Factory(wrapper=ssl.wrap_socket, ipv6=settings.IRCBOT_IPV6)
            else:
                connect_factory = Factory(ipv6=settings.IRCBOT_IPV6)

            self.connect(server[0], server[1], self._nickname, server[2], ircname=self._realname,
                         connect_factory=connect_factory)
        except irc.client.ServerConnectionError as err:
            # best-effort: record the failure instead of swallowing it silently
            log.warning("could not connect to %s:%s: %s", server[0], server[1], err)

    def _on_disconnect(self, c, e):
        """Forget all channel state and schedule a reconnection check."""
        self.channels = IRCDict()
        self.connection.execute_delayed(self.reconnection_interval,
                                        self._connected_checker)

    def _on_join(self, c, e):
        """Record a join; when the bot itself joins, start tracking the channel."""
        ch = e.target
        nick = e.source.nick
        if nick == c.get_nickname():
            self.channels[ch] = Channel()
        self.channels[ch].add_user(nick)

    def _on_kick(self, c, e):
        """Drop the kicked user, or the whole channel if the bot was kicked."""
        nick = e.arguments[0]
        channel = e.target

        if nick == c.get_nickname():
            del self.channels[channel]
        else:
            self.channels[channel].remove_user(nick)

    def _on_mode(self, c, e):
        """Apply channel mode changes to the tracked channel; user modes are ignored."""
        modes = irc.modes.parse_channel_modes(" ".join(e.arguments))
        t = e.target
        if irc.client.is_channel(t):
            ch = self.channels[t]
            for mode in modes:
                # each parsed mode is indexed as (sign, mode letter, argument)
                if mode[0] == "+":
                    f = ch.set_mode
                else:
                    f = ch.clear_mode
                f(mode[1], mode[2])
        else:
            # Mode on self... XXX
            pass

    def _on_endofmotd(self, c, e):
        """Join autojoin channels when the MOTD is over."""
        for chan in IrcChannel.objects.filter(autojoin=True):
            log.info(u"autojoining %s", chan.name)
            # join by the channel's name (the same value logged above) rather than
            # relying on the model instance's string conversion
            self.connection.join(chan.name)

    def _on_namreply(self, c, e):
        """Get the list of names in a channel.

        e.arguments[0] == "@" for secret channels,
                          "*" for private channels,
                          "=" for others (public channels)
        e.arguments[1] == channel
        e.arguments[2] == nick list
        """
        _ch_type, channel, nick_list = e.arguments

        if channel == '*':
            # User is not in any visible channel
            # http://tools.ietf.org/html/rfc2812#section-3.2.5
            return

        for nick in nick_list.split():
            nick_modes = []

            # a leading character found in the server's prefix table marks a
            # status mode; strip it to get the bare nick
            if nick[0] in self.connection.features.prefix:
                nick_modes.append(self.connection.features.prefix[nick[0]])
                nick = nick[1:]

            for mode in nick_modes:
                self.channels[channel].set_mode(mode, nick)

            self.channels[channel].add_user(nick)

    def _on_nick(self, c, e):
        """Rename a user in every tracked channel that contains them."""
        before = e.source.nick
        after = e.target
        for ch in self.channels.values():
            if ch.has_user(before):
                ch.change_nick(before, after)

    def _on_part(self, c, e):
        """Drop the parting user, or the whole channel if the bot itself parted."""
        nick = e.source.nick
        channel = e.target

        if nick == c.get_nickname():
            del self.channels[channel]
        else:
            self.channels[channel].remove_user(nick)

    def _on_quit(self, c, e):
        """Remove a user who quit from every tracked channel that contains them."""
        nick = e.source.nick
        for ch in self.channels.values():
            if ch.has_user(nick):
                ch.remove_user(nick)

    def die(self, msg="Bye, cruel world!"):
        """Let the bot die.

        Disconnects with the quit message and then exits the process with status 0.

        Arguments:
        msg -- Quit message.
        """
        self.connection.disconnect(msg)
        sys.exit(0)

    def disconnect(self, msg="I'll be back!"):
        """Disconnect the bot.

        The bot will try to reconnect after a while.

        Arguments:
        msg -- Quit message.
        """
        self.connection.disconnect(msg)

    def get_version(self):
        """Returns the bot version.

        Used when answering a CTCP VERSION request.
        """
        return "Python irc.bot ({version})".format(
            version=irc.client.VERSION_STRING)

    def jump_server(self, msg="Changing servers"):
        """Connect to a new server, potentially disconnecting from the current one."""
        if self.connection.is_connected():
            self.connection.disconnect(msg)

        # rotate: the current server goes to the back of the list
        self.server_list.append(self.server_list.pop(0))
        self._connect()

    def on_ctcp(self, c, e):
        """Default handler for ctcp events.

        Replies to VERSION and PING requests and relays DCC requests
        to the on_dccchat method. Requests with missing arguments are ignored.
        """
        args = e.arguments
        if not args:
            # nothing to dispatch on
            return

        nick = e.source.nick
        command = args[0]
        if command == "VERSION":
            c.ctcp_reply(nick, "VERSION " + self.get_version())
        elif command == "PING":
            if len(args) > 1:
                c.ctcp_reply(nick, "PING " + args[1])
        # the length check keeps a bare "DCC" request from raising IndexError
        elif command == "DCC" and len(args) > 1 and args[1].split(" ", 1)[0] == "CHAT":
            self.on_dccchat(c, e)

    def on_dccchat(self, c, e):
        """Handle a DCC CHAT request; does nothing by default."""
        pass

    def start(self):
        """Start the bot."""
        self._connect()
        super(IRCBot, self).start()
class Channel(object):
    """A class for keeping information about an IRC channel.

    Per-user state lives in five dicts keyed by nick (all users, operators,
    voiced users, owners, half-operators); every other mode is stored in
    ``modes`` keyed by its mode letter.
    """

    def __init__(self):
        """Create an empty channel record."""
        self.userdict = IRCDict()
        self.operdict = IRCDict()
        self.voiceddict = IRCDict()
        self.ownerdict = IRCDict()
        self.halfopdict = IRCDict()
        self.modes = {}

    def _role_dict(self, mode):
        """Return the per-user dict that tracks *mode*, or None for any other mode."""
        return {
            "o": self.operdict,
            "v": self.voiceddict,
            "q": self.ownerdict,
            "h": self.halfopdict,
        }.get(mode)

    def users(self):
        """Returns the keys (nicks) of the channel's users, unsorted."""
        return self.userdict.keys()

    def opers(self):
        """Returns the keys (nicks) of the channel's operators, unsorted."""
        return self.operdict.keys()

    def voiced(self):
        """Returns the keys (nicks) of the persons that have voice
        mode set in the channel, unsorted."""
        return self.voiceddict.keys()

    def owners(self):
        """Returns the keys (nicks) of the channel's owners, unsorted."""
        return self.ownerdict.keys()

    def halfops(self):
        """Returns the keys (nicks) of the channel's half-operators, unsorted."""
        return self.halfopdict.keys()

    def has_user(self, nick):
        """Check whether the channel has a user."""
        return nick in self.userdict

    def is_oper(self, nick):
        """Check whether a user has operator status in the channel."""
        return nick in self.operdict

    def is_voiced(self, nick):
        """Check whether a user has voice mode set in the channel."""
        return nick in self.voiceddict

    def is_owner(self, nick):
        """Check whether a user has owner status in the channel."""
        return nick in self.ownerdict

    def is_halfop(self, nick):
        """Check whether a user has half-operator status in the channel."""
        return nick in self.halfopdict

    def add_user(self, nick):
        """Record *nick* as present in the channel."""
        self.userdict[nick] = 1

    def remove_user(self, nick):
        """Forget *nick* entirely, including any status it held.

        Owner and half-operator status are cleared as well, so a user who
        leaves does not keep a stale entry in those dicts.
        """
        for d in (self.userdict, self.operdict, self.voiceddict,
                  self.ownerdict, self.halfopdict):
            if nick in d:
                del d[nick]

    def change_nick(self, before, after):
        """Rename a user from *before* to *after*, carrying over every status held.

        Raises KeyError if *before* is not a user of the channel.
        """
        self.userdict[after] = self.userdict.pop(before)
        # status dicts only move an entry when the old nick actually had that status
        for d in (self.operdict, self.voiceddict, self.ownerdict, self.halfopdict):
            if before in d:
                d[after] = d.pop(before)

    def set_userdetails(self, nick, details):
        """Store *details* for *nick*; does nothing if the nick is not in the channel."""
        if nick in self.userdict:
            self.userdict[nick] = details

    def set_mode(self, mode, value=None):
        """Set mode on the channel.

        For "o", "v", "q" and "h" the value is the nick gaining that status;
        any other mode is stored in ``modes`` with the value as its argument.

        Arguments:
        mode -- The mode (a single-character string).
        value -- Value
        """
        roles = self._role_dict(mode)
        if roles is not None:
            roles[value] = 1
        else:
            self.modes[mode] = value

    def clear_mode(self, mode, value=None):
        """Clear mode on the channel.

        Clearing a mode or status that is not currently set is silently ignored.

        Arguments:
        mode -- The mode (a single-character string).
        value -- Value
        """
        roles = self._role_dict(mode)
        try:
            if roles is not None:
                del roles[value]
            else:
                del self.modes[mode]
        except KeyError:
            pass

    def has_mode(self, mode):
        """Check whether the given channel-wide mode letter is set."""
        return mode in self.modes

    def is_moderated(self):
        """Check whether mode "m" is set."""
        return self.has_mode("m")

    def is_secret(self):
        """Check whether mode "s" is set."""
        return self.has_mode("s")

    def is_protected(self):
        """Check whether mode "p" is set."""
        return self.has_mode("p")

    def has_topic_lock(self):
        """Check whether mode "t" is set."""
        return self.has_mode("t")

    def is_invite_only(self):
        """Check whether mode "i" is set."""
        return self.has_mode("i")

    def has_allow_external_messages(self):
        """Check whether mode "n" is set."""
        return self.has_mode("n")

    def has_limit(self):
        """Check whether mode "l" is set."""
        return self.has_mode("l")

    def limit(self):
        """Return the value stored for mode "l", or None if it is not set."""
        if self.has_limit():
            return self.modes["l"]
        else:
            return None

    def has_key(self):
        """Check whether mode "k" is set."""
        return self.has_mode("k")