quick snap test program
This commit is contained in:
parent
7872044226
commit
15f3e47aeb
131
classes.py
Normal file
131
classes.py
Normal file
@ -0,0 +1,131 @@
|
||||
import os
|
||||
import platform
|
||||
import socket
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
import pyodbc
|
||||
import snap7
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
|
||||
# Entire scheduler config class
|
||||
class SchedulerConfig:
|
||||
interval: int
|
||||
next_read: datetime
|
||||
|
||||
|
||||
# PLC properties class (energy, air)
|
||||
class Plc:
|
||||
id: int
|
||||
name: str
|
||||
ip: str
|
||||
db_number: int # Where the data is stored in the PLC
|
||||
is_enabled: bool # True if PLC will be read from
|
||||
air_db_offset: int
|
||||
energy_db_offset: int
|
||||
state_db_offset: int
|
||||
location: str # H1 or H2
|
||||
last_energy_read: float
|
||||
last_air_read: float
|
||||
last_state_read: bool
|
||||
last_read_timestamp: datetime
|
||||
|
||||
def __init__(self, id: int, name: str, ip: str, db_number: int,
|
||||
air_db_offset: int, energy_db_offset: int,
|
||||
state_db_offset: int, location: str) -> None:
|
||||
self.id = id
|
||||
self.name = name
|
||||
self.ip = ip
|
||||
self.db_number = db_number
|
||||
self.is_enabled = True # default value
|
||||
self.air_db_offset = air_db_offset
|
||||
self.energy_db_offset = energy_db_offset
|
||||
self.state_db_offset = state_db_offset
|
||||
self.location = location
|
||||
self.last_energy_read = 0.0
|
||||
self.last_air_read = 0.0
|
||||
self.last_state_read = False
|
||||
self.last_read_timestamp = None
|
||||
|
||||
|
||||
def check_connection_snap(self) -> bool:
|
||||
"""Check if the PLC is reachable."""
|
||||
try:
|
||||
client = snap7.client.Client()
|
||||
client.connect(self.ip, 0, 1)
|
||||
client.disconnect()
|
||||
return True
|
||||
except Exception as e:
|
||||
# logger.error(f"❌ Error checking PLC connection: {e}")
|
||||
return False
|
||||
|
||||
def check_connection_socket(self) -> bool:
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
||||
sock.settimeout(1)
|
||||
return sock.connect_ex((self.ip, 102)) == 0
|
||||
|
||||
|
||||
# PAC properties class (energy)
|
||||
class Pac:
|
||||
id: int
|
||||
name: str
|
||||
ip: str
|
||||
port: int
|
||||
is_enabled: bool
|
||||
location: str # H1 or H2
|
||||
last_energy_read: float
|
||||
last_read_timestamp: datetime
|
||||
|
||||
|
||||
class Database:
|
||||
# Private class attributes (internal use)
|
||||
_sql_driver = (
|
||||
"ODBC Driver 18 for SQL Server"
|
||||
if platform.system() == "Linux"
|
||||
else "SQL Server"
|
||||
)
|
||||
_instance: Optional["Database"] = None
|
||||
_connection = None
|
||||
|
||||
def __new__(cls):
|
||||
if cls._instance is None:
|
||||
cls._instance = super(Database, cls).__new__(cls)
|
||||
return cls._instance
|
||||
|
||||
# Public methods (external interface)
|
||||
def __init__(self):
|
||||
if self._connection is None:
|
||||
self._connection = self._create_connection()
|
||||
|
||||
def execute_query(self, query: str, params: tuple = ()) -> pyodbc.Cursor:
|
||||
"""Execute SQL query and return cursor."""
|
||||
cursor = self.get_connection().cursor()
|
||||
cursor.execute(query, params)
|
||||
return cursor
|
||||
|
||||
def get_connection(self) -> pyodbc.Connection:
|
||||
"""Get database connection, create new if needed."""
|
||||
if not self._connection or not self._connection.connected:
|
||||
self._connection = self._create_connection()
|
||||
return self._connection
|
||||
|
||||
def close(self) -> None:
|
||||
"""Close database connection."""
|
||||
if self._connection:
|
||||
self._connection.close()
|
||||
self._connection = None
|
||||
|
||||
# Private methods (internal use)
|
||||
def _create_connection(self) -> pyodbc.Connection:
|
||||
"""Create new database connection."""
|
||||
connection_string = (
|
||||
"DRIVER={SQL Server};"
|
||||
f"SERVER={os.getenv('DB_SERVER')};"
|
||||
f"DATABASE={os.getenv('DB_NAME')};"
|
||||
f"UID={os.getenv('DB_USER')};"
|
||||
f"PWD={os.getenv('DB_PASSWORD')}"
|
||||
)
|
||||
return pyodbc.connect(connection_string)
|
17
snap7_test.py
Normal file
17
snap7_test.py
Normal file
@ -0,0 +1,17 @@
|
||||
import time
|
||||
import snap7
|
||||
import pprint
|
||||
|
||||
LOOPS = 5
|
||||
counter = 0
|
||||
while counter < LOOPS:
|
||||
try:
|
||||
plc = snap7.client.Client()
|
||||
plc.connect("172.16.3.231", 0, 1)
|
||||
print(plc.get_cpu_state())
|
||||
except Exception as e:
|
||||
pprint.pprint(e)
|
||||
finally:
|
||||
plc.disconnect()
|
||||
counter += 1
|
||||
time.sleep(1)
|
Loading…
x
Reference in New Issue
Block a user