#!/usr/bin/env python
# Licensed to Elasticsearch B.V under one or more agreements.
# Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
# See the LICENSE file in the project root for more information
"""Script that downloads a public dataset and streams it to an Elasticsearch cluster"""
import csv
from os.path import abspath, join, dirname, exists
import tqdm
import urllib3
from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk
NYC_RESTAURANTS = (
"https://data.cityofnewyork.us/api/views/43nn-pn8j/rows.csv?accessType=DOWNLOAD"
)
DATASET_PATH = join(dirname(abspath(__file__)), "nyc-restaurants.csv")
CHUNK_SIZE = 16384


def download_dataset():
    """Downloads the public dataset if it isn't already downloaded locally
    and returns the number of rows in the .csv file.
    """
    if not exists(DATASET_PATH):
        http = urllib3.PoolManager()
        resp = http.request("GET", NYC_RESTAURANTS, preload_content=False)

        if resp.status != 200:
            raise RuntimeError("Could not download dataset")

        # Stream the response to disk in CHUNK_SIZE pieces so the whole
        # CSV never has to be held in memory at once.
        with open(DATASET_PATH, mode="wb") as f:
            chunk = resp.read(CHUNK_SIZE)
            while chunk:
                f.write(chunk)
                chunk = resp.read(CHUNK_SIZE)

    # Count data rows, subtracting one for the header row.
    with open(DATASET_PATH) as f:
        return sum(1 for _ in f) - 1


def create_index(client):
    """Creates an index in Elasticsearch if one isn't already there."""
    client.indices.create(
        index="nyc-restaurants",
        body={
            "settings": {"number_of_shards": 1},
            "mappings": {
                "properties": {
                    "name": {"type": "text"},
                    "borough": {"type": "keyword"},
                    "cuisine": {"type": "keyword"},
                    "grade": {"type": "keyword"},
                    "location": {"type": "geo_point"},
                }
            },
        },
        # Don't raise if the index already exists (HTTP 400,
        # "resource_already_exists_exception").
        ignore=400,
    )
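

# Note: the "location" field above is mapped as geo_point, which (among other
# formats) accepts an object of the form {"lat": <float>, "lon": <float>};
# generate_actions() below emits the coordinates in exactly that shape.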


def generate_actions():
    """Reads the file through csv.DictReader() and for each row
    yields a single document. This function is passed into the
    streaming_bulk() helper to create many documents in sequence.
    """
    with open(DATASET_PATH) as f:
        reader = csv.DictReader(f)
        for row in reader:
            doc = {
                "_id": row["CAMIS"],
                "name": row["DBA"],
                "borough": row["BORO"],
                "cuisine": row["CUISINE DESCRIPTION"],
                "grade": row["GRADE"] or None,
            }

            # Only attach a location when both coordinates are present
            # and not the placeholder value "0".
            lat = row["Latitude"]
            lon = row["Longitude"]
            if lat not in ("", "0") and lon not in ("", "0"):
                doc["location"] = {"lat": float(lat), "lon": float(lon)}
            yield doc
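

# Each dict yielded by generate_actions() becomes one bulk action: the bulk
# helpers pop underscore-prefixed metadata fields such as "_id" into the
# action metadata and index the remaining keys as the document body, using
# the default "index" op type since no "_op_type" is given.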


def main():
    print("Loading dataset...")
    number_of_docs = download_dataset()

    client = Elasticsearch(
        # Add your cluster configuration here!
    )
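    # For example, a hypothetical local deployment (not part of the original
    # example) could be reached with:
    #   client = Elasticsearch("http://localhost:9200")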
print("Creating an index...")
create_index(client)
print("Indexing documents...")
progress = tqdm.tqdm(unit="docs", total=number_of_docs)
successes = 0
for ok, action in streaming_bulk(
client=client, index="nyc-restaurants", actions=generate_actions(),
):
progress.update(1)
successes += ok
print("Indexed %d/%d documents" % (successes, number_of_docs))
if __name__ == "__main__":
main()