relays.py
'''\
Relays Circuit Modeler.
commands:
main - Main function
test - Test function
usage:
$PYTHON module_name {command} {arguments} -- executing module externally
module.dispatch({command}, args={arguments}) -- calling module from code
Developer@Sonnack.com
April 2016
'''
from sys import stdout, stderr, argv
from os import path
from datetime import date, timedelta
from logger import logger, info, debug, trace
Log = logger('app')
class Contact (object):
'''Contact base class.'''
def __init__ (self, state=False):
'''New Contact instance.'''
self._state = state
def __str__ (self):
'''Return a string version.'''
return str(self._state)
def __repr__ (self):
'''Return a JSON version.'''
s = '{Contact:{state:%s}}'
return s % self._state
def __call__ (self, new_state=None):
'''Return the State. Optionally, use passed value to set it first.'''
if new_state != None:
self._state = new_state
return self._state
def src (self):
return True
class NamedContact (Contact):
'''NamedContact base class.'''
def __init__ (self, name, state=False):
'''New NamedContact instance.'''
super(NamedContact,self).__init__(state)
self._name = name
def __str__ (self):
'''Return a string version.'''
return '[%s] (%s)' % (self._name, self._state)
def __repr__ (self):
'''Return a JSON version.'''
s = '{NamedContact:{name:"%s", state:%s}}'
return s % (self._name, self._state)
@classmethod
def print_members (cls):
Log.info('--%ss--' % cls.__name__)
for m in sorted(cls.members):
Log.info('%s: %s' % (m, cls.members[m]._state))
class GND (Contact):
def __init__ (self):
'''New Ground instance.'''
super(GND,self).__init__(True)
def __str__ (self):
'''Return a string version.'''
return 'Ground (True)'
class Input (NamedContact):
def __init__ (self, name):
'''New Input instance.'''
super(Input,self).__init__(name)
self.add_member()
def __str__ (self):
'''Return a string version.'''
return 'I[%s] (%s)' % (self._name, self._state)
members = {}
def add_member (self):
'''Add object to members collection. (Automatic on initialization!)'''
if self._name not in Input.members:
Input.members[self._name] = self
Log.trace('Input::AddMember: %s' % self._name)
def del_member (self):
'''Delete object from members collection. (Manual!)'''
if self._name in Input.members:
del Input.members[self._name]
Log.trace('Input::DelMember: %s' % self._name)
class Output (NamedContact):
def __init__ (self, name):
'''New Output instance.'''
super(Output,self).__init__(name)
self.add_member()
def __str__ (self):
'''Return a string version.'''
return 'O[%s] (%s)' % (self._name, self._state)
def src (self):
'''Override source flag.'''
return False
members = {}
def add_member (self):
'''Add object to members collection. (Automatic on initialization!)'''
if self._name not in Output.members:
Output.members[self._name] = self
Log.trace('Output::AddMember: %s' % self._name)
def del_member (self):
'''Delete object from members collection. (Manual!)'''
if self._name in Output.members:
del Output.members[self._name]
Log.trace('Output::DelMember: %s' % self._name)
class RelayContactPart (Contact):
'''RelayContactPart class. (Implements a single double-throw relay contact.)'''
def __init__ (self, callback):
super(RelayContactPart,self).__init__()
self._cb_f = callback
def __call__ (self, new_state=None):
'''Get/Put State with callback function.'''
if new_state != None:
self._state = new_state
return self._cb_f(new_state)
class RelayContact (Contact):
'''Relay Contact class.'''
attr_comm = ['c','C', 'comm']
attr_make = ['m','M', 'no','NO']
attr_break = ['b','B', 'nc','NC']
def __init__ (self, ctct_nbr):
super(RelayContact,self).__init__()
self._number = ctct_nbr
self._common = RelayContactPart(self.get_common)
self._make = RelayContactPart(self.get_make)
self._break = RelayContactPart(self.get_break)
def get_common (self, new_state):
if self():
if not self._common._state and self._make._state:
self._common._state = True
else:
if not self._common._state and self._break._state:
self._common._state = True
return self._common._state
def get_make (self, new_state):
if self():
if not self._make._state and self._common._state:
self._make._state = True
else:
if self._make._state:
self._make._state = False
return self._make._state
def get_break (self, new_state):
if self():
if self._break._state:
self._break._state = False
else:
if not self._break._state and self._common._state:
self._break._state = True
return self._break._state
def __str__ (self):
s = '%s[%d]: (%s)(%s|%s)'
t = (self._state, self._number, self._common(), self._make(), self._break())
return s % t
def __repr__ (self):
s = '{RelayContact:{state:%s, number:%d, comm:%s, make:%s, break:%s}}'
t = (self._state, self._number, self._common, self._make, self._break)
return s % t
def __getattr__ (self, name):
'''Getter for virtual properties.'''
if name == 'state': return self._state
if name in self.attr_comm: return self._common
if name in self.attr_make: return self._make
if name in self.attr_break: return self._break
raise AttributeError,('Unknown Attribute: "%s"' % name)
def __setattr__ (self, name, value):
'''Setter for virtual properties.'''
if name in (self.attr_comm + self.attr_make + self.attr_break):
raise AttributeError,('Read-Only Attribute: "%s"' % name)
super(RelayContact,self).__setattr__(name, value)
class Relay (NamedContact):
'''Relay class.'''
def __init__ (self, name, n_contacts=4):
'''New Relay instance.'''
super(Relay,self).__init__(name)
self._contacts = [RelayContact(1+x) for x in range(n_contacts)]
self.add_member()
def src (self):
return False
def __str__ (self):
s = 'K[%s] (%s)'
t = (self._name, self._state)
return s % t
def __repr__ (self):
s = '{Relay:{name:"%s", state:%s, contacts:%d}}'
t = (self._name, self._state, len(self))
return s % t
def __call__ (self, new_state=None):
'''Override for Contact array handling.'''
if new_state != None:
if self._state != new_state:
Log.info('Relay::StateChange: %s -> %s' % (self._state,new_state))
self._state = new_state
for c in self._contacts:
c(self._state)
return self._state
def __nonzero__ (self):
return True
def __len__ (self):
return len(self._contacts)
def __iter__ (self):
return iter(self._contacts)
def __getitem__ (self, index):
'''Get a Contact by index.'''
try:
ix = int(index)
if (ix < 1) or (len(self) < ix):
raise KeyError('Invalid Contact Index: "%d"' % ix)
except:
raise KeyError('Invalid Contact Index: "%s"' % index)
return self._contacts[ix-1]
def __setitem__ (self, index, value):
raise NotImplementedError('Contact replace not allowed!')
def __delitem__ (self, ix):
raise NotImplementedError('Contact delete not allowed!')
members = {}
def add_member (self):
if self._name not in Relay.members:
Relay.members[self._name] = self
def del_member (self):
if self._name in Relay.members:
del Relay.members[self._name]
def print_circuits (circuit):
for t in circuit:
d = (t[0]._state, t[1]._state)
s = '(%-5s :: %5s)'
Log.debug(s % d)
def test_one_circuit (c):
lhs,rhs = c
if lhs() and rhs():
return False
if not lhs() and not rhs():
return False
if lhs():
if lhs.src():
rhs(True)
return True
lhs(False)
return False
if rhs():
if rhs.src():
lhs(True)
return True
rhs(False)
return False
raise RuntimeError('This never happens!')
def test_circuits (cs):
dirty_flag = False
for ix,c in enumerate(cs):
df = test_one_circuit(c)
if df:
dirty_flag = True
Log.trace('%03d: %s' % (ix, df))
return dirty_flag
def do_test (*args):
Log.trace('test/parameters: %d' % len(args))
Input('a')
Input('b')
Relay('A')
Relay('B')
Output('AND')
Output('OR')
Output('NAND')
Output('NOR')
c1 = Contact()
c2 = Contact()
Log.info()
I = Input.members
O = Output.members
K = Relay.members
circuit = [
( I['a'] , K['A'] ),
( I['b'] , K['B'] ),
( GND() , K['A'][1].c ),
( K['A'][1].m , K['B'][1].m ),
( K['B'][1].c , O['AND'] ),
( GND() , K['A'][2].c ),
( GND() , K['B'][2].c ),
( K['A'][2].m , c1 ),
( K['B'][2].m , c1 ),
( c1 , O['OR'] ),
( GND() , K['A'][3].c ),
( GND() , K['B'][3].c ),
( K['A'][3].b , c2 ),
( K['B'][3].b , c2 ),
( c2 , O['NAND'] ),
( GND() , K['A'][4].c ),
( K['A'][4].b , K['B'][4].b ),
( K['B'][4].c , O['NOR'] ),
]
Input.members['a'](True)
Input.members['b'](False)
Input.print_members()
for ix in range(16):
print_circuits(circuit)
Log.debug('Circuit Test Loop #%d' % (1+ix))
if not test_circuits(circuit):
break
print_circuits(circuit)
Output.print_members()
return circuit
def do_main (*args):
Log.trace('main/parameters: %s' % str(args))
return args
def dispatch (cmd, *args):
Log.trace('command: %s' % cmd)
Log.trace('arguments: %d' % len(args))
if cmd == 'test': return do_test(*args)
if cmd == 'main': return do_main(*args)
return [None, cmd, args]
if __name__ == '__main__':
print 'autorun: %s' % argv[0]
Log.start('relays.log')
Log.level(info())
cmd = argv[1] if 1 < len(argv) else ''
etc = argv[2:]
try:
obj = dispatch(cmd, *etc)
Log.trace()
for ix,g in enumerate(obj):
Log.trace('%d: %s' % (ix,repr(g)))
print 'exit/type: %s (length: %d)' % (type(obj), len(obj))
except Exception as e:
print >> stderr, 'ERROR! "%s"' % e
Log.error(e)
raise
finally:
Log.end()
'''eof'''