Key changes include: Improved error handling and logging in PLC data reading Added detailed debug information with pprint Restructured logging format for better readability Removed redundant docstrings Fixed PLC data reading logic with proper value extraction Added support for extra properties in logger setup Code cleanup and formatting improvements
134 lines
5.1 KiB
Python
134 lines
5.1 KiB
Python
import os
|
|
import platform
|
|
import pprint
|
|
import socket
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta
|
|
from typing import List, Tuple
|
|
|
|
import pyodbc
|
|
import snap7
|
|
from dotenv import load_dotenv
|
|
from snap7.util.getters import get_lreal, get_bool
|
|
load_dotenv()
|
|
# Determine the correct driver based on OS
|
|
SQL_DRIVER = (
|
|
"ODBC Driver 18 for SQL Server" if platform.system() == "Linux" else "SQL Server"
|
|
)
|
|
# Configuration
|
|
CONN_STR = (
|
|
f"Driver={{{SQL_DRIVER}}};"
|
|
f"Server={os.getenv('DB_SERVER')};"
|
|
f"Database={os.getenv('DB_NAME')};"
|
|
f"UID={os.getenv('DB_USER')};"
|
|
f"PWD={os.getenv('DB_PASSWORD')};"
|
|
"TrustServerCertificate=yes;"
|
|
)
|
|
|
|
@dataclass
|
|
class PlcConfig:
|
|
id: int
|
|
ip: str
|
|
db_number: int
|
|
is_enabled: bool
|
|
air_offset: int
|
|
energy_offset: int
|
|
location: str
|
|
last_energy_read: float
|
|
last_air_read: float
|
|
runstop_status_offset: int
|
|
|
|
@dataclass
|
|
class SchedulerConfig:
|
|
interval: int
|
|
next_read: datetime
|
|
|
|
class DatabaseManager:
|
|
def __init__(self, connection_string: str):
|
|
self.conn_str = connection_string
|
|
def get_plc_configs(self) -> List[PlcConfig]:
|
|
with pyodbc.connect(self.conn_str) as conn:
|
|
with conn.cursor() as cursor:
|
|
cursor.execute("""
|
|
SELECT Id, Ip, DbNumber, IsEnable, AirDbOffset, EnergyDbOffset, Location, LastEnergyRead, LastAirRead, RunStopStatusDbOffset
|
|
FROM sch.Plc
|
|
WHERE IsEnable = 1
|
|
""")
|
|
return [PlcConfig(*row) for row in cursor.fetchall()]
|
|
def get_scheduler_config(self) -> SchedulerConfig:
|
|
with pyodbc.connect(self.conn_str) as conn:
|
|
with conn.cursor() as cursor:
|
|
cursor.execute("""
|
|
SELECT Interval, NextRead
|
|
FROM sch.SchedulerParameters
|
|
WHERE Name = 'Python_Energy_Scheduler'
|
|
""")
|
|
row = cursor.fetchone()
|
|
return (
|
|
SchedulerConfig(row[0], row[1])
|
|
if row
|
|
else SchedulerConfig(30, datetime.now())
|
|
)
|
|
def save_energy_data(self, plc_id: int, energy: float, air: float, state: bool):
|
|
with pyodbc.connect(self.conn_str) as conn:
|
|
with conn.cursor() as cursor:
|
|
cursor.execute(
|
|
"""
|
|
INSERT INTO sch.Energy (Energy, PlcId, Air, State, CreatedAt)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
""",
|
|
(energy, plc_id, air, state, datetime.now()),
|
|
)
|
|
conn.commit()
|
|
def update_next_read(self, interval_seconds: int):
|
|
next_read = datetime.now() + timedelta(seconds=interval_seconds)
|
|
with pyodbc.connect(self.conn_str) as conn:
|
|
with conn.cursor() as cursor:
|
|
cursor.execute(
|
|
"""
|
|
UPDATE sch.SchedulerParameters
|
|
SET NextRead = ?
|
|
WHERE Name = 'Python_Energy_Scheduler'
|
|
""",
|
|
(next_read,),
|
|
)
|
|
conn.commit()
|
|
|
|
class PlcManager:
|
|
def check_connection(self, ip: str, port: int = 102, timeout: int = 1) -> bool:
|
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
|
sock.settimeout(timeout)
|
|
return sock.connect_ex((ip, port)) == 0
|
|
def read_plc_data(self, config: PlcConfig) -> Tuple[float, float, bool]:
|
|
plc = snap7.client.Client()
|
|
try:
|
|
plc.connect(config.ip, 0, 1)
|
|
data = plc.db_read(config.db_number, 0, config.runstop_status_offset) # Define range of bytes to read
|
|
energy_value = get_lreal(data, config.energy_offset) # Read energy value
|
|
air_value = get_lreal(data, config.air_offset) # Read air value
|
|
# run_status_value = get_bool(data, config.runstop_status_offset, 1) # Read run start value
|
|
return air_value, energy_value, True
|
|
except Exception as e:
|
|
pprint.pp(
|
|
{
|
|
"msg": "Failed to read PLC data",
|
|
"plc_config": {
|
|
"id": config.id,
|
|
"ip": config.ip,
|
|
"db_number": config.db_number,
|
|
"air_offset": config.air_offset,
|
|
"energy_offset": config.energy_offset,
|
|
"runstop_status_offset": config.runstop_status_offset,
|
|
"location": config.location,
|
|
"ene_val": energy_value,
|
|
"air_val": air_value,
|
|
"run_status_val": run_status_value,
|
|
"timestamp": datetime.now().isoformat()
|
|
},
|
|
"error": str(e),
|
|
"timestamp": datetime.now().isoformat()
|
|
}
|
|
)
|
|
raise
|
|
finally:
|
|
plc.disconnect() |