forked from OpenCTI-Platform/client-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathopencti_api_connector.py
119 lines (104 loc) · 3.48 KB
/
opencti_api_connector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import json
from typing import Any, Dict
from pycti.api import LOGGER
from pycti.connector.opencti_connector import OpenCTIConnector
class OpenCTIApiConnector:
"""OpenCTIApiConnector"""
def __init__(self, api):
self.api = api
def list(self) -> Dict:
"""list available connectors
:return: return dict with connectors
:rtype: dict
"""
LOGGER.info("Getting connectors ...")
query = """
query GetConnectors {
connectorsForWorker {
id
name
config {
connection {
host
vhost
use_ssl
port
user
pass
}
listen
push
}
}
}
"""
result = self.api.query(query)
return result["data"]["connectorsForWorker"]
def ping(self, connector_id: str, connector_state: Any) -> Dict:
"""pings a connector by id and state
:param connector_id: the connectors id
:type connector_id: str
:param connector_state: state for the connector
:type connector_state:
:return: the response pingConnector data dict
:rtype: dict
"""
query = """
mutation PingConnector($id: ID!, $state: String) {
pingConnector(id: $id, state: $state) {
id
connector_state
}
}
"""
result = self.api.query(
query, {"id": connector_id, "state": json.dumps(connector_state)}
)
return result["data"]["pingConnector"]
def register(self, connector: OpenCTIConnector) -> Dict:
"""register a connector with OpenCTI
:param connector: `OpenCTIConnector` connector object
:type connector: OpenCTIConnector
:return: the response registerConnector data dict
:rtype: dict
"""
query = """
mutation RegisterConnector($input: RegisterConnectorInput) {
registerConnector(input: $input) {
id
connector_state
config {
connection {
host
vhost
use_ssl
port
user
pass
}
listen
listen_routing
listen_exchange
push
push_routing
push_exchange
}
connector_user_id
}
}
"""
result = self.api.query(query, connector.to_input())
return result["data"]["registerConnector"]
def unregister(self, _id: str) -> Dict:
"""unregister a connector with OpenCTI
:param _id: `OpenCTIConnector` connector id
:type _id: string
:return: the response registerConnector data dict
:rtype: dict
"""
query = """
mutation ConnectorDeletionMutation($id: ID!) {
deleteConnector(id: $id)
}
"""
return self.api.query(query, {"id": _id})