|
import logging
|
|
import pandas as pd
|
|
import os
|
|
from sqlalchemy import create_engine, text
|
|
import sqlalchemy
|
|
import sys
|
|
from datetime import timezone,datetime
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
from datetime import datetime, timedelta
|
|
import urllib3
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
import requests
|
|
import json
|
|
from datetime import datetime,timedelta
|
|
import numpy as np
|
|
import re
|
|
|
|
|
|
|
|
# RL 14082023 - Database access credentials.
# Each value can be overridden through an environment variable of the same
# name; the literals are only fallbacks that keep existing deployments working.
# NOTE(review): the fallback literals are real secrets committed to source
# control - rotate them and drop the defaults once the environment is set up.
from urllib.parse import quote_plus

PGHOST = os.environ.get("PGHOST", "localhost")
PGDATABASE = os.environ.get("PGDATABASE", "postgres")
PGUSER = os.environ.get("PGUSER", "postgres")
PGPASSWORD = os.environ.get("PGPASSWORD", "post3008")

# URL of the Altiplano authentication (token) endpoint.
auth_url = "https://10.200.3.100:32443/altiplano-sso/realms/master/protocol/openid-connect/token"

# Altiplano authentication data (overridable via ALTIPLANO_USERNAME /
# ALTIPLANO_PASSWORD; same caveat as above about the fallback literals).
username = os.environ.get("ALTIPLANO_USERNAME", "inp_nbiuser")
password = os.environ.get("ALTIPLANO_PASSWORD", "Y4h5#L7=")

# Base URL of the Altiplano OpenTSDB query API.
api_url_base = 'https://10.200.3.100:32443/altiplano-opentsdb/api/'

# RL 14082023 - Current date (always), formatted as YYYYMMDD.
date = datetime.now().strftime('%Y%m%d')

# User and password are percent-encoded so characters such as '@', '/' or '#'
# in an overridden credential cannot corrupt the URI. With the fallback values
# the resulting string is identical to the previous hard-coded one.
DATABASE_URI = (
    f"postgresql+psycopg2://{quote_plus(PGUSER)}:{quote_plus(PGPASSWORD)}"
    f"@{PGHOST}:5432/{PGDATABASE}"
)
|
|
|
|
#RL 14082023 - Configuración de Logs.
|
|
###logging.basicConfig(filename= f"/home/atcfiber.com/sa.roberto.fittipald/cronlogs/bajada_tasa.log", level=logging.DEBUG)
|
|
|
|
def log_final_en_postgres(engine):
    """Record in PostgreSQL that the 'bwpon' script has just finished.

    Inserts one row into ``z_procesos_python.scripts_ejecutados`` with the
    script name and the current timestamp expressed in the
    America/Argentina/Buenos_Aires time zone, then commits.

    Args:
        engine: SQLAlchemy engine used to obtain a raw DBAPI connection.
    """
    cnx = engine.raw_connection()
    try:
        cursor = cnx.cursor()
        try:
            cursor.execute("""
                INSERT INTO z_procesos_python.scripts_ejecutados (nombre_script, ultima_ejecucion)
                VALUES ('bwpon', CURRENT_TIMESTAMP AT TIME ZONE 'America/Argentina/Buenos_Aires');
            """)
        finally:
            cursor.close()
        cnx.commit()
    finally:
        # Always hand the connection back to the pool, even if the INSERT fails.
        cnx.close()
|
|
|
|
|
|
# RL 14082023 - Fetches data from PostgreSQL and builds a DataFrame.
def execute_query(query, engine):
    """Run a SQL query and return its result set as a DataFrame.

    Args:
        query: SQL text to execute.
        engine: SQLAlchemy engine to run the query on.

    Returns:
        The rows returned by the query. A warning is logged when the result
        is empty, but the (empty) DataFrame is still returned.
    """
    # The context manager releases the connection once the rows are loaded;
    # previously the connection opened here was never closed.
    with engine.connect() as conn:
        df = pd.read_sql(sqlalchemy.text(query), conn)

    if df.empty:
        logging.warning("No se encontraron registros...")
    return df
|
|
|
|
|
|
def borrar_registros_viejos(engine):
    """Delete rows of ``altiplano.bws_pons`` older than the retention window.

    Args:
        engine: SQLAlchemy engine used to obtain a raw DBAPI connection.
    """
    cnx = engine.raw_connection()
    try:
        cursor = cnx.cursor()
        try:
            # NOTE(review): age() returns a years/months/days interval and
            # PostgreSQL appears to compare intervals using 30-day months, so
            # the effective cutoff may be a few days later than 365 days.
            # Confirm whether "bw.datetime < now() - interval '365 days'"
            # would be the intended condition.
            cursor.execute("""
                delete FROM altiplano.bws_pons bw
                where age(now(), bw.datetime) > '365 days';
            """)
        finally:
            cursor.close()
        cnx.commit()
    finally:
        # Always hand the connection back to the pool, even if the DELETE fails.
        cnx.close()
|
|
|
|
def crea_tablas_bws_pons(engine):
    """Create ``altiplano.bws_pons`` if it does not exist and grant read access.

    The table is only created when missing (e.g. after everything was
    dropped). SELECT on every table of the ``altiplano`` schema is then
    granted to the ``om_read`` role, and the work is committed.

    Args:
        engine: SQLAlchemy engine used to obtain a raw DBAPI connection.
    """
    cnx = engine.raw_connection()
    try:
        cursor = cnx.cursor()
        try:
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS altiplano.bws_pons (
                    interface varchar NULL,
                    datetime timestamp NULL,
                    bandwidth_down_mbps float NULL,
                    bandwidth_up_mbps float NULL,

                    upload_date text NULL
                );

                GRANT SELECT ON ALL TABLES IN SCHEMA altiplano TO om_read;
            """)
        finally:
            cursor.close()
        cnx.commit()
    finally:
        # Always hand the connection back to the pool, even if the DDL fails.
        cnx.close()
|
|
|
|
|
|
def create_grant_tables(engine) -> None:
    """Grant SELECT on every table of the ``altiplano`` schema to ``om_read``.

    Args:
        engine: SQLAlchemy engine to run the statement on.
    """
    # engine.begin() commits when the block exits cleanly and rolls back on
    # error. With a plain engine.connect() and no explicit commit, the GRANT
    # is discarded on close under SQLAlchemy 2.x.
    with engine.begin() as conn:
        conn.execute(
            text(
                """GRANT SELECT ON ALL TABLES IN SCHEMA altiplano TO om_read;"""
            )
        )
|
|
|
|
def export_to_sql(engine, df_bi, table_name) -> None:
    """Append a DataFrame to a table of the ``altiplano`` schema.

    An ``upload_date`` column holding today's date (``MM-DD-YYYY`` text) is
    added to a copy of the data before writing; the caller's DataFrame is
    left untouched.

    Args:
        engine: SQLAlchemy engine to write through.
        df_bi: Rows to append.
        table_name: Target table name inside the ``altiplano`` schema.
    """
    if df_bi.empty:
        # Nothing to write; avoid opening a transaction for zero rows.
        logging.warning("No hay registros para exportar a %s...", table_name)
        return

    # assign() returns a new frame, so the argument is not mutated.
    stamped = df_bi.assign(
        upload_date=datetime.today().strftime("%m-%d-%Y")
    ).convert_dtypes()

    # engine.begin() makes the commit explicit instead of depending on the
    # driver/pandas committing implicitly when the connection is closed.
    with engine.begin() as conn:
        stamped.to_sql(
            table_name,
            conn,
            schema='altiplano',
            if_exists="append",
            index=False,
        )
|
|
|
|
|
|
def remove_dup_columns(frame: pd.DataFrame) -> pd.DataFrame:
    """Return *frame* keeping only the first column for each repeated name.

    Column order is preserved; later columns whose name was already seen are
    dropped.

    Args:
        frame: DataFrame that may contain repeated column names.

    Returns:
        A DataFrame with unique column names.
    """
    # duplicated(keep="first") marks every repeat after the first occurrence,
    # so negating it selects exactly the columns to keep.
    return frame.loc[:, ~frame.columns.duplicated(keep="first")]
|
|
|
|
|
|
# Seconds to wait for the Altiplano server before abandoning an HTTP request.
_REQUEST_TIMEOUT_S = 60

# Offset subtracted from the epoch-derived timestamps before they are stored.
# NOTE(review): presumably converts UTC to Argentina local time (UTC-3) - confirm.
_LOCAL_UTC_OFFSET = pd.Timedelta(hours=3)

# Strips the trailing "_<PON>_GPON" part of an Altiplano interface object name.
_GPON_SUFFIX_RE = re.compile(r'_[^_]+_GPON$')

# (OpenTSDB metric suffix, output column): downstream first, then upstream.
_BW_METRICS = (
    ("out-octets", "bandwidth_down_mbps"),
    ("in-octets", "bandwidth_up_mbps"),
)

# Column layout of the frame returned by consulta_API_BWs.
_BW_RESULT_COLUMNS = ['interface', 'datetime', 'bandwidth_down_mbps', 'bandwidth_up_mbps']


def _get_access_token():
    """Request a bearer token from the Altiplano SSO endpoint; None on failure."""
    # RF 17 06 2025 - token retrieval changed to follow the NBI definition.
    headers = {
        "Content-Type": "application/x-www-form-urlencoded"
    }
    # NOTE(review): client_secret is a credential hard-coded in source.
    data = {
        "grant_type": "password",
        "client_secret": "a9428765-ddc4-485c-8bf1-897f39264eb2",
        "client_id": "ALTIPLANO",
        "username": username,
        "password": password
    }

    # NOTE(review): verify=False disables TLS certificate validation.
    try:
        auth_response = requests.post(
            auth_url, headers=headers, data=data, verify=False,
            timeout=_REQUEST_TIMEOUT_S,
        )
    except requests.RequestException as exc:
        print("Error al autenticar:", exc)
        return None

    if auth_response.status_code != 200:
        print("Error al autenticar. Código de estado:", auth_response.status_code)
        return None
    return auth_response.json().get("access_token")


def _bandwidth_extremes(series, bw_column, default_object_id):
    """Reduce one OpenTSDB series to the rows with its min and max non-zero bandwidth."""
    samples = pd.DataFrame(list(series['dps'].items()), columns=['epoch_time', 'octets'])
    samples['epoch_time'] = samples['epoch_time'].astype(int)
    # diff() below is only meaningful on chronologically ordered samples.
    samples = samples.sort_values('epoch_time').reset_index(drop=True)
    samples['datetime'] = pd.to_datetime(samples['epoch_time'], unit='s') - _LOCAL_UTC_OFFSET

    # Mbps = octet delta * 8 bits / actual seconds between samples / 1e6.
    # The first sample has no predecessor, yields NaN and is filtered out below.
    samples[bw_column] = samples['octets'].diff() * 8 / samples['epoch_time'].diff() / 1e6

    # Interface name comes from the relativeObjectID tag; when the tag is
    # absent, fall back to the object id that was sent in the query.
    object_id = (series.get('tags') or {}).get('relativeObjectID', default_object_id)
    samples['interface'] = _GPON_SUFFIX_RE.sub('', object_id.split('__e_CT_')[-1])

    samples = samples[['interface', 'datetime', bw_column]]

    # Only strictly positive rates are considered (drops idle intervals and
    # negative deltas).
    busy = samples[samples[bw_column] > 0]
    lowest = busy[busy[bw_column] == busy[bw_column].min()]
    highest = busy[busy[bw_column] == busy[bw_column].max()]

    # When min == max the same rows appear in both selections; keep them once.
    return pd.concat([lowest, highest]).drop_duplicates()


def _fetch_bw_extremes(row, metric, bw_column, headers):
    """Query one octet counter of one PON; return its min/max rows or None."""
    # Query shape:
    # {{protocol}}://{{server-inp}}:{{port-inp}}/altiplano-opentsdb/api/query?start=24h-ago&m=none:statistics/<metric>.{{Device-Name}}.LT{{LT}}{relativeObjectID=anv__c_ietf-interfaces__c_interfaces-state/interface__e_CT_{{Device-Name}}-{{LT}}-{{PON}}_{{PON}}_GPON}
    api_url = api_url_base + (
        f"query?start=24h-ago&m=none:statistics/{metric}.{row['ne']}"
        f"{{relativeObjectID=anv__c_ietf-interfaces__c_interfaces-state/interface__e_CT_{row['ne2']}}}"
    )

    try:
        response = requests.get(
            api_url, headers=headers, verify=False, timeout=_REQUEST_TIMEOUT_S,
        )
    except requests.RequestException as exc:
        # One unreachable PON must not abort the whole collection run.
        print(f"Error al realizar la solicitud GET para {row['ne2']} ({metric}):", exc)
        return None

    if response.status_code != 200:
        print(
            f"Error al realizar la solicitud GET para {row['ne2']} ({metric}). Código de error:",
            response.status_code,
        )
        return None

    payload = response.json()
    # A successful response may still carry no series or no data points.
    if not isinstance(payload, list) or not payload or not payload[0].get('dps'):
        return None
    return _bandwidth_extremes(payload[0], bw_column, row['ne2'])


def consulta_API_BWs(df: pd.DataFrame) -> pd.DataFrame:
    """Collect min/max PON bandwidth over the last 24 hours from Altiplano.

    Authenticates against the Altiplano SSO endpoint and, for every row of
    *df*, queries the ``out-octets`` (downstream) and ``in-octets``
    (upstream) counters. Each series is converted to Mbps between
    consecutive samples and reduced to the rows holding its minimum and
    maximum non-zero rate.

    Args:
        df: One row per PON with columns ``ne`` (e.g.
            ``BA_OLTA_ES01_02.LT15``) and ``ne2`` (the Altiplano interface
            object name ending in ``_GPON``).

    Returns:
        DataFrame with columns ``interface``, ``datetime``,
        ``bandwidth_down_mbps`` and ``bandwidth_up_mbps``, sorted by
        interface and datetime. Downstream rows have no upstream value and
        vice versa. The frame is empty (same columns) when authentication
        fails or no data could be retrieved.
    """
    access_token = _get_access_token()
    if not access_token:
        return pd.DataFrame(columns=_BW_RESULT_COLUMNS)

    # The same headers are valid for every query, so build them once.
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/yang-data+json",
        "Accept": "application/yang-data+json"
    }

    # Frames are gathered in a list and concatenated once at the end.
    frames = []
    for metric, bw_column in _BW_METRICS:
        for _, row in df.iterrows():
            extremes = _fetch_bw_extremes(row, metric, bw_column, headers)
            if extremes is not None and not extremes.empty:
                frames.append(extremes)

    if not frames:
        # Sorting a column-less frame would raise KeyError on 'interface'.
        return pd.DataFrame(columns=_BW_RESULT_COLUMNS)

    # reindex keeps the column layout stable even if one direction had no data.
    df_rta = pd.concat(frames).reindex(columns=_BW_RESULT_COLUMNS)
    df_rta = df_rta.sort_values(by=['interface', 'datetime'], ascending=[True, True])
    return df_rta.reset_index(drop=True)
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':

    # Without an engine none of the steps below can run, so stop here instead
    # of continuing into a NameError on the undefined `engine`.
    try:
        engine = create_engine(DATABASE_URI)
    except SQLAlchemyError as e:
        print("Falló la conexión a la base de datos...", e)
        sys.exit(1)
    except Exception as e:
        print(e)
        sys.exit(1)

    try:
        crea_tablas_bws_pons(engine)

        # The query below returns every unique ne in the network. The ne is
        # essentially the object_name up to the LT level (without PON port or
        # ONT id), reformatted.
        # Altiplano format example, NE2: BA_OLTA_ES01_02-5-15-2_2_GPON
        #   (the -2_2_GPON is PON port 2; Altiplano repeats it for some reason)
        # API format example, NE = 'BA_OLTA_ES01_02.LT15'
        query_nokia = """
        select distinct ((split_part(s.object_name,'-',1)||'.LT'||split_part(s.object_name,'-',2))) as ne,
        ((split_part(s.object_name,'-',1)||'-'||split_part(s.object_name,'-',2)||'-'||split_part(s.object_name,'-',3)||'_'||split_part(s.object_name,'-',3)||'_GPON')) as ne2
        from altiplano.serial s
        where (split_part(s.object_name,'-',1)||'.LT'||split_part(s.object_name,'-',2)) is not null
        group by (split_part(s.object_name,'-',1)||'.LT'||split_part(s.object_name,'-',2)),
        ((split_part(s.object_name,'-',1)||'-'||split_part(s.object_name,'-',2)||'-'||split_part(s.object_name,'-',3)||'_'||split_part(s.object_name,'-',3)||'_GPON'))
        """
        df = execute_query(query_nokia, engine)
        df_bw = consulta_API_BWs(df)

        export_to_sql(engine, df_bw, 'bws_pons')

        # Records older than 1 year are deleted.
        borrar_registros_viejos(engine)

        log_final_en_postgres(engine)
    finally:
        # Release every pooled connection before the process exits.
        engine.dispose()
|
|
|
|
|