|
| 1 | + |
| 2 | + |
| 3 | +from qdrant_client import models, QdrantClient |
| 4 | +import hashlib |
| 5 | +from concurrent.futures import ProcessPoolExecutor |
| 6 | +import json |
| 7 | +from sentence_transformers import SentenceTransformer |
| 8 | +from tqdm import tqdm |
| 9 | +import os |
| 10 | + |
| 11 | +os.environ["TOKENIZERS_PARALLELISM"] = "false" |
| 12 | + |
| 13 | +def upload_records_process(documents_chunk): |
| 14 | + qdrant = QdrantClient() |
| 15 | + |
| 16 | + qdrant.upload_records("papers", [ |
| 17 | + models.Record( |
| 18 | + id=hashlib.md5(doc["id"].encode()).hexdigest(), |
| 19 | + vector=doc["vector"], |
| 20 | + payload=doc |
| 21 | + ) for doc in documents_chunk |
| 22 | + ]) |
| 23 | + |
| 24 | + |
| 25 | +print("Loading encoder...") |
| 26 | +encoder = SentenceTransformer('all-MiniLM-L6-v2', device='cuda') |
| 27 | + |
| 28 | +print(f"Opening documents file...") |
| 29 | + |
| 30 | +documents_list = [] |
| 31 | +with open("documents.json", "r") as fp: |
| 32 | + for line in fp: |
| 33 | + documents_list.append(json.loads(line)) |
| 34 | + |
| 35 | +print(f"Indexing {len(documents_list)} documents...") |
| 36 | + |
| 37 | +batch_size = 4096 |
| 38 | +documents_list_chunked = [documents_list[i:i + batch_size] for i in range(0, len(documents_list), batch_size)] |
| 39 | + |
| 40 | +qdrant = QdrantClient() |
| 41 | +qdrant.recreate_collection( |
| 42 | + collection_name="papers", |
| 43 | + vectors_config=models.VectorParams( |
| 44 | + size=encoder.get_sentence_embedding_dimension(), # Vector size is defined by used model |
| 45 | + distance=models.Distance.COSINE |
| 46 | + ) |
| 47 | +) |
| 48 | + |
| 49 | +# We want to upload the documents in parallel with continuing |
| 50 | +# to encode the next batch of documents. If we don't do this, |
| 51 | +# then we have a lot of GPU idle time while docs are being |
| 52 | +# uploaded to Qdrant. |
| 53 | +upload_executor = ProcessPoolExecutor(max_workers=3) |
| 54 | + |
| 55 | +for documents_chunk in tqdm(documents_list_chunked, desc="Processing document chunks"): |
| 56 | + abstracts = encoder.encode([doc["abstract"] for doc in documents_chunk]) |
| 57 | + for idx, doc in enumerate(documents_chunk): |
| 58 | + doc["vector"] = abstracts[idx].tolist() |
| 59 | + |
| 60 | + upload_executor.submit(upload_records_process, documents_chunk) |
| 61 | + |
| 62 | +# Wait for the executors to finish |
| 63 | +upload_executor.shutdown() |
0 commit comments