|
6 | 6 | from pycti import OpenCTIApiClient
|
7 | 7 |
|
8 | 8 | # Variables
|
9 |
| -api_url = "https://demo.opencti.io" |
10 |
| -api_token = "YOUR_TOKEN" |
| 9 | +api_url = "http://opencti:4000" |
| 10 | +api_token = "bfa014e0-e02e-4aa6-a42b-603b19dcf159" |
11 | 11 |
|
12 | 12 | # OpenCTI initialization
|
13 | 13 | opencti_api_client = OpenCTIApiClient(api_url, api_token)
|
14 | 14 |
|
15 |
| -# Parameters |
16 |
| -parser = argparse.ArgumentParser(description="Mandatory arguments.") |
17 |
| -parser.add_argument( |
18 |
| - "--entity-type", |
19 |
| - dest="entity_type", |
20 |
| - default="Intrusion-Set", |
21 |
| - required=True, |
22 |
| - help="Type of the threat (Threat-Actor, Intrusion-Set, Campaign, X-OpenCTI,-Incident, Malware, Tool, Attack-Pattern)", |
23 |
| -) |
24 |
| -parser.add_argument( |
25 |
| - "--name", |
26 |
| - dest="name", |
27 |
| - required=True, |
28 |
| - help="Name of the threat", |
29 |
| -) |
30 |
| -parser.add_argument( |
31 |
| - "--created-after", |
32 |
| - dest="createdAfter", |
33 |
| - help="Indicator created before (ISO date)", |
34 |
| -) |
35 |
| -parser.add_argument( |
36 |
| - "--created-before", |
37 |
| - dest="createdBefore", |
38 |
| - help="Indicator created after (ISO date)", |
39 |
| -) |
40 |
| -parser.add_argument( |
41 |
| - "--tags", |
42 |
| - dest="tags", |
43 |
| - required=True, |
44 |
| - help="Tags to add or remove (separated by ,)", |
45 |
| -) |
46 |
| -parser.add_argument( |
47 |
| - "--operation", |
48 |
| - dest="operation", |
49 |
| - required=True, |
50 |
| - default="add", |
51 |
| - help="Operation (add/remove)", |
52 |
| -) |
53 |
| -args = parser.parse_args() |
54 | 15 |
|
55 |
| -entity_type = args.entity_type |
56 |
| -name = args.name |
57 |
| -created_after = parse(args.createdAfter).strftime("%Y-%m-%dT%H:%M:%SZ") |
58 |
| -created_before = parse(args.createdBefore).strftime("%Y-%m-%dT%H:%M:%SZ") |
59 |
| -tags = args.tags.split(",") |
60 |
| -operation = args.operation |
| 16 | +def main(): |
| 17 | + # Parameters |
| 18 | + parser = argparse.ArgumentParser(description="Mandatory arguments.") |
| 19 | + parser.add_argument( |
| 20 | + "--entity-type", |
| 21 | + dest="entity_type", |
| 22 | + default="Intrusion-Set", |
| 23 | + required=True, |
| 24 | + help="Type of the threat (Threat-Actor, Intrusion-Set, Campaign, X-OpenCTI,-Incident, Malware, Tool, Attack-Pattern)", |
| 25 | + ) |
| 26 | + parser.add_argument( |
| 27 | + "--name", |
| 28 | + dest="name", |
| 29 | + required=True, |
| 30 | + help="Name of the threat", |
| 31 | + ) |
| 32 | + parser.add_argument( |
| 33 | + "--created-after", |
| 34 | + dest="createdAfter", |
| 35 | + help="Indicator created before (ISO date)", |
| 36 | + ) |
| 37 | + parser.add_argument( |
| 38 | + "--created-before", |
| 39 | + dest="createdBefore", |
| 40 | + help="Indicator created after (ISO date)", |
| 41 | + ) |
| 42 | + parser.add_argument( |
| 43 | + "--tags", |
| 44 | + dest="tags", |
| 45 | + required=True, |
| 46 | + help="Tags to add or remove (separated by ,)", |
| 47 | + ) |
| 48 | + parser.add_argument( |
| 49 | + "--operation", |
| 50 | + dest="operation", |
| 51 | + required=True, |
| 52 | + default="add", |
| 53 | + help="Operation (add/remove)", |
| 54 | + ) |
| 55 | + args = parser.parse_args() |
61 | 56 |
|
62 |
| -# Resolve the entity |
63 |
| -threat = opencti_api_client.stix_domain_object.read( |
64 |
| - types=[entity_type], filters=[{"key": "name", "values": [name]}] |
65 |
| -) |
| 57 | + entity_type = args.entity_type |
| 58 | + name = args.name |
| 59 | + created_after = parse(args.createdAfter).strftime("%Y-%m-%dT%H:%M:%SZ") |
| 60 | + created_before = parse(args.createdBefore).strftime("%Y-%m-%dT%H:%M:%SZ") |
| 61 | + tags = args.tags.split(",") |
| 62 | + operation = args.operation |
66 | 63 |
|
67 |
| -if not threat: |
68 |
| - raise ValueError("Cannot find the entity with the name " + name) |
| 64 | + # Resolve the entity |
| 65 | + threat = opencti_api_client.stix_domain_object.read( |
| 66 | + types=[entity_type], filters=[{"key": "name", "values": [name]}] |
| 67 | + ) |
69 | 68 |
|
70 |
| -# Resolve all tags |
71 |
| -labels = [] |
72 |
| -for tag in tags: |
73 |
| - labels.append(opencti_api_client.label.create(value=tag)) |
| 69 | + if not threat: |
| 70 | + raise ValueError("Cannot find the entity with the name " + name) |
74 | 71 |
|
75 |
| -# Get indicators |
76 |
| -custom_attributes = """ |
77 |
| - id |
78 |
| - created_at |
79 |
| -""" |
| 72 | + # Resolve all tags |
| 73 | + labels = [] |
| 74 | + for tag in tags: |
| 75 | + labels.append(opencti_api_client.label.create(value=tag)) |
80 | 76 |
|
| 77 | + # Get indicators |
| 78 | + custom_attributes = """ |
| 79 | + id |
| 80 | + created_at |
| 81 | + """ |
81 | 82 |
|
82 |
| -data = {"pagination": {"hasNextPage": True, "endCursor": None}} |
83 |
| -while data["pagination"]["hasNextPage"]: |
84 |
| - after = data["pagination"]["endCursor"] |
85 |
| - data = opencti_api_client.indicator.list( |
86 |
| - first=50, |
87 |
| - after=after, |
88 |
| - customAttributes=custom_attributes, |
89 |
| - filters=[ |
90 |
| - {"key": "indicates", "values": [threat["id"]]}, |
91 |
| - {"key": "created_at", "values": [created_after], "operator": "gt"}, |
92 |
| - {"key": "created_at", "values": [created_before], "operator": "lt"}, |
93 |
| - ], |
94 |
| - orderBy="created_at", |
95 |
| - orderMode="asc", |
96 |
| - withPagination=True, |
97 |
| - ) |
98 |
| - for indicator in data["entities"]: |
99 |
| - print("[" + indicator["created_at"] + "] " + indicator["id"]) |
100 |
| - if operation == "add": |
101 |
| - for label in labels: |
102 |
| - opencti_api_client.stix_domain_object.add_label( |
103 |
| - id=indicator["id"], label_id=label["id"] |
104 |
| - ) |
105 |
| - elif operation == "remove": |
106 |
| - for label in labels: |
107 |
| - opencti_api_client.stix_domain_object.remove_label( |
108 |
| - id=indicator["id"], label_id=label["id"] |
109 |
| - ) |
| 83 | + data = {"pagination": {"hasNextPage": True, "endCursor": None}} |
| 84 | + while data["pagination"]["hasNextPage"]: |
| 85 | + after = data["pagination"]["endCursor"] |
| 86 | + data = opencti_api_client.indicator.list( |
| 87 | + first=50, |
| 88 | + after=after, |
| 89 | + customAttributes=custom_attributes, |
| 90 | + filters=[ |
| 91 | + {"key": "indicates", "values": [threat["id"]]}, |
| 92 | + {"key": "created_at", "values": [created_after], "operator": "gt"}, |
| 93 | + {"key": "created_at", "values": [created_before], "operator": "lt"}, |
| 94 | + ], |
| 95 | + orderBy="created_at", |
| 96 | + orderMode="asc", |
| 97 | + withPagination=True, |
| 98 | + ) |
| 99 | + for indicator in data["entities"]: |
| 100 | + print("[" + indicator["created_at"] + "] " + indicator["id"]) |
| 101 | + if operation == "add": |
| 102 | + for label in labels: |
| 103 | + opencti_api_client.stix_domain_object.add_label( |
| 104 | + id=indicator["id"], label_id=label["id"] |
| 105 | + ) |
| 106 | + elif operation == "remove": |
| 107 | + for label in labels: |
| 108 | + opencti_api_client.stix_domain_object.remove_label( |
| 109 | + id=indicator["id"], label_id=label["id"] |
| 110 | + ) |
| 111 | + |
| 112 | + |
| 113 | +if __name__ == "__main__": |
| 114 | + main() |
0 commit comments