December 2009
November 2009
October 2009
September 2009
June 2009
April 2009
March 2009
February 2009
January 2009
December 2008
November 2008
October 2008
July 2008
June 2008
October 2007
September 2007
Last night, I got my first non-spam comment in some time. Harshad Joshi isn't a fan of the re module and wanted to know if the code from my earlier post could be rewritten without it. Revisiting the code, I realize now that using Python's regular expressions was a bit of overkill for this little script. So here it is without re. It also includes more resilience and logging, and getopt has been replaced by optparse.
#!/usr/bin/env python
"""Follow Twitter users whose bios match a search query.

The script searches Twitter for the given terms, looks up the bio of every
poster in the results, and follows each poster whose bio contains all of the
plain terms and none of the terms prefixed with '-'.
"""
import logging
import simplejson
import sys
import time
import traceback
import twitter
import urllib2
from getpass import getpass
from optparse import OptionParser
from urllib import urlencode

# Default number of failed attempts resilient_apply tolerates before exiting.
MAX_ERRORS = 10


def resilient_apply(func, *args, **kwargs):
    """
    If something goes awry, don't die! Log it!

    Calls func(*args, **kwargs) and returns its result. When the call raises,
    the traceback is logged and the call is attempted again.

    The max_errors keyword arg (default MAX_ERRORS) is consumed here and is
    not passed on to func; it determines after how many failures we quit
    trying and exit.

    Raises SystemExit(1) on KeyboardInterrupt or once max_errors failures
    have been logged.
    """
    max_errors = kwargs.pop('max_errors', MAX_ERRORS)
    errors = 0
    while errors < max_errors:
        try:
            return func(*args, **kwargs)
        except KeyboardInterrupt:
            raise SystemExit(1)
        except Exception:
            # Not a bare except: a SystemExit raised by func must end the
            # program instead of being logged and retried.
            logging.error("".join(
                traceback.format_exception(*sys.exc_info())))
            errors += 1
    logging.error("Maximum errors (%d) exceeded" % max_errors)
    raise SystemExit(1)


def compile_filter(query):
    """
    Split a search query into lowercased (good, bad) word lists.

    Words starting with '-' go into bad (without the dash); every other word
    goes into good. A lone '-' is dropped: it would otherwise become the
    empty string, which is a substring of every bio.
    """
    good = []
    bad = []
    for word in query.split():
        word = word.lower()
        if word.startswith('-'):
            word = word[1:]
            if word:
                bad.append(word)
        else:
            good.append(word)
    return (good, bad)


def filter_user_by_bio(user, filter, api=None):
    """
    Return True when the bio of user passes filter.

    filter is the (good, bad) pair produced by compile_filter. The bio must
    contain every good word and no bad word; matching is a case-insensitive
    substring test. Users without a bio never pass.

    api is the twitter.Api instance used for the lookup; a new one is
    created only when none is supplied.
    """
    logging.debug('Looking up %s' % user)
    if api is None:
        api = resilient_apply(twitter.Api)
    bio = resilient_apply(api.GetUser, user).GetDescription()
    if bio is None:
        return False  # We only follow those with bios
    bio = bio.lower()
    good, bad = filter
    for word in bad:
        if word in bio:
            return False
    for word in good:
        if word not in bio:
            return False
    return True


def follow_by_query(username, password, q, rpp=None, lang=None):
    """
    Follow every search hit for q whose bio passes the filter built from q.

    username and password are handed to twitter.Api; rpp and lang are handed
    to get_users_from_search. Returns the list of screen names that were
    newly followed (users already among the account's friends are skipped).
    """
    filter = compile_filter(q)
    api = resilient_apply(twitter.Api, username=username, password=password)
    friends = set(
        user.GetScreenName() for user in resilient_apply(api.GetFriends))
    # All bios are checked first; friendships are only created afterwards.
    goodusers = [user for user in get_users_from_search(q, rpp, lang)
                 if filter_user_by_bio(user, filter, api)]
    newusers = []
    for user in goodusers:
        if user in friends:
            continue
        logging.debug('Creating friendship %s' % user)
        resilient_apply(api.CreateFriendship, user)
        # Remember the user so a repeated search hit is not followed twice.
        friends.add(user)
        newusers.append(user)
    return newusers


def get_users_from_search(query, resultnum=None, lang=None):
    """
    Yield the 'from_user' value of every result of a Twitter search.

    query is the search string, resultnum the number of results to request
    (10 when None) and lang an optional language restriction.
    """
    rpp = 10
    if resultnum is not None:
        rpp = resultnum
    q = [urlencode({'q': query})]
    if lang is not None:
        q.append(urlencode({'lang': lang}))
    q.append(urlencode({'rpp': rpp}))
    # NOTE(review): the encoded parameters are passed as urlopen's second
    # (data) argument rather than appended to the URL - kept as is; confirm
    # this is what the search endpoint expects.
    response = resilient_apply(
        urllib2.urlopen,
        'http://search.twitter.com/search.json?',
        '&'.join(q)
    )
    data = simplejson.load(response)
    for result in data['results']:
        yield result['from_user']


def main():
    """Parse the command line, configure logging and run follow_by_query."""
    parser = OptionParser('usage: %prog [options] search terms')
    parser.add_option('-u', '--username', dest='username', default=None)
    parser.add_option('-p', '--password', dest='password', default=None)
    parser.add_option('-r', '--results', dest='rpp', default=None)
    parser.add_option('-l', '--lang', dest='lang', default=None)
    parser.add_option('-f', '--logfile', dest='logfile', default=None)
    parser.add_option('-v', '--logginglevel', dest='level', default='INFO')
    options, args = parser.parse_args()
    # Only the numeric level constants of the logging module are valid; a
    # plain hasattr() check would also accept names such as 'info', which is
    # a function and not a level.
    level = getattr(logging, options.level.upper(), None)
    if not isinstance(level, int):
        parser.error("level %s is not acceptable" % options.level)
    if options.username is None:
        parser.error("username is required")
    if not args:
        parser.error("search terms are required")
    logging_args = {
        'format': '%(asctime)s %(levelname)s %(message)s',
        'level': level,
    }
    if options.logfile is None:
        logging_args['stream'] = sys.stdout
    else:
        logging_args['filename'] = options.logfile
    logging.basicConfig(**logging_args)
    if options.password is None:
        options.password = getpass()
    newusers = follow_by_query(
        options.username,
        options.password,
        " ".join(args),
        options.rpp,
        options.lang,
    )
    if newusers:
        logging.info(", ".join(newusers) + ' Added!')


if __name__ == '__main__':
    main()
Inspired by Amy Iris, I have made a little bit of automation for Twitter. On Twitter, it is not easy to find others by interest. This little piece of code runs a search on the terms you specify and then checks the bio of each poster for your search terms. For each user that matches, it will follow them for you.
#!/usr/bin/env python
"""Follow Twitter users whose bios match a search query (getopt version)."""
import getopt
import re
import simplejson
import sys
import time
import twitter
import urllib2
from getpass import getpass
from urllib import urlencode


def compile_filter(query):
    """
    Turn a search query into (good, bad) lists of compiled patterns.

    Words starting with '-' become bad patterns, all others good patterns.
    The leading dash is stripped before compiling, so '-monty' excludes bios
    containing 'monty' rather than the literal text '-monty'. Words are
    escaped so that terms such as 'c++' are matched literally instead of
    being parsed as regular expressions. A lone '-' is ignored.
    """
    good = []
    bad = []
    for word in query.split():
        if word[0] == '-':
            word = word[1:]
            if word:
                bad.append(re.compile(re.escape(word), re.IGNORECASE))
        else:
            good.append(re.compile(re.escape(word), re.IGNORECASE))
    return (good, bad)


def filter_user_by_bio(user, filter, api=None):
    """
    Return True when the bio of user passes filter.

    filter is the (good, bad) pair produced by compile_filter: every good
    pattern must be found in the bio and no bad pattern may be. Users
    without a bio never pass. A twitter.Api instance is created when api is
    None.
    """
    if api is None:
        api = twitter.Api()
    bio = api.GetUser(user).GetDescription()
    if bio is None:
        return False  # We only follow those with bios
    good, bad = filter
    for word in bad:
        if word.search(bio) is not None:
            return False
    for word in good:
        if word.search(bio) is None:
            return False
    return True


def follow_by_query(username, password, q, rpp=None, lang=None):
    """
    Follow every search hit for q whose bio passes the filter built from q.

    Returns the list of screen names that were newly followed; users already
    among the account's friends are skipped.
    """
    filter = compile_filter(q)
    api = twitter.Api(username=username, password=password)
    friends = [user.GetScreenName() for user in api.GetFriends()]
    # All bios are checked first; friendships are only created afterwards.
    goodusers = [user for user in get_users_from_search(q, rpp, lang)
                 if filter_user_by_bio(user, filter, api)]
    newusers = []
    for user in goodusers:
        if user not in friends:
            api.CreateFriendship(user)
            # Remember the user so a repeated search hit is not followed
            # twice.
            friends.append(user)
            newusers.append(user)
    return newusers


def get_users_from_search(query, resultnum=None, lang=None):
    """
    Yield the 'from_user' value of every result of a Twitter search.

    query is the search string, resultnum the number of results to request
    (10 when None) and lang an optional language restriction.
    """
    rpp = 10
    if resultnum is not None:
        rpp = resultnum
    q = [urlencode({'q': query})]
    if lang is not None:
        q.append(urlencode({'lang': lang}))
    q.append(urlencode({'rpp': rpp}))
    # NOTE(review): the encoded parameters are passed as urlopen's second
    # (data) argument rather than appended to the URL - kept as is; confirm
    # this is what the search endpoint expects.
    response = urllib2.urlopen(
        'http://search.twitter.com/search.json?',
        '&'.join(q)
    )
    data = simplejson.load(response)
    for result in data['results']:
        yield result['from_user']


def print_usage():
    """Write the command line help to stderr."""
    sys.stderr.write("""
Usage: %s -u username [-p password] [-r search_result_number] [-l language]
        terms ...

    -l language = Filter search by language.
    -p password = Optional. If not supplied, you will be asked for it.
    -r search_result_number = Number of results to pull from twitter searches.
    -u username = twitter username.
""" % sys.argv[0])


if __name__ == '__main__':
    optlist, args = getopt.getopt(sys.argv[1:], 'l:p:r:u:')
    if not args:
        sys.stderr.write("You must specify search terms\n")
        print_usage()
        raise SystemExit(1)
    optd = dict(optlist)
    if '-u' not in optd:
        sys.stderr.write("You must specify a user\n")
        print_usage()
        raise SystemExit(1)
    username = optd['-u']
    query = " ".join(args)
    if '-p' in optd:
        password = optd['-p']
    else:
        sys.stderr.write("Password:")
        password = getpass("")
    # Missing options fall back to None, which the search treats as "unset".
    rpp = optd.get('-r')
    lang = optd.get('-l')
    try:
        newusers = follow_by_query(username, password, query, rpp, lang)
    except urllib2.HTTPError:
        # Fetch the exception via sys.exc_info() so no version-specific
        # "except ... as/," binding syntax is needed.
        err = sys.exc_info()[1]
        sys.stderr.write("Cannot connect to Twitter\n")
        sys.stderr.write(str(err))
        sys.stderr.write("\n")
    else:
        if newusers:
            sys.stdout.write(", ".join(newusers) + " Added!\n")
Usage is as follows, assuming the script is named twitsheep.py:
Usage: ./twitsheep.py -u username [-p password] [-r search_result_number] [-l language]
terms ...
-l language = Filter search by language.
-p password = Optional. If not supplied, you will be asked for it.
-r search_result_number = Number of results to pull from twitter searches.
-u username = twitter username.
Running the program without arguments prints the usage as well. It is best to run this with cron or Scheduled Tasks no more often than every thirty minutes. The default number of search results to check is ten, but you can turn it up to about 30. If you start getting 400 errors, a.k.a. Bad Request, you are being throttled by Twitter's DoS protection. You should consider a lower number of search results or a longer interval between searches.
You can see an active test of this script here. It is running with this command line:
./twitsheep.py -u twitsheep -r 20 -l en "python -monty -ball"
If you have any features you would like integrated into this, please leave a comment.
