Ahoj. Jsem úplný začátečník s pythonem na linuxu. Uvítal bych radu expertů co pracují na vyšší úrovni programování, nebo třeba s ORM. Myslel jsem že to vyřeším pomocí pandas, ale začínám pochybovat o efektivitě toho řešení. Hned na úvod musím říct, že mám jen 4GB ram a zbývá mi už cca 320-350 mega. Ale xFce je stabilní a má minimalistické požadavky, takže v pohodě.
Chtěl jsem napsat program na vyexportování dat (počet záznamů jde do milionu nebo víc). Některé tabulky jsem vyexportoval pomocí pandas a pymysql. Nic těžkého. Nicméně chtěl jsem udělat něco složitějšího... Momentální tabulka se jmenuje routes.txt . A jsou tam některé data pro mě přebytečná jako dlouhé popisy jízd (obsahují obvykle název města, zastávky nebo obce a aut.n.). Myslel jsem si, hele to by šlo zkrátit, místo těch názvů stanic v popisu dám sid (idečko zastávky). Když nenajdu zastávku, tak ten popis prostě smažu. To by znamenalo, že před zápisem musím provést select s left joinem na tabulku. znám route_id, a v tabulce trips je route_id a sid. V další tabulce stop_names je sid a stop_name. Prakticky to znamená udělat distinct select z trips, a pak select z stop_names.
Jenže jsem narazil na zvláštní problémy. pandas.read_sql vyžaduje string či query, a zdá se, že buď neakceptuje objekt nebo mě chce donutit k tomu abych to stringifoval. Jenže si myslím, že když to stringyfuju, tak pak to sqlalchemy nemá vůbec smysl. Proč zavádět objekty když ve výsledku to celé bude stejně neefektivní a použije se textová forma SQL dotazu.
Hlavní je zpracovávat čtené data po chunkách minimálně 1000 řádků najednou.
Na to jsem chtěl zavolat tu funkci, získat ty idečka, 1000 na jednou. Tu na testu mám jen 10. 1000 in a do selectu dát WHERE route_id IN (route_ids)
in execute
chunk.to_sql(name='routes', con=engine, if_exists='append', index=False)
raise TypeError("Query must be a string unless using sqlalchemy.")
TypeError: Query must be a string unless using sqlalchemy.
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, DECIMAL, ForeignKey, text
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.ext.declarative import declarative_base
import pandas as pd
import zipfile
import io
import re
Base = declarative_base()
# Definice modelu pro tabulku stops
class Stop(Base):
__tablename__ = 'stops'
sid = Column(Integer, primary_key=True)
stop_lat = Column(DECIMAL(10, 8))
stop_lon = Column(DECIMAL(11, 8))
dopravce = Column(String(255))
id = Column(Integer)
# Definice modelu pro tabulku stop_names
class StopName(Base):
__tablename__ = 'stop_names'
sid = Column(Integer, primary_key=True, autoincrement=True)
stop_name = Column(String(255), unique=True)
# Definice modelu pro tabulku trips
class Trip(Base):
__tablename__ = 'trips'
route_id = Column(String(255))
trip_id_1 = Column(Integer, primary_key=True)
trip_id_2 = Column(Integer, primary_key=True)
trip_id_3 = Column(Integer, primary_key=True)
direction_id = Column(Integer)
sid = Column(Integer, ForeignKey('stops.sid'))
stop = relationship("Stop")
# Připojení k databázi
password=""
db="trafic"
try:
# Vytvoření SQLAlchemy engine s použitím pymysql
engine = create_engine(f"mysql+pymysql://admin:{password}@localhost/{db}")
# engine = create_engine('mysql+pymysql://', creator=lambda: mydb)
print("Engine byl úspěšně vytvořen.")
except Exception as e:
print("Při vytváření enginu došlo k chybě:", e)
try:
connection = engine.connect()
print("Spojení k databázi je aktivní.")
except Exception as e:
print("Při testování spojení došlo k chybě:", e)
# Vytvoření tabulek v databázi
Base.metadata.create_all(engine)
# Funkce pro zpracování a vložení dat do databáze
def process_and_insert_data(session, data):
for chunk in data:
# Nahrazení všech výskytů "-CISR-" v route_id
chunk['route_id'] = chunk['route_id'].str.replace('-CISR-', '')
# Získání jedinečných route_id z aktuálního chunku
route_ids = chunk['route_id'].unique()
# Dotaz pro získání sid a stop_name
#query = session.query(Trip.sid, StopName.stop_name).join(StopName).filter(Trip.route_id.in_(route_ids)).distinct()
session.statement = session.query(Trip.sid, StopName.stop_name).select_from(Trip).join(StopName, Trip.sid == StopName.sid).filter(Trip.route_id.in_(route_ids)).distinct()
# Provedení dotazu a získání výsledků
# chunk_result = query.all()
# session.statement = returns the sql query.
chunk_result = pd.read_sql(session.statement, session.bind)
# Zpracování a vložení dat
for row in chunk_result:
needle = row[1]
for index, route_long_name in chunk['route_long_name'].items():
pattern = re.compile(r'\b' + re.escape(needle) + r'\b')
if re.search(pattern, route_long_name):
chunk.at[index, 'route_long_name'] = re.sub(pattern, f"#{row[0]}", route_long_name)
else:
city_name = needle.split(',')[0].strip()
city_pattern = re.compile(r'\b' + re.escape(city_name) + r'\b')
if re.search(city_pattern, route_long_name):
chunk.at[index, 'route_long_name'] = re.sub(city_pattern, f"#{row[0]}", route_long_name)
# Vložení dat do databáze
# chunk.to_sql(name='routes', con=session.connection(), if_exists='append', index=False)
chunk.to_sql(name='routes', con=engine, if_exists='append', index=False)
# Načtení dat ze souboru routes.txt z ZIP archivu po menších blocích
zip_path = "/mnt/ramdisk/JDF_merged_GTFS.zip"
chunksize = 10
with zipfile.ZipFile(zip_path, 'r') as zip_file:
with zip_file.open('routes.txt', 'r') as routes_file:
data = pd.read_csv(io.TextIOWrapper(routes_file, 'utf-8'), chunksize=chunksize)
Session = sessionmaker(bind=engine)
with Session() as session:
process_and_insert_data(session, data)
# Uzavření spojení
engine.dispose()
PS: Nemohl by vlastník stránek zpravit písmo textu, aby nebylo bílé a bílém nejde tu nic přečíst? Používám plugin dark-reader ve FF a vše je bílé.