forked from OpenCTI-Platform/client-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathupload_artifacts.py
87 lines (68 loc) · 2.4 KB
/
upload_artifacts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import argparse
import os
import magic
from pycti import OpenCTIApiClient
# Connection settings for the OpenCTI API.
# Both values can be overridden through the environment (OPENCTI_URL and
# OPENCTI_TOKEN) so that a real token never has to be written into this file.
# The literals are kept only as fallbacks, which preserves the previous
# behaviour when the variables are unset or empty.
api_url = os.environ.get("OPENCTI_URL") or "http://opencti:4000"
api_token = os.environ.get("OPENCTI_TOKEN") or "bfa014e0-e02e-4aa6-a42b-603b19dcf159"

# OpenCTI instantiation (shared by every upload performed by this script)
OPENCTI_API_CLIENT = OpenCTIApiClient(api_url, api_token)
def main():
    """Parse the command line and upload the requested Artifact(s).

    ``--file`` may point at a single file or at a directory. A directory is
    walked recursively and every file found is uploaded with the same
    description, labels and related object.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "-f", "--file", required=True, help="The path of the Artifact(s) to upload."
    )
    cli.add_argument(
        "-d", "--description", default="", help="The description for the Artifact."
    )
    cli.add_argument(
        "-l", "--label", default="", help="Comma separated labels for the Artifact."
    )
    cli.add_argument(
        "-r",
        "--related",
        default=None,
        help="Standard id of an object related to the Artifact.",
    )
    options = cli.parse_args()

    # The same metadata is attached to every uploaded file.
    shared = (options.description, options.label, options.related)

    # Anything that is not a directory is uploaded as a single file.
    if not os.path.isdir(options.file):
        upload(options.file, *shared)
        return

    for directory, _subdirs, filenames in os.walk(options.file):
        for filename in filenames:
            upload(os.path.join(directory, filename), *shared)
def upload(file_path, description, labels, related_standard_id):
    """Upload one file to OpenCTI as an Artifact, then label and link it.

    Args:
        file_path: Path of the file to read and upload.
        description: Text sent as ``x_opencti_description``; an empty string
            is sent when this is falsy.
        labels: Comma separated label values. Whitespace around each value is
            ignored and empty entries are skipped; ``None`` or ``""`` means
            no labels.
        related_standard_id: Standard id of an object to connect to the
            Artifact through a ``related-to`` relationship. Skipped when
            falsy.

    Returns:
        None. The upload response is printed to standard output.
    """
    with open(file_path, "rb") as f:
        file_data = f.read()

    # Ask python-magic for the MIME type of the raw bytes.
    mime_type = magic.from_buffer(file_data, mime=True)

    # Upload the file, returns the query response for the file upload
    response = OPENCTI_API_CLIENT.stix_cyber_observable.upload_artifact(
        file_name=os.path.basename(file_path),
        data=file_data,
        mime_type=mime_type,
        x_opencti_description=description or "",
    )
    print(response)

    # Without a response there is no id to attach labels or relationships to;
    # report it and move on instead of failing with a TypeError below.
    if not response:
        print(f"No Artifact returned for {file_path}; skipping labels and relationship.")
        return

    # Strip each entry so that "a, b" yields the labels "a" and "b" rather
    # than "a" and " b".
    for label_value in (part.strip() for part in (labels or "").split(",")):
        if not label_value:
            continue
        label = OPENCTI_API_CLIENT.label.create(value=label_value)
        OPENCTI_API_CLIENT.stix_cyber_observable.add_label(
            id=response["id"], label_id=label["id"]
        )

    if related_standard_id:
        OPENCTI_API_CLIENT.stix_core_relationship.create(
            fromId=related_standard_id,
            toId=response["standard_id"],
            relationship_type="related-to",
            description=f"Related to {related_standard_id}",
        )
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()