forked from OpenCTI-Platform/client-python
opencti_stix.py
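class Stix:
    """Wrapper around the ``stixEdit`` GraphQL mutations (delete and merge).

    Every call goes through the client object given to the constructor, which
    must provide ``query``, ``app_logger`` and ``process_multiple_fields``.
    Both operations modify server-side data, and each logs the ids it acts on
    before sending the mutation.
    """

    def __init__(self, opencti):
        """Keep a reference to the API client.

        :param opencti: the client used to send queries and write log entries
        """
        self.opencti = opencti

    def delete(self, **kwargs):
        """Delete a Stix element.

        :param id: the Stix element id
        :return: None, both after sending the mutation and when ``id`` is
            missing; a missing ``id`` is only reported through the error log
        """
        element_id = kwargs.get("id")
        if element_id is None:
            self.opencti.app_logger.error("[opencti_stix] Missing parameters: id")
            return None

        # Log the target id before the destructive call so the client log
        # records what was requested even if the query itself fails.
        self.opencti.app_logger.info("Deleting Stix element", {"id": element_id})
        query = """
            mutation StixEdit($id: ID!) {
                stixEdit(id: $id) {
                    delete
                }
            }
        """
        self.opencti.query(query, {"id": element_id})
        return None

    def merge(self, **kwargs):
        """Merge other Stix objects into the object identified by ``id``.

        :param id: the id of the Stix object that receives the merge
        :param object_ids: the ids of the Stix objects to merge into it; a
            list of strings (a single string is treated as a one-item list)
        :return: the merged object (``id``, ``standard_id``, ``entity_type``)
            after ``process_multiple_fields``, or None when a parameter is
            missing
        """
        target_id = kwargs.get("id")
        source_ids = kwargs.get("object_ids")
        if target_id is None or source_ids is None:
            self.opencti.app_logger.error(
                "[opencti_stix] Missing parameters: id and object_ids"
            )
            return None

        # A bare string would be joined character by character in the log
        # entry below, recording ids that were never requested. Wrapping it
        # keeps the log accurate and matches the declared [String] variable.
        if isinstance(source_ids, str):
            source_ids = [source_ids]

        # NOTE(review): the effect of a merge on the source objects is decided
        # server-side and is not visible here; confirm it before relying on
        # the sources still existing afterwards.
        self.opencti.app_logger.info(
            "Merging Stix object",
            {"id": target_id, "sources": ",".join(source_ids)},
        )
        query = """
            mutation StixEdit($id: ID!, $stixObjectsIds: [String]!) {
                stixEdit(id: $id) {
                    merge(stixObjectsIds: $stixObjectsIds) {
                        id
                        standard_id
                        entity_type
                    }
                }
            }
        """
        variables = {"id": target_id, "stixObjectsIds": source_ids}
        result = self.opencti.query(query, variables)
        merged = result["data"]["stixEdit"]["merge"]
        return self.opencti.process_multiple_fields(merged)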