SQLAlchemy is one of the best pieces of software I have had the pleasure to use. We needed an in-office mirror of a few tables on our Oracle server for Crystal Reports. We did not need full replication, so I decided to write something to make table copying easy. SQLAlchemy made this easy and fun.

This uses the latest version of SQLAlchemy at this time, 0.5.3. SQLAlchemy only abstracts the database interaction; you will still need to install the appropriate database driver, such as MySQLdb for MySQL or cx_Oracle for Oracle.
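
The dialect prefix of the connection URL determines which driver SQLAlchemy loads, so create_engine will raise an ImportError if the matching driver is missing. A quick sketch, using the same hypothetical servers as the usage example below:

from sqlalchemy import create_engine

# The dialect prefix of the URL selects the DBAPI driver.
oracle_engine = create_engine('oracle://someuser:PaSsWd@db1/TSH1')   # requires cx_Oracle
mysql_engine = create_engine('mysql://root@db2:3307/reporting')      # requires MySQLdb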

#!/usr/bin/env python

import getopt
import sys
from sqlalchemy import create_engine, MetaData, Table
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

def make_session(connection_string):
    engine = create_engine(connection_string, echo=False, convert_unicode=True)
    Session = sessionmaker(bind=engine)
    return Session(), engine  # session for record-level work, engine for database-level work

def pull_data(from_db, to_db, tables):
    source, sengine = make_session(from_db)
    smeta = MetaData(bind=sengine)
    destination, dengine = make_session(to_db)

    for table_name in tables:
        print 'Processing', table_name
        print 'Pulling schema from source server'
        table = Table(table_name, smeta, autoload=True)  # reflect the schema from the source
        print 'Creating table on destination server'
        table.metadata.create_all(dengine)  # no-op if the table already exists
        NewRecord = quick_mapper(table)
        columns = table.columns.keys()
        print 'Transferring records'
        for record in source.query(table).all():
            data = dict(
                [(str(column), getattr(record, column)) for column in columns]
            )
            destination.merge(NewRecord(**data))  # updates rows with a matching primary key
    print 'Committing changes'
    destination.commit()

def print_usage():
    print """
Usage: %s -f source_server -t destination_server table [table ...]
    -f, -t = driver://user[:password]@host[:port]/database

Example: %s -f oracle://someuser:PaSsWd@db1/TSH1 \\
    -t mysql://root@db2:3307/reporting table_one table_two
    """ % (sys.argv[0], sys.argv[0])

def quick_mapper(table):
    # Build a throwaway declarative class mapped to an existing Table,
    # so records can be constructed from keyword arguments.
    Base = declarative_base()
    class GenericMapper(Base):
        __table__ = table
    return GenericMapper

if __name__ == '__main__':
    optlist, tables = getopt.getopt(sys.argv[1:], 'f:t:')

    options = dict(optlist)
    if '-f' not in options or '-t' not in options or not tables:
        print_usage()
        raise SystemExit, 1

    pull_data(
        options['-f'],
        options['-t'],
        tables,
    )

The main event is in pull_data. I create a few objects to talk to both sides. My make_session function creates two of these per server: a Session, which I use for record-level work like queries and merges, and an Engine, which I use for database-level work like creating tables. The MetaData object is what pulls the table schema from the source.
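
To make that concrete, here is a minimal interactive sketch, assuming a hypothetical users table on the reporting server from the example:

session, engine = make_session('mysql://root@db2:3307/reporting')
meta = MetaData(bind=engine)
users = Table('users', meta, autoload=True)  # schema reflected from the live database
print users.columns.keys()                   # column names, e.g. ['id', 'name']
print session.query(users).count()           # the session handles record-level work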

For every table, I pull the schema of the source table, Table(table_name, smeta, autoload=True), and create that table in the destination database, table.metadata.create_all(dengine). The coolest thing about this is that SQLAlchemy does all of the hard work. Oracle column types become MySQL column types, or whatever is appropriate. If the table already exists, nothing is done.
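
If you are curious what that translation produces, newer SQLAlchemy releases can render a reflected Table as dialect-specific DDL. A sketch, assuming the table, sengine, and dengine objects from pull_data:

from sqlalchemy.schema import CreateTable

# The same Table object compiles to different DDL per dialect.
print CreateTable(table).compile(bind=sengine)  # source flavor, e.g. Oracle
print CreateTable(table).compile(bind=dengine)  # destination flavor, e.g. MySQL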

The destination system is now ready for data. My quick_mapper spits out an ORM class for a given table, which makes creating new records a breeze. I loop through every record from the source, build a dictionary of column names and values, and create new records on the destination server, using the dictionary as keyword arguments. One point to note is that merge will update, rather than duplicate, any record that has the same primary key as the new one.
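
A tiny sketch of that merge behavior, assuming a hypothetical table whose primary key is id:

Record = quick_mapper(table)
destination.merge(Record(id=1, name='first'))
destination.merge(Record(id=1, name='second'))  # same key, so this updates
destination.commit()  # the destination holds one row: id=1, name='second'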

At the end, I commit the changes. If anything goes wrong along the way, nothing is written. I have added command-line functionality as well. Assuming the script is called puller.py, it works like so:

Usage: puller.py -f source_server -t destination_server table [table ...]
    -f, -t = driver://user[:password]@host[:port]/database

Example: puller.py -f oracle://someuser:PaSsWd@db1/TSH1 \
    -t mysql://root@db2:3307/reporting table_one table_two
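
One more note on the commit at the end: if you want the all-or-nothing behavior to be explicit rather than implied, a minimal variation on the end of pull_data (not in the script above) would be:

    try:
        destination.commit()
    except Exception:
        destination.rollback()  # discard everything merged in this run
        raise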
Posted by Tyler Lesmann on April 27, 2009 at 16:28
Tagged as: python sqlalchemy
Comments
#1 Andrei wrote this 4 years, 11 months ago

Hi, I tried your script to copy data from mysql to sqlite, and it didn't work for unicode strings (they all became ???? in the sqlite database).

I tried to set encoding in

engine = create_engine(connection_string, encoding='utf-8', echo=False)

But it didn't help.

#2 Tyler Lesmann wrote this 4 years, 11 months ago

Let me do some tests to try to recreate your problem. What is the encoding of the MySQL table you are copying? You can check with this command in the mysql client:

SHOW CREATE TABLE database_name.table_name;

#3 Tyler Lesmann wrote this 4 years, 11 months ago

I think I have a solution for you. Add the keyword argument convert_unicode=True to the create_engine on line 10.

I've updated the code to include this.

#4 Andrei wrote this 4 years, 11 months ago

I tried this as well, but it throws an error:

UnicodeDecodeError: 'utf8' codec can't decode byte 0x97 in position 23: unexpected code byte

File "copydb.py", line 28, in pull_data
for record in source.query(table).all():

#5 Andrei wrote this 4 years, 11 months ago

MySQL charset is UTF-8 Unicode (utf8)

#6 Tyler Lesmann wrote this 4 years, 11 months ago

That is bizarre. Python is telling us that data which should be utf8 contains a portion that is not utf8.

Try explicitly converting your table to utf8. This will convert the data as well. Make a backup with mysqldump before doing this!

ALTER TABLE table_name CONVERT TO CHARACTER SET utf8 COLLATE utf8_general_ci;

#7 Andrei wrote this 4 years, 11 months ago

phpMyAdmin says
MySQL connection collation is utf8_general_ci
and all tables in the database are utf8_general_ci as well

#8 Andrei wrote this 4 years, 11 months ago

Never mind. I used the shell script from

http://www.sqlite.org/cvstrac/wiki?p=ConverterTools

#9 Tyler Lesmann wrote this 4 years, 8 months ago

If anyone has a problem with unicode and mysql, like Andrei, put ?charset=utf8 at the end of the source URI. Something like this:

mysql://root@db2:3307/reporting?charset=utf8

Details found here:
http://www.sqlalchemy.org/docs/05/reference/dialects/mysql.html#character-sets

#10 Seemant wrote this 3 years, 8 months ago

Hi Tyler,

I'm getting a strange error with this (using sqlalchemy 0.6.3): http://dpaste.com/222109/

Would love some feedback on where I'm going wrong.

#11 Tyler Lesmann wrote this 3 years, 8 months ago

This looks like a bug in SQLAlchemy. It should be using timestamp or similar for the type here.

#12 jon wrote this 3 years, 4 months ago

I would like to use this script to copy my tables between databases (oracle to sqlite), but I get the following error:

raise sa_exc.ArgumentError("Mapper %s could not assemble any primary key columns for mapped table '%s'" % (self, self.mapped_table.description))
sqlalchemy.exc.ArgumentError: Mapper Mapper|GenericMapper|coils could not assemble any primary key columns for mapped table 'coils'

Thanks for any help..

#13 anang wrote this 3 years, 3 months ago

maybe this is your answer : http://comments.gmane.org/gmane.comp.python.sqlalchemy.user/24055

the 'coils' table doesn't have a primary key column
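
For tables like that, one possible workaround (an untested sketch, not from the original article) is to tell the mapper explicitly which columns to treat as the primary key:

def quick_mapper(table, pk_names=None):
    # pk_names is a hypothetical argument: column names to treat as the
    # primary key when the table itself does not define one.
    Base = declarative_base()
    attrs = {'__table__': table}
    if pk_names:
        attrs['__mapper_args__'] = {
            'primary_key': [table.c[name] for name in pk_names]
        }
    return type('GenericMapper', (Base,), attrs)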

#14 bhavik wrote this 2 years, 9 months ago

Hi Tyler,
I would like to copy data from mysql to sqlite, but I don't want all of the content from the source table; I'm looking for a solution to fetch particular columns and particular rows that match a condition.
Could you please help me to implement this?

#15 bhavik wrote this 2 years, 9 months ago

Hi Tyler,
I googled and ended up with the code below; it is somehow working as per my requirement. Let me know your comments...

================================================================================
import getopt
import sys
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
from sqlalchemy.orm import backref, mapper, relation, sessionmaker
from sqlalchemy.ext.declarative import declarative_base

def make_session(connection_string):
    engine = create_engine(connection_string, echo=True, convert_unicode=True)
    Session = sessionmaker(bind=engine)
    return Session(), engine

def pull_data(from_db, to_db, tables):
    source, sengine = make_session(from_db)
    smeta = MetaData(bind=sengine)
    destination, dengine = make_session(to_db)
    Base = declarative_base()

    for table_name in tables:
        print 'Processing', table_name
        print 'Pulling schema from source server'
        columns = {
            'id' : Column('id', Integer, primary_key=True),
            'name' : Column('name', String(45)),
            'description' : Column('description', String(100))
        }
        mydict = {
            '__tablename__' : table_name,
            '__mapper__' : smeta
        }
        mydict.update(columns)
        NewRecord = type(table_name, (Base,), mydict)
        print 'Creating table on destination server'
        NewRecord.metadata.create_all(dengine)
        for record in source.query(NewRecord).filter_by(name='test').all():
            data = dict(
                [(str(column), getattr(record, column)) for column in columns.keys()]
            )
            destination.merge(NewRecord(**data))
    print 'Committing changes'
    destination.commit()

def print_usage():
    print """
Usage: %s -f source_server -t destination_server table [table ...]
    -f, -t = driver://user[:password]@host[:port]/database

Example: %s -f oracle://someuser:PaSsWd@db1/TSH1 \\
    -t mysql://root@db2:3307/reporting table_one table_two
    """ % (sys.argv[0], sys.argv[0])

if __name__ == '__main__':
    optlist, tables = getopt.getopt(sys.argv[1:], 'f:t:')

    options = dict(optlist)
    if '-f' not in options or '-t' not in options or not tables:
        print_usage()
        raise SystemExit, 1

    pull_data(
        options['-f'],
        options['-t'],
        tables,
    )
================================================================================

#16 Jerry wrote this 2 years, 3 months ago

Hi,

About this statement:
for record in source.query(table).all():

If the table has a really big dataset, will it explode the memory?

If I change it to:
for record in source.query(table):

Do I need to commit the session every time a new record is merged? Otherwise the memory will explode, too?

Thanks.

#17 Tyler Lesmann wrote this 2 years, 3 months ago

You are correct. The all() method returns a list of all results. It's a generator the way you've rewritten it, which is the smarter way.

As far as merging, it might. I honestly don't know. Best to test.

#18 Paul Filo wrote this 2 years, 3 months ago

Optimization to avoid object creation for big datasets:

# Reuses make_session from the article; also needs
# "from sqlalchemy import select" among the imports.
def pull_data(from_db, to_db, tables, paquet=1000):
    source, sengine = make_session(from_db)
    dest, dengine = make_session(to_db)
    meta = MetaData()
    for table_name in tables:
        print 'Processing', table_name
        print 'Pulling schema from source server'
        table = Table(table_name, meta, autoload=True, autoload_with=sengine)
        print 'Creating table on destination server'
        table.create(dengine)
        ins = table.insert()
        s = select([table])
        result = source.execute(s)
        print 'Transferring records'
        while True:
            l = result.fetchmany(paquet)
            if len(l) == 0:
                break
            dest.execute(ins, l)
    print 'Committing changes'
    dest.commit()
