Commit ece3780

MattAlexMiracle (Alexander Mattick) and yk authored
Data analysis suite (LAION-AI#2159)
This adds in two scripts to compare rankings and votes. The rankings is relatively simple and computes the expected kendall-tau correlation of a user to the consensus vote. This is mostly for spotting bad actors. Currently it does not find statistically significant outliers that are also bad actors This doesn't mean that individual bad actors don't exist, but it does mean that either 1. there are so many of them, the ranking results are so far beyond repair that the merged rankings themselves make no sense 2. there are so few of them, that they overall don't affect the results sufficiently negatively to be noticable The truth is probably somewhere in the middle: since we only have ~3 rankers per message, if we had one that is consistently screwing with things, he would be hard to detect since by sheer luck he would agree enough with another ranker to overall vanish in the correlations. The vote aggregation script is much more sophisticated and computes optimal weightings for each power-user based on aggregations of non-power users: I now have an initial "smart" selection of messages based on a hierarchical consensus finding scheme: The idea is to split the voters into an "aggregation" and a "re-rating" group: The "aggregation" group is comprised of the set of users with less than X number of ratings. These users get aggregated into one mean (i.e. weighting 1:1) and act as the anchor. Users with more than X ratings are used to compute weightings, specifically we attempt to reweight them such that min ∑ (A u - (y + agg)/2)² subject to ∑ u = 1 with "A" being the matrix of votes done by people in the "re-rating" group and "agg" being the aggregated voter vector. y is simply the current weighting result y = A u_prev. I.e. "y" is the current target, assuming that the current u weightings are correct. After finding "better" uweightings in the least-squares optimization, I recompute the new target y and compute the next u until a fixed point is reached. The end result is a weighting of the "re-rating" group in u and a priority we can read from y. However, the choice between what people we put into the "aggregation" and "re-rating" group is pretty arbitrary based on a split with who has more than X ratings: - if we have low X then we have much much more people which can get re-weighted, but our "agg" anchor might be a lot more inaccurate - if we have high X we have a much more statistically stable "agg" anchor, but less weighting potential. The way I decided to solve this is by simply running many iterations 50≤X≤500 which gives us all (reasonable) configurations, and then giving a confidence score based on how often a message is in the top P% of messages. The method outputs a CSV file with - "message_id" - "message_tree_id" - "fracs" (what fraction of the ensemble the message_id is in) - "in_naive" (whether it would be in the naive "1 user; 1 vote" model) --------- Co-authored-by: Alexander Mattick <alex.mattick@fau.de> Co-authored-by: Yannic Kilcher <yk@users.noreply.github.com>
1 parent 89b59fd commit ece3780

File tree

3 files changed: +476 -6 lines changed

@@ -0,0 +1,323 @@
import logging
import warnings

import numpy as np
import pandas as pd
import psycopg2
from scipy.optimize import LinearConstraint, minimize
from scipy.sparse import coo_array, csr_array, csr_matrix, hstack
from scipy.special import softmax
from tqdm import trange
from tqdm.contrib.logging import logging_redirect_tqdm


def least_squares_fit(features, target, scaling=1):
    """Fit weights for every column except the last one (the aggregate anchor, which
    keeps a fixed weight of 1), constrained to sum to ``scaling``, by least squares
    against ``target``."""
    X = features  # (features - np.mean(features, 0)) / np.std(features, 0)
    # print("feature",X.shape)
    # get target
    y = target.reshape(-1)
    # Use simple imputer for mean to not change the importance of tree split
    # Create an instance of the ExtraTreesRegressor algorithm
    zX = X.toarray()
    summed_target = y  # (y+zX[:,-1])/2
    vote_matrix = csr_matrix(zX[:, :-1])
    constraint = LinearConstraint(np.ones(X.shape[-1] - 1), 1 * scaling, 1 * scaling)
    init = np.ones(X.shape[-1] - 1)  # lsqr(vote_matrix,summed_target)[0]
    init = init / np.linalg.norm(init)
    result = minimize(
        lambda x: np.sum((vote_matrix @ x - summed_target) ** 2),
        init,
        jac=lambda x: 2 * vote_matrix.T @ (vote_matrix @ x - summed_target),
        constraints=constraint,
        hess=lambda _: 2 * vote_matrix.T @ vote_matrix,
        method="trust-constr",
    )
    # result = least_squares(residual, np.ones(X.shape[-1]-1))
    # result = least_squares(zX[:,:-1], (y+zX[:,-1])/2,
    # print(result)
    return np.concatenate([result.x, np.ones(1)])


def get_df(study_label):
    """Read label votes from the database and return a sparse message x user matrix
    of ``study_label`` values, together with the message and user id lists."""
    conn = psycopg2.connect("host=0.0.0.0 port=5432 user=postgres password=postgres dbname=postgres")
    # Define the SQL query
    query = (
        "SELECT DISTINCT message_id, labels, message.user_id FROM text_labels JOIN message ON message_id = message.id;"
    )

    # Read the query results into a Pandas dataframe
    df = pd.read_sql(query, con=conn)
    print(df.head())
    # Close the database connection
    conn.close()
    users = set()
    messages = set()
    for row in df.itertuples(index=False):
        row = row._asdict()
        users.add(str(row["user_id"]))
        # for row in df.itertuples(index=False):
        #     row = row._asdict()
        messages.add(str(row["message_id"]))
    users = list(users)
    messages = list(messages)
    print("num users:", len(users), "num messages:", len(messages), "num in df", len(df))

    # arr = np.full((len(messages), len(users)), np.NaN, dtype=np.half)
    row_idx = []
    col_idx = []
    data = []

    def swap(x):
        return (x[1], x[0])

    dct = dict(map(swap, enumerate(messages)))
    print("converting messages...")
    df["message_id"] = df["message_id"].map(dct)
    print("converting users...")
    df["user_id"] = df["user_id"].map(dict(map(swap, enumerate(users))))
    print("converting labels...")
    df["labels"] = df["labels"].map(lambda x: float(x.get(study_label, 0)))
    row_idx = df["message_id"].to_numpy()
    col_idx = df["user_id"].to_numpy()
    data = df["labels"].to_numpy()
    print(data)
    print(row_idx)
    print(col_idx)
    """ for row in df.itertuples(index=False):
        row = row._asdict()
        labels = row["labels"]
        value = labels.get(study_label, None)
        if value is not None:
            # tmp=out[str(row["message_id"])]
            # tmp = np.array(tmp)
            # tmp[users.index(row["user_id"])] = value
            # out[str(row["message_id"])] = np.array(tmp)
            # print(out[str(row["message_id"])].density)
            row_idx.append(messages.index(str(row["message_id"])))
            col_idx.append(users.index(str(row["user_id"])))
            data.append(value)
            #arr[mid, uid] = value """
    arr = csr_array(coo_array((data, (row_idx, col_idx))))
    print("results", len(users), arr.shape)
    # df = pd.DataFrame.from_dict(out,orient="index")
    print("generated dataframe")
    return arr, messages, users


def reweight_features(features, weights, noise_scale=0.0):
    # X = df.drop(target_col, axis=1)
    # print("info",features.shape,weights.shape)
    # X = (features - np.mean(features, 0).reshape(1,-1)) / np.std(features, 0).reshape(1,-1)
    noise = np.random.randn(weights.shape[0]) * noise_scale
    weights = weights + noise
    # normalizer = (X.notna().astype(float) * weights).sum(skipna=True, axis=1)
    values = features @ weights
    # values = values / normalizer
    return values


def get_subframe(arr, columns_to_filter):
    # return np.delete(arr, columns_to_filter, axis=1)
    """
    Remove the columns denoted by ``columns_to_filter`` from the CSR sparse array ``arr``.
    """
    if not isinstance(arr, csr_array):
        raise ValueError("works only for CSR format -- use .tocsr() first")
    indices = list(columns_to_filter)
    mask = np.ones(arr.shape[1], dtype=bool)
    mask[indices] = False
    return arr[:, mask]


def sample_importance_weights(importance_weights, temperature=1.0):
    weights = softmax(
        abs(importance_weights) / temperature,
    )
    column = np.random.choice(len(importance_weights), p=weights)
    return column


def make_random_testframe(num_rows, num_cols, frac_missing):
    data = np.random.rand(num_rows, num_cols).astype(np.float16)
    mask = np.random.rand(num_rows, num_cols) < frac_missing
    data[mask] = np.nan
    return data


def combine_underrepresented_columns(arr, num_instances):
    """Merge all user columns with fewer than ``num_instances`` nonzero votes into a
    single mean column, appended as the last column (the "agg" anchor)."""
    # 1. get the mask
    mask = arr != 0
    to_combine = mask.sum(0) < num_instances
    # print("to combine", mask.sum(0))
    # print("combining", to_combine.astype(int).sum().tolist(), "many columns")
    if not any(to_combine):
        return arr
    # mean = np.zeros(arr.shape[0])
    # for i in to_combine.tolist():
    #     mean = np.nansum(np.stack(arr[:,i],mean),0)
    # mean = mean/len(to_combine)
    mean = np.mean(arr[:, to_combine], 1).reshape(-1, 1)
    # print("mean shape",mean.shape)
    dp = np.arange(len(to_combine))[to_combine]
    # print("removing unused columns")
    arr = get_subframe(arr, dp)
    # print("subframe shape",arr.shape)
    arr = hstack([arr, mean])
    # print("out arr", arr.shape)
    # print((mean==0).astype(int).sum())
    return arr


def importance_votes(arr, to_fit=10, init_weight=None):
    """Iterate the constrained least-squares reweighting until the weighted scores
    reach a fixed point; returns the final per-message scores and the column weights."""
    # arr = combine_underrepresented_columns(matrix,underrepresentation_thresh)
    filtered_columns = []
    weighter = None
    if init_weight is None:
        weighter = np.ones(arr.shape[1]) / arr.shape[1]  # pd.Series(1.0, index=df.drop(columns=target).columns)
    else:
        weighter = init_weight
    # print(arr.shape)
    index = np.arange(arr.shape[1])
    # subtract 1: the last one will always have maximal reduction!
    bar = trange(to_fit)
    target = np.ones(arr.shape[0])
    for i in bar:
        index = list(filter(lambda x: x not in filtered_columns, index))
        # 0. produce target column:
        # print("step 0")
        target_old = target
        target = reweight_features(arr, weighter)
        error = np.mean((target - target_old) ** 2)
        bar.set_description(f"expected error: {error}", refresh=True)
        if error < 1e-10:
            break
        # 1. get a subframe of interesting features
        # print("step 1")
        # subframe = get_subframe(arr, filtered_columns)
        # 2. compute feature importance
        # print("step 2")
        # importance_weights=None
        # importance_weights = compute_feature_importance(arr, target, index)
        weighter = least_squares_fit(arr, target)
        # 3. sample column
        # print("step 3")
        # new_column = sample_importance_weights(importance_weights["importance"], temperature)
        # new_column=index[new_column]
        # value = importance_weights["importance"][new_column]
        # print(weighter.shape, importance_weights["importance"].shape)
        # weighter += alpha[i] * importance_weights["importance"].to_numpy()
        # normalize to maintain the "1-voter one vote" total number of votes!
        # weighter = weighter / sum(abs(weighter))
        # stepsize = np.mean(abs(importance_weights["importance"].to_numpy()))
        # bar.set_description(f"expected stepsize: {stepsize}", refresh=True)
        # filtered_columns.append(new_column)
        # print("new weight values", weighter)
    return reweight_features(arr, weighter), weighter


def select_ids(arr, pick_frac, minima=(50, 500), folds=50, to_fit=200, frac=0.6):
    """
    Selects the top ``pick_frac`` fraction of messages from ``arr`` after merging all
    users with fewer than ``minima`` votes (the threshold increases linearly from the
    minimum to the maximum across the ``folds`` iterations).
    Returns all messages that appear in at least a ``frac`` fraction of these selections.
    """
    votes = []
    minima = np.linspace(*minima, num=folds, dtype=int)
    num_per_iter = int(arr.shape[0] * pick_frac)
    writer_num = 0
    tmp = None
    for i in trange(folds):
        tofit = combine_underrepresented_columns(arr, minima[i])
        if tofit.shape[1] == writer_num:
            print("already tested these writer counts, skipping and using cached value.....")
            votes.append(tmp)
            continue
        writer_num = tofit.shape[1]
        # print("arr shape", arr.shape)
        init_weight = np.ones(tofit.shape[1]) / tofit.shape[1]
        out, weight = importance_votes(tofit, init_weight=init_weight, to_fit=to_fit)
        # print(i, "final weight")
        # print(weight)
        # mask =(out>thresh)
        # out = np.arange(arr.shape[0])[mask]
        indices = np.argpartition(out, -num_per_iter)[-num_per_iter:]
        tmp = np.zeros((arr.shape[0]))
        tmp[indices] = 1
        votes.append(tmp)
        # votes.append(indices.tolist())
    # print(*[f"user_id: {users[idx]} {m}±{s}" for m, s, idx in zip(weights_mean, weights_std, range(len(weights_mean)))], sep="\n")
    out = []
    votes = np.stack(votes, axis=0)
    print("votespace", votes.shape)
    votes = np.mean(votes, 0)
    for idx, f in enumerate(votes):
        if f > frac:
            out.append((idx, f))
    return out


LOG = logging.getLogger(__name__)

if __name__ == "__main__":
    warnings.filterwarnings("ignore", category=FutureWarning)
    warnings.filterwarnings("ignore", category=DeprecationWarning)
    warnings.simplefilter("ignore")
    logging.captureWarnings(True)
    logging.basicConfig(level=logging.ERROR)
    # Generate some example data
    # df = make_random_testframe(100_000,5000,0.99)
    df, message_ids, users = get_df("quality")
    print("combining columns:")
    # df = combine_underrepresented_columns(df, 100)
    weights = np.ones(df.shape[-1])
    y = reweight_features(df, weights)
    num_per_iter = int(df.shape[0] * 0.5)
    naive = np.argpartition(y, -num_per_iter)[-num_per_iter:]

    print("after preprocessing")
    # print(df)
    # preproc input

    # Compute feature importances
    # y = reweight_features(df,np.ones(df.shape[1]))
    # importance_weights = compute_feature_importance(df, y, list(range(df.shape[1])))
    # Print the importance weights for each feature
    # print(importance_weights)

    print("STARTING RUN")

    # sampled_columns = sample_importance_weights(
    #     importance_weights["importance"],
    # )
    # print("sampled column", sampled_columns)
    # print("compute importance votes:")
    # weighted_votes, weightings = importance_votes(df)
    # print(weighted_votes)
    # print(weightings)
    with logging_redirect_tqdm():
        print("selected ids")
        ids = select_ids(df, 0.5, folds=500)

    # print(res, frac)
    conn = psycopg2.connect("host=0.0.0.0 port=5432 user=postgres password=postgres dbname=postgres")
    # Define the SQL query
    # , payload#>'{payload, text}' as text
    query = "SELECT DISTINCT id as message_id, message_tree_id FROM message;"
    print("selected", len(ids), "messages")
    # Read the query results into a Pandas dataframe
    df = pd.read_sql(query, con=conn)
    out = []
    fracs = []
    in_naive = []
    for i, frac in ids:
        res = message_ids[i]
        out.append((df.loc[df["message_id"] == res]))
        fracs.append(frac)
        in_naive.append(i in naive)
    df = pd.concat(out)
    df["fracs"] = fracs
    df["in_naive"] = in_naive
    print(df.shape)
    print("differences from naive", len(in_naive) - sum(in_naive))
    print(df)
    df.to_csv("output.csv")
