twitter_create_convs.py

import json
from pathlib import Path

import polars as pl
from tqdm import tqdm

# Set up paths.
# TODO: Source paths from an env file; one possible sketch follows below.
path_string = "PUT THE PATH HERE TO WHERE YOU STORED THE PARQUET FILES"
folder_path = Path(path_string)
processed_folder_path = folder_path / "processed"
output_path = folder_path / "twitter-conv-trees.jsonl"
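# A possible way to handle the TODO above (a sketch, not wired in; the variable
# name "TWITTER_PARQUET_DIR" is hypothetical):
# import os
# path_string = os.environ["TWITTER_PARQUET_DIR"]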

# Gather the parquet files.
parq_files = sorted(processed_folder_path.rglob("*.parquet"))
wanted_cols = [
"timestamp_ms",
"id",
"text",
"truncated",
"in_reply_to_status_id",
"in_reply_to_user_id",
"is_quote_status",
"quote_count",
"reply_count",
"retweet_count",
"favorite_count",
"filter_level",
"lang",
"possibly_sensitive",
"hashtags",
"user_id",
"user_verified",
"user_followers_count",
"user_statuses_count",
]

# Load the parquet files into a list of dataframes. Polars is used for performance.
df_list = [pl.read_parquet(p, columns=wanted_cols) for p in parq_files]

# Create the main dataframe.
# This can be done incrementally if RAM is constrained (see the sketch below).
p_df = pl.concat(df_list)
# Drop the list reference so the per-file frames can be garbage-collected if needed.
del df_list
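# A lower-memory alternative (a sketch; untested here): let Polars scan the files
# lazily and only materialize the wanted columns at collect time.
# p_df = pl.scan_parquet(str(processed_folder_path / "**" / "*.parquet")).select(wanted_cols).collect()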

# Get the tweets that are replies to other tweets.
p_df_replies_only = p_df.filter(pl.col("in_reply_to_status_id").is_not_null())

# Group by the replied-to status id to find the most-replied-to statuses. This can take some time.
p_df_group_reply_to_status = (
    p_df_replies_only.group_by("in_reply_to_status_id").agg(pl.len().alias("count")).sort("count", descending=True)
)

# Save the grouped counts of the most-replied-to statuses.
group_reply_parq = folder_path / "group_reply_parq.parquet"
p_df_group_reply_to_status.write_parquet(group_reply_parq)

# Join the main dataframe with the reply counts to find tweets that have replies.
p_join = p_df.join(p_df_group_reply_to_status, left_on="id", right_on="in_reply_to_status_id", how="inner")

# Save the tweets that have replies.
tweets_that_have_replies_path = folder_path / "tweets_that_have_replies.parquet"
p_join.write_parquet(tweets_that_have_replies_path)

# Save the tweets that are replies to other tweets.
tweets_that_are_replies_path = folder_path / "tweets_that_are_replies.parquet"
p_df_replies_only.write_parquet(tweets_that_are_replies_path)

# Filter the tweets that have replies down to those that are not themselves replies,
# and keep only English tweets for now.
# This yields the root tweets: tweets that start a conversation and received replies.
origin_tweets = p_join.filter((pl.col("in_reply_to_status_id").is_null()) & (pl.col("lang") == "en"))
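# Optional sanity check (a sketch): how many conversation roots survived the filters?
# print(origin_tweets.height)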

# Helper functions and classes for the next steps.


def role_decide(user_id, prompt_user):
    # The author of the root tweet is the prompter; every other participant is the assistant.
    if user_id == prompt_user:
        return "prompter"
    else:
        return "assistant"


class ConversationTreeNode:
    def __init__(self, tweet_id, prompt_user, from_df, children_df, metadata=None):
        # Reuse the caller-supplied row dict when given; otherwise look the tweet up by id.
        if metadata:
            self.metadata = metadata
        else:
            self.metadata = from_df.filter(pl.col("id") == tweet_id).to_dicts()[0]
        self.metadata["prompt_user"] = prompt_user
        self.role = role_decide(self.metadata["user_id"], prompt_user)
        self.children = None
        self.text = self.metadata["text"]
        # Keep the text on the node itself rather than duplicated in the metadata.
        del self.metadata["text"]
        self.get_children(tweet_id=tweet_id, children_df=children_df)

    def get_children(self, tweet_id, children_df):
        # Recursively attach every reply to this tweet as a child node.
        children_dicts = children_df.filter(pl.col("in_reply_to_status_id") == tweet_id).to_dicts()
        if len(children_dicts) > 0:
children = [
ConversationTreeNode(
tweet_id=c["id"],
prompt_user=self.metadata["prompt_user"],
from_df=children_df,
children_df=children_df,
metadata=c,
)
for c in children_dicts
]
self.children = children
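# Note: get_children recurses once per level of reply depth, so an extremely deep
# reply chain could in principle exceed Python's default recursion limit (about
# 1000 frames). If that ever happens, one option (a sketch) is to raise the limit:
# import sys
# sys.setrecursionlimit(10_000)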


class ConversationTree:
    def __init__(self, tweet_id, prompt_user, from_df, children_df, r_metadata=None):
        self.root = ConversationTreeNode(
            tweet_id=tweet_id, prompt_user=prompt_user, from_df=from_df, children_df=children_df, metadata=r_metadata
        )
        self.metadata = None
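# When serialized through __dict__ (see the write loop below), each tree becomes
# one JSON line shaped roughly like:
# {"root": {"metadata": {...}, "role": "prompter", "children": [...], "text": "..."}, "metadata": null}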


# Create the conversation trees.
conv_tree_list = [
ConversationTree(
tweet_id=r["id"], prompt_user=r["user_id"], from_df=origin_tweets, children_df=p_df_replies_only, r_metadata=r
)
for r in tqdm(origin_tweets.to_dicts())
]

# Write the conversation trees to a jsonl file, one tree per line.
# The tree and node objects are serialized through their __dict__ attributes.
# Note: the file ends with a trailing newline, which most jsonl consumers accept.
with open(output_path, "w") as output:
    for t in tqdm(conv_tree_list):
        json.dump(obj=t, fp=output, default=lambda x: x.__dict__)
        output.write("\n")
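# To spot-check the output afterwards (a sketch):
# with open(output_path) as f:
#     tree = json.loads(next(f))
# print(tree["root"]["text"], len(tree["root"]["children"] or []))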