w

waynewang

V1

2023/05/09阅读:175主题:丘比特忙

向量数据库个人笔记

向量数据库个人笔记

传统的搜索比如Mysql用b+树索引, EleasticSearch索引倒排但本质上都是精确匹配 基于向量检索技术, 做相似度判断去寻找最相似, 可以更好的对图片, 视频等非结构化数据做检索, 玩的就是多维

什么是向量检索?

当查找与当前相似度最高的内容时, 向量搜索都可简化为这三个步骤

  1. 首先, 候选和已选内容都转为向量
  2. 遍历候选向量与已选向量做余弦相似度计算,然后按照计算出的余弦相似度排序
  3. 找出最相似的top N

Milvus向量数据库

Milvus&Llama_index

Llama_index结合milvus, 基于li的多种插件, 对于NLP十分擅长

# 使用llama_index读取数据
# 结合pylimvus导入到milvus数据库


#step1 - 读取数据为文档
from llama_index import download_loader
from glob import glob

MarkdownReader = download_loader("MarkdownReader")
markdownreader = MarkdownReader()

docs = []
for file in glob("./milvus-docs/site/en/**/*.md", recursive=True):
    docs.extend(markdownreader.load_data(file=file))


#step2 - 文档上传到数据库
from llama_index import GPTMilvusIndex
index = GPTMilvusIndex.from_documents(docs, host=HOST, port=PORT, overwrite=True)


#step3 - 查询数据
s = index.query("What is a collection?")
# Output:
# A collection in Milvus is a logical grouping of entities, similar to a table in a relational database management system (RDBMS). It is used to store and manage entities.


#step4 - 保存连接信息复用
saved = index.save_to_dict()
index = GPTMilvusIndex.load_from_dict(saved, overwrite = False)
s = index.query("What communication protocol is used in Pymilvus for commicating with Milvus?")

Milvus&Towhee

Towhee是一个神经网络数据处理流水线, 和llama_index的形态很像, 支持多种数据导入插件, 各擅长于cv 即处理图片和视频

使用towhee检索文本
# 使用towhee导入文本数据

import pandas as pd
import numpy as np
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
from openai.embeddings_utils import get_embedding, get_embeddings
from towhee import pipe, ops
import csv
from towhee.datacollection import DataCollection

# 需要封装成工具
#######todo##############


# 创建数据库的collection
# vector字段需要制定dimension, 代表向量的维度, 向量维度取决于你生成embedding列表中的元素个数
def create_collection(collection_name, dim):
    connections.connect(host='172.24.15.115', port='19530')
    if utility.has_collection(collection_name):
        utility.drop_collection(collection_name)

    fields = [
        FieldSchema(name='id', dtype=DataType.VARCHAR, descrition='ids', max_length=500, is_primary=True, auto_id=False),
        FieldSchema(name='embedding', dtype=DataType.FLOAT_VECTOR, descrition='embedding vectors', dim=dim)
    ]
    schema = CollectionSchema(fields=fields, description='reverse image search')
    collection = Collection(name=collection_name, schema=schema)

    # create IVF_FLAT index for collection.
    index_params = {
        'metric_type':'L2',
        'index_type':"IVF_FLAT",
        'params':{"nlist":768}
    }
    collection.create_index(field_name="embedding", index_params=index_params)
    return collection




def write_collection():
    insert_pipe = (
        pipe.input('id''question''answer')
            .map('question''vec', ops.text_embedding.dpr(model_name='facebook/dpr-ctx_encoder-single-nq-base'))
            .map('vec''vec'lambda x: x / np.linalg.norm(x, axis=0))
            .map(('id''vec'), 'insert_status', ops.ann_insert.milvus_client(host='172.24.15.115', port='19530', collection_name='question_answer'))
            .output()
    )
    
    with open('question_answer.csv', encoding='utf-8'as f:
        reader = csv.reader(f)
        next(reader)
        for row in reader:
            insert_pipe(*row)
    collection.load()




def query_collection(qs, collection_name, source_dict):
    
    ans_pipe = (
    pipe.input('question')
        .map('question''vec', ops.text_embedding.dpr(model_name="facebook/dpr-ctx_encoder-single-nq-base"))
        .map('vec''vec'lambda x: x / np.linalg.norm(x, axis=0))
        .map('vec''res', ops.ann_search.milvus_client(host='127.0.0.1', port='19530', collection_name=c, limit=1))
        .map('res''answer'lambda x: [id_answer[int(i[0])] for i in x])
        .output('question''answer')
    )
    
    ans = ans_pipe(qs)
    ans = DataCollection(ans)
    # ans是一个 topk列表, 可以通过ans[0][字段名] 访问
    return ans

if __name__ == '__main__':
    # 创建表
    create_collection('question_answer'768)
    
    # 写入数据
    write_collection()

    df = pd.read_csv('question_answer.csv')
 id_answer = df.set_index('id')['answer'].to_dict()


    # 查询数据
    result = query_collection(qs='what is collection?', collection_name='question', source_dict=id_answer)    
 print(result)

Faiss

Faiss, 是一个较底层向量搜索库, 性能较高, 对10亿量级的索引可以做到毫秒级检索的性能, 支持GPU加速。

Faiss搜索
# step0 - 已有向量数据集
xb = get_vector()
    
# step1 - 创建索引
dim, measure = 64, faiss.METRIC_L2
param = 'Flat'
index = faiss.index_factory(dim, param, measure)
index.is_trained

# step2 - 添加向量
index.add(xb)

# step3 - 查询最相似的top4
index.search(xq, 4)

向量数据库部署和集成Web UI

# 部署docker
$ yum install docker
$ service start docker

# 部署docker-compose
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose

# 下载yml文件同时启动容器
$ wget https://github.com/milvus-io/milvus/releases/download/v2.2.8/milvus-standalone-docker-compose.yml -O docker-compose.yml
$ docker-compose up -d 

# 部署attu
docker run -p 8000:3000 -e HOST_URL=http://localhost:8000 -e MILVUS_URL=localhost:19530 zilliz/attu:latest

# 访问attu web
http://localhost:8000

#填写milvus集群连接地址
milvus address ip: 19530

参考

个人总结

  1. 最近写python较少, pandas和numpy操作有些遗忘了
  2. 不必去理解太高深的算法, 主要掌握基本算法和工程运用

下一篇文章预计会写下GPTcache结合向量检索的使用

分类:

人工智能

标签:

自然语言处理

作者介绍

w
waynewang
V1