forked from LAION-AI/Open-Assistant
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtasks.py
344 lines (299 loc) · 14.1 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
import random
from typing import Any, Optional, Tuple
from uuid import UUID
from fastapi import APIRouter, Depends
from fastapi.security.api_key import APIKey
from loguru import logger
from oasst_backend.api import deps
from oasst_backend.api.v1.utils import prepare_conversation
from oasst_backend.config import settings
from oasst_backend.prompt_repository import PromptRepository, TaskRepository
from oasst_backend.utils.hugging_face import HfEmbeddingModel, HfUrl, HuggingFaceAPI
from oasst_shared.exceptions import OasstError, OasstErrorCode
from oasst_shared.schemas import protocol as protocol_schema
from sqlmodel import Session
from starlette.status import HTTP_204_NO_CONTENT
# Router collecting all task endpoints defined in this module.
router = APIRouter()
def generate_task(
request: protocol_schema.TaskRequest, pr: PromptRepository
) -> Tuple[protocol_schema.Task, Optional[UUID], Optional[UUID]]:
message_tree_id = None
parent_message_id = None
match request.type:
case protocol_schema.TaskRequestType.random:
logger.info("Frontend requested a random task.")
disabled_tasks = (
protocol_schema.TaskRequestType.random,
protocol_schema.TaskRequestType.summarize_story,
protocol_schema.TaskRequestType.rate_summary,
)
candidate_tasks = set(protocol_schema.TaskRequestType).difference(disabled_tasks)
request.type = random.choice(tuple(candidate_tasks)).value
return generate_task(request, pr)
# AKo: Summary tasks are currently disabled/supported, we focus on the conversation tasks.
# case protocol_schema.TaskRequestType.summarize_story:
# logger.info("Generating a SummarizeStoryTask.")
# task = protocol_schema.SummarizeStoryTask(
# story="This is a story. A very long story. So long, it needs to be summarized.",
# )
# case protocol_schema.TaskRequestType.rate_summary:
# logger.info("Generating a RateSummaryTask.")
# task = protocol_schema.RateSummaryTask(
# full_text="This is a story. A very long story. So long, it needs to be summarized.",
# summary="This is a summary.",
# scale=protocol_schema.RatingScale(min=1, max=5),
# )
case protocol_schema.TaskRequestType.initial_prompt:
logger.info("Generating an InitialPromptTask.")
task = protocol_schema.InitialPromptTask(
hint="Ask the assistant about a current event." # this is optional
)
case protocol_schema.TaskRequestType.prompter_reply:
logger.info("Generating a PrompterReplyTask.")
messages = pr.fetch_random_conversation("assistant")
task_messages = [
protocol_schema.ConversationMessage(
text=msg.text,
is_assistant=(msg.role == "assistant"),
message_id=msg.id,
front_end_id=msg.frontend_message_id,
)
for msg in messages
]
task = protocol_schema.PrompterReplyTask(conversation=protocol_schema.Conversation(messages=task_messages))
message_tree_id = messages[-1].message_tree_id
parent_message_id = messages[-1].id
case protocol_schema.TaskRequestType.assistant_reply:
logger.info("Generating a AssistantReplyTask.")
messages = pr.fetch_random_conversation("prompter")
task_messages = [
protocol_schema.ConversationMessage(
text=msg.text,
is_assistant=(msg.role == "assistant"),
message_id=msg.id,
front_end_id=msg.frontend_message_id,
)
for msg in messages
]
task = protocol_schema.AssistantReplyTask(conversation=protocol_schema.Conversation(messages=task_messages))
message_tree_id = messages[-1].message_tree_id
parent_message_id = messages[-1].id
case protocol_schema.TaskRequestType.rank_initial_prompts:
logger.info("Generating a RankInitialPromptsTask.")
messages = pr.fetch_random_initial_prompts()
task = protocol_schema.RankInitialPromptsTask(prompts=[msg.text for msg in messages])
case protocol_schema.TaskRequestType.rank_prompter_replies:
logger.info("Generating a RankPrompterRepliesTask.")
conversation, replies = pr.fetch_multiple_random_replies(message_role="assistant")
task_messages = [
protocol_schema.ConversationMessage(
text=p.text,
is_assistant=(p.role == "assistant"),
message_id=p.id,
front_end_id=p.frontend_message_id,
)
for p in conversation
]
replies = [p.text for p in replies]
task = protocol_schema.RankPrompterRepliesTask(
conversation=protocol_schema.Conversation(
messages=task_messages,
),
replies=replies,
)
case protocol_schema.TaskRequestType.rank_assistant_replies:
logger.info("Generating a RankAssistantRepliesTask.")
conversation, replies = pr.fetch_multiple_random_replies(message_role="prompter")
task_messages = [
protocol_schema.ConversationMessage(
text=p.text,
is_assistant=(p.role == "assistant"),
message_id=p.id,
front_end_id=p.frontend_message_id,
)
for p in conversation
]
replies = [p.text for p in replies]
task = protocol_schema.RankAssistantRepliesTask(
conversation=prepare_conversation(conversation),
replies=replies,
)
case protocol_schema.TaskRequestType.label_initial_prompt:
logger.info("Generating a LabelInitialPromptTask.")
message = pr.fetch_random_initial_prompts(1)[0]
task = protocol_schema.LabelInitialPromptTask(
message_id=message.id,
prompt=message.text,
valid_labels=list(map(lambda x: x.value, protocol_schema.TextLabel)),
)
case protocol_schema.TaskRequestType.label_prompter_reply:
logger.info("Generating a LabelPrompterReplyTask.")
conversation, messages = pr.fetch_multiple_random_replies(max_size=1, message_role="assistant")
message = messages[0]
task = protocol_schema.LabelPrompterReplyTask(
message_id=message.id,
conversation=prepare_conversation(conversation),
reply=message.text,
valid_labels=list(map(lambda x: x.value, protocol_schema.TextLabel)),
)
case protocol_schema.TaskRequestType.label_assistant_reply:
logger.info("Generating a LabelAssistantReplyTask.")
conversation, messages = pr.fetch_multiple_random_replies(max_size=1, message_role="prompter")
message = messages[0]
task = protocol_schema.LabelAssistantReplyTask(
message_id=message.id,
conversation=prepare_conversation(conversation),
reply=message.text,
valid_labels=list(map(lambda x: x.value, protocol_schema.TextLabel)),
)
case _:
raise OasstError("Invalid request type", OasstErrorCode.TASK_INVALID_REQUEST_TYPE)
logger.info(f"Generated {task=}.")
return task, message_tree_id, parent_message_id
@router.post(
    "/",
    response_model=protocol_schema.AnyTask,
    dependencies=[
        Depends(deps.UserRateLimiter(times=100, minutes=5)),
        Depends(deps.APIClientRateLimiter(times=10_000, minutes=1)),
    ],
)  # work with Union once more types are added
def request_task(
    *,
    db: Session = Depends(deps.get_db),
    api_key: APIKey = Depends(deps.get_api_key),
    request: protocol_schema.TaskRequest,
) -> Any:
    """
    Create a new task for the requesting client user and persist it.
    """
    api_client = deps.api_auth(api_key, db)

    try:
        pr = PromptRepository(db, api_client, client_user=request.user)
        task, message_tree_id, parent_message_id = generate_task(request, pr)
        # Store the task so the frontend can ack it and report interactions later.
        pr.task_repository.store_task(task, message_tree_id, parent_message_id, request.collective)
    except OasstError:
        raise
    except Exception:
        logger.exception("Failed to generate task.")
        raise OasstError("Failed to generate task.", OasstErrorCode.TASK_GENERATION_FAILED)
    return task
@router.post("/{task_id}/ack", response_model=None, status_code=HTTP_204_NO_CONTENT)
def tasks_acknowledge(
*,
db: Session = Depends(deps.get_db),
api_key: APIKey = Depends(deps.get_api_key),
task_id: UUID,
ack_request: protocol_schema.TaskAck,
) -> None:
"""
The frontend acknowledges a task.
"""
api_client = deps.api_auth(api_key, db)
try:
pr = PromptRepository(db, api_client)
# here we store the message id in the database for the task
logger.info(f"Frontend acknowledges task {task_id=}, {ack_request=}.")
pr.task_repository.bind_frontend_message_id(task_id=task_id, frontend_message_id=ack_request.message_id)
except OasstError:
raise
except Exception:
logger.exception("Failed to acknowledge task.")
raise OasstError("Failed to acknowledge task.", OasstErrorCode.TASK_ACK_FAILED)
@router.post("/{task_id}/nack", response_model=None, status_code=HTTP_204_NO_CONTENT)
def tasks_acknowledge_failure(
*,
db: Session = Depends(deps.get_db),
api_key: APIKey = Depends(deps.get_api_key),
task_id: UUID,
nack_request: protocol_schema.TaskNAck,
) -> None:
"""
The frontend reports failure to implement a task.
"""
try:
logger.info(f"Frontend reports failure to implement task {task_id=}, {nack_request=}.")
api_client = deps.api_auth(api_key, db)
pr = PromptRepository(db, api_client)
pr.task_repository.acknowledge_task_failure(task_id)
except (KeyError, RuntimeError):
logger.exception("Failed to not acknowledge task.")
raise OasstError("Failed to not acknowledge task.", OasstErrorCode.TASK_NACK_FAILED)
@router.post("/interaction", response_model=protocol_schema.TaskDone)
async def tasks_interaction(
*,
db: Session = Depends(deps.get_db),
api_key: APIKey = Depends(deps.get_api_key),
interaction: protocol_schema.AnyInteraction,
) -> Any:
"""
The frontend reports an interaction.
"""
api_client = deps.api_auth(api_key, db)
try:
pr = PromptRepository(db, api_client, client_user=interaction.user)
match type(interaction):
case protocol_schema.TextReplyToMessage:
logger.info(
f"Frontend reports text reply to {interaction.message_id=} with {interaction.text=} by {interaction.user=}."
)
# here we store the text reply in the database
newMessage = pr.store_text_reply(
text=interaction.text,
frontend_message_id=interaction.message_id,
user_frontend_message_id=interaction.user_message_id,
)
if not settings.DEBUG_SKIP_EMBEDDING_COMPUTATION:
try:
hugging_face_api = HuggingFaceAPI(
f"{HfUrl.HUGGINGFACE_FEATURE_EXTRACTION.value}/{HfEmbeddingModel.MINILM.value}"
)
embedding = await hugging_face_api.post(interaction.text)
pr.insert_message_embedding(
message_id=newMessage.id, model=HfEmbeddingModel.MINILM.value, embedding=embedding
)
except OasstError:
logger.error(
f"Could not fetch embbeddings for text reply to {interaction.message_id=} with {interaction.text=} by {interaction.user=}."
)
return protocol_schema.TaskDone()
case protocol_schema.MessageRating:
logger.info(
f"Frontend reports rating of {interaction.message_id=} with {interaction.rating=} by {interaction.user=}."
)
# here we store the rating in the database
pr.store_rating(interaction)
return protocol_schema.TaskDone()
case protocol_schema.MessageRanking:
logger.info(
f"Frontend reports ranking of {interaction.message_id=} with {interaction.ranking=} by {interaction.user=}."
)
# TODO: check if the ranking is valid
pr.store_ranking(interaction)
# here we would store the ranking in the database
return protocol_schema.TaskDone()
case protocol_schema.TextLabels:
logger.info(
f"Frontend reports labels of {interaction.message_id=} with {interaction.labels=} by {interaction.user=}."
)
# Labels are implicitly validated when converting str -> TextLabel
# So no need for explicit validation here
pr.store_text_labels(interaction)
return protocol_schema.TaskDone()
case _:
raise OasstError("Invalid response type.", OasstErrorCode.TASK_INVALID_RESPONSE_TYPE)
except OasstError:
raise
except Exception:
logger.exception("Interaction request failed.")
raise OasstError("Interaction request failed.", OasstErrorCode.TASK_INTERACTION_REQUEST_FAILED)
@router.post("/close", response_model=protocol_schema.TaskDone)
def close_collective_task(
close_task_request: protocol_schema.TaskClose,
db: Session = Depends(deps.get_db),
api_key: APIKey = Depends(deps.get_api_key),
):
api_client = deps.api_auth(api_key, db)
tr = TaskRepository(db, api_client)
tr.close_task(close_task_request.message_id)
return protocol_schema.TaskDone()