This repository has been archived on 2026-03-20. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files

90 lines
3.1 KiB
Python

from DataVaultGenerator.Components import DataVaultEntity, MappingSource, ErrorCollection
class Delivery(DataVaultEntity, MappingSource):
    """Delivery entity that also acts as a mapping source.

    All configuration is read from the definition dict through
    ``self._definition`` (presumably populated by
    ``DataVaultEntity.__init__`` -- confirm against the base class).
    Keys read by this class: ``properties``, ``deltaattribute``,
    ``deltainitialvalue``, ``recordsource``, ``batchmode``, ``interfaces``,
    ``ldts_source``, ``query``, ``sourcesystem`` and ``sourcetype``.
    """

    def __init__(self, model, filename, definition: dict = None):
        """Initialise both base classes and cache the ``properties`` mapping.

        Args:
            model: Model object; forwarded to both base initialisers.
            filename: Forwarded to ``DataVaultEntity.__init__``.
            definition: Definition dict of this delivery. May be ``None``
                as far as this method is concerned (the base initialiser
                receives it unchanged).
        """
        DataVaultEntity.__init__(self, model, filename, definition)
        MappingSource.__init__(self, model, self)
        # The signature defaults ``definition`` to None, so guard the lookup
        # instead of calling ``.get`` on None.
        self.properties = (definition or {}).get('properties', {})

    @property
    def delta_attribute(self):
        """Attribute looked up via ``get_attribute`` for ``deltaattribute``."""
        return self.get_attribute(self._definition.get('deltaattribute'))

    @property
    def delta_initialvalue(self):
        """Value of ``deltainitialvalue`` or ``None`` when not configured."""
        return self._definition.get('deltainitialvalue')

    @property
    def recordsource(self):
        """Value of ``recordsource``; empty string when not configured."""
        return self._definition.get('recordsource', '')

    @property
    def batchmode(self):
        """Value of ``batchmode``; defaults to ``'single'``.

        Values noted by the original author: multi, single.
        """
        return self._definition.get('batchmode', 'single')

    @property
    def deliverymode(self):
        """``'delta'`` when a ``deltaattribute`` is configured, else ``'full'``."""
        return 'delta' if self._definition.get('deltaattribute') else 'full'

    @property
    def interfaces(self):
        """Interfaces listed under ``interfaces``, resolved through the model.

        Entries may be ``None`` for names the model cannot resolve;
        ``validate`` reports those.
        """
        # NOTE(review): raises KeyError when 'interfaces' is missing --
        # presumably guaranteed by schema validation elsewhere; confirm.
        return [self.model.get_interface(i) for i in self._definition['interfaces']]

    @property
    def ldts_source(self):
        """Attribute looked up via ``get_attribute`` for ``ldts_source``."""
        return self.get_attribute(self._definition.get('ldts_source'))

    @property
    def overwrite_ldts(self):
        """``True`` when an ``ldts_source`` is configured."""
        return bool(self._definition.get('ldts_source'))

    @property
    def query(self):
        """Value of ``query``; empty string when not configured."""
        return self._definition.get('query', '')

    @property
    def source_system(self):
        """Source system of this delivery.

        Uses the configured ``sourcesystem`` (resolved through the model)
        when it is set and truthy; otherwise falls back to the source system
        of the first interface.
        """
        system_name = self._definition.get('sourcesystem')
        if system_name:
            return self.model.get_source_system(system_name)
        return self.interfaces[0].source_system

    @property
    def source_type(self):
        """Configured ``sourcetype``, else the first interface's source type."""
        # Look the fallback up lazily: passing it as the ``dict.get`` default
        # would resolve all interfaces (and fail on an empty or unresolved
        # interface list) even when 'sourcetype' is explicitly configured.
        if 'sourcetype' in self._definition:
            return self._definition['sourcetype']
        return self.interfaces[0].source_type

    def get_component_entities(self):
        """Return one dict per interface with keys entity, component, type."""
        return [{'entity': self, 'component': c, 'type': c.type}
                for c in self.interfaces]

    def validate(self):
        """Validate attributes and references of this delivery.

        Returns:
            ErrorCollection: attribute validation results plus one
            "VALIDATION ERROR" entry per unresolved interface, and for an
            unresolved ``deltaattribute`` / ``ldts_source``.
        """
        errors = ErrorCollection()

        def report(message):
            # Location is built only when an error is actually reported.
            errors.add("VALIDATION ERROR",
                       (self.filename, "Delivery", "<" + self.name + ">"),
                       message)

        for attr in self.attributes.values():
            spec = self.layer.sys_specification
            errors.append(attr.validate(spec))

        # Validating entity references:
        for i in self._definition['interfaces']:
            if self.model.get_interface(i) is None:
                report(f'Interface <{i}> not found')

        if self._definition.get('deltaattribute') and self.delta_attribute is None:
            report(f'Deltaattribute <{self._definition.get("deltaattribute")}> not found in attributes.')

        if self._definition.get('ldts_source') and self.ldts_source is None:
            report(f'ldts_source <{self._definition.get("ldts_source")}> not found in attributes.')

        return errors