""" Twitter - access to Twitter through bot commands Copyright (C) 2010 Brian S. Stephan This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . """ import re import thread import time import urlparse import MySQLdb as mdb import twython from Module import Module class Twitter(Module): """Access Twitter via the bot as an authenticated client.""" def __init__(self, irc, config): """Prepare for oauth stuff (but don't execute it yet).""" Module.__init__(self, irc, config) # setup regexes getstatuspattern = "^!twitter\s+getstatus(\s+nosource)?(\s+noid)?\s+(\S+)$" getuserstatuspattern = "^!twitter\s+getuserstatus(\s+nosource)?(\s+noid)?\s+(\S+)(\s+.*|$)" tweetpattern = "^!twitter\s+tweet\s+(.*)" gettokenpattern = "^!twitter\s+gettoken$" authpattern = "^!twitter\s+auth\s+(\S+)$" replytopattern = "^!twitter\s+replyto\s+(\S+)\s+(.*)" self.getstatusre = re.compile(getstatuspattern) self.getuserstatusre = re.compile(getuserstatuspattern) self.tweetre = re.compile(tweetpattern) self.gettokenre = re.compile(gettokenpattern) self.authre = re.compile(authpattern) self.replytore = re.compile(replytopattern) # prep twitter self.consumer_key = 'N2aSGxBP8t3cCgWyF1B2Aw' self.consumer_secret = '0aQPEV4K3MMpicfi2lDtCP5pvjsKaqIpfuWtsPzx8' # settings # force timeline check to wait 5 minutes (for channel joins and antispam) self.next_timeline_check = time.time() + 300 self.authed = False # try getting the stored auth tokens and logging in (oauth_token, oauth_token_secret) = self._retrieve_stored_auth_tokens() if oauth_token is not None and oauth_token_secret is not None: self.twit = twython.Twython(self.consumer_key, self.consumer_secret, oauth_token, oauth_token_secret) if self.twit.verify_credentials(): self.authed = True # print timeline stuff. this will set up the appropriate timer self._check_self_timeline() self.log.debug("Logged in to Twitter with saved token.") else: self.twit = twython.Twython(self.consumer_key, self.consumer_secret) else: self.twit = twython.Twython(self.consumer_key, self.consumer_secret) thread.start_new_thread(self.thread_do, ()) def db_init(self): """Set up the settings table.""" # init the table if it doesn't exist version = self.db_module_registered(self.__class__.__name__) if version == None or version < 1: db = self.get_db() # create tables try: version = 1 cur = db.cursor(mdb.cursors.DictCursor) cur.execute(""" CREATE TABLE twitter_settings ( since_id BIGINT(20) UNSIGNED NOT NULL, output_channel VARCHAR(64) NOT NULL, oauth_token VARCHAR(256) DEFAULT NULL, oauth_token_secret VARCHAR(256) DEFAULT NULL ) ENGINE=InnoDB CHARACTER SET utf8 COLLATE utf8_bin """) cur.execute("""INSERT INTO twitter_settings (since_id, output_channel) VALUES (0, '#dr.botzo')""") db.commit() self.db_register_module_version(self.__class__.__name__, version) except mdb.Error as e: db.rollback() self.log.error("database error trying to create tables") self.log.exception(e) raise finally: cur.close() def do(self, connection, event, nick, userhost, what, admin_unlocked): """Attempt to do twitter things.""" if self.getstatusre.search(what): return self.irc.reply(event, self.twitter_getstatus(event, nick, userhost, what, admin_unlocked)) elif self.getuserstatusre.search(what): return self.irc.reply(event, self.twitter_getuserstatus(event, nick, userhost, what, admin_unlocked)) elif self.tweetre.search(what): return self.irc.reply(event, self.twitter_tweet(event, nick, userhost, what, admin_unlocked)) elif self.replytore.search(what): return self.irc.reply(event, self.twitter_replyto(event, nick, userhost, what, admin_unlocked)) elif self.gettokenre.search(what): return self.irc.reply(event, self.twitter_gettoken(event, nick, userhost, what, admin_unlocked)) elif self.authre.search(what): return self.irc.reply(event, self.twitter_auth(event, nick, userhost, what, admin_unlocked)) def twitter_getstatus(self, event, nick, userhost, what, admin_unlocked): """Get a status by tweet ID.""" match = self.getstatusre.search(what) if match: print_source = True print_id = True if match.group(1): print_source = False if match.group(2): print_id = False status = match.group(3) try: tweet = self.twit.show_status(id=status) return self._return_tweet_or_retweet_text(tweet=tweet, print_source=print_source, print_id=print_id) except twython.exceptions.TwythonError as e: return "Couldn't obtain status: " + str(e) def twitter_getuserstatus(self, event, nick, userhost, what, admin_unlocked): """Get a status for a user. Allows for getting one other than the most recent.""" match = self.getuserstatusre.search(what) if match: print_source = True print_id = True if match.group(1): print_source = False if match.group(2): print_id = False user = match.group(3) index = match.group(4) try: if index: index = int(index) if index > 0: index = 0 else: index = 0 except ValueError as e: self.log.error("Couldn't convert index: " + str(e)) index = 0 count = (-1*index) + 1 try: tweets = self.twit.get_user_timeline(screen_name=user, count=count, include_rts=True) if tweets: tweet = tweets[-1*index] return self._return_tweet_or_retweet_text(tweet=tweet, print_source=print_source, print_id=print_id) except twython.exceptions.TwythonError as e: return "Couldn't obtain status: " + str(e) except ValueError as e: return "Couldn't obtain status: " + str(e) def twitter_tweet(self, event, nick, userhost, what, admin_unlocked): """Tweet. Needs authentication.""" match = self.tweetre.search(what) if match: tweet = match.group(1) if not self.twit.verify_credentials(): return "You must be authenticated to tweet." if admin_unlocked is False: return "Only admins can tweet." try: if self.twit.update_status(status=tweet, display_coordinates=False) is not None: return "'{0:s}' tweeted.".format(tweet) else: return "Unknown error sending tweet(s)." except twython.exceptions.TwythonError as e: return "Couldn't tweet: " + str(e) def twitter_replyto(self, event, nick, userhost, what, admin_unlocked): """Reply to a tweet, in the twitter in_reply_to_status_id sense. Needs authentication.""" match = self.replytore.search(what) if match: status_id = match.group(1) tweet = match.group(2) if not self.twit.verify_credentials(): return "You must be authenticated to tweet." if admin_unlocked is False: return "Only admins can tweet." replyee_tweet = self.twit.show_status(id=status_id) target = replyee_tweet['user']['screen_name'].encode('utf-8', 'ignore') try: reptweet = "@{0:s}: {1:s}".format(target, tweet) if self.twit.update_status(status=reptweet, display_coordinates=False, in_reply_to_status_id=status_id) is not None: return "'{0:s}' tweeted.".format(tweet) else: return "Unknown error sending tweet." except twython.exceptions.TwythonError as e: return "Couldn't tweet: " + str(e) def twitter_gettoken(self, event, nick, userhost, what, admin_unlocked): """Get an oauth token, so that the user may authenticate the bot.""" match = self.gettokenre.search(what) if match: if self.twit.verify_credentials(): self.authed = False self.twit = twython.Twython(self.consumer_key, self.consumer_secret) auth = self.twit.get_authentication_tokens() self.temp_token = auth['oauth_token'] self.temp_token_secret = auth['oauth_token_secret'] return ("Go to the following link in your browser: {0:s} " "and send me the pin.".format(auth['auth_url'])) def twitter_auth(self, event, nick, userhost, what, admin_unlocked): """Authenticate, given a PIN (following gettoken).""" match = self.authre.search(what) if match: oauth_verifier = match.group(1) self.twit = twython.Twython(self.consumer_key, self.consumer_secret, self.temp_token, self.temp_token_secret) final_step = self.twit.get_authorized_tokens(oauth_verifier) self.twit = twython.Twython(self.consumer_key, self.consumer_secret, final_step['oauth_token'], final_step['oauth_token_secret']) self._persist_auth_tokens(final_step['oauth_token'], final_step['oauth_token_secret']) if self.twit.verify_credentials(): self.authed = True # print timeline stuff. this will set up the appropriate timer self._check_self_timeline() return "The bot is now logged in." else: self.twit = twython.Twython(self.consumer_key, self.consumer_secret) def thread_do(self): """Check the timeline.""" while not self.is_shutdown: self._check_self_timeline() time.sleep(1) def _check_self_timeline(self): """Check my timeline, and if there are entries, print them to the channel.""" if self.next_timeline_check < time.time(): self.next_timeline_check = time.time() + 300 if self.twit.verify_credentials(): # get the id of the last check we made since_id = self._get_last_since_id() output_channel = self._get_output_channel() if since_id is not None and output_channel != '': tweets = self.twit.get_home_timeline(since_id=since_id) tweets.reverse() for tweet in tweets: tweet_text = self._return_tweet_or_retweet_text(tweet=tweet, print_source=True) self.sendmsg(output_channel.encode('utf-8', 'ignore'), tweet_text) # friends timeline printed, find the latest id new_since_id = self._get_latest_tweet_id(tweets, since_id) tweets = self.twit.get_mentions_timeline(since_id=since_id) tweets.reverse() for tweet in tweets: tweet_text = self._return_tweet_or_retweet_text(tweet=tweet, print_source=True) self.sendmsg(output_channel.encode('utf-8', 'ignore'), tweet_text) # mentions printed, find the latest id new_since_id = self._get_latest_tweet_id(tweets, new_since_id) # set since_id self._set_last_since_id(new_since_id) def _return_tweet_or_retweet_text(self, tweet, print_source=False, print_id=True): """ Return a string of the author and text body of a status, accounting for whether or not the fetched status is a retweet. """ reply = "" retweet = getattr(tweet, 'retweeted_status', None) if retweet: if print_source: reply = "@%s (RT @%s): %s" % (tweet['user']['screen_name'].encode('utf-8', 'ignore'), retweet['user']['screen_name'].encode('utf-8', 'ignore'), super(Twitter, self)._unencode_xml(retweet['text'].encode('utf-8', 'ignore'))) else: reply = "(RT @%s): %s" % (retweet['user']['screen_name'].encode('utf-8', 'ignore'), super(Twitter, self)._unencode_xml(retweet['text'].encode('utf-8', 'ignore'))) else: if print_source: reply = "@%s: %s" % (tweet['user']['screen_name'].encode('utf-8', 'ignore'), super(Twitter, self)._unencode_xml(tweet['text'].encode('utf-8', 'ignore'))) else: reply = "%s" % (super(Twitter, self)._unencode_xml(tweet['text'].encode('utf-8', 'ignore'))) if print_id: reply = reply + " [{0:d}]".format(tweet['id']) return reply def _get_last_since_id(self): """Get the since_id out of the database.""" db = self.get_db() try: cur = db.cursor(mdb.cursors.DictCursor) query = "SELECT since_id FROM twitter_settings" cur.execute(query) result = cur.fetchone() if result: return result['since_id'] except mdb.Error as e: self.log.error("database error getting last since ID") self.log.exception(e) raise finally: cur.close() def _get_output_channel(self): """Get the output_channel out of the database.""" db = self.get_db() try: cur = db.cursor(mdb.cursors.DictCursor) query = "SELECT output_channel FROM twitter_settings" cur.execute(query) result = cur.fetchone() if result: return result['output_channel'] except mdb.Error as e: self.log.error("database error getting output channel") self.log.exception(e) raise finally: cur.close() def _set_last_since_id(self, since_id): """Set the since_id.""" db = self.get_db() try: cur = db.cursor(mdb.cursors.DictCursor) statement = "UPDATE twitter_settings SET since_id = %s" cur.execute(statement, (since_id,)) db.commit() except mdb.Error as e: self.log.error("database error saving last since ID") self.log.exception(e) raise finally: cur.close() def _get_latest_tweet_id(self, tweets, since_id): """Find the latest tweet id in the provided list, or the given since_id.""" latest = since_id for tweet in tweets: if tweet['id'] > latest: latest = tweet['id'] return latest def _persist_auth_tokens(self, oauth_token, oauth_token_secret): """Save the auth tokens to the database, with the intent of reusing them.""" db = self.get_db() try: cur = db.cursor(mdb.cursors.DictCursor) statement = "UPDATE twitter_settings SET oauth_token = %s, oauth_token_secret = %s" cur.execute(statement, (oauth_token, oauth_token_secret)) db.commit() except mdb.Error as e: self.log.error("database error saving auth tokens") self.log.exception(e) raise finally: cur.close() def _retrieve_stored_auth_tokens(self): """Check the database for existing auth tokens, try reusing them.""" db = self.get_db() try: cur = db.cursor(mdb.cursors.DictCursor) query = "SELECT oauth_token, oauth_token_secret FROM twitter_settings" cur.execute(query) result = cur.fetchone() if result: return (result['oauth_token'], result['oauth_token_secret']) except mdb.Error as e: self.log.error("database error retrieving auth tokens") self.log.exception(e) raise finally: cur.close() # vi:tabstop=4:expandtab:autoindent # kate: indent-mode python;indent-width 4;replace-tabs on;