python query - Inserimento di massa con SQLAlchemy ORM




flask tutorial (9)

Esiste un modo per far sì che SQLAlchemy esegua un inserimento bulk piuttosto che inserire ogni singolo oggetto. vale a dire,

fare:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)

piuttosto che:

INSERT INTO `foo` (`bar`) VALUES (1)
INSERT INTO `foo` (`bar`) VALUES (2)
INSERT INTO `foo` (`bar`) VALUES (3)

Ho appena convertito del codice per usare sqlalchemy piuttosto che raw sql e anche se ora è molto più bello lavorare con esso sembra essere più lento ora (fino a un fattore di 10), mi chiedo se questo è il motivo.

Potrei essere in grado di migliorare la situazione utilizzando le sessioni in modo più efficiente. Al momento ho autoCommit=False e faccio un session.commit() dopo che ho aggiunto qualcosa. Anche se sembra che i dati diventino obsoleti se il DB viene modificato altrove, come se facessi una nuova query, ottenevo ancora vecchi risultati?

Grazie per l'aiuto!


Answers

Tutte le strade portano a Roma , ma alcuni di loro attraversano montagne, richiedono traghetti ma se si vuole arrivare in fretta basta prendere l'autostrada.

In questo caso l'autostrada utilizza la funzione execute_batch() di psycopg2 . La documentazione dice che è il migliore:

L'attuale implementazione di executemany() è (usando un understatement estremamente caritatevole) non particolarmente performante. Queste funzioni possono essere utilizzate per accelerare l'esecuzione ripetuta di una dichiarazione rispetto a un insieme di parametri. Riducendo il numero di roundtrip del server, le prestazioni possono essere di ordine di grandezza migliori rispetto all'utilizzo di executemany() .

Nel mio test execute_batch() è approssimativamente due volte più veloce di executemany() e offre l'opzione per configurare il page_size per ulteriori tweaking (se si vuole spremere l'ultimo 2-3% delle prestazioni dal driver).

La stessa funzione può essere facilmente abilitata se si utilizza SQLAlchemy impostando use_batch_mode=True come parametro quando si crea un'istanza del motore con create_engine()


Per quanto ne so, non c'è modo di ottenere l'ORM per emettere inserti di massa. Credo che la ragione di fondo sia che SQLAlchemy ha bisogno di tenere traccia dell'identità di ogni oggetto (ad esempio, nuove chiavi primarie) e gli inserimenti di massa interferiscono con quello. Ad esempio, supponendo che la tua tabella foo contenga una colonna id e sia mappata su una classe Foo :

x = Foo(bar=1)
print x.id
# None
session.add(x)
session.flush()
# BEGIN
# INSERT INTO foo (bar) VALUES(1)
# COMMIT
print x.id
# 1

Poiché SQLAlchemy ha rilevato il valore per x.id senza emettere un'altra query, possiamo dedurre che ha ottenuto il valore direttamente x.id INSERT . Se non hai bisogno dell'accesso successivo agli oggetti creati tramite le stesse istanze, puoi saltare il livello ORM per il tuo inserto:

Foo.__table__.insert().execute([{'bar': 1}, {'bar': 2}, {'bar': 3}])
# INSERT INTO foo (bar) VALUES ((1,), (2,), (3,))

SQLAlchemy non può abbinare queste nuove righe con nessun oggetto esistente, quindi dovrai interrogarli di nuovo su ogni operazione successiva.

Per quanto riguarda i dati obsoleti, è utile ricordare che la sessione non ha un modo integrato per sapere quando il database viene modificato al di fuori della sessione. Per accedere a dati modificati esternamente tramite istanze esistenti, le istanze devono essere contrassegnate come scadute . Ciò accade di default su session.commit() , ma può essere fatto manualmente chiamando session.expire_all() o session.expire(instance) . Un esempio (omesso SQL):

x = Foo(bar=1)
session.add(x)
session.commit()
print x.bar
# 1
foo.update().execute(bar=42)
print x.bar
# 1
session.expire(x)
print x.bar
# 42

session.commit() scade x , quindi la prima istruzione print apre implicitamente una nuova transazione e richiede nuovamente gli attributi di x . Se si commenta la prima dichiarazione di stampa, si noterà che il secondo raccoglie il valore corretto, poiché la nuova query non viene emessa fino a dopo l'aggiornamento.

Questo ha senso dal punto di vista dell'isolamento transazionale: dovresti solo rilevare le modifiche esterne tra le transazioni. Se ciò ti causa problemi, ti suggerirei di chiarire o ripensare i limiti delle transazioni della tua applicazione invece di raggiungere immediatamente session.expire_all() .


La migliore risposta che ho trovato finora è stata nella documentazione di sqlalchemy:

http://docs.sqlalchemy.org/en/latest/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow

Esiste un esempio completo di un punto di riferimento di possibili soluzioni.

Come mostrato nella documentazione:

bulk_save_objects non è la soluzione migliore ma le prestazioni sono corrette.

La seconda migliore implementazione in termini di leggibilità penso sia stata con SQLAlchemy Core:

def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
            [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )

Il contesto di questa funzione è indicato nell'articolo della documentazione.


La risposta di Piere è corretta, ma un problema è che bulk_save_objects di default non restituisce le chiavi primarie degli oggetti, se questo ti preoccupa. Impostare return_defaults su True per ottenere questo comportamento.

La documentazione è here .

foos = [Foo(bar='a',), Foo(bar='b'), Foo(bar='c')]
session.bulk_save_objects(foos, return_defaults=True)
for foo in foos:
    assert foo.id is not None
session.commit()

Questo è un modo:

values = [1, 2, 3]
Foo.__table__.insert().execute([{'bar': x} for x in values])

Questo verrà inserito in questo modo:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)

Riferimento: le domande FAQ SQLAlchemy comprendono benchmark per vari metodi di commit.


SQLAlchemy ha introdotto quello nella versione 1.0.0 :

Operazioni di massa: documenti SQLAlchemy

Con queste operazioni, ora puoi fare inserimenti o aggiornamenti collettivi!

Ad esempio, puoi fare:

s = Session()
objects = [
    User(name="u1"),
    User(name="u2"),
    User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()

Qui verrà realizzato un inserto di massa.


SQLAlchemy ha introdotto quello nella versione 1.0.0 :

Operazioni di massa: documenti SQLAlchemy

Con queste operazioni, ora puoi fare inserimenti o aggiornamenti collettivi!

Ad esempio (se si desidera l'overhead più basso per gli INSERT della tabella semplice), è possibile utilizzare Session.bulk_insert_mappings() :

loadme = [
        (1, 'a')
    ,   (2, 'b')
    ,   (3, 'c')
    ]

dicts = []
for i in range(len(loadme)):
    dicts.append(dict(bar=loadme[i][0], fly=loadme[i][1]))

s = Session()
s.bulk_insert_mappings(Foo, dicts)
s.commit()

Oppure, se vuoi, salta le tuple del loadme e scrivi i dizionari direttamente in dicts (ma trovo più facile lasciare tutta la parola dei dati e caricare un elenco di dizionari in un ciclo).


I documenti di sqlalchemy hanno una buona padronanza delle prestazioni delle varie tecniche che possono essere utilizzate per gli inserimenti di massa:

Gli ORM non sono fondamentalmente intesi per inserzioni collettive ad alte prestazioni: questa è l'intera ragione per cui SQLAlchemy offre il Core oltre all'ORM come componente di prima classe.

Per il caso d'uso degli inserimenti veloci di massa, la generazione SQL e il sistema di esecuzione su cui ORM si basa sono parte del core. Usando direttamente questo sistema, possiamo produrre un INSERTO competitivo con l'utilizzo dell'API del database raw direttamente.

In alternativa, SQLAlchemy ORM offre la suite di metodi Bulk Operations, che fornisce hook in sottosezioni dell'unità di processo di lavoro per emettere i costrutti INSERT e UPDATE di livello base con un piccolo grado di automazione basata su ORM.

L'esempio seguente illustra i test basati sul tempo per diversi metodi di inserimento delle righe, dal più automatico al più piccolo. Con cPython 2.7, i runtime hanno osservato:

classics-MacBook-Pro:sqlalchemy classic$ python test.py
SQLAlchemy ORM: Total time for 100000 records 12.0471920967 secs
SQLAlchemy ORM pk given: Total time for 100000 records 7.06283402443 secs
SQLAlchemy ORM bulk_save_objects(): Total time for 100000 records 0.856323003769 secs
SQLAlchemy Core: Total time for 100000 records 0.485800027847 secs
sqlite3: Total time for 100000 records 0.487842082977 sec

script:

import time
import sqlite3

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String,  create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

Base = declarative_base()
DBSession = scoped_session(sessionmaker())
engine = None


class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(255))


def init_sqlalchemy(dbname='sqlite:///sqlalchemy.db'):
    global engine
    engine = create_engine(dbname, echo=False)
    DBSession.remove()
    DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)


def test_sqlalchemy_orm(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer()
        customer.name = 'NAME ' + str(i)
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_pk_given(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer(id=i+1, name="NAME " + str(i))
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM pk given: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_bulk_insert(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    n1 = n
    while n1 > 0:
        n1 = n1 - 10000
        DBSession.bulk_insert_mappings(
            Customer,
            [
                dict(name="NAME " + str(i))
                for i in xrange(min(10000, n1))
            ]
        )
    DBSession.commit()
    print(
        "SQLAlchemy ORM bulk_save_objects(): Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
        [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )
    print(
        "SQLAlchemy Core: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def init_sqlite3(dbname):
    conn = sqlite3.connect(dbname)
    c = conn.cursor()
    c.execute("DROP TABLE IF EXISTS customer")
    c.execute(
        "CREATE TABLE customer (id INTEGER NOT NULL, "
        "name VARCHAR(255), PRIMARY KEY(id))")
    conn.commit()
    return conn


def test_sqlite3(n=100000, dbname='sqlite3.db'):
    conn = init_sqlite3(dbname)
    c = conn.cursor()
    t0 = time.time()
    for i in xrange(n):
        row = ('NAME ' + str(i),)
        c.execute("INSERT INTO customer (name) VALUES (?)", row)
    conn.commit()
    print(
        "sqlite3: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " sec")

if __name__ == '__main__':
    test_sqlalchemy_orm(100000)
    test_sqlalchemy_orm_pk_given(100000)
    test_sqlalchemy_orm_bulk_insert(100000)
    test_sqlalchemy_core(100000)
    test_sqlite3(100000)

Questo problema può verificarsi anche in Windows. Cmake analizza il registro e talvolta i valori python non sono impostati. Per quelli con problemi simili:

http://ericsilva.org/2012/10/11/restoring-your-python-registry-in-windows/

Basta creare un file .reg per impostare le chiavi necessarie e modificare di conseguenza per abbinare la configurazione.

Windows Registry Editor Version 5.00

[HKEY_CURRENT_USER\Software\Python]

[HKEY_CURRENT_USER\Software\Python\Pythoncore]

[HKEY_CURRENT_USER\Software\Python\Pythoncore\2.6]

[HKEY_CURRENT_USER\Software\Python\Pythoncore\2.6\InstallPath]
@="C:\\python26"

[HKEY_CURRENT_USER\Software\Python\Pythoncore\2.6\PythonPath]
@="C:\\python26;C:\\python26\\Lib\\;C:\\python26\\DLLs\\"

[HKEY_CURRENT_USER\Software\Python\Pythoncore\2.7]

[HKEY_CURRENT_USER\Software\Python\Pythoncore\2.7\InstallPath]
@="C:\\python27"

[HKEY_CURRENT_USER\Software\Python\Pythoncore\2.7\PythonPath]
@="C:\\python27;C:\\python27\\Lib\\;C:\\python27\\DLLs\\"






python mysql database orm sqlalchemy