Compare commits

..

No commits in common. "TES" and "main" have entirely different histories.
TES ... main

14 changed files with 23 additions and 350 deletions

View file

@ -3,4 +3,3 @@ MODULE_REL4HVI4_LEVELS=01234567 #number of stack level (0-7) not separated
MODULE_INDUSTRIALAUTOMATION_LEVELS=01234567 #number of stack level (0-7) not separated MODULE_INDUSTRIALAUTOMATION_LEVELS=01234567 #number of stack level (0-7) not separated
VARIABLES_SAVE_DIR=variables_storage #path to dir for saving variables VARIABLES_SAVE_DIR=variables_storage #path to dir for saving variables
BACKGROUND_SCRIPTS_RUN=0 #1 - true/enable | 0 - false/disable BACKGROUND_SCRIPTS_RUN=0 #1 - true/enable | 0 - false/disable
DEBUG=1 #debug messages 1 - true/enable | 0 - false/disable

42
pdm.lock generated
View file

@ -3,9 +3,9 @@
[metadata] [metadata]
groups = ["default"] groups = ["default"]
strategy = ["inherit_metadata"] strategy = ["cross_platform", "inherit_metadata"]
lock_version = "4.4.1" lock_version = "4.4.1"
content_hash = "sha256:05e7b80dbe53b7565ff910459a8f1943128867d3cbe2a7a676f00c05b3002392" content_hash = "sha256:46b66a4e3fd5632663e1dc2463271c94fb141dc874868edc93adc853429d6466"
[[package]] [[package]]
name = "annotated-types" name = "annotated-types"
@ -73,19 +73,6 @@ files = [
{file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"},
] ]
[[package]]
name = "colorzero"
version = "2.0"
summary = "Yet another Python color library"
groups = ["default"]
dependencies = [
"setuptools",
]
files = [
{file = "colorzero-2.0-py2.py3-none-any.whl", hash = "sha256:0e60d743a6b8071498a56465f7719c96a5e92928f858bab1be2a0d606c9aa0f8"},
{file = "colorzero-2.0.tar.gz", hash = "sha256:e7d5a5c26cd0dc37b164ebefc609f388de24f8593b659191e12d85f8f9d5eb58"},
]
[[package]] [[package]]
name = "fastapi" name = "fastapi"
version = "0.115.5" version = "0.115.5"
@ -102,20 +89,6 @@ files = [
{file = "fastapi-0.115.5.tar.gz", hash = "sha256:0e7a4d0dc0d01c68df21887cce0945e72d3c48b9f4f79dfe7a7d53aa08fbb289"}, {file = "fastapi-0.115.5.tar.gz", hash = "sha256:0e7a4d0dc0d01c68df21887cce0945e72d3c48b9f4f79dfe7a7d53aa08fbb289"},
] ]
[[package]]
name = "gpiozero"
version = "2.0.1"
requires_python = ">=3.9"
summary = "A simple interface to GPIO devices with Raspberry Pi"
groups = ["default"]
dependencies = [
"colorzero",
]
files = [
{file = "gpiozero-2.0.1-py3-none-any.whl", hash = "sha256:8f621de357171d574c0b7ea0e358cb66e560818a47b0eeedf41ce1cdbd20c70b"},
{file = "gpiozero-2.0.1.tar.gz", hash = "sha256:d4ea1952689ec7e331f9d4ebc9adb15f1d01c2c9dcfabb72e752c9869ab7e97e"},
]
[[package]] [[package]]
name = "h11" name = "h11"
version = "0.14.0" version = "0.14.0"
@ -224,17 +197,6 @@ files = [
{file = "python_dotenv-1.0.1-py3-none-any.whl", hash = "sha256:f7b63ef50f1b690dddf550d03497b66d609393b40b564ed0d674909a68ebf16a"}, {file = "python_dotenv-1.0.1-py3-none-any.whl", hash = "sha256:f7b63ef50f1b690dddf550d03497b66d609393b40b564ed0d674909a68ebf16a"},
] ]
[[package]]
name = "setuptools"
version = "75.8.0"
requires_python = ">=3.9"
summary = "Easily download, build, install, upgrade, and uninstall Python packages"
groups = ["default"]
files = [
{file = "setuptools-75.8.0-py3-none-any.whl", hash = "sha256:e3982f444617239225d675215d51f6ba05f845d4eec313da4418fdbb56fb27e3"},
{file = "setuptools-75.8.0.tar.gz", hash = "sha256:c5afc8f407c626b8313a86e10311dd3f661c6cd9c09d4bf8c15c0e11f9f2b0e6"},
]
[[package]] [[package]]
name = "sm4relind" name = "sm4relind"
version = "1.0.3" version = "1.0.3"

View file

@ -13,7 +13,6 @@ dependencies = [
"python-dotenv>=1.0.1", "python-dotenv>=1.0.1",
"SMmegaind>=1.0.3", "SMmegaind>=1.0.3",
"apscheduler>=3.11.0", "apscheduler>=3.11.0",
"gpiozero>=2.0.1",
] ]
requires-python = ">=3.12" requires-python = ">=3.12"
readme = "README.md" readme = "README.md"

7
scripts/example.py Normal file
View file

@ -0,0 +1,7 @@
from src.variables.service import get_variables
#modules example import: from src.modules.RTD8.service import read_temp
def test_function():
print("Example script")
print(f"variables: {" | ".join(get_variables())}")

View file

@ -1,62 +0,0 @@
from src.variables.service import get_variable, set_variable
from src.modules.RTD8.service import read_temp
from src.modules.IndustrialAutomation.service_analog import set_0_10_out
fans = [
{
"inputs": [ #temperature inputs
{
"type": "RTD", #Card type
"stack": 1, #Card stack
"channel": 1 #Card channel
}
],
"temp_min": 30, #minimum temp for fan_min
"temp_max": 50, #maximum temp for fan_max
"fan_min": 1.1, #fan minimum 0-10V set
"fan_max": 10, #fan maximum 0-10V set
"output_stack": 0, #stack card
"output_channel": 2 #card channel
}
]
def get_temp(channel: dict) -> float:
match channel["type"]:
case "RTD":
return read_temp(stack=channel["stack"], channel=channel["channel"])
case _:
return 0
def max_temp(channels: list) -> float:
temp: float = 0
for channel in channels:
temp_ch = get_temp(channel=channel)
if temp < temp_ch:
temp = temp_ch
return temp
def fan_control():
for fan in fans:
temp = max_temp(fan["inputs"])
if temp > fan["temp_max"]:
set_0_10_out(stack=fan["output_stack"], channel=fan["output_channel"], value=fan["fan_max"])
elif temp < fan["temp_min"]:
set_0_10_out(stack=fan["output_stack"], channel=fan["output_channel"], value=fan["fan_min"])
else:
temp_calc = temp - fan["temp_min"]
temp_max_calc = fan["temp_max"] - fan["temp_min"]
calc_out = round(temp_calc / temp_max_calc, 4) * 10
set_0_10_out(stack=fan["output_stack"], channel=fan["output_channel"], value=calc_out)

View file

@ -1,131 +0,0 @@
from src.variables.service import get_variable, set_variable
from src.modules.IndustrialAutomation.service_analog import read_0_10_out, set_0_10_out
import os
#variables names
variable_control_channel: str = "speed_controller_channel"
variable_control_stack: str = "speed_controller_stack"
#variables rpm
variable_actual_rpm: str = "actual_rpm"
variable_requested_rpm: str = "requested_rpm"
#variables limits
variable_max_rpm: str = "max_rpm"
variable_max_U: str = "max_U"
#variables controling step
variable_control_step_U: str = "control_step_U"
variable_control_step_rpm: str = "control_step_rpm"
variable_control_step_U_1: str = "control_step_U_1"
variable_control_step_rpm_1: str = "control_step_rpm_1"
variable_control_step_U_2: str = "control_step_U_2"
variable_control_step_rpm_2: str = "control_step_rpm_2"
variable_control_blocking: str = "control_blocking"
#init variables in variables system
if get_variable(variable_actual_rpm) == None:
set_variable(variable_actual_rpm, 0, True)
if get_variable(variable_requested_rpm) == None or int(get_variable(variable_requested_rpm)) != 0:
set_variable(variable_requested_rpm, 0, True)
if get_variable(variable_max_rpm) == None:
set_variable(variable_max_rpm, 1900, True)
if get_variable(variable_max_U) == None:
set_variable(variable_max_U, 8, True)
if get_variable(variable_control_step_U) == None:
set_variable(variable_control_step_U, 0.1, True)
if get_variable(variable_control_step_rpm) == None:
set_variable(variable_control_step_rpm, 200, False)
if get_variable(variable_control_step_U_1) == None:
set_variable(variable_control_step_U_1, 0.01, False)
if get_variable(variable_control_step_rpm_1) == None:
set_variable(variable_control_step_rpm_1, 100, False)
if get_variable(variable_control_step_U_2) == None:
set_variable(variable_control_step_U_2, 0.001, False)
if get_variable(variable_control_step_rpm_2) == None:
set_variable(variable_control_step_rpm_2, 5, False)
if get_variable(variable_control_stack) == None:
set_variable(variable_control_stack, 0, True)
if get_variable(variable_control_channel) == None:
set_variable(variable_control_channel, 1, True)
if get_variable(variable_control_blocking) == None:
set_variable(variable_control_blocking, 1, True)
# set rpm control output
def set_output(value: float):
if value < float(get_variable(variable_max_U)):
set_0_10_out(stack=int(get_variable(variable_control_stack)), channel=int(get_variable(variable_control_channel)), value=value)
def printer(text):
if bool(os.getenv("DEBUG")):
print(text)
#control RPM
def rpm_control():
if float(get_variable(variable_control_blocking)) == 1:
print("speed control blocked")
return
actual_rpm = float(get_variable(variable_actual_rpm))
requested_rpm = float(get_variable(variable_requested_rpm))
actual_set_U = read_0_10_out(stack=int(get_variable(variable_control_stack)), channel=int(get_variable(variable_control_channel)))
#slow down rotating if over maximum
if actual_rpm > float(get_variable(variable_max_rpm)):
set_output(actual_set_U - 0.1)
print("max_rpm_reached")
return
if requested_rpm == 0:
set_output(0)
print("control_set: 0")
return
if actual_set_U > float(get_variable(variable_max_U)):
set_output(float(get_variable(variable_max_U)))
print("Maximum out voltage")
return
#algorithm for RPM control
if (requested_rpm - float(get_variable(variable_control_step_rpm))) > actual_rpm:
set_output(actual_set_U + float(get_variable(variable_control_step_U)))
printer("RPM control 1 +")
return
elif (requested_rpm + float(get_variable(variable_control_step_rpm))) < actual_rpm:
set_output(actual_set_U - float(get_variable(variable_control_step_U)))
printer("RPM control 1 -")
return
if (requested_rpm - float(get_variable(variable_control_step_rpm_1))) > actual_rpm:
set_output(actual_set_U + float(get_variable(variable_control_step_U_1)))
printer("RPM control 2 +")
return
elif (requested_rpm + float(get_variable(variable_control_step_rpm_1))) < actual_rpm:
set_output(actual_set_U - float(get_variable(variable_control_step_U_1)))
printer("RPM control 2 -")
return
if (requested_rpm - float(get_variable(variable_control_step_rpm_2))) > actual_rpm:
set_output(actual_set_U + float(get_variable(variable_control_step_U_2)))
printer("RPM control 3 +")
return
elif (requested_rpm + float(get_variable(variable_control_step_rpm_2))) < actual_rpm:
set_output(actual_set_U - float(get_variable(variable_control_step_U_2)))
printer("RPM control 3 -")
return
print("RPM control any change")

View file

@ -1,48 +0,0 @@
from src.variables.service import get_variable, set_variable
from gpiozero import Button
import datetime, os, json
pulses_per_rotation = 8
reader = Button(4, pull_up=False, bounce_time=0.0005)
set_variable("speed_reader_last_impulse_time", datetime.datetime.now(), False)
#set_variable("actual_rpm_unfiltered", 0, False)
set_variable("rpm_filter", "[0]", False)
def printer(text):
if bool(os.getenv("DEBUG")):
print(text)
def event():
actual = datetime.datetime.now()
last = get_variable("speed_reader_last_impulse_time")
difference = (actual - last).total_seconds()
set_variable("speed_reader_last_impulse_time", actual, False)
rpm = (60 / difference) / pulses_per_rotation
#actual_rpm = (float(get_variable("actual_rpm_unfiltered")) + rpm) / 2
#set_variable("actual_rpm_unfiltered", rpm, False)
rpm_filter = json.loads(get_variable("rpm_filter"))
printer(rpm_filter)
if len(rpm_filter) > 8:
rpm_filter.remove(rpm_filter[0])
rpm_filter.append(rpm)
set_variable("rpm_filter", json.dumps(rpm_filter), False)
average = 0
for i in rpm_filter:
average = average + i
actual_rpm = average / len(rpm_filter)
set_variable("actual_rpm", round(actual_rpm, 1), False)
printer("Measured RPM: {}".format(round(actual_rpm, 1)))
reader.when_pressed = event

View file

@ -1,45 +0,0 @@
from src.variables.service import get_variable, set_variable
from gpiozero import Button
import datetime, os, json
pulses_per_rotation = 8
reader = Button(4, pull_up=False, bounce_time=0.0005)
set_variable("rpm_pulses_counter", "[]", False)
set_variable("rpm_time_span_sec", 1, False)
def printer(text):
if bool(os.getenv("DEBUG")):
print(text)
def event():
actual = datetime.datetime.now()
printer("Impulse readed")
rpm_pulses_counter = [datetime.datetime.fromisoformat(t) for t in json.loads(get_variable("rpm_pulses_counter"))]
printer(rpm_pulses_counter)
rpm_pulses_counter.append(actual)
set_variable("rpm_pulses_counter", json.dumps([t.isoformat() for t in rpm_pulses_counter]), False)
rpm_time_span_sec = int(get_variable("rpm_time_span_sec"))
pulses = len([t for t in rpm_pulses_counter if (actual - t).total_seconds() <= rpm_time_span_sec])
actual_rpm = (pulses / pulses_per_rotation) * (60 / rpm_time_span_sec)
set_variable("actual_rpm", round(actual_rpm, 1), False)
printer("Measured RPM: {}".format(round(actual_rpm, 1)))
def remover():
actual = datetime.datetime.now()
rpm_pulses_counter = [datetime.datetime.fromisoformat(t) for t in json.loads(get_variable("rpm_pulses_counter"))]
rpm_time_span_sec = int(get_variable("rpm_time_span_sec")) + 1
printer("cleaning event")
rpm_pulses_counter = [t for t in rpm_pulses_counter if (actual - t).total_seconds() <= rpm_time_span_sec]
set_variable("rpm_pulses_counter", json.dumps([t.isoformat() for t in rpm_pulses_counter]), False)
reader.when_pressed = event

View file

@ -4,21 +4,14 @@ from apscheduler.triggers.cron import CronTrigger
import os import os
scheduler = BackgroundScheduler() scheduler = BackgroundScheduler()
if bool(os.getenv("BACKGROUND_SCRIPTS_RUN", 0)): if bool(os.getenv("BACKGROUND_SCRIPTS_RUN", 0)):
print("run") print("run")
scheduler.start() scheduler.start()
#control speed #example
from scripts.speed_controller import rpm_control from scripts.example import test_function
scheduler.add_job(rpm_control, trigger=IntervalTrigger(seconds=0.5)) scheduler.add_job(test_function, trigger=IntervalTrigger(seconds=60))
#end example
#control fans
from scripts.fan_controller import fan_control
scheduler.add_job(fan_control, trigger=IntervalTrigger(minutes=1))
#speed_reader
import scripts.speed_reader_2
from scripts.speed_reader_2 import remover
scheduler.add_job(remover, trigger=IntervalTrigger(seconds=1))

View file

@ -1,4 +1,4 @@
import os, asyncio import os
from dotenv import load_dotenv from dotenv import load_dotenv
load_dotenv() load_dotenv()
@ -13,6 +13,5 @@ import scripts.timer
app = FastAPI(root_path="/api") app = FastAPI(root_path="/api")
app.include_router(router=modulesrouter) app.include_router(router=modulesrouter)
app.include_router(router=variablesrouter) app.include_router(router=variablesrouter)

View file

@ -19,9 +19,9 @@ def get_0_10V_out_all():
def get_0_10V_out_stack(stack: int): def get_0_10V_out_stack(stack: int):
return analogService.read_0_10_out_stack(stack=stack) return analogService.read_0_10_out_stack(stack=stack)
@analog.get("/output/0-10V/{stack}/{channel}", description="Read set value from selected 0-10V output", response_model=float) @analog.get("/output/0-10V/{stack}/{channel}", description="Read set value from selected 0-10V output", response_model=FloatValueModel)
def get_0_10V_out(stack: int, channel: int): def get_0_10V_out(stack: int, channel: int):
return analogService.read_0_10_out(stack=stack, channel=channel) return FloatValueModel(value=analogService.read_0_10_out(stack=stack, channel=channel))
@analog.post("/output/0-10V/all", description="Set output value to all 0-10V outputs", response_model=Dict[int, Dict[int, float]]) @analog.post("/output/0-10V/all", description="Set output value to all 0-10V outputs", response_model=Dict[int, Dict[int, float]])

View file

@ -19,7 +19,7 @@ def get_stack_celsius(stack: int):
@router.get("/temp/{stack}/{channel}", summary="Read specified RTD8 card value of specified channel in Celsius", response_model=float) @router.get("/temp/{stack}/{channel}", summary="Read specified RTD8 card value of specified channel in Celsius", response_model=float)
def get_channel_celsius(stack: int, channel: int): def get_channel_celsius(stack: int, channel: int):
return service.read_temp(stack, channel) return FloatValueModel(service=service.read_temp(stack, channel))

View file

@ -13,7 +13,7 @@ def set_variable(variable: str, params: SetVariable):
@router.get("/{variable}", description="Get variable data", response_model=str) @router.get("/{variable}", description="Get variable data", response_model=str)
def get_variable(variable: str): def get_variable(variable: str):
try: try:
return str(service.get_variable(variable=variable)) return service.get_variable(variable=variable)
except: except:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

View file

@ -17,7 +17,7 @@ def __create_file_path(variable: str):
default_path: str = os.getenv("VARIABLES_SAVE_DIR", "variables_storage") default_path: str = os.getenv("VARIABLES_SAVE_DIR", "variables_storage")
if not os.path.exists(default_path): if not os.path.exists(default_path):
os.makedirs(default_path) os.makedirs(default_path)
return "{}/{}".format(default_path, re.sub(r"[^\w\-_.]", "_", variable)) return f"{default_path}/{re.sub(r'[^\w\-_.]', '_', variable)}"
def set_variable(variable: str, data, default: bool): def set_variable(variable: str, data, default: bool):