boolobj.py
'''BOOL Objects. (Take many, many.)
classes:
BoolDataBlock (object)
BoolCallFrame (object)
BoolPStack (object)
BoolTuple (tuple)
BoolParam (BoolTuple)
BoolAddr (BoolTuple)
BoolLst (BoolTuple)
BoolMsg (BoolTuple)
BoolTor (BoolTuple)
BoolAction (BoolTuple)
BoolObj (BoolTuple)
BoolRef (BoolTuple)
BoolModel (BoolTuple)
BoolContextObject (object)
Coder@Sonnack.com
June 20, 2017
'''
from __future__ import print_function
from sys import stdout, stderr, argv
from datetime import datetime
from hashlib import md5 as StringHash
from logger import logger, info, debug, trace
Log = logger('BOOL')
NUL = 'NUL'
PAR = 'PAR'
OBJ = 'OBJ'
REF = 'REF'
LST = 'LST'
MSG = 'MSG'
TOR = 'TOR'
ACT = 'ACT'
MDL = 'MDL'
SYS = 'SYS'
EXT = 'EXT'
DEV = 'DEV'
Meta = [NUL, PAR, OBJ, REF, LST, MSG, TOR, ACT, MDL, SYS, EXT, DEV]
Here = '!'
def meta_name (ix):
'''Return a meta-type name given an integer index.'''
if not isinstance(ix,int):
raise KeyError('Invalid Index Type!')
if 0 <= ix < len(Meta):
return Meta[ix]
raise IndexError('Index Out Of Range!')
def meta_index (mtype):
'''Return an integer index given a meta-type name.'''
if not isinstance(mtype,str):
raise KeyError('Invalid Meta-Type!')
if mtype in Meta:
return Meta.index(mtype)
raise KeyError('Unknown Meta-Type!')
class BoolDataBlock (object):
'''Manager for data memory allocation.'''
def __init__ (self, datum):
self.datum = datum
self.block = []
def __len__ (self):
return len(self.block)
def __str__ (self):
return 'DataBlock: %d item%s [%s]' % (len(self.block), 's' if len(self.block) == 1 else '', type(self.datum))
def new_datum (self, datum=None):
new_datum = self.datum if datum == None else datum
try:
ix = self.block.index(None)
self.block[ix] = new_datum
return ix
except ValueError:
ix = len(self.block)
self.block.append(new_datum)
return ix
def del_datum (self, ix):
self._index_validate(ix)
self.block[ix] = None
def get_datum (self, ix):
self._index_validate(ix)
return self.block[ix]
def set_datum (self, ix, datum):
self._index_validate(ix)
self.block[ix] = datum
def _index_validate (self, ix):
if ix < 0:
raise Exception('Data Index below zero!: %d' % ix)
if len(self.block) <= ix:
raise Exception('Data Index beyond DataBlock size!: %d' % ix)
class BoolCallFrame (object):
def __init__ (self, action, parent=None):
self.parent = parent
self.action = action
self.name = action.name
self.data = action.dataslots * [None]
self.retn_flag = 0
self.loop_flag = 0
self.nest_level = 0
Log.trace(str(self))
def __len__ (self):
return len(self.data)
def __str__ (self):
return 'CallFrame: %s [%d]' % (self.name, len(self.data))
class BoolPStack (object):
'''\
BOOL Parameter Stack.
The Parameter Stack carries parameters into and out of Actions.
Usually, input parameters are references to Objects, but they
can be new Objects created on the fly (for pass by value).
Output parameters are often Objects created in the Action that
need to be transferred to the calling frame. This requirement,
and the pass by value ability, require the parameter stack be
capable of containing entire Objects like Address Space does.
This is in addition to holding references to Objects in the
usual Address Space.
'''
Log = logger('PStack')
def __init__ (self):
self.Log.trace('init')
self.stack = []
def __len__ (self):
'''Return count of items in stack.'''
return len(self.stack)
def __getitem__ (self, ix):
'''Return a reference to an item in the stack.'''
return self.stack[ix]
def push (self, obj):
self.Log.trace('PStak::push: %s' % str(obj))
if obj:
self.stack.append(obj)
def peek (self):
n = len(self)
if 0 < n:
obj = self.stack[n-1]
self.Log.trace('PStak::peek: %s' % str(obj))
return obj
raise BufferError,'PStack is empty!'
def pop (self):
if len(self):
obj = self.stack.pop()
self.Log.trace('PStak::pop: %s' % str(obj))
return obj
raise BufferError,'PStack is empty!'
class BoolTuple (tuple):
'''Bool (extended) Tuple base class.'''
Log = logger('Meta')
def __new__ (cls, *args):
return tuple.__new__(cls, args)
def __init__ (self, *args):
tuple.__init__(self, args)
def __getattr__ (self, name):
if name in ['mtype','meta','metatype','objtype']: return self[0]
raise AttributeError(BoolTuple.E_ATTR_DEF % name)
def __setattr__ (self, name, value):
if name in ['mtype','meta','metatype','objtype']:
raise AttributeError(BoolTuple.E_ATTR_RO % name)
tuple.__setattr__(self, name, value)
E_ATTR_RO = 'Attribute "%s" is read-only'
E_ATTR_DEF = 'Attribute "%s" is not defined.'
class BoolParam (BoolTuple):
'''Bool (PStack) Parameter class.'''
def __new__ (cls, model, dkey):
return BoolTuple.__new__(cls, PAR, model, dkey)
def __init__ (self, model, dkey):
BoolTuple.__init__(self, PAR, model, dkey)
def __getattr__ (self, name):
if name == 'model': return self[1]
if name == 'dkey': return self[2]
raise AttributeError(BoolTuple.E_ATTR_DEF % name)
def __setattr__ (self, name, value):
if name in ['model','dkey']:
raise AttributeError('Attribute "%s" is read-only' % name)
BoolTuple.__setattr__(self, name, value)
class BoolAddr (BoolTuple):
'''Bool Address class. Just a named tuple: (ns, offset)'''
def __new__ (cls, offset, ns=Here):
return BoolTuple.__new__(cls, 'addr', ns, offset)
def __init__ (self, offset, ns=Here):
BoolTuple.__init__(self, 'addr', ns, offset)
if not isinstance(ns,str):
raise TypeError,'Address namespace must be a string!'
if not isinstance(offset,int):
raise TypeError,'Address value must be an integer!'
def __getattr__ (self, name):
if name == 'ns': return self[1]
if name == 'offset': return self[2]
return BoolTuple.__getattr__(self, name)
def __setattr__ (self, name, value):
if name in ['ns','offset']:
raise AttributeError('Attribute "%s" is read-only' % name)
BoolTuple.__setattr__(self, name, value)
def __str__ (self):
return '[%s:%04d]' % (self.ns, self.offset)
class BoolLst (BoolTuple):
'''BOOL List Object class.'''
def __new__ (cls, *args, **kwargs):
list_len = len(args)
gate_expr = kwargs['gate_expr'] if 'gate_expr' in kwargs else None
props = kwargs['props'] if 'props' in kwargs else None
args = args+(props,)
return BoolTuple.__new__(cls, LST, gate_expr, list_len, *args)
def __init__ (self, *args, **kwargs):
BoolTuple.__init__(self, *args)
def __getattr__ (self, name):
if name == 'gate': return self[1]
if name == 'size': return self[2]
if name == 'items': return self[3:3+self.size]
if name == 'props': return self[3+self.size]
return BoolTuple.__getattr__(self, name)
def __setattr__ (self, name, value):
if name in ['gate','size','items','props']:
raise AttributeError(BoolTuple.E_ATTR_RO % name)
BoolTuple.__setattr__(self, name, value)
def __str__ (self):
s = '(%s items=%d gate=%s)'
t = (self.mtype, self.size, self.gate if self.gate else '()')
return s % t
def test_gate (self):
if self.gate:
self.Log.trace('List Gate: %s' % str(self.gate))
BoolContext.send_message('Q:', self.gate)
obj = BoolContext.ps_pop()
self.Log.debug('List Gate result: %s' % str(obj))
BoolContext.invoke_model(obj.model, 'true')
obj = BoolContext.ps_pop()
if obj.model != 'bool':
raise RuntimeError('Expression result not a *bool object: *%s' % obj.model)
mdl = BoolContext.get_model(obj.model)
return mdl.get_value(obj.dkey)
return True
def receive_message (self, msg, ns=Here):
self.Log.trace('%sList (items=%d)' % (msg, self.size))
if len(self.items) != self.size:
raise Exception('Invalid list size or length! (size:%d, len:%d)' % (self.size,len(self.items)))
if self.test_gate():
for list_item in self.items:
BoolContext.send_message(msg, list_item)
if BoolContext.cf.loop_flag:
BoolContext.cf.loop_flag = 0
break
class BoolMsg (BoolTuple):
'''BOOL Message Object class.'''
def __new__ (cls, name, target, param):
return BoolTuple.__new__(cls, MSG, name, target, param)
def __init__ (self, name, target, param):
BoolTuple.__init__(self, MSG, name, target, param)
def __getattr__ (self, name):
if name == 'name': return self[1]
if name == 'target': return self[2]
if name == 'param': return self[3]
return BoolTuple.__getattr__(self, name)
def __setattr__ (self, name, value):
if name in ['name','target','param']:
raise AttributeError(BoolTuple.E_ATTR_RO % name)
BoolTuple.__setattr__(self, name, value)
def __str__ (self):
s = '(%s %s: %s %s)'
t = (self.mtype, self.name, self.target, self.param if self.param else ';')
return s % t
def receive_message (self, msg, ns=Here):
self.Log.trace('%sMessage: "%s"' % (msg, self.name))
if self.param:
BoolContext.send_message('Q:', self.param)
BoolContext.send_message('Q:', self.target)
obj = BoolContext.ps_peek()
self.Log.trace('MSG: target object: %s' % str(obj))
BoolContext.invoke_model(obj.model, self.name)
if msg in 'x:X:':
obj_addr = BoolContext.ps_pop()
class BoolTor (BoolTuple):
'''BOOL Actor ("'Tor") Object class.'''
def __new__ (cls, name, params):
return BoolTuple.__new__(cls, TOR, name, params)
def __init__ (self, *args):
BoolTuple.__init__(self, TOR, *args)
def __getattr__ (self, name):
if name == 'name': return self[1]
if name == 'params': return self[2]
return BoolTuple.__getattr__(self, name)
def __setattr__ (self, name, value):
if name in ['name','params']:
raise AttributeError(BoolTuple.E_ATTR_RO % name)
BoolTuple.__setattr__(self, name, value)
def __str__ (self):
p = str(self.params[0])
n = len(self.params[1:])
r = (' +%d' % n) if 0 < n else ''
s = '(%s @%s %s%s)'
t = (self.mtype, self.name, p, r)
return s % t
def receive_message (self, msg, ns=Here):
self.Log.trace('%sActor: "%s" (params: %d)' % (msg, self.name, len(self.params)))
BoolContext.invoke_action(self.name, self.params)
if msg in 'x:X:':
obj_addr = BoolContext.ps_pop()
class BoolAction (BoolTuple):
'''BOOL Action Object class.'''
def __new__ (cls, name,sig,dslots, initlist,exitlist,clauses, codelist, namtab={}, props=None):
return BoolTuple.__new__(cls, ACT, name,sig,dslots, initlist,exitlist,clauses, codelist, namtab, props)
def __init__ (self, *args, **kwargs):
BoolTuple.__init__(self, ACT, *args)
def __getattr__ (self, name):
if name == 'name': return self[1]
if name == 'sig': return self[2]
if name == 'dataslots': return self[3]
if name == 'initlist': return self[4]
if name == 'exitlist': return self[5]
if name == 'clauses': return self[6]
if name == 'codeblock': return self[7]
if name == 'namtab': return self[8]
if name == 'props': return self[9]
return BoolTuple.__getattr__(self, name)
def __setattr__ (self, name, value):
if name in ['name','sig','dataslots','initlist','exitlist','clauses','codeblock','namtab','props']:
raise AttributeError(BoolTuple.E_ATTR_RO % name)
BoolTuple.__setattr__(self, name, value)
def __str__ (self):
s = '(%s @%s inits=%d exits=%d clauses=%d cs=%d ds=%d)'
t = (self.mtype, self.name, len(self.initlist), len(self.exitlist), len(self.clauses), len(self.codeblock), self.dataslots)
return s % t
def receive_message (self, msg, ns=Here):
pass
class BoolObj (BoolTuple):
'''BOOL Instance Object class.'''
def __new__ (cls, model, dx, init_list=None, props=None):
return BoolTuple.__new__(cls, OBJ, model, dx, init_list, props)
def __init__ (self, *args, **kwargs):
BoolTuple.__init__(self, OBJ, *args)
def __getattr__ (self, name):
if name == 'model': return self[1]
if name == 'dx': return self[2]
if name == 'init': return self[3]
if name == 'props': return self[4]
return BoolTuple.__getattr__(self, name)
def __setattr__ (self, name, value):
if name in ['model','dx','init','props']:
raise AttributeError(BoolTuple.E_ATTR_RO % name)
BoolTuple.__setattr__(self, name, value)
def __str__ (self):
s = '(%s *%s #%d =%s)'
t = (self.mtype, self.model, self.dx, str(self.init))
return s % t
def initialize (self):
'''Initialize Object (use Init-List if present).'''
mdl = BoolContext.find_model(self.model)
if self.props and ('value' in self.props):
datum = self.props['value']
dkey = mdl.new_value(datum)
BoolContext.cf.data[self.dx] = dkey
self.Log.trace('OBJ init-property: %s' % dkey)
return
if self.init:
BoolContext.send_message('Q:', self.init)
obj = BoolContext.ps_pop()
BoolContext.cf.data[self.dx] = obj.dkey
self.Log.trace('OBJ init-list: %s' % str(obj))
return
dkey = mdl.new_value()
BoolContext.cf.data[self.dx] = dkey
self.Log.trace('OBJ initialize: %s' % dkey)
def ps_push (self, ns):
cf = self.__get_cf(ns)
t = BoolParam(self.model, cf.data[self.dx])
BoolContext.ps_push(t)
def __get_cf (self, ns):
return BoolContext.cf if ns == Here else BoolContext.find_frame(ns)
def receive_message (self, msg, ns=Here):
self.Log.trace('%sOBJ: <%s>' % (msg, self.model))
if msg in 'x:X:':
self.initialize()
return
if msg in 'q:Q:':
self.ps_push(ns)
return
if msg in 'n:N:name:Name:':
return
self.ps_push(ns)
BoolContext.invoke_model(self.model, msg)
class BoolRef (BoolTuple):
'''BOOL Reference Object class.'''
def __new__ (cls, model, dx):
return BoolTuple.__new__(cls, REF, model, dx)
def __init__ (self, *args):
BoolTuple.__init__(self, REF, *args)
def __getattr__ (self, name):
if name == 'model': return self[1]
if name == 'dx': return self[2]
return BoolTuple.__getattr__(self, name)
def __setattr__ (self, name, value):
if name in ['model','dx']:
raise AttributeError(BoolTuple.E_ATTR_RO % name)
BoolTuple.__setattr__(self, name, value)
def __str__ (self):
return '(%s *%s #%d)' % (self.mtype, self.model, self.dx)
def initialize (self):
'''Initialize Reference.'''
mdl = BoolContext.find_model(self.model)
dkey = mdl.new_value()
BoolContext.cf.data[self.dx] = dkey
self.Log.trace('REF::Initialize: %s' % dkey)
def receive_value (self):
'''Pop a value from the PStack into the Reference.'''
obj = BooContext.ps_pop()
if self.model in ['any','one','list']:
ref = self.__get_ref(Here)
ref[1] = obj.model
ref[2] = obj.dkey
self.Log.trace('REF::ReceiveValue: %s' % ref)
return
if self.model != obj.model:
raise Exception('Type Mismatch: *%s = *%s' % (self.model, obj.model))
BoolContext.cf.data[self.dx] = obj.dkey
def ps_push (self, ns):
if self.model in ['any','one','list']:
ref = self.__get_ref(ns)
t = BoolParam(ref[1], ref[2])
BoolContext.ps_push(t)
return
cf = self.__get_cf(Here)
t = BoolParam(self.model, cf.data[self.dx])
BoolContext.ps_push(t)
def __get_ref (self, ns):
cf = self.__get_cf(ns)
mdl = BoolContext.find_model(self.model)
return mdl.get_value(cf.data[self.dx])
def __get_cf (self, ns):
return BoolContext.cf if ns == Here else BoolContext.find_frame(ns)
def receive_message (self, msg, ns=Here):
self.Log.trace('%sREF: <%s>' % (msg, self.model))
if msg in 'x:X:':
self.initialize()
return
if msg in 'r:R:':
self.receive_value()
return
if msg in 'q:Q:':
self.ps_push(ns)
return
self.ps_push(ns)
BoolContext.invoke_model(self.model, msg)
class BoolModel (BoolTuple):
'''BOOL Model Object class.'''
def __new__ (cls, name, data_mgr, actions, namtab={}, props=None):
return BoolTuple.__new__(cls, MDL, name, data_mgr, actions, namtab, props)
def __init__ (self, *args, **kwargs):
BoolTuple.__init__(self, MDL, *args)
def __getattr__ (self, name):
if name == 'name': return self[1]
if name == 'ds': return self[2]
if name == 'actions': return self[3]
if name == 'namtab': return self[4]
if name == 'props': return self[5]
return BoolTuple.__getattr__(self, name)
def __setattr__ (self, name, value):
if name in ['name','ds','actions','namtab','props']:
raise AttributeError(BoolTuple.E_ATTR_RO % name)
BoolTuple.__setattr__(self, name, value)
def __str__ (self):
s = '(%s *%s ds=%s actions=%d)'
t = (self.mtype, self.name, self.ds, len(self.actions))
return s % t
def new_value (self, datum=None):
dkey = self.ds.new_datum(datum)
self.Log.trace('Model::new-value: %s = %s' % (dkey,str(datum)))
return dkey
def get_value (self, dkey):
self.Log.trace('Model::get-value: %s' % dkey)
return self.ds.get_datum(dkey)
def set_value (self, dkey, datum):
self.Log.trace('Model::set-value: %s = %s' % (dkey,str(datum)))
self.ds.set_datum(dkey, datum)
def del_value (self, dkey):
self.ds.del_datum(dkey)
def find_action (self, msg_name):
if msg_name in self.actions:
return self.actions[msg_name]
return None
def receive_message (self, msg, ns=Here):
pass
class BoolContextObject (object):
Log = logger('CTX')
def __init__ (self):
self.ps = BoolPStack()
self.cf = BoolCallFrame(BoolAction('_root_','',0,[],[],[],[]))
self.actions = {}
self.models = {}
def ps_push (self, obj):
self.ps.push(obj)
def ps_pop (self):
return self.ps.pop()
def ps_peek (self):
return self.ps.peek()
def ps_push_value (self, model_name, datum, use_mdl=None):
mdl = use_mdl if use_mdl else self.find_model(model_name)
val = mdl.new_value(datum)
arg = BoolParam(mdl.name, val)
self.ps_push(arg)
def cf_push (self, action):
old_cf = self.cf
self.cf = BoolCallFrame(action, parent=old_cf)
def cf_pop (self):
self.cf = self.cf.parent
def load_actions (self, actions):
for a in actions:
self.actions[a.name] = a
def load_models (self, models):
for m in models:
self.models[m.name] = m
def find_action (self, action_name):
if action_name in self.actions:
return self.actions[action_name]
return None
def find_model (self, model_name):
if model_name in self.models:
return self.models[model_name]
return None
def find_frame (self, name):
cf = self.cf
while cf.name != '_root_':
if cf.name == name:
return cf
cf = cf.parent
return None
def search_frame (self, name):
self.Log.trace('search-frame: key=%s' % name)
cf = self.cf
while cf.name != '_root_':
self.Log.trace('search-frame: CF=%s' % cf.name)
act = cf.action
if name in act.namtab:
return act.namtab[name]
cf = cf.parent
return None
def find_object (self, address):
self.Log.trace('FindObject: %s' % str(address))
if address.offset < 0:
raise Exception('Illegal Address: %s' % str(address))
cf = self.cf
if address.ns != Here:
while address.ns != cf.name:
cf = cf.parent
if cf.name == '_root_':
raise Exception('Address not found: %s', str(address))
code = cf.action.codeblock
if len(code) <= address.offset:
raise Exception('Illegal Address: %s' % str(address))
return code[address.offset]
def send_message (self, msg, obj_addr):
'''Send-Message (message-name, target-address)'''
self.Log.debug('SendMesg: "%s" -> %s' % (msg, obj_addr))
if msg in 'z:Z:nop:':
return
if msg in 'w:W:where:addr:':
self.ps_push(obj_addr)
return
obj = self.find_object(obj_addr)
self.Log.trace('Dispatch Message to: %s' % str(obj))
obj.receive_message(msg, ns=obj_addr.ns)
def invoke_model (self, mdl_name, msg):
'''Invoke-Model (model-name, message-name)'''
self.Log.debug('Invoke-Model: "%s" msg=%s' % (mdl_name,msg))
mdl = self.find_model(mdl_name)
if not mdl:
raise Exception('Model Not Found: *%s' % model.name)
self.invoke_model_action(mdl, msg)
def invoke_model_action (self, mdl, msg):
'''Invoke-Model-Action (model-object, message-name)'''
self.Log.info()
self.Log.info('Invoke-Model-Action: "%s_%s"' % (mdl.name,msg))
act = mdl.find_action(msg)
if not act:
raise Exception('Model-Action Not Found: "%s:(*%s)"' % (msg,mdl.name))
native = mdl.props and ('native' in mdl.props)
if native:
self.Log.trace('Native Model-Action: ENTER')
act(mdl, msg)
self.Log.trace('Native Model-Action: EXIT')
self.Log.trace()
return
self.Log.trace('Model-Action: %s' % str(act))
self.cf_push(act)
for addr in act.initlist:
self.send_message('X:', addr)
for addr in act.exitlist:
self.send_message('X:', addr)
for addr in act.initlist:
self.send_message('R:', addr)
main_clause = act.clauses[0]
self.Log.debug('EXECUTE ACTION MAIN CLAUSE')
self.Log.trace(main_clause)
self.send_message('X:', main_clause[2])
for addr in act.exitlist:
self.send_message('Q:', addr)
self.cf_pop()
self.Log.debug('exit Model-Action: "%s"' % act.name)
self.Log.debug()
def invoke_action (self, act_name, params):
'''Invoke-Action (action-name, params-list)'''
self.Log.info()
self.Log.info('Invoke-Action: "%s"' % act_name)
act = self.find_action(act_name)
if not act:
raise Exception('Action Not Found: @%s' % act_name)
native = act.props and ('native' in act.props)
if native:
self.Log.trace('Native Action: ENTER')
code = act.props['code']
code(act, params)
self.Log.trace('Native Action: EXIT')
self.Log.trace()
return
self.Log.trace('Action: %s' % str(act))
self.cf_push(act)
for addr in act.initlist:
self.send_message('X:', addr)
for addr in act.exitlist:
self.send_message('X:', addr)
clauses = list(act.clauses)
main_clause = clauses.pop(0)
main_params = params.pop(0)
if main_clause[0] != main_params[0]:
raise Exception('Main Clause Parameter mismatch! [%s/%s]' % (main_clause[0]))
self.Log.debug('EXECUTE ACTION MAIN CLAUSE')
self.Log.trace(main_clause)
self.Log.trace(main_params)
self.send_message('X:', main_clause[2])
if 0 < len(params):
sub_params = params.pop(0)
for sub_clause in clauses:
while sub_clause[0] == sub_params[0]:
self.Log.debug('EXECUTE ACTION SUB CLAUSE')
self.Log.trace(sub_clause)
self.Log.trace(sub_params)
self.send_message('X:', sub_clause[2])
if len(params) == 0:
break
sub_params = params.pop(0)
if 0 < len(params):
raise Exception('Action did not use all parameter clauses!')
for addr in act.exitlist:
self.send_message('Q:', addr)
self.cf_pop()
self.Log.debug('exit Action: "%s"' % act.name)
self.Log.debug()
BoolContext = BoolContextObject()
if __name__ == '__main__':
pass