import datetime
import json
import uuid

from dateutil.parser import parse
from stix2.canonicalization.Canonicalize import canonicalize


class Task:
    """Helper for listing, reading, creating and editing OpenCTI Task objects.

    Every method talks to the platform through the ``opencti`` client given
    at construction time: ``opencti.query`` runs GraphQL documents,
    ``opencti.process_multiple`` / ``opencti.process_multiple_fields``
    post-process the responses, and ``opencti.app_logger`` is used for
    logging.

    :param opencti: the API client instance used for all requests
    """

    def __init__(self, opencti):
        self.opencti = opencti
        # Default GraphQL selection set used by list() and read() when the
        # caller does not pass ``customAttributes``.
        self.properties = """
            id
            standard_id
            entity_type
            parent_types
            spec_version
            created_at
            updated_at
            createdBy {
                ... on Identity {
                    id
                    standard_id
                    entity_type
                    parent_types
                    spec_version
                    identity_class
                    name
                    description
                    roles
                    contact_information
                    x_opencti_aliases
                    created
                    modified
                    objectLabel {
                        id
                        value
                        color
                    }
                }
                ... on Organization {
                    x_opencti_organization_type
                    x_opencti_reliability
                }
                ... on Individual {
                    x_opencti_firstname
                    x_opencti_lastname
                }
            }
            objectMarking {
                id
                standard_id
                entity_type
                definition_type
                definition
                created
                modified
                x_opencti_order
                x_opencti_color
            }
            objectLabel {
                id
                value
                color
            }
            externalReferences {
                edges {
                    node {
                        id
                        standard_id
                        entity_type
                        source_name
                        description
                        url
                        hash
                        external_id
                        created
                        modified
                        importFiles {
                            edges {
                                node {
                                    id
                                    name
                                    size
                                    metaData {
                                        mimetype
                                        version
                                    }
                                }
                            }
                        }
                    }
                }
            }
            revoked
            confidence
            created
            modified
            name
            description
            due_date
            objects {
                edges {
                    node {
                        ... on BasicObject {
                            id
                            entity_type
                            parent_types
                        }
                        ... on BasicRelationship {
                            id
                            entity_type
                            parent_types
                        }
                        ... on StixObject {
                            standard_id
                            spec_version
                            created_at
                            updated_at
                        }
                        ... on AttackPattern {
                            name
                        }
                        ... on Campaign {
                            name
                        }
                        ... on CourseOfAction {
                            name
                        }
                        ... on Individual {
                            name
                        }
                        ... on Organization {
                            name
                        }
                        ... on Sector {
                            name
                        }
                        ... on System {
                            name
                        }
                        ... on Indicator {
                            name
                        }
                        ... on Infrastructure {
                            name
                        }
                        ... on IntrusionSet {
                            name
                        }
                        ... on Position {
                            name
                        }
                        ... on City {
                            name
                        }
                        ... on Country {
                            name
                        }
                        ... on Region {
                            name
                        }
                        ... on Malware {
                            name
                        }
                        ... on ThreatActor {
                            name
                        }
                        ... on Tool {
                            name
                        }
                        ... on Vulnerability {
                            name
                        }
                        ... on Incident {
                            name
                        }
                        ... on Event {
                            name
                        }
                        ... on Channel {
                            name
                        }
                        ... on Narrative {
                            name
                        }
                        ... on Language {
                            name
                        }
                        ... on DataComponent {
                            name
                        }
                        ... on DataSource {
                            name
                        }
                        ... on StixCoreRelationship {
                            standard_id
                            spec_version
                            created_at
                            updated_at
                            relationship_type
                        }
                        ... on StixSightingRelationship {
                            standard_id
                            spec_version
                            created_at
                            updated_at
                        }
                    }
                }
            }
            importFiles {
                edges {
                    node {
                        id
                        name
                        size
                        metaData {
                            mimetype
                            version
                        }
                    }
                }
            }
        """

    @staticmethod
    def generate_id(name, created):
        """Build a deterministic ``task--<uuid5>`` identifier.

        The UUID is derived from the canonicalized JSON of the lower-cased,
        stripped name and the creation date, under a fixed namespace UUID.

        :param name: the name of the Task
        :param created: the creation date; a ``datetime.datetime`` is
            converted with ``isoformat()``, any other value is used as given
        :return: the identifier string, prefixed with ``task--``
        """
        name = name.lower().strip()
        if isinstance(created, datetime.datetime):
            created = created.isoformat()
        canonical = canonicalize({"name": name, "created": created}, utf8=False)
        task_uuid = uuid.uuid5(
            uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7"), canonical
        )
        return "task--" + str(task_uuid)

    def list(self, **kwargs):
        """List Task objects.

        :param filters: the filters to apply
        :param search: the search keyword
        :param first: return the first n rows from the after ID (or the
            beginning if not set); defaults to 500 and is forced to 500 when
            ``getAll`` is set
        :param after: ID of the first row for pagination
        :param orderBy: field to order by
        :param orderMode: ordering direction
        :param customAttributes: GraphQL selection set used instead of the
            default properties
        :param getAll: when true, follow ``pageInfo`` and return every page
        :param withPagination: forwarded to ``process_multiple`` when
            ``getAll`` is false
        :return: List of Task objects
        """
        filters = kwargs.get("filters", None)
        search = kwargs.get("search", None)
        first = kwargs.get("first", 500)
        after = kwargs.get("after", None)
        order_by = kwargs.get("orderBy", None)
        order_mode = kwargs.get("orderMode", None)
        custom_attributes = kwargs.get("customAttributes", None)
        get_all = kwargs.get("getAll", False)
        with_pagination = kwargs.get("withPagination", False)
        if get_all:
            first = 500

        self.opencti.app_logger.info(
            "Listing Tasks with filters", {"filters": json.dumps(filters)}
        )
        query = (
            """
            query tasks($filters: FilterGroup, $search: String, $first: Int, $after: ID, $orderBy: TasksOrdering, $orderMode: OrderingMode) {
                tasks(filters: $filters, search: $search, first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode) {
                    edges {
                        node {
                            """
            + (custom_attributes if custom_attributes is not None else self.properties)
            + """
                        }
                    }
                    pageInfo {
                        startCursor
                        endCursor
                        hasNextPage
                        hasPreviousPage
                        globalCount
                    }
                }
            }
        """
        )

        def fetch_page(cursor):
            # A fresh variables dict per request; only the cursor changes.
            return self.opencti.query(
                query,
                {
                    "filters": filters,
                    "search": search,
                    "first": first,
                    "after": cursor,
                    "orderBy": order_by,
                    "orderMode": order_mode,
                },
            )

        result = fetch_page(after)
        if not get_all:
            return self.opencti.process_multiple(
                result["data"]["tasks"], with_pagination
            )

        final_data = []
        final_data.extend(self.opencti.process_multiple(result["data"]["tasks"]))
        while result["data"]["tasks"]["pageInfo"]["hasNextPage"]:
            # The cursor lives under "data"; reading it from any other key
            # raises KeyError as soon as a second page exists.
            after = result["data"]["tasks"]["pageInfo"]["endCursor"]
            self.opencti.app_logger.info("Listing Tasks", {"after": after})
            result = fetch_page(after)
            final_data.extend(self.opencti.process_multiple(result["data"]["tasks"]))
        return final_data

    def read(self, **kwargs):
        """Read a Task object.

        :param id: the id of the Task
        :param filters: the filters to apply if no id provided
        :param customAttributes: GraphQL selection set used instead of the
            default properties (only applied to the lookup by id)
        :return: Task object, or None when nothing matches the filters or
            when neither ``id`` nor ``filters`` is given
        """
        id = kwargs.get("id", None)
        filters = kwargs.get("filters", None)
        custom_attributes = kwargs.get("customAttributes", None)
        if id is not None:
            self.opencti.app_logger.info("Reading Task", {"id": id})
            query = (
                """
                query task($id: String!) {
                    task(id: $id) {
                        """
                + (
                    custom_attributes
                    if custom_attributes is not None
                    else self.properties
                )
                + """
                    }
                }
            """
            )
            result = self.opencti.query(query, {"id": id})
            return self.opencti.process_multiple_fields(result["data"]["task"])
        if filters is not None:
            result = self.list(filters=filters)
            if len(result) > 0:
                return result[0]
            return None
        # Report the missing parameters like the sibling methods do instead
        # of returning None silently.
        self.opencti.app_logger.error(
            "[opencti_task] Missing parameters: id or filters"
        )
        return None

    def get_by_stix_id_or_name(self, **kwargs):
        """Read a Task object by stix_id, falling back to name + creation day.

        The fallback is only attempted when the lookup by ``stix_id`` found
        nothing and both ``name`` and ``created`` are provided.

        :param stix_id: the STIX ID of the Task
        :param name: the name of the Task
        :param created: the creation date; parsed and reduced to
            ``YYYY-MM-DD`` for the ``created_day`` filter
        :param customAttributes: GraphQL selection set forwarded to read()
        :return: Task object, or None when not found
        """
        stix_id = kwargs.get("stix_id", None)
        name = kwargs.get("name", None)
        created = kwargs.get("created", None)
        custom_attributes = kwargs.get("customAttributes", None)
        object_result = None
        if stix_id is not None:
            object_result = self.read(id=stix_id, customAttributes=custom_attributes)
        if object_result is None and name is not None and created is not None:
            created_final = parse(created).strftime("%Y-%m-%d")
            object_result = self.read(
                filters={
                    "mode": "and",
                    "filters": [
                        {"key": "name", "values": [name]},
                        {"key": "created_day", "values": [created_final]},
                    ],
                    "filterGroups": [],
                },
                customAttributes=custom_attributes,
            )
        return object_result

    def contains_stix_object_or_stix_relationship(self, **kwargs):
        """Check if a task already contains a thing (Stix Object or Stix Relationship).

        :param id: the id of the Task
        :param stixObjectOrStixRelationshipId: the id of the Stix-Entity
        :return: the value returned by the API for the check; None (after
            logging an error) when a parameter is missing
        """
        id = kwargs.get("id", None)
        stix_object_or_stix_relationship_id = kwargs.get(
            "stixObjectOrStixRelationshipId", None
        )
        if id is not None and stix_object_or_stix_relationship_id is not None:
            self.opencti.app_logger.info(
                "Checking StixObjectOrStixRelationship in Task",
                {
                    "stix_object_or_stix_relationship_id": stix_object_or_stix_relationship_id,
                    "id": id,
                },
            )
            query = """
                query taskContainsStixObjectOrStixRelationship($id: String!, $stixObjectOrStixRelationshipId: String!) {
                    taskContainsStixObjectOrStixRelationship(id: $id, stixObjectOrStixRelationshipId: $stixObjectOrStixRelationshipId)
                }
            """
            result = self.opencti.query(
                query,
                {
                    "id": id,
                    "stixObjectOrStixRelationshipId": stix_object_or_stix_relationship_id,
                },
            )
            return result["data"]["taskContainsStixObjectOrStixRelationship"]
        else:
            self.opencti.app_logger.error(
                "[opencti_Task] Missing parameters: id or stixObjectOrStixRelationshipId"
            )

    def create(self, **kwargs):
        """Create a Task object.

        :param name: the name of the Task (required)
        :param created: the creation date
        :param description: the description
        :param due_date: the due date
        :param objects: ids of the contained objects
        :param createdBy: id of the author
        :param objectMarking: marking definition ids
        :param objectLabel: label ids
        :param objectAssignee: assignee ids
        :param objectOrganization: ids of organizations granted access
        :param x_opencti_workflow_id: workflow status id
        :param update: forwarded as the ``update`` flag of the input
        :return: Task object (id, standard_id, entity_type, parent_types), or
            None (after logging an error) when ``name`` is missing
        """
        objects = kwargs.get("objects", None)
        created = kwargs.get("created", None)
        name = kwargs.get("name", None)
        description = kwargs.get("description", None)
        due_date = kwargs.get("due_date", None)
        created_by = kwargs.get("createdBy", None)
        object_marking = kwargs.get("objectMarking", None)
        object_label = kwargs.get("objectLabel", None)
        object_assignee = kwargs.get("objectAssignee", None)
        granted_refs = kwargs.get("objectOrganization", None)
        x_opencti_workflow_id = kwargs.get("x_opencti_workflow_id", None)
        update = kwargs.get("update", False)

        if name is not None:
            self.opencti.app_logger.info("Creating Task", {"name": name})
            query = """
                mutation TaskAdd($input: TaskAddInput!) {
                    taskAdd(input: $input) {
                        id
                        standard_id
                        entity_type
                        parent_types
                    }
                }
            """
            result = self.opencti.query(
                query,
                {
                    "input": {
                        "created": created,
                        "name": name,
                        "description": description,
                        "due_date": due_date,
                        "objects": objects,
                        "createdBy": created_by,
                        "objectLabel": object_label,
                        "objectMarking": object_marking,
                        "objectOrganization": granted_refs,
                        "objectAssignee": object_assignee,
                        "x_opencti_workflow_id": x_opencti_workflow_id,
                        "update": update,
                    }
                },
            )
            return self.opencti.process_multiple_fields(result["data"]["taskAdd"])
        else:
            self.opencti.app_logger.error("[opencti_task] Missing parameters: name")

    def update_field(self, **kwargs):
        """Patch one or more fields of a Task object.

        :param id: the id of the Task
        :param input: the list of edit inputs sent as ``[EditInput!]!``
        :return: the patched Task (id, standard_id, entity_type), or None
            (after logging an error) when a parameter is missing
        """
        self.opencti.app_logger.info("Updating Task", {"data": json.dumps(kwargs)})
        id = kwargs.get("id", None)
        input = kwargs.get("input", None)
        if id is not None and input is not None:
            query = """
                mutation TaskEdit($id: ID!, $input: [EditInput!]!) {
                    taskFieldPatch(id: $id, input: $input) {
                        id
                        standard_id
                        entity_type
                    }
                }
            """
            result = self.opencti.query(query, {"id": id, "input": input})
            return self.opencti.process_multiple_fields(
                result["data"]["taskFieldPatch"]
            )
        else:
            # The parameters actually checked above are ``id`` and ``input``.
            self.opencti.app_logger.error(
                "[opencti_Task] Missing parameters: id and input"
            )
            return None

    def add_stix_object_or_stix_relationship(self, **kwargs):
        """Add a Stix-Entity object to Task object (object_refs).

        :param id: the id of the Task
        :param stixObjectOrStixRelationshipId: the id of the Stix-Entity
        :return: True once the mutation has been sent, False when a
            parameter is missing
        """
        id = kwargs.get("id", None)
        stix_object_or_stix_relationship_id = kwargs.get(
            "stixObjectOrStixRelationshipId", None
        )
        if id is not None and stix_object_or_stix_relationship_id is not None:
            self.opencti.app_logger.info(
                "Adding StixObjectOrStixRelationship in Task",
                {
                    "stix_object_or_stix_relationship_id": stix_object_or_stix_relationship_id,
                    "id": id,
                },
            )
            query = """
               mutation taskEditRelationAdd($id: ID!, $input: StixMetaRelationshipAddInput) {
                   taskRelationAdd(id: $id, input: $input) {
                        id
                   }
               }
            """
            self.opencti.query(
                query,
                {
                    "id": id,
                    "input": {
                        "toId": stix_object_or_stix_relationship_id,
                        "relationship_type": "object",
                    },
                },
            )
            return True
        else:
            self.opencti.app_logger.error(
                "[opencti_task] Missing parameters: id and stixObjectOrStixRelationshipId",
            )
            return False

    def remove_stix_object_or_stix_relationship(self, **kwargs):
        """Remove a Stix-Entity object from Task object (object_refs).

        :param id: the id of the Task
        :param stixObjectOrStixRelationshipId: the id of the Stix-Entity
        :return: True once the mutation has been sent, False when a
            parameter is missing
        """
        id = kwargs.get("id", None)
        stix_object_or_stix_relationship_id = kwargs.get(
            "stixObjectOrStixRelationshipId", None
        )
        if id is not None and stix_object_or_stix_relationship_id is not None:
            self.opencti.app_logger.info(
                "Removing StixObjectOrStixRelationship in Task",
                {
                    "stix_object_or_stix_relationship_id": stix_object_or_stix_relationship_id,
                    "id": id,
                },
            )
            # The delete mutation must call the delete field: the add field
            # takes a single ``input`` argument, not ``toId`` and
            # ``relationship_type``.
            query = """
               mutation taskEditRelationDelete($id: ID!, $toId: StixRef!, $relationship_type: String!) {
                   taskRelationDelete(id: $id, toId: $toId, relationship_type: $relationship_type) {
                        id
                   }
               }
            """
            self.opencti.query(
                query,
                {
                    "id": id,
                    "toId": stix_object_or_stix_relationship_id,
                    "relationship_type": "object",
                },
            )
            return True
        else:
            self.opencti.app_logger.error(
                "[opencti_task] Missing parameters: id and stixObjectOrStixRelationshipId",
            )
            return False

    def import_from_stix2(self, **kwargs):
        """Import a Task object from a STIX2 object.

        Missing ``x_opencti_*`` keys are filled in on ``stixObject`` (the
        passed object is mutated) from its extensions before the Task is
        created.

        :param stixObject: the Stix-Object Task
        :param extras: optional dict that may carry ``created_by_id``,
            ``object_marking_ids``, ``object_label_ids`` and ``object_ids``
        :param update: forwarded to create()
        :return: Task object, or None (after logging an error) when
            ``stixObject`` is missing
        """
        stix_object = kwargs.get("stixObject", None)
        extras = kwargs.get("extras", {})
        update = kwargs.get("update", False)
        if stix_object is not None:
            # Search in extensions
            if "x_opencti_stix_ids" not in stix_object:
                stix_object["x_opencti_stix_ids"] = (
                    self.opencti.get_attribute_in_extension("stix_ids", stix_object)
                )
            if "x_opencti_granted_refs" not in stix_object:
                stix_object["x_opencti_granted_refs"] = (
                    self.opencti.get_attribute_in_extension("granted_refs", stix_object)
                )
            if "x_opencti_workflow_id" not in stix_object:
                stix_object["x_opencti_workflow_id"] = (
                    self.opencti.get_attribute_in_extension("workflow_id", stix_object)
                )
            if "x_opencti_assignee_ids" not in stix_object:
                stix_object["x_opencti_assignee_ids"] = (
                    self.opencti.get_attribute_in_extension("assignee_ids", stix_object)
                )

            return self.create(
                # NOTE(review): create() does not read ``stix_id``, so this
                # value is currently ignored - confirm whether the add input
                # should carry it.
                stix_id=stix_object["id"],
                createdBy=extras.get("created_by_id"),
                objectMarking=extras.get("object_marking_ids"),
                objectLabel=extras.get("object_label_ids"),
                objects=extras.get("object_ids", []),
                created=stix_object.get("created"),
                name=stix_object["name"],
                description=(
                    self.opencti.stix2.convert_markdown(stix_object["description"])
                    if "description" in stix_object
                    else None
                ),
                due_date=stix_object.get("due_date"),
                objectOrganization=stix_object.get("x_opencti_granted_refs"),
                objectAssignee=stix_object.get("x_opencti_assignee_ids"),
                x_opencti_workflow_id=stix_object.get("x_opencti_workflow_id"),
                update=update,
            )
        else:
            self.opencti.app_logger.error(
                "[opencti_task] Missing parameters: stixObject"
            )

    def delete(self, **kwargs):
        """Delete a Task object.

        :param id: the id of the Task
        :return: None; an error is logged when ``id`` is missing
        """
        id = kwargs.get("id", None)
        if id is not None:
            self.opencti.app_logger.info("Deleting Task", {"id": id})
            query = """
                 mutation TaskDelete($id: ID!) {
                     taskDelete(id: $id)
                 }
                """
            self.opencti.query(query, {"id": id})
        else:
            self.opencti.app_logger.error("[opencti_task] Missing parameters: id")
            return None