Archive
Tags
android (3)
ant (2)
beautifulsoup (1)
debian (1)
decorators (1)
django (9)
dovecot (1)
encryption (1)
fix (4)
gotcha (2)
hobo (1)
htmlparser (1)
imaplib (2)
java (1)
json (2)
kerberos (2)
linux (7)
lxml (5)
markdown (4)
mechanize (6)
multiprocessing (1)
mysql (2)
nagios (2)
new_features (3)
open_source (5)
optparse (2)
parsing (1)
perl (2)
postgres (1)
preseed (1)
pxe (4)
pyqt4 (1)
python (41)
raid (1)
rails (1)
red_hat (1)
reportlab (4)
request_tracker (2)
rt (2)
ruby (1)
scala (1)
screen_scraping (7)
shell_scripting (8)
soap (1)
solaris (3)
sql (2)
sqlalchemy (2)
tips_and_tricks (1)
twitter (2)
ubuntu (1)
vmware (2)
windows (1)
zimbra (2)

Last night, I got my first non-spam comment in some time. Harshad Joshi isn't a fan of the re module and wanted to know if the code from my earlier post could be rewritten to not include it. Revisiting the code, I realize now that using python's regular expressions was a bit of overkill for this little script. So here it is without re. It also includes more resilience, logging, and getopt has been replaced by optparse.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#!/usr/bin/env python

import logging
import simplejson
import sys
import time
import traceback
import twitter
import urllib2
from getpass import getpass
from optparse import OptionParser
from urllib import urlencode

MAX_ERRORS = 10

def resilient_apply(func, *args, **kwargs):
    """
    Call func(*args, **kwargs); if something goes awry, don't die!  Log it!

    Works just like calling func directly.  The max_errors keyword arg
    determines how many failures we tolerate before giving up.

    Raises SystemExit(1) on KeyboardInterrupt or once max_errors
    consecutive failures have occurred.
    """
    # Pop our private keyword so it is never forwarded to func.
    max_errors = kwargs.pop('max_errors', MAX_ERRORS)

    errors = 0
    while errors < max_errors:
        try:
            # apply() is deprecated (and gone in Python 3); the direct
            # star-call is the equivalent and works in Python 2 as well.
            return func(*args, **kwargs)
        except KeyboardInterrupt:
            raise SystemExit(1)
        except Exception:
            # Was a bare except:, which also swallowed SystemExit and
            # GeneratorExit; Exception keeps the retry behavior while
            # letting interpreter-exit signals propagate.
            logging.error("".join(
                traceback.format_exception(*sys.exc_info())))
        errors += 1
    logging.error("Maximum errors (%d) exceeded" % max_errors)
    raise SystemExit(1)

def compile_filter(query):
    """Split a query string into (wanted, unwanted) word lists.

    Words prefixed with '-' are unwanted; the prefix is stripped.
    Every word is lowercased for case-insensitive matching.
    """
    wanted, unwanted = [], []
    for token in query.split():
        token = token.lower()
        if token.startswith('-'):
            unwanted.append(token[1:])
        else:
            wanted.append(token)
    return (wanted, unwanted)

def filter_user_by_bio(user, filter, api=None):
    """
    Return True if user's bio passes the compiled filter.

    filter is the (good, bad) pair from compile_filter().  A user with
    no bio never matches.  api is an optional twitter.Api instance; one
    is created only when none is supplied.
    """
    logging.debug('Looking up %s' % user)
    # BUG FIX: the original unconditionally replaced the caller's api
    # with a fresh unauthenticated twitter.Api(), throwing away the
    # authenticated instance passed in by follow_by_query.
    if api is None:
        api = resilient_apply(twitter.Api)
    bio = resilient_apply(api.GetUser, user).GetDescription()
    if bio is None:
        return False # We only follow those with bios

    bio = bio.lower()
    good, bad = filter
    # Any excluded word in the bio disqualifies the user outright.
    for word in bad:
        if word in bio:
            return False
    # Every wanted word must appear somewhere in the bio.
    goodmatches = []
    for word in good:
        if word in bio:
            goodmatches.append(word)
    if good == goodmatches:
        return True
    return False

def follow_by_query(username, password, q, rpp=None, lang=None):
    """Search twitter for q and befriend users whose bios match it.

    Returns the list of screen names that were newly followed.
    """
    filter = compile_filter(q)
    api = resilient_apply(twitter.Api, username=username, password=password)
    friends = []
    for friend in resilient_apply(api.GetFriends):
        friends.append(friend.GetScreenName())

    # First pass: collect every search hit whose bio passes the filter.
    goodusers = [user for user in get_users_from_search(q, rpp, lang)
                 if filter_user_by_bio(user, filter, api)]

    # Second pass: follow the ones we are not already friends with.
    newusers = []
    for user in goodusers:
        if user not in friends:
            logging.debug('Creating friendship %s' % user)
            resilient_apply(api.CreateFriendship, user)
            friends.append(user)
            newusers.append(user)
    return newusers

def get_users_from_search(query, resultnum=None, lang=None):
    """
    Yield the screen name of each user posting a search result.

    query is passed to twitter's search API.  resultnum overrides the
    default of 10 results per page; lang restricts results by language.
    """
    q = []
    rpp = 10
    q.append(urlencode({'q': query}))
    if lang is not None:
        q.append(urlencode({'lang': lang}))
    if resultnum is not None:
        rpp = resultnum
    q.append(urlencode({'rpp': rpp}))
    # BUG FIX: passing the query string as urlopen's second (data)
    # argument turned this into a POST request; the search API takes
    # its parameters in the URL of a GET request.
    response = resilient_apply(
        urllib2.urlopen,
        'http://search.twitter.com/search.json?' + '&'.join(q)
    )
    data = simplejson.load(response)
    for result in data['results']:
        yield result['from_user']

def main():
    """Parse the command line, configure logging, and run the search."""
    parser = OptionParser('usage: %prog [options] search terms')
    parser.add_option('-u', '--username', dest='username', default=None)
    parser.add_option('-p', '--password', dest='password', default=None)
    parser.add_option('-r', '--results', dest='rpp', default=None)
    parser.add_option('-l', '--lang', dest='lang', default=None)
    parser.add_option('-f', '--logfile', dest='logfile', default=None)
    parser.add_option('-v', '--logginglevel', dest='level', default='INFO')
    options, args = parser.parse_args()

    # Validate before touching logging or the network.
    if not hasattr(logging, options.level):
        parser.error("level %s is not acceptable" % options.level)
    if options.username is None:
        parser.error("username is required")

    logging_args = dict(
        format='%(asctime)s %(levelname)s %(message)s',
        level=getattr(logging, options.level),
    )
    # Without -f, log to stdout instead of a file.
    if options.logfile is None:
        logging_args['stream'] = sys.stdout
    else:
        logging_args['filename'] = options.logfile
    logging.basicConfig(**logging_args)

    # Prompt interactively rather than requiring -p on the command line.
    if options.password is None:
        options.password = getpass()

    newusers = follow_by_query(
        options.username,
        options.password,
        " ".join(args),
        options.rpp,
        options.lang,
    )
    if newusers:
        logging.info(", ".join(newusers) + ' Added!')

# Run only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Posted by Tyler Lesmann on January 6, 2010 at 23:11
Tagged as: json optparse python twitter

Inspired by Amy Iris, I have made a little bit of automation for twitter. On twitter, it is not easy to find others by interest. This little piece of code runs a search on the terms you specify and then checks the bios of each poster for your search terms. For each user that is a match, it will follow them on your behalf.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#!/usr/bin/env python

import getopt
import re
import simplejson
import sys
import time
import twitter
import urllib2
from getpass import getpass
from urllib import urlencode

def compile_filter(query):
    """
    Split a query into two lists of compiled regexes: (good, bad).

    Words prefixed with '-' are exclusions; everything else must match.
    Patterns are compiled case-insensitively.
    """
    good = []
    bad = []
    words = query.split()
    for word in words:
        if word[0] == '-':
            # BUG FIX: strip the leading '-' so the compiled pattern
            # matches the word itself, not the hyphen-prefixed form
            # (re.compile('-monty') would almost never match a bio).
            bad.append(re.compile(word[1:], re.IGNORECASE))
        else:
            good.append(re.compile(word, re.IGNORECASE))
    return (good, bad)

def filter_user_by_bio(user, filter, api=None):
    """Return True when user's bio matches every good pattern and no bad one.

    filter is the (good, bad) regex pair from compile_filter().  Users
    without a bio never match.  api defaults to a fresh twitter.Api().
    """
    if api is None:
        api = twitter.Api()
    bio = api.GetUser(user).GetDescription()
    if bio is None:
        return False # We only follow those with bios
    good, bad = filter
    # Any excluded pattern appearing in the bio disqualifies the user.
    for pattern in bad:
        if pattern.search(bio) is not None:
            return False
    # All wanted patterns must be present.
    matched = [pattern for pattern in good if pattern.search(bio) is not None]
    return good == matched

def follow_by_query(username, password, q, rpp=None, lang=None):
    """Search twitter for q and follow users whose bios match the query.

    Returns the list of newly followed screen names.
    """
    filter = compile_filter(q)
    api = twitter.Api(username=username, password=password)
    friends = [friend.GetScreenName() for friend in api.GetFriends()]

    # Keep only the search hits whose bios pass the filter.
    goodusers = [user for user in get_users_from_search(q, rpp, lang)
                 if filter_user_by_bio(user, filter, api)]

    newusers = []
    for user in goodusers:
        if user not in friends:
            api.CreateFriendship(user)
            friends.append(user)
            newusers.append(user)
    return newusers

def get_users_from_search(query, resultnum=None, lang=None):
    """
    Yield the screen name of each user posting a search result.

    query is passed to twitter's search API.  resultnum overrides the
    default of 10 results per page; lang restricts results by language.
    """
    q = []
    rpp = 10
    q.append(urlencode({'q': query}))
    if lang is not None:
        q.append(urlencode({'lang': lang}))
    if resultnum is not None:
        rpp = resultnum
    q.append(urlencode({'rpp': rpp}))
    # BUG FIX: passing the query string as urlopen's second (data)
    # argument turned this into a POST request; the search API takes
    # its parameters in the URL of a GET request.
    response = urllib2.urlopen(
        'http://search.twitter.com/search.json?' + '&'.join(q)
    )
    data = simplejson.load(response)
    for result in data['results']:
        yield result['from_user']

def print_usage():
    """Write the command-line usage message to stderr."""
    usage = """
    Usage: %s -u username [-p password] [-r search_result_number] [-l language]
        terms ...

    -l language = Filter search by language.
    -p password = Optional.  If not supplied, you will be asked for it.
    -r search_result_number = Number of results to pull from twitter searches.
    -u username = twitter username.
""" % sys.argv[0]
    sys.stderr.write(usage)

# Command-line entry point.  NOTE(review): this block is Python-2-only
# syntax (raise X, 1 / except X, e / print statement).
if __name__ == '__main__':
    # Short options only; each takes a value (trailing ':').
    optlist, args = getopt.getopt(sys.argv[1:], 'l:p:r:u:')
    if not args:
        sys.stderr.write("You must specify search terms\n")
        print_usage()
        raise SystemExit, 1
    optd = dict(optlist)
    if not '-u' in optd:
        sys.stderr.write("You must specify a user\n")
        print_usage()
        raise SystemExit, 1
    username = optd['-u']
    # Remaining positional args form the search query.
    query = " ".join(args)

    # Without -p, prompt interactively (getpass hides the input).
    if not '-p' in optd:
        sys.stderr.write("Password:")
        password = getpass("")
    else:
        password = optd['-p']

    # -r: number of search results to pull (optional).
    rpp = None
    if '-r' in optd:
        rpp = optd['-r']

    # -l: restrict search results by language (optional).
    lang = None
    if '-l' in optd:
        lang = optd['-l']


    try:
        newusers = follow_by_query(username, password, query, rpp, lang)
    except urllib2.HTTPError, e:
        # Likely throttling (400 Bad Request) or an auth failure.
        sys.stderr.write("Cannot connect to Twitter\n")
        sys.stderr.write(str(e))
        sys.stderr.write("\n")
    else:
        if newusers:
            print ", ".join(newusers), 'Added!'

The usage is as such, assuming the script is named twitsheep.py:

Usage: ./twitsheep.py -u username [-p password] [-r search_result_number] [-l language]
    terms ...

-l language = Filter search by language.
-p password = Optional.  If not supplied, you will be asked for it.
-r search_result_number = Number of results to pull from twitter searches.
-u username = twitter username.

Running the program without arguments produces the usage as well. It is best to run this with cron or Scheduled Tasks every thirty minutes at most. The default number of search results to check is ten, but you can turn it up to about 30. If you start getting 400 errors, a.k.a. Bad Request, you are being throttled by twitter's DoS protection. You should consider a lower number of search results or a longer duration between searches.

You can see an active test of this script here. It is running with this command line:

./twitsheep.py -u twitsheep -r 20 -l en "python -monty -ball"

If you have any features you would like integrated into this, please leave a comment.

Posted by Tyler Lesmann on January 22, 2009 at 6:09 and commented on 10 times
Tagged as: json python twitter