3
3
import functools
4
4
import json
5
5
import re
6
- from typing import Any , Callable , Iterable
6
+ from typing import Any , Callable
7
7
8
+ # , Iterable
8
9
from aws_lambda_powertools .utilities .data_masking .constants import DATA_MASKING_STRING
9
10
10
11
PRESERVE_CHARS = set ("-_. " )
@@ -69,14 +70,14 @@ def decrypt(self, data, provider_options: dict | None = None, **encryption_conte
69
70
70
71
def erase (
71
72
self ,
72
- data ,
73
+ data : Any ,
73
74
dynamic_mask : bool | None = None ,
74
75
custom_mask : str | None = None ,
75
76
regex_pattern : str | None = None ,
76
77
mask_format : str | None = None ,
77
78
masking_rules : dict | None = None ,
78
79
** kwargs ,
79
- ) -> Iterable [ str ] :
80
+ ) -> str | dict | list | tuple | set :
80
81
"""
81
82
This method irreversibly erases data.
82
83
@@ -85,47 +86,68 @@ def erase(
85
86
86
87
If the data to be erased is of an iterable type like `list`, `tuple`,
87
88
or `set`, this method will return a new object of the same type as the
88
- input data but with each element replaced by the string "*****" or following one of the custom masks .
89
+ input data but with each element masked according to the specified rules .
89
90
"""
90
- result = DATA_MASKING_STRING
91
-
92
- if data :
93
- if isinstance (data , str ):
94
- if dynamic_mask :
95
- result = self ._custom_erase (data , ** kwargs )
96
- if custom_mask :
97
- result = self ._pattern_mask (data , custom_mask )
98
- if regex_pattern and mask_format :
99
- result = self ._regex_mask (data , regex_pattern , mask_format )
100
- elif isinstance (data , dict ):
101
- if masking_rules :
102
- result = self ._apply_masking_rules (data , masking_rules )
103
- elif isinstance (data , (list , tuple , set )):
104
- result = type (data )(
105
- self .erase (
106
- item ,
107
- dynamic_mask = dynamic_mask ,
108
- custom_mask = custom_mask ,
109
- regex_pattern = regex_pattern ,
110
- mask_format = mask_format ,
111
- masking_rules = masking_rules ,
112
- ** kwargs ,
113
- )
114
- for item in data
91
+ result = None
92
+
93
+ # Handle empty or None data
94
+ if not data :
95
+ result = DATA_MASKING_STRING if isinstance (data , (str , bytes )) else data
96
+
97
+ # Handle string data
98
+ elif isinstance (data , str ):
99
+ if regex_pattern and mask_format :
100
+ result = self ._regex_mask (data , regex_pattern , mask_format )
101
+ elif custom_mask :
102
+ result = self ._pattern_mask (data , custom_mask )
103
+ elif dynamic_mask :
104
+ result = self ._custom_erase (data , ** kwargs )
105
+ else :
106
+ result = DATA_MASKING_STRING
107
+
108
+ # Handle dictionary data
109
+ elif isinstance (data , dict ):
110
+ result = (
111
+ self ._apply_masking_rules (data , masking_rules )
112
+ if masking_rules
113
+ else {k : DATA_MASKING_STRING for k in data }
114
+ )
115
+
116
+ # Handle iterable data (list, tuple, set)
117
+ elif isinstance (data , (list , tuple , set )):
118
+ masked_data = (
119
+ self .erase (
120
+ item ,
121
+ dynamic_mask = dynamic_mask ,
122
+ custom_mask = custom_mask ,
123
+ regex_pattern = regex_pattern ,
124
+ mask_format = mask_format ,
125
+ masking_rules = masking_rules ,
126
+ ** kwargs ,
115
127
)
128
+ for item in data
129
+ )
130
+ result = type (data )(masked_data )
131
+
132
+ # Default case
133
+ else :
134
+ result = DATA_MASKING_STRING
116
135
117
136
return result
118
137
119
138
def _apply_masking_rules (self , data : dict , masking_rules : dict ) -> dict :
139
+ """Apply masking rules to dictionary data."""
120
140
return {
121
141
key : self .erase (str (value ), ** masking_rules [key ]) if key in masking_rules else str (value )
122
142
for key , value in data .items ()
123
143
}
124
144
125
145
def _pattern_mask (self , data : str , pattern : str ) -> str :
146
+ """Apply pattern masking to string data."""
126
147
return pattern [: len (data )] if len (pattern ) >= len (data ) else pattern
127
148
128
149
def _regex_mask (self , data : str , regex_pattern : str , mask_format : str ) -> str :
150
+ """Apply regex masking to string data."""
129
151
try :
130
152
if regex_pattern not in _regex_cache :
131
153
_regex_cache [regex_pattern ] = re .compile (regex_pattern )
@@ -137,5 +159,4 @@ def _custom_erase(self, data: str, **kwargs) -> str:
137
159
if not data :
138
160
return ""
139
161
140
- # Use join with list comprehension instead of building list incrementally
141
162
return "" .join ("*" if char not in PRESERVE_CHARS else char for char in data )
0 commit comments