okay, it's time. we got around for a while with all sorts of silly config options and exceptions and common strings triggering bot commands. but now it's time to man up and expect modules to be more strict and less loosey-goosey. convert the popular modules (i.e. the ones that still work) to trigger on !pi rather than pi, etc. usually, this is achieved via regex searches, although there are some weird bugs (ones i'm hoping are caused by other recursion/alias bugs and not this commit). more code around this will be gutted soon, but this, at least, means you can't say 'tweet that shit, yo' and accidentally trigger the bot.
157 lines
6.4 KiB
Python
157 lines
6.4 KiB
Python
"""
|
|
Twitter - access to Twitter through bot commands
|
|
Copyright (C) 2010 Brian S. Stephan
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
"""
|
|
|
|
import re
|
|
import oauth2 as oauth
|
|
import urlparse
|
|
|
|
from extlib import irclib
|
|
from extlib import twitter
|
|
|
|
from Module import Module
|
|
|
|
class Twitter(Module):
|
|
"""Access Twitter via the bot as an authenticated client."""
|
|
|
|
def __init__(self, irc, config, server):
|
|
|
|
"""Prompt the user for oauth stuff when starting up.
|
|
|
|
TODO: make this optional, and have API calls log if they need auth.
|
|
"""
|
|
|
|
Module.__init__(self, irc, config, server)
|
|
|
|
# begin oauth magic
|
|
self.consumer_key = 'N2aSGxBP8t3cCgWyF1B2Aw'
|
|
self.consumer_secret = '0aQPEV4K3MMpicfi2lDtCP5pvjsKaqIpfuWtsPzx8'
|
|
|
|
self.request_token_url = 'https://api.twitter.com/oauth/request_token'
|
|
self.access_token_url = 'https://api.twitter.com/oauth/access_token'
|
|
self.authorize_url = 'https://api.twitter.com/oauth/authorize'
|
|
|
|
self.consumer = oauth.Consumer(self.consumer_key, self.consumer_secret)
|
|
self.client = oauth.Client(self.consumer)
|
|
|
|
# create a default twitter API account, in case we never auth
|
|
self.twit = twitter.Api()
|
|
self.authed = False
|
|
|
|
def do(self, connection, event, nick, userhost, replypath, what, admin_unlocked):
|
|
"""
|
|
Attempt to do twitter things.
|
|
"""
|
|
|
|
match = re.search('^!twitter\s+getstatus\s+(\S+)$', what)
|
|
if match:
|
|
status = match.group(1)
|
|
try:
|
|
tweet = self.twit.GetStatus(status)
|
|
return self.reply(connection, replypath, self.tweet_or_retweet_text(tweet=tweet, print_source=True))
|
|
except twitter.TwitterError as e:
|
|
return self.reply(connection, replypath, 'Couldn\'t obtain status: ' + str(e))
|
|
|
|
match = re.search('^!twitter\s+getuserstatus\s+(\S+)(\s+.*|$)', what)
|
|
if match:
|
|
user = match.group(1)
|
|
index = match.group(2)
|
|
|
|
if index:
|
|
index = int(index)
|
|
if index > 0:
|
|
index = 0
|
|
else:
|
|
index = 0
|
|
count = (-1*index) + 1
|
|
|
|
try:
|
|
tweets = self.twit.GetUserTimeline(screen_name=user, count=count, include_rts=True)
|
|
if tweets:
|
|
tweet = tweets[-1*index]
|
|
return self.reply(connection, replypath, self.tweet_or_retweet_text(tweet=tweet))
|
|
except twitter.TwitterError as e:
|
|
return self.reply(connection, replypath, 'Couldn\'t obtain status: ' + str(e))
|
|
|
|
match = re.search('^!twitter\s+tweet\s+(.*)', what)
|
|
if match:
|
|
tweet = match.group(1)
|
|
if self.authed is False:
|
|
return self.reply(connection, replypath, 'You must be authenticated to tweet.')
|
|
if admin_unlocked is False:
|
|
return self.reply(connection, replypath, 'Only admins can tweet.')
|
|
|
|
try:
|
|
if self.twit.PostUpdates(tweet, continuation='\xe2\x80\xa6') is not None:
|
|
return self.reply(connection, replypath, 'Tweet(s) sent.')
|
|
else:
|
|
return self.reply(connection, replypath, 'Unknown error sending tweet(s).')
|
|
except twitter.TwitterError as e:
|
|
return self.reply(connection, replypath, 'Couldn\'t tweet: ' + str(e))
|
|
|
|
match = re.search('^!twitter\s+gettoken$', what)
|
|
if match:
|
|
# get request token
|
|
resp, content = self.client.request(self.request_token_url, "GET")
|
|
if resp['status'] != '200':
|
|
raise Exception("Invalid response %s." % resp['status'])
|
|
|
|
self.request_token = dict(urlparse.parse_qsl(content))
|
|
|
|
# have the user auth
|
|
return self.reply(connection, replypath, 'Go to the following link in your browser: %s?oauth_token=%s and then send me the pin.' % (self.authorize_url, self.request_token['oauth_token']))
|
|
|
|
match = re.search('^!twitter\s+auth\s+(\S+)$', what)
|
|
if match:
|
|
authtoken = match.group(1)
|
|
oauth_verifier = authtoken
|
|
|
|
# request access token
|
|
token = oauth.Token(self.request_token['oauth_token'], self.request_token['oauth_token_secret'])
|
|
token.set_verifier(oauth_verifier)
|
|
client = oauth.Client(self.consumer, token)
|
|
|
|
resp, content = client.request(self.access_token_url, "POST")
|
|
self.access_token = dict(urlparse.parse_qsl(content))
|
|
|
|
# finally, create the twitter API object
|
|
self.twit = twitter.Api(self.consumer_key, self.consumer_secret, self.access_token['oauth_token'], self.access_token['oauth_token_secret'])
|
|
self.authed = True
|
|
return self.reply(connection, replypath, 'The bot is now logged in.')
|
|
|
|
def tweet_or_retweet_text(self, tweet, print_source=False):
|
|
"""
|
|
Return a string of the author and text body of a status,
|
|
accounting for whether or not the fetched status is a
|
|
retweet.
|
|
"""
|
|
|
|
if tweet.retweeted_status:
|
|
retweet = tweet.retweeted_status
|
|
if print_source:
|
|
return '%s (RT %s): %s [%s]' % (tweet.user.name.encode('utf-8', 'ignore'), retweet.user.name.encode('utf-8', 'ignore'), retweet.text.encode('utf-8', 'ignore'), tweet.id)
|
|
else:
|
|
return '(RT %s): %s [%s]' % (retweet.user.name.encode('utf-8', 'ignore'), retweet.text.encode('utf-8', 'ignore'), tweet.id)
|
|
else:
|
|
if print_source:
|
|
return '%s: %s [%s]' % (tweet.user.name.encode('utf-8', 'ignore'), tweet.text.encode('utf-8', 'ignore'), tweet.id)
|
|
else:
|
|
return '%s [%s]' % (tweet.text.encode('utf-8', 'ignore'), tweet.id)
|
|
|
|
# vi:tabstop=4:expandtab:autoindent
|
|
# kate: indent-mode python;indent-width 4;replace-tabs on;
|