|
import geopandas as gpd
|
|
import geopandas
|
|
import sys
|
|
import csv
|
|
from datetime import datetime
|
|
import functools
|
|
from io import StringIO
|
|
import logging
|
|
from logging import config
|
|
import pandas as pd
|
|
from pandas.api.types import is_datetime64_any_dtype
|
|
from sqlalchemy import create_engine, text
|
|
from sqlalchemy.orm import sessionmaker
|
|
from geoalchemy2 import Geometry
|
|
from config import (
|
|
DATABASE_URI,
|
|
folderPath_Local,
|
|
wkt_headend_fosc_sfat_mfat_bfat_path,
|
|
wkt_op_path,
|
|
wkt_cables_path,
|
|
ci_sfat_mfat_bfat_path,
|
|
ci_fosc_path,
|
|
ci_site_path,
|
|
ci_op_path,
|
|
ci_feederd_path,
|
|
ci_headend_path,
|
|
ci_mdu_path,
|
|
ci_olt_path,
|
|
ci_splitter_path,
|
|
report_isp_path,
|
|
report_e2e_path,
|
|
report_fusiones_path,
|
|
ifo_path,
|
|
ioo_path,
|
|
rio_path,
|
|
imf_path,
|
|
wkt_fibra_path,
|
|
)
|
|
from config import na_values
|
|
import os
|
|
import traceback
|
|
from arcgis.geocoding import reverse_geocode
|
|
import shutil
|
|
|
|
from shapely import wkt
|
|
from linetimer import CodeTimer
|
|
import warnings
|
|
from process_files import process_files
|
|
|
|
# ==== AMBIENTES Y ESQUEMAS (Paso 1) =========================================
|
|
import os
|
|
|
|
# 1) Flags por variable de entorno
|
|
def _env_flag(name, default):
    # Normalized environment lookup: trimmed, lowercase.
    return os.getenv(name, default).strip().lower()

# Environment flags, read once at import time.
TARGET_ENV = _env_flag("TARGET_ENV", "prod")            # 'prod' | 'stg'
FULL_RELOAD = _env_flag("FULL_RELOAD", "true") == "true"
DRY_RUN = _env_flag("DRY_RUN", "false") == "true"
|
|
|
|
# 2) URIs según ambiente (se asume que config expone ambas; si no, usar env vars)
|
|
# Staging database URI: prefer the value declared in config.py; if config does
# not define it (older deployments), fall back to the environment variable.
# An empty value is tolerated here — setup_connection() rejects it only when
# TARGET_ENV == "stg" actually requires it.
try:
    from config import DATABASE_URI_STAGING  # add this constant in config.py
except Exception:
    DATABASE_URI_STAGING = os.getenv("DATABASE_URI_STAGING", "")
|
|
|
|
# 3) Resolución de esquemas según ambiente
|
|
def resolve_schema(schema_name: str) -> str:
    """Map a logical schema name to the effective one for the target env.

    - prod: 'audit' / 'cm' are returned unchanged.
    - stg:  'audit' -> 'audit_stg', 'cm' -> 'cm_stg'.
    Any other name is returned as-is (normalized to lowercase).
    """
    normalized = schema_name.strip().lower()
    if TARGET_ENV != "stg":
        # Production keeps logical names untouched.
        return normalized
    staging_map = {"audit": "audit_stg", "cm": "cm_stg"}
    return staging_map.get(normalized, normalized)
|
|
|
|
# Effective schemas for this run, resolved once at import time
# (become 'audit_stg' / 'cm_stg' when TARGET_ENV == "stg").
SCHEMA_AUDIT = resolve_schema("audit")
SCHEMA_CM = resolve_schema("cm")
|
|
|
|
# 4) Ayudantes para centralizar I/O con esquema resuelto
|
|
def sql_write(conn, df, table_name, schema_name, if_exists="replace", index=False, method=None, dtype=None):
    """Write a DataFrame via pandas ``to_sql`` into the env-resolved schema.

    Honors DRY_RUN: when set, only logs the intended write and returns.
    """
    if DRY_RUN:
        logging.getLogger("dryrun").info(f"[DRY_RUN] to_sql {schema_name}.{table_name} rows={len(df)}")
        return
    df.to_sql(
        table_name,
        conn,
        schema=resolve_schema(schema_name),
        if_exists=if_exists,
        index=index,
        method=method,
        dtype=dtype,
    )
|
|
|
|
def sql_write_gdf(conn, gdf, table_name, schema_name, if_exists="replace", index=False, dtype=None):
    """Write a GeoDataFrame via ``to_postgis`` into the env-resolved schema.

    Honors DRY_RUN: when set, only logs the intended write and returns.
    """
    if DRY_RUN:
        logging.getLogger("dryrun").info(f"[DRY_RUN] to_postgis {schema_name}.{table_name} rows={len(gdf)}")
        return
    gdf.to_postgis(
        table_name,
        conn,
        schema=resolve_schema(schema_name),
        if_exists=if_exists,
        index=index,
        dtype=dtype,
    )
|
|
|
|
def sql_read(conn, query_text, geom_col=None):
    """Run *query_text* and return a DataFrame (or GeoDataFrame).

    The caller is expected to have interpolated the resolved schema
    (SCHEMA_AUDIT / SCHEMA_CM) into the query already. When *geom_col* is
    given (truthy), the result is loaded through geopandas with that column
    as the geometry.
    """
    if not geom_col:
        return pd.read_sql_query(text(query_text), conn)
    return geopandas.GeoDataFrame.from_postgis(text(query_text), conn, geom_col=geom_col)
|
|
# =============================================================================
|
|
|
|
# Silence all warnings globally (pandas/geopandas emit many dtype/CRS warnings
# during bulk loads). NOTE(review): this also hides new, potentially useful
# warnings — consider narrowing the filter.
warnings.filterwarnings("ignore")
|
|
|
|
|
|
# Module-level singletons; the engine is created lazily by setup_connection().
engine = None
pd_co_nd = None
# NOTE(review): this factory is created while `engine` is still None, and
# rebinding the global later does not update it — sessions produced by this
# factory stay unbound unless it is reconfigured. Confirm whether `session`
# is actually used anywhere before relying on it.
session = sessionmaker(engine, future=True)
|
|
|
|
# Setup_connection original
|
|
# def setup_connection():
|
|
# global engine
|
|
# if engine is None:
|
|
# engine = create_engine(
|
|
# DATABASE_URI, pool_size=100, max_overflow=50, future=True
|
|
# )
|
|
|
|
# ======= Nuevo setup_connection con stg
|
|
def setup_connection():
    """Create the global SQLAlchemy engine once, honoring TARGET_ENV.

    In 'stg' the staging URI is mandatory; otherwise the production
    DATABASE_URI is used. Also binds the module-level ``session`` factory,
    which was created at import time before any engine existed.

    Raises:
        RuntimeError: if TARGET_ENV == 'stg' and no staging URI is configured.
    """
    global engine
    if engine is not None:
        return  # already initialized
    db_uri = DATABASE_URI  # default: production
    if TARGET_ENV == "stg":
        if not DATABASE_URI_STAGING:
            raise RuntimeError("DATABASE_URI_STAGING no está definido (env o config).")
        db_uri = DATABASE_URI_STAGING
    engine = create_engine(db_uri, pool_size=100, max_overflow=50, future=True)
    # Fix: the module-level sessionmaker was bound to engine=None; point the
    # factory at the real engine so session() instances are usable.
    session.configure(bind=engine)
|
|
# =======
|
|
|
|
def log_final_en_postgres():
    """Record this Loader run in z_procesos_python.scripts_ejecutados.

    Uses a raw DBAPI connection from the global engine. Fix: the cursor and
    connection are now released even if the INSERT fails (previously both
    leaked on every call).
    """
    cnx = engine.raw_connection()
    try:
        cursor = cnx.cursor()
        try:
            cursor.execute("""
    INSERT INTO z_procesos_python.scripts_ejecutados (nombre_script, ultima_ejecucion)
    VALUES ('Loader', CURRENT_TIMESTAMP AT TIME ZONE 'America/Argentina/Buenos_Aires');
    """)
            cnx.commit()
        finally:
            cursor.close()
    finally:
        cnx.close()
|
|
|
|
|
|
def recreate_database():
    """Placeholder for a full drop/create of the ORM schema.

    Intentionally a no-op; the original drop_all/create_all calls are kept
    below, disabled.
    """
    # Base.metadata.drop_all(engine)
    # Base.metadata.create_all(engine)
    return None
|
|
|
|
|
|
# define a function that handles and parses psycopg2 exceptions
|
|
def print_psycopg2_exception(err):
    """Log a psycopg2 error with its location and driver diagnostics.

    Must be called from inside the ``except`` block handling *err* so that
    ``sys.exc_info()`` still refers to it.

    Parameters
    ----------
    err : Exception
        The exception being handled (expected to expose psycopg2's
        ``diag`` / ``pgerror`` / ``pgcode`` attributes).
    """
    err_type, err_obj, tb = sys.exc_info()

    # Line number where the exception was raised (None-safe in case this is
    # called outside an active exception context).
    line_num = tb.tb_lineno if tb is not None else None

    # Fix: logging.error() does not accept print()-style positional fragments;
    # the original calls triggered internal "not all arguments converted"
    # logging errors. Use lazy %-style placeholders instead.
    logging.error("\npsycopg2 ERROR: %s on line number: %s", err, line_num)
    logging.error("psycopg2 traceback: %s -- type: %s", tb, err_type)

    # psycopg2 extensions.Diagnostics object attribute (None-safe so this
    # handler never raises on non-psycopg2 exceptions).
    print("\nextensions.Diagnostics:", getattr(err, "diag", None))

    # pgerror / pgcode are psycopg2-specific attributes.
    logging.error("pgerror: %s", getattr(err, "pgerror", None))
    logging.error("pgcode: %s\n", getattr(err, "pgcode", None))
|
|
|
|
|
|
def psql_insert_copy(table, conn, keys, data_iter):
    """pandas ``to_sql`` method= hook: bulk-load rows via PostgreSQL COPY.

    Parameters
    ----------
    table : pandas.io.sql.SQLTable
    conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
    keys : list of str
        Column names
    data_iter : Iterable that iterates the values to be inserted
    """
    # Raw DBAPI connection so copy_expert is available.
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        # Serialize the batch to an in-memory CSV buffer.
        buffer = StringIO()
        csv.writer(buffer).writerows(data_iter)
        buffer.seek(0)

        quoted_columns = ", ".join('"{}"'.format(col) for col in keys)
        if table.schema:
            table_name = "{}.{}".format(table.schema, table.name)
        else:
            table_name = table.name

        copy_stmt = "COPY {} ({}) FROM STDIN WITH CSV".format(table_name, quoted_columns)
        try:
            cur.copy_expert(sql=copy_stmt, file=buffer)
        except Exception as err:
            # Best-effort diagnostics; the load for this batch is abandoned.
            print_psycopg2_exception(err)
|
|
|
|
|
|
def to_sql(conn, df, table, dtype=None, if_exists="replace", sep="|", encoding="utf8"):
    """Bulk-load *df* into *table* via COPY FROM an in-memory CSV.

    Fix: the previous default ``dtype=""`` is not a valid pandas dtype mapping
    and could break the table-creation step; ``None`` (no explicit types) is
    the backward-compatible default. The cursor is now always closed.

    NOTE(review): table creation uses the module-level ``engine`` while data
    is loaded through *conn* — confirm both point at the same database.
    """
    # Create the (empty) target table with the DataFrame's schema.
    df[:0].to_sql(table, engine, if_exists=if_exists, index=False, dtype=dtype)

    # Serialize the data to an in-memory CSV without headers.
    output = StringIO()
    df.to_csv(output, index=False, sep=sep, header=False, encoding=encoding)
    output.seek(0)

    # Stream the buffer into PostgreSQL via COPY.
    cursor = conn.cursor()
    try:
        cursor.copy_from(output, table, sep=sep, null="")
        conn.commit()
    finally:
        cursor.close()
|
|
|
|
def convert(val):
    """Normalize a raw CSV value for numeric columns: NA sentinels become 0."""
    return 0 if val in na_values else val
|
|
|
|
|
|
@functools.lru_cache
def convert_str(val):
    """Normalize a raw CSV string: NA sentinels become "", others are trimmed.

    Cached because the same raw tokens repeat heavily across large CSVs.
    """
    if val in na_values:
        return ""
    cleaned = val.strip()
    return cleaned.strip("\r\n")
|
|
|
|
|
|
def import_wkt_fibra_raw(conn, path) -> None:
    """Load the wkt_fibra CSV into audit.wkt_fibra_raw (geometry-aware).

    Rows with invalid WKT are kept but degraded to an empty MULTILINESTRING
    so a single bad geometry never aborts the bulk load.
    """
    logger = logging.getLogger("import_cm_wkt_fibra_raw_table")
    logger.info("Processing wkt_fibra csv...")
    frame = pd.read_csv(
        path,
        engine="python",
        encoding="unicode_escape",
        keep_default_na=False,
        index_col=False,
        na_values=na_values,
        sep=";",
    )
    # Header cleanup: drop stray quotes, lowercase.
    frame.columns = frame.columns.str.strip('"').str.lower()

    def _parse_wkt(x):
        # Bad WKT is logged and replaced with an empty geometry.
        try:
            return wkt.loads(x)
        except Exception as e:
            logger.warning(f"validate_wkt - Error in WKT {e} - {x}")
            return wkt.loads("MULTILINESTRING EMPTY")

    frame["geom"] = frame["geom"].apply(_parse_wkt)
    frame["upload_date"] = datetime.today().strftime("%m-%d-%Y")

    gdf = gpd.GeoDataFrame(frame, crs="EPSG:4326", geometry="geom")

    # Env-aware write (audit -> audit_stg when TARGET_ENV=stg).
    sql_write_gdf(
        conn,
        gdf,
        "wkt_fibra_raw",
        "audit",
        if_exists="replace",
        index=False,
        dtype={"geom": Geometry("GEOMETRY", srid=4326)},
    )
    logger.debug("wkt_fibra data imported successfully")
|
|
|
|
def import_cm_inventory_cable_occupation(conn, with_geocoding=False) -> None:
    """Aggregate audit.wkt_fibra_raw per cable tuple and publish the merged
    geometries (plus geographic length) to cm.inventory_cable_occupation.

    Parameters
    ----------
    conn : SQLAlchemy connection used for both the read and the write.
    with_geocoding : bool
        Accepted for signature parity with the other importers; unused here.
    """
    logger = logging.getLogger("import_cm_wkt_op_table")

    # One row per distinct cable tuple; ST_Union merges the segment
    # geometries and ST_length(...::geography) reports the real-world length.
    join_query = f"""
    SELECT wfr."cable name",wfr."cable descrp","tipo cable", wfr.fibra, wfr."ruta fisica",
    wfr."circuito",wfr."user by", wfr.owner, partido, asset, cnt_crossw,
    ST_Union(geom) AS geom,
    ST_length(ST_Union(geom)::geography)
    FROM {SCHEMA_AUDIT}.wkt_fibra_raw wfr
    GROUP BY wfr."cable name",wfr."cable descrp","tipo cable", wfr.fibra, wfr."ruta fisica",
    wfr."circuito",wfr."user by", wfr.owner, partido, asset, cnt_crossw;
    """

    logger.debug("Obteniendo datos desde PostgreSQL...")

    df_wkt_op = pd.read_sql_query(text(join_query), conn)

    logger.debug("Importando datos a inventory_cable_occupation....")

    # NOTE(review): dd-mm-YYYY here, unlike the mm-dd-YYYY used by most other
    # importers — confirm which format downstream consumers expect.
    df_wkt_op["upload_date"] = datetime.today().strftime("%d-%m-%Y")
    sql_write(
        conn,
        df_wkt_op,
        "inventory_cable_occupation",
        "cm",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
        dtype={"geom": Geometry("GEOMETRY", srid=4326)},
    )

    logger.debug("Datos de inventory_cable_occupation importados exitosamente")
|
|
|
|
def import_cm_wkt_op_raw_table(conn, path) -> None:
    """Load the wkt_op CSV into audit.wkt_op_raw, parsing WKT geometries."""
    logger = logging.getLogger("import_cm_wkt_op_raw_table")
    logger.info("Processing wkt_op csv...")

    frame = pd.read_csv(
        path,
        engine="python",
        encoding="unicode_escape",
        keep_default_na=False,
        index_col=False,
        na_values=na_values,
        sep=";",
    )
    # Header cleanup + rename to the canonical column name.
    frame.columns = frame.columns.str.strip('"').str.lower()
    frame.rename(columns={"partido": "partido_despliegue"}, inplace=True)

    def _parse_wkt(x):
        # Degrade unparsable WKT to an empty geometry instead of aborting.
        try:
            return wkt.loads(x)
        except Exception as e:
            logger.warning(f"validate_wkt - Error in WKT {e} - {x}")
            return wkt.loads("MULTILINESTRING EMPTY")

    frame["geom"] = frame["geom"].apply(_parse_wkt)
    frame["upload_date"] = datetime.today().strftime("%m-%d-%Y")
    gdf = gpd.GeoDataFrame(frame, crs="EPSG:4326", geometry="geom")

    # Env-aware write (audit -> audit_stg when TARGET_ENV=stg).
    sql_write_gdf(
        conn,
        gdf,
        "wkt_op_raw",
        "audit",
        if_exists="replace",
        index=False,
        dtype={"geom": Geometry("GEOMETRY", srid=4326)},
    )
    logger.debug("wkt_op_raw data imported successfully")
|
|
|
|
def import_cm_wkt_op_table(conn, with_geocoding=False) -> None:
    """Aggregate audit.wkt_op_raw per (osname, cables, circuit) and publish
    the unioned geometries to audit.wkt_op.

    with_geocoding is accepted for signature parity; unused here.
    """
    logger = logging.getLogger("import_cm_wkt_op_table")

    join_query = f"""
    SELECT osname, cables, ST_Union(geom) AS geom, circuit
    FROM {SCHEMA_AUDIT}.wkt_op_raw
    GROUP BY osname, cables, circuit
    """

    logger.debug("Obteniendo datos desde PostgreSQL...")

    df_wkt_op = pd.read_sql_query(text(join_query), conn)

    logger.debug("Importando datos a wkt_op....")

    # NOTE(review): dd-mm-YYYY here, unlike the mm-dd-YYYY used by most other
    # importers — confirm which format downstream consumers expect.
    df_wkt_op["upload_date"] = datetime.today().strftime("%d-%m-%Y")
    sql_write(
        conn, df_wkt_op, "wkt_op", "audit",
        if_exists="replace", index=False,
        method=psql_insert_copy, dtype={"geom": Geometry("GEOMETRY", srid=4326)},
    )

    logger.debug("Datos de wkt_op importados exitosamente")
|
|
|
|
|
|
def import_report_fusiones(conn, path):
    """Load the fusiones report (pipe-separated CSV) into cm.report_fusiones."""
    report = pd.read_csv(
        path, keep_default_na=False, encoding="latin1", na_values=na_values, sep="|"
    )
    # Header cleanup: strip stray quotes, then lowercase.
    report.columns = report.columns.str.strip('"')
    report.columns = report.columns.str.lower()
    report["upload_date"] = datetime.today().strftime("%m-%d-%Y")

    logging.debug("Importing data to df_report_fusiones....")

    sql_write(
        conn,
        report,
        table_name="report_fusiones",
        schema_name="cm",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )

    logging.debug("df_report_fusiones data imported successfully")
|
|
|
|
|
|
|
|
|
|
def import_report_network_splitter_ocupation(conn, path):
    """Load the splitter-occupation Excel export (columns A:T of sheet
    'Export Worksheet') into cm.network_splitter_ocupation."""
    frame = pd.read_excel(
        path, sheet_name=r"Export Worksheet", usecols="A:T"
    )
    # Header cleanup: strip stray quotes, then lowercase.
    frame.columns = frame.columns.str.strip('"')
    frame.columns = frame.columns.str.lower()
    logging.debug("Importing data to df_report_e2e....")

    sql_write(
        conn,
        frame,
        table_name="network_splitter_ocupation",
        schema_name="cm",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
    logging.debug("report_network_splitter_ocupation data imported successfully")
|
|
|
|
|
|
def import_wkt_cables_table(conn, path) -> None:
    """Load the wkt_cables CSV into audit.wkt_cables, parsing WKT geometries.

    Fix: the original ``df_wkt["wkt"].dropna(inplace=True)`` operated on a
    temporary Series and never removed NaN rows from the DataFrame; the rows
    are now actually dropped before parsing, matching the stated intent
    ("Mismo filtro de vacíos").
    """
    logger = logging.getLogger("import_wkt_cables_table")
    logger.info("Processing wkt_cables csv...")

    df_wkt = pd.read_csv(
        path,
        engine="python",
        encoding="cp1252",
        keep_default_na=False,
        index_col=False,
        na_values=na_values,
        sep=";",
    )
    # Header cleanup: strip stray quotes + lowercase.
    df_wkt.columns = df_wkt.columns.str.strip('"').str.lower()

    df_wkt.rename(columns={"partido": "partido_despliegue"}, inplace=True)

    # Drop rows with no WKT at all (previously a no-op; see docstring).
    df_wkt.dropna(subset=["wkt"], inplace=True)

    def wkt_loads(x):
        # Unparsable WKT degrades to an empty POINT instead of aborting.
        try:
            return wkt.loads(x)
        except Exception as e:
            logger.warning(f"validate_wkt - Error in WKT {e} - {x}")
            return wkt.loads("POINT EMPTY")

    df_wkt["geom"] = df_wkt["wkt"].apply(wkt_loads)
    df_wkt["upload_date"] = datetime.today().strftime("%m-%d-%Y")
    df_wkt.drop(["wkt"], axis=1, inplace=True)

    gdf = gpd.GeoDataFrame(df_wkt, crs="EPSG:4326", geometry="geom")

    # Env-aware write (audit -> audit_stg when TARGET_ENV=stg).
    sql_write_gdf(
        conn,
        gdf,
        "wkt_cables",
        "audit",
        if_exists="replace",
        index=False,
        dtype={"geom": Geometry("GEOMETRY", srid=4326)},
    )
    logger.debug("wkt_cables data imported successfully")
|
|
|
|
|
|
def import_cm_ci_sfat_mfat_bfat_raw_table(conn, path, with_geocoding=False) -> None:
    """Load the SFAT/MFAT/BFAT CSV into audit.ci_sfat_mfat_bfat_raw.

    Parameters
    ----------
    conn : SQLAlchemy connection used for the write.
    path : ';'-separated CSV (unicode_escape encoded).
    with_geocoding : when True, overwrite 'direccion' by reverse-geocoding
        each row's (longitud, latitud) via ArcGIS — one network call per row.
    """
    logger = logging.getLogger()
    logger.info("Processing ci_sfat_mfat_bfat csv...")

    df_ci_sfat_mfat_bfat = pd.read_csv(
        path,
        engine="python",
        encoding="unicode_escape",
        keep_default_na=False,
        index_col=False,
        na_values=na_values,
        sep=";",
    )

    # Fixing double quotes on headers
    df_ci_sfat_mfat_bfat.columns = df_ci_sfat_mfat_bfat.columns.str.strip('"')

    # Fixing uppercase headers
    df_ci_sfat_mfat_bfat.columns = df_ci_sfat_mfat_bfat.columns.str.lower()
    # 'partido' is superseded by 'partido_fisico' (renamed just below).
    df_ci_sfat_mfat_bfat.drop("partido", axis=1, inplace=True)

    df_ci_sfat_mfat_bfat.rename(
        columns={"partido_fisico": "partido_despliegue"}, inplace=True
    )
    # Reducing footprint: low-cardinality columns become categoricals.
    df_ci_sfat_mfat_bfat["description"] = df_ci_sfat_mfat_bfat["nombre_atc"]
    df_ci_sfat_mfat_bfat = df_ci_sfat_mfat_bfat.convert_dtypes()
    df_ci_sfat_mfat_bfat["topologia"] = df_ci_sfat_mfat_bfat["topologia"].astype(
        "category"
    )
    df_ci_sfat_mfat_bfat["operador"] = df_ci_sfat_mfat_bfat["operador"].astype(
        "category"
    )
    # df_ci_sfat_mfat_bfat['partido_cabecera'] =
    # df_ci_sfat_mfat_bfat['partido_cabecera'].astype('category')
    df_ci_sfat_mfat_bfat["partido_despliegue"] = df_ci_sfat_mfat_bfat[
        "partido_despliegue"
    ].astype("category")
    df_ci_sfat_mfat_bfat["cabecera"] = df_ci_sfat_mfat_bfat["cabecera"].astype(
        "category"
    )
    df_ci_sfat_mfat_bfat["tipo"] = df_ci_sfat_mfat_bfat["tipo"].astype("category")
    df_ci_sfat_mfat_bfat["nombre_produto"] = df_ci_sfat_mfat_bfat[
        "nombre_produto"
    ].astype("category")
    df_ci_sfat_mfat_bfat["upload_date"] = datetime.today().strftime("%m-%d-%Y")

    if with_geocoding:
        # One reverse-geocode call per row; updates 'direccion' keyed by id_nodo.
        for row in df_ci_sfat_mfat_bfat.itertuples():
            df_ci_sfat_mfat_bfat.loc[
                df_ci_sfat_mfat_bfat["id_nodo"] == row.id_nodo, "direccion"
            ] = reverse_geocode([row.longitud, row.latitud])["address"]["Address"]

    logger.debug("Importing data to ci_sfat_mfat_bfat_raw....")

    sql_write(
        conn,
        df_ci_sfat_mfat_bfat,
        table_name="ci_sfat_mfat_bfat_raw",
        schema_name="audit",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
    logger.debug("ci_sfat_mfat_bfat_raw data imported successfully")
|
|
|
|
|
|
# 04102023 - RL - Se crea la instancia CI_SITE en schema audit
|
|
def import_cm_ci_site_table(conn, path, with_geocoding=False) -> None:
    """Load the CI_SITE JSON export into audit.ci_site.

    with_geocoding is accepted for signature parity; unused here.
    """
    logger = logging.getLogger()
    logger.info("Processing cm_ci_site json...")

    # NOTE(review): the export is copied to a fixed local temp file before
    # reading — presumably to avoid reading the shared export in place.
    shutil.copyfile(path, './files/file1-tmp.json')
    sites = pd.read_json('./files/file1-tmp.json')
    sites.reset_index(drop=True, inplace=True)

    # Header cleanup: stray quotes, lowercase; 'partido' superseded by
    # 'partido_fisico' (renamed to the canonical column name).
    sites.columns = sites.columns.astype(str).str.strip('"')
    sites.columns = sites.columns.str.lower()
    sites.drop("partido", axis=1, inplace=True)
    sites.rename(columns={"partido_fisico": "partido_despliegue"}, inplace=True)

    sites["upload_date"] = datetime.today().strftime("%m-%d-%Y")
    sql_write(
        conn,
        sites,
        table_name="ci_site",
        schema_name="audit",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
    logger.debug("ci_site data imported successfully")
|
|
|
|
def import_cm_ci_fosc_raw_table(conn, path, path2, with_geocoding=False) -> None:
    """Load the FOSC JSON (*path*) and SITE JSON (*path2*), concatenate them,
    and replace audit.ci_fosc_raw with the combined rows.

    with_geocoding is accepted for signature parity; unused here.
    """
    ### RF 30-05-2023: this table came from UAT, for testing
    ### RF 15-06-2023: promoted to production, replacing the old table

    logger = logging.getLogger()
    logger.info("Processing ci_fosc json...")

    # Each export is copied to a fixed local temp file before reading.
    shutil.copyfile(path, './files/file2-tmp.json')
    df_ci_fosc = pd.read_json('./files/file2-tmp.json')
    df_ci_fosc.reset_index(drop=True, inplace=True)

    shutil.copyfile(path2, './files/file3-tmp.json')
    df_ci_site = pd.read_json('./files/file3-tmp.json')
    df_ci_site.reset_index(drop=True, inplace=True)

    # Fixing double quotes on headers
    df_ci_fosc.columns = df_ci_fosc.columns.astype(str).str.strip('"')
    df_ci_site.columns = df_ci_site.columns.astype(str).str.strip('"')

    # Fixing uppercase headers; 'partido' is superseded by 'partido_fisico'.
    df_ci_fosc.columns = df_ci_fosc.columns.str.lower()
    df_ci_fosc.drop("partido", axis=1, inplace=True)
    df_ci_fosc.rename(columns={"partido_fisico": "partido_despliegue"}, inplace=True)

    df_ci_site.columns = df_ci_site.columns.str.lower()
    df_ci_site.drop("partido", axis=1, inplace=True)
    df_ci_site.rename(columns={"partido_fisico": "partido_despliegue"}, inplace=True)

    df_ci_fosc["upload_date"] = datetime.today().strftime("%m-%d-%Y")
    df_ci_site["upload_date"] = datetime.today().strftime("%m-%d-%Y")

    # Site rows first, then FOSC rows, into a single frame.
    df_ci_fosc=pd.concat([df_ci_site,df_ci_fosc])

    logger.debug("Importing data to ci_fosc_raw....")
    # copy_from_stringio(conn, df_instalaciones, "instalaciones")

    sql_write(
        conn,
        df_ci_fosc,
        table_name="ci_fosc_raw",
        schema_name="audit",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
    logger.debug("ci_fosc_raw data imported successfully")
|
|
|
|
# 16.12.2025 (se agregan fraw.asset y fraw.fecha_renovacion)
|
|
|
|
def import_cm_ci_fosc_table(conn, with_geocoding=False) -> None:
    """Join audit.ci_fosc_raw with the headend/FOSC WKT layer and publish
    cm.ci_fosc (POINT geometries).

    Fix: the write now goes through sql_write_gdf so the resolved schema
    ('cm' -> 'cm_stg' under TARGET_ENV=stg) and DRY_RUN are honored like the
    other importers; previously to_postgis was called directly with a
    hard-coded 'cm' schema.

    Parameters
    ----------
    conn : SQLAlchemy connection (read + write).
    with_geocoding : when True, refresh 'direccion' by reverse-geocoding each
        row's (longitud, latitud) via ArcGIS — one network call per row.
    """
    logger = logging.getLogger()
    logger.info("Processing ci_fosc_raw...")
    # 16.12.2025: fraw.asset and fraw.fecha_renovacion added to the select.
    # RF 12-04-2023: st_isvalid keeps rows with broken geometries out.
    join_query = f"""select
    fraw.id_botella,
    fraw.topologia,
    fraw.nombre_atc,
    fraw.nombre_cliente,
    fraw.operador,
    fraw.cabecera,
    fraw.nombre_produto,
    fraw.partido_despliegue,
    fraw.fecha_creacion,
    fraw.latitud,
    fraw.longitud,
    fraw.direccion,
    fraw.id_mslink,
    fraw.id_cm,
    fraw.ci_name,
    fraw.tipo,
    fraw.fusiones,
    fraw.built_date,
    fraw.asset,
    fraw.fecha_renovacion,
    whfsmb.feeder,
    whfsmb.geom
    from
    {SCHEMA_AUDIT}.ci_fosc_raw fraw,
    {SCHEMA_AUDIT}.wkt_headend_fosc_sfat_mfat_bfat whfsmb
    where
    fraw.id_cm = whfsmb.name
    and st_isvalid(whfsmb.geom);"""
    df_ci_fosc = geopandas.GeoDataFrame.from_postgis(
        text(join_query), conn, geom_col="geom"
    )

    if with_geocoding:
        # One reverse-geocode call per row; updates 'direccion' by id_botella.
        for row in df_ci_fosc.itertuples():
            df_ci_fosc.loc[
                df_ci_fosc["id_botella"] == row.id_botella, "direccion"
            ] = reverse_geocode([row.longitud, row.latitud])["address"]["Address"]

    logger.debug("Importing data to ci_fosc....")

    df_ci_fosc["upload_date"] = datetime.today().strftime("%m-%d-%Y")
    conn.commit()
    # Env-aware write; dtype mapping kept exactly as before.
    sql_write_gdf(
        conn,
        df_ci_fosc,
        "ci_fosc",
        "cm",
        if_exists="replace",
        index=False,
        dtype={"geometry": Geometry("POINT", srid=4326)},
    )

    logger.debug("ci_fosc data imported successfully")
|
|
|
|
|
|
def import_cm_ci_mdu_table(conn, path, with_geocoding=False) -> None:
    """Load the MDU CSV into cm.ci_mdu, optionally reverse-geocoding addresses."""
    logger = logging.getLogger()
    logger.info("Processing ci_mdu csv...")

    mdu = pd.read_csv(
        path,
        engine="python",
        encoding="latin1",
        keep_default_na=False,
        index_col=False,
        na_values=na_values,
        sep=";",
    )
    # Header cleanup: stray quotes, lowercase.
    mdu.columns = mdu.columns.str.strip('"')
    mdu.columns = mdu.columns.str.lower()

    if with_geocoding:
        # One ArcGIS reverse-geocode call per building row, keyed by id_edificio.
        for row in mdu.itertuples():
            mdu.loc[
                mdu["id_edificio"] == row.id_edificio, "direccion"
            ] = reverse_geocode([row.longitud, row.latitud])["address"]["Address"]

    # Strip stray tab / CRLF sequences that leak into id_cm values.
    mdu["id_cm"] = mdu["id_cm"].str.replace("\t", "", regex=True)
    mdu["id_cm"] = mdu["id_cm"].str.replace("\r\n", "", regex=True)
    logger.debug("Importing data to ci_mdu....")
    mdu["upload_date"] = datetime.today().strftime("%m-%d-%Y")
    sql_write(
        conn,
        mdu,
        table_name="ci_mdu",
        schema_name="cm",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
    logger.debug("ci_mdu data imported successfully")
|
|
|
|
|
|
def import_cm_ci_olt_table(conn, path, with_geocoding=False) -> None:
    """Load the OLT CSV into cm.ci_olt. with_geocoding is unused here."""
    logger = logging.getLogger()
    logger.info("Processing ci_olt csv...")

    olt = pd.read_csv(
        path,
        engine="python",
        encoding="utf-8",
        keep_default_na=False,
        index_col=False,
        na_values=na_values,
        sep=";",
    )
    # Header cleanup: stray quotes, lowercase.
    olt.columns = olt.columns.str.strip('"')
    olt.columns = olt.columns.str.lower()

    logger.debug("Importing data to ci_olt....")

    olt["upload_date"] = datetime.today().strftime("%m-%d-%Y")
    sql_write(
        conn,
        olt,
        table_name="ci_olt",
        schema_name="cm",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
    logger.debug("ci_olt data imported successfully")
|
|
|
|
|
|
def import_cm_ci_op_raw_table(conn, path, with_geocoding=False) -> None:
    """Load the OP CSV into audit.ci_op_raw (staging-aware write).

    with_geocoding is accepted for signature parity; unused here.
    """
    logger = logging.getLogger()
    logger.info("Processing ci_op_raw csv...")

    raw = pd.read_csv(
        path,
        engine="python",
        encoding="latin-1",
        keep_default_na=False,
        index_col=False,
        na_values=na_values,
        sep=";",
    )
    # Header cleanup: stray quotes, lowercase; rename to canonical column name.
    raw.columns = raw.columns.str.strip('"')
    raw.columns = raw.columns.str.lower()
    raw.rename(columns={"partido": "partido_despliegue"}, inplace=True)

    raw["upload_date"] = datetime.today().strftime("%m-%d-%Y")
    logger.debug("Importing data to ci_op_raw....")

    # Env-aware write (audit -> audit_stg when TARGET_ENV=stg).
    sql_write(
        conn,
        raw,
        table_name="ci_op_raw",
        schema_name="audit",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
|
|
|
|
|
|
def import_cm_ci_op_table(conn, with_geocoding=False) -> None:
    """Join audit.ci_op_raw with audit.wkt_op and publish cm.ci_op.

    nombre_op is derived per row: the Claro CO name when operador == 'AMX',
    the ATC CO name otherwise. with_geocoding is accepted for signature
    parity; unused here.
    """
    logger = logging.getLogger()
    logger.info("Processing ci_op csv...")

    # RF 11-04-2023: st_isvalid filter keeps broken geometries out.
    # Schema placeholders resolve to prod or stg depending on TARGET_ENV.
    join_query = f"""select
    null as nombre_op,
    oraw.id_co_name,
    oraw.nombre_co as nombre_sn_co,
    oraw.id_segmento,
    oraw.id_cm,
    oraw.operador,
    oraw.partido_despliegue,
    oraw.cabecera,
    oraw.ci_name,
    oraw.nombre_co_claro,
    oraw.nombre_co_atc,
    oraw.topologia,
    wop.cables,
    wop.geom
    from
    {SCHEMA_AUDIT}.ci_op_raw oraw,
    {SCHEMA_AUDIT}.wkt_op wop
    where
    oraw.id_cm = wop.osname
    and st_isvalid(wop.geom);"""

    df_ci_op = pd.read_sql_query(text(join_query), con=conn)

    # AMX rows take the Claro CO name; everyone else keeps the ATC CO name.
    df_ci_op["nombre_op"] = df_ci_op.apply(
        lambda row: row["nombre_co_atc"] if ("AMX" != row["operador"]) else row["nombre_co_claro"],
        axis=1,
    )
    df_ci_op["upload_date"] = datetime.today().strftime("%m-%d-%Y")

    sql_write(
        conn,
        df_ci_op,
        table_name="ci_op",
        schema_name="cm",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
        dtype={"geom": Geometry("GEOMETRY", srid=4326)},
    )
    logger.debug("ci_op data imported successfully")
|
|
|
|
|
|
def import_cm_ci_feeder_distribution_raw_table(conn, path) -> None:
    """Load the feeder/distribution CSV into audit.ci_feeder_distribution_raw."""
    logger = logging.getLogger()
    logger.info("Processing ci_feeder_distribution_raw csv...")

    fd = pd.read_csv(
        path,
        engine="python",
        encoding="cp1252",
        keep_default_na=False,
        index_col=False,
        na_values=na_values,
        sep=";",
    )
    # Header cleanup: stray quotes, lowercase.
    fd.columns = fd.columns.str.strip('"')
    fd.columns = fd.columns.str.lower()
    fd["upload_date"] = datetime.today().strftime("%m-%d-%Y")

    logger.debug("Importing data to ci_feeder_distribution_raw....")
    # copy_from_stringio(conn, df_instalaciones, "instalaciones")

    sql_write(
        conn,
        fd,
        table_name="ci_feeder_distribution_raw",
        schema_name="audit",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
    logger.debug("ci_feeder_distribution_raw data imported successfully")
|
|
|
|
|
|
def import_cm_ci_feeder_distribution_table(conn) -> None:
    """Join audit.ci_feeder_distribution_raw with audit.wkt_cables and publish
    cm.ci_feeder_distribution (LINESTRING geometries).

    Fix: the write now goes through sql_write_gdf so the resolved schema
    ('cm' -> 'cm_stg' under TARGET_ENV=stg) and DRY_RUN are honored like the
    other importers; previously to_postgis was called directly with a
    hard-coded 'cm' schema.
    """
    logger = logging.getLogger()
    logger.info("Processing ci_feeder_distribution csv...")

    # 16.12.2025: fraw.asset and fraw.fecha_renovacion added to the select.
    # History: st_geometrytype(...)='ST_LineString' filter added RF 20/03/2023,
    # removed RF 09/08/2023; st_isvalid (RF 11/04/2023) remains to keep bad
    # geometries out.
    join_query = f"""select
    fraw.id_cable,
    fraw.largo,
    fraw.cable_type,
    fraw.nombre_atc,
    fraw.nombre_clilente,
    fraw.operador,
    fraw.partido,
    fraw.cabecera,
    fraw.nombre_produto,
    fraw.fecha_creacion,
    fraw.latitud,
    fraw.longitud,
    fraw.direccion,
    fraw.id_mslink,
    fraw.cantidad_pelos,
    fraw.id_cm,
    fraw.ci_name,
    fraw.tipo,
    fraw.built_date,
    fraw.asset,
    fraw.fecha_renovacion,
    wcables.feeder,
    wcables.geom
    from
    {SCHEMA_AUDIT}.ci_feeder_distribution_raw fraw,
    {SCHEMA_AUDIT}.wkt_cables wcables
    where
    fraw.id_cm = wcables.name
    and wcables.geom is not null
    and st_isvalid(wcables.geom);
    """

    df_ci_fd = geopandas.GeoDataFrame.from_postgis(
        text(join_query), conn, geom_col="geom"
    )

    logger.debug("Importing data to ci_feeder_distribution....")

    df_ci_fd["upload_date"] = datetime.today().strftime("%m-%d-%Y")
    conn.commit()
    # Env-aware write; dtype mapping kept exactly as before.
    sql_write_gdf(
        conn,
        df_ci_fd,
        "ci_feeder_distribution",
        "cm",
        if_exists="replace",
        index=False,
        dtype={"geometry": Geometry("LINESTRING", srid=4326)},
    )

    logger.debug("ci_feeder_distribution data imported successfully")
|
|
|
|
|
|
def import_cm_ci_splitter_table(conn, path) -> None:
    """Load the splitter CSV into cm.ci_splitter."""
    logger = logging.getLogger("import_cm_ci_splitter_table")
    logger.info("Processing ci_splitter_table csv...")

    splitters = pd.read_csv(
        path,
        engine="python",
        encoding="unicode_escape",
        keep_default_na=False,
        index_col=False,
        na_values=na_values,
        sep=";",
    )
    # Header cleanup: stray quotes, lowercase.
    splitters.columns = splitters.columns.str.strip('"')
    splitters.columns = splitters.columns.str.lower()

    logger.debug("Importing data to ci_splitter....")
    splitters["upload_date"] = datetime.today().strftime("%m-%d-%Y")

    sql_write(
        conn,
        splitters,
        table_name="ci_splitter",
        schema_name="cm",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )

    logger.debug("ci_splitter data imported successfully")
|
|
|
|
|
|
def import_cm_wkt_headend_fosc_sfat_mfat_bfat_table(conn, path) -> None:
    """Load the headend/FOSC/SFAT/MFAT/BFAT WKT CSV into
    audit.wkt_headend_fosc_sfat_mfat_bfat as POINT geometries.

    Empty or degenerate WKT values ('' or 'Point ( )') are coerced to
    'Point (0 0)' before parsing so the GeoSeries build never fails.
    """
    logger = logging.getLogger("import_cm_wkt_headend_fosc_sfat_mfat_bfat_table")
    logger.info(f"Processing {path} ...")

    df_wkt = pd.read_csv(
        path,
        engine="python",
        encoding="unicode_escape",
        keep_default_na=False,
        index_col=False,
        na_values=na_values,
        sep=";",
    )
    df_wkt.columns = df_wkt.columns.str.strip('"').str.lower()
    df_wkt.rename(columns={"partido": "partido_despliegue"}, inplace=True)

    # Coerce empties -> 'Point (0 0)'; the two frames below exist only to log
    # which rows are being patched.
    df_wkt_vacios = df_wkt[~df_wkt["wkt"].astype(bool)]
    logger.info(f"geom vacios a arreglar\n {df_wkt_vacios} ...")
    df_wkt_vacios = df_wkt[df_wkt["wkt"] == 'Point ( )']
    logger.info(f"geom con Point vacios a arreglar\n {df_wkt_vacios} ...")
    df_wkt.loc[~df_wkt["wkt"].astype(bool), "wkt"] = 'Point (0 0)'
    df_wkt.loc[df_wkt["wkt"] == 'Point ( )', "wkt"] = 'Point (0 0)'

    df_wkt["geom"] = geopandas.GeoSeries.from_wkt(df_wkt["wkt"])
    df_wkt["upload_date"] = datetime.today().strftime("%m-%d-%Y")
    gdf = geopandas.GeoDataFrame(df_wkt, crs="EPSG:4326", geometry="geom")

    # Env-aware write helper (audit -> audit_stg when TARGET_ENV=stg).
    sql_write_gdf(
        conn,
        gdf,
        "wkt_headend_fosc_sfat_mfat_bfat",
        "audit",
        if_exists="replace",
        index=False,
        dtype={"geom": Geometry("POINT", srid=4326)},
    )
    logger.debug("headend_fosc_sfat_mfat_bfat data imported successfully")
|
|
|
|
|
|
def translate_descr_to_poleno(descr) -> str:
    """Map a partido (district) name to its two-letter code.

    Returns 'UNK' for any name not present in the mapping.
    """
    map__descr_to_poleno = {
        "ESCOBAR": "ES",
        "TRES DE FEBRERO": "TF",
        "VICENTE LOPEZ": "VL",
        "HURLINGHAM": "HR",
        "MORON": "MO",
        "TIGRE": "TG",
        "SAN FERNANDO": "SF",
        "SAN ISIDRO": "SI",
        "SAN MARTIN": "SM",
        "ITUZAINGO": "IT",
        "LA MATANZA": "LM",
        "MALVINAS ARGENTINAS": "MV",
    }

    # dict.get with a default replaces the explicit membership test.
    return map__descr_to_poleno.get(descr, "UNK")
|
|
|
|
|
|
def import_cm_ci_sfat_mfat_bfat_table(conn, with_geocoding=False) -> None:
    """Join the raw SFAT/MFAT/BFAT rows with their WKT geometries and
    replace cm.ci_sfat_mfat_bfat.

    Args:
        conn: open SQLAlchemy connection (committed before the final write).
        with_geocoding: when True, overwrite 'direccion' with the ArcGIS
            reverse-geocoded address for each row's lon/lat.
    """
    logger = logging.getLogger()
    logger.info("Processing ci_sfat_mfat_bfat csv...")
    # Inner join on id_cm = geometry name: raw rows without a matching
    # geometry are silently dropped.
    join_query = f"""select
    sraw.id_nodo,
    sraw.topologia,
    sraw.nombre_atc,
    sraw.nombre_cliente,
    sraw.operador,
    sraw.partido_despliegue,
    sraw.cabecera,
    sraw.nombre_produto,
    sraw.fecha_creacion,
    sraw.latitud,
    sraw.longitud,
    sraw.direccion,
    sraw.id_mslink,
    sraw.id_cm,
    sraw.potencia_real,
    sraw.potencia_referencia,
    sraw.ci_name,
    sraw.tipo,
    sraw.estado,
    sraw.nfctag,
    sraw.circuit,
    whfsmb.feeder,
    whfsmb.geom
    from
    {SCHEMA_AUDIT}.ci_sfat_mfat_bfat_raw sraw,
    {SCHEMA_AUDIT}.wkt_headend_fosc_sfat_mfat_bfat whfsmb
    where
    sraw.id_cm = whfsmb.name;"""
    df_ci_sfat_mfat_bfat = geopandas.GeoDataFrame.from_postgis(
        text(join_query), conn, geom_col="geom"
    )

    if with_geocoding:
        # Row-by-row reverse geocoding; one remote call per row, so this is
        # slow for large tables.
        for row in df_ci_sfat_mfat_bfat.itertuples():
            df_ci_sfat_mfat_bfat.loc[
                df_ci_sfat_mfat_bfat["id_nodo"] == row.id_nodo, "direccion"
            ] = reverse_geocode([row.longitud, row.latitud])["address"]["Address"]

    logger.debug("Importing data to ci_sfat_mfat_bfat....")
    # copy_from_stringio(conn, df_instalaciones, "instalaciones")

    df_ci_sfat_mfat_bfat["upload_date"] = datetime.today().strftime("%m-%d-%Y")
    # RL 18042023 Disabled: the original partido acronym (coming from CM) changed.
    """df_ci_sfat_mfat_bfat["partido_despliegue"] = df_ci_sfat_mfat_bfat[
    "partido_despliegue"
    ].apply(translate_descr_to_poleno)
    """

    # RF 27032023 Disabled: nombre_cliente already comes from CM, no need to derive it.
    """
    df_ci_sfat_mfat_bfat["nombre_cliente"] = df_ci_sfat_mfat_bfat.apply(
    apply_convert_cname_atc_to_om, axis=1
    )
    """

    # nombre_op picks customer name for AMX, ATC name otherwise; ci_name mirrors it.
    df_ci_sfat_mfat_bfat["nombre_op"] = df_ci_sfat_mfat_bfat.apply(
        apply_generate_nombre_op, axis=1
    )
    df_ci_sfat_mfat_bfat["ci_name"] = df_ci_sfat_mfat_bfat["nombre_op"]
    # Keep only the first row per id_cm (the join can fan out duplicates).
    df_ci_sfat_mfat_bfat.drop_duplicates(
        subset="id_cm", keep="first", inplace=True, ignore_index=True
    )
    conn.commit()
    # NOTE(review): the GeoDataFrame's geometry column is 'geom', so this
    # dtype key 'geometry' does not match any column; to_postgis assigns the
    # geometry dtype itself, so the entry appears inert — confirm.
    df_ci_sfat_mfat_bfat.to_postgis(
        "ci_sfat_mfat_bfat",
        conn,
        schema="cm",
        if_exists="replace",
        index=False,
        dtype={"geometry": Geometry("POINT", srid=4326)},
    )

    logger.debug("ci_sfat_mfat_bfat data imported successfully")
|
|
|
|
|
|
def import_cm_ci_headend_table(
    conn, path, df_wkt_network, with_geocoding=False
) -> None:
    """Load the ci_headend CSV, join it with the network WKT geometries and
    replace cm.ci_headend.

    Args:
        conn: open SQLAlchemy connection.
        path: path to the ';'-separated, utf-8 ci_headend CSV.
        df_wkt_network: GeoDataFrame of wkt_headend_fosc_sfat_mfat_bfat rows;
            must expose the 'name' join key and the 'geom' geometry column.
        with_geocoding: when True, overwrite 'direccion' with the ArcGIS
            reverse-geocoded address for each row's lon/lat.
    """
    logger = logging.getLogger()
    logger.info("Processing ci_headend csv...")

    df_ci_headend = pd.read_csv(
        path,
        engine="python",
        encoding="utf-8",
        keep_default_na=False,
        index_col=False,
        na_values=na_values,
        sep=";",
    )
    # Fixing double quotes on headers
    df_ci_headend.columns = df_ci_headend.columns.str.strip('"')
    # Fixing uppercase headers
    df_ci_headend.columns = df_ci_headend.columns.str.lower()
    # 'name' is the join key against df_wkt_network.
    df_ci_headend["name"] = df_ci_headend["id_cm"]

    if with_geocoding:
        # BUG FIX: geocode BEFORE merging. The original geocoded
        # df_ci_headend after pd.merge had already copied its data, so the
        # resolved addresses never reached the written table. Also fixes the
        # 'row.longituid' typo, which raised AttributeError on every
        # geocoding run.
        for row in df_ci_headend.itertuples():
            df_ci_headend.loc[
                df_ci_headend["id_cabecera"] == row.id_cabecera, "direccion"
            ] = reverse_geocode([row.longitud, row.latitud])["address"]["Address"]

    df_ci_headend_merged = pd.merge(df_ci_headend, df_wkt_network, on="name")

    # Drop columns duplicated/superseded by the join; keep the CSV-side
    # variants and rename them to their canonical names.
    df_ci_headend_merged.drop(
        ["tipo_y", "description", "owner", "feeder", "tecnologia"], axis=1, inplace=True
    )
    df_ci_headend_merged.rename(
        columns={"tipo_x": "tipo", "partido_x": "partido_despliegue"}, inplace=True
    )

    df_ci_headend_merged["upload_date"] = datetime.today().strftime("%m-%d-%Y")

    logger.debug("Importing data to ci_headend....")

    gdf = gpd.GeoDataFrame(df_ci_headend_merged, crs="EPSG:4326", geometry="geom")

    gdf.to_postgis(
        "ci_headend",
        conn,
        schema="cm",
        if_exists="replace",
        index=False,
        dtype={"geom": Geometry("POINT", srid=4326)},
    )

    logger.debug("ci_headend data imported successfully")
|
|
|
|
|
|
def import_cm_report_isp(conn, path):
    """Load the ISP report CSV and replace cm.cm_report_isp."""
    df_risp = pd.read_csv(
        path, keep_default_na=False, encoding="latin1", na_values=na_values, sep=";"
    )

    # Normalize headers: drop stray double quotes, then lowercase.
    df_risp.columns = df_risp.columns.str.strip('"').str.lower()

    df_risp["upload_date"] = datetime.today().strftime("%m-%d-%Y")

    logging.debug("Importing data to cm_report_isp....")

    sql_write(
        conn,
        df_risp,
        table_name="cm_report_isp",
        schema_name="cm",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )

    logging.debug("cm_report_isp data imported successfully")
|
|
|
|
|
|
def import_report_isp_osp(conn, path):
    """Load the ISP/OSP report CSV and replace cm.report_isp_osp."""
    df_rio = pd.read_csv(
        path, keep_default_na=False, encoding="latin1", sep=";", na_values=na_values
    )

    # Normalize headers: drop stray double quotes, then lowercase.
    df_rio.columns = df_rio.columns.str.strip('"').str.lower()

    df_rio["upload_date"] = datetime.today().strftime("%m-%d-%Y")

    logging.debug("Importing data to df_ioo....")
    # Let pandas infer nullable dtypes before writing.
    df_rio = df_rio.convert_dtypes()

    sql_write(
        conn,
        df_rio,
        table_name="report_isp_osp",
        schema_name="cm",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )

    logging.debug("report_isp_osp data imported successfully")
|
|
|
|
|
|
def import_report_e2e(conn, path):
    """Load the end-to-end report CSV and replace cm.report_e2e."""
    df_report_e2e = pd.read_csv(
        path, keep_default_na=False, encoding="latin-1", na_values=na_values, sep=","
    )

    # Fixing double quotes on headers
    df_report_e2e.columns = df_report_e2e.columns.str.strip('"')
    # Fixing uppercase headers
    df_report_e2e.columns = df_report_e2e.columns.str.lower()
    # BUG FIX: str.strip returns a new Series; the original discarded the
    # result, so the cleaned values never reached the table. Assign back.
    df_report_e2e["location_name"] = df_report_e2e["location_name"].str.strip(
        '" ?\n\t'
    )
    df_report_e2e["location_description"] = df_report_e2e[
        "location_description"
    ].str.strip('" ?\n\t')
    df_report_e2e["upload_date"] = datetime.today().strftime("%m-%d-%Y")

    logging.debug("Importing data to df_report_e2e....")

    sql_write(
        conn,
        df_report_e2e,
        table_name="report_e2e",
        schema_name="cm",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
    logging.debug("report_e2e data imported successfully")
|
|
|
|
|
|
def import_inventory_fat_occupation(conn, path):
    """Load the FAT-occupation CSV and replace cm.inventory_fat_occupation.

    RF 06/03/2023: separator switched to pipe (agreed with IT) because the
    coordinates sometimes contain decimal commas.
    """

    def _normalize_date(series):
        # RF 14/1/2023 - coerce invalid dates (e.g. a literal '0') to NaT
        # instead of raising, then render as mm/dd/YYYY HH:MM:SS.
        return (
            pd.to_datetime(series, errors="coerce")
            .dt.strftime("%m/%d/%Y %H:%M:%S")
            .str.strip(" \r\n\t")
        )

    df_ifo = pd.read_csv(
        path,
        engine="python",
        keep_default_na=False,
        encoding="latin1",
        quoting=csv.QUOTE_ALL,
        na_values=na_values,
        sep="|",
    )
    # Fixing uppercase headers
    df_ifo.columns = df_ifo.columns.str.lower()
    df_ifo["upload_date"] = datetime.today().strftime("%m-%d-%Y")

    logging.debug("Importing data to df_ifo....")
    df_ifo = df_ifo.convert_dtypes()

    # The same normalization applied to all three lifecycle-date columns
    # (previously three copy-pasted pipelines).
    for date_col in ("reserved_date", "provided_date", "cease_date"):
        df_ifo[date_col] = _normalize_date(df_ifo[date_col])

    sql_write(
        conn,
        df_ifo,
        table_name="inventory_fat_occupation",
        schema_name="cm",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
    logging.debug("inventory_fat_occupation data imported successfully")
|
|
|
|
|
|
def import_inventory_olt_occupation(conn, path):
    """Load the (large) OLT-occupation CSV chunk-wise and replace
    cm.inventory_olt_occupation."""

    def _normalize_date(series):
        # RF 14/1/2023 - coerce invalid dates (e.g. a literal '0') to NaT
        # instead of raising, then render as mm/dd/YYYY HH:MM:SS.
        return (
            pd.to_datetime(series, errors="coerce")
            .dt.strftime("%m/%d/%Y %H:%M:%S")
            .str.strip(" \r\n\t")
        )

    # Read in chunks because the CSV is very large; adjust chunksize as needed.
    chunks = []
    for chunk in pd.read_csv(
        path,
        keep_default_na=False,
        encoding="latin1",
        sep=";",
        na_values=na_values,
        chunksize=100000,
    ):
        # Per-chunk dedup keeps peak memory down; a global dedup follows.
        chunk.drop_duplicates(inplace=True)
        chunks.append(chunk)

    df_ioo = pd.concat(chunks, ignore_index=True)
    df_ioo.drop_duplicates(inplace=True)

    # Fixing double quotes on headers
    df_ioo.columns = df_ioo.columns.str.strip('"')
    # Fixing uppercase headers
    df_ioo.columns = df_ioo.columns.str.lower()
    df_ioo["upload_date"] = datetime.today().strftime("%m-%d-%Y")

    logging.debug(f"csv file {path} - Importing data to df_ioo....")
    df_ioo = df_ioo.convert_dtypes()
    # RF 30/01/2023: access ids are numeric strings; pandas sometimes parses
    # them as float, and str() then appends '.0' — strip that suffix.
    df_ioo["access_id"] = df_ioo["access_id"].astype(str).str.removesuffix(".0")
    df_ioo["fat_port_access_id"] = (
        df_ioo["fat_port_access_id"].astype(str).str.removesuffix(".0")
    )

    # The same normalization applied to all three lifecycle-date columns
    # (previously three copy-pasted pipelines).
    for date_col in (
        "fat_port_reserved_date",
        "fat_port_provid_date",
        "fat_port_cease_date",
    ):
        df_ioo[date_col] = _normalize_date(df_ioo[date_col])

    sql_write(
        conn,
        df_ioo,
        table_name="inventory_olt_occupation",
        schema_name="cm",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
    logging.debug("inventory_olt_occupation data imported successfully")
|
|
|
|
|
|
def import_inventory_mdu_bfat(conn, path):
    """Load the MDU/BFAT inventory CSV (pipe-separated) and replace
    cm.inventory_mdu_bfat."""
    df_imf = pd.read_csv(
        path, keep_default_na=False, encoding="latin1", sep="|", na_values=na_values
    )

    # Normalize headers: drop stray double quotes, then lowercase.
    df_imf.columns = df_imf.columns.str.strip('"').str.lower()

    df_imf["upload_date"] = datetime.today().strftime("%m-%d-%Y")
    logging.debug("Importing data to df_imf....")

    sql_write(
        conn,
        df_imf,
        table_name="inventory_mdu_bfat",
        schema_name="cm",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
    logging.debug("inventory_mdu_bfat data imported successfully")
|
|
|
|
|
|
|
|
|
|
def load_cm_tables():
    """Run the full CM load: raw WKT/CI imports first, then the derived
    tables that join against them. Each step is committed individually so a
    failure does not roll back earlier tables.

    Relies on the module-level `engine` and the path constants from config.
    """

    with engine.connect() as conn:

        # RF 20/5/2025 - Added the wkt_fibra import
        import_wkt_fibra_raw(conn, wkt_fibra_path)
        conn.commit()

        # Raw geometry and raw CI tables — these must exist before the
        # joined imports below.
        import_cm_wkt_headend_fosc_sfat_mfat_bfat_table(
            conn, wkt_headend_fosc_sfat_mfat_bfat_path
        )
        import_cm_wkt_op_raw_table(conn, wkt_op_path)
        import_wkt_cables_table(conn, wkt_cables_path)
        import_cm_ci_sfat_mfat_bfat_raw_table(conn, ci_sfat_mfat_bfat_path)
        import_cm_ci_fosc_raw_table(conn, ci_fosc_path, ci_site_path)
        import_cm_ci_site_table(conn, ci_site_path)
        import_cm_ci_op_raw_table(conn, ci_op_path)
        import_cm_ci_feeder_distribution_raw_table(conn, ci_feederd_path)

        conn.commit()
        # Re-read the freshly loaded WKT network table; it is joined into
        # ci_headend below.
        df_wkt_network = sql_read(
            conn,
            f"select * from {SCHEMA_AUDIT}.wkt_headend_fosc_sfat_mfat_bfat",
            geom_col="geom",
        )
        # # # Reducing footprint — categoricals shrink the repeated columns.
        df_wkt_network = df_wkt_network.convert_dtypes()
        df_wkt_network["tipo"] = df_wkt_network["tipo"].astype("category")
        df_wkt_network["tecnologia"] = df_wkt_network["tecnologia"].astype("category")
        df_wkt_network["partido_despliegue"] = df_wkt_network[
            "partido_despliegue"
        ].astype("category")
        df_wkt_network["owner"] = df_wkt_network["owner"].astype("category")
        conn.commit()
        # Derived tables (one commit per table).
        import_cm_ci_headend_table(conn, ci_headend_path, df_wkt_network)
        conn.commit()
        import_cm_ci_sfat_mfat_bfat_table(conn)
        conn.commit()
        import_cm_wkt_op_table(conn)
        conn.commit()
        import_cm_ci_fosc_table(conn)
        conn.commit()
        import_cm_ci_mdu_table(conn, ci_mdu_path)
        conn.commit()
        import_cm_ci_olt_table(conn, ci_olt_path)
        conn.commit()
        import_cm_ci_op_table(conn)
        conn.commit()
        import_cm_ci_feeder_distribution_table(conn)
        conn.commit()
        import_cm_ci_splitter_table(conn, ci_splitter_path)
        conn.commit()
        import_cm_report_isp(conn, report_isp_path)
        conn.commit()
        import_report_e2e(conn, report_e2e_path)
        conn.commit()
        import_report_fusiones(conn, report_fusiones_path)
        conn.commit()
        import_inventory_fat_occupation(conn, ifo_path)
        conn.commit()
        # RF 16/5/2025 - note: this table previously had export errors from CM.
        import_inventory_olt_occupation(conn, ioo_path)
        conn.commit()
        import_report_isp_osp(conn, rio_path)
        conn.commit()
        import_inventory_mdu_bfat(conn, imf_path)
        conn.commit()
        # RF 20/5/2025 - Added cm_inventory_cable_occupation(conn), a query
        # over wkt_fibra_raw.
        import_cm_inventory_cable_occupation(conn)
        conn.commit()
        #import_cm_ci_fosc_table_uat(conn)
        #conn.commit()
|
|
|
|
|
|
|
|
@functools.lru_cache
def parse_odd_co_names(co_name) -> str:
    """Infer the operator code ("EDN", "AMX", "ATC", "MOV", "TCP", ...) from
    an irregular CO name.

    Cached with lru_cache since the same names repeat across rows. Returns
    None when a single-token name cannot be classified (despite the `-> str`
    annotation, preserved from the original).
    """
    # RF 12/01/2023 - cast to str because some osnames arrive untyped/null.
    text_name = str(co_name)

    # Substring tags checked first; e.g. EDN COs look like
    # EDN_CT_MORON_HURLINGHAM.
    if "EDN" in text_name:
        return "EDN"
    if ("IPRAN" in text_name) or ("RAMX" in text_name):
        return "AMX"
    if ("RATC" in text_name) or ("ANILLO GPON ITX" in text_name):
        return "ATC"
    if any(tag in text_name for tag in ("ANILLO ME", "ANILLO GP", "ANILLO_ME", "BBIP")):
        return "AMX"

    # Fall back to positional parsing: prefer '_'-separated, then '-'.
    parts = text_name.split("_")
    if len(parts) < 2:
        parts = text_name.split("-")
        if len(parts) == 3 and "R" in parts[1]:
            return "AMX"
        return None

    token = parts[1]
    if "CL" in token:
        return "AMX"
    if "CYC" in token:
        return "ATC"
    if "MV" in token:
        return "MOV"
    if "TP" in token:
        return "TCP"
    return token
|
|
|
|
|
|
def parse_odd_co_names_for_cab(co_name) -> str:
    """Infer the headend ('cab') prefix from an irregular CO name.

    Returns "TBD" for EDN/IPRAN names and "TDB" (spelling preserved from the
    original) when a name cannot be split into enough tokens.
    """
    # RF 12/01/2023 - cast to str because some osnames arrive untyped/null.
    text_name = str(co_name)

    # EDN CO have another format EX: EDN_CT_MORON_HURLINGHAM
    if "EDN" in text_name or "IPRAN" in text_name:
        return "TBD"
    if "RAMX" in text_name or "RATC" in text_name:
        pieces = text_name.split("-")
        return pieces[0] if len(pieces) >= 2 else "TDB"
    if "ANILLO GPON ITX" in text_name:
        pieces = text_name.split(" ")
        # The cab is the first 5 chars of the 4th space-separated token.
        return pieces[3][:5] if len(pieces) >= 4 else "TDB"
    if any(tag in text_name for tag in ("ANILLO GP", "ANILLO ME", "ANILLO_ME", "BBIP")):
        return "AMX"

    # Fall back to positional parsing: prefer '_'-separated, then '-'.
    pieces = text_name.split("_")
    if len(pieces) >= 2:
        return pieces[0]
    pieces = text_name.split("-")
    if len(pieces) == 3 and "R" in pieces[1]:
        return pieces[0]
    return "TDB"
|
|
|
|
|
|
def set_co_operador(co_name) -> str:
    """Return the operator for a CO: exact lookup in the module-level
    pd_co_nd table, falling back to heuristic name parsing."""
    # RF 12/01/2023 - cast to str because some osnames arrive untyped/null.
    co_name = str(co_name)
    matches = pd_co_nd.loc[pd_co_nd["osname"] == co_name, "operador"]
    if not matches.empty:
        return matches.iloc[0]
    return parse_odd_co_names(co_name)
|
|
|
|
|
|
def set_co_cab(co_name) -> str:
    """Return the headend ('cab') for a CO: exact lookup in the module-level
    pd_co_nd table, falling back to heuristic name parsing."""
    # RF 12/01/2023 - cast to str because some osnames arrive untyped/null.
    co_name = str(co_name)
    matches = pd_co_nd.loc[pd_co_nd["osname"] == co_name, "cab"]
    if not matches.empty:
        return matches.iloc[0]
    return parse_odd_co_names_for_cab(co_name)
|
|
|
|
|
|
def apply_set_co_operador(co):
    """Row-wise adapter for DataFrame.apply: operator from the row's osname."""
    osname = co["osname"]
    return set_co_operador(osname)
|
|
|
|
|
|
def apply_set_co_cab(co):
    """Row-wise adapter for DataFrame.apply: cab from the row's osname."""
    osname = co["osname"]
    return set_co_cab(osname)
|
|
|
|
|
|
def convert_co_atc_to_om(operador, co_name) -> str:
    """Normalize a CO name to the OM naming convention for AMX and ATC
    operators; any other operator's name passes through unchanged.

    Args:
        operador: operator code ("AMX", "ATC", ...).
        co_name: raw CO name (cast to str; some osnames arrive untyped/null).

    Returns:
        The normalized CO name, or the input unchanged when no rule applies.
    """
    # RF 12/01/2023 - cast to str because some osnames arrive untyped/null.
    co_name = str(co_name)

    if operador == "AMX":
        chunks = co_name.split("-")
        # NOTE(review): original condition was 'count < 4 or 4 > count' —
        # the same predicate twice; possibly '!= 4' was intended. Behavior
        # ('< 4') preserved here.
        if len(chunks) < 4:
            return co_name
        co_fixed = f"{chunks[0]}-R{chunks[3][-4:]}-001"
    elif operador == "ATC":
        if "PATH" in co_name:  # Is a CM name EX: ES010101-PATH-03-000297-0000
            chunks = co_name.split("-")
            # BUG FIX: the original read 'count_chunks' here without ever
            # assigning it in this branch (NameError on every ATC PATH name).
            if len(chunks) == 5:
                co_fixed = f"{chunks[0][0:4]}-RATC-0-{chunks[3]}"
            else:
                co_fixed = co_name
        else:
            co_fixed = co_name
    else:
        co_fixed = co_name

    return co_fixed
|
|
|
|
|
|
def convert_cname_atc_to_om(operador, cname) -> str:
    """Normalize an AMX customer name to the OM convention; all other
    operators' names (and empty/falsy names) pass through unchanged."""
    # Guard clauses: only truthy AMX names are rewritten.
    if operador != "AMX" or not cname:
        return cname

    parts = cname.split("-")
    if len(parts) < 4:
        return cname

    return f"{parts[0]}-R{parts[3][-4:]}-001"
|
|
|
|
|
|
def generate_nombre_op(operator, atc_name, customer_name) -> str:
    """Pick the operator-facing name: customer name for AMX, ATC name otherwise."""
    return customer_name if operator == "AMX" else atc_name
|
|
|
|
|
|
def apply_convert_co_atc_to_om(co):
    """Row-wise adapter for DataFrame.apply: normalize the row's osname."""
    operador, osname = co["operador"], co["osname"]
    return convert_co_atc_to_om(operador, osname)
|
|
|
|
|
|
def apply_convert_cname_atc_to_om(cname):
    """Row-wise adapter for DataFrame.apply: normalize the row's customer name."""
    operador, nombre = cname["operador"], cname["nombre_cliente"]
    return convert_cname_atc_to_om(operador, nombre)
|
|
|
|
|
|
def apply_generate_nombre_op(fat):
    """Row-wise adapter for DataFrame.apply: pick the row's operator-facing name."""
    operador = fat["operador"]
    return generate_nombre_op(operador, fat["nombre_atc"], fat["nombre_cliente"])
|
|
|
|
|
|
|
|
def create_grant_tables() -> None:
    """Grant schema usage and table SELECT on cm/audit to the standard roles."""
    logger = logging.getLogger()

    # Statements kept byte-identical to the original grants.
    grant_statements = (
        """GRANT CREATE,USAGE ON SCHEMA cm,audit TO postgres;""",
        """GRANT USAGE ON SCHEMA cm , audit TO om_read;""",
        """GRANT SELECT ON ALL TABLES IN SCHEMA audit,cm TO om_read;""",
    )

    with engine.connect() as conn:
        #conn.execute(text("""REVOKE ALL ON ALL TABLES IN SCHEMA public FROM PUBLIC;"""))
        for statement in grant_statements:
            conn.execute(text(statement))
        conn.commit()
    logger.debug("\t*** Grants given to all tables successfully in PostgreSQL ")
|
|
|
|
def create_grant_tables_extra() -> None:
    """Grant schema usage and table SELECT on cm/audit to extra users."""
    logger = logging.getLogger()

    # Statements kept byte-identical to the original grants.
    grant_statements = (
        """GRANT USAGE ON SCHEMA cm , audit TO jibanez;""",
        """GRANT SELECT ON ALL TABLES IN SCHEMA audit,cm TO jibanez;""",
    )

    with engine.connect() as conn:
        #conn.execute(text("""REVOKE ALL ON ALL TABLES IN SCHEMA public FROM PUBLIC;"""))
        for statement in grant_statements:
            conn.execute(text(statement))
        conn.commit()
    logger.debug("\t*** Grants given to all tables successfully in PostgreSQL ")
|
|
|
|
if __name__ == "__main__":

    try:

        # Logging config (log.ini) lives next to this script.
        log_file_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "log.ini"
        )

        logging.config.fileConfig(log_file_path, disable_existing_loggers=False)

        logger = logging.getLogger("main")

        # Stage inputs, (re)create the target database, then run the full
        # CM load and apply grants.
        process_files()
        setup_connection()
        recreate_database()

        with CodeTimer("load_cm_data"):
            load_cm_tables()

        create_grant_tables()
        create_grant_tables_extra()
        log_final_en_postgres()
    except Exception:
        # Top-level boundary: log the full traceback instead of crashing.
        logging.error(f"uncaught exception: {traceback.format_exc()}")
    finally:
        # NOTE(review): nothing is explicitly closed here; connections are
        # managed by the per-function context managers — confirm.
        logging.info(msg="PostgreSQL conn is closed")
|