Compare commits
2 commits
5c1e3d3fac
...
4dd870f755
Author | SHA1 | Date | |
---|---|---|---|
4dd870f755 | |||
5ec90c314f |
16 changed files with 663 additions and 19 deletions
|
@ -1 +1,3 @@
|
|||
MODULE_RTD8_LEVELS=01234567 #number of stack level (0-7) not separated
|
||||
MODULE_RTD8_LEVELS=01234567 #number of stack level (0-7) not separated
|
||||
MODULE_REL4HVI4_LEVELS=01234567 #number of stack level (0-7) not separated
|
||||
MODULE_INDUSTRIALAUTOMATION_LEVELS=01234567 #number of stack level (0-7) not separated
|
19
README.md
19
README.md
|
@ -1,3 +1,20 @@
|
|||
# SequentMicrosystems_API
|
||||
|
||||
API for sequent microsystems boards running on RaspberryPI
|
||||
API for sequent microsystems boards running on RaspberryPI
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
48
pdm.lock
generated
48
pdm.lock
generated
|
@ -5,7 +5,7 @@
|
|||
groups = ["default"]
|
||||
strategy = ["cross_platform", "inherit_metadata"]
|
||||
lock_version = "4.5.0"
|
||||
content_hash = "sha256:f4f1e2189c6ffd6d794a2407d788e6716784556dfe8717566ece8e4c48e157e7"
|
||||
content_hash = "sha256:b833a7bb8997b54c2b61879638963fef2bf4d8265bc9f0cd0d381cb8af3dd6b1"
|
||||
|
||||
[[metadata.targets]]
|
||||
requires_python = ">=3.12"
|
||||
|
@ -68,15 +68,6 @@ files = [
|
|||
{file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dotenv"
|
||||
version = "0.0.5"
|
||||
summary = "Handle .env files"
|
||||
groups = ["default"]
|
||||
files = [
|
||||
{file = "dotenv-0.0.5.tar.gz", hash = "sha256:b58d2ab3f83dbd4f8a362b21158a606bee87317a9444485566b3c8f0af847091"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fastapi"
|
||||
version = "0.115.5"
|
||||
|
@ -193,6 +184,30 @@ files = [
|
|||
{file = "pydantic_core-2.27.0.tar.gz", hash = "sha256:f57783fbaf648205ac50ae7d646f27582fc706be3977e87c3c124e7a92407b10"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "python-dotenv"
|
||||
version = "1.0.1"
|
||||
requires_python = ">=3.8"
|
||||
summary = "Read key-value pairs from a .env file and set them as environment variables"
|
||||
groups = ["default"]
|
||||
files = [
|
||||
{file = "python-dotenv-1.0.1.tar.gz", hash = "sha256:e324ee90a023d808f1959c46bcbc04446a10ced277783dc6ee09987c37ec10ca"},
|
||||
{file = "python_dotenv-1.0.1-py3-none-any.whl", hash = "sha256:f7b63ef50f1b690dddf550d03497b66d609393b40b564ed0d674909a68ebf16a"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sm4relind"
|
||||
version = "1.0.3"
|
||||
summary = "A set of functions to control Sequent Microsystems 4-Relay board"
|
||||
groups = ["default"]
|
||||
dependencies = [
|
||||
"smbus2",
|
||||
]
|
||||
files = [
|
||||
{file = "SM4relind-1.0.3-py2.py3-none-any.whl", hash = "sha256:1308f580d05893f481b79c055dfb5572bbce992777edeee6292c91c707a73a55"},
|
||||
{file = "sm4relind-1.0.3.tar.gz", hash = "sha256:aa1c464f6344be63f8a84fb7e51d9c683f73f696cb8fd43cb0f1171b851f7d6b"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "smbus2"
|
||||
version = "0.5.0"
|
||||
|
@ -203,6 +218,19 @@ files = [
|
|||
{file = "smbus2-0.5.0.tar.gz", hash = "sha256:4a5946fd82277870c2878befdb1a29bb28d15cda14ea4d8d2d54cf3d4bdcb035"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "smmegaind"
|
||||
version = "1.0.3"
|
||||
summary = "Library to control PLACEHOLDER Automation Card"
|
||||
groups = ["default"]
|
||||
dependencies = [
|
||||
"smbus2",
|
||||
]
|
||||
files = [
|
||||
{file = "SMmegaind-1.0.3-py2.py3-none-any.whl", hash = "sha256:eed7af4ff15605474596805d8569a8b1d458c1716a1be545e2f8f23b6f94b1c7"},
|
||||
{file = "SMmegaind-1.0.3.tar.gz", hash = "sha256:e9dd339bc710d569d82148683b5a0938d417cb61fab7b2c427f3a62bc68db4d4"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "smrtd"
|
||||
version = "1.0.1"
|
||||
|
|
|
@ -8,8 +8,10 @@ authors = [
|
|||
dependencies = [
|
||||
"fastapi>=0.115.5",
|
||||
"uvicorn>=0.32.1",
|
||||
"dotenv>=0.0.5",
|
||||
"SMrtd>=1.0.1",
|
||||
"SM4relind>=1.0.3",
|
||||
"python-dotenv>=1.0.1",
|
||||
"SMmegaind>=1.0.3",
|
||||
]
|
||||
requires-python = ">=3.12"
|
||||
readme = "README.md"
|
||||
|
|
|
@ -1,7 +1,14 @@
|
|||
import os
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
from fastapi import FastAPI
|
||||
from src.modules.router import router as modulesrouter
|
||||
|
||||
|
||||
|
||||
app = FastAPI(root_path="/api")
|
||||
|
||||
app.include_router(router=modulesrouter)
|
||||
|
||||
|
|
1
src/modules/IndustrialAutomation/readme.md
Normal file
1
src/modules/IndustrialAutomation/readme.md
Normal file
|
@ -0,0 +1 @@
|
|||
Card: Industrial Automation 8-Layer Stackable HAT for Raspberry Pi - https://sequentmicrosystems.com/collections/industrial-automation/products/industrial-raspberry-pi
|
162
src/modules/IndustrialAutomation/router.py
Normal file
162
src/modules/IndustrialAutomation/router.py
Normal file
|
@ -0,0 +1,162 @@
|
|||
from fastapi import APIRouter
|
||||
from typing import Dict
|
||||
from src.modules.IndustrialAutomation import service_analog as analogService
|
||||
from src.modules.IndustrialAutomation import service_digital as digitalService
|
||||
from src.modules.universal_models.value import FloatValueModel, BoolValueModel
|
||||
|
||||
router = APIRouter(prefix="/industrialAutomation", tags=["Modules --> Industrial Automation Hat"])
|
||||
|
||||
|
||||
#analog
|
||||
analog = APIRouter(prefix="/analog")
|
||||
|
||||
#0-10 V output
|
||||
@analog.get("/output/0-10V/all", description="Read all setted 0-10V outputs value", response_model=Dict[int, Dict[int, float]])
|
||||
def get_0_10V_out_all():
|
||||
return analogService.read_0_10_out_all()
|
||||
|
||||
@analog.get("/output/0-10V/{stack}", description="Read setted 0-10V output values on specified stack", response_model=Dict[int, float])
|
||||
def get_0_10V_out_stack(stack: int):
|
||||
return analogService.read_0_10_out_stack(stack=stack)
|
||||
|
||||
@analog.get("/output/0-10V/{stack}/{channel}", description="Read set value from selected 0-10V output", response_model=FloatValueModel)
|
||||
def get_0_10V_out(stack: int, channel: int):
|
||||
return FloatValueModel(value=analogService.read_0_10_out(stack=stack, channel=channel))
|
||||
|
||||
|
||||
@analog.post("/output/0-10V/all", description="Set output value to all 0-10V outputs", response_model=Dict[int, Dict[int, float]])
|
||||
def post_0_10V_out_all(output: FloatValueModel):
|
||||
analogService.set_0_10_out_all(value=output.value)
|
||||
return analogService.read_0_10_out_all()
|
||||
|
||||
@analog.post("/output/0-10V/{stack}", description="Set output value to all 0-10V output on specified card", response_model=Dict[int, float])
|
||||
def post_0_10V_out_stack(stack: int, output: FloatValueModel):
|
||||
analogService.set_0_10_out_stack(stack=stack, value=output.value)
|
||||
return analogService.read_0_10_out_stack(stack=stack)
|
||||
|
||||
@analog.post("/output/0-10V/{stack}/{channel}", description="Set output value to specified 0-10V output", response_model=FloatValueModel)
|
||||
def post_0_10V_out(stack: int, channel: int, output: FloatValueModel):
|
||||
analogService.set_0_10_out(stack=stack, channel=channel, value=output.value)
|
||||
return FloatValueModel(value=analogService.read_0_10_out(stack=stack, channel=channel))
|
||||
|
||||
|
||||
#0-10V input
|
||||
@analog.get("/input/0-10V/all", description="Read all 0-10V inputs value", response_model=Dict[int, Dict[int, float]])
|
||||
def get_0_10V_in_all():
|
||||
return analogService.read_0_10_in_all()
|
||||
|
||||
@analog.get("/input/0-10V/{stack}", description="Read all 0-10V inputs on specified card", response_model=Dict[int, float])
|
||||
def get_0_10V_in_stack(stack: int):
|
||||
return analogService.read_0_10_in_stack(stack=stack)
|
||||
|
||||
@analog.get("/input/0-10V/{stack}/{channel}", description="Read specified 0-10V input", response_model=FloatValueModel)
|
||||
def get_0_10V_in(stack: int, channel: int):
|
||||
return FloatValueModel(value=analogService.read_0_10_in(stack=stack, channel=channel))
|
||||
|
||||
|
||||
#4-20mA output
|
||||
@analog.get("/output/4-20mA/all", description="Read all setted 4-20mA output values", response_model=Dict[int, Dict[int, float]])
|
||||
def get_4_20mA_out_all():
|
||||
return analogService.read_4_20_out_all()
|
||||
|
||||
@analog.get("/output/4-20mA/{stack}", description="Read all setted 4-20mA output values of specified card", response_model=Dict[int, float])
|
||||
def get_4_20mA_out_stack(stack: int):
|
||||
return analogService.read_4_20_out_stack(stack=stack)
|
||||
|
||||
@analog.get("/output/4-20mA/{stack}/{channel}", description="Read setted 4-20mA output of specified channel", response_model=FloatValueModel)
|
||||
def get_4_20mA_out(stack: int, channel: int):
|
||||
return analogService.read_4_20_out(stack=stack, channel=channel)
|
||||
|
||||
|
||||
@analog.post("/output/4-20mA/all", description="Set output value to all 4-20mA outputs", response_model=Dict[int, Dict[int, float]])
|
||||
def post_4_20mA_out_all(output: FloatValueModel):
|
||||
analogService.set_4_20_out_all(value=output.value)
|
||||
return analogService.read_4_20_out_all()
|
||||
|
||||
@analog.post("/output/4-20mA/{stack}", description="Set output value of 4-20mA outputs on specified card", response_model=Dict[int, float])
|
||||
def post_4_20mA_out_stack(stack: int, output: FloatValueModel):
|
||||
analogService.set_4_20_out_stack(stack=stack, value=output.value)
|
||||
return analogService.read_4_20_out_stack(stack=stack)
|
||||
|
||||
@analog.post("/output/4-20mA/{stack}/{channel}", description="Set output value of specified 4-20mA output", response_model=FloatValueModel)
|
||||
def post_4_20mA_out(stack: int, channel: int, output: FloatValueModel):
|
||||
analogService.set_4_20_out(stack=stack, channel=channel, value=output.value)
|
||||
return FloatValueModel(value=analogService.read_4_20_out(stack=stack, channel=channel))
|
||||
|
||||
|
||||
#4-20mA input
|
||||
@analog.get("/input/4-20mA/all", description="Read all 4-20mA input values", response_model=Dict[int, Dict[int, float]])
|
||||
def get_4_20mA_out_all():
|
||||
return analogService.read_4_20_in_all()
|
||||
|
||||
@analog.get("/input/4-20mA/{stack}", description="Read all 4-20mA input values of specified card", response_model=Dict[int, float])
|
||||
def get_4_20mA_out_stack(stack: int):
|
||||
return analogService.read_4_20_in_stack(stack=stack)
|
||||
|
||||
@analog.get("/input/4-20mA/{stack}/{channel}", description="Read 4-20mA input of specified channel", response_model=FloatValueModel)
|
||||
def get_4_20mA_out(stack: int, channel: int):
|
||||
return analogService.read_4_20_in(stack=stack, channel=channel)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
#digital
|
||||
digital = APIRouter(prefix="/digital")
|
||||
|
||||
#opto inputs
|
||||
@digital.get("/input/opto/all", description="Read all opto isolated inputs", response_model=Dict[int, Dict[int, bool]])
|
||||
def get_opto_in_all():
|
||||
return digitalService.read_opto_all()
|
||||
|
||||
@digital.get("/input/opto/{stack}", description="Read all opto isolated inputs on specified card", response_model=Dict[int, bool])
|
||||
def get_opto_in_stack(stack: int):
|
||||
return digitalService.read_opto_stack(stack=stack)
|
||||
|
||||
@digital.get("/input/opto/{stack}/{channel}", description="Read selected opto isolated input", response_model=BoolValueModel)
|
||||
def get_opto_in(stack: int, channel:int):
|
||||
return BoolValueModel(value=digitalService.read_opto(stack=stack, channel=channel))
|
||||
|
||||
|
||||
#open Drain
|
||||
@digital.get("/output/OD/all", description="Read open drain PWM freq in percentage from all outputs", response_model=Dict[int, Dict[int, float]])
|
||||
def get_od_all():
|
||||
return digitalService.read_od_pwm_all()
|
||||
|
||||
@digital.get("/output/OD/{stack}", description="Read open dran PWM freq in percentage tfrom all outputs on selected card", response_model=Dict[int, float])
|
||||
def get_od_stack(stack: int):
|
||||
return digitalService.read_od_pwm_stack(stack=stack)
|
||||
|
||||
@digital.get("/output/OD/{stack}/{channel}", description="Read open drain freq in percentage from selected output", response_model=FloatValueModel)
|
||||
def get_od(stack: int, channel: int):
|
||||
return FloatValueModel(value=digitalService.read_od_pwm(stack=stack, channel=channel))
|
||||
|
||||
|
||||
@digital.post("/output/OD/all", description="Set open drain PWM freq in percentage to all outputs", response_model=Dict[int, Dict[int, float]])
|
||||
def post_od_all(output: FloatValueModel):
|
||||
digitalService.set_od_pwm_all(value=output.value)
|
||||
return digitalService.read_od_pwm_all()
|
||||
|
||||
@digital.post("/output/OD/{stack}", description="Set open drain freq in percentage to all outputs on selected card", response_model=Dict[int, float])
|
||||
def post_od_stack(stack: int, output: FloatValueModel):
|
||||
digitalService.set_od_pwm_stack(stack=stack, value=output.value)
|
||||
return digitalService.read_od_pwm_stack(stack=stack)
|
||||
|
||||
@digital.post("/output/OD/{stack}/{channel}", description="Set open drain freq in percentage to selected channel", response_model=FloatValueModel)
|
||||
def post_od(stack: int, channel: int, output: FloatValueModel):
|
||||
digitalService.set_od_pwm(stack=stack, channel=channel, value=output.value)
|
||||
return FloatValueModel(value=digitalService.read_od_pwm(stack=stack, channel=channel))
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
router.include_router(router=analog)
|
||||
router.include_router(router=digital)
|
166
src/modules/IndustrialAutomation/service_analog.py
Normal file
166
src/modules/IndustrialAutomation/service_analog.py
Normal file
|
@ -0,0 +1,166 @@
|
|||
import os, megaind
|
||||
|
||||
|
||||
stacks = [int(h) for h in os.getenv("MODULE_INDUSTRIALAUTOMATION_LEVELS", "")]
|
||||
if not stacks:
|
||||
print("MODULE_INDUSTRIALAUTOMATION_LEVELS is not set or invalid")
|
||||
|
||||
for i in stacks:
|
||||
print(f"Configured IndustrialAutomation card: {i}")
|
||||
|
||||
|
||||
|
||||
def read_0_10_out(stack: int, channel: int) -> float:
|
||||
"""
|
||||
Read set value in selected 0-10V output
|
||||
"""
|
||||
if stack in stacks and 1 <= channel <= 4:
|
||||
try:
|
||||
return megaind.get0_10Out(stack=stack, channel=channel)
|
||||
except:
|
||||
return -512
|
||||
return -512
|
||||
|
||||
def read_0_10_out_stack(stack: int) -> dict[int, float]:
|
||||
"""
|
||||
Read set values in 0-10V output on selected card
|
||||
"""
|
||||
return {channel: read_0_10_out(stack=stack, channel=channel) for channel in range(1, 5)}
|
||||
|
||||
def read_0_10_out_all() -> dict[int, dict[int, float]]:
|
||||
"""
|
||||
Read set values in 0-10V outputs on all cards
|
||||
"""
|
||||
return {stack: read_0_10_out_stack(stack=stack) for stack in stacks}
|
||||
|
||||
|
||||
def set_0_10_out(stack: int, channel: int, value: float):
|
||||
"""
|
||||
Set output voltage of 0-10V output to specified output
|
||||
"""
|
||||
if stack in stacks and 1 <= channel <= 4:
|
||||
if value > 10:
|
||||
value = 10
|
||||
elif value < 0:
|
||||
value = 0
|
||||
|
||||
try:
|
||||
megaind.set0_10Out(stack=stack, channel=channel, value=value)
|
||||
except:
|
||||
print("error set 0-10V output value")
|
||||
|
||||
def set_0_10_out_stack(stack: int, value: float):
|
||||
"""
|
||||
Set output voltage of 0-10V output on specified card
|
||||
"""
|
||||
(set_0_10_out(stack=stack, channel=channel, value=value) for channel in range(1, 5))
|
||||
|
||||
def set_0_10_out_all(value: float):
|
||||
"""
|
||||
Set output voltage of 0-10V output on all card
|
||||
"""
|
||||
(set_0_10_out_stack(stack=stack, value=value) for stack in stacks)
|
||||
|
||||
|
||||
|
||||
def read_0_10_in(stack: int, channel: int) -> float:
|
||||
"""
|
||||
Read value in selected 0-10V input
|
||||
"""
|
||||
if stack in stacks and 1 <= channel <= 4:
|
||||
try:
|
||||
return megaind.get0_10In(stack=stack, channel=channel)
|
||||
except:
|
||||
return -512
|
||||
return -512
|
||||
|
||||
def read_0_10_in_stack(stack: int) -> dict[int, float]:
|
||||
"""
|
||||
Read values in 0-10V input on selected card
|
||||
"""
|
||||
return {channel: read_0_10_in(stack=stack, channel=channel) for channel in range(1, 5)}
|
||||
|
||||
def read_0_10_in_all() -> dict[int, dict[int, float]]:
|
||||
"""
|
||||
Read values in 0-10V inputs on all cards
|
||||
"""
|
||||
return {stack: read_0_10_in_stack(stack=stack) for stack in stacks}
|
||||
|
||||
|
||||
|
||||
def read_4_20_in(stack: int, channel: int) -> float:
|
||||
"""
|
||||
Read values from specified 4-20mA input
|
||||
"""
|
||||
if stack in stacks and 1 <= channel <= 4:
|
||||
try:
|
||||
return megaind.get4_20In(stack=stack, channel=channel)
|
||||
except:
|
||||
return -512
|
||||
return -512
|
||||
|
||||
def read_4_20_in_stack(stack: int) -> dict[int, float]:
|
||||
"""
|
||||
Read values from all 4-20mA inputs on specified card
|
||||
"""
|
||||
return {channel: read_4_20_in(stack=stack, channel=channel) for channel in range(1, 5)}
|
||||
|
||||
def read_4_20_in_all() -> dict[int, dict[int, float]]:
|
||||
"""
|
||||
Read values from all 4-20mA inputs
|
||||
"""
|
||||
return {stack: read_4_20_in_stack(stack=stack) for stack in stacks}
|
||||
|
||||
|
||||
|
||||
def read_4_20_out(stack: int, channel: int) -> float:
|
||||
"""
|
||||
Read setted value from specified 4-20mA output
|
||||
"""
|
||||
if stack in stacks and 1 <= channel <= 4:
|
||||
try:
|
||||
return megaind.get4_20Out(stack=stack, channel=channel)
|
||||
except:
|
||||
return -512
|
||||
return -512
|
||||
|
||||
def read_4_20_out_stack(stack: int) -> dict[int, float]:
|
||||
"""
|
||||
Read setted values from all 4-20mA outputs on specified card
|
||||
"""
|
||||
return {channel: read_4_20_out(stack=stack, channel=channel) for channel in range(1, 5)}
|
||||
|
||||
def read_4_20_out_all() -> dict[int, dict[int, float]]:
|
||||
"""
|
||||
Read setted values from all 4-20mA outputs
|
||||
"""
|
||||
return {stack: read_4_20_out_stack(stack=stack) for stack in stacks}
|
||||
|
||||
|
||||
def set_4_20_out(stack: int, channel: int, value: float):
|
||||
"""
|
||||
Set output value for specified 4-20mA output
|
||||
"""
|
||||
if stack in stacks and 1<= channel <= 4:
|
||||
if value > 20:
|
||||
value = 20
|
||||
elif value < 4:
|
||||
value = 4
|
||||
|
||||
try:
|
||||
megaind.set4_20Out(stack=stack, channel=channel, value=value)
|
||||
except:
|
||||
print("Error set 4-20mA output")
|
||||
|
||||
def set_4_20_out_stack(stack: int, value: float):
|
||||
"""
|
||||
Set output value to all 4-20mA on specified card
|
||||
"""
|
||||
(set_4_20_out(stack=stack, channel=channel, value=value) for channel in range(1, 5))
|
||||
|
||||
def set_4_20_out_all(value: float):
|
||||
"""
|
||||
Set output value to all for 4-20mA outputs
|
||||
"""
|
||||
(set_4_20_out_stack(stack=stack, value=value) for stack in stacks)
|
||||
|
104
src/modules/IndustrialAutomation/service_digital.py
Normal file
104
src/modules/IndustrialAutomation/service_digital.py
Normal file
|
@ -0,0 +1,104 @@
|
|||
import os, megaind
|
||||
|
||||
|
||||
|
||||
stacks = [int(h) for h in os.getenv("MODULE_INDUSTRIALAUTOMATION_LEVELS", "")]
|
||||
if not stacks:
|
||||
print("MODULE_INDUSTRIALAUTOMATION_LEVELS is not set or invalid")
|
||||
|
||||
for i in stacks:
|
||||
print(f"Configured IndustrialAutomation card: {i}")
|
||||
|
||||
|
||||
|
||||
def read_opto(stack: int, opto: int):
|
||||
"""
|
||||
Read specified opto input state
|
||||
"""
|
||||
if stack in stacks and 1 <= opto <= 4:
|
||||
try:
|
||||
if megaind.getOptoCh(stack=stack, channel=opto) == 1:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
except:
|
||||
return False
|
||||
return False
|
||||
|
||||
def read_opto_stack(stack: int):
|
||||
"""
|
||||
Read all opto inputs state on specified stack/card
|
||||
"""
|
||||
return {opto: read_opto(stack=stack, opto=opto) for opto in range(1, 5)}
|
||||
|
||||
def read_opto_all():
|
||||
"""
|
||||
Read all opto inputs state
|
||||
"""
|
||||
return {stack: read_opto_stack(stack=stack) for stack in stacks}
|
||||
|
||||
|
||||
|
||||
def set_od_pwm(stack: int, channel: int, value: float):
|
||||
"""
|
||||
Set specified OpenDrain PWM percentage
|
||||
"""
|
||||
if stack in stacks and 1 <= channel <= 4:
|
||||
if value < 0:
|
||||
value = 0
|
||||
elif value > 100:
|
||||
value = 100
|
||||
|
||||
megaind.setOdPWM(stack=stack, channel=channel, value=value)
|
||||
|
||||
def set_od_pwm_stack(stack: int, value: bool):
|
||||
"""
|
||||
Set all OpenDrain PWM percentage on specified stack/card
|
||||
"""
|
||||
(set_od_pwm(stack=stack, channel=ch, value=value) for ch in range(1, 5))
|
||||
|
||||
def set_od_pwm_all(value: bool):
|
||||
"""
|
||||
Set all OpenDrain PWM percentage to requested state
|
||||
"""
|
||||
(set_od_pwm_stack(stack=stack, value=value) for stack in stacks)
|
||||
|
||||
|
||||
def read_od_pwm(stack: int, channel: int) -> float:
|
||||
"""
|
||||
Read specified OpenDrain PWM percentage
|
||||
"""
|
||||
if stack in stacks and 1 <= channel <= 4:
|
||||
try:
|
||||
return megaind.getOdPWM(stack=stack, channel=channel)
|
||||
except:
|
||||
return -512
|
||||
return -512
|
||||
|
||||
def read_od_pwm_stack(stack: int) -> dict[int, float]:
|
||||
"""
|
||||
Read all OpenDrain PWM percentage on specified stack/card
|
||||
"""
|
||||
return {channel: read_od_pwm(stack=stack, channel=channel) for channel in range(1, 5)}
|
||||
|
||||
def read_od_pwm_all() -> dict[int, dict[int, float]]:
|
||||
"""
|
||||
Read all OpenDrain PWM percentage
|
||||
"""
|
||||
return {stack: read_od_pwm_stack(stack=stack) for stack in stacks}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
"""
|
||||
TODO
|
||||
OptoCount + Opto frequency
|
||||
LED control
|
||||
RTC
|
||||
Owire Bus
|
||||
"""
|
|
@ -1,6 +1,7 @@
|
|||
import src.modules.RTD8.service as service
|
||||
from fastapi import APIRouter
|
||||
from typing import Dict
|
||||
from src.modules.universal_models.value import FloatValueModel
|
||||
|
||||
|
||||
|
||||
|
@ -18,7 +19,7 @@ def get_stack_celsius(stack: int):
|
|||
|
||||
@router.get("/{stack}/{channel}", summary="Read specified RTD8 card value of specified channel in Celsius", response_model=float)
|
||||
def get_channel_celsius(stack: int, channel: int):
|
||||
return service.read_temp(stack, channel)
|
||||
return FloatValueModel(service=service.read_temp(stack, channel))
|
||||
|
||||
|
||||
|
||||
|
@ -30,6 +31,6 @@ def get_all_celsius():
|
|||
def get_stack_celsius(stack: int):
|
||||
return service.read_resistance_stack(stack)
|
||||
|
||||
@router.get("/{stack}/{channel}/resistance", summary="Read specified RTD8 card value of specified channel in Ohm", response_model=float)
|
||||
@router.get("/{stack}/{channel}/resistance", summary="Read specified RTD8 card value of specified channel in Ohm", response_model=FloatValueModel)
|
||||
def get_channel_celsius(stack: int, channel: int):
|
||||
return service.read_resistance(stack, channel)
|
||||
return FloatValueModel(value=service.read_resistance(stack, channel))
|
|
@ -5,6 +5,9 @@ stacks = [int(h) for h in os.getenv("MODULE_RTD8_LEVELS", "")]
|
|||
if not stacks:
|
||||
print("MODULE_RTD8_LEVELS is not set or invalid")
|
||||
|
||||
for i in stacks:
|
||||
print(f"Configured RTD8 card: {i}")
|
||||
|
||||
|
||||
|
||||
def read_temp(stack: int, channel: int) -> float | str:
|
||||
|
@ -15,7 +18,7 @@ def read_temp(stack: int, channel: int) -> float | str:
|
|||
try:
|
||||
return librtd.get(stack, channel)
|
||||
except Exception as e:
|
||||
raise ValueError(f"Error read data from card {stack} on channel {channel} by error: {e}")
|
||||
return -512
|
||||
|
||||
return -512
|
||||
|
||||
|
@ -41,7 +44,7 @@ def read_resistance(stack: int, channel: int) -> float | str:
|
|||
try:
|
||||
return librtd.getRes(stack, channel)
|
||||
except Exception as e:
|
||||
raise ValueError(f"Error read data from card {stack} on channel {channel} by error: {e}")
|
||||
return -512
|
||||
|
||||
return -512
|
||||
|
||||
|
|
1
src/modules/Rel4HVI4/readme.md
Normal file
1
src/modules/Rel4HVI4/readme.md
Normal file
|
@ -0,0 +1 @@
|
|||
Card: Four Relays four HV Inputs 8-Layer Stackable HAT for Raspberry Pi - https://sequentmicrosystems.com/collections/industrial-automation/products/four-relays-four-inputs-for-raspberry-pi
|
49
src/modules/Rel4HVI4/router.py
Normal file
49
src/modules/Rel4HVI4/router.py
Normal file
|
@ -0,0 +1,49 @@
|
|||
from fastapi import APIRouter
|
||||
import src.modules.Rel4HVI4.service as service
|
||||
from typing import Dict
|
||||
from src.modules.universal_models.value import BoolValueModel
|
||||
|
||||
|
||||
|
||||
router = APIRouter(prefix="/rel4hvi4", tags=["Modules --> Rel4HVI4"])
|
||||
|
||||
|
||||
|
||||
@router.post("/relay/all", description="Set all relays to required state (1/0)")
|
||||
def post_relay_all(state: BoolValueModel):
|
||||
service.set_relay_all(value=state.value)
|
||||
|
||||
@router.post("/relay/{stack}", description="Set all relays on specified card to required state (1/0)")
|
||||
def post_relay_stack(stack: int, state: BoolValueModel):
|
||||
service.set_relay_stack(stack=stack, value=state.value)
|
||||
|
||||
@router.post("/relay/{stack}/{channel}", description="Set all relays on specified card to required state (1/0)")
|
||||
def post_relay_stack(stack: int, channel, state: BoolValueModel):
|
||||
service.set_relay(stack=stack, relay=channel, value=state.value)
|
||||
|
||||
|
||||
@router.get("/relay/all", description="Get state of all relays", response_model=Dict[int, Dict[int, bool]])
|
||||
def get_relay_all():
|
||||
return service.read_relay_all()
|
||||
|
||||
@router.get("/relay/{stack}", description="Get all relays state of specified card", response_model=Dict[int, bool])
|
||||
def get_relay_stack(stack: int):
|
||||
return service.read_relay_stack(stack=stack)
|
||||
|
||||
@router.get("/relay/{stack}/{channel}", description="Get state of selected relay", response_model=BoolValueModel)
|
||||
def get_relay(stack: int, channel: int):
|
||||
return BoolValueModel(value=service.read_relay(stack=stack, relay=channel))
|
||||
|
||||
|
||||
|
||||
@router.get("/opto/all", description="Get input state of all opto inputs", response_model=Dict[int, Dict[int, bool]])
|
||||
def get_opto_all():
|
||||
return service.read_opto_all()
|
||||
|
||||
@router.get("/opto/{stack}", description="Get input state all opto inputs on selected card", response_model=Dict[int, bool])
|
||||
def get_opto_stack(stack: int):
|
||||
return service.read_opto_stack(stack=stack)
|
||||
|
||||
@router.get("/opto/{stack}/{channel}", description="Get state of selected input", response_model=BoolValueModel)
|
||||
def get_opto(stack: int, channel: int):
|
||||
return BoolValueModel(value=service.read_opto(stack=stack, opto=channel))
|
86
src/modules/Rel4HVI4/service.py
Normal file
86
src/modules/Rel4HVI4/service.py
Normal file
|
@ -0,0 +1,86 @@
|
|||
import lib4relind, os
|
||||
from typing import Dict
|
||||
|
||||
|
||||
stacks = [int(h) for h in os.getenv("MODULE_REL4HVI4_LEVELS", "")]
|
||||
if not stacks:
|
||||
print("MODULE_Rel4HVI4_LEVELS is not set or invalid")
|
||||
|
||||
for i in stacks:
|
||||
print(f"Configured Rel4HVI4 card: {i}")
|
||||
|
||||
|
||||
def set_relay(stack: int, relay: int, value: bool):
|
||||
"""
|
||||
Set specified relay to requested state
|
||||
"""
|
||||
if stack in stacks and 1 <= relay <= 4:
|
||||
lib4relind.set_relay(stack=stack, relay=relay, value=int(value))
|
||||
|
||||
def set_relay_stack(stack: int, value: bool):
|
||||
"""
|
||||
Set all relays to requested state on specified stack/card
|
||||
"""
|
||||
(set_relay(stack=stack, relay=ch, value=value) for ch in range(1, 5))
|
||||
|
||||
def set_relay_all(value: bool):
|
||||
"""
|
||||
Set all relays to requested state
|
||||
"""
|
||||
(set_relay_stack(stack=stack, value=value) for stack in stacks)
|
||||
|
||||
|
||||
def read_relay(stack: int, relay: int):
|
||||
"""
|
||||
Read specified relay state
|
||||
"""
|
||||
if stack in stacks and 1 <= relay <= 4:
|
||||
try:
|
||||
if lib4relind.get_relay(stack=stack, relay=relay) == 1:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
except:
|
||||
return False
|
||||
return False
|
||||
|
||||
def read_relay_stack(stack: int):
|
||||
"""
|
||||
Read all relays state on specified stack/card
|
||||
"""
|
||||
return {relay: read_relay(stack=stack, relay=relay) for relay in range(1, 5)}
|
||||
|
||||
def read_relay_all():
|
||||
"""
|
||||
Read all relays state
|
||||
"""
|
||||
return {stack: read_relay_stack(stack=stack) for stack in stacks}
|
||||
|
||||
|
||||
|
||||
def read_opto(stack: int, opto: int):
|
||||
"""
|
||||
Read specified opto input state
|
||||
"""
|
||||
if stack in stacks and 1 <= opto <= 4:
|
||||
try:
|
||||
if lib4relind.get_opto(stack=stack, channel=opto) == 1:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
except:
|
||||
return False
|
||||
return False
|
||||
|
||||
def read_opto_stack(stack: int):
|
||||
"""
|
||||
Read all opto inputs state on specified stack/card
|
||||
"""
|
||||
return {opto: read_opto(stack=stack, opto=opto) for opto in range(1, 5)}
|
||||
|
||||
def read_opto_all():
|
||||
"""
|
||||
Read all opto inputs state
|
||||
"""
|
||||
return {stack: read_opto_stack(stack=stack) for stack in stacks}
|
||||
|
|
@ -1,6 +1,8 @@
|
|||
from fastapi import APIRouter
|
||||
|
||||
from src.modules.RTD8.router import router as RTD8Router
|
||||
from src.modules.Rel4HVI4.router import router as Rel4HVI4Router
|
||||
from src.modules.IndustrialAutomation.router import router as IndustrialAutomationRouter
|
||||
|
||||
|
||||
|
||||
|
@ -8,4 +10,6 @@ from src.modules.RTD8.router import router as RTD8Router
|
|||
|
||||
router = APIRouter(prefix="/modules")
|
||||
|
||||
router.include_router(router=RTD8Router)
|
||||
router.include_router(router=RTD8Router)
|
||||
router.include_router(router=Rel4HVI4Router)
|
||||
router.include_router(router=IndustrialAutomationRouter)
|
11
src/modules/universal_models/value.py
Normal file
11
src/modules/universal_models/value.py
Normal file
|
@ -0,0 +1,11 @@
|
|||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class FloatValueModel(BaseModel):
|
||||
value: float
|
||||
|
||||
class IntValueModel(BaseModel):
|
||||
value: int
|
||||
|
||||
class BoolValueModel(BaseModel):
|
||||
value: bool
|
Loading…
Add table
Reference in a new issue