IndustrialEnergyTracker/energy_monitor.py
Igor Barcik 660ea7666f "Enhance PLC data monitoring and error handling"
Key changes include:

Improved error handling and logging in PLC data reading
Added detailed debug information with pprint
Restructured logging format for better readability
Removed redundant docstrings
Fixed PLC data reading logic with proper value extraction
Added support for extra properties in logger setup
Code cleanup and formatting improvements
2024-12-10 07:58:51 +01:00

134 lines
5.1 KiB
Python

import os
import platform
import pprint
import socket
from contextlib import closing
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Tuple

import pyodbc
import snap7
from dotenv import load_dotenv
from snap7.util.getters import get_lreal, get_bool
load_dotenv()
# ODBC driver name depends on the host OS: Linux uses the versioned
# "ODBC Driver 18" name, every other platform uses the plain "SQL Server" name.
if platform.system() == "Linux":
    SQL_DRIVER = "ODBC Driver 18 for SQL Server"
else:
    SQL_DRIVER = "SQL Server"

# Connection string assembled from environment variables. A variable that is
# not set is rendered as the literal text "None" by the f-string.
CONN_STR = "".join(
    (
        f"Driver={{{SQL_DRIVER}}};",
        f"Server={os.getenv('DB_SERVER')};",
        f"Database={os.getenv('DB_NAME')};",
        f"UID={os.getenv('DB_USER')};",
        f"PWD={os.getenv('DB_PASSWORD')};",
        "TrustServerCertificate=yes;",
    )
)
@dataclass
class PlcConfig:
    """Configuration of a single PLC as stored in the ``sch.Plc`` table.

    Instances are built positionally from a database row, so the field order
    below must stay aligned with the column order of the query that loads them.
    """

    # --- identity / addressing ---
    id: int  # Id column; stored as PlcId with every saved measurement
    ip: str  # address handed to the snap7 client and the TCP reachability probe
    db_number: int  # number of the PLC data block that is read

    # --- read configuration ---
    is_enabled: bool  # IsEnable column
    air_offset: int  # byte offset of the air LREAL inside the data block
    energy_offset: int  # byte offset of the energy LREAL inside the data block

    # --- descriptive / bookkeeping ---
    location: str  # Location column; only used in diagnostics here
    last_energy_read: float  # LastEnergyRead column (presumably the previous reading - confirm)
    last_air_read: float  # LastAirRead column (presumably the previous reading - confirm)

    # Also used as the number of bytes requested from the data block on a read.
    runstop_status_offset: int
@dataclass
class SchedulerConfig:
    """Scheduler settings loaded from ``sch.SchedulerParameters``."""

    # Interval column (presumably seconds between reads - confirm against caller)
    interval: int
    # NextRead column: when the next read is due
    next_read: datetime
class DatabaseManager:
    """Data-access helper for the scheduler's SQL Server tables.

    Every public method opens its own connection, performs one statement and
    closes the connection again before returning.
    """

    def __init__(self, connection_string: str):
        """Store the ODBC connection string used for every connection."""
        self.conn_str = connection_string

    def _connect(self):
        """Return a context manager that yields a new connection and closes it.

        pyodbc's own ``with connection:`` only commits or rolls back on exit
        and leaves the connection open, so the connection is wrapped in
        ``closing`` to release it deterministically.
        """
        return closing(pyodbc.connect(self.conn_str))

    def get_plc_configs(self) -> List[PlcConfig]:
        """Return one ``PlcConfig`` per enabled row of ``sch.Plc``.

        The SELECT column order must match the field order of ``PlcConfig``
        because rows are unpacked positionally.
        """
        with self._connect() as conn, conn.cursor() as cursor:
            cursor.execute("""
                SELECT Id, Ip, DbNumber, IsEnable, AirDbOffset, EnergyDbOffset, Location, LastEnergyRead, LastAirRead, RunStopStatusDbOffset
                FROM sch.Plc
                WHERE IsEnable = 1
            """)
            return [PlcConfig(*row) for row in cursor.fetchall()]

    def get_scheduler_config(self) -> SchedulerConfig:
        """Return the 'Python_Energy_Scheduler' settings.

        Falls back to an interval of 30 with ``next_read`` set to the current
        time when the parameter row does not exist.
        """
        with self._connect() as conn, conn.cursor() as cursor:
            cursor.execute("""
                SELECT Interval, NextRead
                FROM sch.SchedulerParameters
                WHERE Name = 'Python_Energy_Scheduler'
            """)
            row = cursor.fetchone()
        if row:
            return SchedulerConfig(row[0], row[1])
        return SchedulerConfig(30, datetime.now())

    def save_energy_data(self, plc_id: int, energy: float, air: float, state: bool):
        """Insert one measurement row into ``sch.Energy``.

        ``CreatedAt`` is set to the local time of this process.
        """
        with self._connect() as conn, conn.cursor() as cursor:
            cursor.execute(
                """
                INSERT INTO sch.Energy (Energy, PlcId, Air, State, CreatedAt)
                VALUES (?, ?, ?, ?, ?)
                """,
                (energy, plc_id, air, state, datetime.now()),
            )
            # Commit explicitly: closing a connection discards uncommitted work.
            conn.commit()

    def update_next_read(self, interval_seconds: int):
        """Set ``NextRead`` of the scheduler row to now + ``interval_seconds``."""
        next_read = datetime.now() + timedelta(seconds=interval_seconds)
        with self._connect() as conn, conn.cursor() as cursor:
            cursor.execute(
                """
                UPDATE sch.SchedulerParameters
                SET NextRead = ?
                WHERE Name = 'Python_Energy_Scheduler'
                """,
                (next_read,),
            )
            # Commit explicitly: closing a connection discards uncommitted work.
            conn.commit()
class PlcManager:
    """Reachability probing and data-block reads for PLCs via snap7."""

    def check_connection(self, ip: str, port: int = 102, timeout: int = 1) -> bool:
        """Return True when a TCP connection to ``ip:port`` can be opened.

        Args:
            ip: IPv4 address (or host name) of the PLC.
            port: TCP port to probe; defaults to 102.
            timeout: Socket timeout in seconds.

        Returns:
            True if the connect attempt succeeded, False for any failure,
            including addresses that cannot be resolved.
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(timeout)
                return sock.connect_ex((ip, port)) == 0
        except OSError:
            # connect_ex() only turns C-level connect() failures into an error
            # code; name resolution and similar problems still raise.
            return False

    def read_plc_data(self, config: "PlcConfig") -> Tuple[float, float, bool]:
        """Read the air and energy counters of one PLC.

        Args:
            config: Addressing and offset information for the PLC.

        Returns:
            ``(air_value, energy_value, state)``. Note that air comes first.
            ``state`` is always True because run/stop decoding is disabled.

        Raises:
            Exception: Any error raised while connecting or reading is printed
                with diagnostic context and then re-raised unchanged.
        """
        plc = snap7.client.Client()
        # Pre-initialise everything the error report prints, so the handler
        # cannot itself fail with NameError and hide the original exception.
        energy_value = None
        air_value = None
        # NOTE: run/stop status is not read at the moment; it used to be
        # get_bool(data, config.runstop_status_offset, 1).
        run_status_value = None
        try:
            plc.connect(config.ip, 0, 1)  # rack 0, slot 1
            # Reads bytes [0, runstop_status_offset) of the data block; both
            # LREAL values (8 bytes each) must lie inside that range.
            data = plc.db_read(config.db_number, 0, config.runstop_status_offset)
            energy_value = get_lreal(data, config.energy_offset)
            air_value = get_lreal(data, config.air_offset)
            return air_value, energy_value, True
        except Exception as e:
            pprint.pp(
                {
                    "msg": "Failed to read PLC data",
                    "plc_config": {
                        "id": config.id,
                        "ip": config.ip,
                        "db_number": config.db_number,
                        "air_offset": config.air_offset,
                        "energy_offset": config.energy_offset,
                        "runstop_status_offset": config.runstop_status_offset,
                        "location": config.location,
                        "ene_val": energy_value,
                        "air_val": air_value,
                        "run_status_val": run_status_value,
                        "timestamp": datetime.now().isoformat(),
                    },
                    "error": str(e),
                    "timestamp": datetime.now().isoformat(),
                }
            )
            raise
        finally:
            plc.disconnect()