dr.botzo/storycraft/ircplugin.py

573 lines
26 KiB
Python

"""Collaborative nonsense story writing."""
import logging
import random
import re
import time
from django.conf import settings
from django.utils import timezone
from irc.client import NickMask
from ircbot.lib import Plugin
from storycraft.models import StorycraftGame, StorycraftPlayer, StorycraftLine
log = logging.getLogger('storycraft.ircplugin')
class Storycraft(Plugin):
"""Play Storycraft, the game of varying authors adding to a story mostly devoid of context.
This is a game for people who like making up facts, insulting each other, telling
insane stories, making in-jokes, and generally having a good time writing what can
fairly only be described as pure nonsense. It does not assume that all players are
available at any given time, so games may take a while, but the module allows for
concurrency in the number of games running.
"""
def start(self):
"""Hook handler functions into the IRC library."""
self.connection.reactor.add_global_regex_handler(['pubmsg', 'privmsg'], r'^!storycraft\s+status$',
self.handle_status)
self.connection.reactor.add_global_regex_handler(['pubmsg', 'privmsg'],
r'^!storycraft\s+new\s+game(\s+with\s+config\s+(.*)$|$)',
self.handle_new_game)
self.connection.reactor.add_global_regex_handler(['pubmsg', 'privmsg'],
r'^!storycraft\s+game\s+(\d+)\s+join$',
self.handle_join_game)
self.connection.reactor.add_global_regex_handler(['pubmsg', 'privmsg'],
r'^!storycraft\s+list\s+games\s+(open|in progress|completed|my games|waiting for me)$',
self.handle_list_games)
self.connection.reactor.add_global_regex_handler(['pubmsg', 'privmsg'],
r'^!storycraft\s+game\s+(\d+)\s+start$',
self.handle_start_game)
self.connection.reactor.add_global_regex_handler(['pubmsg', 'privmsg'],
r'^!storycraft\s+game\s+(\d+)\s+show\s+line$',
self.handle_show_line)
self.connection.reactor.add_global_regex_handler(['pubmsg', 'privmsg'],
r'^!storycraft\s+game\s+(\d+)\s+add\s+line\s+(.*)$',
self.handle_add_line)
self.connection.reactor.add_global_regex_handler(['pubmsg', 'privmsg'],
r'^!storycraft\s+game\s+(\d+)\s+status$',
self.handle_game_status)
super(Storycraft, self).start()
def stop(self):
"""Unhook handler functions into the IRC library."""
self.connection.reactor.remove_global_regex_handler(['pubmsg', 'privmsg'], self.handle_status)
self.connection.reactor.remove_global_regex_handler(['pubmsg', 'privmsg'], self.handle_new_game)
self.connection.reactor.remove_global_regex_handler(['pubmsg', 'privmsg'], self.handle_join_game)
self.connection.reactor.remove_global_regex_handler(['pubmsg', 'privmsg'], self.handle_list_games)
self.connection.reactor.remove_global_regex_handler(['pubmsg', 'privmsg'], self.handle_start_game)
self.connection.reactor.remove_global_regex_handler(['pubmsg', 'privmsg'], self.handle_show_line)
self.connection.reactor.remove_global_regex_handler(['pubmsg', 'privmsg'], self.handle_add_line)
self.connection.reactor.remove_global_regex_handler(['pubmsg', 'privmsg'], self.handle_game_status)
super(Storycraft, self).stop()
def handle_status(self, connection, event, match):
"""Print information about the storycraft games, or one specific game."""
# do the general status of all games
count_completed = self._get_completed_games().count()
count_in_progress = self._get_in_progress_games().count()
count_open = self._get_open_games().count()
count_free = self._get_free_game_count()
return self.bot.reply(event, "Storycraft - {0:d} games completed, {1:d} in progress, {2:d} open. "
"{3:d} slots free.".format(count_completed, count_in_progress, count_open, count_free))
def handle_game_status(self, connection, event, match):
"""Print information about one specific game."""
game_id = int(match.group(1))
# do the status of one game
game = self._get_game_details(game_id)
if game:
# get game string
status_str = game.summary()
return self.bot.reply(event, status_str)
else:
return self.bot.reply(event, "Game #{0:d} does not exist.".format(game_id))
def handle_new_game(self, connection, event, match):
"""Add a game to the system, which users can join."""
nick = NickMask(event.source).nick
# ensure the system isn't at capacity
count_free = self._get_free_game_count()
if count_free > 0:
# get the default settings
master_channel = settings.STORYCRAFT_MASTER_CHANNEL
game_length = settings.STORYCRAFT_DEFAULT_GAME_LENGTH
line_length = settings.STORYCRAFT_DEFAULT_LINE_LENGTH
lines_per_turn = settings.STORYCRAFT_DEFAULT_LINES_PER_TURN
options = match.group(2)
if options:
# override settings with provided options
tuples = options.split(';')
for option in tuples:
optpair = option.split(',')
if len(optpair) == 2:
if optpair[0] == 'game_length':
game_length = int(optpair[1])
elif optpair[0] == 'line_length':
line_length = int(optpair[1])
elif optpair[0] == 'lines_per_turn':
lines_per_turn = int(optpair[1])
# add a new game
game = self._add_new_game(game_length, line_length, lines_per_turn, nick, event.source)
if game:
# add the player to the game, too
player = StorycraftPlayer.objects.create(nick=nick, nickmask=event.source, game=game)
# tell the control channel
self.bot.reply(None, "{0:s} created a game of storycraft - do '!storycraft game "
"{1:d} join' to take part!".format(nick, game.pk),
explicit_target=master_channel)
log.debug("%s added a new game", nick)
return self.bot.reply(event, "Game #{0:d} has been created. When everyone has joined, do "
"'!storycraft game {0:d} start'".format(game.pk))
else:
return self.bot.reply(event, "Error creating game.")
else:
return self.bot.reply(event, "All slots are full.")
def handle_join_game(self, connection, event, match):
"""Add a player to an open game."""
nick = NickMask(event.source).nick
game_id = int(match.group(1))
# ensure the game exists
game = self._get_game_details(game_id)
if game:
# check that the game hasn't started yet
if game.status == StorycraftGame.STATUS_OPEN:
# see if userhost is already in the game
if not self._get_player_exists_in_game(game_id, event.source):
# add the player
player = StorycraftPlayer.objects.create(nick=nick, nickmask=event.source, game=game)
if player:
# output results
master_channel = settings.STORYCRAFT_MASTER_CHANNEL
self.bot.reply(None, "{0:s} joined storycraft #{1:d}!".format(nick, game_id),
explicit_target=master_channel)
log.debug("%s joined game #%d", nick, game_id)
return self.bot.reply(event, "{0:s}, welcome to the game.".format(nick))
else:
return self.bot.reply(event, "Error joining game.")
else:
return self.bot.reply(event, "You are already in game #{0:d}.".format(game_id))
else:
return self.bot.reply(event, "Game #{0:d} is not open for new players.".format(game_id))
else:
return self.bot.reply("Game #{0:d} does not exist.".format(game_id))
def handle_list_games(self, connection, event, match):
"""Get the listing of either open or in progress games."""
nick = NickMask(event.source).nick
category = match.group(1)
games = list()
if category == 'open':
games = self._get_open_games()
elif category == 'in progress':
games = self._get_in_progress_games()
elif category == 'completed':
games = self._get_completed_games()
elif category == 'my games':
games = self._get_active_games_with_player(nick)
elif category == 'waiting for me':
games = self._get_games_waiting_on_player(nick)
if len(games) > 5:
# just list the game ids
gameids = []
for game in games:
gameids.append(str(game.id))
return self.bot.reply(event, "Too many to list! ids: {0:s}".format(','.join(gameids)))
else:
# show game details, since there's not many
gamestrs = []
for game in games:
gamestrs.append(game.summary())
return self.bot.reply(event, "\n".join(gamestrs))
def handle_start_game(self, connection, event, match):
"""Start a game, closing the period to join and starting line trading."""
nick = NickMask(event.source).nick
game_id = int(match.group(1))
# retrieve the game
game = self._get_game_details(game_id)
if game:
# check that the user is the owner of the game
if event.source == game.owner_nickmask:
# check that the game is open
if game.status == StorycraftGame.STATUS_OPEN:
# start the game
game.status = StorycraftGame.STATUS_IN_PROGRESS
game.clean()
game.save()
# determine the first person to send a line
if self._pick_next_player(game_id):
# indicate the status
line = game.get_lines()[0]
player = self._get_player_by_id(line.player_id)
master_channel = settings.STORYCRAFT_MASTER_CHANNEL
# tell the control channel
self.bot.reply(None, "{0:s} started storycraft #{1:d}! - first player "
"is {2:s}, do '!storycraft game {1:d} show line' when the game is "
"assigned to you.".format(nick, game_id, player.nick),
explicit_target=master_channel)
log.debug("%s started game #%d", nick, game_id)
return self.bot.reply(event, "Game #{0:d} started.".format(game_id))
else:
return self.bot.reply(event, "Failed to assign game to first player.")
else:
return self.bot.reply("Game #{0:d} is not open.".format(game_id))
else:
return self.bot.reply(event, "You are not the owner of #{0:d}.".format(game_id))
else:
return self.bot.reply(event, "Game #{0:d} does not exist.".format(game_id))
def handle_show_line(self, connection, event, match):
"""List the line to continue with, if queryer is the assignee."""
game_id = int(match.group(1))
# retrieve the game
game = self._get_game_details(game_id)
if game:
# check that the game is in progress
if game.status == StorycraftGame.STATUS_IN_PROGRESS:
# get the most recent line
lines = game.get_lines()
if lines:
line = lines[0]
# check that the line is a prompt for input
if line.line == '':
player = self._get_player_by_id(line.player_id)
# check that the caller is the person the line is assigned to
if player.nickmask == event.source:
# check previous line
if len(lines) > 1:
# get previous line and display it
repline = lines[1]
return self.bot.reply(event, "The current line is '{0:s}'. Continue the story "
"with '!storycraft game {1:d} add line YOUR TEXT'."
"".format(repline.line, game_id))
else:
# player is starting the story
return self.bot.reply(event, "You are starting the story! Add the first line "
"with '!storycraft game {0:d} add line YOUR TEXT'."
"".format(game_id))
else:
return self.bot.reply(event, "You are not the assignee of the current game.")
else:
return self.bot.reply(event, "Game is in progress but the most recent line is not "
"available - internal consistency error.")
else:
return self.bot.reply(event, "Game #{0:d} has no lines - internal consistency error."
"".format(game_id))
else:
return self.bot.reply(event, "Game #{0:d} has not been started.".format(game_id))
else:
return self.bot.reply(event, "Game #{0:d} does not exist.".format(game_id))
def handle_add_line(self, connection, event, match):
"""Add a line to an in progress game."""
nick = NickMask(event.source).nick
game_id = int(match.group(1))
input_line = match.group(2)
# retrieve the game
game = self._get_game_details(game_id)
if game:
# check that the game is in progress
if game.status == StorycraftGame.STATUS_IN_PROGRESS:
# get the most recent line
lines = game.get_lines()
if lines:
line = lines[0]
# check that the line is a prompt for input
if line.line == '':
player = self._get_player_by_id(line.player_id)
if player.nickmask == event.source:
# check the input line length
if len(input_line) <= game.line_length:
# add line
line.line = input_line
line.clean()
line.save()
# determine the first person to send a line
if self._pick_next_player(game_id):
# indicate the status
game = self._get_game_details(game_id)
line = game.get_lines()[0]
last_line = game.get_lines()[1]
player = self._get_player_by_id(line.player_id)
return_msg = 'Line logged.'
# get progress, so user knows how much time is left
progress_str = game.get_progress_string()
# message the new owner
if player.nick == nick:
# simpily notify them they're up again
return_msg = 'Line logged. Please add another. ' + progress_str
else:
# notify the new owner, too
self.bot.reply(None, "You have a new line in storycraft "
"#{0:d}: '{1:s}' {2:s}"
"".format(game_id, last_line.line, progress_str),
explicit_target=player.nick)
master_channel = settings.STORYCRAFT_MASTER_CHANNEL
log.debug("%s added a line to #%d", nick, game_id)
# log output
if game.status == StorycraftGame.STATUS_IN_PROGRESS:
self.bot.reply(None, "{0:s} added a line to storycraft "
"#{1:d}! - next player is {2:s}"
"".format(nick, game_id, player.nick),
explicit_target=master_channel)
return self.bot.reply(event, return_msg)
else:
self.bot.reply(None, "{0:s} finished storycraft #{1:d}!"
"".format(nick, game_id),
explicit_target=master_channel)
return self.bot.reply(event, "Line logged (and game completed).")
else:
return self.bot.reply(event, "Failed to assign game to next player.")
else:
return self.bot.reply(event, "The maximum line length for this game is {0:d} "
"characters. (You were over by {1:d}.)"
"".format(game.line_length, len(input_line)-game.line_length))
else:
return self.bot.reply(event, "You are not the assignee of the current game.")
else:
return self.bot.reply(event, "Game is in progress but the most recent line is not "
"available - internal consistency error.")
else:
return self.bot.reply(event, "Game #{0:d} has no lines - internal consistency error."
"".format(game_id))
else:
return self.bot.reply(event, "Game #{0:d} has not been started.".format(game_id))
else:
return self.bot.reply(event, "Game #{0:d} does not exist.".format(game_id))
@staticmethod
def _get_game_details(game_id):
"""Get the details of one game."""
try:
game = StorycraftGame.objects.get(pk=game_id)
return game
except StorycraftGame.DoesNotExist:
return None
@staticmethod
def _add_new_game(game_length, line_length, lines_per_turn, nick, nickmask):
"""Add a new game to the system."""
game = StorycraftGame.objects.create(game_length=game_length, line_length=line_length,
lines_per_turn=lines_per_turn, owner_nick=nick,
owner_nickmask=nickmask)
return game
@staticmethod
def _get_player_list_for_game(game_id):
"""Get the list of players in one game."""
try:
game = StorycraftGame.objects.get(pk=game_id)
players = StorycraftPlayer.objects.filter(game=game)
return players
except StorycraftGame.DoesNotExist:
return []
def _get_game_exists(self, game_id):
"""Return the existence of a game."""
return self._get_game_details(game_id) is not None
def _pick_next_player(self, game_id):
"""Based on the game's settings and state, set who will be replying next."""
game = self._get_game_details(game_id)
if game:
count_by_latest_player = 0
latest_player_id = 0
# get the lines in this game, latest first
lines = game.get_lines()
for line in lines:
if latest_player_id == 0:
latest_player_id = line.player_id
if latest_player_id == line.player_id:
count_by_latest_player += 1
else:
break
# now we know how many times the most recent person has responded,
# figure out what to do
if count_by_latest_player == 0:
# must be a new game, get a random player
players = self._get_player_list_for_game(game.id)
random_player = players[random.randint(1, len(players))-1]
return self._assign_game_to_player(game_id, random_player.id)
elif count_by_latest_player >= game.lines_per_turn and len(lines) < game.game_length:
# player has reached the max, get a random different player to assign
players = self._get_player_list_for_game(game.id)
random_player_id = latest_player_id
if len(players) > 1:
while random_player_id == latest_player_id:
random_player = players[random.randint(1, len(players))-1]
random_player_id = random_player.id
return self._assign_game_to_player(game_id, random_player_id)
elif count_by_latest_player >= game.lines_per_turn and len(lines) >= game.game_length:
# end of game
game.status = StorycraftGame.STATUS_COMPLETED
game.clean()
game.save()
return game
elif count_by_latest_player < game.lines_per_turn:
# player should continue getting lines
return self._assign_game_to_player(game_id, latest_player_id)
@staticmethod
def _assign_game_to_player(game_id, player_id):
"""Assign the game to a player, prompting them for responses."""
try:
game = StorycraftGame.objects.get(pk=game_id)
except StorycraftGame.DoesNotExist:
return None
try:
player = StorycraftPlayer.objects.get(pk=player_id)
except StorycraftPlayer.DoesNotExist:
return None
line = StorycraftLine.objects.create(game=game, player=player)
return line
@staticmethod
def _get_player_by_id(player_id):
"""Get the player details based on id."""
try:
player = StorycraftPlayer.objects.get(pk=player_id)
return player
except StorycraftPlayer.DoesNotExist:
return None
@staticmethod
def _get_player_by_userhost_in_game(game_id, nickmask):
"""Get the player details if they exist in the given game."""
try:
game = StorycraftGame.objects.get(pk=game_id)
except StorycraftGame.DoesNotExist:
return None
players = StorycraftPlayer.objects.filter(nickmask=nickmask, game=game)
if len(players) == 1:
return players[0]
else:
return None
def _get_completed_games(self):
"""Get the games with COMPLETED status."""
return self._get_games_of_type(StorycraftGame.STATUS_COMPLETED)
def _get_in_progress_games(self):
"""Get the games with IN PROGRESS status."""
return self._get_games_of_type(StorycraftGame.STATUS_IN_PROGRESS)
def _get_open_games(self):
"""Get the games with OPEN status."""
return self._get_games_of_type(StorycraftGame.STATUS_OPEN)
@staticmethod
def _get_active_games_with_player(player_nick):
"""Return the in progress/open games that include the player nick."""
players = StorycraftPlayer.objects.filter(nick=player_nick)
game_ids = [x.game.pk for x in players]
return StorycraftGame.objects.filter(pk__in=game_ids).filter(status__in=[StorycraftGame.STATUS_OPEN,
StorycraftGame.STATUS_IN_PROGRESS])
@staticmethod
def _get_games_waiting_on_player(player_nick):
"""Return the games where the player nick is the owner of a pending line.
Include the games that the player started and are open, since they're waiting
on the person too.
"""
try:
player = StorycraftPlayer.objects.get(nick=player_nick)
except StorycraftPlayer.DoesNotExist:
return StorycraftGame.objects.none()
latest_lines = StorycraftLine.objects.filter(player=player).latest('time')
return [x.game for x in latest_lines if x.line == '']
@staticmethod
def _get_games_of_type(status):
"""Return the games of the specified type."""
return StorycraftGame.objects.filter(status=status)
def _get_player_exists_in_game(self, game_id, nickmask):
"""Return the existence of a player in a game."""
return self._get_player_by_userhost_in_game(game_id, nickmask) is not None
def _get_free_game_count(self):
"""Get the number of slots available, given current in progess/open games."""
all_slots = settings.STORYCRAFT_CONCURRENT_GAMES
count_in_progress = self._get_in_progress_games().count()
count_open = self._get_open_games().count()
return all_slots - count_in_progress - count_open
plugin = Storycraft