#!/usr/bin/python # # Copyright 2011 Edward Toroshchin # # This is bird, the console Twitter client. # # bird is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # bird is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # See the full text of the GNU General Public License in the COPYING # file in source code directory. # # If you are unable to read that file, see . # import sys try: import twython except ImportError as e: print ("You should have Twython installed to run bird. Get it from https://github.com/ryanmcgrath/twython") raise import ConfigParser import os import shelve import threading class TwitterError(Exception): pass class WrapTwython: def __init__(self, client_args={}, *args, **kwargs): try: self.twython = twython.Twython(client_args=client_args, *args, **kwargs) except TypeError, e: print "-- Your version of Twython does not support timeouts. Bird reserves the right to hang at any moment because of Twitter." self.twython = twython.Twython(*args, **kwargs) if not 'verifyCredentials' in twython.twython.api_table: twython.twython.api_table['verifyCredentials'] = { 'url': '/account/verify_credentials.json', 'method': 'GET', } def __getattr__(self, attr): twattr = getattr(self.twython, attr) def wrapper(**kwargs): try: result = twattr(**kwargs) except twython.twython.TwythonError, e: raise TwitterError(e) if u'error' in result: raise TwitterError(result[u'error']) return result return wrapper def MaximumHitFrequency(self): return 10 class Bird: '''Main class for Bird instance''' LIMIT_SEEN_LIST=100 class NoSuchCommand(Exception): pass class Message: def __init__(self, text): self.text = text def __unicode__(self): return self.text def do(self, api): api.updateStatus(status=self.text.encode('utf-8')) class Reply: def __init__(self, text, id): self.text = text self.id = id def __unicode__(self): return self.text def do(self, api): api.updateStatus(status=self.text.encode('utf-8'), in_reply_to_status_id=self.id) class Retweet: def __init__(self, id): self.id = id def __unicode__(self): return u"retweet of {0}".format(self.id) def do(self, api): api.reTweet(id=self.id) def __init__(self, directory=None): self.directory = directory or '.' self.__prepare_directory() self.__read_config() self.sendqueue = [] self.clock = threading.Condition() self.daemon = threading.Thread(target=self.__daemon, name='twitter daemon') self.daemon.daemon = True self.api = WrapTwython() self.creds = shelve.open(self.file('bird.key')) self.seen = [] self.keys = {} self.lastkeys = {} self.hooks = {'filter_incoming': [], 'filter_outgoing': [], 'filter_status_rendering': [], 'update': [], } self.echo("accessing Twitter") try: self.__initapi() self.user = self.api.verifyCredentials() except (TwitterError, KeyError): self.echo("failed to access Twitter, please re-authenticate") try: self.__authenticate() self.user = self.api.verifyCredentials() except Exception as e: self.echo("failed to re-authenticate ({0}), bailing out".format(e)) sys.exit(1) except twython.twython.urllib2.URLError as e: self.error("Twitter unavailable: {0}".format(e)) sys.exit(2) self.echo("access granted") self.creds.close() self.__load_modules() self.daemon.start() def __prepare_directory(self): try: os.makedirs(self.directory) except OSError: pass if self.directory == '.': self.warn(u"using current directory to hold keys and configs, probably not what you want") def __read_config(self): self.config = ConfigParser.SafeConfigParser() self.config.read([self.file('birdrc')]) def __load_modules(self): self.modules = [self] for modname in self.getvar('modules', '').split(' '): if not modname: continue try: self.do_load(self, modname) except Exception as e: self.error("failed to load module {0}: {1}".format(modname, e)) def __enumerate(self, status, prefix): key = 'a0' if prefix in self.lastkeys: key = self.lastkeys[prefix] a, b = key[0], key[1] a = a.lower() if b == '9': b = '0' if a == 'z': a = 'a' else: a = chr(ord(a) + 1) else: b = chr(ord(b) + 1) if a == 'l': a = 'L' key = a + b self.lastkeys[prefix] = key key = prefix + key.decode('utf-8') self.keys[key] = status['id'] return key def __print_status(self, status, prefix): params = {'name': status['user']['screen_name'], 'text': status['text']} if 'retweeted_status' in status: params['text'] = u"RT @{0}: {1}".format(status['retweeted_status']['user']['screen_name'], status['retweeted_status']['text']) params['text'] = params['text'].replace("\n", "\\n") params['key'] = self.__enumerate(status, prefix) color = 'tweet' if status['in_reply_to_status_id']: color = ('reply', 94) if status['in_reply_to_user_id'] == self.user['id']: color = ('replytome', 95) if status['text'].find(u"@{0}".format(self.user['screen_name'])) >= 0: color = ('replytome', 95) if status['user']['id'] == self.user['id']: color = ('own', 91) params['name'] = self.paint(params['name'], ('author', 1)) params['text'] = self.paint(params['text'], color) print self.__run_hooks('filter_status_rendering', u"({key}) <{name}> {text}".format(**params), status=status) def __get_timelines(self, page=None): result = page and self.api.getHomeTimeline() or self.api.getHomeTimeline(page=page) if self.getvar('mentions', False) and (not page or page == 1): result += self.api.getUserMentions() return result def __remove_duplicates(self, result): if not result: return [] if len(result) == 1: return result result.sort(lambda x,y: cmp(y['id'], x['id'])) k = result[-1] for i in range(len(result)-2, -1, -1): if k['id'] == result[i]['id']: del result[i] else: k = result[i] return result def __get_new_tweets(self): result = [] if not self.seen: result = self.__get_timelines() self.seen = [k['id'] for k in result] else: proceed = True page = 1 newseen = [] while proceed: new = self.__get_timelines(page) newseen += [k['id'] for k in new] if not new: proceed = False for status in new: if status['id'] in self.seen: proceed = False else: result.append(status) page += 1 self.seen = list(set( self.seen + newseen )) if len(self.seen) > Bird.LIMIT_SEEN_LIST: self.seen = self.seen[:Bird.LIMIT_SEEN_LIST] result = self.__remove_duplicates(result) result.reverse() result = self.__run_hooks('filter_incoming', result) for status in result: self.__print_status(status, '') self.__run_hooks('update', None) def __daemon(self): while True: self.clock.acquire() while not self.sendqueue: limit = 60 try: self.__get_new_tweets() mlimit = self.api.MaximumHitFrequency() if mlimit > limit: limit = mlimit self.echo( u"throttled, new hit frequency is {0}".format(limit) ) except Exception, e: self.warn(u"updating tweets failed: {0}".format(e)) self.clock.wait(limit) tw = self.sendqueue.pop(0) try: tw.do(self.api) except Exception, e: self.error(u"tweeting '{0}' failed: {1}".format(tw, e)) self.clock.release() def __send_tweet(self, string): result = self.__run_hooks('filter_outgoing', [string]) if not result: return self.clock.acquire() self.sendqueue += result self.clock.notify() self.clock.release() def __initapi(self): self.api = WrapTwython(client_args={'timeout': self.getvar('timeout', 10.0)}, twitter_token='fe4WfR8gZq1pkThdGAIbYA', twitter_secret='NfrnJPUjYDBLnxiubyZPs4Ad973I7PE0xM8KA4Cvg38', oauth_token=self.creds['a_key'], oauth_token_secret=self.creds['a_secret']) def __authenticate(self): from urlparse import parse_qsl from twython.twython import oauth self.echo( """bird will now try to authenticate you with Twitter. If you don't want bird to access your Twitter account, hit ^C now.""" ) client = oauth.Client(oauth.Consumer(key='fe4WfR8gZq1pkThdGAIbYA', secret='NfrnJPUjYDBLnxiubyZPs4Ad973I7PE0xM8KA4Cvg38')) r, c = client.request('https://api.twitter.com/oauth/request_token', 'GET') if r['status'] != '200': self.error(u"bad response from Twitter, status: {0} (probably a Fail Whale?)".format(r['status'])) token = dict(parse_qsl(c)) self.echo( """now, please, visit the following page and follow the instructions: https://api.twitter.com/oauth/authorize?oauth_token={0}\n""".format(token['oauth_token'])) pin = raw_input("PIN? ") at = oauth.Token(token['oauth_token'], token['oauth_token_secret']) at.set_verifier(pin) client = oauth.Client(oauth.Consumer(key='fe4WfR8gZq1pkThdGAIbYA', secret='NfrnJPUjYDBLnxiubyZPs4Ad973I7PE0xM8KA4Cvg38'), at) r, c = client.request('https://api.twitter.com/oauth/access_token', method='POST', body='oauth_verifier={0}'.format(pin)) if r['status'] != '200': self.error(u"bad response from Twitter, status {0}, text: {1}".format( r['status'], c )) else: access = dict(parse_qsl(c)) self.echo( "authentication completed successfully" ) self.creds['a_key'] = access['oauth_token'] self.creds['a_secret'] = access['oauth_token_secret'] self.__initapi() def __run_hooks(self, hookname, arg, *args, **kwargs): result = arg for hook in self.hooks[hookname]: try: result = hook(self, result, *args, **kwargs) except Exception, e: self.warn("hook {0} failed: {1}".format(hook, e)) return result def execute(self, string): '''Execute a line of user's input''' if not isinstance(string, unicode): self.warn("input line is not unicode. Garbage and chaos may ensue. Check that your locale is set correctly") if not string: return if string[0] == '/': command = string[1:].split(' ', 1) try: if len(command) > 1: self.exec_command(command[0], command[1]) else: self.exec_command(command[0], '') except Bird.NoSuchCommand: self.echo(u'unknown command: {0}. Try /help or something.'.format(command[0])) else: self.send(string) def warn(self, string): '''Warn a user about something''' print self.paint(u"-- WARNING: {0}".format(string), ("warn", 93)) def error(self, string): '''Warn a user about something very wrong''' print self.paint(u"-- ERROR: {0}".format(string), ("error", 95)) def echo(self, string): '''Notify a user about something''' print (u"-- {0}".format(string)) def file(self, filename): '''Get a file with a given string in user's directory''' return os.path.join(self.directory, filename) def getvar(self, var, default=None): '''Get a variable with specified name from config''' try: method = self.config.get if isinstance(default, bool): method = self.config.getboolean section, variable = 'bird', var if var.find('.') > 0: section, variable = var.split('.', 1) return method(section, variable) except ConfigParser.Error: return default def setvar(self, var, value): '''Set a variable to a given value''' section, variable = 'bird', var if var.find('.') > 0: section, variable = var.split('.', 1) try: self.config.add_section(section) except ConfigParser.DuplicateSectionError: pass self.config.set(section, variable, value) def hook(self, hookname, hookfunc): '''Add given hookfunc to a hook with given name hookname''' self.hooks[hookname] += [hookfunc] def paint(self, text, color): '''Return a text, colorized with given color. Color should be either a numerical value, a string, or a tuple (color, default)''' if self.getvar('colors', False): ccode = None if isinstance(color, int): ccode = color elif isinstance(color, str): ccode = self.getvar("color.{0}".format(color)) elif isinstance(color, tuple): ccode = self.getvar("color.{0}".format(color[0]), color[1]) if ccode: return u"\033[{0}m{1}\033[0m".format(ccode, text) return text def printStatus(self, status, prefix='u'): '''Print a status, or a list of statuses. Start keys with a given prefix''' if isinstance(status, list): for k in status: self.__print_status(k, prefix) else: self.__print_status(status, prefix) def getOriginalStatus(self, status): '''If tweet is a retweet, return original tweet.''' if 'retweeted_status' in status: return status['retweeted_status'] else: return status def send(self, string): '''Send a tweet or a sequence of command classes''' if isinstance(string, list): for item in string: self.__send_tweet(item) else: self.__send_tweet(Bird.Message(string)) def exec_command(self, command, arg): '''Execute a given command with given args. Will raise NoSuchCommand upon error.''' attrname = u'do_{0}'.format(command) for module in self.modules: if hasattr(module, attrname): method = getattr(module, attrname) if callable(method): try: method(self, arg) except Exception, e: self.error(u"exception occured: {0}".format(e)) return raise Bird.NoSuchCommand() def do_quit(self, bird, arg): '''exit bird''' sys.exit(0) def do_exec(self, bird, arg): '''execute arbitrary Python expression (don't forget to use "print" and "self")''' exec(arg) def do_load(self, bird, arg): '''load a module with given name''' import imp f, p, d = imp.find_module(arg) with f: modinstance = imp.load_module(arg, f, p, d) inst = modinstance.instance if callable(inst): inst = inst(self) if inst: self.modules += [inst] def do_rt(self, bird, arg): '''retweed someone's tweet''' try: id = self.keys[arg] self.echo( u"(retweeting)" ) self.__send_tweet(Bird.Retweet(id)) except KeyError: self.error( u"unknown key {0}".format(arg) ) def do_re(self, bird, arg): '''reply to someone's tweet''' try: key, text = arg.split(' ', 1) id = self.keys[key] tweet = self.api.showStatus(id=id) name = tweet['user']['screen_name'] self.__send_tweet(Bird.Reply(u"@{0} {1}".format(name, text), id)) except ValueError: self.error( u"missing argument: tweet to reply to" ) except KeyError as e: self.error( u"unknown key {0}".format(e.args[0]) ) except TwitterError as e: self.error( u"Twitter error: {0} Probably message has been deleted".format(e) ) def do_save(self, bird, var): '''save configuration''' self.config.write(open(self.file('birdrc'), 'wb')) self.echo(u"configuration file saved") def do_help(self, bird, arg): '''show available commands''' self.echo(u"available commands:") for m in self.modules: for a in [x for x in dir(m) if x.startswith(u'do_')]: f = getattr(m, a) doc = f.__doc__ or u"no help available" self.echo(u"/{0:15} {1}".format(a[3:], doc)) def do_set(self, bird, arg): '''set or show a value for a variable (don't forget to /save)''' try: key, text = arg.split(' ', 1) self.setvar(key, text) except ValueError: self.echo(u"{0} == {1}".format(arg, self.getvar(arg))) if __name__ == '__main__': import readline from optparse import OptionParser parser = OptionParser() parser.add_option("-d", "--dir", dest="directory", help="directory for configs, keys and such", default=os.path.join(os.getenv('HOME'), '.config/bird')) options, args = parser.parse_args() bird = Bird(directory=options.directory) while True: prompt = 'bird> ' try: line = raw_input(prompt) encoding = getattr(sys.stdin, "encoding", None) if encoding and not isinstance(line, unicode): line = line.decode(encoding) bird.execute(line) except EOFError: break # vim:sw=4:ts=4:et