This repository has been archived on 2026-03-20. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
datavaultgenerator-1.1.5/DataVaultGenerator/Mapping.py

339 lines
15 KiB
Python

import logging
import re
from .Components import ErrorCollection, log
'''
targetmapping =
source: (class Mapping)
- target: costcenter_h (targetmapping)
type: mappingonly # i.e. the hub is not loaded from this table, but the satellites and links still need to know the mapping
mappingmode: implicit|explicit
mapping:
- [creationname, creationname]
- target: costcentertype_h
type: master # main source for the hub
mapping:
- [creationname, creationname]
'''
'''
SourcesMapping
- TargetMapping 1..n
- AttributeMapping 1..n
'''
class AttributeMappingExpression:
    """Source-side expression of an attribute mapping.

    expression => raw expression with attribute placeholders in curly braces,
                  e.g.: "concat({attribute1},'-',{attribute2})"
    entity     => base entity; its ``get_attribute`` resolves the placeholders
    resulttype => datatype string of the expression result, e.g. "nvarchar(100)"
    alias      => alias stored alongside the expression
    """

    # Non-greedy match of whatever stands between one pair of curly braces.
    _PLACEHOLDER_PATTERN = re.compile(r"\{(.*?)?\}")

    def __init__(self, entity, expression: str = '', resulttype: str = '', alias: str = ''):
        self._rawexpression = expression
        self._resulttype = resulttype
        self.entity = entity
        self.alias = alias

    @property
    def datatype(self):
        """Return the result datatype exactly as it was passed in."""
        return self._resulttype

    @property
    def native_datatype(self):
        """Return the datatype without its length part, e.g. "NVARCHAR (100)" -> "nvarchar".

        The result is stripped and lowercased regardless of whether a length
        is present, so "INT" and "int(11)" both yield "int".
        """
        # IMPROVE: should actually be stated explicitly in the attribute definition.
        basename, _, _ = self.datatype.partition('(')
        return basename.strip().lower()

    @property
    def native_datatypelength(self):
        """Return the datatype length, e.g. nvarchar(100) -> "100"; '' if there is none."""
        # IMPROVE: should actually be stated explicitly in the attribute definition.
        _, opening, remainder = self.datatype.partition('(')
        if not opening:
            return ''
        # partition() keeps the full remainder when the closing parenthesis is
        # missing, instead of silently cutting off the last character.
        return remainder.partition(')')[0].strip().lower()

    @property
    def expression(self):
        """Return the raw expression with every resolvable placeholder rendered.

        Each ``{name}`` whose attribute exists on the entity is replaced by the
        output of the model's 'attribute_expression' base template; placeholders
        that cannot be resolved are left untouched.
        """
        template = self.entity.model.basetemplates.get('attribute_expression')
        rendered = self._rawexpression
        # IMPROVE: are the attribute instances needed here at all?
        # -> the placeholder name should be sufficient.
        for placeholder, attr in self.get_expression_attributes().items():
            if attr:
                rendered = rendered.replace('{' + str(placeholder) + '}',
                                            template.render(component=[attr.name]))
        return rendered

    def get_expression_attributes(self):
        """Parse expressions like concat({attribute1},'-',{attribute2}).

        Returns a dict that maps each placeholder name to the result of
        ``entity.get_attribute(name)`` for that name.
        """
        return {
            match.group(1): self.entity.get_attribute(match.group(1))
            for match in self._PLACEHOLDER_PATTERN.finditer(self._rawexpression)
        }
class AttributeMapping:
    """Mapping of one source attribute (or source expression) to one target attribute.

    targetmapping  => owning TargetMapping; provides source and target entity
    source         => 'attributename' or a dict like
                      {expression: "concat({attribute1},...)"}
    target         => 'attributename' in the target entity
    transformation => optional transformation name
    """

    def __init__(self, targetmapping, source: str, target: str, transformation: str = ''):
        self.targetmapping = targetmapping
        self._source = source  # => 'attributename' or '{expression: "concat({attribute1},...)"}'
        self._target = target  # => 'attributename'
        self.transformation = transformation

    def __repr__(self):
        return "AttributeMapping: <{0}> -> <{1}>".format(self._source, self._target)

    @property
    def source(self):
        """Return the source side as an AttributeMappingExpression.

        A dict source uses its 'expression' entry and takes datatype and alias
        from the target attribute; a plain attribute name is wrapped into a
        synthetic '{name}' expression typed and aliased after the source
        attribute.
        """
        # IMPROVE: possibly resolve in __init__ already, provided this does
        # not pre-empt validation.
        entity = self.targetmapping.sourceentity
        # isinstance() also accepts dict subclasses (e.g. ordered mappings
        # produced by a config loader), which `type(x) is dict` rejected.
        if isinstance(self._source, dict):
            return AttributeMappingExpression(entity,
                                              expression=self._source.get('expression'),
                                              resulttype=self.target.datatype,
                                              alias=self.target.name)
        # If only an attribute name was given, build a synthetic expression.
        attr = entity.get_attribute(self._source)
        return AttributeMappingExpression(entity,
                                          expression='{' + self._source + '}',
                                          resulttype=attr.datatype,
                                          alias=attr.name)

    @property
    def target(self):
        """Return the target attribute as looked up on the target entity."""
        # IMPROVE: possibly resolve in __init__ already, provided this does
        # not pre-empt validation.
        return self.targetmapping.targetentity.get_attribute(self._target)

    @property
    def targetattribute_name(self):  # FIXME: should be switched to target.name
        return self._target

    @property
    def transformation_name(self):
        return self.transformation

    def _error_location(self):
        """Build the location tuple used for every validation error of this mapping."""
        return ("Mapping",
                "<" + self.targetmapping.sourceentityname + ">",
                "target <" + self.targetmapping.targetentityname + ">")

    def validate(self):
        """Check that target and source attributes exist.

        Returns an ErrorCollection holding one entry per problem found:
        unknown target attribute, missing source expression, unknown attribute
        referenced inside a source expression, or unknown source attribute.
        """
        errors = ErrorCollection()

        if self.targetmapping.targetentity.get_attribute(self._target) is None:
            errors.add("VALIDATION ERROR",
                       self._error_location(),
                       f'target attribute <{self._target}> not found')

        sourceentity = self.targetmapping.sourceentity

        if isinstance(self._source, dict):
            rawexpression = self._source.get('expression')
            if not isinstance(rawexpression, str):
                # Without this guard the placeholder regex would be run on a
                # non-string and abort validation with a TypeError.
                errors.add("VALIDATION ERROR",
                           self._error_location(),
                           f'source expression for target attribute <{self._target}> is missing or not a string')
                return errors

            am_expr = AttributeMappingExpression(sourceentity, expression=rawexpression,
                                                 resulttype=None, alias=None)
            for attrname, attr in am_expr.get_expression_attributes().items():
                if attr is None:
                    errors.add("VALIDATION ERROR",
                               self._error_location(),
                               f'attribute <{attrname}> in sourceexpression "{am_expr._rawexpression}" not found')
        elif sourceentity.get_attribute(self._source) is None:
            # Only an attribute name was given: it must exist on the source entity.
            errors.add("VALIDATION ERROR",
                       self._error_location(),
                       f'source attribute <{self._source}> not found')

        return errors
class TargetMapping:
    """All attribute mappings from one source entity to one target entity.

    model            => model object; used for entity lookup and configuration
    sourceentityname => name of the source entity
    definition       => dict with the keys read here: 'target', 'mappingmode'
                        (default 'implicit'), 'type' (default 'master') and
                        'mapping' (list of [source, target] or
                        [source, target, transformation] entries)
    """

    def __init__(self, model, sourceentityname: str, definition: dict = None):
        # Tolerate the declared default: a missing definition behaves like an
        # empty one instead of failing on `None.get`.
        definition = definition if definition is not None else {}
        self.model = model
        self._definition = definition
        self.sourceentityname = sourceentityname
        self.targetentityname = definition.get('target')
        self.mappingmode = definition.get('mappingmode', 'implicit')
        self.type = definition.get('type', 'master')
        self.attributemappings = []
        self.explicitattributemappings = definition.get('mapping')

        # TODO: would be better placed in validation.
        if self.targetentity is None:  # see @property
            logging.error('mapping: <%s> - target <%s> not found', self.sourceentityname, self.targetentityname)
            # Thanks to the early return, validation still runs afterwards,
            # so at least the console output is right.
            return

        if self.mappingmode == 'implicit':
            # Get the implicit roles from config.entitydefaults.
            implicit_roles = self.model.config.entitydefaults.get(self.targetentity.type).get('map_implicit_roles', 'base')
            for ta in self.targetentity.get_attributes(implicit_roles):
                # NOTE: this does not work if an explicit mapping with an
                # expression shows up later.
                self.attributemappings.append(AttributeMapping(self, ta.name, ta.name))
            logging.debug('Mapping <%s>: Created implicit attribute mappings for target <%s>: %s', self.sourceentityname,
                          self.targetentityname, self.attributemappings)

        if self.explicitattributemappings:
            logging.debug("explicit mappings: %s", self.explicitattributemappings)
            logging.debug("result mappings (1): %s", self.attributemappings)
            # Process all explicitly mapped attributes and overwrite an
            # existing implicit mapping where there is one.
            for explicitattributemapping in self.explicitattributemappings:
                self._add_explicit_mapping(explicitattributemapping)
            logging.debug("result mappings (2): %s", self.attributemappings)

    def _add_explicit_mapping(self, explicitattributemapping):
        """Append one explicit mapping entry, or replace the first mapping it collides with."""
        source, target = explicitattributemapping[0], explicitattributemapping[1]
        transformationname = explicitattributemapping[2] if len(explicitattributemapping) == 3 else ''
        new_mapping = AttributeMapping(self, source, target, transformationname)

        if self.mappingmode == 'explicit':
            self.attributemappings.append(new_mapping)
            return

        # enumerate() yields the real list position. The previous hand-kept
        # counter was not reset after an entry that matched nothing, so later
        # matches pointed past (or at the wrong slot of) the list.
        # NOTE(review): a collision is a match on source OR target, so two
        # explicit entries sharing a source replace each other - confirm
        # that this is intended.
        existing_index = next(
            (index for index, am in enumerate(self.attributemappings)
             if am._source == source or am._target == target),
            None)

        if existing_index is None:
            logging.debug('Mapping <%s>: Adding explicit attributemapping %s', self.sourceentityname,
                          explicitattributemapping)
            self.attributemappings.append(new_mapping)
            return

        logging.debug("replace Attribute mapping %s at index %s with mapping %s",
                      self.attributemappings[existing_index], existing_index, explicitattributemapping)
        logging.debug(
            'Mapping <%s>: Replace implicit Mapping: %s at index %s with explicit mapping: %s',
            self.sourceentityname, self.attributemappings[existing_index], existing_index,
            explicitattributemapping)
        self.attributemappings[existing_index] = new_mapping

    @property
    def targetentity(self):
        """Return the target entity looked up in the model by the definition's 'target'."""
        return self.model.get_entity(self._definition.get('target'))

    @property
    def sourceentity(self):
        """Return the source entity looked up in the model by name."""
        return self.model.get_entity(self.sourceentityname)

    def get_attribute_mappings(self):
        return self.attributemappings

    def validate(self):
        """Validate every attribute mapping and reject duplicate explicit targets.

        Returns an ErrorCollection with the errors of all attribute mappings
        plus one error for each explicit entry whose target attribute was
        already used by an earlier explicit entry.
        """
        errors = ErrorCollection()

        # ------ Validation Attributes: ---------
        for am in self.get_attribute_mappings():
            errors.append(am.validate())
        # TODO: Check for data truncation (only makes sense once the data type
        # definition is explicit/clean).

        # ------ Validation of explicit mappings: ---------
        if self.explicitattributemappings:
            seen_targets = []
            for explicitattributemapping in self.explicitattributemappings:
                target = explicitattributemapping[1]
                if target not in seen_targets:
                    seen_targets.append(target)
                else:
                    errors.add("VALIDATION ERROR",
                               ("Mapping", "<" + self.sourceentityname + ">",
                                "target <" + self.targetentityname + ">"),
                               f'More than one attribute from same source mapped to <{explicitattributemapping[1]}>')

        return errors
class Mapping:
    """Mapping definition of one source entity onto its target entities.

    model            => model object; used for entity lookup and configuration
    sourceentityname => name of the source entity
    filename         => name of the defining file; only used in error locations
    definition       => iterable of target mapping definitions (dicts that
                        each carry a 'target' key)
    """

    def __init__(self, model, sourceentityname: str, filename: str, definition: dict = None):
        self.model = model
        # Tolerate the declared default: no definition means no target mappings.
        self.definition = definition if definition is not None else []
        self.targetmappings = {}
        self.sourceentityname = sourceentityname
        self.filename = filename
        self.type = 'mapping'

        # FIXME: loading the TargetMappings in __init__ prevents a clean
        # separation of validation and loading.
        for tm in self.definition:
            self.targetmappings[tm.get('target')] = TargetMapping(model, sourceentityname, tm)

    def get_attribute_mappings_by_target(self, targetentityname: str):
        """Return the attribute mappings for the given target entity name.

        Logs an error and returns an empty list when this mapping has no
        target mapping for that name. Also logs an error when the target
        mapping exists but holds no attribute mappings although the target
        entity has attributes in the implicitly mapped roles.
        """
        tm = self.get_targetmapping_by_target(targetentityname)
        if tm is None:
            logging.error("Mapping <%s>: No mapping for '%s' found", self.sourceentityname, targetentityname)
            return []

        am = tm.get_attribute_mappings()
        implicit_roles = self.model.config.entitydefaults.get(tm.targetentity.type).get('map_implicit_roles', 'base')
        # Links need not have attributes, so only report an error if the
        # target actually has attributes to map.
        if not am and tm.targetentity.get_attributes(implicit_roles):
            logging.error("Mapping <%s>: No mapping for '%s' found", self.sourceentityname, targetentityname)
        return am

    def get_target_entities(self):
        """Return the list of directly mapped entities."""
        return [tm.targetentity for tm in self.targetmappings.values()]

    def validate(self):
        """Validate the source entity, the targets, link completeness and all target mappings.

        Validation is staged; each stage returns early with the errors found
        so far, because the later stages rely on the earlier ones having passed.
        """
        errors = ErrorCollection()
        location = (self.filename, "Mapping", "<" + self.sourceentityname + ">")

        # ------ Validating Entity: ---------
        if self.model.get_entity(self.sourceentityname) is None:
            errors.add("VALIDATION ERROR",
                       location,
                       "delivery <" + self.sourceentityname + "> not found")
            return errors

        # ------ Validation of linked entities -----
        # Validation if all linked entities are present in the same mapping
        for tm in self.definition:
            if self.model.get_entity(tm.get('target')) is None:
                # f-string instead of concatenation: a definition without a
                # 'target' key must produce an error entry, not a TypeError.
                errors.add("VALIDATION ERROR",
                           location,
                           f"target <{tm.get('target')}> not found")

        if errors.count > 0:
            return errors

        targetentities = self.get_target_entities()
        for link in (e for e in targetentities if e.type == 'link'):
            for le in link.get_linked_entities():
                if le not in targetentities:
                    errors.add("VALIDATION ERROR",
                               location,
                               "linked entity for link <" + link.name + "> is missing. Please provide a mapping for <" + le.name + "> in this mapping")
                    # Stops at the first missing linked entity.
                    return errors

        # ------ Validating Targetmapping: ---------
        for tm in self.targetmappings.values():
            errors.append(tm.validate())

        return errors

    def get_targetmapping_by_target(self, target: str):
        """Return the TargetMapping for the target name, or None if there is none."""
        return self.targetmappings.get(target)

    def get_targetmappings(self):
        """Return the dict of TargetMapping objects keyed by target name."""
        return self.targetmappings