This is necessary for supporting multiple IRC servers in one bot config. As a side effect, the code in ircbot and markov that auto-creates channels must now also include the server (retrieved via the connection), which helps keep channels coherent in multi-server arrangements. The Twitter bot change here is untested but seems like the right idea (I haven't used the twitter package in forever).
349 lines
16 KiB
Python
349 lines
16 KiB
Python
"""Access to Twitter through bot commands."""
|
|
|
|
import logging
|
|
import threading
|
|
import time
|
|
|
|
import twython
|
|
|
|
from django.conf import settings
|
|
|
|
from ircbot.lib import Plugin, has_permission
|
|
from twitter.models import TwitterClient
|
|
|
|
|
|
# Module-level logger shared by everything in this plugin.
log = logging.getLogger('twitter.ircplugin')
|
|
|
|
|
|
class Twitter(Plugin):

    """Access Twitter via the bot as an authenticated client.

    Registers the following ``!twitter`` commands on pubmsg/privmsg events:
    ``getstatus``, ``getuserstatus``, ``tweet``, ``gettoken``, ``auth``,
    ``replyto``, ``mentionpoll start`` and ``mentionpoll stop``.
    """

    def __init__(self, bot, connection, event):
        """Initialize client state; OAuth login is deferred until start()."""

        self.authed = False
        self.twit_args = {'timeout': 30.0}
        self.twit = self._new_client()
        # request token/secret handed out by gettoken and consumed by auth
        self.temp_token = None
        self.temp_token_secret = None

        self.poll_mentions = False
        # the thread running thread_watch_mentions, if one has been started
        self.mention_thread = None

        # remembered so the mention poller only reports to channels on this connection's server
        self.server = connection.server_config

        super(Twitter, self).__init__(bot, connection, event)

    def _new_client(self, oauth_token=None, oauth_token_secret=None):
        """Build a Twython client from the configured consumer key/secret.

        :param oauth_token: OAuth token to use, or None for an unauthenticated client
        :type oauth_token: str
        :param oauth_token_secret: OAuth token secret to use, or None
        :type oauth_token_secret: str
        :returns: a new client using self.twit_args as its client_args
        :rtype: twython.Twython
        """

        return twython.Twython(settings.TWITTER_CONSUMER_KEY, settings.TWITTER_CONSUMER_SECRET,
                               oauth_token, oauth_token_secret,
                               client_args=self.twit_args)

    def _credentials_ok(self):
        """Check whether the current client's credentials verify.

        A twython error raised by the check is treated the same as a failed
        verification, so callers can use this as a plain boolean.

        :returns: True if verify_credentials() succeeded with a truthy result
        :rtype: bool
        """

        try:
            return bool(self.twit.verify_credentials())
        except twython.TwythonError as e:
            log.debug("credential verification failed: %s", e)
            return False

    def _command_handlers(self):
        """Return the (regex, handler) pairs for all commands, in registration order.

        :returns: tuple of (regex string, bound handler method) pairs
        :rtype: tuple
        """

        return (
            (r'^!twitter\s+getstatus(\s+nosource)?(\s+noid)?\s+(\S+)$', self.handle_getstatus),
            (r'^!twitter\s+getuserstatus(\s+nosource)?(\s+noid)?\s+(\S+)(\s+.*|$)', self.handle_getuserstatus),
            (r'^!twitter\s+tweet\s+(.*)', self.handle_tweet),
            (r'^!twitter\s+gettoken$', self.handle_gettoken),
            (r'^!twitter\s+auth\s+(\S+)$', self.handle_auth),
            (r'^!twitter\s+replyto\s+(\S+)\s+(.*)', self.handle_replyto),
            (r'^!twitter\s+mentionpoll\s+start', self.handle_start_mentionpoll),
            (r'^!twitter\s+mentionpoll\s+stop', self.handle_stop_mentionpoll),
        )

    def start(self):
        """Register command handlers and try to log in with any stored OAuth tokens."""

        for regex, handler in self._command_handlers():
            self.connection.reactor.add_global_regex_handler(['pubmsg', 'privmsg'], regex, handler, -20)

        # try getting the stored auth tokens and logging in
        try:
            twittersettings = TwitterClient.objects.get(pk=1)
        except TwitterClient.DoesNotExist:
            log.error("twitter settings module does not exist")
        else:
            if twittersettings.oauth_token != '' and twittersettings.oauth_token_secret != '':
                self.twit = self._new_client(twittersettings.oauth_token, twittersettings.oauth_token_secret)
                if self._credentials_ok():
                    self.authed = True
                    log.debug("Logged in to Twitter with saved token.")
                else:
                    # stored tokens are no good; fall back to an unauthenticated client
                    self.twit = self._new_client()
            else:
                self.twit = self._new_client()

        super(Twitter, self).start()

    def stop(self):
        """Tear down handlers and ask any mention poller to stop."""

        for _regex, handler in self._command_handlers():
            self.connection.reactor.remove_global_regex_handler(['pubmsg', 'privmsg'], handler)

        # the poller thread checks this flag once per loop iteration
        self.poll_mentions = False

        super(Twitter, self).stop()

    def handle_start_mentionpoll(self, connection, event, match):
        """Start polling for mentions in a background thread (needs twitter.manage_threads)."""

        if not has_permission(event.source, 'twitter.manage_threads'):
            return self.bot.reply(event, "You do not have permission to start/stop threads.")

        self.poll_mentions = True
        # reuse a poller that is still alive (e.g. sleeping after a recent stop) rather than
        # starting a second one, which would report every mention twice
        if self.mention_thread is None or not self.mention_thread.is_alive():
            poller = threading.Thread(target=self.thread_watch_mentions)
            poller.daemon = True
            poller.start()
            self.mention_thread = poller
        self.bot.reply(event, "Now polling for mentions.")

    def handle_stop_mentionpoll(self, connection, event, match):
        """Ask the mention poller to stop (needs twitter.manage_threads)."""

        if not has_permission(event.source, 'twitter.manage_threads'):
            return self.bot.reply(event, "You do not have permission to start/stop threads.")

        self.poll_mentions = False
        self.bot.reply(event, "No longer polling for mentions.")

    def handle_getstatus(self, connection, event, match):
        """Get a status by tweet ID."""

        # the optional "nosource" / "noid" words turn the respective output off
        print_source = not match.group(1)
        print_id = not match.group(2)
        status = match.group(3)

        try:
            tweet = self.twit.show_status(id=status)
            return self._reply_with_tweet_or_retweet_text(event, tweet=tweet, print_source=print_source,
                                                          print_id=print_id)
        except Exception as e:
            log.error("couldn't obtain status")
            log.exception(e)
            return self.bot.reply(event, "Couldn't obtain status: {0:s}".format(str(e)))

    def handle_getuserstatus(self, connection, event, match):
        """Get a status for a user. Allows for getting one other than the most recent.

        The optional trailing argument is an index: 0 (or anything positive or
        unparseable) selects the most recent status, -1 the one before it, etc.
        """

        print_source = not match.group(1)
        print_id = not match.group(2)
        user = match.group(3)
        raw_index = match.group(4)

        try:
            index = int(raw_index) if raw_index else 0
        except ValueError as e:
            log.error("Couldn't convert index")
            log.exception(e)
            index = 0
        if index > 0:
            index = 0

        # index is <= 0 here; fetch just enough statuses to reach the requested one
        offset = -1 * index
        count = offset + 1

        try:
            tweets = self.twit.get_user_timeline(screen_name=user, count=count, include_rts=True)
            if tweets:
                return self._reply_with_tweet_or_retweet_text(event, tweet=tweets[offset],
                                                              print_source=print_source, print_id=print_id)
        except Exception as e:
            log.error("couldn't obtain status")
            log.exception(e)
            return self.bot.reply(event, "Couldn't obtain status: {0:s}".format(str(e)))

    def handle_tweet(self, connection, event, match):
        """Tweet. Needs authentication."""

        tweet = match.group(1)
        if not self._credentials_ok():
            return self.bot.reply(event, "The bot must be authenticated to tweet.")
        if not has_permission(event.source, 'twitter.send_tweets'):
            return self.bot.reply(event, "You do not have permission to send tweets.")

        try:
            if self.twit.update_status(status=tweet, display_coordinates=False) is not None:
                return self.bot.reply(event, "'{0:s}' tweeted.".format(tweet))
            return self.bot.reply(event, "Unknown error sending tweet(s).")
        except Exception as e:
            log.error("couldn't tweet")
            log.exception(e)
            return self.bot.reply(event, "Couldn't tweet: {0:s}".format(str(e)))

    def handle_replyto(self, connection, event, match):
        """Reply to a tweet, in the twitter in_reply_to_status_id sense. Needs authentication."""

        status_id = match.group(1)
        tweet = match.group(2)

        if not self._credentials_ok():
            return self.bot.reply(event, "The bot must be authenticated to tweet.")
        if not has_permission(event.source, 'twitter.send_tweets'):
            return self.bot.reply(event, "you do not have permission to send tweets.")

        try:
            # looking up the original status can fail too (bad ID, network), so it is
            # inside the try and reported to the user like any other failure
            replyee_tweet = self.twit.show_status(id=status_id)
            target = replyee_tweet['user']['screen_name']

            reptweet = "@{0:s} {1:s}".format(target, tweet)
            if self.twit.update_status(status=reptweet, display_coordinates=False,
                                       in_reply_to_status_id=status_id) is not None:
                return self.bot.reply(event, "'{0:s}' tweeted.".format(tweet))
            return self.bot.reply(event, "Unknown error sending tweet.")
        except Exception as e:
            log.error("couldn't tweet")
            log.exception(e)
            return self.bot.reply(event, "Couldn't tweet: {0:s}".format(str(e)))

    def handle_gettoken(self, connection, event, match):
        """Get an oauth token, so that the user may authenticate the bot."""

        if not self._credentials_ok():
            self.authed = False
            self.twit = self._new_client()

        try:
            auth = self.twit.get_authentication_tokens()
        except twython.TwythonError as e:
            log.error("couldn't get authentication tokens")
            log.exception(e)
            return self.bot.reply(event, "Couldn't get token: {0:s}".format(str(e)))

        # kept so handle_auth can complete the exchange once the user supplies the PIN
        self.temp_token = auth['oauth_token']
        self.temp_token_secret = auth['oauth_token_secret']
        return self.bot.reply(event, "Go to the following link in your browser: {0:s} and send me the pin."
                                     "".format(auth['auth_url']))

    def handle_auth(self, connection, event, match):
        """Authenticate, given a PIN (following gettoken)."""

        oauth_verifier = match.group(1)
        self.twit = self._new_client(self.temp_token, self.temp_token_secret)
        try:
            final_step = self.twit.get_authorized_tokens(oauth_verifier)
        except twython.TwythonError as e:
            # wrong PIN, or auth attempted without a preceding gettoken
            log.error("couldn't get authorized tokens")
            log.exception(e)
            self.authed = False
            self.twit = self._new_client()
            return self.bot.reply(event, "The bot was not able to authenticate.")

        self.twit = self._new_client(final_step['oauth_token'], final_step['oauth_token_secret'])

        try:
            twittersettings = TwitterClient.objects.get(pk=1)
        except TwitterClient.DoesNotExist:
            log.error("twitter settings object does not exist")
            return self.bot.reply(event, "twitter module not configured")

        # persist the tokens so start() can log in with them next time
        twittersettings.oauth_token = final_step['oauth_token']
        twittersettings.oauth_token_secret = final_step['oauth_token_secret']
        twittersettings.clean()
        twittersettings.save()

        if self._credentials_ok():
            self.authed = True
            return self.bot.reply(event, "The bot is now logged in.")

        self.authed = False
        self.twit = self._new_client()
        return self.bot.reply(event, "The bot was not able to authenticate.")

    def thread_watch_mentions(self, sleep_time=60):
        """Poll mentions from Twitter every sleep_time seconds.

        Runs until self.poll_mentions is cleared. Polling also stops for good if
        the configured output channel belongs to a server other than this
        plugin's. Any other error in a polling pass is logged and the pass is
        retried after the usual sleep.

        :param sleep_time: second to sleep between checks
        :type sleep_time: int
        """

        while self.poll_mentions:
            try:
                twittersettings = TwitterClient.objects.get(pk=1)
                # keep the channel object itself: its server is compared below, and only
                # its name is used as the reply target
                out_chan = twittersettings.mentions_output_channel
                since_id = twittersettings.mentions_since_id

                if out_chan.server != self.server:
                    # mentions are configured to go to another server's channel
                    self.poll_mentions = False
                    return

                mentions = self.twit.get_mentions_timeline(since_id=since_id)
                # announce in the reverse of the order the API returned them
                # (presumably oldest first - confirm against the API's ordering)
                for mention in reversed(mentions):
                    reply = self._return_tweet_or_retweet_text(tweet=mention, print_source=True)
                    self.bot.reply(None, reply, explicit_target=out_chan.name)
                    since_id = mention['id'] if mention['id'] > since_id else since_id

                twittersettings.mentions_since_id = since_id
                twittersettings.save()
            except Exception:
                # top of the thread: an escaping exception would end polling silently
                # while poll_mentions still claimed it was running
                log.exception("error while polling for mentions")

            time.sleep(sleep_time)

    def _reply_with_tweet_or_retweet_text(self, event, tweet, print_source=False, print_id=True):
        """Do a bot.reply() with the appropriate text representation of the given tweet.

        See _return_tweet_or_retweet_text for details.

        :param event: the irc event to use for the reply
        :type event: Event
        :param tweet: the tweet (from twython) to inspect and return a string for
        :type tweet: dict
        :param print_source: whether or not to print the tweet's author (default False)
        :type print_source: bool
        :param print_id: whether or not to print the tweet's ID (default True)
        :type print_id: bool
        :returns: whatever self.bot.reply() returns
        """

        return self.bot.reply(event, self._return_tweet_or_retweet_text(tweet, print_source, print_id))

    def _return_tweet_or_retweet_text(self, tweet, print_source=False, print_id=True):
        """Return a string of the author and text body of a status, accounting for whether
        or not the fetched status is a retweet.

        :param tweet: the tweet (from twython) to inspect and return a string for
        :type tweet: dict
        :param print_source: whether or not to print the tweet's author (default False)
        :type print_source: bool
        :param print_id: whether or not to print the tweet's ID (default True)
        :type print_id: bool
        :returns: tweet text suitable for printing
        :rtype: str
        """

        # tweet is a dict, so the embedded original must be looked up by key;
        # getattr() on a dict never finds it
        retweet = tweet.get('retweeted_status')
        if retweet:
            # _unencode_xml is not defined in this class; presumably inherited from Plugin
            body = self._unencode_xml(retweet['text'])
            if print_source:
                reply = "@%s (RT @%s): %s" % (tweet['user']['screen_name'],
                                              retweet['user']['screen_name'],
                                              body)
            else:
                reply = "(RT @%s): %s" % (retweet['user']['screen_name'], body)
        else:
            body = self._unencode_xml(tweet['text'])
            if print_source:
                reply = "@%s: %s" % (tweet['user']['screen_name'], body)
            else:
                reply = "%s" % (body)

        if print_id:
            reply = reply + " [{0:d}]".format(tweet['id'])

        return reply
|
|
|
|
|
|
# Module-level alias for the plugin class; presumably the name the bot's
# plugin loader looks up - confirm against ircbot's loading code.
plugin = Twitter
|