December 2009
November 2009
October 2009
September 2009
June 2009
April 2009
March 2009
February 2009
January 2009
December 2008
November 2008
October 2008
July 2008
June 2008
October 2007
September 2007
Last night, I got my first non-spam comment in some time. Harshad Joshi isn't a fan of the re module and wanted to know if the code from my earlier post could be rewritten to not include it. Revisiting the code, I realize now that using Python's regular expressions was a bit of overkill for this little script. So here it is without re. It also includes more resilience and logging, and getopt has been replaced by optparse.
#!/usr/bin/env python
"""
Follow Twitter users whose bio matches a search query.

The search terms given on the command line are sent to the Twitter search
API; every author found is then looked up, and those whose bio contains all
of the plain terms and none of the "-excluded" terms are followed.

This is Python 2 code (urllib2, optparse, python-twitter, simplejson).
"""
import logging
import simplejson
import sys
import time
import traceback
import twitter
import urllib2
from getpass import getpass
from optparse import OptionParser
from urllib import urlencode

MAX_ERRORS = 10
SEARCH_URL = 'http://search.twitter.com/search.json'


def resilient_apply(func, *args, **kwargs):
    """
    Call func(*args, **kwargs), retrying when it raises.

    If something goes awry, don't die! Log it! Every failure is logged with
    its traceback and the call is attempted again.

    The max_errors keyword argument (default MAX_ERRORS) is consumed here and
    is not passed on to func; it determines how many failures are tolerated
    before the error is logged and SystemExit(1) is raised.
    A KeyboardInterrupt is turned into SystemExit(1) immediately.
    """
    max_errors = kwargs.pop('max_errors', MAX_ERRORS)
    errors = 0
    while errors < max_errors:
        try:
            return func(*args, **kwargs)
        except KeyboardInterrupt:
            raise SystemExit(1)
        except Exception:
            # Deliberately not a bare "except:": a SystemExit raised by func
            # must end the program instead of being retried.
            logging.error("".join(
                traceback.format_exception(*sys.exc_info())))
            errors += 1
    logging.error("Maximum errors (%d) exceeded" % max_errors)
    raise SystemExit(1)


def compile_filter(query):
    """
    Split a search query into (good, bad) lists of lowercased words.

    Words prefixed with '-' go into bad (without the prefix), all other words
    go into good. A lone '-' is ignored: it would compile to the empty
    string, which is a substring of every bio and would reject everybody.
    """
    good = []
    bad = []
    for word in query.lower().split():
        if word.startswith('-'):
            excluded = word[1:]
            if excluded:
                bad.append(excluded)
        else:
            good.append(word)
    return (good, bad)


def filter_user_by_bio(user, filter, api=None):
    """
    Return True when user's bio passes filter.

    user is a screen name, filter is the (good, bad) pair produced by
    compile_filter. The bio must contain every good word and no bad word;
    matching is a case-insensitive substring test. Users without a bio never
    pass.

    api is the twitter.Api instance used for the lookup. A new
    unauthenticated one is created only when none is supplied (previously the
    argument was ignored and a fresh Api was built for every user).
    """
    logging.debug('Looking up %s' % user)
    if api is None:
        api = resilient_apply(twitter.Api)
    bio = resilient_apply(api.GetUser, user).GetDescription()
    if bio is None:
        return False  # We only follow those with bios
    bio = bio.lower()
    good, bad = filter
    if any(word in bio for word in bad):
        return False
    return all(word in bio for word in good)


def follow_by_query(username, password, q, rpp=None, lang=None):
    """
    Follow every search hit for q whose bio matches q; return the new names.

    username and password authenticate against Twitter, rpp and lang are
    handed to get_users_from_search. Users that are already friends are
    skipped before their bio is looked up, which saves one API call each.
    """
    bio_filter = compile_filter(q)
    api = resilient_apply(twitter.Api, username=username, password=password)
    friends = set(
        user.GetScreenName() for user in resilient_apply(api.GetFriends))
    newusers = []
    for user in get_users_from_search(q, rpp, lang):
        # Adding to friends below also de-duplicates repeated search hits.
        if user in friends:
            continue
        if not filter_user_by_bio(user, bio_filter, api):
            continue
        logging.debug('Creating friendship %s' % user)
        resilient_apply(api.CreateFriendship, user)
        friends.add(user)
        newusers.append(user)
    return newusers


def get_users_from_search(query, resultnum=None, lang=None):
    """
    Yield the screen name of the author of every search result for query.

    resultnum is the number of results to request (the 'rpp' parameter,
    10 when omitted); lang optionally restricts the language.
    """
    params = [('q', query)]
    if lang is not None:
        params.append(('lang', lang))
    rpp = 10
    if resultnum is not None:
        rpp = resultnum
    params.append(('rpp', rpp))
    # The parameters belong in the URL. Passing them as urlopen()'s second
    # argument, as this code used to, turns the request into a POST.
    response = resilient_apply(
        urllib2.urlopen,
        '%s?%s' % (SEARCH_URL, urlencode(params)),
    )
    try:
        data = simplejson.load(response)
    finally:
        response.close()
    for result in data['results']:
        yield result['from_user']


def main():
    """Parse the command line, set up logging and run follow_by_query."""
    parser = OptionParser('usage: %prog [options] search terms')
    parser.add_option('-u', '--username', dest='username', default=None)
    parser.add_option('-p', '--password', dest='password', default=None)
    parser.add_option('-r', '--results', dest='rpp', type='int', default=None)
    parser.add_option('-l', '--lang', dest='lang', default=None)
    parser.add_option('-f', '--logfile', dest='logfile', default=None)
    parser.add_option('-v', '--logginglevel', dest='level', default='INFO')
    options, args = parser.parse_args()

    # Only the integer level constants are acceptable; a plain hasattr()
    # check would also let through names such as "info" (a function).
    level = getattr(logging, options.level.upper(), None)
    if not isinstance(level, int):
        parser.error("level %s is not acceptable" % options.level)
    if options.username is None:
        parser.error("username is required")
    if not args:
        parser.error("search terms are required")

    logging_args = {
        'format': '%(asctime)s %(levelname)s %(message)s',
        'level': level,
    }
    if options.logfile is None:
        logging_args['stream'] = sys.stdout
    else:
        logging_args['filename'] = options.logfile
    logging.basicConfig(**logging_args)

    if options.password is None:
        options.password = getpass()

    newusers = follow_by_query(
        options.username,
        options.password,
        " ".join(args),
        options.rpp,
        options.lang,
    )
    if newusers:
        logging.info(", ".join(newusers) + ' Added!')


if __name__ == '__main__':
    main()
If you are using Python 2.6 or higher, then you should get to know the multiprocessing module as soon as possible. It works around the GIL to give true multiprocessing capabilities to python. Here is an example to show you how to spider sites with several worker processes. Use of the logging module is imperative to debugging these multiprocess programs.
#!/usr/bin/env python
"""
Spider steam boycott group and tally who followed through and who didn't.
"""
import logging
import sys
import urllib2
from cStringIO import StringIO
from multiprocessing import Pool

from lxml.html import parse


def glean_games(url):
    """Return the text of every '#mainContents h4' element on the page at url."""
    logging.debug('Getting %s' % url)
    doc = parse(urllib2.urlopen(url)).getroot()
    game_elements = doc.cssselect('#mainContents h4')
    return [e.text_content() for e in game_elements]


def glean_users(url=None, html=None):
    """
    Return a list of (link text, href) pairs for the member links on a page.

    The page is taken from html when given, otherwise it is downloaded from
    url.
    """
    if html is None:
        logging.debug('Getting %s' % url)
        doc = parse(urllib2.urlopen(url)).getroot()
    else:
        doc = parse(StringIO(html)).getroot()
    user_links = doc.cssselect(
        'a.linkFriend_offline, a.linkFriend_online, a.linkFriend_in-game')
    return [(link.text_content(), link.attrib['href']) for link in user_links]


def spider(url, pool_size=20):
    """
    Yield (username, games) for every member listed under the group at url.

    The first member page is downloaded here; the remaining member pages and
    every member's '/games' page are fetched by a pool of pool_size worker
    processes. The pool is shut down when the generator finishes, fails or
    is discarded.
    """
    logging.debug('Getting %s' % url)
    response = urllib2.urlopen(url)
    try:
        # Necessary for multiprocessing, needs to be pickleable
        html = response.read()
    finally:
        response.close()

    group_page = parse(StringIO(html)).getroot()
    page_links = group_page.cssselect('.pageLinks a')
    if len(page_links) >= 2:
        # Second-to-last pagination link is taken to be the last page number.
        page_count = int(page_links[-2].attrib['href'].split('=')[-1])
    else:
        # NOTE(review): presumably a group that fits on one page has no
        # pagination links; confirm against the live markup.
        page_count = 1
    page_urls = ['%s?p=%d' % (url, page)
                 for page in xrange(2, page_count + 1)]

    pool = Pool(pool_size)
    try:
        page_results = [pool.apply_async(glean_users, (), {'html': html})]
        page_results.extend(
            pool.apply_async(glean_users, (page_url,))
            for page_url in page_urls)

        users = []
        for page_result in page_results:
            users.extend(page_result.get())
        logging.info('Found %d users!' % len(users))

        # Queue every lookup first so the workers stay busy, then collect
        # the results in order.
        pending = [
            (username,
             pool.apply_async(glean_games, (profile_url + '/games',)))
            for username, profile_url in users
        ]
        for username, games_result in pending:
            yield username, games_result.get()
    finally:
        # On normal completion every result has been collected already; when
        # the generator is abandoned there is no point finishing the queue.
        pool.terminate()
        pool.join()


def main():
    """Tally which group members do and do not have the boycotted game."""
    logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
    game = 'Call of Duty: Modern Warfare 2'
    has = []
    has_not = []
    for username, games in spider(
            'http://steamcommunity.com/groups/BOYCOTTMW2/members'):
        if game in games:
            logging.info('%s has %s' % (username, game))
            has.append(username)
        else:
            logging.info('%s has not' % (username))
            has_not.append(username)
    print('%d users have %s and %d do not.' % (len(has), game, len(has_not)))


if __name__ == '__main__':
    main()
I have moved the code repository to Google Code. In addition, the latest version respects youtube's URL get values, like hd=1.
Grab a clone like so:
hg clone https://python-markdown-video.googlecode.com/hg/ python-markdown-video
This is only the version compatible with python markdown 2.0. The version for earlier versions of python markdown is now deprecated and will not be maintained.
I saw this blog post yesterday and I was inspired. I forgot that Qt has a nice little browser object, QWebView. I have to say that Siva's example could not be less pythonic though. Siva's primary language is Objective-C and it shows in that code. I've rewritten the whole thing to be pythonic.
#!/usr/bin/env python
"""
Run a Google search inside a QWebView and print the result titles.

jQuery is injected into every loaded page so that the page can be driven
and scraped with short JavaScript snippets.
"""
import json
import os
import sys
from PyQt4.QtCore import QUrl, SIGNAL
from PyQt4.QtGui import QApplication
from PyQt4.QtWebKit import QWebPage, QWebView
from urllib2 import urlopen

JQUERY_URL = 'http://jqueryjs.googlecode.com/files/jquery-1.3.2.min.js'
JQUERY_FILE = JQUERY_URL.split('/')[-1]
JQUERY_PATH = os.path.join(os.path.dirname(__file__), JQUERY_FILE)


def get_jquery(jquery_url=JQUERY_URL, jquery_path=JQUERY_PATH):
    """
    Returns jquery source.

    If the source is not available at jquery_path, then we will download it
    from jquery_url and store it at jquery_path for the next run.
    """
    if os.path.exists(jquery_path):
        with open(jquery_path, 'rb') as f:
            return f.read()
    jquery = urlopen(jquery_url).read()
    # Binary mode keeps the cached copy byte-identical to the download.
    with open(jquery_path, 'wb') as f:
        f.write(jquery)
    return jquery


class WebPage(QWebPage):
    """
    QWebPage that prints Javascript errors to stderr.
    """
    def javaScriptConsoleMessage(self, message, lineNumber, sourceID):
        sys.stderr.write('Javascript error at line number %d\n' % lineNumber)
        sys.stderr.write('%s\n' % message)
        sys.stderr.write('Source ID: %s\n' % sourceID)


class GoogleSearchBot(QApplication):
    """
    Application that drives a Google search through a QWebView.

    Each step registers a "load function" with set_load_function(); it is
    called by load_finished() once the next page has loaded and jQuery has
    been injected into it.
    """
    def __init__(self, argv, show_window=True):
        super(GoogleSearchBot, self).__init__(argv)
        self.jquery = get_jquery()
        self.web_view = QWebView()
        self.web_page = WebPage()
        self.web_view.setPage(self.web_page)
        if show_window:
            self.web_view.show()
        self.connect(self.web_view, SIGNAL("loadFinished(bool)"),
                     self.load_finished)
        self.set_load_function(None)

    def google_search(self, keyword_string):
        """Fill in and submit the search form of the currently loaded page."""
        self.set_load_function(self.parse_google_search)
        current_frame = self.web_view.page().currentFrame()
        # json.dumps() produces a quoted, fully escaped string literal, so
        # quotes or backslashes in the keyword cannot break out of the script.
        current_frame.evaluateJavaScript(
            r"""
            $("input[title=Google Search]").val(%s);
            $("input[value=Google Search]").parents("form").submit();
            """ % json.dumps(keyword_string)
        )

    def load_finished(self, ok):
        """Inject jQuery into the loaded page, then run the load function."""
        current_frame = self.web_page.currentFrame()
        current_frame.evaluateJavaScript(self.jquery)
        if self.load_function is None:
            # No step is waiting for this page load.
            return
        self.load_function(*self.load_function_args,
                           **self.load_function_kwargs)

    def parse_google_search(self):
        """Print the numbered result titles of the loaded page and quit."""
        current_frame = self.web_page.currentFrame()
        results = current_frame.evaluateJavaScript(
            r"""
            var results = "";
            $("h3[class=r]").each(function(i) {
                results += $(this).text() + "\n";
            });
            results;
            """
        )
        print('Google search result\n====================')
        titles = unicode(results.toString(), 'utf-8').splitlines()
        for i, result in enumerate(titles):
            print('%d. %s' % (i + 1, result))
        self.exit()

    def search(self, keyword):
        """Load the Google front page and search for keyword once it is up."""
        self.set_load_function(self.google_search, keyword)
        self.web_page.currentFrame().load(QUrl('http://www.google.com/ncr'))

    def set_load_function(self, load_function, *args, **kwargs):
        """Register the callable (and its arguments) for the next page load."""
        self.load_function = load_function
        self.load_function_args = args
        self.load_function_kwargs = kwargs


if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage: %s <keyword>" % sys.argv[0])
        raise SystemExit(255)

    googleSearchBot = GoogleSearchBot(sys.argv)
    googleSearchBot.search(sys.argv[1])
    sys.exit(googleSearchBot.exec_())
So what is the good and bad of using this method for web scraping?
Good
- Javascript is not a problem anymore! Javascript is usually a pain in the world of web scraping, as one must read the Javascript and emulate it. This is especially awful with obfuscated Javascript. By using a real browser, Javascript becomes a tool instead of a hindrance. AJAX applications become worlds easier to automate.
- User gets more visual feedback through the browser rendering the page.
Bad
- Javascript is hard to debug. I'm looking for the equivalent of the Firefox error console in QWebView or its attributes. That would fix this problem. FIXED! Extended QWebPage to add printing of Javascript errors to stderr.
- QWebView takes a bit more resources than mechanize. Of course, we get page rendering and a Javascript engine.
- This is not as easily implemented for Windows and OS X as it is for Linux/BSD. This is not a big problem for me, as Fedora has PyQt4 and its prerequisites whenever you install KDE. You may not be so lucky.
Want to have fun? Try migrating an existing web application between different database technologies! With Django and SQLAlchemy, it actually isn't that hard! I used the following procedure to migrate both deathcat.org and this blog to Postgres. I'm assuming you know how to use Postgres and that you are doing this as a Postgres superuser. All of this assumes ident authentication for Postgres, but it should be easily tweaked for other configurations.
Make a directory in your Django application to store these scripts, like scripts/. Make sure this directory resides at the same level as manage.py. Now, get the code for my SQLAlchemy table copier and put it in a new file called puller.py. Comment out the line that reads table.metadata.create_all(dengine).
Now put this in a file called migrate2pg.sh:
#!/bin/bash
# Copy the data of a Django project from MySQL into a fresh Postgres database.
#
# Run this from the directory that holds puller.py; manage.py is expected one
# level up. The Postgres database is dropped and recreated, so any data
# already in it is lost.

database=my_django_db
mysql_user=django_user
mysql_pass=django_passwd

mysql_connection_string="mysql://$mysql_user:$mysql_pass@localhost/$database?charset=utf8"
postgres_connection_string="postgres:///$database"

# "show tables" prints a header line followed by one table per line; xargs
# joins everything onto one line and cut drops the header word.
tables=$(echo 'show tables' | mysql -u "$mysql_user" -p"$mysql_pass" "$database" | xargs echo | cut -d ' ' -f 2-)

echo $tables

echo "Dropping old postgres database, if any"
dropdb "$database"

echo "Creating new database"
createdb "$database"

echo "Setting up Django schema"
../manage.py syncdb --noinput

# syncdb fills these two tables itself; empty them so the rows copied from
# MySQL do not collide with the freshly generated ones.
echo "Removing initial data"
echo 'DELETE FROM auth_permission' | psql "$database"
echo 'DELETE FROM django_content_type' | psql "$database"

# The connection strings are quoted: they contain "?", which the shell would
# otherwise treat as a glob character. $tables is left unquoted on purpose so
# that every table name becomes its own argument.
echo "Importing data from MySQL"
python puller.py \
    -f "$mysql_connection_string" \
    -t "$postgres_connection_string" \
    $tables

# Rows were copied with explicit ids, so each sequence has to be moved up to
# the highest id in use.
echo "Fixing sequences"
for table in $tables
do
    echo Fixing "${table}'s sequence"
    echo "select setval('${table}_id_seq', max(id)) from ${table};" | psql "$database"
done
Tweak the variables at the top as necessary for your case. Run bash migrate2pg.sh and read the messages. One error you will see occurs during the Fixing sequences phase, when the script attempts to fix the django_session_id_seq sequence. Ignore this error.
The final part is to give permissions or ownership to the user who will be accessing the data. I'm assuming you can do this, but if you are using Postgres ident authentication and apache, then here's a helpful script for you.
#!/bin/bash
# Hand a Postgres database over to the "apache" role: grant it all rights on
# the database, then make it the owner of every table and sequence.

database='my_django_db'

# list_relations META KIND
# Feed the psql meta-command META to psql and print the second "|"-separated
# column of every output line that matches KIND.
list_relations() {
    echo "$1" | psql "$database" | awk -F '|' -v kind="$2" '$0 ~ kind {print $2}'
}

# owner_statements NAME...
# Print one ownership-changing statement per relation name given.
owner_statements() {
    local relation
    for relation in "$@"
    do
        echo "ALTER TABLE $relation OWNER TO apache;"
    done
}

echo "Granting apache rights to ${database}"
echo "GRANT ALL ON DATABASE ${database} TO apache;" | psql "$database"

tables=$(list_relations '\dt' 'table')
sequences=$(list_relations '\ds' 'sequence')

# The lists are expanded unquoted on purpose: that splits them into one name
# per word and prints them on a single line.
echo "Tables:" $tables
echo
echo "Sequences:" $sequences
echo

tablesql=$(owner_statements $tables)
seqsql=$(owner_statements $sequences)

echo "Table Alteration SQL:" $tablesql
echo
echo "Sequence Alteration SQL:" $seqsql
echo

echo $tablesql $seqsql | psql "$database"
