import os
import platform
import pprint
import socket
from contextlib import closing
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Tuple

import pyodbc
import snap7
from dotenv import load_dotenv
from snap7.util.getters import get_bool, get_lreal

load_dotenv()

# Determine the correct driver based on OS
SQL_DRIVER = (
    "ODBC Driver 18 for SQL Server" if platform.system() == "Linux" else "SQL Server"
)

# Configuration: ODBC connection string assembled from environment variables
# (DB_SERVER, DB_NAME, DB_USER, DB_PASSWORD).
CONN_STR = (
    f"Driver={{{SQL_DRIVER}}};"
    f"Server={os.getenv('DB_SERVER')};"
    f"Database={os.getenv('DB_NAME')};"
    f"UID={os.getenv('DB_USER')};"
    f"PWD={os.getenv('DB_PASSWORD')};"
    "TrustServerCertificate=yes;"
)


@dataclass
class PlcConfig:
    """One row of ``sch.Plc``.

    The field order must match the column order of the SELECT in
    ``DatabaseManager.get_plc_configs``, because rows are unpacked
    positionally into this dataclass.
    """

    id: int
    ip: str
    db_number: int
    is_enabled: bool
    air_offset: int
    energy_offset: int
    location: str
    last_energy_read: float
    last_air_read: float
    runstop_status_offset: int


@dataclass
class SchedulerConfig:
    """Scheduler settings read from ``sch.SchedulerParameters``."""

    interval: int  # presumably seconds (see update_next_read) -- TODO confirm
    next_read: datetime


class DatabaseManager:
    """Thin data-access layer over the SQL Server database via pyodbc.

    Every method opens its own connection and closes it before returning.
    """

    def __init__(self, connection_string: str):
        self.conn_str = connection_string

    def get_plc_configs(self) -> List[PlcConfig]:
        """Return the configuration of every PLC with ``IsEnable = 1``."""
        # pyodbc's own connection context manager only commits/rolls back;
        # closing() guarantees the connection is actually released.
        with closing(pyodbc.connect(self.conn_str)) as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    """
                    SELECT Id, Ip, DbNumber, IsEnable, AirDbOffset,
                           EnergyDbOffset, Location, LastEnergyRead,
                           LastAirRead, RunStopStatusDbOffset
                    FROM sch.Plc
                    WHERE IsEnable = 1
                    """
                )
                return [PlcConfig(*row) for row in cursor.fetchall()]

    def get_scheduler_config(self) -> SchedulerConfig:
        """Return the 'Python_Energy_Scheduler' settings.

        Falls back to an interval of 30 and ``next_read = now`` when the
        parameter row does not exist.
        """
        with closing(pyodbc.connect(self.conn_str)) as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    """
                    SELECT Interval, NextRead
                    FROM sch.SchedulerParameters
                    WHERE Name = 'Python_Energy_Scheduler'
                    """
                )
                row = cursor.fetchone()
                return (
                    SchedulerConfig(row[0], row[1])
                    if row
                    else SchedulerConfig(30, datetime.now())
                )

    def save_energy_data(self, plc_id: int, energy: float, air: float, state: bool):
        """Insert one reading into ``sch.Energy``, stamped with the current time."""
        with closing(pyodbc.connect(self.conn_str)) as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    """
                    INSERT INTO sch.Energy (Energy, PlcId, Air, State, CreatedAt)
                    VALUES (?, ?, ?, ?, ?)
                    """,
                    (energy, plc_id, air, state, datetime.now()),
                )
                conn.commit()

    def update_next_read(self, interval_seconds: int):
        """Set the scheduler's ``NextRead`` to now + ``interval_seconds``."""
        next_read = datetime.now() + timedelta(seconds=interval_seconds)
        with closing(pyodbc.connect(self.conn_str)) as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    """
                    UPDATE sch.SchedulerParameters
                    SET NextRead = ?
                    WHERE Name = 'Python_Energy_Scheduler'
                    """,
                    (next_read,),
                )
                conn.commit()


class PlcManager:
    """Reachability check and data reads for PLCs via snap7."""

    def check_connection(self, ip: str, port: int = 102, timeout: int = 1) -> bool:
        """Return True if a TCP connection to ``ip:port`` succeeds within ``timeout``.

        Any socket-level failure (including an unresolvable host) is reported
        as ``False`` rather than raised.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            try:
                return sock.connect_ex((ip, port)) == 0
            except OSError:
                # connect_ex reports connect() failures as an error code but
                # still raises for e.g. address-resolution errors.
                return False

    def read_plc_data(self, config: PlcConfig) -> Tuple[float, float, bool]:
        """Read the air and energy values from the PLC described by ``config``.

        Returns:
            ``(air_value, energy_value, True)`` -- note the order: air first,
            then energy. The third element is always ``True`` because decoding
            of the run/stop bit is currently disabled.

        Raises:
            Exception: any error from connecting, reading or decoding is
                logged with the PLC configuration and then re-raised.
        """
        # Pre-initialise everything the error log prints so the except handler
        # cannot itself fail on an unbound name and mask the original error.
        energy_value = None
        air_value = None
        run_status_value = None  # run/stop decoding is disabled, see below

        plc = snap7.client.Client()
        try:
            plc.connect(config.ip, 0, 1)  # rack 0, slot 1
            # Reads bytes [0, runstop_status_offset) of the data block; both
            # LREAL offsets (8 bytes each) must fall inside this range.
            data = plc.db_read(config.db_number, 0, config.runstop_status_offset)
            energy_value = get_lreal(data, config.energy_offset)  # Read energy value
            air_value = get_lreal(data, config.air_offset)  # Read air value
            # Disabled: the buffer above does not include the status byte.
            # run_status_value = get_bool(data, config.runstop_status_offset, 1)
            return air_value, energy_value, True
        except Exception as e:
            pprint.pp(
                {
                    "msg": "Failed to read PLC data",
                    "plc_config": {
                        "id": config.id,
                        "ip": config.ip,
                        "db_number": config.db_number,
                        "air_offset": config.air_offset,
                        "energy_offset": config.energy_offset,
                        "runstop_status_offset": config.runstop_status_offset,
                        "location": config.location,
                        "ene_val": energy_value,
                        "air_val": air_value,
                        "run_status_val": run_status_value,
                        "timestamp": datetime.now().isoformat(),
                    },
                    "error": str(e),
                    "timestamp": datetime.now().isoformat(),
                }
            )
            raise
        finally:
            plc.disconnect()