More of a move of the code, actually: it now lives in (an overridden) _handle_event, so that recursion happens against IRC events directly rather than against an already partially interpreted object. With this change, modules don't need to implement do(), nor do we need the internal_bus, which was doing an additional walk of the modules after the IRC event had already been handled and turned into text. Now the core event handler does the recursion scans. To support this, we bring back the old replypath trick and use it again, so we know when to send a privmsg reply and when to return text so that it may be chained in recursion. This feels old hat by now, but if you haven't been following along, you should really look at the diff — that's the meat of the change. The rest is updating modules to use self.reply() and reimplementing (un)register_handlers where appropriate.
272 lines
11 KiB
Python
272 lines
11 KiB
Python
"""
|
|
Markov - Chatterbot via Markov chains for IRC
|
|
Copyright (C) 2010 Brian S. Stephan
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
"""
|
|
|
|
import cPickle
|
|
import os
|
|
import random
|
|
import re
|
|
import sys
|
|
|
|
from extlib import irclib
|
|
|
|
from Module import Module
|
|
|
|
class Markov(Module):

    """
    Create a chatterbot very similar to a MegaHAL, but simpler and
    implemented in pure Python. Proof of concept code from Ape.

    Ape wrote: based on this:
    http://uswaretech.com/blog/2009/06/pseudo-random-text-markov-chains-python/
    and this:
    http://code.activestate.com/recipes/194364-the-markov-chain-algorithm/
    """

    def __init__(self, irc, config, server):
        """Create the Markov chainer, and learn text from a file if available."""

        Module.__init__(self, irc, config, server)

        self.brain_filename = 'dr.botzo.markov'

        # sentinel keywords marking the start/end of chains --- don't change
        # these once you've created a brain, or old pickles won't line up
        self.start1 = '__start1'
        self.start2 = '__start2'
        self.stop = '__stop'

        # command regexes; raw strings so the \s and \d escapes reach the
        # regex engine intact instead of being (mis)treated as string escapes
        trainpattern = r'^!markov\s+train\s+(.*)$'
        learnpattern = r'^!markov\s+learn\s+(.*)$'
        replypattern = r'^!markov\s+reply(\s+min=(\d+))?(\s+max=(\d+))?(\s+(.*)$|$)'

        self.trainre = re.compile(trainpattern)
        self.learnre = re.compile(learnpattern)
        self.replyre = re.compile(replypattern)

        # load a previously pickled brain if one exists; the with-block
        # guarantees the file is closed even if unpickling blows up
        try:
            with open(self.brain_filename, 'r') as brainfile:
                self.brain = cPickle.load(brainfile)
        except IOError:
            # no saved brain; seed the chain so lookups always succeed
            self.brain = {}
            self.brain.setdefault((self.start1, self.start2), []).append(self.stop)
|
|
|
|
def register_handlers(self):
|
|
"""Handle pubmsg/privmsg, to learn and/or reply to IRC events."""
|
|
|
|
self.server.add_global_handler('pubmsg', self.on_pub_or_privmsg, self.priority())
|
|
self.server.add_global_handler('privmsg', self.on_pub_or_privmsg, self.priority())
|
|
self.server.add_global_handler('pubmsg', self.learn_from_irc_event)
|
|
self.server.add_global_handler('privmsg', self.learn_from_irc_event)
|
|
|
|
def unregister_handlers(self):
|
|
self.server.remove_global_handler('pubmsg', self.on_pub_or_privmsg)
|
|
self.server.remove_global_handler('privmsg', self.on_pub_or_privmsg)
|
|
self.server.remove_global_handler('pubmsg', self.learn_from_irc_event)
|
|
self.server.remove_global_handler('privmsg', self.learn_from_irc_event)
|
|
|
|
def save(self):
|
|
"""Pickle the brain upon save."""
|
|
|
|
brainfile = open(self.brain_filename, 'w')
|
|
cPickle.dump(self.brain, brainfile)
|
|
brainfile.close()
|
|
|
|
def learn_from_irc_event(self, connection, event):
|
|
"""Learn from IRC events."""
|
|
|
|
what = ''.join(event.arguments()[0])
|
|
|
|
# don't learn from commands
|
|
if self.trainre.search(what) or self.learnre.search(what) or self.replyre.search(what):
|
|
return
|
|
|
|
self._learn_line(what)
|
|
|
|
def do(self, connection, event, nick, userhost, what, admin_unlocked):
|
|
"""Handle commands and inputs."""
|
|
|
|
if self.trainre.search(what):
|
|
return self.reply(connection, event, self.markov_train(connection, event, nick, userhost, what, admin_unlocked))
|
|
elif self.learnre.search(what):
|
|
return self.reply(connection, event, self.markov_learn(connection, event, nick, userhost, what, admin_unlocked))
|
|
elif self.replyre.search(what):
|
|
return self.reply(connection, event, self.markov_reply(connection, event, nick, userhost, what, admin_unlocked))
|
|
|
|
# not a command, so see if i'm being mentioned
|
|
if re.search(connection.get_nickname(), what, re.IGNORECASE) is not None:
|
|
addressed_pattern = '^' + connection.get_nickname() + '[:,]\s+(.*)'
|
|
addressed_re = re.compile(addressed_pattern)
|
|
if addressed_re.match(what):
|
|
# i was addressed directly, so respond, addressing the speaker
|
|
return self.reply(connection, event, '{0:s}: {1:s}'.format(nick, self._reply_to_line(addressed_re.match(what).group(1))))
|
|
else:
|
|
# i wasn't addressed directly, so just respond
|
|
return self.reply(connection, event, '{0:s}'.format(self._reply_to_line(what)))
|
|
|
|
def markov_train(self, connection, event, nick, userhost, what, admin_unlocked):
|
|
"""Learn lines from a file. Good for initializing a brain."""
|
|
|
|
match = self.trainre.search(what)
|
|
if match and admin_unlocked:
|
|
filename = match.group(1)
|
|
|
|
try:
|
|
for line in open(filename, 'r'):
|
|
self._learn_line(line)
|
|
|
|
return 'Learned from \'{0:s}\'.'.format(filename)
|
|
except IOError:
|
|
return 'No such file \'{0:s}\'.'.format(filename)
|
|
|
|
def markov_learn(self, connection, event, nick, userhost, what, admin_unlocked):
|
|
"""Learn one line, as provided to the command."""
|
|
|
|
match = self.learnre.search(what)
|
|
if match:
|
|
line = match.group(1)
|
|
self._learn_line(line)
|
|
|
|
# return what was learned, for weird chaining purposes
|
|
return line
|
|
|
|
def markov_reply(self, connection, event, nick, userhost, what, admin_unlocked):
|
|
"""Generate a reply to one line, without learning it."""
|
|
|
|
match = self.replyre.search(what)
|
|
if match:
|
|
min_size = 15
|
|
max_size = 100
|
|
|
|
if match.group(2):
|
|
min_size = int(match.group(2))
|
|
if match.group(4):
|
|
max_size = int(match.group(4))
|
|
|
|
if match.group(5) != '':
|
|
line = match.group(6)
|
|
return self._reply_to_line(line, min_size=min_size, max_size=max_size)
|
|
else:
|
|
return self._reply(min_size=min_size, max_size=max_size)
|
|
|
|
def _learn_line(self, line):
|
|
"""Create Markov chains from the provided line."""
|
|
|
|
# set up the head of the chain
|
|
w1 = self.start1
|
|
w2 = self.start2
|
|
|
|
# for each word pair, add the next word to the dictionary
|
|
for word in line.split():
|
|
self.brain.setdefault((w1, w2), []).append(word.lower())
|
|
w1, w2 = w2, word.lower()
|
|
|
|
# cap the end of the chain
|
|
self.brain.setdefault((w1, w2), []).append(self.stop)
|
|
|
|
def _reply(self, min_size=15, max_size=100):
|
|
"""Generate a totally random string from the chains, of specified limit of words."""
|
|
|
|
# if the limit is too low, there's nothing to do
|
|
if (max_size <= 3):
|
|
raise Exception("max_size is too small: %d" % max_size)
|
|
|
|
# if the min is too large, abort
|
|
if (min_size > 20):
|
|
raise Exception("min_size is too large: %d" % min_size)
|
|
|
|
# start with an empty chain, and work from there
|
|
gen_words = [self.start1, self.start2]
|
|
|
|
# set up the number of times we've tried to hit the specified minimum
|
|
min_search_tries = 0
|
|
|
|
# walk a chain, randomly, building the list of words
|
|
while len(gen_words) < max_size + 2 and gen_words[-1] != self.stop:
|
|
if len(gen_words) < min_size and len(filter(lambda a: a != self.stop, self.brain[(gen_words[-2], gen_words[-1])])) > 0:
|
|
# we aren't at min size yet and we have at least one chain path
|
|
# that isn't (yet) the end. take one of those.
|
|
gen_words.append(random.choice(filter(lambda a: a != self.stop, self.brain[(gen_words[-2], gen_words[-1])])))
|
|
min_search_tries = 0
|
|
elif len(gen_words) < min_size and min_search_tries <= 10:
|
|
# we aren't at min size yet and the only path we currently have is
|
|
# a end, but we haven't retried much yet, so chop off our current
|
|
# chain and try again.
|
|
gen_words = gen_words[0:len(gen_words)-2]
|
|
min_search_tries = min_search_tries + 1
|
|
else:
|
|
# either we have hit our min size requirement, or we haven't but
|
|
# we also exhausted min_search_tries. either way, just pick a word
|
|
# at random, knowing it may be the end of the chain
|
|
gen_words.append(random.choice(self.brain[(gen_words[-2], gen_words[-1])]))
|
|
min_search_tries = 0
|
|
|
|
# chop off the seed data at the start
|
|
gen_words = gen_words[2:]
|
|
|
|
# chop off the end text, if it was the keyword indicating an end of chain
|
|
if gen_words[-1] == self.stop:
|
|
gen_words = gen_words[:-1]
|
|
|
|
return ' '.join(gen_words)
|
|
|
|
def _reply_to_line(self, line, min_size=15, max_size=100):
|
|
"""Reply to a line, using some text in the line as a point in the chain."""
|
|
|
|
# if the limit is too low, there's nothing to do
|
|
if (max_size <= 3):
|
|
raise Exception("max_size is too small: %d" % max_size)
|
|
|
|
# if the min is too large, abort
|
|
if (min_size > 20):
|
|
raise Exception("min_size is too large: %d" % min_size)
|
|
|
|
# get a random word from the input
|
|
words = line.split()
|
|
target_word = words[random.randint(0, len(words)-1)]
|
|
|
|
# start with an empty chain, and work from there
|
|
gen_words = [self.start1, self.start2]
|
|
|
|
# walk a chain, randomly, building the list of words
|
|
while len(gen_words) < max_size + 2 and gen_words[-1] != self.stop:
|
|
# use the chain that includes the target word, if it is found
|
|
if target_word in self.brain[(gen_words[-2], gen_words[-1])]:
|
|
gen_words.append(target_word)
|
|
# generate new word
|
|
target_word = words[random.randint(0, len(words)-1)]
|
|
else:
|
|
if len(gen_words) < min_size and len(filter(lambda a: a != self.stop, self.brain[(gen_words[-2], gen_words[-1])])) > 0:
|
|
gen_words.append(random.choice(filter(lambda a: a != self.stop, self.brain[(gen_words[-2], gen_words[-1])])))
|
|
else:
|
|
gen_words.append(random.choice(self.brain[(gen_words[-2], gen_words[-1])]))
|
|
|
|
# chop off the seed data at the start
|
|
gen_words = gen_words[2:]
|
|
|
|
# chop off the end text, if it was the keyword indicating an end of chain
|
|
if gen_words[-1] == self.stop:
|
|
gen_words = gen_words[:-1]
|
|
|
|
return ' '.join(gen_words)
|
|
|
|
# vi:tabstop=4:expandtab:autoindent
|
|
# kate: indent-mode python;indent-width 4;replace-tabs on;
|