Speed up bulk insert #2290
-
I come back to the community with a new question (two, actually), and I'm also open to any general remark. The really relevant function here is `insert_history_values`. I currently use the Docker image edgedb/edgedb:1-alpha7, and I run the script below twice: first with `skip_init=False`, then with `skip_init=True`.

```python
import asyncio
import sys
from datetime import datetime, timedelta
from typing import List, Tuple
import pandas as pd
from numpy.random import MT19937
from numpy.random import RandomState, SeedSequence
import edgedb
import ujson
from aiostream import stream
from edgedb import AsyncIOConnection, AsyncIOPool, InvalidReferenceError

# Statements to (re)initialize the schema. The drop statements fail with
# InvalidReferenceError on a fresh database; init_base() below logs that
# and continues.
init_esdl = [
    """drop type MyTable;""",
    """drop type SubTable;""",
    """drop scalar type posfloat64;""",
    """create scalar type posfloat64 extending float64 {
        create constraint min_value(0);
    }""",
    """create type SubTable {
        create required property name -> str;
    };""",
    """create type MyTable {
        create required link sub -> SubTable;
        create required property timestamp -> datetime;
        create required property value -> posfloat64;
        create index on (.timestamp);
    };""",
    """INSERT SubTable {
        name := "NAME"
    };""",
]

async def batch(iterable, n=1):
    """Iterates asynchronously over batches of an iterable.

    batch([1, 2, 3, 4, 5, 6, 7], 3) -> [[1, 2, 3], [4, 5, 6], [7]]
    """
    length = len(iterable)
    for ndx in range(0, length, n):
        yield iterable[ndx : min(ndx + n, length)]

async def check_if_history_is_in_db(
    connection: AsyncIOConnection, values: List[Tuple[str, float]]
) -> bool:
    """Heuristic check: the batch is assumed to already be in the DB if enough
    records exist in its date range and its first and last (timestamp, value)
    pairs are found."""
    query = f"""
    SELECT (
        {len(values)} <= count( MyTable
            FILTER MyTable.timestamp >= <datetime>$first_date
            AND MyTable.timestamp <= <datetime>$last_date)
    ) AND (
        0 < count( MyTable {{ value }}
            FILTER MyTable.timestamp = <datetime>$first_date
            AND MyTable.value = <posfloat64>$first_value)
    ) AND (
        0 < count( MyTable {{ value }}
            FILTER MyTable.timestamp = <datetime>$last_date
            AND MyTable.value = <posfloat64>$last_value)
    );"""
    start = datetime.now()
    res = await connection.query_one(
        query,
        first_date=datetime.fromisoformat(f"{values[0][0]}:00"),
        first_value=values[0][1],
        last_date=datetime.fromisoformat(f"{values[-1][0]}:00"),
        last_value=values[-1][1],
    )
    print(
        f"Checking if it's already in DB took {(datetime.now() - start).total_seconds()} s."
    )
    return res

async def insert_history_values(
    connection: AsyncIOConnection,
    values: List[Tuple[str, float]],
    batch_size: int = 50_000,
) -> None:
    # One INSERT per unpacked JSON array element, batch_size elements per query.
    dml = """
    WITH SNAME := (SELECT SubTable FILTER .name = <str>$subname),
    FOR x IN {
        json_array_unpack(<json>$data)
    }
    UNION (INSERT MyTable {
        sub := SNAME,
        timestamp := <datetime>x[0],
        value := <posfloat64>x[1]
    });
    """
    n_sub_queries = (len(values) - 1) // batch_size + 1
    print(f"Splitting the query into {n_sub_queries} sub-queries.")
    start = datetime.now()
    xs = stream.enumerate(batch(values, batch_size), start=1)
    async with xs.stream() as streamer:
        async with connection.transaction():
            try:
                async for i, batch_values in streamer:
                    print(
                        f"Running query {i}/{n_sub_queries} of {len(dml)} characters and {len(batch_values)} values."
                    )
                    json_batch_values = ujson.dumps(batch_values)
                    start_sub = datetime.now()
                    await connection.query(dml, data=json_batch_values, subname="NAME")
                    print(
                        f"DML query {i}/{n_sub_queries} completed in {(datetime.now() - start_sub).total_seconds()} secs!"
                    )
            except Exception as e:
                print(e, file=sys.stderr)
                raise
    print(f"Done in {(datetime.now() - start).total_seconds()} secs!")

async def insert_history_into_db(connection: AsyncIOConnection, values) -> bool:
    try:
        is_in_db = await check_if_history_is_in_db(connection=connection, values=values)
        print(
            f"History of {values[0][0]} has{' ' if is_in_db else ' NOT '}already been inserted."
        )
        if not is_in_db:
            await insert_history_values(connection=connection, values=values)
    except Exception as e:
        print(e, file=sys.stderr)
        return False
    else:
        return True

async def init_base(connection: AsyncIOConnection) -> None:
    print("Initializing the DB!")
    for s in init_esdl:
        print(s)
        try:
            await connection.query(s)
        except InvalidReferenceError as e:
            print(e, file=sys.stderr)

async def init_data(connection: AsyncIOConnection) -> None:
    """Create `r` records per day for `duration` days."""
    r = 60_000
    duration = 5
    begin = datetime(2015, 1, 1)
    days = [begin + timedelta(days=d) for d in range(duration)]
    rs = RandomState(MT19937(SeedSequence(123456789)))
    data = [
        [
            (t.isoformat(), n)
            for t, n in zip(
                pd.date_range(
                    day, day + timedelta(days=1, microseconds=-1), periods=r, tz="utc"
                ).to_pydatetime(),
                rs.uniform(low=23, high=42, size=(r,)),
            )
        ]
        for day in days
    ]
    for day in data:
        await insert_history_into_db(connection=connection, values=day)

async def init(pool: AsyncIOPool, *, skip_init=True) -> None:
    async with pool.acquire() as connection:
        if not skip_init:
            await init_base(connection)
        await init_data(connection)

async def main(*, skip_init=True) -> None:
    async with await edgedb.create_async_pool(
        min_size=2, host="localhost", port=5656, user="edgedb", database="edgedb"
    ) as pool:
        await init(pool, skip_init=skip_init)

if __name__ == "__main__":
    asyncio.run(main(skip_init=False))
    asyncio.run(main(skip_init=True))
```

The output shows that the bulk inserts take a long time. Thanks in advance, and I hope I made it easy for anyone who is interested to help.
-
We'll implement batched `executemany` soon a-la asyncpg, so this'll be much faster. Meanwhile, the best way is to open a bunch of concurrent connections and chunk data in. Example: https://github.com/edgedb/webapp-bench/blob/366a9a74f59442cf279b02ab482d4ff48c6b5b2b/_edgedb/loaddata.py
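A minimal sketch of that suggestion, reusing the `FOR ... UNION INSERT` query and the pool API from the script above (`insert_concurrently`, `batch_size=10_000`, and `max_concurrency=8` are hypothetical names and values, and this trades the single enclosing transaction for parallelism):

```python
import asyncio
from typing import List, Tuple

import ujson

# The same FOR ... UNION INSERT query used by insert_history_values above.
INSERT_QUERY = """
WITH SNAME := (SELECT SubTable FILTER .name = <str>$subname),
FOR x IN {
    json_array_unpack(<json>$data)
}
UNION (INSERT MyTable {
    sub := SNAME,
    timestamp := <datetime>x[0],
    value := <posfloat64>x[1]
});
"""


async def insert_concurrently(
    pool,  # an edgedb.AsyncIOPool
    values: List[Tuple[str, float]],
    batch_size: int = 10_000,
    max_concurrency: int = 8,
) -> None:
    """Chunk `values` and insert the chunks over concurrent connections."""
    chunks = [values[i : i + batch_size] for i in range(0, len(values), batch_size)]
    # Cap how many chunks are in flight at once.
    sem = asyncio.Semaphore(max_concurrency)

    async def insert_chunk(chunk):
        async with sem:
            # Each task acquires its own connection, so chunks are inserted
            # in parallel instead of serially on a single connection.
            async with pool.acquire() as connection:
                await connection.query(
                    INSERT_QUERY, data=ujson.dumps(chunk), subname="NAME"
                )

    await asyncio.gather(*(insert_chunk(c) for c in chunks))
```

For the chunks to actually run in parallel, the pool has to allow enough connections, e.g. `edgedb.create_async_pool(..., max_size=max_concurrency)`.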
Do you mean the actual size in megabytes or the number of records in an object set?
-
This is relevant to my use case. @Mulugruntz, are you in the community chat?