twitter will occasionally fail hard, when it was doing so this check for updates wasn't reregistering, making the bot stop checking eventually
356 lines
14 KiB
Python
356 lines
14 KiB
Python
"""
|
|
Twitter - access to Twitter through bot commands
|
|
Copyright (C) 2010 Brian S. Stephan
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
"""
|
|
|
|
from ConfigParser import NoSectionError, NoOptionError
|
|
import oauth2 as oauth
|
|
import re
|
|
import sqlite3
|
|
from threading import Timer
|
|
import urlparse
|
|
|
|
from extlib import irclib
|
|
from extlib import twitter
|
|
|
|
from Module import Module
|
|
|
|
class Twitter(Module):
|
|
|
|
"""Access Twitter via the bot as an authenticated client."""
|
|
|
|
def __init__(self, irc, config, server):
|
|
"""Prepare for oauth stuff (but don't execute it yet)."""
|
|
|
|
Module.__init__(self, irc, config, server)
|
|
|
|
# setup regexes
|
|
getstatuspattern = '^!twitter\s+getstatus\s+(\S+)$'
|
|
getuserstatuspattern = '^!twitter\s+getuserstatus\s+(\S+)(\s+.*|$)'
|
|
tweetpattern = '^!twitter\s+tweet\s+(.*)'
|
|
gettokenpattern = '^!twitter\s+gettoken$'
|
|
authpattern = '^!twitter\s+auth\s+(\S+)$'
|
|
replytopattern = '^!twitter\s+replyto\s+(\S+)\s+(.*)'
|
|
|
|
self.getstatusre = re.compile(getstatuspattern)
|
|
self.getuserstatusre = re.compile(getuserstatuspattern)
|
|
self.tweetre = re.compile(tweetpattern)
|
|
self.gettokenre = re.compile(gettokenpattern)
|
|
self.authre = re.compile(authpattern)
|
|
self.replytore = re.compile(replytopattern)
|
|
|
|
# prep oauth magic
|
|
self.consumer_key = 'N2aSGxBP8t3cCgWyF1B2Aw'
|
|
self.consumer_secret = '0aQPEV4K3MMpicfi2lDtCP5pvjsKaqIpfuWtsPzx8'
|
|
|
|
self.request_token_url = 'https://api.twitter.com/oauth/request_token'
|
|
self.access_token_url = 'https://api.twitter.com/oauth/access_token'
|
|
self.authorize_url = 'https://api.twitter.com/oauth/authorize'
|
|
|
|
self.consumer = oauth.Consumer(self.consumer_key, self.consumer_secret)
|
|
self.client = oauth.Client(self.consumer)
|
|
|
|
# create a default twitter API account, in case we never auth
|
|
self.twit = twitter.Api()
|
|
self.authed = False
|
|
|
|
def db_init(self):
|
|
"""Set up the settings table."""
|
|
|
|
# init the table if it doesn't exist
|
|
version = self.db_module_registered(self.__class__.__name__)
|
|
if version == None:
|
|
# create tables
|
|
db = self.get_db()
|
|
try:
|
|
db.execute('''
|
|
CREATE TABLE twitter_settings (
|
|
since_id INTEGER NOT NULL,
|
|
output_channel TEXT NOT NULL
|
|
)''')
|
|
db.execute('''INSERT INTO twitter_settings (since_id, output_channel) VALUES (0, '#drbotzo')''')
|
|
db.execute('INSERT INTO drbotzo_modules VALUES (?,?)', (self.__class__.__name__, 1))
|
|
db.commit()
|
|
except sqlite3.Error as e:
|
|
db.rollback()
|
|
print("sqlite error: " + str(e))
|
|
raise
|
|
|
|
def shutdown(self):
|
|
"""Deauth, and make the twitter API item inoperable."""
|
|
|
|
self.twit.ClearCredentials()
|
|
self.authed = False
|
|
|
|
def do(self, connection, event, nick, userhost, what, admin_unlocked):
|
|
"""Attempt to do twitter things."""
|
|
|
|
if self.getstatusre.search(what):
|
|
return self.reply(connection, event, self.twitter_getstatus(connection, event, nick, userhost, what, admin_unlocked))
|
|
elif self.getuserstatusre.search(what):
|
|
return self.reply(connection, event, self.twitter_getuserstatus(connection, event, nick, userhost, what, admin_unlocked))
|
|
elif self.tweetre.search(what):
|
|
return self.reply(connection, event, self.twitter_tweet(connection, event, nick, userhost, what, admin_unlocked))
|
|
elif self.replytore.search(what):
|
|
return self.reply(connection, event, self.twitter_replyto(connection, event, nick, userhost, what, admin_unlocked))
|
|
elif self.gettokenre.search(what):
|
|
return self.reply(connection, event, self.twitter_gettoken(connection, event, nick, userhost, what, admin_unlocked))
|
|
elif self.authre.search(what):
|
|
return self.reply(connection, event, self.twitter_auth(connection, event, nick, userhost, what, admin_unlocked))
|
|
|
|
def twitter_getstatus(self, connection, event, nick, userhost, what, admin_unlocked):
|
|
"""Get a status by tweet ID."""
|
|
|
|
match = self.getstatusre.search(what)
|
|
if match:
|
|
status = match.group(1)
|
|
try:
|
|
tweet = self.twit.GetStatus(status)
|
|
return self._return_tweet_or_retweet_text(tweet=tweet, print_source=True)
|
|
except twitter.TwitterError as e:
|
|
return 'Couldn\'t obtain status: ' + str(e)
|
|
|
|
def twitter_getuserstatus(self, connection, event, nick, userhost, what, admin_unlocked):
|
|
"""Get a status for a user. Allows for getting one other than the most recent."""
|
|
|
|
match = self.getuserstatusre.search(what)
|
|
if match:
|
|
user = match.group(1)
|
|
index = match.group(2)
|
|
|
|
if index:
|
|
index = int(index)
|
|
if index > 0:
|
|
index = 0
|
|
else:
|
|
index = 0
|
|
count = (-1*index) + 1
|
|
|
|
try:
|
|
tweets = self.twit.GetUserTimeline(screen_name=user, count=count, include_rts=True)
|
|
if tweets:
|
|
tweet = tweets[-1*index]
|
|
return self._return_tweet_or_retweet_text(tweet=tweet)
|
|
except twitter.TwitterError as e:
|
|
return 'Couldn\'t obtain status: ' + str(e)
|
|
|
|
def twitter_tweet(self, connection, event, nick, userhost, what, admin_unlocked):
|
|
"""Tweet. Needs authentication."""
|
|
|
|
match = self.tweetre.search(what)
|
|
if match:
|
|
tweet = match.group(1)
|
|
if self.authed is False:
|
|
return 'You must be authenticated to tweet.'
|
|
if admin_unlocked is False:
|
|
return 'Only admins can tweet.'
|
|
|
|
try:
|
|
if self.twit.PostUpdates(tweet, continuation='\xe2\x80\xa6') is not None:
|
|
return 'Tweet(s) sent.'
|
|
else:
|
|
return 'Unknown error sending tweet(s).'
|
|
except twitter.TwitterError as e:
|
|
return 'Couldn\'t tweet: ' + str(e)
|
|
|
|
def twitter_replyto(self, connection, event, nick, userhost, what, admin_unlocked):
|
|
"""Reply to a tweet, in the twitter in_reply_to_status_id sense. Needs authentication."""
|
|
|
|
match = self.replytore.search(what)
|
|
if match:
|
|
status_id = match.group(1)
|
|
tweet = match.group(2)
|
|
|
|
if self.authed is False:
|
|
return 'You must be authenticated to tweet.'
|
|
if admin_unlocked is False:
|
|
return 'Only admins can tweet.'
|
|
|
|
replyee_tweet = self.twit.GetStatus(status_id)
|
|
target = replyee_tweet.user.screen_name.encode('utf-8', 'ignore')
|
|
|
|
try:
|
|
if self.twit.PostUpdate('@'+target+': '+tweet, in_reply_to_status_id=status_id) is not None:
|
|
return 'Tweet sent.'
|
|
else:
|
|
return 'Unknown error sending tweet.'
|
|
except twitter.TwitterError as e:
|
|
return 'Couldn\'t tweet: ' + str(e)
|
|
|
|
def twitter_gettoken(self, connection, event, nick, userhost, what, admin_unlocked):
|
|
"""Get an oauth token, so that the user may authenticate the bot."""
|
|
|
|
match = self.gettokenre.search(what)
|
|
if match:
|
|
# get request token
|
|
resp, content = self.client.request(self.request_token_url, "GET")
|
|
if resp['status'] != '200':
|
|
raise Exception("Invalid response %s." % resp['status'])
|
|
|
|
self.request_token = dict(urlparse.parse_qsl(content))
|
|
|
|
# have the user auth
|
|
return 'Go to the following link in your browser: %s?oauth_token=%s and then send me the pin.' % (self.authorize_url, self.request_token['oauth_token'])
|
|
|
|
def twitter_auth(self, connection, event, nick, userhost, what, admin_unlocked):
|
|
"""Authenticate, given a PIN (following gettoken)."""
|
|
|
|
match = self.authre.search(what)
|
|
if match:
|
|
authtoken = match.group(1)
|
|
oauth_verifier = authtoken
|
|
|
|
# request access token
|
|
token = oauth.Token(self.request_token['oauth_token'], self.request_token['oauth_token_secret'])
|
|
token.set_verifier(oauth_verifier)
|
|
client = oauth.Client(self.consumer, token)
|
|
|
|
resp, content = client.request(self.access_token_url, "POST")
|
|
self.access_token = dict(urlparse.parse_qsl(content))
|
|
|
|
# finally, create the twitter API object
|
|
self.twit = twitter.Api(self.consumer_key, self.consumer_secret, self.access_token['oauth_token'], self.access_token['oauth_token_secret'])
|
|
self.authed = True
|
|
|
|
# ugly, but the best place to track it. store the connection for later use
|
|
self.connection = connection
|
|
|
|
# print timeline stuff. this will set up the appropriate timer
|
|
self._check_self_timeline()
|
|
|
|
return 'The bot is now logged in.'
|
|
|
|
def _check_self_timeline(self):
|
|
"""Check my timeline, and if there are entries, print them to the channel."""
|
|
|
|
# re-register this check
|
|
Timer(300, self._check_self_timeline, ()).start()
|
|
|
|
if self.authed:
|
|
# get the id of the last check we made
|
|
since_id = self._get_last_since_id()
|
|
output_channel = self._get_output_channel()
|
|
|
|
if since_id is not None and output_channel != '':
|
|
tweets = self.twit.GetFriendsTimeline(since_id=since_id)
|
|
tweets.reverse()
|
|
for tweet in tweets:
|
|
tweet_text = self._return_tweet_or_retweet_text(tweet=tweet, print_source=True)
|
|
self.sendmsg(self.connection, output_channel.encode('utf-8', 'ignore'), tweet_text)
|
|
|
|
# friends timeline printed, find the latest id
|
|
new_since_id = self._get_latest_tweet_id(tweets, since_id)
|
|
|
|
tweets = self.twit.GetMentions(since_id=since_id)
|
|
tweets.reverse()
|
|
for tweet in tweets:
|
|
tweet_text = self._return_tweet_or_retweet_text(tweet=tweet, print_source=True)
|
|
self.sendmsg(self.connection, output_channel.encode('utf-8', 'ignore'), tweet_text)
|
|
|
|
# mentions printed, find the latest id
|
|
new_since_id = self._get_latest_tweet_id(tweets, new_since_id)
|
|
|
|
# set since_id
|
|
self._set_last_since_id(new_since_id)
|
|
|
|
def _return_tweet_or_retweet_text(self, tweet, print_source=False):
|
|
"""
|
|
Return a string of the author and text body of a status,
|
|
accounting for whether or not the fetched status is a
|
|
retweet.
|
|
"""
|
|
|
|
if tweet.retweeted_status:
|
|
retweet = tweet.retweeted_status
|
|
if print_source:
|
|
return '%s (RT %s): %s [%s]' % (tweet.user.name.encode('utf-8', 'ignore'), retweet.user.name.encode('utf-8', 'ignore'), super(Twitter, self)._unencode_xml(retweet.text.encode('utf-8', 'ignore')), tweet.id)
|
|
else:
|
|
return '(RT %s): %s [%s]' % (retweet.user.name.encode('utf-8', 'ignore'), super(Twitter, self)._unencode_xml(retweet.text.encode('utf-8', 'ignore')), tweet.id)
|
|
else:
|
|
if print_source:
|
|
return '%s: %s [%s]' % (tweet.user.name.encode('utf-8', 'ignore'), super(Twitter, self)._unencode_xml(tweet.text.encode('utf-8', 'ignore')), tweet.id)
|
|
else:
|
|
return '%s [%s]' % (super(Twitter, self)._unencode_xml(tweet.text.encode('utf-8', 'ignore')), tweet.id)
|
|
|
|
def _get_last_since_id(self):
|
|
"""Get the since_id out of the database."""
|
|
|
|
try:
|
|
# need to create our own db object, since this is likely going
|
|
# to be called in a new thread
|
|
dbfile = self.config.get('dr.botzo', 'database')
|
|
self.conn = sqlite3.connect(dbfile)
|
|
self.conn.row_factory = sqlite3.Row
|
|
db = self.conn
|
|
query = 'SELECT since_id FROM twitter_settings'
|
|
cursor = db.execute(query)
|
|
result = cursor.fetchone()
|
|
if result:
|
|
return result['since_id']
|
|
except sqlite3.Error as e:
|
|
print("sqlite error: " + str(e))
|
|
raise
|
|
|
|
def _get_output_channel(self):
|
|
"""Get the output_channel out of the database."""
|
|
|
|
try:
|
|
# need to create our own db object, since this is likely going
|
|
# to be called in a new thread
|
|
dbfile = self.config.get('dr.botzo', 'database')
|
|
self.conn = sqlite3.connect(dbfile)
|
|
self.conn.row_factory = sqlite3.Row
|
|
db = self.conn
|
|
query = 'SELECT output_channel FROM twitter_settings'
|
|
cursor = db.execute(query)
|
|
result = cursor.fetchone()
|
|
if result:
|
|
return result['output_channel']
|
|
except sqlite3.Error as e:
|
|
print("sqlite error: " + str(e))
|
|
raise
|
|
|
|
def _set_last_since_id(self, since_id):
|
|
"""Set the since_id."""
|
|
|
|
try:
|
|
# need to create our own db object, since this is likely going
|
|
# to be called in a new thread
|
|
dbfile = self.config.get('dr.botzo', 'database')
|
|
self.conn = sqlite3.connect(dbfile)
|
|
self.conn.row_factory = sqlite3.Row
|
|
db = self.conn
|
|
cur = db.cursor()
|
|
statement = 'UPDATE twitter_settings SET since_id = ?'
|
|
cur.execute(statement, (since_id,))
|
|
db.commit()
|
|
except sqlite3.Error as e:
|
|
print("sqlite error: " + str(e))
|
|
raise
|
|
|
|
def _get_latest_tweet_id(self, tweets, since_id):
|
|
"""Find the latest tweet id in the provided list, or the given since_id."""
|
|
|
|
latest = since_id
|
|
for tweet in tweets:
|
|
if tweet.id > latest:
|
|
latest = tweet.id
|
|
|
|
return latest
|
|
|
|
# vi:tabstop=4:expandtab:autoindent
|
|
# kate: indent-mode python;indent-width 4;replace-tabs on;
|