MAXIR Vector Edition Best Practices
1. Access
MAXIR provides several ways to access a cluster to meet different user needs. Since this document involves file import operations, Method 1 is recommended.
Method 1: access with the postgres client psql
On a cloud host in the same VPC as MAXIR, install the postgres client psql (version 12 is recommended). For user account operations, see User Management.
psql -h {maxir_address} -U {username_created_in_maxir_console} -d ${maxir_database}
Method 2: access through the "DWSU console" provided by the cloud platform
For details on how to create a cluster, see Accessing a Cluster.
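To verify connectivity from a program before loading any data, the following is a minimal sketch using psycopg2 (installed as described in section 5.1); the host, port, user, password and database below are placeholders to be replaced with the values from the console:

```python
import psycopg2

# Placeholder connection parameters -- replace with the values from the MAXIR console.
conn = psycopg2.connect(host="maxir_address", port=5432,
                        dbname="maxir_database", user="maxir_user", password="***")
cur = conn.cursor()
cur.execute("SELECT version();")  # trivial query just to confirm the connection works
print(cur.fetchone()[0])
cur.close()
conn.close()
```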
2. DDL
2.1 CREATE TABLE
Example
CREATE TABLE test_tbl
(
id VARCHAR,
vector vecf16(25),
post_publish_time TIMESTAMP
)
USING heap distributed by(id)
PARTITION BY RANGE (post_publish_time) (START (DATE '2024-08-01') INCLUSIVE END (DATE '2024-08-31') EXCLUSIVE EVERY(INTERVAL '1 day'), DEFAULT PARTITION extra);
ALTER TABLE test_tbl ALTER COLUMN vector SET STORAGE PLAIN;

(Figure: table information after creation.)

Key points of the table definition:
- The table data is distributed by id.
- A timestamp column (creation time, insert time, etc.) is used as the partition column, and the table is partitioned by day so that incremental data can be updated more efficiently.
- The storage mode of the vector column (the vector data) is changed to plain to speed up queries.
Vector types
| Type | Description |
|---|---|
| vecf16 | Half-precision vector data; elements are float16 floating-point numbers |
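In SQL statements and data files, a vecf16 value is written as a bracketed, comma-separated list whose length matches the declared dimension (25 in the table above); the data-generation script in section 3.1 and the pyspark UDF in section 6 build exactly this format. A minimal sketch of the literal format (the helper name is an assumption used only for illustration):

```python
def to_vector_literal(values):
    """Format a list of floats as the bracketed literal used for vecf16 columns, e.g. '[0.1,0.2,...]'."""
    return "[" + ",".join(str(v) for v in values) + "]"

vec = [0.1] * 25               # the length must match vecf16(25) in the table definition
assert len(vec) == 25
print(to_vector_literal(vec))  # -> [0.1,0.1,...,0.1]
```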
2.2 CREATE INDEX
Example
CREATE INDEX test_vector_idx ON test_tbl USING vectors (vector vecf16_l2_ops) WITH (options='
optimizing.optimizing_threads = 5
segment.max_growing_segment_size = 100000
segment.max_sealed_segment_size = 8000000
[indexing.hnsw]
m=30
ef_construction=250
quantization.product.ratio = "x16"
');

(Figure: table structure after the index is added.)

Parameter description
vecf16_l2_ops
vecf16 corresponds to the type of the vector column in the CREATE TABLE statement.
l2_ops specifies the distance function used for the vectors.
The following 3 distance functions are supported for vector distance calculation:
| Type | Operator (usable in SELECT to compute the distance; see the query statements below) | Description | Formula |
|---|---|---|---|
| l2_ops | <-> | Squared Euclidean distance | sum_i (x_i - y_i)^2 |
Using l2_ops is recommended for best performance.
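For reference, the value returned by the <-> operator under l2_ops is the squared Euclidean distance; a small sketch of the formula in plain Python, for illustration only:

```python
def squared_l2_distance(x, y):
    """Squared Euclidean distance: sum((x_i - y_i)^2), the value <-> returns under l2_ops."""
    assert len(x) == len(y)
    return sum((a - b) ** 2 for a, b in zip(x, y))

print(squared_l2_distance([1.0, 2.0, 3.0], [1.0, 0.0, 3.0]))  # 4.0
```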
optimizing.optimizing_threads = 5
segment.max_growing_segment_size = 100000
segment.max_sealed_segment_size = 8000000
The lines above are the basic vector index parameters. The values shown were the best values found in actual testing and can be used directly.
Their meanings are as follows:
| Key | Type | Range | Default | Description |
|---|---|---|---|---|
| optimizing.optimizing_threads | integer | [1, 65535] | 1 | Maximum number of threads used to build the index. |
| optimizing.sealing_secs | integer | [1, 60] | 60 | Probe interval (in seconds) for sealing/merging segments during index building. |
| segment.max_growing_segment_size | integer | [1, 4_000_000_000] | 20_000 | Maximum size of the vectors that have not yet been indexed. |
| segment.max_sealed_segment_size | integer | [1, 4_000_000_000] | 1_000_000 | Maximum size of the vectors used for index creation. |
[indexing.hnsw]
m=30
ef_construction=250
quantization.product.ratio = "x16"
The lines above specify the concrete type of the vector index together with its related configuration.
Choosing an appropriate index algorithm is critical for the efficiency, resource usage and accuracy of vector search. MAXIR supports the following three algorithms:
HNSW(Hierarchical Navigable Small World)
The HNSW algorithm combines the idea of skip lists with navigable small-world graphs and uses a hierarchical structure to achieve efficient approximate nearest neighbor search.
HNSW is suited to approximate nearest neighbor search on large-scale, high-dimensional data. Its strengths are fast navigation and search together with good accuracy. Compared with a plain NSW search, HNSW exploits the hierarchical structure during search, avoiding getting trapped in local optima and improving both efficiency and accuracy.
The hnsw index parameters specified above have the following meanings:
| Key | Type | Range | Default | Description |
|---|---|---|---|---|
| m | integer | [4, 128] | 12 | Maximum degree of a node. |
| ef_construction | integer | [10, 2000] | 300 | Search scope during index construction. |
| quantization | table | trivial: no quantization; scalar: scalar quantization; product: product quantization (recommended) | N/A | Quantization algorithm used when computing distances; see the table below for its parameters |
Options for quantization.product
| Key | Type | Range | Default | Description |
|---|---|---|---|---|
| sample | integer | [1, 1_000_000] | 65535 | Number of samples used for quantization. |
| ratio | enum | "x4", "x8", "x16", "x32", "x64" | "x4" | Compression ratio of the quantization. |
3. LOAD DATA
3.1 Generating the import data
Test data can be downloaded from https://huggingface.co/datasets/hhy3/ann-datasets/tree/main
This document uses glove-25-angular as the example dataset.
The following Python script extracts the training set (glove-25-angular-train.csv) from the dataset.
If pandas or h5py is not installed, install them with:
pip3 install pandas
python3 -m pip install h5py
import uuid
import random
import pandas as pd
import h5py
# Read the hdf5 file
file = h5py.File('glove-25-angular.hdf5', 'r')
print("output train!")
df = pd.DataFrame(file['train'][:])
# Add a random UUID as the id and a random publish time within August 2024 for each row
df['ID'] = [str(uuid.uuid4()) for _ in range(len(df))]
date_range = pd.date_range(start='2024-08-01', end='2024-08-31', freq='T')
df['Timestamp'] = [random.choice(date_range).isoformat() for _ in range(len(df))]
# Build the vector literal by joining all original float columns (everything except the two columns just added) into '[...]'
df['Data'] = df.apply(lambda row: '[' + ','.join(map(str, row[:-2])) + ']', axis=1)
# Reorder the columns
df = df[['ID', 'Data', 'Timestamp']]
# Write the result to a new CSV file, using '|' as the delimiter
df.to_csv('glove-25-angular-train.csv', sep='|', index=False, header=False)

Each line of the extracted file is one record containing the ID, the vector data and the publish time, separated by |:
b6b07b6d-7f6c-4ffc-b2c5-f753cefc2e90|[-0.28571999073028564,1.6030000448226929,-0.23368999361991882,0.42476001381874084,0.071834996342659,-1.6633000373840332,-0.6774700284004211,-0.20066000521183014,0.7255899906158447,-0.722599983215332,0.09668300300836563,1.0442999601364136,1.1964000463485718,-0.2735399901866913,1.44159996509552,0.06502100080251694,0.9345399737358093,-0.40575000643730164,0.9226999878883362,-0.29600998759269714,-0.5180299878120422,0.8512099981307983,-1.0339000225067139,0.050655998289585114,0.13964000344276428]|2024-08-16T07:22:00
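Before uploading, the generated file can optionally be sanity-checked. A minimal sketch that verifies the first few lines have three '|'-separated fields and a 25-dimensional vector (the file name follows the script above):

```python
# Check the first few lines of the generated file: id | vector | publish time.
with open('glove-25-angular-train.csv', 'r') as f:
    for i, line in enumerate(f):
        if i >= 5:
            break
        record_id, vector, publish_time = line.rstrip('\n').split('|')
        dims = vector.strip('[]').split(',')
        assert len(dims) == 25, f"unexpected dimension {len(dims)} on line {i + 1}"
        print(record_id, len(dims), publish_time)
```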
Upload the data to us3 (for us3 usage, see https://docs.ucloud.cn/ufile/tools/us3cli/introduction):
us3cli cp glove-25-angular-train.csv us3://maxir-data-001/backup-data/glove-25-angular-train.csv
3.2 Local import
By default, SQL statements cannot import data from a local file. To import a data file located on the client machine, connect with the psql client. Example:
Assuming the file path is /home/postgres/glove-25-angular-train.csv, connect with psql (the connection address and credentials can be obtained from the console):
psql -h xxxx -p xxxx -d xxxx
Then execute the following statement:
\COPY test_tbl from '/home/postgres/glove-25-angular-train.csv' WITH CSV DELIMITER '|';
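The same client-side import can also be driven from Python with psycopg2's copy_expert (the mechanism the pyspark example in section 6 uses as well). A minimal sketch, assuming the file path above and placeholder connection parameters:

```python
import psycopg2

# Placeholder connection parameters -- replace with the values from the console.
conn = psycopg2.connect(host="maxir_address", dbname="maxir_database",
                        user="maxir_user", password="***")
cur = conn.cursor()
with open('/home/postgres/glove-25-angular-train.csv', 'r') as f:
    # Equivalent to the \COPY above: the file is read on the client and streamed to the server.
    cur.copy_expert(
        "COPY test_tbl (id, vector, post_publish_time) FROM STDIN WITH (FORMAT csv, DELIMITER '|')", f)
conn.commit()
cur.close()
conn.close()
```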
3.3 Importing from us3
Once the data has been uploaded to us3, obtain the us3 path and the AK/SK (see https://docs.ucloud.cn/ufile/quick/quick_start):
copy test_tbl from 's3://internal.s3-cn-sh2.ufileos.com/maxir-data-001/backup-data/glove-25-angular-train.csv'
ACCESS_KEY_ID 'xxxxxxxx'
SECRET_ACCESS_KEY 'xxxxxxxx'
DELIMITER '|' CSV;

4. QUERY
q1: among all vectors, find the 10 closest to the target vector
SELECT id,
vector <-> '[0.6377800107002258,0.9509999752044678,0.9408400058746338,-0.5509499907493591,0.06180400028824806,-1.6734999418258667,-0.5704600214958191,-1.5750000476837158,0.5274199843406677,-0.3642300069332123,0.5622000098228455,0.009283199906349182,0.391759991645813,0.46647000312805176,-0.7589899897575378,0.3084399998188019,0.4611699879169464,0.30028998851776123,1.5491000413894653,1.2386000156402588,-0.7254599928855896,1.7488000392913818,0.4075799882411957,-1.96589994430542,0.05322200059890747]'
AS dist FROM test_tbl order by dist limit 10;

q2: among the records published between August 8 and August 15, compute a similarity score with the formula dist * 10, keep those with a score greater than 67, and return the 20 most similar records
SELECT b.* FROM (SELECT a.id, a.dist * 10 AS similarity FROM
(SELECT id,
vector <-> '[0.6377800107002258,0.9509999752044678,0.9408400058746338,-0.5509499907493591,0.06180400028824806,-1.6734999418258667,-0.5704600214958191,-1.5750000476837158,0.5274199843406677,-0.3642300069332123,0.5622000098228455,0.009283199906349182,0.391759991645813,0.46647000312805176,-0.7589899897575378,0.3084399998188019,0.4611699879169464,0.30028998851776123,1.5491000413894653,1.2386000156402588,-0.7254599928855896,1.7488000392913818,0.4075799882411957,-1.96589994430542,0.05322200059890747]'
AS dist FROM test_tbl
WHERE post_publish_time >= '2024-08-08 00:00:00'
AND post_publish_time <= '2024-08-15 10:52:00'
ORDER BY dist ASC LIMIT 5000) as a) as b WHERE b.similarity > 67 offset 0 limit 20;
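Section 5 below shows a complete Python client. As a shorter sketch, q1 can also be issued with a psycopg2 parameter so the vector literal is not concatenated into the SQL string by hand (psycopg2 quotes the value client-side, so this is equivalent to the literal form in q1; the connection parameters are placeholders):

```python
import psycopg2

target = "[" + ",".join(["0.1"] * 25) + "]"  # target vector as a bracketed literal

conn = psycopg2.connect(host="maxir_address", dbname="maxir_database",
                        user="maxir_user", password="***")
cur = conn.cursor()
# The %s placeholder is filled with the quoted vector literal by psycopg2.
cur.execute("SELECT id, vector <-> %s AS dist FROM test_tbl ORDER BY dist LIMIT 10", (target,))
for row in cur.fetchall():
    print(row)
cur.close()
conn.close()
```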
5. Connecting to MAXIR with a Python client
5.1 Installing the required Python packages
psycopg2: used to interact with MAXIR
Install the psycopg2 package with pip3 install psycopg2-binary
5.2 Writing and running the Python program
Query
Put the SQL to execute into sql_template, then run the Python script.
import psycopg2
database='postgres'
user='postgres'
password=''
host='localhost'
port='5432'
def run_query(table, vector):
conn = psycopg2.connect(database=database, user=user,
password=password, host=host, port=port)
sql_template = " select id from " + table + " order by vector <-> "
cur = conn.cursor()
sql = sql_template + "'" + vector + "'" + " limit 3;"
    cur.execute(sql)
    # Fetch and print the query results
    rows = cur.fetchall()
    print(rows)
    cur.close()
conn.close()
if __name__ == '__main__':
    run_query('postgres', '[1,2,3]')

Inserting data
Run the import with python3 insert.py --t 10 --dir ./data --batch 128. Put the data files to import into the data directory, and verify the database, user, password, host and related parameters before running the script.
import argparse
import psycopg2
from multiprocessing import Process, Value, Lock, Event, Queue
import os
import time
import sys
database='postgres'
user='postgres'
password=''
host='localhost'
port='5432'
insert_init = "insert into test_tbl values "
g_max_retry=3
ignore_error=True
load_batch=128
num_threads=10
load_dir='data'
def execute_with_retry(conn, sql):
failed = True
for i in range(g_max_retry):
try:
cur = conn.cursor()
cur.execute(sql)
cur.close()
conn.commit()
failed = False
break
except psycopg2.DatabaseError as e:
print(f"[WARNING] execute sql {sql[:100]} failed, error: {str(e)}")
conn.rollback()
return failed
def load_task():
print("load:", load_dir)
processes = []
global num_threads
stop_event = Event()
file_queue = Queue()
lines = Value('i', 0)
lines_lock = Lock()
files = os.listdir(load_dir)
tmp_input_files = [os.path.join(load_dir, filename) for filename in files]
# remove directory
for path in tmp_input_files:
if os.path.isdir(path):
print(f"[WARNING] remove directory {path}")
else:
file_queue.put(path)
print(f"[INFO] need load file {path}.")
if file_queue.qsize() <= 0:
print(f"[WARNING] do not have invalid file at dir {load_dir}, exit.")
        sys.exit(1)
input_file_size = file_queue.qsize()
if num_threads > input_file_size:
print(
f"[WARNING] num_threads:{num_threads} > input_files len:{input_file_size}, we just need {input_file_size} threads.")
num_threads = input_file_size
start_time = time.time()
for i in range(num_threads):
process = Process(target=do_load, args=(stop_event, file_queue, lines, lines_lock,))
process.start()
processes.append(process)
for p in processes:
p.join()
end_time = time.time()
interval = end_time - start_time
print(f"[INFO] RPS: {lines.value / interval}, load data time:{interval}, total lines {lines.value}.")
def do_load(stop_event, file_queue, lines, lines_lock):
print("do_load")
try:
conn = psycopg2.connect(database=database, user=user,
password=password, host=host, port=port)
while not file_queue.empty():
filename = file_queue.get()
total_cnt = 0
try:
cnt = 0
sql = insert_init
failed = False
with open(filename, 'r') as f:
for line in f:
if stop_event.is_set():
print("stop by event, exit.")
failed = True
break
line = line.replace('|', "','")
line = line.strip()
if cnt >= (load_batch - 1):
sql = sql + "('" + line + "');"
# print(sql)
failed = execute_with_retry(conn, sql)
if failed and not ignore_error:
break
total_cnt += cnt
cnt = 0
sql = insert_init
else:
sql = sql + "('" + line + "'),"
cnt += 1
if cnt > 0:
# print("last batch")
sql = sql[:-1] + ";"
# print(sql)
failed = execute_with_retry(conn, sql)
if not failed:
total_cnt += cnt
except Exception as e:
print(f"[FATAL] open file {filename} failed: {str(e)}")
stop_event.set()
failed = True
print(f"[INFO] load {total_cnt} lines into database.")
with lines_lock:
lines.value += total_cnt
if failed:
print(f"[FATAL] execute sql faild, exit.")
stop_event.set()
break
conn.close()
except Exception as e:
print(f"[FATAL] run do_load woker faild, exit. {e} ")
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Process some flags.')
parser.add_argument('--t', type=int, help='Number of threads')
parser.add_argument('--dir', type=str, help='load data dir path')
parser.add_argument('--batch', type=int, default=100, help='batch num')
args = parser.parse_args()
num_threads = args.t
load_dir = args.dir
load_batch = args.batch
    load_task()

6. Connecting to MAXIR with a pyspark client
6.1 Installing the required Python packages
psycopg2: used to interact with MAXIR
yum install postgresql-devel*
pip3 install psycopg2-binary
pyspark: used for the Spark client
pip3 install pyspark

6.2 Writing and running the pyspark program
Preparation
- Reference format of the test data file file.csv (you can manually duplicate the data row for testing):
id,vector,post_publish_time
b6b07b6d-7f6c-4ffc-b2c5-f753cefc2e90,"-0.28571999073028564,1.6030000448226929,-0.23368999361991882,0.42476001381874084,0.071834996342659,-1.6633000373840332,-0.6774700284004211,-0.20066000521183014,0.7255899906158447,-0.722599983215332,0.09668300300836563,1.0442999601364136,1.1964000463485718,-0.2735399901866913,1.44159996509552,0.06502100080251694,0.9345399737358093,-0.40575000643730164,0.9226999878883362,-0.29600998759269714,-0.5180299878120422,0.8512099981307983,-1.0339000225067139,0.050655998289585114,0.13964000344276428",2024-08-16T07:22:00
- Before testing, manually create the database and the table on MAXIR; see step 2, the DDL section.
- This code is based on HDFS, so the data file (file.csv) must first be put onto HDFS (see the sketch after this list); the data file path in the script can be adjusted as needed.
- The postgresql-42.3.3.jar driver is required; download it from https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar.
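As referenced in the HDFS item above, a minimal sketch for putting file.csv onto HDFS before submitting the Spark job (the target directory is an assumption matching the input_path used in copydatatoMAXIR.py below):

```python
import subprocess

# Create the target directory (if needed) and upload file.csv; -f overwrites an existing file.
subprocess.run(["hdfs", "dfs", "-mkdir", "-p", "/user/hadoop/spark-maxir-demo"], check=True)
subprocess.run(["hdfs", "dfs", "-put", "-f", "file.csv",
                "/user/hadoop/spark-maxir-demo/file.csv"], check=True)
```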
Inserting data via pyspark
There are two ways to write data into MAXIR:
- Load CSV file method: COPY … FROM …
- JDBC write method
Load CSV file method
Submit a local Spark job that preprocesses the data with Spark and then imports it into MAXIR. Job submission command:
spark-submit --deploy-mode client copydatatoMAXIR.py
- copydatatoMAXIR.py
import psycopg2
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
import os
import glob
import subprocess
import shutil
# Create the SparkSession
spark = SparkSession.builder \
.appName("File splitter and copy to MAXIR") \
.getOrCreate()
# Read the input file (on HDFS)
input_path = "/user/hadoop/spark-maxir-demo/file.csv"
df = spark.read.csv(input_path, header=True, inferSchema=True)
# UDF to preprocess the vector data
def array_to_string(array):
res = "[" + str(array) + "]"
return res
array_to_string_udf = udf(array_to_string, StringType())
# Use the UDF to transform the vector column
df_transformed = df.withColumn("vector", array_to_string_udf(col("vector")))
# Repartition (this demo uses a single partition; in real scenarios choose the partition count based on the data volume)
df = df_transformed.repartition(1)
# Write the output (to HDFS)
output_path = "/user/hadoop/spark-maxir-demo/out/"
df.write.option("delimiter", "|").csv(output_path, header=True, mode="overwrite")
local_output_path = "/tmp/spark-maxir-demo/out/"
# Clear the local directory
if os.path.exists(local_output_path):
shutil.rmtree(local_output_path)
# Make sure the local directory exists
os.makedirs(local_output_path, exist_ok=True)
# Download the files from HDFS to the local machine
subprocess.run(["hdfs", "dfs", "-get", output_path + "*", local_output_path])
# Connect to the PostgreSQL database (MAXIR)
conn = psycopg2.connect(
dbname="dbname",
user="user",
password="password",
host="127.0.0.1",
port="5432"
)
cur = conn.cursor()
def find_csv_files(directory):
    # Use the glob module to find all files ending in .csv
csv_files = glob.glob(os.path.join(directory, "*.csv"))
return csv_files
# Get the temporary file paths
csv_files = find_csv_files(local_output_path)
print("csv_files:", csv_files)
for csv_file in csv_files:
print("csv_file:", csv_file)
    # Import the data
with open(csv_file, 'r') as f:
print(f"pyspark:--------start copy {csv_file} to MAXIR--------")
cur.copy_expert("COPY test_tbl (id, vector, post_publish_time) FROM stdin WITH (FORMAT csv, HEADER true, DELIMITER '|')", f)
# Commit the transaction and close the connection
conn.commit()
cur.close()
conn.close()
# Stop the SparkSession
spark.stop()

JDBC write method
Spark job submission command:
spark-submit --deploy-mode client --jars postgresql-42.3.3.jar copydatatoMAXIR.py
- copydatatoMAXIR.py
import psycopg2
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
import os
import glob
import subprocess
import shutil
# Create the SparkSession
spark = SparkSession.builder \
.appName("File splitter and copy to MAXIR") \
.getOrCreate()
# Read the input file (on HDFS)
input_path = "/user/hadoop/spark-maxir-demo/file.csv"
df = spark.read.csv(input_path, header=True, inferSchema=True)
# UDF to preprocess the vector data
def array_to_string(array):
res = "[" + str(array) + "]"
return res
array_to_string_udf = udf(array_to_string, StringType())
# Use the UDF to transform the vector column
df_transformed = df.withColumn("vector", array_to_string_udf(col("vector")))
# Repartition (this demo uses a single partition; in real scenarios choose the partition count based on the data volume)
df = df_transformed.repartition(1)
df.write \
.mode("append") \
.format("jdbc") \
.option("url", "jdbc:postgresql://${ip}:5432/${database}") \
.option("driver", "org.postgresql.Driver") \
.option("isolationLevel", "REPEATABLE_READ") \
.option("dbtable", "${tablename}") \
.option("user", "${user}") \
.option("password", "${}") \
.save()
# Stop the SparkSession
spark.stop()

Query
Submit a local Spark job that queries the data in MAXIR and operates on the query results. Job submission command:
spark-submit --deploy-mode client --jars postgresql-42.3.3.jar queryMAXIR.py
- queryMAXIR.py
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Read Postgres Data") \
.getOrCreate()
# JDBC URL of the Postgres (MAXIR) database
jdbc_url = "jdbc:postgresql://127.0.0.1:5432/postgres"
# Database connection properties
connection_properties = {
"user": "user",
"password": "password",
"driver": "org.postgresql.Driver"
}
# Read the data
df = spark.read.jdbc(url=jdbc_url, table="(SELECT id, vector <-> '[-0.28571999073028564,1.6030000448226929,-0.23368999361991882,0.42476001381874084,0.071834996342659,-1.6633000373840332,-0.6774700284004211,-0.20066000521183014,0.7255899906158447,-0.722599983215332,0.09668300300836563,1.0442999601364136,1.1964000463485718,-0.2735399901866913,1.44159996509552,0.06502100080251694,0.9345399737358093,-0.40575000643730164,0.9226999878883362,-0.29600998759269714,-0.5180299878120422,0.8512099981307983,-1.0339000225067139,0.050655998289585114,0.139640003442764285]' as distance FROM test_tbl order by distance) AS test_tbl_alias", properties=connection_properties)
# Show the data
# +--------------------+--------+
# | id|distance|
# +--------------------+--------+
# |b6b07b6d-7f6c-4ff...| 0.0|
# |b6b07b6d-7f6c-4ff...| 0.0|
# +--------------------+--------+
df.show()
# Filter the data
filtered_df = df.filter(df["distance"] == 0)
# Show the result
# +--------------------+--------+
# | id|distance|
# +--------------------+--------+
# |b6b07b6d-7f6c-4ff...| 0.0|
# |b6b07b6d-7f6c-4ff...| 0.0|
# +--------------------+--------+
filtered_df.show()
spark.stop()