132 lines
3.8 KiB
Python

import os
import platform
import socket
from datetime import datetime
from typing import Optional
import pyodbc
import snap7
from dotenv import load_dotenv
# Populate os.environ from a .env file (when one is present) so the DB_*
# settings that Database reads through os.getenv() are available.
load_dotenv()
# Entire scheduler config class
class SchedulerConfig:
    """Settings that drive the read scheduler.

    Only annotations are declared here: the class defines no ``__init__`` and
    no defaults, so an instance has neither attribute until a caller assigns
    it.
    """

    # Time between two reads.
    # NOTE(review): unit (seconds/minutes) is not visible here - confirm.
    interval: int
    # Presumably the moment the next read is due - verify against the caller.
    next_read: datetime
# PLC properties class (energy, air)
class Plc:
    """Configuration and last-read state of one PLC.

    The constructor stores the addressing data (IP, data-block number and the
    three offsets) and resets every ``last_*`` field to an "nothing read yet"
    value. Two independent reachability probes are offered: a full snap7
    connect and a plain TCP connect.
    """

    id: int
    name: str
    ip: str
    db_number: int  # Where the data is stored in the PLC
    is_enabled: bool  # True if PLC will be read from
    # NOTE(review): the offsets are presumably positions inside data block
    # ``db_number`` - confirm unit (bytes?) against the reading code.
    air_db_offset: int
    energy_db_offset: int
    state_db_offset: int
    location: str  # H1 or H2
    last_energy_read: float
    last_air_read: float
    last_state_read: bool
    last_read_timestamp: Optional[datetime]  # starts as None (see __init__)

    def __init__(self, id: int, name: str, ip: str, db_number: int,
                 air_db_offset: int, energy_db_offset: int,
                 state_db_offset: int, location: str,
                 is_enabled: bool = True) -> None:
        """Store the PLC settings and reset the last-read fields.

        Args:
            id: Identifier of the PLC.
            name: Display name of the PLC.
            ip: Address used by both connection probes.
            db_number: Where the data is stored in the PLC.
            air_db_offset: Offset of the air value.
            energy_db_offset: Offset of the energy value.
            state_db_offset: Offset of the state value.
            location: H1 or H2.
            is_enabled: True if the PLC will be read from (default True).
        """
        self.id = id
        self.name = name
        self.ip = ip
        self.db_number = db_number
        self.is_enabled = is_enabled
        self.air_db_offset = air_db_offset
        self.energy_db_offset = energy_db_offset
        self.state_db_offset = state_db_offset
        self.location = location
        self.last_energy_read = 0.0
        self.last_air_read = 0.0
        self.last_state_read = False
        self.last_read_timestamp = None

    def check_connection_snap(self, rack: int = 0, slot: int = 1) -> bool:
        """Check if the PLC is reachable by opening a snap7 connection.

        Args:
            rack: Rack number passed to ``Client.connect`` (default 0).
            slot: Slot number passed to ``Client.connect`` (default 1).

        Returns:
            True when connect and disconnect both complete without raising,
            False otherwise.
        """
        try:
            client = snap7.client.Client()
            client.connect(self.ip, rack, slot)
            client.disconnect()
        except Exception:
            # Deliberately broad: this is a best-effort probe, and any failure
            # simply means "not reachable" to the caller.
            return False
        return True

    def check_connection_socket(self, port: int = 102,
                                timeout: float = 1.0) -> bool:
        """Check if a TCP connection to the PLC can be opened.

        Args:
            port: TCP port to probe (default 102, ISO-on-TCP).
            timeout: Socket timeout in seconds (default 1.0).

        Returns:
            True when the TCP connect succeeds; False when it fails, times
            out, or the address cannot be resolved.
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(timeout)
                return sock.connect_ex((self.ip, port)) == 0
        except OSError:
            # connect_ex() only turns C-level connect() errors into a return
            # code; name-resolution failures (socket.gaierror) still raise.
            return False
# PAC properties class (energy)
class Pac:
    """Properties of one PAC device that is read for energy data.

    Only annotations are declared here: the class defines no ``__init__`` and
    no defaults, so an instance has none of these attributes until a caller
    assigns them.
    """

    id: int
    name: str
    ip: str
    port: int
    is_enabled: bool
    location: str  # H1 or H2
    last_energy_read: float
    last_read_timestamp: datetime
class Database:
    """Singleton wrapper around a single pyodbc connection.

    ``Database()`` always returns the same object. ``__init__`` still runs on
    every call, but only opens a connection when none is stored yet. The
    connection settings come from the ``DB_SERVER``, ``DB_NAME``, ``DB_USER``
    and ``DB_PASSWORD`` environment variables.
    """

    # Private class attributes (internal use)
    # ODBC driver name, chosen per platform.
    _sql_driver = (
        "ODBC Driver 18 for SQL Server"
        if platform.system() == "Linux"
        else "SQL Server"
    )
    _instance: Optional["Database"] = None
    _connection: Optional["pyodbc.Connection"] = None

    def __new__(cls) -> "Database":
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    # Public methods (external interface)
    def __init__(self) -> None:
        """Open the connection unless the shared instance already has one."""
        if self._connection is None:
            self._connection = self._create_connection()

    def execute_query(self, query: str, params: tuple = ()) -> "pyodbc.Cursor":
        """Execute SQL query and return cursor.

        Args:
            query: SQL text, with ``?`` placeholders for parameters.
            params: Values bound to the placeholders.

        Returns:
            The cursor the statement was executed on.
        """
        cursor = self.get_connection().cursor()
        cursor.execute(query, params)
        return cursor

    def get_connection(self) -> "pyodbc.Connection":
        """Get database connection, create new if needed.

        A new connection is opened when none is stored or when the stored one
        reports itself as closed.
        """
        connection = self._connection
        # pyodbc connections have no ``connected`` attribute; ``closed`` is
        # the state flag. getattr() keeps this working on pyodbc builds that
        # lack the flag (the handle is then assumed to be open).
        if connection is None or getattr(connection, "closed", False):
            connection = self._connection = self._create_connection()
        return connection

    def close(self) -> None:
        """Close database connection.

        Safe to call repeatedly; the stored reference is cleared even if
        closing the underlying handle raises.
        """
        connection, self._connection = self._connection, None
        if connection is not None and not getattr(connection, "closed", False):
            connection.close()

    # Private methods (internal use)
    def _build_connection_string(self) -> str:
        """Assemble the ODBC connection string from the environment."""
        # NOTE(review): ODBC Driver 18 encrypts by default; a server with a
        # self-signed certificate may additionally need
        # TrustServerCertificate=yes - confirm for the target deployment.
        return (
            f"DRIVER={{{self._sql_driver}}};"
            f"SERVER={os.getenv('DB_SERVER')};"
            f"DATABASE={os.getenv('DB_NAME')};"
            f"UID={os.getenv('DB_USER')};"
            f"PWD={os.getenv('DB_PASSWORD')}"
        )

    def _create_connection(self) -> "pyodbc.Connection":
        """Create new database connection."""
        return pyodbc.connect(self._build_connection_string())