Replies: 12 comments
-
|
Yes, it is |
-
|
@pcorbel thanks, that's a way of working around the problem, but it is not very performant and also not efficient. SQLite and PostgreSQL both have upsert. Of course it's still early in the game for this library, but it would be nice if there was an |
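For reference, the native upsert mentioned above is the `INSERT ... ON CONFLICT ... DO UPDATE` clause. The following is a minimal sketch using only Python's standard-library `sqlite3` module (it assumes SQLite 3.24 or newer); the `hero` table, its columns, and the `upsert_hero` helper are made up for illustration and are not part of SQLModel.

```python
import sqlite3

# In-memory database with a hypothetical table, for illustration only.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE hero (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)")

# `excluded` refers to the row that the INSERT tried to write.
UPSERT_SQL = """
INSERT INTO hero (id, name, age) VALUES (?, ?, ?)
ON CONFLICT (id) DO UPDATE SET name = excluded.name, age = excluded.age
"""


def upsert_hero(hero_id: int, name: str, age: int) -> None:
    """Insert the row, or update name/age if the id already exists."""
    with conn:  # commits on success, rolls back on error
        conn.execute(UPSERT_SQL, (hero_id, name, age))


upsert_hero(1, "Deadpond", 30)  # no row with id=1 yet: inserted
upsert_hero(1, "Deadpond", 31)  # id=1 exists: updated in place
rows = conn.execute("SELECT id, name, age FROM hero").fetchall()
```

The database resolves the conflict inside a single statement, so no separate SELECT is needed to decide between insert and update.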
-
|
@mborus this is one way of doing a bulk_upsert, though it has the internal overhead of requiring a select for each record (maybe someone can improve on that?). It could also be improved with iterators instead of lists. It will work on any model object representing a table and any list of column attributes/names you designate to match against. The benefit of this version is that you don't have to rewrite it for different situations; it should work universally. Uncomment the print statements to verify it is working as required. My vote is for a dedicated upsert also, but in the meantime...

from typing import Dict, List, Sequence, Union

from sqlmodel import Session, and_, select

# NOTE: `engine` (the database engine) and `models` (the module holding the
# SQLModel classes) are assumed to be defined or imported elsewhere.


def bulk_upsert(data: Union[Dict, Sequence[Dict]], model: str, columns: Union[str, List[str]]) -> None:
    """ Given a dict record or list of records, update existing records on specified columns, or
    insert new record. This works by building clauses and reducing them into the statement.

    Usage:
        data = {"name": "John", "ticket": "A95134"}
        # or
        data = [{"name": "John", "ticket": "A95134"}, {"name": "Jane", "ticket": "S87221"}]
        # perform upsert matching on a single column
        bulk_upsert(data, "Entries", "ticket")
        # perform upsert matching on multiple columns
        bulk_upsert(data, "Entries", ["name", "ticket"])

    Parameters:
    :param data: Union[Dict, Sequence[Dict]]: A dictionary or list of dictionaries representing
        records in a table.
    :param model: str: The name of the SQLModel representing a table.
    :param columns: Union[str, List[str]]: Column(s) to match on for upsert.
    :return: None: Takes internal action; bulk upsert.
    """
    # ensure listyness
    if isinstance(data, dict):
        data = [data]
    if isinstance(columns, str):
        columns = [columns]
    # open a session with the database engine
    with Session(engine) as session:
        # get specified model
        obj = getattr(models, model)
        for record in data:
            # get obj columns and designate matches
            match_on = [getattr(obj, col) == record.get(col) for col in columns]
            # reduce clauses into statement
            statement = select(obj).where(and_(*match_on))
            # check for existing record
            upsert = session.exec(statement).first()
            # prepare the upsert
            if upsert:
                for key, value in record.items():
                    setattr(upsert, key, value)
                # print(f"Upserting {upsert.id} with {record}")
            else:
                upsert = obj(**record)
                # print(f"Inserting {record}")
            # add to the session
            session.add(upsert)
        # commit session
        session.commit()
|
-
|
Relevant SO discussion concerning SQLAlchemy's facility to accomplish upserts. How to do an upsert with SqlAlchemy? |
-
|
Another relevant SO answer in the same question. This one is interesting because it adds functionality to the model at time of definition. I imagine building something like this that is used as a base model would make the behavior available by default for all models. https://stackoverflow.com/a/67969355/1663382 |
-
|
I implemented something that allows modifying the SQL on the go:

from typing import Optional

from sqlalchemy import BigInteger, Column
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert
from sqlmodel import Field, SQLModel


class Files(SQLModel, table=True):
    # Tables flagged with magic_insert get ON CONFLICT DO NOTHING appended
    __table_args__ = dict(info=dict(magic_insert=True))
    id: Optional[int] = Field(default=None, sa_column=Column(BigInteger(), primary_key=True, autoincrement=True))
    hash: str = Field(unique=True)


@compiles(Insert, "postgresql")
def visit_insert_postgresql(element, compiler, **kw):
    sql: str = compiler.visit_insert(element, **kw)
    magic_insert = element.entity_description["table"].info.get("magic_insert", False)
    if magic_insert:
        # The clause has to come before RETURNING, if there is one
        returning_idx = sql.find(" RETURNING")
        if returning_idx != -1:
            sql = sql[:returning_idx] + " ON CONFLICT DO NOTHING" + sql[returning_idx:]
        else:
            sql += " ON CONFLICT DO NOTHING"
    print(f"visit_insert_postgresql: {sql}")
    return sql

But this fails because of the
Might be useful for someone who wants to implement this properly 🤷🏻‍♂️ The 'upsert' or 'insert if not exists' operations seem extremely fundamental to me, so it's too bad they are missing. It would not be possible for me to do a select for 10 million rows just to know whether they were already inserted...
|
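As an alternative to patching the compiled SQL string, SQLAlchemy's PostgreSQL dialect ships its own `insert()` construct with an `on_conflict_do_nothing()` method. The sketch below only compiles the statement to text, so it needs no database connection; the `files` table is a hypothetical Core stand-in for the `Files` model above, and this is a different technique from the `@compiles` hook, not a fix for it.

```python
from sqlalchemy import BigInteger, Column, MetaData, String, Table
from sqlalchemy.dialects import postgresql

metadata = MetaData()

# Hypothetical Core table mirroring the Files model above.
files = Table(
    "files",
    metadata,
    Column("id", BigInteger, primary_key=True, autoincrement=True),
    Column("hash", String, unique=True),
)

# Build INSERT ... ON CONFLICT (hash) DO NOTHING without touching the SQL string.
stmt = (
    postgresql.insert(files)
    .values(hash="abc123")
    .on_conflict_do_nothing(index_elements=["hash"])
)

# Render the statement for PostgreSQL; nothing is executed here.
sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```

Because the clause is part of the statement object, the compiler decides where it goes relative to any RETURNING clause, which is the part the string-splicing approach has to handle by hand.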
-
|
i have a system that supports sqlite and postgresql databases. i started with the simple
a solution that works well for me is:
this is slightly less performant than doing an
note that i'm using AsyncSession, but this should work with sync Session as well.

# db.py
from sqlalchemy.dialects import postgresql
from sqlmodel import Field, SQLModel
from sqlmodel.ext.asyncio.session import AsyncSession


async def insert_if_not_exists(session: AsyncSession, model: SQLModel) -> bool:
    """
    Inserts the provided record if a row with the same primary key(s) does not already exist in the table.
    Returns True if the record was inserted, False if it already existed.
    """
    # the postgresql.insert function is used to generate an INSERT statement with an ON CONFLICT DO NOTHING clause.
    # note that sqlite also supports ON CONFLICT DO NOTHING, so this works with both database types.
    statement = (
        postgresql.insert(model.__class__).values(**model.model_dump(exclude_unset=True)).on_conflict_do_nothing()
    )
    conn = await session.connection()
    result = await conn.execute(statement)
    return result.rowcount > 0


class User(SQLModel, table=True):
    user_id: str = Field(primary_key=True)
    name: str

example usage for upsert equivalent behavior:

from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession

import db  # the db.py module above


async def add_or_update_user(
    session: AsyncSession,
    user_id: str,
    user_name: str
) -> None:
    inserted = await db.insert_if_not_exists(session, db.User(user_id=user_id, name=user_name))
    if inserted:
        return await session.commit()
    # the row already existed: lock it and update it instead
    user = (
        await session.exec(select(db.User).where(db.User.user_id == user_id).with_for_update())
    ).one()
    user.name = user_name
    session.add(user)
    await session.commit()
|
-
|
My current workaround for this one at the moment looks like this. Create a

import datetime
from typing import ClassVar

from sqlmodel import SQLModel, Field
from sqlalchemy.dialects.postgresql import insert


class BaseModel(SQLModel):
    """Represents the base class for all models."""

    # Specifies the set of index elements which represent the ON CONFLICT target
    UPSERT_INDEX_ELEMENTS: ClassVar[set[str]] = set()

    # Specifies the set of fields to exclude from updating in the resulting
    # UPSERT statement
    UPSERT_EXCLUDE_FIELDS: ClassVar[set[str]] = set()

    id: int | None = Field(default=None, primary_key=True)
    created_at: datetime.datetime | None = Field(
        default_factory=datetime.datetime.utcnow,
    )
    updated_at: datetime.datetime | None = Field(
        default_factory=datetime.datetime.utcnow,
    )

    def upsert(self):
        """Returns an UPSERT statement"""
        exclude_fields = self.UPSERT_EXCLUDE_FIELDS.copy()
        # Common fields which we should exclude when updating.
        exclude_fields.add('id')
        exclude_fields.add('created_at')
        # Dump the model and exclude the specified fields during update.
        obj_dict = self.model_dump()
        to_update = obj_dict.copy()
        for field in exclude_fields:
            _ = to_update.pop(field, None)
        stmt = insert(self.__class__).values(obj_dict)
        stmt = stmt.on_conflict_do_update(
            index_elements=self.UPSERT_INDEX_ELEMENTS,
            set_=to_update,
        )
        return stmt

Then I would create my models based on the above

class SomeModel(BaseModel, table=True):
    """My model"""

    __tablename__ = 'some_table'

    UPSERT_INDEX_ELEMENTS: ClassVar[set[str]] = {'some_unique_field'}
    UPSERT_EXCLUDE_FIELDS: ClassVar[set[str]] = {'some_unique_field'}

    some_unique_field: str = Field(unique=True)

Most of the time
Then using this code looks like this.

# Import SomeModel from your path.to.module here
# Import your engine here as well
from sqlmodel import Session

foo = SomeModel(some_unique_field='foo')
bar = SomeModel(some_unique_field='bar')

with Session(engine) as session:
    session.exec(foo.upsert())
    session.exec(bar.upsert())
    session.commit()
|
-
|
Created a library with an upsert helper class based on @dnaeon's workaround suggestion: https://github.com/dan1elt0m/sadel |
-
|
To anyone here from google, |
-
|
@dnaeon 🐐 !! Thank you for sharing that. It was the inspiration I needed to create my own method for the

class BaseModel(SQLModel):
    ...

    @property
    def primary_key(self):
        # Compare against True explicitly: fields that don't set primary_key
        # may carry a non-boolean placeholder value instead of False.
        return [k for k, v in self.model_fields.items() if getattr(v, "primary_key", False) is True][0]

    @classmethod
    def bulk_insert_ignore_conflicts(cls, model_instances: list[SQLModel]):
        insertables = []
        for obj in model_instances:
            # Validate the type before dumping the object
            if not isinstance(obj, cls):
                raise ValueError("Can only bulk insert into one model at a time")
            insertables.append(obj.model_dump(exclude={obj.primary_key}))
        return insert(cls).values(insertables).on_conflict_do_nothing()

P.S. Feels like there should be a baked-in way to get the model's primary key, but I didn't see one.
|
-
|
SQLModel doesn't have a built-in
Using

# Session and select come from sqlmodel; Item is assumed to be your table model.
def upsert_item(db: Session, item: Item) -> Item:
    merged = db.merge(item)  # inserts if no PK match, updates if exists
    db.commit()
    db.refresh(merged)
    return merged

Using PostgreSQL

from sqlalchemy.dialects.postgresql import insert as pg_insert


def upsert_item(db: Session, item: Item, conflict_columns: list[str]):
    data = item.model_dump()
    stmt = pg_insert(Item).values(**data)
    stmt = stmt.on_conflict_do_update(
        index_elements=conflict_columns,
        set_={k: stmt.excluded[k] for k in data if k not in conflict_columns}
    )
    db.execute(stmt)
    db.commit()

For MySQL use
If you need

def get_or_create(db: Session, model, defaults: dict | None = None, **kwargs):
    instance = db.exec(select(model).filter_by(**kwargs)).first()
    if instance:
        return instance, False
    instance = model(**kwargs, **(defaults or {}))
    db.add(instance)
    db.commit()
    db.refresh(instance)
    return instance, True
|
-
First Check
Commit to Help
Example Code
Description
Let's say I have an instance of a model
I want to upsert hero_1 into the database: insert if this id does not exist yet, otherwise update. Is there a pattern to do that?
Operating System
Linux, macOS
Operating System Details
No response
SQLModel Version
0.0.4
Python Version
3.9.6
Additional Context
No response