Building Elasticsearch Search with magi_dataset

date: Feb 15, 2023
slug: from-magi_dataset-to-elasticsearch
status: Published
summary: How to set up Elasticsearch on Amazon EC2 and import data with magi_dataset
type: Post
tags: DataScience, SemanticSearch
magi_dataset is a dataset utility I wrote for a recent side project. It gives you quick access to an open-source-software corpus crawled from GitHub and HackerNews, which makes it easy to stand up both semantic search and traditional retrieval over GitHub, or to combine the two into a hybrid search with Metarank.
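As a quick taste of the API, here is a minimal sketch based on the calls used later in this post; I'm assuming the dataset supports standard sequence access, and the name/readme field names come from the search results shown below:

from magi_dataset import GitHubDataset

# Download the prebuilt corpus of Python repositories; identifiers
# follow the '<language>-latest' convention used later in this post
gh = GitHubDataset(empty=False, file_path='python-latest')

print(len(gh))      # number of crawled repositories (sequence access assumed)
print(gh[0].name)   # each record is a dataclass with fields such as name and readme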
This post walks through setting up Elasticsearch on Amazon EC2 and then importing data into it with magi_dataset.

Installing Elasticsearch 8.6.2

First, create an EC2 instance running Amazon Linux, then run the following commands:
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.6.2-x86_64.rpm
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.6.2-x86_64.rpm.sha512
shasum -a 512 -c elasticsearch-8.6.2-x86_64.rpm.sha512
sudo rpm --install elasticsearch-8.6.2-x86_64.rpm
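If the checksum matches, shasum reports:

elasticsearch-8.6.2-x86_64.rpm: OK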
Once the install completes, the security auto-configuration should run automatically and print its results, including the generated password for the elastic user. Be sure to copy and save that terminal output. Then set up systemd:
sudo /bin/systemctl daemon-reload
sudo /bin/systemctl enable elasticsearch.service
sudo systemctl start elasticsearch.service
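You can confirm the service came up before moving on:

sudo systemctl status elasticsearch.service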
After installation, you can test the node:
sudo curl --cacert /etc/elasticsearch/certs/http_ca.crt -u elastic https://localhost:9200
When prompted with Enter host password for user 'elastic', enter the password you saved earlier. Output like the following means everything is working:
{ "name" : "ip-172-31-50-108.ec2.internal", "cluster_name" : "elasticsearch", "cluster_uuid" : "CnxGUCkvRpqQhRl4ghNhKQ", "version" : { "number" : "8.6.2", "build_flavor" : "default", "build_type" : "rpm", "build_hash" : "2d58d0f136141f03239816a4e360a8d17b6d8f29", "build_date" : "2023-02-13T09:35:20.314882762Z", "build_snapshot" : false, "lucene_version" : "9.4.2", "minimum_wire_compatibility_version" : "7.17.0", "minimum_index_compatibility_version" : "7.0.0" }, "tagline" : "You Know, for Search" }
Next, edit the Elasticsearch configuration. In sudo nano /etc/elasticsearch/elasticsearch.yml, set
network.host: $EC2_IP_PRIV_ADDR
Note that $EC2_IP_PRIV_ADDR here is the machine's private IP address. (To reach the node from outside the VPC, the instance's security group must also allow inbound traffic on port 9200.) Then restart the service:
sudo systemctl stop elasticsearch.service
sudo systemctl start elasticsearch.service
Finally, save the http_ca.crt certificate to your local machine so you can connect later. First, copy the file into the home directory and hand ownership to the regular user:
sudo cp /etc/elasticsearch/certs/http_ca.crt .
sudo chown $USER:$USER ./http_ca.crt
Then, on your local machine, run
scp ec2-user@$EC2_IP_ADDR:/home/ec2-user/http_ca.crt ./http_ca.crt
to copy the certificate locally, where $EC2_IP_ADDR is the EC2 machine's public IP address.
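Alternatively, instead of copying the certificate file around, you can pin its SHA-256 fingerprint (the Python client accepts this via ssl_assert_fingerprint, as we'll see below). Compute the fingerprint on the server:

openssl x509 -fingerprint -sha256 -noout -in /etc/elasticsearch/certs/http_ca.crt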

Creating an Index with the Python Client

For reference, see the Python Elasticsearch Client documentation and the Magi Dataset documentation.
Let's manually add magi_dataset data to this Elasticsearch instance. First, install the dependencies:
pip3 install magi_dataset elasticsearch
Establish a connection:
from magi_dataset import GitHubDataset
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from dataclasses import asdict
from tqdm.auto import tqdm
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

ELASTIC_PASSWORD = 'YOUR_PASSWORD'

es = Elasticsearch(
    'https://52.87.231.111:9200',
    # ssl_assert_fingerprint=CERT_FINGERPRINT,
    ca_certs='./http_ca.crt',
    basic_auth=('elastic', ELASTIC_PASSWORD),
    verify_certs=False,
)

es.info()
Running this prints:
ObjectApiResponse({
    'name': 'ip-172-31-50-108.ec2.internal',
    'cluster_name': 'elasticsearch',
    'cluster_uuid': 'CnxGUCkvRpqQhRl4ghNhKQ',
    'version': {
        'number': '8.6.2',
        'build_flavor': 'default',
        'build_type': 'rpm',
        'build_hash': '2d58d0f136141f03239816a4e360a8d17b6d8f29',
        'build_date': '2023-02-13T09:35:20.314882762Z',
        'build_snapshot': False,
        'lucene_version': '9.4.2',
        'minimum_wire_compatibility_version': '7.17.0',
        'minimum_index_compatibility_version': '7.0.0'
    },
    'tagline': 'You Know, for Search'
})
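As the commented-out ssl_assert_fingerprint line above suggests, the client can also pin the CA fingerprint instead of reading the certificate file; a sketch with a placeholder fingerprint (substitute the value from the earlier openssl command):

# Placeholder; paste the real SHA-256 fingerprint from the openssl command above
CERT_FINGERPRINT = 'AA:BB:...:FF'

es = Elasticsearch(
    'https://52.87.231.111:9200',
    ssl_assert_fingerprint=CERT_FINGERPRINT,
    basic_auth=('elastic', ELASTIC_PASSWORD),
)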
Now let's bulk-upload the data.
def upload_to_es(es_instance, data, index: str, batch_size=1000):
    bulk_data = []
    for i, repo in enumerate(tqdm(data)):
        bulk_data.append(
            {
                '_index': index,
                '_id': i,
                '_source': asdict(repo)
            }
        )
        # Flush a full batch to Elasticsearch
        if (i + 1) % batch_size == 0:
            bulk(es_instance, bulk_data)
            bulk_data = []
    # Flush the final, possibly partial batch
    if bulk_data:
        bulk(es_instance, bulk_data)
    es_instance.indices.refresh(index=index)
    return es_instance.cat.count(index=index, format='json')

for lang in ['Python', 'C++', 'JavaScript', 'Go', 'Rust']:
    lang_safe = lang.lower().replace('++', 'pp')
    es.options(ignore_status=400).indices.create(index=f'{lang_safe}-index')
    data = GitHubDataset(empty=False, file_path=f'{lang_safe}-latest')
    print(
        upload_to_es(
            es,
            data,
            index=f'{lang_safe}-index',
            batch_size=1000
        )
    )
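Note that the indices above are created without an explicit mapping, so Elasticsearch infers field types dynamically from the first documents. If you want explicit control, you can pass a mapping at creation time; a minimal sketch covering just the two fields this post queries (the remaining dataset fields would still be mapped dynamically):

es.options(ignore_status=400).indices.create(
    index='python-index',
    mappings={
        'properties': {
            'name':   {'type': 'keyword'},  # exact repository name, e.g. 'owner/repo'
            'readme': {'type': 'text'},     # full-text searchable README content
        }
    },
)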
Build a simple search:
resp = es.search(
    index='python-index',
    body={
        "query": {
            "match": {
                "readme": "python web archiving service"
            }
        },
    }
)

[(x['_source']['name'], x['_score']) for x in resp.body['hits']['hits']]
[('internetarchive/brozzler', 17.063648),
 ('ArchiveBox/ArchiveBox', 16.825933),
 ('Rhizome-Conifer/conifer', 15.135596),
 ('oduwsdl/ipwb', 14.298318),
 ('foxmask/django-th', 13.880616),
 ('wal-e/wal-e', 12.302505),
 ('laiwei/thepast', 11.558967),
 ('inAudible-NG/audible-activator', 11.079715),
 ('ciur/papermerge', 11.074305),
 ('WikiTeam/wikiteam', 10.133091)]
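The 8.x client also accepts query parameters at the top level (body= is deprecated in favor of them), which makes richer queries easy to compose; for example, a bool query restricted to five hits, sketched using the same readme field:

resp = es.search(
    index='python-index',
    query={
        'bool': {
            'must': [
                {'match': {'readme': 'web archiving'}}
            ]
        }
    },
    size=5,  # return only the top five hits
)

[(x['_source']['name'], x['_score']) for x in resp.body['hits']['hits']]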
 
If you have any questions, please contact me.