Skip to content

Commit 4e2bfdd

Browse files
Author: Aditya Radhakrishnan
fix(ingest): emit status aspect for entities ingested from okta and azure_ad (#5700)
1 parent 9511715 commit 4e2bfdd

File tree

7 files changed: +556 additions, -44 deletions

metadata-ingestion/src/datahub/ingestion/source/identity/azure_ad.py

Lines changed: 30 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -35,6 +35,7 @@
3535
GroupMembershipClass,
3636
OriginClass,
3737
OriginTypeClass,
38+
StatusClass,
3839
)
3940

4041
logger = logging.getLogger(__name__)
@@ -299,17 +300,27 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
299300
aspectName="origin",
300301
aspect=OriginClass(OriginTypeClass.EXTERNAL, "AZURE_AD"),
301302
)
302-
group_origin_wu_id = (
303-
f"group-origin-{group_count + 1}"
304-
if self.config.mask_group_id
305-
else datahub_corp_group_snapshot.urn
306-
)
303+
group_origin_wu_id = f"group-origin-{group_count + 1 if self.config.mask_group_id else datahub_corp_group_snapshot.urn}"
307304
group_origin_wu = MetadataWorkUnit(
308305
id=group_origin_wu_id, mcp=group_origin_mcp
309306
)
310307
self.report.report_workunit(group_origin_wu)
311308
yield group_origin_wu
312309

310+
group_status_mcp = MetadataChangeProposalWrapper(
311+
entityType="corpGroup",
312+
entityUrn=datahub_corp_group_snapshot.urn,
313+
changeType=ChangeTypeClass.UPSERT,
314+
aspectName="status",
315+
aspect=StatusClass(removed=False),
316+
)
317+
group_status_wu_id = f"group-status-{group_count + 1 if self.config.mask_group_id else datahub_corp_group_snapshot.urn}"
318+
group_status_wu = MetadataWorkUnit(
319+
id=group_status_wu_id, mcp=group_status_mcp
320+
)
321+
self.report.report_workunit(group_status_wu)
322+
yield group_status_wu
323+
313324
# Populate GroupMembership Aspects for CorpUsers
314325
datahub_corp_user_urn_to_group_membership: Dict[
315326
str, GroupMembershipClass
@@ -426,11 +437,7 @@ def ingest_ad_users(
426437
assert datahub_group_membership
427438
datahub_corp_user_snapshot.aspects.append(datahub_group_membership)
428439
mce = MetadataChangeEvent(proposedSnapshot=datahub_corp_user_snapshot)
429-
wu_id = (
430-
f"user-{user_count + 1}"
431-
if self.config.mask_user_id
432-
else datahub_corp_user_snapshot.urn
433-
)
440+
wu_id = f"user-snapshot-{user_count + 1 if self.config.mask_user_id else datahub_corp_user_snapshot.urn}"
434441
wu = MetadataWorkUnit(id=wu_id, mce=mce)
435442
self.report.report_workunit(wu)
436443
yield wu
@@ -442,15 +449,23 @@ def ingest_ad_users(
442449
aspectName="origin",
443450
aspect=OriginClass(OriginTypeClass.EXTERNAL, "AZURE_AD"),
444451
)
445-
user_origin_wu_id = (
446-
f"user-origin-{user_count + 1}"
447-
if self.config.mask_user_id
448-
else datahub_corp_user_snapshot.urn
449-
)
452+
user_origin_wu_id = f"user-origin-{user_count + 1 if self.config.mask_user_id else datahub_corp_user_snapshot.urn}"
450453
user_origin_wu = MetadataWorkUnit(id=user_origin_wu_id, mcp=user_origin_mcp)
451454
self.report.report_workunit(user_origin_wu)
452455
yield user_origin_wu
453456

457+
user_status_mcp = MetadataChangeProposalWrapper(
458+
entityType="corpuser",
459+
entityUrn=datahub_corp_user_snapshot.urn,
460+
changeType=ChangeTypeClass.UPSERT,
461+
aspectName="status",
462+
aspect=StatusClass(removed=False),
463+
)
464+
user_status_wu_id = f"user-status-{user_count + 1 if self.config.mask_user_id else datahub_corp_user_snapshot.urn}"
465+
user_status_wu = MetadataWorkUnit(id=user_status_wu_id, mcp=user_status_mcp)
466+
self.report.report_workunit(user_status_wu)
467+
yield user_status_wu
468+
454469
def get_report(self) -> SourceReport:
455470
return self.report
456471

metadata-ingestion/src/datahub/ingestion/source/identity/okta.py

Lines changed: 33 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,7 @@
3838
GroupMembershipClass,
3939
OriginClass,
4040
OriginTypeClass,
41+
StatusClass,
4142
)
4243

4344
logger = logging.getLogger(__name__)
@@ -279,11 +280,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
279280
datahub_corp_group_snapshots
280281
):
281282
mce = MetadataChangeEvent(proposedSnapshot=datahub_corp_group_snapshot)
282-
wu_id = (
283-
f"group-{group_count + 1}"
284-
if self.config.mask_group_id
285-
else datahub_corp_group_snapshot.urn
286-
)
283+
wu_id = f"group-snapshot-{group_count + 1 if self.config.mask_group_id else datahub_corp_group_snapshot.urn}"
287284
wu = MetadataWorkUnit(id=wu_id, mce=mce)
288285
self.report.report_workunit(wu)
289286
yield wu
@@ -295,17 +292,27 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
295292
aspectName="origin",
296293
aspect=OriginClass(OriginTypeClass.EXTERNAL, "OKTA"),
297294
)
298-
group_origin_wu_id = (
299-
f"group-origin-{group_count + 1}"
300-
if self.config.mask_group_id
301-
else datahub_corp_group_snapshot.urn
302-
)
295+
group_origin_wu_id = f"group-origin-{group_count + 1 if self.config.mask_group_id else datahub_corp_group_snapshot.urn}"
303296
group_origin_wu = MetadataWorkUnit(
304297
id=group_origin_wu_id, mcp=group_origin_mcp
305298
)
306299
self.report.report_workunit(group_origin_wu)
307300
yield group_origin_wu
308301

302+
group_status_mcp = MetadataChangeProposalWrapper(
303+
entityType="corpGroup",
304+
entityUrn=datahub_corp_group_snapshot.urn,
305+
changeType=ChangeTypeClass.UPSERT,
306+
aspectName="status",
307+
aspect=StatusClass(removed=False),
308+
)
309+
group_status_wu_id = f"group-status-{group_count + 1 if self.config.mask_group_id else datahub_corp_group_snapshot.urn}"
310+
group_status_wu = MetadataWorkUnit(
311+
id=group_status_wu_id, mcp=group_status_mcp
312+
)
313+
self.report.report_workunit(group_status_wu)
314+
yield group_status_wu
315+
309316
# Step 2: Populate GroupMembership Aspects for CorpUsers
310317
datahub_corp_user_urn_to_group_membership: Dict[str, GroupMembershipClass] = {}
311318
if self.config.ingest_group_membership and okta_groups is not None:
@@ -370,11 +377,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
370377
assert datahub_group_membership is not None
371378
datahub_corp_user_snapshot.aspects.append(datahub_group_membership)
372379
mce = MetadataChangeEvent(proposedSnapshot=datahub_corp_user_snapshot)
373-
wu_id = (
374-
f"user-{user_count + 1}"
375-
if self.config.mask_user_id
376-
else datahub_corp_user_snapshot.urn
377-
)
380+
wu_id = f"user-snapshot-{user_count + 1 if self.config.mask_user_id else datahub_corp_user_snapshot.urn}"
378381
wu = MetadataWorkUnit(id=wu_id, mce=mce)
379382
self.report.report_workunit(wu)
380383
yield wu
@@ -386,17 +389,27 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
386389
aspectName="origin",
387390
aspect=OriginClass(OriginTypeClass.EXTERNAL, "OKTA"),
388391
)
389-
user_origin_wu_id = (
390-
f"user-origin-{user_count + 1}"
391-
if self.config.mask_user_id
392-
else datahub_corp_user_snapshot.urn
393-
)
392+
user_origin_wu_id = f"user-origin-{user_count + 1 if self.config.mask_user_id else datahub_corp_user_snapshot.urn}"
394393
user_origin_wu = MetadataWorkUnit(
395394
id=user_origin_wu_id, mcp=user_origin_mcp
396395
)
397396
self.report.report_workunit(user_origin_wu)
398397
yield user_origin_wu
399398

399+
user_status_mcp = MetadataChangeProposalWrapper(
400+
entityType="corpuser",
401+
entityUrn=datahub_corp_user_snapshot.urn,
402+
changeType=ChangeTypeClass.UPSERT,
403+
aspectName="status",
404+
aspect=StatusClass(removed=False),
405+
)
406+
user_status_wu_id = f"user-status-{user_count + 1 if self.config.mask_user_id else datahub_corp_user_snapshot.urn}"
407+
user_status_wu = MetadataWorkUnit(
408+
id=user_status_wu_id, mcp=user_status_mcp
409+
)
410+
self.report.report_workunit(user_status_wu)
411+
yield user_status_wu
412+
400413
# Step 4: Close the event loop
401414
event_loop.close()
402415

metadata-ingestion/tests/integration/azure_ad/azure_ad_mces_golden_default_config.json

Lines changed: 80 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,8 @@
1212
"admins": [],
1313
"members": [],
1414
"groups": [],
15-
"description": null
15+
"description": null,
16+
"slack": null
1617
}
1718
}
1819
]
@@ -46,6 +47,25 @@
4647
"properties": null
4748
}
4849
},
50+
{
51+
"auditHeader": null,
52+
"entityType": "corpGroup",
53+
"entityUrn": "urn:li:corpGroup:groupDisplayName1",
54+
"entityKeyAspect": null,
55+
"changeType": "UPSERT",
56+
"aspectName": "status",
57+
"aspect": {
58+
"value": "{\"removed\": false}",
59+
"contentType": "application/json"
60+
},
61+
"systemMetadata": {
62+
"lastObserved": 1629795600000,
63+
"runId": "test-azure-ad",
64+
"registryName": null,
65+
"registryVersion": null,
66+
"properties": null
67+
}
68+
},
4969
{
5070
"auditHeader": null,
5171
"proposedSnapshot": {
@@ -59,7 +79,8 @@
5979
"admins": [],
6080
"members": [],
6181
"groups": [],
62-
"description": "This is an interesting description"
82+
"description": "This is an interesting description",
83+
"slack": null
6384
}
6485
}
6586
]
@@ -93,6 +114,25 @@
93114
"properties": null
94115
}
95116
},
117+
{
118+
"auditHeader": null,
119+
"entityType": "corpGroup",
120+
"entityUrn": "urn:li:corpGroup:groupDisplayName2",
121+
"entityKeyAspect": null,
122+
"changeType": "UPSERT",
123+
"aspectName": "status",
124+
"aspect": {
125+
"value": "{\"removed\": false}",
126+
"contentType": "application/json"
127+
},
128+
"systemMetadata": {
129+
"lastObserved": 1629795600000,
130+
"runId": "test-azure-ad",
131+
"registryName": null,
132+
"registryVersion": null,
133+
"properties": null
134+
}
135+
},
96136
{
97137
"auditHeader": null,
98138
"proposedSnapshot": {
@@ -153,6 +193,25 @@
153193
"properties": null
154194
}
155195
},
196+
{
197+
"auditHeader": null,
198+
"entityType": "corpuser",
199+
"entityUrn": "urn:li:corpuser:johngreen@acryl.io",
200+
"entityKeyAspect": null,
201+
"changeType": "UPSERT",
202+
"aspectName": "status",
203+
"aspect": {
204+
"value": "{\"removed\": false}",
205+
"contentType": "application/json"
206+
},
207+
"systemMetadata": {
208+
"lastObserved": 1629795600000,
209+
"runId": "test-azure-ad",
210+
"registryName": null,
211+
"registryVersion": null,
212+
"properties": null
213+
}
214+
},
156215
{
157216
"auditHeader": null,
158217
"proposedSnapshot": {
@@ -212,5 +271,24 @@
212271
"registryVersion": null,
213272
"properties": null
214273
}
274+
},
275+
{
276+
"auditHeader": null,
277+
"entityType": "corpuser",
278+
"entityUrn": "urn:li:corpuser:adamhall@acryl.io",
279+
"entityKeyAspect": null,
280+
"changeType": "UPSERT",
281+
"aspectName": "status",
282+
"aspect": {
283+
"value": "{\"removed\": false}",
284+
"contentType": "application/json"
285+
},
286+
"systemMetadata": {
287+
"lastObserved": 1629795600000,
288+
"runId": "test-azure-ad",
289+
"registryName": null,
290+
"registryVersion": null,
291+
"properties": null
292+
}
215293
}
216294
]

0 commit comments

Comments
 (0)