import logging
import os
import socket
from contextlib import closing
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Tuple

import pyodbc
import seqlog
import snap7
from dotenv import load_dotenv
from snap7.util.getters import get_bool, get_lreal, get_ulint

load_dotenv()

# Configuration: the ODBC connection string is assembled from environment
# variables (populated from a .env file by load_dotenv above).
CONN_STR = (
    f"Driver={{SQL Server}};"
    f"Server={os.getenv('DB_SERVER')};"
    f"Database={os.getenv('DB_NAME')};"
    f"UID={os.getenv('DB_USER')};"
    f"PWD={os.getenv('DB_PASSWORD')};"
)

# Logger setup: records are forwarded to the Seq server named in SEQ_URL.
root_logger = logging.getLogger()
seq_logger = seqlog.log_to_seq(
    server_url=os.getenv("SEQ_URL"),
    api_key=os.getenv("SEQ_API_KEY"),
    level=logging.INFO,
    support_extra_properties=True,
)

# Byte widths of the S7 data types decoded in PlcManager.read_plc_data.
_ULINT_SIZE = 8  # ULINT: unsigned 64-bit integer
_LREAL_SIZE = 8  # LREAL: 64-bit floating point

# Bit inside the run-status byte that carries the run flag.
# NOTE(review): assumed to be bit 0 -- confirm against the PLC data block layout.
RUN_STATUS_BIT = 0


@dataclass
class PlcConfig:
    """
    Represents the configuration for a Programmable Logic Controller (PLC) device.

    This class holds the necessary information to connect to and read data from
    a PLC, including its IP address, database number, and the offsets of the
    air, energy and run-status values.

    Attributes:
        `id` (int): The unique identifier for the PLC.
        `ip` (str): The IP address of the PLC.
        `db_number` (int): The database number to read from the PLC.
        `air_offset` (int): The offset within the database for the air data.
        `energy_offset` (int): The offset within the database for the energy data.
        `runstatus_offset` (int): The offset within the database for the run status.
        `is_enabled` (bool): Whether the PLC is enabled and should be monitored.
    """

    id: int
    ip: str
    db_number: int
    air_offset: int
    energy_offset: int
    runstatus_offset: int
    is_enabled: bool


@dataclass
class SchedulerConfig:
    """
    Scheduler parameters as stored in `sch.SchedulerParameters`.

    Attributes:
        `interval` (int): The read interval (passed as seconds to
            `DatabaseManager.update_next_read`).
        `next_read` (datetime): The time of the next scheduled read.
    """

    interval: int
    next_read: datetime


class DatabaseManager:
    """Thin data-access layer over the `sch` tables used by the scheduler."""

    def __init__(self, connection_string: str):
        self.conn_str = connection_string

    def _connect(self):
        """
        Open a new connection that is closed when the `with` block exits.

        pyodbc's own connection context manager only ends the transaction and
        leaves the connection open, so the connection is wrapped in `closing`.
        """
        return closing(pyodbc.connect(self.conn_str))

    def get_plc_configs(self) -> List[PlcConfig]:
        """
        Retrieves a list of enabled PLC configurations from the database.

        Returns:
            List[PlcConfig]: A list of PlcConfig objects representing the
            enabled PLCs (empty when no row has IsEnable = 1).
        """
        with self._connect() as conn:
            with conn.cursor() as cursor:
                cursor.execute("""
                    SELECT Id, Ip, DbNumber, AirDbOffset, EnergyDbOffset, RunStatusDbOffset,IsEnable
                    FROM sch.Plc
                    WHERE IsEnable = 1
                """)
                # Column order of the SELECT matches the PlcConfig field order.
                return [PlcConfig(*row) for row in cursor.fetchall()]

    def get_scheduler_config(self) -> SchedulerConfig:
        """
        Retrieves the scheduler configuration from the database.

        Returns:
            SchedulerConfig: The scheduler configuration, including the
            interval and the next read time. When the parameter row is
            missing, an interval of 30 and the current time are returned.
        """
        with self._connect() as conn:
            with conn.cursor() as cursor:
                cursor.execute("""
                    SELECT Interval, NextRead
                    FROM sch.SchedulerParameters
                    WHERE Name = 'Python_Energy_Scheduler'
                """)
                row = cursor.fetchone()
                if row is None:
                    return SchedulerConfig(30, datetime.now())
                return SchedulerConfig(row[0], row[1])

    def save_energy_data(self, plc_id: int, energy: float, air: float, state: bool):
        """
        Inserts one measurement row into `sch.Energy`.

        Args:
            plc_id: Identifier of the PLC the values were read from.
            energy: Energy value to store.
            air: Air value to store.
            state: Run state to store.

        The row is stamped with the current local time as `CreatedAt`.
        """
        with self._connect() as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    """
                    INSERT INTO sch.Energy (Energy, PlcId, Air, State, CreatedAt)
                    VALUES (?, ?, ?, ?, ?)
                    """,
                    (energy, plc_id, air, state, datetime.now()),
                )
            conn.commit()

    def update_next_read(self, interval_seconds: int):
        """
        Sets the scheduler's `NextRead` to now plus `interval_seconds`.

        Args:
            interval_seconds: Number of seconds from now until the next read.
        """
        next_read = datetime.now() + timedelta(seconds=interval_seconds)
        with self._connect() as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    """
                    UPDATE sch.SchedulerParameters
                    SET NextRead = ?
                    WHERE Name = 'Python_Energy_Scheduler'
                    """,
                    (next_read,),
                )
            conn.commit()


class PlcManager:
    """Connectivity check and data acquisition for a single PLC."""

    @staticmethod
    def check_connection(ip: str, port: int = 102, timeout: int = 1) -> bool:
        """
        Reports whether a TCP connection to `ip:port` can be established.

        Args:
            ip: Address to probe.
            port: TCP port to probe (default 102).
            timeout: Socket timeout in seconds.

        Returns:
            bool: True when `connect_ex` returns 0, False otherwise.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            return sock.connect_ex((ip, port)) == 0

    def read_plc_data(self, config: PlcConfig) -> Tuple[float, float, bool]:
        """
        Reads the air, energy and run-status values from one PLC.

        A single `db_read` fetches the smallest byte range that covers all
        three fields; each field is then decoded at its offset relative to the
        start of that range.

        Args:
            config: Connection details and data block offsets of the PLC.

        Returns:
            Tuple[float, float, bool]: `(air, energy, status)` in that order.
            Air is decoded as ULINT, energy as LREAL, and status as bit
            `RUN_STATUS_BIT` of the byte at `runstatus_offset`.

        Raises:
            Whatever the snap7 client raises on connection or read failure;
            the client is disconnected in every case.
        """
        # Span of the data block that contains every field we decode.
        start = min(config.air_offset, config.energy_offset, config.runstatus_offset)
        end = max(
            config.air_offset + _ULINT_SIZE,
            config.energy_offset + _LREAL_SIZE,
            config.runstatus_offset + 1,
        )

        plc = snap7.client.Client()
        try:
            plc.connect(config.ip, 0, 1)  # rack 0, slot 1
            db_data = plc.db_read(config.db_number, start, end - start)

            # The buffer begins at `start`, so absolute DB offsets must be
            # rebased before indexing into it.
            air_value = get_ulint(db_data, config.air_offset - start)
            # TODO if (air_value < prev_air_val): update in plc
            energy_value = get_lreal(db_data, config.energy_offset - start)
            status_value = get_bool(
                db_data, config.runstatus_offset - start, RUN_STATUS_BIT
            )
            return air_value, energy_value, status_value
        finally:
            plc.disconnect()