Twitter: first attempt at twython library

keeping the old library around for a bit, but it can probably go
eventually
This commit is contained in:
Brian S. Stephan 2013-07-28 22:15:43 -05:00
parent 0d465ee670
commit e5663e6e5d
1 changed files with 55 additions and 96 deletions

View File

@ -16,15 +16,13 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import oauth2 as oauth
import re
import thread
import time
import urlparse
import MySQLdb as mdb
from extlib import twitter
import twython
from Module import Module
@ -52,38 +50,29 @@ class Twitter(Module):
self.authre = re.compile(authpattern)
self.replytore = re.compile(replytopattern)
# prep oauth magic
# prep twitter
self.consumer_key = 'N2aSGxBP8t3cCgWyF1B2Aw'
self.consumer_secret = '0aQPEV4K3MMpicfi2lDtCP5pvjsKaqIpfuWtsPzx8'
self.request_token_url = "https://api.twitter.com/oauth/request_token"
self.access_token_url = "https://api.twitter.com/oauth/access_token"
self.authorize_url = "https://api.twitter.com/oauth/authorize"
self.consumer = oauth.Consumer(self.consumer_key, self.consumer_secret)
self.client = oauth.Client(self.consumer)
# settings
# force timeline check to wait 5 minutes (for channel joins and antispam)
self.next_timeline_check = time.time() + 300
self.authed = False
# try getting the stored auth tokens and logging in
(oauth_token, oauth_token_secret) = self._retrieve_stored_auth_tokens()
if oauth_token is not None and oauth_token_secret is not None:
self._log_in_to_twitter(oauth_token, oauth_token_secret)
if self._verify_credentials():
# self.twit is set
self.twit = twython.Twython(self.consumer_key, self.consumer_secret, oauth_token, oauth_token_secret)
if self.twit.verify_credentials():
self.authed = True
# print timeline stuff. this will set up the appropriate timer
self._check_self_timeline()
self.log.debug("Logged in to Twitter with saved token.")
else:
self.log.error("Could not log in to Twitter with saved token.")
self.twit = twitter.Api()
self.twit = twython.Twython(self.consumer_key, self.consumer_secret)
else:
# create a default twitter API account, in case we never auth
self.twit = twitter.Api()
self.twit = twython.Twython(self.consumer_key, self.consumer_secret)
thread.start_new_thread(self.thread_do, ())
def db_init(self):
@ -115,12 +104,6 @@ class Twitter(Module):
raise
finally: cur.close()
def shutdown(self):
"""Deauth, and make the twitter API item inoperable."""
Module.shutdown(self)
self.twit.ClearCredentials()
def do(self, connection, event, nick, userhost, what, admin_unlocked):
"""Attempt to do twitter things."""
@ -156,9 +139,9 @@ class Twitter(Module):
print_id = False
status = match.group(3)
try:
tweet = self.twit.GetStatus(status)
tweet = self.twit.show_status(id=status)
return self._return_tweet_or_retweet_text(tweet=tweet, print_source=print_source, print_id=print_id)
except twitter.TwitterError as e:
except twython.exceptions.TwythonError as e:
return "Couldn't obtain status: " + str(e)
def twitter_getuserstatus(self, event, nick, userhost, what, admin_unlocked):
@ -189,11 +172,11 @@ class Twitter(Module):
count = (-1*index) + 1
try:
tweets = self.twit.GetUserTimeline(screen_name=user, count=count, include_rts=True)
tweets = self.twit.get_user_timeline(screen_name=user, count=count, include_rts=True)
if tweets:
tweet = tweets[-1*index]
return self._return_tweet_or_retweet_text(tweet=tweet, print_source=print_source, print_id=print_id)
except twitter.TwitterError as e:
except twython.exceptions.TwythonError as e:
return "Couldn't obtain status: " + str(e)
except ValueError as e:
return "Couldn't obtain status: " + str(e)
@ -204,17 +187,17 @@ class Twitter(Module):
match = self.tweetre.search(what)
if match:
tweet = match.group(1)
if self._verify_credentials() is None:
if not self.twit.verify_credentials():
return "You must be authenticated to tweet."
if admin_unlocked is False:
return "Only admins can tweet."
try:
if self.twit.PostUpdates(tweet, continuation='\xe2\x80\xa6') is not None:
if self.twit.PostUpdates(status=tweet, display_coordinates=False) is not None:
return "'{0:s}' tweeted.".format(tweet)
else:
return "Unknown error sending tweet(s)."
except twitter.TwitterError as e:
except twython.exceptions.TwythonError as e:
return "Couldn't tweet: " + str(e)
def twitter_replyto(self, event, nick, userhost, what, admin_unlocked):
@ -225,21 +208,21 @@ class Twitter(Module):
status_id = match.group(1)
tweet = match.group(2)
if self._verify_credentials() is None:
if not self.twit.verify_credentials():
return "You must be authenticated to tweet."
if admin_unlocked is False:
return "Only admins can tweet."
replyee_tweet = self.twit.GetStatus(status_id)
target = replyee_tweet.user.screen_name.encode('utf-8', 'ignore')
replyee_tweet = self.twit.show_status(id=status_id)
target = replyee_tweet['user']['screen_name'].encode('utf-8', 'ignore')
try:
reptweet = "@{0:s}: {1:s}".format(target, tweet)
if self.twit.PostUpdate(reptweet, in_reply_to_status_id=status_id) is not None:
if self.twit.PostUpdate(status=reptweet, display_coordinates=False, in_reply_to_status_id=status_id) is not None:
return "'{0:s}' tweeted.".format(tweet)
else:
return "Unknown error sending tweet."
except twitter.TwitterError as e:
except twython.exceptions.TwythonError as e:
return "Couldn't tweet: " + str(e)
def twitter_gettoken(self, event, nick, userhost, what, admin_unlocked):
@ -247,61 +230,37 @@ class Twitter(Module):
match = self.gettokenre.search(what)
if match:
# get request token
resp, content = self.client.request(self.request_token_url, "GET")
if resp['status'] != '200':
raise Exception("Invalid response %s." % resp['status'])
if self.twit.verify_credentials():
self.authed = False
self.twit = twython.Twython(self.consumer_key, self.consumer_secret)
self.request_token = dict(urlparse.parse_qsl(content))
# have the user auth
return "Go to the following link in your browser: %s?oauth_token=%s and then send me the pin." % (self.authorize_url, self.request_token['oauth_token'])
auth = self.twit.get_authentication_tokens()
self.temp_token = auth['oauth_token']
self.temp_token_secret = auth['oauth_token_secret']
return ("Go to the following link in your browser: {0:s} "
"and send me the pin.".format(auth['auth_url']))
def twitter_auth(self, event, nick, userhost, what, admin_unlocked):
"""Authenticate, given a PIN (following gettoken)."""
match = self.authre.search(what)
if match:
if self._verify_credentials() is not None:
return "The bot is already logged in!"
oauth_verifier = match.group(1)
self.twit = twython.Twython(self.consumer_key, self.consumer_secret,
self.temp_token, self.temp_token_secret)
final_step = self.twit.get_authorized_tokens(oauth_verifier)
authtoken = match.group(1)
oauth_verifier = authtoken
self.twit = twython.Twython(self.consumer_key, self.consumer_secret,
final_step['oauth_token'], final_step['oauth_token_secret'])
self._persist_auth_tokens(final_step['oauth_token'], final_step['oauth_token_secret'])
# request access token
token = oauth.Token(self.request_token['oauth_token'], self.request_token['oauth_token_secret'])
token.set_verifier(oauth_verifier)
client = oauth.Client(self.consumer, token)
resp, content = client.request(self.access_token_url, "POST")
access_token = dict(urlparse.parse_qsl(content))
self._log_in_to_twitter(access_token['oauth_token'], access_token['oauth_token_secret'])
if self._verify_credentials() is not None:
if self.twit.verify_credentials():
self.authed = True
# print timeline stuff. this will set up the appropriate timer
self._check_self_timeline()
return "The bot is now logged in."
else:
return "Could not log in with supplied credentials."
def _verify_credentials(self):
"""Wrap the exceptions in the twitter client VerifyExceptions()."""
try:
return self.twit.VerifyCredentials()
except Exception:
return None
def _log_in_to_twitter(self, oauth_token, oauth_token_secret):
"""Do the actual authentication against Twitter."""
# create the twitter API object
self.twit = twitter.Api(self.consumer_key, self.consumer_secret, oauth_token, oauth_token_secret)
if self._verify_credentials() is not None:
# save the auth token for later reuse
self._persist_auth_tokens(oauth_token, oauth_token_secret)
self.twit = twython.Twython(self.consumer_key, self.consumer_secret)
def thread_do(self):
"""Check the timeline."""
@ -316,13 +275,13 @@ class Twitter(Module):
if self.next_timeline_check < time.time():
self.next_timeline_check = time.time() + 300
if self._verify_credentials() is not None:
if self.twit.verify_credentials():
# get the id of the last check we made
since_id = self._get_last_since_id()
output_channel = self._get_output_channel()
if since_id is not None and output_channel != '':
tweets = self.twit.GetFriendsTimeline(since_id=since_id)
tweets = self.twit.get_home_timeline(since_id=since_id)
tweets.reverse()
for tweet in tweets:
tweet_text = self._return_tweet_or_retweet_text(tweet=tweet, print_source=True)
@ -331,7 +290,7 @@ class Twitter(Module):
# friends timeline printed, find the latest id
new_since_id = self._get_latest_tweet_id(tweets, since_id)
tweets = self.twit.GetMentions(since_id=since_id)
tweets = self.twit.get_mentions_timeline(since_id=since_id)
tweets.reverse()
for tweet in tweets:
tweet_text = self._return_tweet_or_retweet_text(tweet=tweet, print_source=True)
@ -351,24 +310,24 @@ class Twitter(Module):
"""
reply = ""
if tweet.retweeted_status:
retweet = tweet.retweeted_status
retweet = getattr(tweet, 'retweeted_status', None)
if retweet:
if print_source:
reply = "@%s (RT @%s): %s" % (tweet.user.screen_name.encode('utf-8', 'ignore'),
retweet.user.screen_name.encode('utf-8', 'ignore'),
super(Twitter, self)._unencode_xml(retweet.text.encode('utf-8', 'ignore')))
reply = "@%s (RT @%s): %s" % (tweet['user']['screen_name'].encode('utf-8', 'ignore'),
retweet['user']['screen_name'].encode('utf-8', 'ignore'),
super(Twitter, self)._unencode_xml(retweet['text'].encode('utf-8', 'ignore')))
else:
reply = "(RT @%s): %s" % (retweet.user.screen_name.encode('utf-8', 'ignore'),
super(Twitter, self)._unencode_xml(retweet.text.encode('utf-8', 'ignore')))
reply = "(RT @%s): %s" % (retweet['user']['screen_name'].encode('utf-8', 'ignore'),
super(Twitter, self)._unencode_xml(retweet['text'].encode('utf-8', 'ignore')))
else:
if print_source:
reply = "@%s: %s" % (tweet.user.screen_name.encode('utf-8', 'ignore'),
super(Twitter, self)._unencode_xml(tweet.text.encode('utf-8', 'ignore')))
reply = "@%s: %s" % (tweet['user']['screen_name'].encode('utf-8', 'ignore'),
super(Twitter, self)._unencode_xml(tweet['text'].encode('utf-8', 'ignore')))
else:
reply = "%s" % (super(Twitter, self)._unencode_xml(tweet.text.encode('utf-8', 'ignore')))
reply = "%s" % (super(Twitter, self)._unencode_xml(tweet['text'].encode('utf-8', 'ignore')))
if print_id:
reply = reply + " [{0:d}]".format(tweet.id)
reply = reply + " [{0:d}]".format(tweet['id'])
return reply
@ -426,8 +385,8 @@ class Twitter(Module):
latest = since_id
for tweet in tweets:
if tweet.id > latest:
latest = tweet.id
if tweet['id'] > latest:
latest = tweet['id']
return latest