Skip to content

Commit bace2be

Browse files
committed
[client] Fix circular dependencies
1 parent 59e7077 commit bace2be

20 files changed

+107
-176
lines changed

pycti/api/opencti_api_client.py

+44
Original file line numberDiff line numberDiff line change
@@ -642,3 +642,47 @@ def get_stix_content(self, id):
642642
"""
643643
result = self.query(query, {"id": id})
644644
return json.loads(result["data"]["stix"])
645+
646+
@staticmethod
647+
def get_attribute_in_extension(key, object) -> any:
648+
if (
649+
"extensions" in object
650+
and "extension-definition--ea279b3e-5c71-4632-ac08-831c66a786ba"
651+
in object["extensions"]
652+
and key
653+
in object["extensions"][
654+
"extension-definition--ea279b3e-5c71-4632-ac08-831c66a786ba"
655+
]
656+
):
657+
return object["extensions"][
658+
"extension-definition--ea279b3e-5c71-4632-ac08-831c66a786ba"
659+
][key]
660+
elif (
661+
"extensions" in object
662+
and "extension-definition--f93e2c80-4231-4f9a-af8b-95c9bd566a82"
663+
in object["extensions"]
664+
and key
665+
in object["extensions"][
666+
"extension-definition--f93e2c80-4231-4f9a-af8b-95c9bd566a82"
667+
]
668+
):
669+
return object["extensions"][
670+
"extension-definition--f93e2c80-4231-4f9a-af8b-95c9bd566a82"
671+
][key]
672+
return None
673+
674+
@staticmethod
675+
def get_attribute_in_mitre_extension(key, object) -> any:
676+
if (
677+
"extensions" in object
678+
and "extension-definition--322b8f77-262a-4cb8-a915-1e441e00329b"
679+
in object["extensions"]
680+
and key
681+
in object["extensions"][
682+
"extension-definition--322b8f77-262a-4cb8-a915-1e441e00329b"
683+
]
684+
):
685+
return object["extensions"][
686+
"extension-definition--322b8f77-262a-4cb8-a915-1e441e00329b"
687+
][key]
688+
return None

pycti/entities/opencti_attack_pattern.py

+8-18
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55

66
from stix2.canonicalization.Canonicalize import canonicalize
77

8-
from pycti.connector.opencti_connector_helper import OpenCTIConnectorHelper
9-
108

119
class AttackPattern:
1210
def __init__(self, opencti):
@@ -379,12 +377,10 @@ def import_from_stix2(self, **kwargs):
379377
if "x_mitre_id" in stix_object:
380378
x_mitre_id = stix_object["x_mitre_id"]
381379
elif (
382-
OpenCTIConnectorHelper.get_attribute_in_mitre_extension(
383-
"id", stix_object
384-
)
380+
self.opencti.get_attribute_in_mitre_extension("id", stix_object)
385381
is not None
386382
):
387-
x_mitre_id = OpenCTIConnectorHelper.get_attribute_in_mitre_extension(
383+
x_mitre_id = self.opencti.get_attribute_in_mitre_extension(
388384
"id", stix_object
389385
)
390386
elif "external_references" in stix_object:
@@ -405,39 +401,33 @@ def import_from_stix2(self, **kwargs):
405401
# Search in extensions
406402
if "x_opencti_order" not in stix_object:
407403
stix_object["x_opencti_order"] = (
408-
OpenCTIConnectorHelper.get_attribute_in_extension(
409-
"order", stix_object
410-
)
411-
if OpenCTIConnectorHelper.get_attribute_in_extension(
412-
"order", stix_object
413-
)
404+
self.opencti.get_attribute_in_extension("order", stix_object)
405+
if self.opencti.get_attribute_in_extension("order", stix_object)
414406
is not None
415407
else 0
416408
)
417409
if "x_mitre_platforms" not in stix_object:
418410
stix_object[
419411
"x_mitre_platforms"
420-
] = OpenCTIConnectorHelper.get_attribute_in_mitre_extension(
412+
] = self.opencti.get_attribute_in_mitre_extension(
421413
"platforms", stix_object
422414
)
423415
if "x_mitre_permissions_required" not in stix_object:
424416
stix_object[
425417
"x_mitre_permissions_required"
426-
] = OpenCTIConnectorHelper.get_attribute_in_mitre_extension(
418+
] = self.opencti.get_attribute_in_mitre_extension(
427419
"permissions_required", stix_object
428420
)
429421
if "x_mitre_detection" not in stix_object:
430422
stix_object[
431423
"x_mitre_detection"
432-
] = OpenCTIConnectorHelper.get_attribute_in_mitre_extension(
424+
] = self.opencti.get_attribute_in_mitre_extension(
433425
"detection", stix_object
434426
)
435427
if "x_opencti_stix_ids" not in stix_object:
436428
stix_object[
437429
"x_opencti_stix_ids"
438-
] = OpenCTIConnectorHelper.get_attribute_in_extension(
439-
"stix_ids", stix_object
440-
)
430+
] = self.opencti.get_attribute_in_extension("stix_ids", stix_object)
441431

442432
return self.create(
443433
stix_id=stix_object["id"],

pycti/entities/opencti_campaign.py

+1-5
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55

66
from stix2.canonicalization.Canonicalize import canonicalize
77

8-
from pycti.connector.opencti_connector_helper import OpenCTIConnectorHelper
9-
108

119
class Campaign:
1210
def __init__(self, opencti):
@@ -332,9 +330,7 @@ def import_from_stix2(self, **kwargs):
332330
if "x_opencti_stix_ids" not in stix_object:
333331
stix_object[
334332
"x_opencti_stix_ids"
335-
] = OpenCTIConnectorHelper.get_attribute_in_extension(
336-
"stix_ids", stix_object
337-
)
333+
] = self.opencti.get_attribute_in_extension("stix_ids", stix_object)
338334

339335
return self.create(
340336
stix_id=stix_object["id"],

pycti/entities/opencti_course_of_action.py

+4-12
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55

66
from stix2.canonicalization.Canonicalize import canonicalize
77

8-
from pycti.connector.opencti_connector_helper import OpenCTIConnectorHelper
9-
108

119
class CourseOfAction:
1210
def __init__(self, opencti):
@@ -335,12 +333,10 @@ def import_from_stix2(self, **kwargs):
335333
if "x_mitre_id" in stix_object:
336334
x_mitre_id = stix_object["x_mitre_id"]
337335
elif (
338-
OpenCTIConnectorHelper.get_attribute_in_mitre_extension(
339-
"id", stix_object
340-
)
336+
self.opencti.get_attribute_in_mitre_extension("id", stix_object)
341337
is not None
342338
):
343-
x_mitre_id = OpenCTIConnectorHelper.get_attribute_in_mitre_extension(
339+
x_mitre_id = self.opencti.get_attribute_in_mitre_extension(
344340
"id", stix_object
345341
)
346342
elif "external_references" in stix_object:
@@ -357,15 +353,11 @@ def import_from_stix2(self, **kwargs):
357353
if "x_opencti_aliases" not in stix_object:
358354
stix_object[
359355
"x_opencti_aliases"
360-
] = OpenCTIConnectorHelper.get_attribute_in_extension(
361-
"aliases", stix_object
362-
)
356+
] = self.opencti.get_attribute_in_extension("aliases", stix_object)
363357
if "x_opencti_stix_ids" not in stix_object:
364358
stix_object[
365359
"x_opencti_stix_ids"
366-
] = OpenCTIConnectorHelper.get_attribute_in_extension(
367-
"stix_ids", stix_object
368-
)
360+
] = self.opencti.get_attribute_in_extension("stix_ids", stix_object)
369361

370362
return self.create(
371363
stix_id=stix_object["id"],

pycti/entities/opencti_identity.py

+7-18
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
from stix2.canonicalization.Canonicalize import canonicalize
77

8-
from pycti.connector.opencti_connector_helper import OpenCTIConnectorHelper
98
from pycti.utils.constants import IdentityTypes
109

1110

@@ -388,45 +387,35 @@ def import_from_stix2(self, **kwargs):
388387
if "x_opencti_aliases" not in stix_object:
389388
stix_object[
390389
"x_opencti_aliases"
391-
] = OpenCTIConnectorHelper.get_attribute_in_extension(
392-
"aliases", stix_object
393-
)
390+
] = self.opencti.get_attribute_in_extension("aliases", stix_object)
394391
if "x_opencti_organization_type" not in stix_object:
395392
stix_object[
396393
"x_opencti_organization_type"
397-
] = OpenCTIConnectorHelper.get_attribute_in_extension(
394+
] = self.opencti.get_attribute_in_extension(
398395
"organization_type", stix_object
399396
)
400397
if "x_opencti_reliability" not in stix_object:
401398
stix_object[
402399
"x_opencti_reliability"
403-
] = OpenCTIConnectorHelper.get_attribute_in_extension(
404-
"reliability", stix_object
405-
)
400+
] = self.opencti.get_attribute_in_extension("reliability", stix_object)
406401
if "x_opencti_organization_type" not in stix_object:
407402
stix_object[
408403
"x_opencti_organization_type"
409-
] = OpenCTIConnectorHelper.get_attribute_in_extension(
404+
] = self.opencti.get_attribute_in_extension(
410405
"organization_type", stix_object
411406
)
412407
if "x_opencti_firstname" not in stix_object:
413408
stix_object[
414409
"x_opencti_firstname"
415-
] = OpenCTIConnectorHelper.get_attribute_in_extension(
416-
"firstname", stix_object
417-
)
410+
] = self.opencti.get_attribute_in_extension("firstname", stix_object)
418411
if "x_opencti_lastname" not in stix_object:
419412
stix_object[
420413
"x_opencti_lastname"
421-
] = OpenCTIConnectorHelper.get_attribute_in_extension(
422-
"lastname", stix_object
423-
)
414+
] = self.opencti.get_attribute_in_extension("lastname", stix_object)
424415
if "x_opencti_stix_ids" not in stix_object:
425416
stix_object[
426417
"x_opencti_stix_ids"
427-
] = OpenCTIConnectorHelper.get_attribute_in_extension(
428-
"stix_ids", stix_object
429-
)
418+
] = self.opencti.get_attribute_in_extension("stix_ids", stix_object)
430419

431420
return self.create(
432421
type=type,

pycti/entities/opencti_incident.py

+1-5
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55

66
from stix2.canonicalization.Canonicalize import canonicalize
77

8-
from pycti.connector.opencti_connector_helper import OpenCTIConnectorHelper
9-
108

119
class Incident:
1210
def __init__(self, opencti):
@@ -330,9 +328,7 @@ def import_from_stix2(self, **kwargs):
330328
if "x_opencti_stix_ids" not in stix_object:
331329
stix_object[
332330
"x_opencti_stix_ids"
333-
] = OpenCTIConnectorHelper.get_attribute_in_extension(
334-
"stix_ids", stix_object
335-
)
331+
] = self.opencti.get_attribute_in_extension("stix_ids", stix_object)
336332

337333
return self.create(
338334
stix_id=stix_object["id"],

pycti/entities/opencti_indicator.py

+5-13
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55

66
from stix2.canonicalization.Canonicalize import canonicalize
77

8-
from pycti.connector.opencti_connector_helper import OpenCTIConnectorHelper
9-
108

119
class Indicator:
1210
"""Main Indicator class for OpenCTI
@@ -505,33 +503,27 @@ def import_from_stix2(self, **kwargs):
505503
if "x_opencti_score" not in stix_object:
506504
stix_object[
507505
"x_opencti_score"
508-
] = OpenCTIConnectorHelper.get_attribute_in_extension(
509-
"score", stix_object
510-
)
506+
] = self.opencti.get_attribute_in_extension("score", stix_object)
511507
if "x_opencti_detection" not in stix_object:
512508
stix_object[
513509
"x_opencti_detection"
514-
] = OpenCTIConnectorHelper.get_attribute_in_extension(
515-
"detection", stix_object
516-
)
510+
] = self.opencti.get_attribute_in_extension("detection", stix_object)
517511
if "x_opencti_main_observable_type" not in stix_object:
518512
stix_object[
519513
"x_opencti_main_observable_type"
520-
] = OpenCTIConnectorHelper.get_attribute_in_extension(
514+
] = self.opencti.get_attribute_in_extension(
521515
"main_observable_type", stix_object
522516
)
523517
if "x_opencti_create_observables" not in stix_object:
524518
stix_object[
525519
"x_opencti_create_observables"
526-
] = OpenCTIConnectorHelper.get_attribute_in_extension(
520+
] = self.opencti.get_attribute_in_extension(
527521
"create_observables", stix_object
528522
)
529523
if "x_opencti_stix_ids" not in stix_object:
530524
stix_object[
531525
"x_opencti_stix_ids"
532-
] = OpenCTIConnectorHelper.get_attribute_in_extension(
533-
"stix_ids", stix_object
534-
)
526+
] = self.opencti.get_attribute_in_extension("stix_ids", stix_object)
535527

536528
return self.create(
537529
stix_id=stix_object["id"],

pycti/entities/opencti_infrastructure.py

+1-5
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55

66
from stix2.canonicalization.Canonicalize import canonicalize
77

8-
from pycti.connector.opencti_connector_helper import OpenCTIConnectorHelper
9-
108

119
class Infrastructure:
1210
"""Main Infrastructure class for OpenCTI
@@ -388,9 +386,7 @@ def import_from_stix2(self, **kwargs):
388386

389387
# Search in extensions
390388
if "x_opencti_stix_ids" not in stix_object:
391-
stix_object[
392-
"x_opencti_stix_ids"
393-
] = OpenCTIConnectorHelper.get_attribute_in_extension(
389+
stix_object["x_opencti_stix_ids"] = self.opencti.get_attribute_in_extension(
394390
"stix_ids", stix_object
395391
)
396392

pycti/entities/opencti_intrusion_set.py

+1-5
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55

66
from stix2.canonicalization.Canonicalize import canonicalize
77

8-
from pycti.connector.opencti_connector_helper import OpenCTIConnectorHelper
9-
108

119
class IntrusionSet:
1210
def __init__(self, opencti):
@@ -344,9 +342,7 @@ def import_from_stix2(self, **kwargs):
344342
if "x_opencti_stix_ids" not in stix_object:
345343
stix_object[
346344
"x_opencti_stix_ids"
347-
] = OpenCTIConnectorHelper.get_attribute_in_extension(
348-
"stix_ids", stix_object
349-
)
345+
] = self.opencti.get_attribute_in_extension("stix_ids", stix_object)
350346

351347
return self.create(
352348
stix_id=stix_object["id"],

pycti/entities/opencti_location.py

+4-14
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55

66
from stix2.canonicalization.Canonicalize import canonicalize
77

8-
from pycti.connector.opencti_connector_helper import OpenCTIConnectorHelper
9-
108

119
class Location:
1210
def __init__(self, opencti):
@@ -346,14 +344,10 @@ def import_from_stix2(self, **kwargs):
346344
if "x_opencti_location_type" in stix_object:
347345
type = stix_object["x_opencti_location_type"]
348346
elif (
349-
OpenCTIConnectorHelper.get_attribute_in_extension(
350-
"location_type", stix_object
351-
)
347+
self.opencti.get_attribute_in_extension("location_type", stix_object)
352348
is not None
353349
):
354-
type = OpenCTIConnectorHelper.get_attribute_in_extension(
355-
"location_type", stix_object
356-
)
350+
type = self.opencti.get_attribute_in_extension("location_type", stix_object)
357351
else:
358352
if "city" in stix_object:
359353
type = "City"
@@ -369,15 +363,11 @@ def import_from_stix2(self, **kwargs):
369363
if "x_opencti_aliases" not in stix_object:
370364
stix_object[
371365
"x_opencti_aliases"
372-
] = OpenCTIConnectorHelper.get_attribute_in_extension(
373-
"aliases", stix_object
374-
)
366+
] = self.opencti.get_attribute_in_extension("aliases", stix_object)
375367
if "x_opencti_stix_ids" not in stix_object:
376368
stix_object[
377369
"x_opencti_stix_ids"
378-
] = OpenCTIConnectorHelper.get_attribute_in_extension(
379-
"stix_ids", stix_object
380-
)
370+
] = self.opencti.get_attribute_in_extension("stix_ids", stix_object)
381371

382372
return self.create(
383373
type=type,

0 commit comments

Comments
 (0)