Added support for Inputs/outputs Industrial Automation card [partial support]

main
Jan Beníček 2024-12-08 12:22:15 +01:00
parent 5ec90c314f
commit 4dd870f755
11 changed files with 482 additions and 19 deletions

View File

@ -1,2 +1,3 @@
MODULE_RTD8_LEVELS=01234567 #number of stack level (0-7) not separated MODULE_RTD8_LEVELS=01234567 #number of stack level (0-7) not separated
MODULE_REL4HVI4_LEVELS=01234567 #number of stack level (0-7) not separated MODULE_REL4HVI4_LEVELS=01234567 #number of stack level (0-7) not separated
MODULE_INDUSTRIALAUTOMATION_LEVELS=01234567 #number of stack level (0-7) not separated

View File

@ -5,7 +5,7 @@
groups = ["default"] groups = ["default"]
strategy = ["cross_platform", "inherit_metadata"] strategy = ["cross_platform", "inherit_metadata"]
lock_version = "4.5.0" lock_version = "4.5.0"
content_hash = "sha256:a386b1448f475cfd997f505d3dc55da73a888f9be06306adbabb701440ec7aa3" content_hash = "sha256:b833a7bb8997b54c2b61879638963fef2bf4d8265bc9f0cd0d381cb8af3dd6b1"
[[metadata.targets]] [[metadata.targets]]
requires_python = ">=3.12" requires_python = ">=3.12"
@ -218,6 +218,19 @@ files = [
{file = "smbus2-0.5.0.tar.gz", hash = "sha256:4a5946fd82277870c2878befdb1a29bb28d15cda14ea4d8d2d54cf3d4bdcb035"}, {file = "smbus2-0.5.0.tar.gz", hash = "sha256:4a5946fd82277870c2878befdb1a29bb28d15cda14ea4d8d2d54cf3d4bdcb035"},
] ]
[[package]]
name = "smmegaind"
version = "1.0.3"
summary = "Library to control PLACEHOLDER Automation Card"
groups = ["default"]
dependencies = [
"smbus2",
]
files = [
{file = "SMmegaind-1.0.3-py2.py3-none-any.whl", hash = "sha256:eed7af4ff15605474596805d8569a8b1d458c1716a1be545e2f8f23b6f94b1c7"},
{file = "SMmegaind-1.0.3.tar.gz", hash = "sha256:e9dd339bc710d569d82148683b5a0938d417cb61fab7b2c427f3a62bc68db4d4"},
]
[[package]] [[package]]
name = "smrtd" name = "smrtd"
version = "1.0.1" version = "1.0.1"

View File

@ -11,6 +11,7 @@ dependencies = [
"SMrtd>=1.0.1", "SMrtd>=1.0.1",
"SM4relind>=1.0.3", "SM4relind>=1.0.3",
"python-dotenv>=1.0.1", "python-dotenv>=1.0.1",
"SMmegaind>=1.0.3",
] ]
requires-python = ">=3.12" requires-python = ">=3.12"
readme = "README.md" readme = "README.md"

View File

@ -0,0 +1 @@
Card: Industrial Automation 8-Layer Stackable HAT for Raspberry Pi - https://sequentmicrosystems.com/collections/industrial-automation/products/industrial-raspberry-pi

View File

@ -0,0 +1,162 @@
from fastapi import APIRouter
from typing import Dict
from src.modules.IndustrialAutomation import service_analog as analogService
from src.modules.IndustrialAutomation import service_digital as digitalService
from src.modules.universal_models.value import FloatValueModel, BoolValueModel
router = APIRouter(prefix="/industrialAutomation", tags=["Modules --> Industrial Automation Hat"])
#analog
analog = APIRouter(prefix="/analog")
#0-10 V output
@analog.get("/output/0-10V/all", description="Read all setted 0-10V outputs value", response_model=Dict[int, Dict[int, float]])
def get_0_10V_out_all():
return analogService.read_0_10_out_all()
@analog.get("/output/0-10V/{stack}", description="Read setted 0-10V output values on specified stack", response_model=Dict[int, float])
def get_0_10V_out_stack(stack: int):
return analogService.read_0_10_out_stack(stack=stack)
@analog.get("/output/0-10V/{stack}/{channel}", description="Read set value from selected 0-10V output", response_model=FloatValueModel)
def get_0_10V_out(stack: int, channel: int):
return FloatValueModel(value=analogService.read_0_10_out(stack=stack, channel=channel))
@analog.post("/output/0-10V/all", description="Set output value to all 0-10V outputs", response_model=Dict[int, Dict[int, float]])
def post_0_10V_out_all(output: FloatValueModel):
analogService.set_0_10_out_all(value=output.value)
return analogService.read_0_10_out_all()
@analog.post("/output/0-10V/{stack}", description="Set output value to all 0-10V output on specified card", response_model=Dict[int, float])
def post_0_10V_out_stack(stack: int, output: FloatValueModel):
analogService.set_0_10_out_stack(stack=stack, value=output.value)
return analogService.read_0_10_out_stack(stack=stack)
@analog.post("/output/0-10V/{stack}/{channel}", description="Set output value to specified 0-10V output", response_model=FloatValueModel)
def post_0_10V_out(stack: int, channel: int, output: FloatValueModel):
analogService.set_0_10_out(stack=stack, channel=channel, value=output.value)
return FloatValueModel(value=analogService.read_0_10_out(stack=stack, channel=channel))
#0-10V input
@analog.get("/input/0-10V/all", description="Read all 0-10V inputs value", response_model=Dict[int, Dict[int, float]])
def get_0_10V_in_all():
return analogService.read_0_10_in_all()
@analog.get("/input/0-10V/{stack}", description="Read all 0-10V inputs on specified card", response_model=Dict[int, float])
def get_0_10V_in_stack(stack: int):
return analogService.read_0_10_in_stack(stack=stack)
@analog.get("/input/0-10V/{stack}/{channel}", description="Read specified 0-10V input", response_model=FloatValueModel)
def get_0_10V_in(stack: int, channel: int):
return FloatValueModel(value=analogService.read_0_10_in(stack=stack, channel=channel))
#4-20mA output
@analog.get("/output/4-20mA/all", description="Read all setted 4-20mA output values", response_model=Dict[int, Dict[int, float]])
def get_4_20mA_out_all():
return analogService.read_4_20_out_all()
@analog.get("/output/4-20mA/{stack}", description="Read all setted 4-20mA output values of specified card", response_model=Dict[int, float])
def get_4_20mA_out_stack(stack: int):
return analogService.read_4_20_out_stack(stack=stack)
@analog.get("/output/4-20mA/{stack}/{channel}", description="Read setted 4-20mA output of specified channel", response_model=FloatValueModel)
def get_4_20mA_out(stack: int, channel: int):
return analogService.read_4_20_out(stack=stack, channel=channel)
@analog.post("/output/4-20mA/all", description="Set output value to all 4-20mA outputs", response_model=Dict[int, Dict[int, float]])
def post_4_20mA_out_all(output: FloatValueModel):
analogService.set_4_20_out_all(value=output.value)
return analogService.read_4_20_out_all()
@analog.post("/output/4-20mA/{stack}", description="Set output value of 4-20mA outputs on specified card", response_model=Dict[int, float])
def post_4_20mA_out_stack(stack: int, output: FloatValueModel):
analogService.set_4_20_out_stack(stack=stack, value=output.value)
return analogService.read_4_20_out_stack(stack=stack)
@analog.post("/output/4-20mA/{stack}/{channel}", description="Set output value of specified 4-20mA output", response_model=FloatValueModel)
def post_4_20mA_out(stack: int, channel: int, output: FloatValueModel):
analogService.set_4_20_out(stack=stack, channel=channel, value=output.value)
return FloatValueModel(value=analogService.read_4_20_out(stack=stack, channel=channel))
#4-20mA input
@analog.get("/input/4-20mA/all", description="Read all 4-20mA input values", response_model=Dict[int, Dict[int, float]])
def get_4_20mA_out_all():
return analogService.read_4_20_in_all()
@analog.get("/input/4-20mA/{stack}", description="Read all 4-20mA input values of specified card", response_model=Dict[int, float])
def get_4_20mA_out_stack(stack: int):
return analogService.read_4_20_in_stack(stack=stack)
@analog.get("/input/4-20mA/{stack}/{channel}", description="Read 4-20mA input of specified channel", response_model=FloatValueModel)
def get_4_20mA_out(stack: int, channel: int):
return analogService.read_4_20_in(stack=stack, channel=channel)
#digital
digital = APIRouter(prefix="/digital")
#opto inputs
@digital.get("/input/opto/all", description="Read all opto isolated inputs", response_model=Dict[int, Dict[int, bool]])
def get_opto_in_all():
return digitalService.read_opto_all()
@digital.get("/input/opto/{stack}", description="Read all opto isolated inputs on specified card", response_model=Dict[int, bool])
def get_opto_in_stack(stack: int):
return digitalService.read_opto_stack(stack=stack)
@digital.get("/input/opto/{stack}/{channel}", description="Read selected opto isolated input", response_model=BoolValueModel)
def get_opto_in(stack: int, channel:int):
return BoolValueModel(value=digitalService.read_opto(stack=stack, channel=channel))
#open Drain
@digital.get("/output/OD/all", description="Read open drain PWM freq in percentage from all outputs", response_model=Dict[int, Dict[int, float]])
def get_od_all():
return digitalService.read_od_pwm_all()
@digital.get("/output/OD/{stack}", description="Read open dran PWM freq in percentage tfrom all outputs on selected card", response_model=Dict[int, float])
def get_od_stack(stack: int):
return digitalService.read_od_pwm_stack(stack=stack)
@digital.get("/output/OD/{stack}/{channel}", description="Read open drain freq in percentage from selected output", response_model=FloatValueModel)
def get_od(stack: int, channel: int):
return FloatValueModel(value=digitalService.read_od_pwm(stack=stack, channel=channel))
@digital.post("/output/OD/all", description="Set open drain PWM freq in percentage to all outputs", response_model=Dict[int, Dict[int, float]])
def post_od_all(output: FloatValueModel):
digitalService.set_od_pwm_all(value=output.value)
return digitalService.read_od_pwm_all()
@digital.post("/output/OD/{stack}", description="Set open drain freq in percentage to all outputs on selected card", response_model=Dict[int, float])
def post_od_stack(stack: int, output: FloatValueModel):
digitalService.set_od_pwm_stack(stack=stack, value=output.value)
return digitalService.read_od_pwm_stack(stack=stack)
@digital.post("/output/OD/{stack}/{channel}", description="Set open drain freq in percentage to selected channel", response_model=FloatValueModel)
def post_od(stack: int, channel: int, output: FloatValueModel):
digitalService.set_od_pwm(stack=stack, channel=channel, value=output.value)
return FloatValueModel(value=digitalService.read_od_pwm(stack=stack, channel=channel))
router.include_router(router=analog)
router.include_router(router=digital)

View File

@ -0,0 +1,166 @@
import os, megaind
stacks = [int(h) for h in os.getenv("MODULE_INDUSTRIALAUTOMATION_LEVELS", "")]
if not stacks:
print("MODULE_INDUSTRIALAUTOMATION_LEVELS is not set or invalid")
for i in stacks:
print(f"Configured IndustrialAutomation card: {i}")
def read_0_10_out(stack: int, channel: int) -> float:
"""
Read set value in selected 0-10V output
"""
if stack in stacks and 1 <= channel <= 4:
try:
return megaind.get0_10Out(stack=stack, channel=channel)
except:
return -512
return -512
def read_0_10_out_stack(stack: int) -> dict[int, float]:
"""
Read set values in 0-10V output on selected card
"""
return {channel: read_0_10_out(stack=stack, channel=channel) for channel in range(1, 5)}
def read_0_10_out_all() -> dict[int, dict[int, float]]:
"""
Read set values in 0-10V outputs on all cards
"""
return {stack: read_0_10_out_stack(stack=stack) for stack in stacks}
def set_0_10_out(stack: int, channel: int, value: float):
"""
Set output voltage of 0-10V output to specified output
"""
if stack in stacks and 1 <= channel <= 4:
if value > 10:
value = 10
elif value < 0:
value = 0
try:
megaind.set0_10Out(stack=stack, channel=channel, value=value)
except:
print("error set 0-10V output value")
def set_0_10_out_stack(stack: int, value: float):
"""
Set output voltage of 0-10V output on specified card
"""
(set_0_10_out(stack=stack, channel=channel, value=value) for channel in range(1, 5))
def set_0_10_out_all(value: float):
"""
Set output voltage of 0-10V output on all card
"""
(set_0_10_out_stack(stack=stack, value=value) for stack in stacks)
def read_0_10_in(stack: int, channel: int) -> float:
"""
Read value in selected 0-10V input
"""
if stack in stacks and 1 <= channel <= 4:
try:
return megaind.get0_10In(stack=stack, channel=channel)
except:
return -512
return -512
def read_0_10_in_stack(stack: int) -> dict[int, float]:
"""
Read values in 0-10V input on selected card
"""
return {channel: read_0_10_in(stack=stack, channel=channel) for channel in range(1, 5)}
def read_0_10_in_all() -> dict[int, dict[int, float]]:
"""
Read values in 0-10V inputs on all cards
"""
return {stack: read_0_10_in_stack(stack=stack) for stack in stacks}
def read_4_20_in(stack: int, channel: int) -> float:
"""
Read values from specified 4-20mA input
"""
if stack in stacks and 1 <= channel <= 4:
try:
return megaind.get4_20In(stack=stack, channel=channel)
except:
return -512
return -512
def read_4_20_in_stack(stack: int) -> dict[int, float]:
"""
Read values from all 4-20mA inputs on specified card
"""
return {channel: read_4_20_in(stack=stack, channel=channel) for channel in range(1, 5)}
def read_4_20_in_all() -> dict[int, dict[int, float]]:
"""
Read values from all 4-20mA inputs
"""
return {stack: read_4_20_in_stack(stack=stack) for stack in stacks}
def read_4_20_out(stack: int, channel: int) -> float:
"""
Read setted value from specified 4-20mA output
"""
if stack in stacks and 1 <= channel <= 4:
try:
return megaind.get4_20Out(stack=stack, channel=channel)
except:
return -512
return -512
def read_4_20_out_stack(stack: int) -> dict[int, float]:
"""
Read setted values from all 4-20mA outputs on specified card
"""
return {channel: read_4_20_out(stack=stack, channel=channel) for channel in range(1, 5)}
def read_4_20_out_all() -> dict[int, dict[int, float]]:
"""
Read setted values from all 4-20mA outputs
"""
return {stack: read_4_20_out_stack(stack=stack) for stack in stacks}
def set_4_20_out(stack: int, channel: int, value: float):
"""
Set output value for specified 4-20mA output
"""
if stack in stacks and 1<= channel <= 4:
if value > 20:
value = 20
elif value < 4:
value = 4
try:
megaind.set4_20Out(stack=stack, channel=channel, value=value)
except:
print("Error set 4-20mA output")
def set_4_20_out_stack(stack: int, value: float):
"""
Set output value to all 4-20mA on specified card
"""
(set_4_20_out(stack=stack, channel=channel, value=value) for channel in range(1, 5))
def set_4_20_out_all(value: float):
"""
Set output value to all for 4-20mA outputs
"""
(set_4_20_out_stack(stack=stack, value=value) for stack in stacks)

View File

@ -0,0 +1,104 @@
import os, megaind
stacks = [int(h) for h in os.getenv("MODULE_INDUSTRIALAUTOMATION_LEVELS", "")]
if not stacks:
print("MODULE_INDUSTRIALAUTOMATION_LEVELS is not set or invalid")
for i in stacks:
print(f"Configured IndustrialAutomation card: {i}")
def read_opto(stack: int, opto: int):
"""
Read specified opto input state
"""
if stack in stacks and 1 <= opto <= 4:
try:
if megaind.getOptoCh(stack=stack, channel=opto) == 1:
return True
else:
return False
except:
return False
return False
def read_opto_stack(stack: int):
"""
Read all opto inputs state on specified stack/card
"""
return {opto: read_opto(stack=stack, opto=opto) for opto in range(1, 5)}
def read_opto_all():
"""
Read all opto inputs state
"""
return {stack: read_opto_stack(stack=stack) for stack in stacks}
def set_od_pwm(stack: int, channel: int, value: float):
"""
Set specified OpenDrain PWM percentage
"""
if stack in stacks and 1 <= channel <= 4:
if value < 0:
value = 0
elif value > 100:
value = 100
megaind.setOdPWM(stack=stack, channel=channel, value=value)
def set_od_pwm_stack(stack: int, value: bool):
"""
Set all OpenDrain PWM percentage on specified stack/card
"""
(set_od_pwm(stack=stack, channel=ch, value=value) for ch in range(1, 5))
def set_od_pwm_all(value: bool):
"""
Set all OpenDrain PWM percentage to requested state
"""
(set_od_pwm_stack(stack=stack, value=value) for stack in stacks)
def read_od_pwm(stack: int, channel: int) -> float:
"""
Read specified OpenDrain PWM percentage
"""
if stack in stacks and 1 <= channel <= 4:
try:
return megaind.getOdPWM(stack=stack, channel=channel)
except:
return -512
return -512
def read_od_pwm_stack(stack: int) -> dict[int, float]:
"""
Read all OpenDrain PWM percentage on specified stack/card
"""
return {channel: read_od_pwm(stack=stack, channel=channel) for channel in range(1, 5)}
def read_od_pwm_all() -> dict[int, dict[int, float]]:
"""
Read all OpenDrain PWM percentage
"""
return {stack: read_od_pwm_stack(stack=stack) for stack in stacks}
"""
TODO
OptoCount + Opto frequency
LED control
RTC
Owire Bus
"""

View File

@ -1,6 +1,7 @@
import src.modules.RTD8.service as service import src.modules.RTD8.service as service
from fastapi import APIRouter from fastapi import APIRouter
from typing import Dict from typing import Dict
from src.modules.universal_models.value import FloatValueModel
@ -18,7 +19,7 @@ def get_stack_celsius(stack: int):
@router.get("/{stack}/{channel}", summary="Read specified RTD8 card value of specified channel in Celsius", response_model=float) @router.get("/{stack}/{channel}", summary="Read specified RTD8 card value of specified channel in Celsius", response_model=float)
def get_channel_celsius(stack: int, channel: int): def get_channel_celsius(stack: int, channel: int):
return service.read_temp(stack, channel) return FloatValueModel(service=service.read_temp(stack, channel))
@ -30,6 +31,6 @@ def get_all_celsius():
def get_stack_celsius(stack: int): def get_stack_celsius(stack: int):
return service.read_resistance_stack(stack) return service.read_resistance_stack(stack)
@router.get("/{stack}/{channel}/resistance", summary="Read specified RTD8 card value of specified channel in Ohm", response_model=float) @router.get("/{stack}/{channel}/resistance", summary="Read specified RTD8 card value of specified channel in Ohm", response_model=FloatValueModel)
def get_channel_celsius(stack: int, channel: int): def get_channel_celsius(stack: int, channel: int):
return service.read_resistance(stack, channel) return FloatValueModel(value=service.read_resistance(stack, channel))

View File

@ -1,6 +1,7 @@
from fastapi import APIRouter from fastapi import APIRouter
import src.modules.Rel4HVI4.service as service import src.modules.Rel4HVI4.service as service
from typing import Dict from typing import Dict
from src.modules.universal_models.value import BoolValueModel
@ -8,17 +9,17 @@ router = APIRouter(prefix="/rel4hvi4", tags=["Modules --> Rel4HVI4"])
@router.post("/relay/all/{state}", description="Set all relays to required state (1/0)") @router.post("/relay/all", description="Set all relays to required state (1/0)")
def post_relay_all(state: bool): def post_relay_all(state: BoolValueModel):
service.set_relay_all(value=state) service.set_relay_all(value=state.value)
@router.post("/relay/{stack}/{state}", description="Set all relays on specified card to required state (1/0)") @router.post("/relay/{stack}", description="Set all relays on specified card to required state (1/0)")
def post_relay_stack(stack: int, state: bool): def post_relay_stack(stack: int, state: BoolValueModel):
service.set_relay_stack(stack=stack, value=state) service.set_relay_stack(stack=stack, value=state.value)
@router.post("/relay/{stack}/{channel}/{state}", description="Set all relays on specified card to required state (1/0)") @router.post("/relay/{stack}/{channel}", description="Set all relays on specified card to required state (1/0)")
def post_relay_stack(stack: int, channel, state: bool): def post_relay_stack(stack: int, channel, state: BoolValueModel):
service.set_relay(stack=stack, relay=channel, value=state) service.set_relay(stack=stack, relay=channel, value=state.value)
@router.get("/relay/all", description="Get state of all relays", response_model=Dict[int, Dict[int, bool]]) @router.get("/relay/all", description="Get state of all relays", response_model=Dict[int, Dict[int, bool]])
@ -29,9 +30,9 @@ def get_relay_all():
def get_relay_stack(stack: int): def get_relay_stack(stack: int):
return service.read_relay_stack(stack=stack) return service.read_relay_stack(stack=stack)
@router.get("/relay/{stack}/{channel}", description="Get state of selected relay", response_model=bool) @router.get("/relay/{stack}/{channel}", description="Get state of selected relay", response_model=BoolValueModel)
def get_relay(stack: int, channel: int): def get_relay(stack: int, channel: int):
return service.read_relay(stack=stack, relay=channel) return BoolValueModel(value=service.read_relay(stack=stack, relay=channel))
@ -43,6 +44,6 @@ def get_opto_all():
def get_opto_stack(stack: int): def get_opto_stack(stack: int):
return service.read_opto_stack(stack=stack) return service.read_opto_stack(stack=stack)
@router.get("/opto/{stack}/{channel}", description="Get state of selected input", response_model=bool) @router.get("/opto/{stack}/{channel}", description="Get state of selected input", response_model=BoolValueModel)
def get_opto(stack: int, channel: int): def get_opto(stack: int, channel: int):
return service.read_opto(stack=stack, opto=channel) return BoolValueModel(value=service.read_opto(stack=stack, opto=channel))

View File

@ -2,6 +2,7 @@ from fastapi import APIRouter
from src.modules.RTD8.router import router as RTD8Router from src.modules.RTD8.router import router as RTD8Router
from src.modules.Rel4HVI4.router import router as Rel4HVI4Router from src.modules.Rel4HVI4.router import router as Rel4HVI4Router
from src.modules.IndustrialAutomation.router import router as IndustrialAutomationRouter
@ -11,3 +12,4 @@ router = APIRouter(prefix="/modules")
router.include_router(router=RTD8Router) router.include_router(router=RTD8Router)
router.include_router(router=Rel4HVI4Router) router.include_router(router=Rel4HVI4Router)
router.include_router(router=IndustrialAutomationRouter)

View File

@ -0,0 +1,11 @@
from pydantic import BaseModel
class FloatValueModel(BaseModel):
value: float
class IntValueModel(BaseModel):
value: int
class BoolValueModel(BaseModel):
value: bool