Skip to content

Commit c77860f

Browse files
authored
[client] Multiple additions [entities, connectors, tests] (OpenCTI-Platform#199)
* [tests] Added first connector tests * [connector] Add shutdown function for ping * [tests] Fix format * [connector] Added connector unregister function * [connector] Documentation adaption * [entities] Added delete functions for AttackPattern, KillChainPhase, Label and MarkingDefinition [tests] Excluded delete exceptions for the CRUD tests * [tests] migrated all entity CRUD tests to pytest-cases * Merged changes from OpenCTI-Platform#197 * [tests/modules] Moved all test case definitions to modules. Renamed modules.py to entities.py * [tests] Removed all for loops from the test cases. Test cases which contained data lists are split up into multiple single test cases * [tests] Moved all test case definitions to modules (the simple connector will be moved in the future) * [tests] SimpleConnector and ExternalImport connector tests and adaptions * [entity] Add ask_for_enrichment for SCO * [connector] save work before branch swi * [tests] Migrated setup and teardown to fixtures * [tests] Add switch to enable connector tests * [tests] Improved connector test cases * [connector] Improve exit function * [entites] Format adaptions * [test] Implemented filter tests * [entity] Removed ask_for_enrichment from SDO * [tests] External Connector Test added work_id generation * [tests] Refactoring to properly wait for task to finish. Also work logs cleanup after each test run * [tests] Added works cleanup function. Additionally small refactoring * [tests] format fix
1 parent f75785e commit c77860f

20 files changed

+1882
-955
lines changed

pycti/api/opencti_api_connector.py

+15
Original file line numberDiff line numberDiff line change
@@ -100,3 +100,18 @@ def register(self, connector: OpenCTIConnector) -> Dict:
100100
"""
101101
result = self.api.query(query, connector.to_input())
102102
return result["data"]["registerConnector"]
103+
104+
def unregister(self, _id: str) -> Dict:
105+
"""unregister a connector with OpenCTI
106+
107+
:param _id: `OpenCTIConnector` connector id
108+
:type _id: string
109+
:return: the response registerConnector data dict
110+
:rtype: dict
111+
"""
112+
query = """
113+
mutation ConnectorDeletionMutation($id: ID!) {
114+
deleteConnector(id: $id)
115+
}
116+
"""
117+
return self.api.query(query, {"id": _id})

pycti/connector/opencti_connector_helper.py

+34-10
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ def __init__(self, helper, config: Dict, callback) -> None:
106106
self.user = config["connection"]["user"]
107107
self.password = config["connection"]["pass"]
108108
self.queue_name = config["listen"]
109+
self.exit_event = threading.Event()
110+
self.thread = None
109111

110112
# noinspection PyUnusedLocal
111113
def _process_message(self, channel, method, properties, body) -> None:
@@ -122,9 +124,9 @@ def _process_message(self, channel, method, properties, body) -> None:
122124
"""
123125

124126
json_data = json.loads(body)
125-
thread = threading.Thread(target=self._data_handler, args=[json_data])
126-
thread.start()
127-
while thread.is_alive(): # Loop while the thread is processing
127+
self.thread = threading.Thread(target=self._data_handler, args=[json_data])
128+
self.thread.start()
129+
while self.thread.is_alive(): # Loop while the thread is processing
128130
assert self.pika_connection is not None
129131
self.pika_connection.sleep(1.0)
130132
logging.info(
@@ -159,7 +161,7 @@ def _data_handler(self, json_data) -> None:
159161
logging.error("Failing reporting the processing")
160162

161163
def run(self) -> None:
162-
while True:
164+
while not self.exit_event.is_set():
163165
try:
164166
# Connect the broker
165167
self.pika_credentials = pika.PlainCredentials(self.user, self.password)
@@ -186,6 +188,11 @@ def run(self) -> None:
186188
self.helper.log_error(str(e))
187189
time.sleep(10)
188190

191+
def stop(self):
192+
self.exit_event.set()
193+
if self.thread:
194+
self.thread.join()
195+
189196

190197
class PingAlive(threading.Thread):
191198
def __init__(self, connector_id, api, get_state, set_state) -> None:
@@ -195,9 +202,10 @@ def __init__(self, connector_id, api, get_state, set_state) -> None:
195202
self.api = api
196203
self.get_state = get_state
197204
self.set_state = set_state
205+
self.exit_event = threading.Event()
198206

199207
def ping(self) -> None:
200-
while True:
208+
while not self.exit_event.is_set():
201209
try:
202210
initial_state = self.get_state()
203211
result = self.api.connector.ping(self.connector_id, initial_state)
@@ -221,12 +229,16 @@ def ping(self) -> None:
221229
except Exception: # pylint: disable=broad-except
222230
self.in_error = True
223231
logging.error("Error pinging the API")
224-
time.sleep(40)
232+
self.exit_event.wait(40)
225233

226234
def run(self) -> None:
227235
logging.info("Starting ping alive thread")
228236
self.ping()
229237

238+
def stop(self) -> None:
239+
logging.info("Preparing for clean shutdown")
240+
self.exit_event.set()
241+
230242

231243
class ListenStream(threading.Thread):
232244
def __init__(
@@ -239,6 +251,7 @@ def __init__(
239251
self.token = token
240252
self.verify_ssl = verify_ssl
241253
self.start_timestamp = start_timestamp
254+
self.exit_event = threading.Event()
242255

243256
def run(self) -> None: # pylint: disable=too-many-branches
244257
current_state = self.helper.get_state()
@@ -432,6 +445,17 @@ def __init__(self, config: Dict) -> None:
432445
)
433446
self.ping.start()
434447

448+
# self.listen_stream = None
449+
self.listen_queue = None
450+
451+
def stop(self) -> None:
452+
if self.listen_queue:
453+
self.listen_queue.stop()
454+
# if self.listen_stream:
455+
# self.listen_stream.stop()
456+
self.ping.stop()
457+
self.api.connector.unregister(self.connector_id)
458+
435459
def get_name(self) -> Optional[Union[bool, int, str]]:
436460
return self.connect_name
437461

@@ -470,8 +494,8 @@ def listen(self, message_callback: Callable[[Dict], str]) -> None:
470494
:type message_callback: Callable[[Dict], str]
471495
"""
472496

473-
listen_queue = ListenQueue(self, self.config, message_callback)
474-
listen_queue.start()
497+
self.listen_queue = ListenQueue(self, self.config, message_callback)
498+
self.listen_queue.start()
475499

476500
def listen_stream(
477501
self,
@@ -486,10 +510,10 @@ def listen_stream(
486510
:param message_callback: callback function to process messages
487511
"""
488512

489-
listen_stream = ListenStream(
513+
self.listen_stream = ListenStream(
490514
self, message_callback, url, token, verify_ssl, start_timestamp
491515
)
492-
listen_stream.start()
516+
self.listen_stream.start()
493517

494518
def get_opencti_url(self) -> Optional[Union[bool, int, str]]:
495519
return self.opencti_url

pycti/entities/opencti_attack_pattern.py

+16
Original file line numberDiff line numberDiff line change
@@ -406,3 +406,19 @@ def import_from_stix2(self, **kwargs):
406406
self.opencti.log(
407407
"error", "[opencti_attack_pattern] Missing parameters: stixObject"
408408
)
409+
410+
def delete(self, **kwargs):
411+
id = kwargs.get("id", None)
412+
if id is not None:
413+
self.opencti.log("info", "Deleting Attack Pattern {" + id + "}.")
414+
query = """
415+
mutation AttackPatternEdit($id: ID!) {
416+
attackPatternEdit(id: $id) {
417+
delete
418+
}
419+
}
420+
"""
421+
self.opencti.query(query, {"id": id})
422+
else:
423+
self.opencti.log("error", "[attack_pattern] Missing parameters: id")
424+
return None

pycti/entities/opencti_kill_chain_phase.py

+18
Original file line numberDiff line numberDiff line change
@@ -168,3 +168,21 @@ def create(self, **kwargs):
168168
"error",
169169
"[opencti_kill_chain_phase] Missing parameters: kill_chain_name and phase_name",
170170
)
171+
172+
def delete(self, **kwargs):
173+
id = kwargs.get("id", None)
174+
if id is not None:
175+
self.opencti.log("info", "Deleting Kill-Chain-Phase {" + id + "}.")
176+
query = """
177+
mutation KillChainPhaseEdit($id: ID!) {
178+
killChainPhaseEdit(id: $id) {
179+
delete
180+
}
181+
}
182+
"""
183+
self.opencti.query(query, {"id": id})
184+
else:
185+
self.opencti.log(
186+
"error", "[opencti_kill_chain_phase] Missing parameters: id"
187+
)
188+
return None

pycti/entities/opencti_label.py

+16
Original file line numberDiff line numberDiff line change
@@ -150,3 +150,19 @@ def create(self, **kwargs):
150150
"error",
151151
"[opencti_label] Missing parameters: value",
152152
)
153+
154+
def delete(self, **kwargs):
155+
id = kwargs.get("id", None)
156+
if id is not None:
157+
self.opencti.log("info", "Deleting Label {" + id + "}.")
158+
query = """
159+
mutation LabelEdit($id: ID!) {
160+
labelEdit(id: $id) {
161+
delete
162+
}
163+
}
164+
"""
165+
self.opencti.query(query, {"id": id})
166+
else:
167+
self.opencti.log("error", "[opencti_label] Missing parameters: id")
168+
return None

pycti/entities/opencti_marking_definition.py

+18
Original file line numberDiff line numberDiff line change
@@ -219,3 +219,21 @@ def import_from_stix2(self, **kwargs):
219219
self.opencti.log(
220220
"error", "[opencti_marking_definition] Missing parameters: stixObject"
221221
)
222+
223+
def delete(self, **kwargs):
224+
id = kwargs.get("id", None)
225+
if id is not None:
226+
self.opencti.log("info", "Deleting Marking-Definition {" + id + "}.")
227+
query = """
228+
mutation MarkingDefinitionEdit($id: ID!) {
229+
markingDefinitionEdit(id: $id) {
230+
delete
231+
}
232+
}
233+
"""
234+
self.opencti.query(query, {"id": id})
235+
else:
236+
self.opencti.log(
237+
"error", "[opencti_marking_definition] Missing parameters: id"
238+
)
239+
return None

pycti/entities/opencti_observed_data.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ def create(self, **kwargs):
387387
else:
388388
self.opencti.log(
389389
"error",
390-
"[opencti_observedData] Missing parameters: first_observed or last_observed",
390+
"[opencti_observedData] Missing parameters: first_observed, last_observed or objects",
391391
)
392392

393393
"""

pycti/entities/opencti_stix_cyber_observable.py

+36-8
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import json
44
import os
5+
56
import magic
67

78

@@ -320,11 +321,11 @@ def list(self, **kwargs):
320321
)
321322
query = (
322323
"""
323-
query StixCyberObservables($types: [String], $filters: [StixCyberObservablesFiltering], $search: String, $first: Int, $after: ID, $orderBy: StixCyberObservablesOrdering, $orderMode: OrderingMode) {
324-
stixCyberObservables(types: $types, filters: $filters, search: $search, first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode) {
325-
edges {
326-
node {
327-
"""
324+
query StixCyberObservables($types: [String], $filters: [StixCyberObservablesFiltering], $search: String, $first: Int, $after: ID, $orderBy: StixCyberObservablesOrdering, $orderMode: OrderingMode) {
325+
stixCyberObservables(types: $types, filters: $filters, search: $search, first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode) {
326+
edges {
327+
node {
328+
"""
328329
+ (custom_attributes if custom_attributes is not None else self.properties)
329330
+ """
330331
}
@@ -398,9 +399,9 @@ def read(self, **kwargs):
398399
self.opencti.log("info", "Reading StixCyberObservable {" + id + "}.")
399400
query = (
400401
"""
401-
query StixCyberObservable($id: String!) {
402-
stixCyberObservable(id: $id) {
403-
"""
402+
query StixCyberObservable($id: String!) {
403+
stixCyberObservable(id: $id) {
404+
"""
404405
+ (
405406
custom_attributes
406407
if custom_attributes is not None
@@ -1736,3 +1737,30 @@ def push_list_export(self, file_name, data, list_filters=""):
17361737
"listFilters": list_filters,
17371738
},
17381739
)
1740+
1741+
def ask_for_enrichment(self, **kwargs):
1742+
id = kwargs.get("id", None)
1743+
connector_id = kwargs.get("connector_id", None)
1744+
1745+
if id is None or connector_id is None:
1746+
self.opencti.log("error", "Missing parameters: id and connector_id")
1747+
return False
1748+
1749+
query = """
1750+
mutation StixCoreObjectEnrichmentLinesMutation($id: ID!, $connectorId: ID!) {
1751+
stixCoreObjectEdit(id: $id) {
1752+
askEnrichment(connectorId: $connectorId) {
1753+
id
1754+
}
1755+
}
1756+
}
1757+
"""
1758+
1759+
self.opencti.query(
1760+
query,
1761+
{
1762+
"id": id,
1763+
"connectorId": connector_id,
1764+
},
1765+
)
1766+
return True

requirements.txt

+1
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@ black==20.8b1
1111
python-magic==0.4.22; sys_platform == 'linux' or sys_platform == 'darwin'
1212
python-magic-bin==0.4.14; sys_platform == 'win32'
1313
pytest_randomly==3.8.0
14+
pytest-cases==3.6.3

0 commit comments

Comments
 (0)