Project

General

Profile

Tareas #27 » loader_DK_13032026.py

Demo MiGestion365 Admin, 03/16/2026 07:33 PM

 
import geopandas as gpd
import geopandas
import sys
import csv
from datetime import datetime
import functools
from io import StringIO
import logging
from logging import config
import pandas as pd
from pandas.api.types import is_datetime64_any_dtype
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from geoalchemy2 import Geometry
from config import (
DATABASE_URI,
folderPath_Local,
wkt_headend_fosc_sfat_mfat_bfat_path,
wkt_op_path,
wkt_cables_path,
ci_sfat_mfat_bfat_path,
ci_fosc_path,
ci_site_path,
ci_op_path,
ci_feederd_path,
ci_headend_path,
ci_mdu_path,
ci_olt_path,
ci_splitter_path,
report_isp_path,
report_e2e_path,
report_fusiones_path,
ifo_path,
ioo_path,
rio_path,
imf_path,
wkt_fibra_path,
)
from config import na_values
import os
import traceback
from arcgis.geocoding import reverse_geocode
import shutil

from shapely import wkt
from linetimer import CodeTimer
import warnings
from process_files import process_files

# ==== AMBIENTES Y ESQUEMAS (Paso 1) =========================================

# 1) Environment-variable flags controlling where and how the loader runs.
#    TARGET_ENV: 'prod' (default) or 'stg' -- selects DB URI and schema suffix.
#    FULL_RELOAD: exposed for callers of this module.
#    DRY_RUN: when true, the sql_write helpers log instead of writing.
TARGET_ENV = os.getenv("TARGET_ENV", "prod").strip().lower()  # 'prod' | 'stg'
FULL_RELOAD = os.getenv("FULL_RELOAD", "true").strip().lower() == "true"
DRY_RUN = os.getenv("DRY_RUN", "false").strip().lower() == "true"

# 2) Staging URI: prefer config.py, fall back to the environment.
#    Fix: catch ImportError only -- the former broad `except Exception`
#    would also hide unrelated errors raised while importing config.
#    (The redundant re-import of os was removed; os is imported above.)
try:
    from config import DATABASE_URI_STAGING  # agregar en tu config.py
except ImportError:
    DATABASE_URI_STAGING = os.getenv("DATABASE_URI_STAGING", "")
# 3) Schema resolution per environment
def resolve_schema(schema_name: str) -> str:
    """
    Map a logical schema name to the effective one for the current environment.

    - prod: names pass through unchanged (trimmed, lower-cased).
    - stg:  'audit' -> 'audit_stg', 'cm' -> 'cm_stg'; anything else unchanged.
    """
    normalized = schema_name.strip().lower()
    if TARGET_ENV != "stg":
        return normalized  # prod: use the name as-is
    staged = {"audit": "audit_stg", "cm": "cm_stg"}
    return staged.get(normalized, normalized)

# Effective schemas for this run: 'audit'/'cm' in prod, '*_stg' in staging.
SCHEMA_AUDIT = resolve_schema("audit")
SCHEMA_CM = resolve_schema("cm")

# 4) Helpers centralizing DB writes with the environment-resolved schema
def sql_write(conn, df, table_name, schema_name, if_exists="replace", index=False, method=None, dtype=None):
    """Write a DataFrame with pandas.to_sql into the environment-resolved schema.

    Honors DRY_RUN: only logs the intended write and returns without touching
    the database. All keyword arguments are forwarded to DataFrame.to_sql.
    """
    if DRY_RUN:
        logging.getLogger("dryrun").info(f"[DRY_RUN] to_sql {schema_name}.{table_name} rows={len(df)}")
        return
    target_schema = resolve_schema(schema_name)
    df.to_sql(
        table_name,
        conn,
        schema=target_schema,
        if_exists=if_exists,
        index=index,
        method=method,
        dtype=dtype,
    )

def sql_write_gdf(conn, gdf, table_name, schema_name, if_exists="replace", index=False, dtype=None):
    """Write a GeoDataFrame with to_postgis into the environment-resolved schema.

    Honors DRY_RUN: only logs the intended write and returns without touching
    the database. All keyword arguments are forwarded to GeoDataFrame.to_postgis.
    """
    if DRY_RUN:
        logging.getLogger("dryrun").info(f"[DRY_RUN] to_postgis {schema_name}.{table_name} rows={len(gdf)}")
        return
    target_schema = resolve_schema(schema_name)
    gdf.to_postgis(
        table_name,
        conn,
        schema=target_schema,
        if_exists=if_exists,
        index=index,
        dtype=dtype,
    )

def sql_read(conn, query_text, geom_col=None):
    """
    Run a SQL query and return a DataFrame. The schema must already be
    resolved in the query text (e.g. f-strings with SCHEMA_AUDIT/SCHEMA_CM).

    Pass a non-empty geom_col to read a GeoDataFrame via geopandas instead.
    """
    if not geom_col:
        return pd.read_sql_query(text(query_text), conn)
    return geopandas.GeoDataFrame.from_postgis(text(query_text), conn, geom_col=geom_col)
# =============================================================================

# Silence library warnings (the pandas/geopandas loads below emit many).
warnings.filterwarnings("ignore")


# Module-level state; engine is created lazily by setup_connection().
engine = None
pd_co_nd = None
# NOTE(review): this factory is built while engine is still None, so sessions
# created from it are unbound unless rebound later -- confirm against callers.
session = sessionmaker(engine, future=True)

# Setup_connection original
# def setup_connection():
# global engine
# if engine is None:
# engine = create_engine(
# DATABASE_URI, pool_size=100, max_overflow=50, future=True
# )

# ======= Nuevo setup_connection con stg
def setup_connection():
    """Create the module-level SQLAlchemy engine once, honoring TARGET_ENV.

    In 'stg' mode DATABASE_URI_STAGING must be configured (config.py or env
    var); otherwise the production DATABASE_URI is used.
    """
    global engine
    if engine is not None:
        return
    if TARGET_ENV == "stg":
        if not DATABASE_URI_STAGING:
            raise RuntimeError("DATABASE_URI_STAGING no está definido (env o config).")
        uri = DATABASE_URI_STAGING
    else:
        uri = DATABASE_URI
    engine = create_engine(uri, pool_size=100, max_overflow=50, future=True)
# =======

def log_final_en_postgres():
    """Record this loader run in z_procesos_python.scripts_ejecutados.

    Fix: the raw DBAPI connection and its cursor are now always closed
    (the original leaked both on success and on failure).
    """
    cnx = engine.raw_connection()
    try:
        cursor = cnx.cursor()
        try:
            cursor.execute("""
INSERT INTO z_procesos_python.scripts_ejecutados (nombre_script, ultima_ejecucion)
VALUES ('Loader', CURRENT_TIMESTAMP AT TIME ZONE 'America/Argentina/Buenos_Aires');
""")
            cnx.commit()
        finally:
            cursor.close()
    finally:
        cnx.close()


def recreate_database():
    """Placeholder: schema drop/create is intentionally disabled here."""
    # Base.metadata.drop_all(engine)
    # Base.metadata.create_all(engine)
    pass


def print_psycopg2_exception(err):
    """Log the details of a psycopg2 error, including where it was raised.

    Must be called from inside an ``except`` block so sys.exc_info() is set.

    Fixes over the original:
    - logging.error() was called with print()-style positional arguments,
      which logging treats as %-format args; with no placeholders in the
      message the extra values were never rendered. Placeholders added.
    - the local traceback variable shadowed the imported ``traceback`` module.
    - guards against a missing traceback instead of raising AttributeError.
    """
    err_type, err_obj, tb = sys.exc_info()

    # Line number where the exception occurred (-1 if no active exception).
    line_num = tb.tb_lineno if tb is not None else -1

    # the connect() / query error
    logging.error("\npsycopg2 ERROR: %s on line number: %s", err, line_num)
    logging.error("psycopg2 traceback: %s -- type: %s", tb, err_type)

    # psycopg2 extensions.Diagnostics object attribute
    print("\nextensions.Diagnostics:", err.diag)

    # the pgcode and pgerror exception details
    logging.error("pgerror: %s", err.pgerror)
    logging.error("pgcode: %s\n", err.pgcode)


def psql_insert_copy(table, conn, keys, data_iter):
    """
    Execute SQL statement inserting data

    Intended as the ``method=`` callback of DataFrame.to_sql: streams the
    batch into PostgreSQL with COPY instead of row-by-row INSERTs.

    Parameters
    ----------
    table : pandas.io.sql.SQLTable
    conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
    keys : list of str
        Column names
    data_iter : Iterable that iterates the values to be inserted
    """
    # gets a DBAPI connection that can provide a cursor
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        # Serialize the whole batch to an in-memory CSV so COPY can read it
        # in a single streaming pass.
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)

        # Quote column names to preserve case and special characters.
        columns = ", ".join('"{}"'.format(k) for k in keys)
        if table.schema:
            table_name = "{}.{}".format(table.schema, table.name)
        else:
            table_name = table.name

        sql = "COPY {} ({}) FROM STDIN WITH CSV".format(table_name, columns)
        try:
            cur.copy_expert(sql=sql, file=s_buf)
        except Exception as err:
            # NOTE(review): the error is logged but swallowed, so a failed COPY
            # leaves the target table empty/partial without aborting the run --
            # confirm this best-effort behavior is intended.
            print_psycopg2_exception(err)


def to_sql(conn, df, table, dtype=None, if_exists="replace", sep="|", encoding="utf8"):
    """Fast bulk load: (re)create the table via pandas, then COPY the rows.

    Fixes over the original: ``dtype`` now defaults to ``None`` (the old
    default of ``""`` is rejected by DataFrame.to_sql, so calls omitting it
    crashed) and the cursor is always closed, even on error.

    Parameters
    ----------
    conn : DBAPI (psycopg2) connection used for the COPY and commit.
    df : pandas.DataFrame to load.
    table : str, destination table name.
    dtype : optional mapping of column name -> SQLAlchemy type.
    if_exists : forwarded to DataFrame.to_sql ('replace' recreates the table).
    sep : field separator of the intermediate CSV stream.
    encoding : encoding of the intermediate CSV stream.
    """
    # Create (or replace) the empty table through the module-level SQLAlchemy
    # engine; only the schema is written here (df[:0] carries no rows).
    df[:0].to_sql(table, engine, if_exists=if_exists, index=False, dtype=dtype)

    # Serialize the data to an in-memory CSV for COPY.
    output = StringIO()
    df.to_csv(output, index=False, sep=sep, header=False, encoding=encoding)
    output.seek(0)

    # Stream the rows into the table.
    cursor = conn.cursor()
    try:
        cursor.copy_from(output, table, sep=sep, null="")
        conn.commit()
    finally:
        cursor.close()

def convert(val):
    """Return 0 for values listed in config.na_values, otherwise val unchanged."""
    return 0 if val in na_values else val


@functools.lru_cache
def convert_str(val):
    """Return '' for NA markers (config.na_values); otherwise val trimmed.

    Cached because the same raw strings recur many times across the CSV loads.
    """
    return "" if val in na_values else val.strip().strip("\r\n")


def import_wkt_fibra_raw(conn, path) -> None:
    """Load the wkt_fibra CSV, parse its WKT geometries and replace
    audit.wkt_fibra_raw (schema resolved per environment)."""
    logger = logging.getLogger("import_cm_wkt_fibra_raw_table")
    logger.info("Processing wkt_fibra csv...")
    frame = pd.read_csv(
        path,
        engine="python",
        encoding="unicode_escape",
        keep_default_na=False,
        index_col=False,
        na_values=na_values,
        sep=";",
    )
    # Header cleanup: strip stray quotes, lower-case.
    frame.columns = frame.columns.str.strip('"').str.lower()

    def _parse_wkt(value):
        # Unparseable WKT becomes an empty geometry so one bad row cannot
        # abort the whole load.
        try:
            return wkt.loads(value)
        except Exception as e:
            logger.warning(f"validate_wkt - Error in WKT {e} - {value}")
            return wkt.loads("MULTILINESTRING EMPTY")

    frame["geom"] = frame["geom"].apply(_parse_wkt)
    frame["upload_date"] = datetime.today().strftime("%m-%d-%Y")

    gdf = gpd.GeoDataFrame(frame, crs="EPSG:4326", geometry="geom")

    # Helper resolves 'audit' -> 'audit_stg' under TARGET_ENV=stg and honors DRY_RUN.
    sql_write_gdf(
        conn,
        gdf,
        "wkt_fibra_raw",
        "audit",
        if_exists="replace",
        index=False,
        dtype={"geom": Geometry("GEOMETRY", srid=4326)},
    )
    logger.debug("wkt_fibra data imported successfully")

def import_cm_inventory_cable_occupation(conn, with_geocoding=False) -> None:
    """Aggregate wkt_fibra_raw per cable (unioned geometry + total length)
    and replace cm.inventory_cable_occupation.

    with_geocoding is accepted for signature parity with the other importers
    but is not used here.
    """
    logger = logging.getLogger("import_cm_wkt_op_table")
    join_query = f"""
    SELECT wfr."cable name",wfr."cable descrp","tipo cable", wfr.fibra, wfr."ruta fisica",
    wfr."circuito",wfr."user by", wfr.owner, partido, asset, cnt_crossw,
    ST_Union(geom) AS geom,
    ST_length(ST_Union(geom)::geography)
    FROM {SCHEMA_AUDIT}.wkt_fibra_raw wfr
    GROUP BY wfr."cable name",wfr."cable descrp","tipo cable", wfr.fibra, wfr."ruta fisica",
    wfr."circuito",wfr."user by", wfr.owner, partido, asset, cnt_crossw;
    """
    logger.debug("Obteniendo datos desde PostgreSQL...")
    aggregated = pd.read_sql_query(text(join_query), conn)

    logger.debug("Importando datos a inventory_cable_occupation....")
    aggregated["upload_date"] = datetime.today().strftime("%d-%m-%Y")
    sql_write(
        conn,
        aggregated,
        "inventory_cable_occupation",
        "cm",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
        dtype={"geom": Geometry("GEOMETRY", srid=4326)},
    )
    logger.debug("Datos de inventory_cable_occupation importados exitosamente")

def import_cm_wkt_op_raw_table(conn, path) -> None:
    """Load the wkt_op CSV, parse its WKT geometries and replace
    audit.wkt_op_raw (schema resolved per environment)."""
    logger = logging.getLogger("import_cm_wkt_op_raw_table")
    logger.info("Processing wkt_op csv...")

    frame = pd.read_csv(
        path,
        engine="python",
        encoding="unicode_escape",
        keep_default_na=False,
        index_col=False,
        na_values=na_values,
        sep=";",
    )
    # Header cleanup + column rename to match the target table.
    frame.columns = frame.columns.str.strip('"').str.lower()
    frame.rename(columns={"partido": "partido_despliegue"}, inplace=True)

    def _parse_wkt(value):
        # Unparseable WKT becomes an empty geometry so the load continues.
        try:
            return wkt.loads(value)
        except Exception as e:
            logger.warning(f"validate_wkt - Error in WKT {e} - {value}")
            return wkt.loads("MULTILINESTRING EMPTY")

    frame["geom"] = frame["geom"].apply(_parse_wkt)
    frame["upload_date"] = datetime.today().strftime("%m-%d-%Y")
    gdf = gpd.GeoDataFrame(frame, crs="EPSG:4326", geometry="geom")

    # Helper resolves the schema for the environment and honors DRY_RUN.
    sql_write_gdf(
        conn,
        gdf,
        "wkt_op_raw",
        "audit",
        if_exists="replace",
        index=False,
        dtype={"geom": Geometry("GEOMETRY", srid=4326)},
    )
    logger.debug("wkt_op_raw data imported successfully")

def import_cm_wkt_op_table(conn, with_geocoding=False) -> None:
    """Aggregate audit.wkt_op_raw per (osname, cables, circuit) into unioned
    geometries and replace audit.wkt_op.

    with_geocoding is unused; kept for signature parity with other importers.
    """
    logger = logging.getLogger("import_cm_wkt_op_table")
    join_query = f"""
    SELECT osname, cables, ST_Union(geom) AS geom, circuit
    FROM {SCHEMA_AUDIT}.wkt_op_raw
    GROUP BY osname, cables, circuit
    """
    logger.debug("Obteniendo datos desde PostgreSQL...")
    grouped = pd.read_sql_query(text(join_query), conn)

    logger.debug("Importando datos a wkt_op....")
    grouped["upload_date"] = datetime.today().strftime("%d-%m-%Y")
    sql_write(
        conn,
        grouped,
        "wkt_op",
        "audit",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
        dtype={"geom": Geometry("GEOMETRY", srid=4326)},
    )
    logger.debug("Datos de wkt_op importados exitosamente")


def import_report_fusiones(conn, path):
    """Load the fusiones report CSV (pipe-separated, latin1) and replace
    cm.report_fusiones."""
    report = pd.read_csv(
        path, keep_default_na=False, encoding="latin1", na_values=na_values, sep="|"
    )
    # Header cleanup: strip stray quotes, then lower-case.
    report.columns = report.columns.str.strip('"').str.lower()
    report["upload_date"] = datetime.today().strftime("%m-%d-%Y")

    logging.debug("Importing data to df_report_fusiones....")
    sql_write(
        conn,
        report,
        table_name="report_fusiones",
        schema_name="cm",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
    logging.debug("df_report_fusiones data imported successfully")




def import_report_network_splitter_ocupation(conn, path):
    """Load the splitter-occupation worksheet (columns A:T) and replace
    cm.network_splitter_ocupation."""
    occupation = pd.read_excel(path, sheet_name=r"Export Worksheet", usecols="A:T")
    # Header cleanup: strip stray quotes, then lower-case.
    occupation.columns = occupation.columns.str.strip('"')
    occupation.columns = occupation.columns.str.lower()
    logging.debug("Importing data to df_report_e2e....")

    sql_write(
        conn,
        occupation,
        table_name="network_splitter_ocupation",
        schema_name="cm",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
    logging.debug("report_network_splitter_ocupation data imported successfully")


def import_wkt_cables_table(conn, path) -> None:
    """Load the wkt_cables CSV, drop rows without WKT, parse geometries and
    replace audit.wkt_cables (schema resolved per environment).

    Fix: the original called ``df_wkt["wkt"].dropna(inplace=True)``, which
    mutates only the extracted Series and leaves the NaN rows in the
    DataFrame; rows lacking a WKT value are now actually removed before
    parsing.
    """
    logger = logging.getLogger("import_wkt_cables_table")
    logger.info("Processing wkt_cables csv...")

    df_wkt = pd.read_csv(
        path,
        engine="python",
        encoding="cp1252",
        keep_default_na=False,
        index_col=False,
        na_values=na_values,
        sep=";",
    )
    # Header cleanup: strip stray quotes + lower-case.
    df_wkt.columns = df_wkt.columns.str.strip('"').str.lower()
    df_wkt.rename(columns={"partido": "partido_despliegue"}, inplace=True)

    # Remove rows with no WKT at the DataFrame level (the old Series-level
    # inplace dropna was a no-op on the frame).
    df_wkt.dropna(subset=["wkt"], inplace=True)

    def wkt_loads(x):
        # Unparseable WKT becomes an empty point so a bad row cannot abort the load.
        try:
            return wkt.loads(x)
        except Exception as e:
            logger.warning(f"validate_wkt - Error in WKT {e} - {x}")
            return wkt.loads("POINT EMPTY")

    df_wkt["geom"] = df_wkt["wkt"].apply(wkt_loads)
    df_wkt["upload_date"] = datetime.today().strftime("%m-%d-%Y")
    df_wkt.drop(["wkt"], axis=1, inplace=True)

    gdf = gpd.GeoDataFrame(df_wkt, crs="EPSG:4326", geometry="geom")

    # Helper resolves audit -> audit_stg when TARGET_ENV=stg and honors DRY_RUN.
    sql_write_gdf(
        conn,
        gdf,
        "wkt_cables",
        "audit",
        if_exists="replace",
        index=False,
        dtype={"geom": Geometry("GEOMETRY", srid=4326)},
    )
    logger.debug("wkt_cables data imported successfully")


def import_cm_ci_sfat_mfat_bfat_raw_table(conn, path, with_geocoding=False) -> None:
    """Load the ci_sfat_mfat_bfat CSV, normalize headers and dtypes, and
    replace audit.ci_sfat_mfat_bfat_raw (schema resolved per environment).

    Parameters
    ----------
    conn : DB connection forwarded to sql_write.
    path : ';'-separated CSV path (unicode_escape encoded).
    with_geocoding : when True, overwrite 'direccion' via ArcGIS reverse
        geocoding of (longitud, latitud) -- one remote call per row.
    """
    logger = logging.getLogger()
    logger.info("Processing ci_sfat_mfat_bfat csv...")

    df_ci_sfat_mfat_bfat = pd.read_csv(
        path,
        engine="python",
        encoding="unicode_escape",
        keep_default_na=False,
        index_col=False,
        na_values=na_values,
        sep=";",
    )

    # Fixing double quotes on headers
    df_ci_sfat_mfat_bfat.columns = df_ci_sfat_mfat_bfat.columns.str.strip('"')

    # Fixing uppercase headers
    df_ci_sfat_mfat_bfat.columns = df_ci_sfat_mfat_bfat.columns.str.lower()
    # 'partido' is dropped; 'partido_fisico' (renamed below) supersedes it.
    df_ci_sfat_mfat_bfat.drop("partido", axis=1, inplace=True)

    df_ci_sfat_mfat_bfat.rename(
        columns={"partido_fisico": "partido_despliegue"}, inplace=True
    )
    # Reducing footprint
    df_ci_sfat_mfat_bfat["description"] = df_ci_sfat_mfat_bfat["nombre_atc"]
    df_ci_sfat_mfat_bfat = df_ci_sfat_mfat_bfat.convert_dtypes()
    # Low-cardinality columns become categoricals to shrink memory usage.
    df_ci_sfat_mfat_bfat["topologia"] = df_ci_sfat_mfat_bfat["topologia"].astype(
        "category"
    )
    df_ci_sfat_mfat_bfat["operador"] = df_ci_sfat_mfat_bfat["operador"].astype(
        "category"
    )
    # df_ci_sfat_mfat_bfat['partido_cabecera'] =
    # df_ci_sfat_mfat_bfat['partido_cabecera'].astype('category')
    df_ci_sfat_mfat_bfat["partido_despliegue"] = df_ci_sfat_mfat_bfat[
        "partido_despliegue"
    ].astype("category")
    df_ci_sfat_mfat_bfat["cabecera"] = df_ci_sfat_mfat_bfat["cabecera"].astype(
        "category"
    )
    df_ci_sfat_mfat_bfat["tipo"] = df_ci_sfat_mfat_bfat["tipo"].astype("category")
    df_ci_sfat_mfat_bfat["nombre_produto"] = df_ci_sfat_mfat_bfat[
        "nombre_produto"
    ].astype("category")
    df_ci_sfat_mfat_bfat["upload_date"] = datetime.today().strftime("%m-%d-%Y")

    if with_geocoding:
        # One ArcGIS reverse-geocode call per row, keyed back by id_nodo.
        for row in df_ci_sfat_mfat_bfat.itertuples():
            df_ci_sfat_mfat_bfat.loc[
                df_ci_sfat_mfat_bfat["id_nodo"] == row.id_nodo, "direccion"
            ] = reverse_geocode([row.longitud, row.latitud])["address"]["Address"]

    logger.debug("Importing data to ci_sfat_mfat_bfat_raw....")

    sql_write(
        conn,
        df_ci_sfat_mfat_bfat,
        table_name="ci_sfat_mfat_bfat_raw",
        schema_name="audit",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
    logger.debug("ci_sfat_mfat_bfat_raw data imported successfully")


# 04102023 - RL - CI_SITE instance created in the audit schema
def import_cm_ci_site_table(conn, path, with_geocoding=False) -> None:
    """Load the ci_site JSON export and replace audit.ci_site.

    The source file is first copied to a local temp path before parsing
    (presumably to avoid reading the original location directly -- confirm).
    with_geocoding is unused; kept for signature parity with other importers.
    """
    logger = logging.getLogger()
    logger.info("Processing cm_ci_site json...")
    shutil.copyfile(path, './files/file1-tmp.json')
    sites = pd.read_json('./files/file1-tmp.json')
    sites.reset_index(drop=True, inplace=True)

    # Header cleanup: stringify, strip stray quotes, lower-case.
    sites.columns = sites.columns.astype(str).str.strip('"')
    sites.columns = sites.columns.str.lower()
    # 'partido' is superseded by 'partido_fisico' (renamed below).
    sites.drop("partido", axis=1, inplace=True)
    sites.rename(columns={"partido_fisico": "partido_despliegue"}, inplace=True)
    sites["upload_date"] = datetime.today().strftime("%m-%d-%Y")
    sql_write(
        conn,
        sites,
        table_name="ci_site",
        schema_name="audit",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
    logger.debug("ci_site data imported successfully")

def import_cm_ci_fosc_raw_table(conn, path, path2, with_geocoding=False) -> None:
    """Load the ci_fosc and ci_site JSON exports, normalize both, concatenate
    them and replace audit.ci_fosc_raw.

    Historical notes: table sourced from UAT (RF 30-05-2023), promoted to
    production replacing the old table (RF 15-06-2023).
    with_geocoding is unused; kept for signature parity with other importers.
    """
    logger = logging.getLogger()
    logger.info("Processing ci_fosc json...")

    shutil.copyfile(path, './files/file2-tmp.json')
    fosc = pd.read_json('./files/file2-tmp.json')
    fosc.reset_index(drop=True, inplace=True)
    shutil.copyfile(path2, './files/file3-tmp.json')
    sites = pd.read_json('./files/file3-tmp.json')
    sites.reset_index(drop=True, inplace=True)

    # Header cleanup and column rename, applied to both frames identically.
    for frame in (fosc, sites):
        frame.columns = frame.columns.astype(str).str.strip('"')
        frame.columns = frame.columns.str.lower()
        frame.drop("partido", axis=1, inplace=True)
        frame.rename(columns={"partido_fisico": "partido_despliegue"}, inplace=True)
        frame["upload_date"] = datetime.today().strftime("%m-%d-%Y")

    # Sites first, then fosc rows -- same stacking order as always.
    combined = pd.concat([sites, fosc])

    logger.debug("Importing data to ci_fosc_raw....")
    sql_write(
        conn,
        combined,
        table_name="ci_fosc_raw",
        schema_name="audit",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
    logger.debug("ci_fosc_raw data imported successfully")

# 16.12.2025 (fraw.asset and fraw.fecha_renovacion added to the projection)

def import_cm_ci_fosc_table(conn, with_geocoding=False) -> None:
    """Join audit.ci_fosc_raw with its geometries and replace cm.ci_fosc.

    Fixes: the write now goes through sql_write_gdf (environment-resolved
    schema + DRY_RUN support, consistent with the other importers) and the
    dtype override targets the actual geometry column 'geom' -- the original
    keyed it on 'geometry', a column that does not exist, so it was silently
    ignored.
    """
    logger = logging.getLogger()
    logger.info("Processing ci_fosc_raw...")
    # st_isvalid filters out bad geometries (RF 12-04-2023).
    join_query = f"""select
        fraw.id_botella,
        fraw.topologia,
        fraw.nombre_atc,
        fraw.nombre_cliente,
        fraw.operador,
        fraw.cabecera,
        fraw.nombre_produto,
        fraw.partido_despliegue,
        fraw.fecha_creacion,
        fraw.latitud,
        fraw.longitud,
        fraw.direccion,
        fraw.id_mslink,
        fraw.id_cm,
        fraw.ci_name,
        fraw.tipo,
        fraw.fusiones,
        fraw.built_date,
        fraw.asset,
        fraw.fecha_renovacion,
        whfsmb.feeder,
        whfsmb.geom
    from
        {SCHEMA_AUDIT}.ci_fosc_raw fraw,
        {SCHEMA_AUDIT}.wkt_headend_fosc_sfat_mfat_bfat whfsmb
    where
        fraw.id_cm = whfsmb.name
        and st_isvalid(whfsmb.geom);"""
    df_ci_fosc = geopandas.GeoDataFrame.from_postgis(
        text(join_query), conn, geom_col="geom"
    )

    if with_geocoding:
        # One ArcGIS reverse-geocode call per row, keyed back by id_botella.
        for row in df_ci_fosc.itertuples():
            df_ci_fosc.loc[
                df_ci_fosc["id_botella"] == row.id_botella, "direccion"
            ] = reverse_geocode([row.longitud, row.latitud])["address"]["Address"]

    logger.debug("Importing data to ci_fosc....")
    df_ci_fosc["upload_date"] = datetime.today().strftime("%m-%d-%Y")
    conn.commit()
    sql_write_gdf(
        conn,
        df_ci_fosc,
        "ci_fosc",
        "cm",
        if_exists="replace",
        index=False,
        dtype={"geom": Geometry("POINT", srid=4326)},
    )
    logger.debug("ci_fosc data imported successfully")


def import_cm_ci_mdu_table(conn, path, with_geocoding=False) -> None:
    """Load the ci_mdu CSV, clean id_cm of stray tabs/CRLFs, optionally
    reverse-geocode addresses, and replace cm.ci_mdu."""
    logger = logging.getLogger()
    logger.info("Processing ci_mdu csv...")

    mdu = pd.read_csv(
        path,
        engine="python",
        encoding="latin1",
        keep_default_na=False,
        index_col=False,
        na_values=na_values,
        sep=";",
    )
    # Header cleanup: strip stray quotes, then lower-case.
    mdu.columns = mdu.columns.str.strip('"')
    mdu.columns = mdu.columns.str.lower()

    if with_geocoding:
        # One ArcGIS reverse-geocode call per row, keyed back by id_edificio.
        for row in mdu.itertuples():
            mdu.loc[
                mdu["id_edificio"] == row.id_edificio, "direccion"
            ] = reverse_geocode([row.longitud, row.latitud])["address"]["Address"]

    # Remove embedded tabs and CRLFs from id_cm values.
    mdu["id_cm"] = (
        mdu["id_cm"]
        .str.replace("\t", "", regex=True)
        .str.replace("\r\n", "", regex=True)
    )
    logger.debug("Importing data to ci_mdu....")
    mdu["upload_date"] = datetime.today().strftime("%m-%d-%Y")
    sql_write(
        conn,
        mdu,
        table_name="ci_mdu",
        schema_name="cm",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
    logger.debug("ci_mdu data imported successfully")


def import_cm_ci_olt_table(conn, path, with_geocoding=False) -> None:
    """Load the ci_olt CSV and replace cm.ci_olt.

    with_geocoding is unused; kept for signature parity with other importers.
    """
    logger = logging.getLogger()
    logger.info("Processing ci_olt csv...")

    olt = pd.read_csv(
        path,
        engine="python",
        encoding="utf-8",
        keep_default_na=False,
        index_col=False,
        na_values=na_values,
        sep=";",
    )
    # Header cleanup: strip stray quotes, then lower-case.
    olt.columns = olt.columns.str.strip('"')
    olt.columns = olt.columns.str.lower()

    logger.debug("Importing data to ci_olt....")
    olt["upload_date"] = datetime.today().strftime("%m-%d-%Y")
    sql_write(
        conn,
        olt,
        table_name="ci_olt",
        schema_name="cm",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
    logger.debug("ci_olt data imported successfully")


def import_cm_ci_op_raw_table(conn, path, with_geocoding=False) -> None:
    """Load the ci_op CSV and replace audit.ci_op_raw (schema resolved per
    environment).

    with_geocoding is unused; kept for signature parity with other importers.
    """
    logger = logging.getLogger()
    logger.info("Processing ci_op_raw csv...")

    ops = pd.read_csv(
        path,
        engine="python",
        encoding="latin-1",
        keep_default_na=False,
        index_col=False,
        na_values=na_values,
        sep=";",
    )
    # Header cleanup + rename to the deployment-partido column.
    ops.columns = ops.columns.str.strip('"')
    ops.columns = ops.columns.str.lower()
    ops.rename(columns={"partido": "partido_despliegue"}, inplace=True)

    ops["upload_date"] = datetime.today().strftime("%m-%d-%Y")
    logger.debug("Importing data to ci_op_raw....")

    # Write through the staging-aware helper.
    sql_write(
        conn,
        ops,
        table_name="ci_op_raw",
        schema_name="audit",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )


def import_cm_ci_op_table(conn, with_geocoding=False) -> None:
    """Join audit.ci_op_raw with audit.wkt_op, derive nombre_op per operator
    and replace cm.ci_op.

    with_geocoding is unused; kept for signature parity with other importers.
    """
    logger = logging.getLogger()
    logger.info("Processing ci_op csv...")

    # st_isvalid filters out bad geometries (RF 11-04-2023); schemas resolved
    # to prod/stg via SCHEMA_AUDIT.
    join_query = f"""select
        null as nombre_op,
        oraw.id_co_name,
        oraw.nombre_co as nombre_sn_co,
        oraw.id_segmento,
        oraw.id_cm,
        oraw.operador,
        oraw.partido_despliegue,
        oraw.cabecera,
        oraw.ci_name,
        oraw.nombre_co_claro,
        oraw.nombre_co_atc,
        oraw.topologia,
        wop.cables,
        wop.geom
    from
        {SCHEMA_AUDIT}.ci_op_raw oraw,
        {SCHEMA_AUDIT}.wkt_op wop
    where
        oraw.id_cm = wop.osname
        and st_isvalid(wop.geom);"""

    df_ci_op = pd.read_sql_query(text(join_query), con=conn)

    # AMX rows take the Claro name; everyone else keeps the ATC name.
    df_ci_op["nombre_op"] = df_ci_op["nombre_co_atc"].where(
        df_ci_op["operador"] != "AMX", df_ci_op["nombre_co_claro"]
    )
    df_ci_op["upload_date"] = datetime.today().strftime("%m-%d-%Y")

    sql_write(
        conn,
        df_ci_op,
        table_name="ci_op",
        schema_name="cm",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
        dtype={"geom": Geometry("GEOMETRY", srid=4326)},
    )
    logger.debug("ci_op data imported successfully")


def import_cm_ci_feeder_distribution_raw_table(conn, path) -> None:
    """Load the feeder/distribution CSV and replace
    audit.ci_feeder_distribution_raw (schema resolved per environment)."""
    logger = logging.getLogger()
    logger.info("Processing ci_feeder_distribution_raw csv...")

    feeders = pd.read_csv(
        path,
        engine="python",
        encoding="cp1252",
        keep_default_na=False,
        index_col=False,
        na_values=na_values,
        sep=";",
    )
    # Header cleanup: strip stray quotes, then lower-case.
    feeders.columns = feeders.columns.str.strip('"')
    feeders.columns = feeders.columns.str.lower()
    feeders["upload_date"] = datetime.today().strftime("%m-%d-%Y")

    logger.debug("Importing data to ci_feeder_distribution_raw....")
    sql_write(
        conn,
        feeders,
        table_name="ci_feeder_distribution_raw",
        schema_name="audit",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
    logger.debug("ci_feeder_distribution_raw data imported successfully")


def import_cm_ci_feeder_distribution_table(conn) -> None:
    """Join audit.ci_feeder_distribution_raw with audit.wkt_cables geometries
    and replace cm.ci_feeder_distribution.

    Fixes: the write now goes through sql_write_gdf (environment-resolved
    schema + DRY_RUN support, consistent with the other importers) and the
    dtype override targets the real geometry column 'geom' -- the original
    keyed it on 'geometry', which does not exist and was silently ignored.

    History: st_geometrytype filter added (RF 20/03/2023) and later removed
    (RF 09/08/2023); st_isvalid kept to skip bad geometries (RF 11/04/2023).
    16.12.2025: fraw.asset and fraw.fecha_renovacion added to the projection.
    """
    logger = logging.getLogger()
    logger.info("Processing ci_feeder_distribution csv...")

    join_query = f"""select
        fraw.id_cable,
        fraw.largo,
        fraw.cable_type,
        fraw.nombre_atc,
        fraw.nombre_clilente,
        fraw.operador,
        fraw.partido,
        fraw.cabecera,
        fraw.nombre_produto,
        fraw.fecha_creacion,
        fraw.latitud,
        fraw.longitud,
        fraw.direccion,
        fraw.id_mslink,
        fraw.cantidad_pelos,
        fraw.id_cm,
        fraw.ci_name,
        fraw.tipo,
        fraw.built_date,
        fraw.asset,
        fraw.fecha_renovacion,
        wcables.feeder,
        wcables.geom
    from
        {SCHEMA_AUDIT}.ci_feeder_distribution_raw fraw,
        {SCHEMA_AUDIT}.wkt_cables wcables
    where
        fraw.id_cm = wcables.name
        and wcables.geom is not null
        and st_isvalid(wcables.geom);
    """
    df_ci_fd = geopandas.GeoDataFrame.from_postgis(
        text(join_query), conn, geom_col="geom"
    )

    logger.debug("Importing data to ci_feeder_distribution....")
    df_ci_fd["upload_date"] = datetime.today().strftime("%m-%d-%Y")
    conn.commit()
    sql_write_gdf(
        conn,
        df_ci_fd,
        "ci_feeder_distribution",
        "cm",
        if_exists="replace",
        index=False,
        dtype={"geom": Geometry("LINESTRING", srid=4326)},
    )
    logger.debug("ci_feeder_distribution data imported successfully")


def import_cm_ci_splitter_table(conn, path) -> None:
    """Load the ci_splitter CSV and replace cm.ci_splitter."""
    logger = logging.getLogger("import_cm_ci_splitter_table")
    logger.info("Processing ci_splitter_table csv...")

    splitters = pd.read_csv(
        path,
        engine="python",
        encoding="unicode_escape",
        keep_default_na=False,
        index_col=False,
        na_values=na_values,
        sep=";",
    )
    # Header cleanup: strip stray quotes, then lower-case.
    splitters.columns = splitters.columns.str.strip('"')
    splitters.columns = splitters.columns.str.lower()

    logger.debug("Importing data to ci_splitter....")
    splitters["upload_date"] = datetime.today().strftime("%m-%d-%Y")

    sql_write(
        conn,
        splitters,
        table_name="ci_splitter",
        schema_name="cm",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
    logger.debug("ci_splitter data imported successfully")


def import_cm_wkt_headend_fosc_sfat_mfat_bfat_table(conn, path) -> None:
    """Load the headend/fosc/sfat/mfat/bfat WKT CSV, coerce empty points to
    'Point (0 0)' and replace audit.wkt_headend_fosc_sfat_mfat_bfat."""
    logger = logging.getLogger("import_cm_wkt_headend_fosc_sfat_mfat_bfat_table")
    logger.info(f"Processing {path} ...")

    df_wkt = pd.read_csv(
        path,
        engine="python",
        encoding="unicode_escape",
        keep_default_na=False,
        index_col=False,
        na_values=na_values,
        sep=";",
    )
    df_wkt.columns = df_wkt.columns.str.strip('"').str.lower()
    df_wkt.rename(columns={"partido": "partido_despliegue"}, inplace=True)

    # Report then patch the two malformed-WKT cases: empty strings and the
    # literal 'Point ( )'; both become 'Point (0 0)'.
    df_wkt_vacios = df_wkt[~df_wkt["wkt"].astype(bool)]
    logger.info(f"geom vacios a arreglar\n {df_wkt_vacios} ...")
    df_wkt_vacios = df_wkt[df_wkt["wkt"] == 'Point ( )']
    logger.info(f"geom con Point vacios a arreglar\n {df_wkt_vacios} ...")
    df_wkt.loc[~df_wkt["wkt"].astype(bool), "wkt"] = 'Point (0 0)'
    df_wkt.loc[df_wkt["wkt"] == 'Point ( )', "wkt"] = 'Point (0 0)'

    df_wkt["geom"] = geopandas.GeoSeries.from_wkt(df_wkt["wkt"])
    df_wkt["upload_date"] = datetime.today().strftime("%m-%d-%Y")
    gdf = geopandas.GeoDataFrame(df_wkt, crs="EPSG:4326", geometry="geom")

    # Helper resolves the schema for the environment and honors DRY_RUN.
    sql_write_gdf(
        conn,
        gdf,
        "wkt_headend_fosc_sfat_mfat_bfat",
        "audit",
        if_exists="replace",
        index=False,
        dtype={"geom": Geometry("POINT", srid=4326)},
    )
    logger.debug("headend_fosc_sfat_mfat_bfat data imported successfully")


def translate_descr_to_poleno(descr) -> str:
    """Map a partido description to its two-letter code ('UNK' if unknown)."""
    codes = {
        "ESCOBAR": "ES",
        "TRES DE FEBRERO": "TF",
        "VICENTE LOPEZ": "VL",
        "HURLINGHAM": "HR",
        "MORON": "MO",
        "TIGRE": "TG",
        "SAN FERNANDO": "SF",
        "SAN ISIDRO": "SI",
        "SAN MARTIN": "SM",
        "ITUZAINGO": "IT",
        "LA MATANZA": "LM",
        "MALVINAS ARGENTINAS": "MV",
    }
    return codes.get(descr, "UNK")


def import_cm_ci_sfat_mfat_bfat_table(conn, with_geocoding=False) -> None:
    """Join audit.ci_sfat_mfat_bfat_raw with its geometries, derive nombre_op,
    de-duplicate by id_cm and replace cm.ci_sfat_mfat_bfat.

    Fixes: the write now goes through sql_write_gdf (environment-resolved
    schema + DRY_RUN support, consistent with the other importers) and the
    dtype override targets the real geometry column 'geom' -- the original
    keyed it on 'geometry', which does not exist and was silently ignored.

    Historical notes: the partido-acronym translation (RL 18042023) and the
    nombre_cliente extrapolation (RF 27032023) were removed because those
    values now arrive correct from CM.
    """
    logger = logging.getLogger()
    logger.info("Processing ci_sfat_mfat_bfat csv...")
    join_query = f"""select
        sraw.id_nodo,
        sraw.topologia,
        sraw.nombre_atc,
        sraw.nombre_cliente,
        sraw.operador,
        sraw.partido_despliegue,
        sraw.cabecera,
        sraw.nombre_produto,
        sraw.fecha_creacion,
        sraw.latitud,
        sraw.longitud,
        sraw.direccion,
        sraw.id_mslink,
        sraw.id_cm,
        sraw.potencia_real,
        sraw.potencia_referencia,
        sraw.ci_name,
        sraw.tipo,
        sraw.estado,
        sraw.nfctag,
        sraw.circuit,
        whfsmb.feeder,
        whfsmb.geom
    from
        {SCHEMA_AUDIT}.ci_sfat_mfat_bfat_raw sraw,
        {SCHEMA_AUDIT}.wkt_headend_fosc_sfat_mfat_bfat whfsmb
    where
        sraw.id_cm = whfsmb.name;"""
    df_ci_sfat_mfat_bfat = geopandas.GeoDataFrame.from_postgis(
        text(join_query), conn, geom_col="geom"
    )

    if with_geocoding:
        # One ArcGIS reverse-geocode call per row, keyed back by id_nodo.
        for row in df_ci_sfat_mfat_bfat.itertuples():
            df_ci_sfat_mfat_bfat.loc[
                df_ci_sfat_mfat_bfat["id_nodo"] == row.id_nodo, "direccion"
            ] = reverse_geocode([row.longitud, row.latitud])["address"]["Address"]

    logger.debug("Importing data to ci_sfat_mfat_bfat....")
    df_ci_sfat_mfat_bfat["upload_date"] = datetime.today().strftime("%m-%d-%Y")
    df_ci_sfat_mfat_bfat["nombre_op"] = df_ci_sfat_mfat_bfat.apply(
        apply_generate_nombre_op, axis=1
    )
    df_ci_sfat_mfat_bfat["ci_name"] = df_ci_sfat_mfat_bfat["nombre_op"]
    # Keep the first occurrence per id_cm (the join can fan out rows).
    df_ci_sfat_mfat_bfat.drop_duplicates(
        subset="id_cm", keep="first", inplace=True, ignore_index=True
    )
    conn.commit()
    sql_write_gdf(
        conn,
        df_ci_sfat_mfat_bfat,
        "ci_sfat_mfat_bfat",
        "cm",
        if_exists="replace",
        index=False,
        dtype={"geom": Geometry("POINT", srid=4326)},
    )
    logger.debug("ci_sfat_mfat_bfat data imported successfully")


def import_cm_ci_headend_table(
    conn, path, df_wkt_network, with_geocoding=False
) -> None:
    """Load the ci_headend CSV, optionally reverse-geocode addresses, merge
    the network geometries and replace cm.ci_headend.

    Fixes over the original:
    - the geocoding loop read ``row.longituid`` (typo) -> now ``row.longitud``;
    - geocoding ran *after* the merge, writing into the pre-merge frame, so
      its results were silently discarded -> it now runs before the merge;
    - the write goes through sql_write_gdf for environment-resolved schema
      and DRY_RUN support, consistent with the other importers.
    """
    logger = logging.getLogger()
    logger.info("Processing ci_headend csv...")

    df_ci_headend = pd.read_csv(
        path,
        engine="python",
        encoding="utf-8",
        keep_default_na=False,
        index_col=False,
        na_values=na_values,
        sep=";",
    )
    # Header cleanup: strip stray quotes, then lower-case.
    df_ci_headend.columns = df_ci_headend.columns.str.strip('"')
    df_ci_headend.columns = df_ci_headend.columns.str.lower()
    # Merge key against df_wkt_network.
    df_ci_headend["name"] = df_ci_headend["id_cm"]

    if with_geocoding:
        # Geocode before merging so the addresses survive into the merged frame.
        for row in df_ci_headend.itertuples():
            df_ci_headend.loc[
                df_ci_headend["id_cabecera"] == row.id_cabecera, "direccion"
            ] = reverse_geocode([row.longitud, row.latitud])["address"]["Address"]

    df_ci_headend_merged = pd.merge(df_ci_headend, df_wkt_network, on="name")

    # Drop network-side columns not wanted in cm.ci_headend and un-suffix
    # the colliding ones from the merge.
    df_ci_headend_merged.drop(
        ["tipo_y", "description", "owner", "feeder", "tecnologia"], axis=1, inplace=True
    )
    df_ci_headend_merged.rename(
        columns={"tipo_x": "tipo", "partido_x": "partido_despliegue"}, inplace=True
    )
    df_ci_headend_merged["upload_date"] = datetime.today().strftime("%m-%d-%Y")

    logger.debug("Importing data to ci_headend....")
    gdf = gpd.GeoDataFrame(df_ci_headend_merged, crs="EPSG:4326", geometry="geom")
    sql_write_gdf(
        conn,
        gdf,
        "ci_headend",
        "cm",
        if_exists="replace",
        index=False,
        dtype={"geom": Geometry("POINT", srid=4326)},
    )
    logger.debug("ci_headend data imported successfully")


def import_cm_report_isp(conn, path):
    """Load the ISP report CSV (';'-separated, latin1) and replace
    cm.cm_report_isp."""
    report = pd.read_csv(
        path, keep_default_na=False, encoding="latin1", na_values=na_values, sep=";"
    )
    # Header cleanup: strip stray quotes, then lower-case.
    report.columns = report.columns.str.strip('"').str.lower()
    report["upload_date"] = datetime.today().strftime("%m-%d-%Y")

    logging.debug("Importing data to cm_report_isp....")
    sql_write(
        conn,
        report,
        table_name="cm_report_isp",
        schema_name="cm",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
    logging.debug("cm_report_isp data imported successfully")


def import_report_isp_osp(conn, path):
    """Load the ISP/OSP report CSV at *path* into cm.report_isp_osp (full replace)."""
    df_rio = pd.read_csv(
        path, keep_default_na=False, encoding="latin1", sep=";", na_values=na_values
    )

    # Fixing double quotes on headers
    df_rio.columns = df_rio.columns.str.strip('"')
    # Fixing uppercase headers
    df_rio.columns = df_rio.columns.str.lower()

    df_rio["upload_date"] = datetime.today().strftime("%m-%d-%Y")

    # BUGFIX: message previously said "df_ioo" (copy/paste from the OLT loader).
    logging.debug("Importing data to df_rio....")
    df_rio = df_rio.convert_dtypes()

    sql_write(
        conn,
        df_rio,
        table_name="report_isp_osp",
        schema_name="cm",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )

    logging.debug("report_isp_osp data imported successfully")


def import_report_e2e(conn, path):
    """Load the end-to-end report CSV at *path* into cm.report_e2e (full replace)."""
    df_report_e2e = pd.read_csv(
        path, keep_default_na=False, encoding="latin-1", na_values=na_values, sep=","
    )

    # Fixing double quotes on headers
    df_report_e2e.columns = df_report_e2e.columns.str.strip('"')
    # Fixing uppercase headers
    df_report_e2e.columns = df_report_e2e.columns.str.lower()
    # BUGFIX: the stripped series were computed and discarded (a no-op);
    # assign them back so the cleanup actually takes effect.
    df_report_e2e["location_name"] = df_report_e2e["location_name"].str.strip('" ?\n\t')
    df_report_e2e["location_description"] = df_report_e2e[
        "location_description"
    ].str.strip('" ?\n\t')
    df_report_e2e["upload_date"] = datetime.today().strftime("%m-%d-%Y")

    logging.debug("Importing data to df_report_e2e....")

    sql_write(
        conn,
        df_report_e2e,
        table_name="report_e2e",
        schema_name="cm",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
    logging.debug("report_e2e data imported successfully")


def import_inventory_fat_occupation(conn, path):
    """Load the FAT occupation CSV at *path* into cm.inventory_fat_occupation.

    The file is pipe-delimited and fully quoted; the three date columns are
    normalized to "MM/DD/YYYY HH:MM:SS" strings.
    """
    # RF 06/03/2023: separator changed to pipe (agreed with IT) because the
    # coordinates sometimes contain decimal commas.
    df_ifo = pd.read_csv(
        path,
        engine="python",
        keep_default_na=False,
        encoding="latin1",
        quoting=csv.QUOTE_ALL,
        na_values=na_values,
        sep="|",
    )
    # Fixing uppercase headers
    df_ifo.columns = df_ifo.columns.str.lower()
    df_ifo["upload_date"] = datetime.today().strftime("%m-%d-%Y")

    logging.debug("Importing data to df_ifo....")
    df_ifo = df_ifo.convert_dtypes()

    # RF 14/1/2023 - Dates parsed with errors='coerce' to survive bogus values
    # such as '0' (which should not appear, but does).
    # Refactor: the identical normalization was triplicated; loop instead.
    for col in ("reserved_date", "provided_date", "cease_date"):
        df_ifo[col] = (
            pd.to_datetime(df_ifo[col], errors="coerce")
            .dt.strftime("%m/%d/%Y %H:%M:%S")
            .str.strip(" \r\n\t")
        )

    sql_write(
        conn,
        df_ifo,
        table_name="inventory_fat_occupation",
        schema_name="cm",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
    logging.debug("inventory_fat_occupation data imported successfully")


def import_inventory_olt_occupation(conn, path):
    """Load the OLT occupation CSV at *path* into cm.inventory_olt_occupation.

    The export is very large, so it is read in chunks and de-duplicated both
    per chunk and again after concatenation (to catch duplicates that
    straddle chunk boundaries).
    """
    chunks = []
    for chunk in pd.read_csv(
        path,
        keep_default_na=False,
        encoding="latin1",
        sep=";",
        na_values=na_values,
        chunksize=100000,  # tune chunksize as needed
    ):
        chunk.drop_duplicates(inplace=True)
        chunks.append(chunk)

    df_ioo = pd.concat(chunks, ignore_index=True)
    df_ioo.drop_duplicates(inplace=True)

    # Fixing double quotes on headers
    df_ioo.columns = df_ioo.columns.str.strip('"')
    # Fixing uppercase headers
    df_ioo.columns = df_ioo.columns.str.lower()
    df_ioo["upload_date"] = datetime.today().strftime("%m-%d-%Y")

    logging.debug(f"csv file {path} - Importing data to df_ioo....")
    df_ioo = df_ioo.convert_dtypes()

    # RF 30/01/2023: the access ids are numbers used as strings. Python sometimes
    # reads them as float, and str() then appends ".0"; strip that suffix.
    # Refactor: the identical cleanup was duplicated; loop instead.
    for col in ("access_id", "fat_port_access_id"):
        df_ioo[col] = df_ioo[col].astype(str).str.removesuffix(".0")

    # RF 14/1/2023 - Dates parsed with errors='coerce' to survive bogus values
    # such as '0' (which should not appear, but does).
    # Refactor: the identical normalization was triplicated; loop instead.
    for col in ("fat_port_reserved_date", "fat_port_provid_date", "fat_port_cease_date"):
        df_ioo[col] = (
            pd.to_datetime(df_ioo[col], errors="coerce")
            .dt.strftime("%m/%d/%Y %H:%M:%S")
            .str.strip(" \r\n\t")
        )

    sql_write(
        conn,
        df_ioo,
        table_name="inventory_olt_occupation",
        schema_name="cm",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
    logging.debug("inventory_olt_occupation data imported successfully")


def import_inventory_mdu_bfat(conn, path):
    """Load the MDU/BFAT inventory CSV at *path* into cm.inventory_mdu_bfat (full replace)."""
    frame = pd.read_csv(
        path, keep_default_na=False, encoding="latin1", sep="|", na_values=na_values
    )

    # Normalize headers: drop stray double quotes, then lowercase.
    frame.columns = frame.columns.str.strip('"').str.lower()

    # Stamp every row with the load date (MM-DD-YYYY).
    frame["upload_date"] = datetime.today().strftime("%m-%d-%Y")
    logging.debug("Importing data to df_imf....")

    sql_write(
        conn,
        frame,
        table_name="inventory_mdu_bfat",
        schema_name="cm",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
    logging.debug("inventory_mdu_bfat data imported successfully")




def load_cm_tables():
    """Import every CM table: raw WKT/CI files, derived tables, reports and inventories.

    All imports share one connection from the module-level ``engine``; each
    step is committed separately so earlier tables survive a later failure.
    The import order matters: the raw tables and ``wkt_headend_...`` must be
    loaded before the derived ``import_cm_ci_*_table`` steps that read them.
    """
    with engine.connect() as conn:
        ### RF 20/5/2025 - Added the wkt_fibra import
        import_wkt_fibra_raw(conn, wkt_fibra_path)
        conn.commit()
        # Raw source tables (no commit between them; committed as one batch below).
        import_cm_wkt_headend_fosc_sfat_mfat_bfat_table(
            conn, wkt_headend_fosc_sfat_mfat_bfat_path
        )
        import_cm_wkt_op_raw_table(conn, wkt_op_path)
        import_wkt_cables_table(conn, wkt_cables_path)
        import_cm_ci_sfat_mfat_bfat_raw_table(conn, ci_sfat_mfat_bfat_path)
        import_cm_ci_fosc_raw_table(conn, ci_fosc_path,ci_site_path)
        import_cm_ci_site_table(conn, ci_site_path)
        import_cm_ci_op_raw_table(conn, ci_op_path)
        import_cm_ci_feeder_distribution_raw_table(conn, ci_feederd_path)

        conn.commit()
        # Read back the network geometry table just loaded; SCHEMA_AUDIT is the
        # environment-dependent audit schema name resolved elsewhere in this file.
        df_wkt_network = sql_read(
            conn,
            f"select * from {SCHEMA_AUDIT}.wkt_headend_fosc_sfat_mfat_bfat",
            geom_col="geom",
        )
        # Reducing memory footprint: narrow dtypes and categorize low-cardinality columns.
        df_wkt_network = df_wkt_network.convert_dtypes()
        df_wkt_network["tipo"] = df_wkt_network["tipo"].astype("category")
        df_wkt_network["tecnologia"] = df_wkt_network["tecnologia"].astype("category")
        df_wkt_network["partido_despliegue"] = df_wkt_network[
            "partido_despliegue"
        ].astype("category")
        df_wkt_network["owner"] = df_wkt_network["owner"].astype("category")
        conn.commit()
        # Derived tables and reports, each committed individually.
        import_cm_ci_headend_table(conn, ci_headend_path, df_wkt_network)
        conn.commit()
        import_cm_ci_sfat_mfat_bfat_table(conn)
        conn.commit()
        import_cm_wkt_op_table(conn)
        conn.commit()
        import_cm_ci_fosc_table(conn)
        conn.commit()
        import_cm_ci_mdu_table(conn, ci_mdu_path)
        conn.commit()
        import_cm_ci_olt_table(conn, ci_olt_path)
        conn.commit()
        import_cm_ci_op_table(conn)
        conn.commit()
        import_cm_ci_feeder_distribution_table(conn)
        conn.commit()
        import_cm_ci_splitter_table(conn, ci_splitter_path)
        conn.commit()
        import_cm_report_isp(conn, report_isp_path)
        conn.commit()
        import_report_e2e(conn, report_e2e_path)
        conn.commit()
        import_report_fusiones(conn, report_fusiones_path)
        conn.commit()
        import_inventory_fat_occupation(conn, ifo_path)
        conn.commit()
        ### RF 16/5/2025 - said this table's CM export has errors and was to be
        ### commented out. NOTE(review): the call below is still active — confirm.
        import_inventory_olt_occupation(conn, ioo_path)
        conn.commit()
        import_report_isp_osp(conn, rio_path)
        conn.commit()
        import_inventory_mdu_bfat(conn, imf_path)
        conn.commit()
        ### RF 20/5/2025 - Added import_cm_inventory_cable_occupation(conn), a
        ### query over wkt_fibra_raw.
        import_cm_inventory_cable_occupation(conn)
        conn.commit()
        #import_cm_ci_fosc_table_uat(conn)
        #conn.commit()



@functools.lru_cache
def parse_odd_co_names(co_name) -> str:
    """Derive the operator code from an irregular CO osname.

    Returns a known operator ("EDN", "AMX", "ATC", "MOV", "TCP", ...), the
    raw second name component when no rule matches it, or None when the
    name cannot be parsed at all.
    """
    # RF 12/01/2023: cast to str because some osnames arrive untyped/null.
    name = str(co_name)

    # Direct substring rules, checked in priority order. Note that
    # "ANILLO GPON ITX" must be tested before the generic "ANILLO GP".
    if "EDN" in name:
        # EDN CO have another format EX: EDN_CT_MORON_HURLINGHAM
        return "EDN"
    if "IPRAN" in name or "RAMX" in name:
        return "AMX"
    if "RATC" in name or "ANILLO GPON ITX" in name:
        return "ATC"
    if any(tag in name for tag in ("ANILLO ME", "ANILLO GP", "ANILLO_ME", "BBIP")):
        return "AMX"

    pieces = name.split("_")
    if len(pieces) < 2:
        # Underscore split failed; dash-delimited names like XX-Rnnn-YY are AMX.
        pieces = name.split("-")
        if len(pieces) == 3 and "R" in pieces[1]:
            return "AMX"
        return None

    # Map the second component's marker onto the operator code.
    marker = pieces[1]
    if "CL" in marker:
        return "AMX"
    if "CYC" in marker:
        return "ATC"
    if "MV" in marker:
        return "MOV"
    if "TP" in marker:
        return "TCP"
    return marker


def parse_odd_co_names_for_cab(co_name) -> str:
    """Derive the head-end (cabecera) code from an irregular CO osname.

    Returns "TBD" when the name cannot be resolved.
    BUGFIX: four branches returned the misspelled sentinel "TDB" while the
    first branches returned "TBD"; normalized everywhere to "TBD".
    """
    # RF 12/01/2023: cast to str because some osnames arrive untyped/null.
    co_name = str(co_name)

    # EDN CO have another format EX: EDN_CT_MORON_HURLINGHAM
    if "EDN" in co_name or "IPRAN" in co_name:
        return "TBD"

    if "RAMX" in co_name or "RATC" in co_name:
        # Dash-delimited: the head-end code is the first component.
        chunks = co_name.split("-")
        if len(chunks) < 2:
            return "TBD"
        return chunks[0]

    # Must be checked before the generic "ANILLO GP" rule below.
    if "ANILLO GPON ITX" in co_name:
        chunks = co_name.split(" ")
        if len(chunks) < 4:
            return "TBD"
        return chunks[3][:5]

    if (
        "ANILLO GP" in co_name
        or "ANILLO ME" in co_name
        or "ANILLO_ME" in co_name
        or "BBIP" in co_name
    ):
        return "AMX"

    chunks = co_name.split("_")
    if len(chunks) < 2:
        # Underscore split failed; dash names like XX-Rnnn-YY carry the cab first.
        chunks = co_name.split("-")
        if len(chunks) == 3 and "R" in chunks[1]:
            return chunks[0]
        return "TBD"

    return chunks[0]


def set_co_operador(co_name) -> str:
    """Resolve the operator for *co_name* from the pd_co_nd table, falling back
    to name parsing when the osname is not listed there."""
    # RF 12/01/2023: cast to str because some osnames arrive untyped/null.
    key = str(co_name)
    matches = pd_co_nd["osname"].values == key
    if not matches.any():
        return parse_odd_co_names(key)
    return pd_co_nd[matches]["operador"].iloc[0]


def set_co_cab(co_name) -> str:
    """Resolve the head-end (cab) for *co_name* from the pd_co_nd table, falling
    back to name parsing when the osname is not listed there."""
    # RF 12/01/2023: cast to str because some osnames arrive untyped/null.
    key = str(co_name)
    matches = pd_co_nd["osname"].values == key
    if not matches.any():
        return parse_odd_co_names_for_cab(key)
    return pd_co_nd[matches]["cab"].iloc[0]


def apply_set_co_operador(co):
    """Row-wise adapter for DataFrame.apply: operator from the row's osname."""
    osname = co["osname"]
    return set_co_operador(osname)


def apply_set_co_cab(co):
    """Row-wise adapter for DataFrame.apply: head-end (cab) from the row's osname."""
    osname = co["osname"]
    return set_co_cab(osname)


def convert_co_atc_to_om(operador, co_name) -> str:
    """Convert a CM central-office name to the OM naming convention.

    AMX names like AA-BB-CC-DD1234 become AA-R1234-001; ATC "PATH" names
    like ES010101-PATH-03-000297-0000 become ES01-RATC-0-000297. Anything
    else is returned unchanged.
    """
    # RF 12/01/2023: cast to str because some osnames arrive untyped/null.
    co_name = str(co_name)

    if operador == "AMX":
        chunks = co_name.split("-")
        # NOTE(review): original condition was `count < 4 or 4 > count`, which
        # reduces to `count < 4`; perhaps `!= 4` was intended — preserved as-is.
        if len(chunks) < 4:
            return co_name
        return f"{chunks[0]}-R{chunks[3][-4:]}-001"

    if operador == "ATC" and "PATH" in co_name:
        # Is a CM name EX: ES010101-PATH-03-000297-0000
        chunks = co_name.split("-")
        # BUGFIX: count_chunks was never assigned in this branch, so reaching
        # it raised UnboundLocalError; compute the chunk count here.
        if len(chunks) == 5:
            return f"{chunks[0][0:4]}-RATC-0-{chunks[3]}"
        return co_name

    return co_name


def convert_cname_atc_to_om(operador, cname) -> str:
    """Convert an AMX customer name like AA-BB-CC-DD1234 to AA-R1234-001.

    Non-AMX operators and empty/None names are returned unchanged.
    """
    # Simplified: the original re-tested `operador == "AMX"` after the early
    # return (always true) — collapsed into a single guard clause.
    if operador != "AMX" or not cname:
        return cname

    chunks = cname.split("-")
    # NOTE(review): original condition was `count < 4 or 4 > count`, which
    # reduces to `count < 4`; perhaps `!= 4` was intended — preserved as-is.
    if len(chunks) < 4:
        return cname

    return f"{chunks[0]}-R{chunks[3][-4:]}-001"


def generate_nombre_op(operator, atc_name, customer_name) -> str:
    """Pick the OP display name: AMX uses the customer name, everyone else the ATC name."""
    return customer_name if operator == "AMX" else atc_name


def apply_convert_co_atc_to_om(co):
    """Row-wise adapter for DataFrame.apply: normalize the row's osname to OM naming."""
    operador = co["operador"]
    return convert_co_atc_to_om(operador, co["osname"])


def apply_convert_cname_atc_to_om(cname):
    """Row-wise adapter for DataFrame.apply: normalize the row's customer name to OM naming."""
    operador = cname["operador"]
    return convert_cname_atc_to_om(operador, cname["nombre_cliente"])


def apply_generate_nombre_op(fat):
    """Row-wise adapter for DataFrame.apply: choose the OP display name for the row."""
    operador = fat["operador"]
    return generate_nombre_op(operador, fat["nombre_atc"], fat["nombre_cliente"])



def create_grant_tables() -> None:
    """Grant schema and table privileges on cm/audit to postgres and om_read."""
    logger = logging.getLogger()

    # The statements are executed in order and committed as one batch.
    grant_statements = (
        """GRANT CREATE,USAGE ON SCHEMA cm,audit TO postgres;""",
        """GRANT USAGE ON SCHEMA cm , audit TO om_read;""",
        """GRANT SELECT ON ALL TABLES IN SCHEMA audit,cm TO om_read;""",
    )

    with engine.connect() as conn:
        #conn.execute(text("""REVOKE ALL ON ALL TABLES IN SCHEMA public FROM PUBLIC;"""))
        for statement in grant_statements:
            conn.execute(text(statement))
        conn.commit()
        logger.debug("\t*** Grants given to all tables successfully in PostgreSQL ")

def create_grant_tables_extra() -> None:
    """Grant read-only access on the cm/audit schemas to the extra user jibanez."""
    logger = logging.getLogger()

    # The statements are executed in order and committed as one batch.
    extra_grants = (
        """GRANT USAGE ON SCHEMA cm , audit TO jibanez;""",
        """GRANT SELECT ON ALL TABLES IN SCHEMA audit,cm TO jibanez;""",
    )

    with engine.connect() as conn:
        #conn.execute(text("""REVOKE ALL ON ALL TABLES IN SCHEMA public FROM PUBLIC;"""))
        for statement in extra_grants:
            conn.execute(text(statement))
        conn.commit()
        logger.debug("\t*** Grants given to all tables successfully in PostgreSQL ")

if __name__ == "__main__":

try:

log_file_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "log.ini"
)

logging.config.fileConfig(log_file_path, disable_existing_loggers=False)

logger = logging.getLogger("main")

process_files()
setup_connection()
recreate_database()

with CodeTimer("load_cm_data"):
load_cm_tables()
create_grant_tables()
create_grant_tables_extra()
log_final_en_postgres()
except Exception:
logging.error(f"uncaught exception: {traceback.format_exc()}")
finally:
logging.info(msg="PostgreSQL conn is closed")