dr.botzo/twitter/ircplugin.py

343 lines
16 KiB
Python
Raw Normal View History

"""Access to Twitter through bot commands."""
import logging
import threading
import time
import twython
from django.conf import settings
from ircbot.lib import Plugin, has_permission
from twitter.models import TwitterClient
log = logging.getLogger('twitter.ircplugin')
class Twitter(Plugin):
"""Access Twitter via the bot as an authenticated client."""
def __init__(self, bot, connection, event):
"""Initialize some stuff."""
self.authed = False
self.twit_args = {'timeout': 30.0}
self.twit = twython.Twython(settings.TWITTER_CONSUMER_KEY, settings.TWITTER_CONSUMER_SECRET,
client_args=self.twit_args)
self.temp_token = None
self.temp_token_secret = None
self.poll_mentions = False
super(Twitter, self).__init__(bot, connection, event)
def start(self):
"""Prepare for oauth stuff (but don't execute it yet)."""
self.connection.reactor.add_global_regex_handler(['pubmsg', 'privmsg'],
r'^!twitter\s+getstatus(\s+nosource)?(\s+noid)?\s+(\S+)$',
self.handle_getstatus, -20)
self.connection.reactor.add_global_regex_handler(['pubmsg', 'privmsg'],
r'^!twitter\s+getuserstatus(\s+nosource)?(\s+noid)?\s+(\S+)(\s+.*|$)',
self.handle_getuserstatus, -20)
self.connection.reactor.add_global_regex_handler(['pubmsg', 'privmsg'], r'^!twitter\s+tweet\s+(.*)',
self.handle_tweet, -20)
self.connection.reactor.add_global_regex_handler(['pubmsg', 'privmsg'], r'^!twitter\s+gettoken$',
self.handle_gettoken, -20)
self.connection.reactor.add_global_regex_handler(['pubmsg', 'privmsg'], r'^!twitter\s+auth\s+(\S+)$',
self.handle_auth, -20)
self.connection.reactor.add_global_regex_handler(['pubmsg', 'privmsg'], r'^!twitter\s+replyto\s+(\S+)\s+(.*)',
self.handle_replyto, -20)
self.connection.reactor.add_global_regex_handler(['pubmsg', 'privmsg'], r'^!twitter\s+mentionpoll\s+start',
self.handle_start_mentionpoll, -20)
self.connection.reactor.add_global_regex_handler(['pubmsg', 'privmsg'], r'^!twitter\s+mentionpoll\s+stop',
self.handle_stop_mentionpoll, -20)
# try getting the stored auth tokens and logging in
try:
twittersettings = TwitterClient.objects.get(pk=1)
if twittersettings.oauth_token != '' and twittersettings.oauth_token_secret != '':
self.twit = twython.Twython(settings.TWITTER_CONSUMER_KEY, settings.TWITTER_CONSUMER_SECRET,
twittersettings.oauth_token, twittersettings.oauth_token_secret,
client_args=self.twit_args)
if self.twit.verify_credentials():
self.authed = True
log.debug("Logged in to Twitter with saved token.")
else:
self.twit = twython.Twython(settings.TWITTER_CONSUMER_KEY, settings.TWITTER_CONSUMER_SECRET,
client_args=self.twit_args)
else:
self.twit = twython.Twython(settings.TWITTER_CONSUMER_KEY, settings.TWITTER_CONSUMER_SECRET,
client_args=self.twit_args)
except TwitterClient.DoesNotExist:
log.error("twitter settings module does not exist")
super(Twitter, self).start()
def stop(self):
"""Tear down handlers."""
self.connection.reactor.remove_global_regex_handler(['pubmsg', 'privmsg'], self.handle_getstatus)
self.connection.reactor.remove_global_regex_handler(['pubmsg', 'privmsg'], self.handle_getuserstatus)
self.connection.reactor.remove_global_regex_handler(['pubmsg', 'privmsg'], self.handle_tweet)
self.connection.reactor.remove_global_regex_handler(['pubmsg', 'privmsg'], self.handle_gettoken)
self.connection.reactor.remove_global_regex_handler(['pubmsg', 'privmsg'], self.handle_auth)
self.connection.reactor.remove_global_regex_handler(['pubmsg', 'privmsg'], self.handle_replyto)
self.connection.reactor.remove_global_regex_handler(['pubmsg', 'privmsg'], self.handle_start_mentionpoll)
self.connection.reactor.remove_global_regex_handler(['pubmsg', 'privmsg'], self.handle_stop_mentionpoll)
self.poll_mentions = False
super(Twitter, self).stop()
def handle_start_mentionpoll(self, connection, event, match):
if not has_permission(event.source, 'twitter.manage_threads'):
return self.bot.reply(event, "You do not have permission to start/stop threads.")
self.poll_mentions = True
t = threading.Thread(target=self.thread_watch_mentions)
t.daemon = True
t.start()
self.bot.reply(event, "Now polling for mentions.")
def handle_stop_mentionpoll(self, connection, event, match):
if not has_permission(event.source, 'twitter.manage_threads'):
return self.bot.reply(event, "You do not have permission to start/stop threads.")
self.poll_mentions = False
self.bot.reply(event, "No longer polling for mentions.")
def handle_getstatus(self, connection, event, match):
"""Get a status by tweet ID."""
print_source = True
print_id = True
if match.group(1):
print_source = False
if match.group(2):
print_id = False
status = match.group(3)
try:
tweet = self.twit.show_status(id=status)
return self._reply_with_tweet_or_retweet_text(event, tweet=tweet, print_source=print_source,
print_id=print_id)
except Exception as e:
log.error("couldn't obtain status")
log.exception(e)
return self.bot.reply(event, "Couldn't obtain status: {0:s}".format(str(e)))
def handle_getuserstatus(self, connection, event, match):
"""Get a status for a user. Allows for getting one other than the most recent."""
print_source = True
print_id = True
if match.group(1):
print_source = False
if match.group(2):
print_id = False
user = match.group(3)
index = match.group(4)
try:
if index:
index = int(index)
if index > 0:
index = 0
else:
index = 0
except ValueError as e:
log.error("Couldn't convert index")
log.exception(e)
index = 0
count = (-1*index) + 1
try:
tweets = self.twit.get_user_timeline(screen_name=user, count=count, include_rts=True)
if tweets:
tweet = tweets[-1*index]
return self._reply_with_tweet_or_retweet_text(event, tweet=tweet, print_source=print_source,
print_id=print_id)
except Exception as e:
log.error("couldn't obtain status")
log.exception(e)
return self.bot.reply(event, "Couldn't obtain status: {0:s}".format(str(e)))
def handle_tweet(self, connection, event, match):
"""Tweet. Needs authentication."""
tweet = match.group(1)
if not self.twit.verify_credentials():
return self.bot.reply(event, "The bot must be authenticated to tweet.")
if not has_permission(event.source, 'twitter.send_tweets'):
return self.bot.reply(event, "You do not have permission to send tweets.")
try:
if self.twit.update_status(status=tweet, display_coordinates=False) is not None:
return self.bot.reply(event, "'{0:s}' tweeted.".format(tweet))
else:
return self.bot.reply(event, "Unknown error sending tweet(s).")
except Exception as e:
log.error("couldn't tweet")
log.exception(e)
return self.bot.reply(event, "Couldn't tweet: {0:s}".format(str(e)))
def handle_replyto(self, connection, event, match):
"""Reply to a tweet, in the twitter in_reply_to_status_id sense. Needs authentication."""
status_id = match.group(1)
tweet = match.group(2)
if not self.twit.verify_credentials():
return self.bot.reply(event, "The bot must be authenticated to tweet.")
if not has_permission(event.source, 'twitter.send_tweets'):
return self.bot.reply(event, "you do not have permission to send tweets.")
replyee_tweet = self.twit.show_status(id=status_id)
target = replyee_tweet['user']['screen_name']
try:
reptweet = "@{0:s} {1:s}".format(target, tweet)
if self.twit.update_status(status=reptweet, display_coordinates=False, in_reply_to_status_id=status_id) is not None:
return self.bot.reply(event, "'{0:s}' tweeted.".format(tweet))
else:
return self.bot.reply(event, "Unknown error sending tweet.")
except Exception as e:
log.error("couldn't tweet")
log.exception(e)
return self.bot.reply(event, "Couldn't tweet: {0:s}".format(str(e)))
def handle_gettoken(self, connection, event, match):
"""Get an oauth token, so that the user may authenticate the bot."""
try:
if not self.twit.verify_credentials():
self.authed = False
self.twit = twython.Twython(settings.TWITTER_CONSUMER_KEY, settings.TWITTER_CONSUMER_SECRET,
client_args=self.twit_args)
except twython.TwythonError:
self.authed = False
self.twit = twython.Twython(settings.TWITTER_CONSUMER_KEY, settings.TWITTER_CONSUMER_SECRET,
client_args=self.twit_args)
auth = self.twit.get_authentication_tokens()
self.temp_token = auth['oauth_token']
self.temp_token_secret = auth['oauth_token_secret']
return self.bot.reply(event, "Go to the following link in your browser: {0:s} and send me the pin."
"".format(auth['auth_url']))
def handle_auth(self, connection, event, match):
"""Authenticate, given a PIN (following gettoken)."""
oauth_verifier = match.group(1)
self.twit = twython.Twython(settings.TWITTER_CONSUMER_KEY, settings.TWITTER_CONSUMER_SECRET,
self.temp_token, self.temp_token_secret,
client_args=self.twit_args)
final_step = self.twit.get_authorized_tokens(oauth_verifier)
self.twit = twython.Twython(settings.TWITTER_CONSUMER_KEY, settings.TWITTER_CONSUMER_SECRET,
final_step['oauth_token'], final_step['oauth_token_secret'],
client_args=self.twit_args)
try:
twittersettings = TwitterClient.objects.get(pk=1)
twittersettings.oauth_token = final_step['oauth_token']
twittersettings.oauth_token_secret = final_step['oauth_token_secret']
twittersettings.clean()
twittersettings.save()
if self.twit.verify_credentials():
self.authed = True
# print timeline stuff. this will set up the appropriate timer
return self.bot.reply(event, "The bot is now logged in.")
else:
self.twit = twython.Twython(settings.TWITTER_CONSUMER_KEY, settings.TWITTER_CONSUMER_SECRET,
client_args=self.twit_args)
return self.bot.reply(event, "The bot was not able to authenticate.")
except TwitterClient.DoesNotExist:
log.error("twitter settings object does not exist")
return self.bot.reply(event, "twitter module not configured")
def thread_watch_mentions(self, sleep_time=60):
"""Poll mentions from Twitter every sleep_time seconds.
:param sleep_time: second to sleep between checks
:type sleep_time: int
"""
while self.poll_mentions:
twittersettings = TwitterClient.objects.get(pk=1)
out_chan = twittersettings.mentions_output_channel.name
since_id = twittersettings.mentions_since_id
mentions = self.twit.get_mentions_timeline(since_id=since_id)
mentions.reverse()
for mention in mentions:
reply = self._return_tweet_or_retweet_text(tweet=mention, print_source=True)
self.bot.reply(None, reply, explicit_target=out_chan)
since_id = mention['id'] if mention['id'] > since_id else since_id
twittersettings.mentions_since_id = since_id
twittersettings.save()
time.sleep(sleep_time)
def _reply_with_tweet_or_retweet_text(self, event, tweet, print_source=False, print_id=True):
"""Do a bot.reply() with the appropriate text representation of the given tweet.
See _return_tweet_or_retweet_text for details.
:param event: the irc event to use for the reply
:type event: Event
:param tweet: the tweet (from twython) to inspect and return a string for
:type tweet: dict
:param print_source: whether or not to print the tweet's author (default False)
:type print_source: bool
:param print_id: whether or not to print the tweet's ID (default True)
:type print_id: bool
:returns: tweet text suitable for printing
:rtype: str
"""
return self.bot.reply(event, self._return_tweet_or_retweet_text(tweet, print_source, print_id))
def _return_tweet_or_retweet_text(self, tweet, print_source=False, print_id=True):
"""Return a string of the author and text body of a status, accounting for whether
or not the fetched status is a retweet.
:param tweet: the tweet (from twython) to inspect and return a string for
:type tweet: dict
:param print_source: whether or not to print the tweet's author (default False)
:type print_source: bool
:param print_id: whether or not to print the tweet's ID (default True)
:type print_id: bool
:returns: tweet text suitable for printing
:rtype: str
"""
retweet = getattr(tweet, 'retweeted_status', None)
if retweet:
if print_source:
reply = "@%s (RT @%s): %s" % (tweet['user']['screen_name'],
retweet['user']['screen_name'],
self._unencode_xml(retweet['text']))
else:
reply = "(RT @%s): %s" % (retweet['user']['screen_name'],
self._unencode_xml(retweet['text']))
else:
if print_source:
reply = "@%s: %s" % (tweet['user']['screen_name'],
self._unencode_xml(tweet['text']))
else:
reply = "%s" % (self._unencode_xml(tweet['text']))
if print_id:
reply = reply + " [{0:d}]".format(tweet['id'])
return reply
plugin = Twitter