IndustrialEnergyTracker/legacy/energy_monitor.py

145 lines
5.0 KiB
Python

import configparser
import platform
import pprint
import socket
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Tuple
import pyodbc
import snap7
from dotenv import load_dotenv
from snap7.util.getters import get_bool, get_lreal
# Load config.ini
config = configparser.ConfigParser()
config.read("config.ini")
# Determine the correct driver based on OS
SQL_DRIVER = "ODBC Driver 18 for SQL Server" if platform.system() == "Linux" else "SQL Server"
# Configuration
CONN_STR = (
f"Driver={{{SQL_DRIVER}}};"
f"Server={config['mssql']['host']};"
f"Database={config['mssql']['name']};"
f"UID={config['mssql']['user']};"
f"PWD={config['mssql']['password']};"
"TrustServerCertificate=yes;"
)
@dataclass
class PlcConfig:
id: int
ip: str
db_number: int
is_enabled: bool
air_offset: int
energy_offset: int
location: str
last_energy_read: float
last_air_read: float
runstop_status_offset: int
@dataclass
class SchedulerConfig:
interval: int
next_read: datetime
class DatabaseManager:
def __init__(self, connection_string: str):
self.conn_str = connection_string
def get_plc_configs(self) -> List[PlcConfig]:
with pyodbc.connect(self.conn_str) as conn:
with conn.cursor() as cursor:
cursor.execute("""
SELECT Id, Ip, DbNumber, IsEnable, AirDbOffset, EnergyDbOffset, Location, LastEnergyRead, LastAirRead, RunStopStatusDbOffset
FROM sch.Plc
WHERE IsEnable = 1
""")
return [PlcConfig(*row) for row in cursor.fetchall()]
def get_scheduler_config(self) -> SchedulerConfig:
with pyodbc.connect(self.conn_str) as conn:
with conn.cursor() as cursor:
cursor.execute("""
SELECT Interval, NextRead
FROM sch.SchedulerParameters
WHERE Name = 'Python_Energy_Scheduler'
""")
row = cursor.fetchone()
return (
SchedulerConfig(row[0], row[1]) if row else SchedulerConfig(30, datetime.now())
)
def save_energy_data(self, plc_id: int, energy: float, air: float, state: bool):
with pyodbc.connect(self.conn_str) as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
INSERT INTO sch.Energy (Energy, PlcId, Air, State, CreatedAt)
VALUES (?, ?, ?, ?, ?)
""",
(energy, plc_id, air, state, datetime.now()),
)
conn.commit()
def update_next_read(self, interval_seconds: int):
next_read = datetime.now() + timedelta(seconds=interval_seconds)
with pyodbc.connect(self.conn_str) as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
UPDATE sch.SchedulerParameters
SET NextRead = ?
WHERE Name = 'Python_Energy_Scheduler'
""",
(next_read,),
)
conn.commit()
class PlcManager:
def check_connection(self, ip: str, port: int = 102, timeout: int = 1) -> bool:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(timeout)
return sock.connect_ex((ip, port)) == 0
def read_plc_data(self, config: PlcConfig) -> Tuple[float, float, bool]:
plc = snap7.client.Client()
try:
plc.connect(config.ip, 0, 1)
data = plc.db_read(
config.db_number, 0, config.runstop_status_offset
) # Define range of bytes to read
energy_value = get_lreal(data, config.energy_offset) # Read energy value
air_value = get_lreal(data, config.air_offset) # Read air value
# run_status_value = get_bool(data, config.runstop_status_offset, 1) # Read run start value
return air_value, energy_value, True
except Exception as e:
pprint.pp(
{
"msg": "Failed to read PLC data",
"plc_config": {
"id": config.id,
"ip": config.ip,
"db_number": config.db_number,
"air_offset": config.air_offset,
"energy_offset": config.energy_offset,
"runstop_status_offset": config.runstop_status_offset,
"location": config.location,
"ene_val": energy_value,
"air_val": air_value,
"run_status_val": True,
"timestamp": datetime.now().isoformat(),
},
"error": str(e),
"timestamp": datetime.now().isoformat(),
}
)
raise
finally:
plc.disconnect()