Archive

Last night, I got my first non-spam comment in some time. Harshad Joshi isn't a fan of the re module and wanted to know if the code from my earlier post could be rewritten without it. Revisiting the code, I realize now that using Python's regular expressions was overkill for this little script. So here it is without re. It also includes more resilience, logging, and getopt has been replaced by optparse.
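
The gist of dropping re is that plain string methods do the job here. A minimal illustration of the idea (the bio text and word are made up for the example):

word = 'python'
bio = 'Pythonista, sysadmin, coffee drinker'

# With re, the check was roughly:
# import re
# match = re.search(word, bio, re.IGNORECASE) is not None

# Without re, a lowercased substring test is enough:
match = word in bio.lower()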

#!/usr/bin/env python

import logging
import simplejson
import sys
import time
import traceback
import twitter
import urllib2
from getpass import getpass
from optparse import OptionParser
from urllib import urlencode

MAX_ERRORS = 10

def resilient_apply(func, *args, **kwargs):
    """
    If something goes awry, don't die!  Log it!

    Works just like regular apply.  max_errors keyword arg determines when we
    should quit trying and exit.
    """

    if 'max_errors' in kwargs:
        max_errors = kwargs['max_errors']
        del kwargs['max_errors']
    else:
        max_errors = MAX_ERRORS

    errors = 0
    while errors < max_errors:
        try:
            return apply(func, args, kwargs)
        except KeyboardInterrupt:
            raise SystemExit(1)
        except:
            logging.error("".join(
                traceback.format_exception(*sys.exc_info())))
        errors += 1
    logging.error("Maximum errors (%d) exceeded" % max_errors)
    raise SystemExit(1)

def compile_filter(query):
    good = []
    bad = []
    words = query.split()
    for word in words:
        word = word.lower()
        if word.startswith('-'):
            bad.append(word[1:])
        else:
            good.append(word)
    return (good, bad)

def filter_user_by_bio(user, filter, api=None):
    logging.debug('Looking up %s' % user)
    if api is None:
        # Only build a new, unauthenticated Api if the caller didn't pass one
        api = resilient_apply(twitter.Api)
    bio = resilient_apply(api.GetUser, user).GetDescription()
    if bio is None:
        return False # We only follow those with bios

    bio = bio.lower()
    good, bad = filter
    goodmatches = []
    for word in bad:
        if word in bio:
            return False
    for word in good:
        if word in bio:
            goodmatches.append(word)
    if good == goodmatches:
        return True
    return False

def follow_by_query(username, password, q, rpp=None, lang=None):
    filter = compile_filter(q)
    api = resilient_apply(twitter.Api, username=username, password=password)
    friends = [user.GetScreenName()
        for user in resilient_apply(api.GetFriends)]

    goodusers = []
    for user in get_users_from_search(q, rpp, lang):
        if filter_user_by_bio(user, filter, api):
            goodusers.append(user)
    newusers = []
    for user in goodusers:
        if not user in friends:
            logging.debug('Creating friendship %s' % user)
            resilient_apply(api.CreateFriendship, user)
            friends.append(user)
            newusers.append(user)
    return newusers

def get_users_from_search(query, resultnum=None, lang=None):
    q = []
    rpp = 10
    q.append(urlencode({'q': query}))
    if not lang is None:
        q.append(urlencode({'lang': lang}))
    if not resultnum is None:
        rpp = resultnum
    q.append(urlencode({'rpp': rpp}))
    # Keep the query string in the URL so the request stays a GET
    response = resilient_apply(
        urllib2.urlopen,
        'http://search.twitter.com/search.json?' + '&'.join(q)
    )
    data = simplejson.load(response)
    for result in data['results']:
        yield result['from_user']

def main():
    parser = OptionParser('usage: %prog [options] search terms')
    parser.add_option('-u', '--username', dest='username', default=None)
    parser.add_option('-p', '--password', dest='password', default=None)
    parser.add_option('-r', '--results', dest='rpp', default=None)
    parser.add_option('-l', '--lang', dest='lang', default=None)
    parser.add_option('-f', '--logfile', dest='logfile', default=None)
    parser.add_option('-v', '--logginglevel', dest='level', default='INFO')
    options, args = parser.parse_args()

    if not hasattr(logging, options.level):
        parser.error("level %s is not acceptable" % options.level)

    if options.username is None:
        parser.error("username is required")

    logging_args = {
         'format': '%(asctime)s %(levelname)s %(message)s',
         'level': getattr(logging, options.level),
    }
    if options.logfile is None:
        logging_args['stream'] = sys.stdout
    else:
        logging_args['filename'] = options.logfile
    logging.basicConfig(**logging_args)

    if options.password is None:
        options.password = getpass()

    newusers = follow_by_query(
        options.username,
        options.password,
        " ".join(args),
        options.rpp,
        options.lang,
    )
    if newusers:
        logging.info(", ".join(newusers) + ' Added!')

if __name__ == '__main__':
    main()
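
If you save this as follow.py (the filename is my choice), a run looks something like this. The -- is there so optparse doesn't try to parse an exclusion term like -rails as an option:

python follow.py -u yourname -r 20 -l en -- python django -rails

When -p is omitted, getpass prompts for the password instead of leaving it in your shell history.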
Posted by Tyler Lesmann on January 6, 2010 at 23:11
Tagged as: json optparse python twitter

If you are using Python 2.6 or higher, you should get to know the multiprocessing module as soon as possible. It works around the GIL to give true multiprocessing capabilities to Python. Here is an example showing how to spider sites with several worker processes. Use of the logging module is imperative for debugging these multiprocess programs.
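
The heart of the spider below is the Pool.apply_async()/AsyncResult.get() pattern. Stripped of the scraping, it amounts to this (fetch is just a stand-in for real work):

from multiprocessing import Pool

def fetch(url):
    # Stand-in worker function; each call runs in its own process
    return len(url)

if __name__ == '__main__':
    pool = Pool(4)
    urls = ['http://example.com/%d' % n for n in xrange(10)]
    results = [pool.apply_async(fetch, (url,)) for url in urls]
    for result in results:
        print result.get()  # blocks until that worker's result is ready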

#!/usr/bin/env python

"""
Spider the Steam boycott group and tally who followed through and who didn't.
"""

import logging
import urllib2
from cStringIO import StringIO
from multiprocessing import Pool
from lxml.html import parse

def glean_games(url):
    logging.debug('Getting %s' % url)
    doc = parse(urllib2.urlopen(url)).getroot()
    game_elements = doc.cssselect('#mainContents h4')
    return [e.text_content() for e in game_elements]

def glean_users(url=None, html=None):
    if html is None:
        logging.debug('Getting %s' % url)
        doc = parse(urllib2.urlopen(url)).getroot()
    else:
        doc = parse(StringIO(html)).getroot()
    user_links = doc.cssselect(
    'a.linkFriend_offline, a.linkFriend_online, a.linkFriend_in-game')
    return [(link.text_content(), link.attrib['href']) for link in user_links]

def spider(url, pool_size=20):
    logging.debug('Getting %s' % url)
    response = urllib2.urlopen(url)
    html = response.read() # Necessary for multiprocessing; workers need a pickleable string
    group_page = parse(StringIO(html)).getroot()
    page_links = group_page.cssselect('.pageLinks a')
    page_count = page_links[-2].attrib['href'].split('=')[-1]
    urls = ['%s?p=%d' % (url, page) for page in xrange(2, int(page_count) + 1)]

    pool = Pool(pool_size)
    results = []
    results.append(pool.apply_async(glean_users, (), {'html': html}))
    results.extend([pool.apply_async(glean_users, (url,)) for url in urls])

    users = []
    for result in results:
        users.extend(result.get())

    logging.info('Found %d users!' % len(users))

    games = []
    for username, url in users:
        games.append((username, pool.apply_async(glean_games, (url + '/games',))))

    for username, result in games:
        user_games = result.get()
        yield username, user_games

def main():
    import sys
    logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
    game = 'Call of Duty: Modern Warfare 2'
    has = []
    has_not = []
    for username, games in spider(
        'http://steamcommunity.com/groups/BOYCOTTMW2/members'):
        if game in games:
            logging.info('%s has %s' % (username, game))
            has.append(username)
        else:
            logging.info('%s has not' % (username))
            has_not.append(username)
    print '%d users have %s and %d do not.' % (len(has), game, len(has_not))

if __name__ == '__main__':
    main()
Posted by Tyler Lesmann on November 12, 2009 at 18:46
Tagged as: lxml multiprocessing python screen_scraping

I have moved the code repository to Google Code. In addition, the latest version respects YouTube's URL GET values, like hd=1.

Grab a clone like so:

hg clone https://python-markdown-video.googlecode.com/hg/ python-markdown-video

This is only the version compatible with Python Markdown 2.0. The version for earlier releases of Python Markdown is now deprecated and will not be maintained.

Posted by Tyler Lesmann on October 20, 2009 at 9:33
Tagged as: markdown open_source python

I saw this blog post yesterday and I was inspired. I forgot that Qt has a nice little browser object, QWebView. I have to say that Siva's example could not be less pythonic though. Siva's primary language is Objective-C and it shows in that code. I've rewritten the whole thing to be pythonic.

#!/usr/bin/env python

import os
import sys
from PyQt4.QtCore import QUrl, SIGNAL
from PyQt4.QtGui import QApplication
from PyQt4.QtWebKit import QWebPage, QWebView
from urllib2 import urlopen

JQUERY_URL = 'http://jqueryjs.googlecode.com/files/jquery-1.3.2.min.js'
JQUERY_FILE = JQUERY_URL.split('/')[-1]
JQUERY_PATH = os.path.join(os.path.dirname(__file__), JQUERY_FILE)

def get_jquery(jquery_url=JQUERY_URL, jquery_path=JQUERY_PATH):
    """
    Returns jquery source.

    If the source is not available at jquery_path, then we will download it from
    jquery_url.
    """
    if not os.path.exists(jquery_path):
        jquery = urlopen(jquery_url).read()
        f = open(jquery_path, 'w')
        f.write(jquery)
        f.close()
    else:
        f = open(jquery_path)
        jquery = f.read()
        f.close()
    return jquery

class WebPage(QWebPage):
    """
    QWebPage that prints Javascript errors to stderr.
    """
    def javaScriptConsoleMessage(self, message, lineNumber, sourceID):
        sys.stderr.write('Javascript error at line number %d\n' % lineNumber)
        sys.stderr.write('%s\n' % message)
        sys.stderr.write('Source ID: %s\n' % sourceID)

class GoogleSearchBot(QApplication):
    def __init__(self, argv, show_window=True):
        super(GoogleSearchBot, self).__init__(argv)
        self.jquery = get_jquery()
        self.web_view = QWebView()
        self.web_page = WebPage()
        self.web_view.setPage(self.web_page)
        if show_window is True:
            self.web_view.show()
        self.connect(self.web_view, SIGNAL("loadFinished(bool)"),
            self.load_finished)
        self.set_load_function(None)

    def google_search(self, keyword_string):
        self.set_load_function(self.parse_google_search)
        current_frame = self.web_view.page().currentFrame()
        current_frame.evaluateJavaScript(
            r"""
            $("input[title=Google Search]").val("%s");
            $("input[value=Google Search]").parents("form").submit();
            """ % keyword_string
        )

    def load_finished(self, ok):
        current_frame = self.web_page.currentFrame()
        current_frame.evaluateJavaScript(self.jquery)
        self.load_function(*self.load_function_args,
            **self.load_function_kwargs)

    def parse_google_search(self):
        current_frame = self.web_page.currentFrame()
        results = current_frame.evaluateJavaScript(
            r"""
            var results = "";
            $("h3[class=r]").each(function(i) {
                results += $(this).text() + "\n";
            });
            results;
            """
        )
        print('Google search result\n====================')
        for i, result in enumerate(unicode(results.toString(),'utf-8').splitlines()):
            print('%d. %s' % (i + 1, result))
        self.exit()

    def search(self, keyword):
        self.set_load_function(self.google_search, keyword)
        self.web_page.currentFrame().load(QUrl('http://www.google.com/ncr'))

    def set_load_function(self, load_function, *args, **kwargs):
        self.load_function = load_function
        self.load_function_args = args
        self.load_function_kwargs = kwargs

if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage: %s <keyword>" % sys.argv[0])
        raise SystemExit, 255

    googleSearchBot = GoogleSearchBot(sys.argv)
    googleSearchBot.search(sys.argv[1])
    sys.exit(googleSearchBot.exec_())
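
Assuming you save the script as google_search_bot.py (the filename is mine), a run looks like this:

python google_search_bot.py pyqt4

A browser window appears, jQuery fills in and submits the search form, and the result titles are printed to the terminal.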

So what is the good and bad of using this method for web scraping?

Good

  • Javascript is not a problem anymore! Javascript is usually a pain in the world of web scraping, as one must read the Javascript and emulate it. This is especially awful with obfuscated Javascript. By using a real browser, Javascript becomes a tool instead of a hindrance. AJAX applications become worlds easier to automate.
  • The user gets more visual feedback, since the browser renders the page.

Bad

  • Javascript is hard to debug. I was looking for the equivalent of the Firefox error console in QWebView or its attributes, which would fix this problem. FIXED! I extended QWebPage to print Javascript errors to stderr.
  • QWebView takes a bit more resources than mechanize. Of course, we get page rendering and a Javascript engine.
  • This is not as easily implemented for Windows and OS X as it is for Linux/BSD. This is not a big problem for me, as Fedora has PyQt4 and its prerequisites whenever you install KDE. You may not be so lucky.
Posted by Tyler Lesmann on October 1, 2009 at 6:22 and commented on 7 times
Tagged as: pyqt4 python screen_scraping

Want to have fun? Try migrating an existing web application between different database technologies! With Django and SQLAlchemy, it actually isn't that hard. I used the following procedure to migrate both deathcat.org and this blog to Postgres. I'm assuming you know how to use Postgres and that you are doing this as a Postgres superuser. All of this assumes ident authentication for Postgres, but it should be easily tweaked for other configurations.

Make a directory in your Django application to store these scripts, like scripts/. Make sure this directory resides at the same level as manage.py. Now, get the code for my SQLAlchemy table copier and put it in a new file called puller.py. Comment out the line that reads table.metadata.create_all(dengine).
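
If you just want the shape of what puller.py does, here is a rough sketch; the real script is the one linked above, and the function name and details here are only assumptions, written against the SQLAlchemy API of that era:

from sqlalchemy import MetaData, Table, create_engine

def copy_tables(from_url, to_url, tables):
    sengine = create_engine(from_url)
    dengine = create_engine(to_url)
    smeta = MetaData(bind=sengine)
    dmeta = MetaData(bind=dengine)
    for name in tables:
        table = Table(name, smeta, autoload=True)   # reflect from the source
        table.metadata.create_all(dengine)          # the line to comment out, per above
        dtable = Table(name, dmeta, autoload=True)  # reflect from the destination
        rows = sengine.execute(table.select()).fetchall()
        if rows:
            dengine.execute(dtable.insert(), [dict(row) for row in rows])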

Now put this in a file called migrate2pg.sh:

#!/bin/bash

database=my_django_db
mysql_user=django_user
mysql_pass=django_passwd
mysql_connection_string="mysql://$mysql_user:$mysql_pass@localhost/$database?charset=utf8"
postgres_connection_string="postgres:///$database"

tables=$(echo 'show tables' | mysql -u $mysql_user -p"$mysql_pass" $database | xargs echo | cut -d ' ' -f 2-)

echo $tables

echo "Dropping old postgres database, if any"
dropdb $database

echo "Creating new database"
createdb $database

echo "Setting up Django schema"
../manage.py syncdb --noinput

echo "Removing initial data"
echo 'DELETE FROM auth_permission' | psql $database
echo 'DELETE FROM django_content_type' | psql $database

echo "Importing data from MySQL"
python puller.py \
    -f $mysql_connection_string \
    -t $postgres_connection_string \
    $tables

echo "Fixing sequences"
for table in $tables
do
    echo Fixing "${table}'s sequence"
    echo "select setval('${table}_id_seq', max(id)) from ${table};" | psql $database
done

Tweak the variables at the top as necessary for your case. Run bash migrate2pg.sh and read the messages. One error you will see is during the Fixing sequences phase, when the script attempts to fix the django_session_id_seq sequence. Ignore this error.

The final part is to give permissions or ownership to the user who will be accessing the data. I'm assuming you can do this, but if you are using Postgres ident authentication and Apache, here's a helpful script for you.

#!/bin/bash

database='my_django_db'

echo "Granting apache rights to ${database}"
echo "GRANT ALL ON DATABASE ${database} TO apache;" | psql $database

tables=$(echo '\dt' | psql $database | awk -F '|' '/table/ {print $2}')
sequences=$(echo '\ds' | psql $database | awk -F '|' '/sequence/ {print $2}')

echo "Tables:" $tables
echo

echo "Sequences:" $sequences
echo

tablesql=$(for table in $tables; do echo "ALTER TABLE $table OWNER TO apache;"; done)
seqsql=$(for seq in $sequences; do echo "ALTER TABLE $seq OWNER TO apache;"; done)

echo "Table Alteration SQL:" $tablesql
echo
echo "Sequence Alteration SQL:" $seqsql
echo

echo $tablesql $seqsql | psql $database
Posted by Tyler Lesmann on September 4, 2009 at 16:47 and commented on 3 times
Tagged as: django mysql postgres python sql sqlalchemy