Automation
data-lineage-tracker
Track data origin, transformations
---
slug: "data-lineage-tracker"
display_name: "Data Lineage Tracker"
description: "Track data origin, transformations, and flow through construction systems. Essential for audit trails, compliance, and debugging data issues."
---
# Data Lineage Tracker for Construction
## Overview
Track the origin, transformations, and flow of construction data through systems. Provides audit trails for compliance, helps debug data issues, and ensures data governance.
## Business Case
Construction projects require data accountability:
- **Audit Compliance**: Know where every number came from
- **Issue Resolution**: Trace data problems to their source
- **Change Impact**: Understand what downstream systems are affected
- **Regulatory Requirements**: Maintain data provenance for legal/insurance
## Technical Implementation
```python
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Set
from datetime import datetime
from enum import Enum
import json
import hashlib
import uuid
class TransformationType(Enum):
EXTRACT = "extract"
TRANSFORM = "transform"
LOAD = "load"
AGGREGATE = "aggregate"
JOIN = "join"
FILTER = "filter"
CALCULATE = "calculate"
MANUAL_EDIT = "manual_edit"
IMPORT = "import"
EXPORT = "export"
@dataclass
class DataSource:
id: str
name: str
system: str
location: str
owner: str
created_at: datetime
@dataclass
class TransformationStep:
id: str
transformation_type: TransformationType
description: str
input_entities: List[str]
output_entities: List[str]
logic: str # SQL, Python, or description
performed_by: str # user or system
performed_at: datetime
parameters: Dict[str, Any] = field(default_factory=dict)
@dataclass
class DataEntity:
id: str
name: str
source_id: str
entity_type: str # table, file, field, record
created_at: datetime
version: int = 1
checksum: Optional[str] = None
parent_entities: List[str] = field(default_factory=list)
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class LineageRecord:
id: str
entity_id: str
transformation_id: str
upstream_entities: List[str]
downstream_entities: List[str]
recorded_at: datetime
class ConstructionDataLineageTracker:
"""Track data lineage for construction data flows."""
def __init__(self, project_id: str):
self.project_id = project_id
self.sources: Dict[str, DataSource] = {}
self.entities: Dict[str, DataEntity] = {}
self.transformations: Dict[str, TransformationStep] = {}
self.lineage_records: List[LineageRecord] = []
def register_source(self, name: str, system: str, location: str, owner: str) -> DataSource:
"""Register a new data source."""
source = DataSource(
id=f"SRC-{uuid.uuid4().hex[:8]}",
name=name,
system=system,
location=location,
owner=owner,
created_at=datetime.now()
)
self.sources[source.id] = source
return source
def register_entity(self, name: str, source_id: str, entity_type: str,
parent_entities: List[str] = None,
metadata: Dict = None) -> DataEntity:
"""Register a data entity (table, file, field)."""
entity = DataEntity(
id=f"ENT-{uuid.uuid4().hex[:8]}",
name=name,
source_id=source_id,
entity_type=entity_type,
created_at=datetime.now(),
parent_entities=parent_entities or [],
metadata=metadata or {}
)
self.entities[entity.id] = entity
return entity
def calculate_checksum(self, data: Any) -> str:
"""Calculate checksum for data verification."""
if isinstance(data, str):
content = data
else:
content = json.dumps(data, sort_keys=True, default=str)
return hashlib.sha256(content.encode()).hexdigest()[:16]
def record_transformation(self,
transformation_type: TransformationType,
description: str,
input_entities: List[str],
output_entities: List[str],
logic: str,
performed_by: str,
parameters: Dict = None) -> TransformationStep:
"""Record a data transformation."""
transformation = TransformationStep(
id=f"TRF-{uuid.uuid4().hex[:8]}",
transformation_type=transformation_type,
description=description,
input_entities=input_entities,
output_entities=output_entities,
logic=logic,
performed_by=performed_by,
performed_at=datetime.now(),
parameters=parameters or {}
)
self.transformations[transformation.id] = transformation
# Create lineage records
for output_id in output_entities:
record = LineageRecord(
id=f"LIN-{uuid.uuid4().hex[:8]}",
entity_id=output_id,
transformation_id=transformation.id,
upstream_entities=input_entities,
downstream_entities=[],
recorded_at=datetime.now()
)
self.lineage_records.append(record)
# Update downstream references for input entities
for input_id in input_entities:
for existing_record in self.lineage_records:
if existing_record.entity_id == input_id:
existing_record.downstream_entities.append(output_id)
return transformation
def trace_upstream(self, entity_id: str, depth: int = None) -> List[Dict]:
"""Trace all upstream sources of an entity."""
visited = set()
lineage = []
def trace(eid: str, current_depth: int):
if eid in visited:
return
if depth is not None and current_depth > depth:
return
visited.add(eid)
entity = self.entities.get(eid)
if not entity:
return
# Find transformations that produced this entity
for record in self.lineage_records:
if record.entity_id == eid:
transformation = self.transformations.get(record.transformation_id)
if transformation:
lineage.append({
'entity': entity.name,
'entity_id': eid,
'depth': current_depth,
'transformation': transformation.description,
'transformation_type': transformation.transformation_type.value,
'performed_at': transformation.performed_at.isoformat(),
'performed_by': transformation.performed_by,
'upstream': record.upstream_entities
})
for upstream_id in record.upstream_entities:
trace(upstream_id, current_depth + 1)
trace(entity_id, 0)
return sorted(lineage, key=lambda x: x['depth'])
def trace_downstream(self, entity_id: str, depth: int = None) -> List[Dict]:
"""Trace all downstream dependencies of an entity."""
visited = set()
dependencies = []
def trace(eid: str, current_depth: int):
if eid in visited:
return
if depth is not None and current_depth > depth:
return
visited.add(eid)
entity = self.entities.get(eid)
if not entity:
return
# Find entities that use this entity
for record in self.lineage_records:
if eid in record.upstream_entities:
transformation = self.transformations.get(record.transformation_id)
if transformation:
dependencies.append({
'entity': self.entities[record.entity_id].name if record.entity_id in self.entities else record.entity_id,
'entity_id': record.entity_id,
'depth': current_depth,
'transformation': transformation.description,
'transformation_type': transformation.transformation_type.value
})
trace(record.entity_id, current_depth + 1)
trace(entity_id, 0)
return sorted(dependencies, key=lambda x: x['depth'])
def get_entity_history(self, entity_id: str) -> List[Dict]:
"""Get complete history of changes to an entity."""
history = []
for record in self.lineage_records:
if record.entity_id == entity_id:
transformation = self.transformations.get(record.transformation_id)
if transformation:
history.append({
'timestamp': transformation.performed_at.isoformat(),
'action': transformation.transformation_type.value,
'description': transformation.description,
'performed_by': transformation.performed_by,
'inputs': [
self.entities[eid].name if eid in self.entities else eid
for eid in record.upstream_entities
]
... (truncated)
automation
By
Comments
Sign in to leave a comment