#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function
from os.path import dirname, basename, abspath
from itertools import chain
from datetime import datetime
import logging
import git
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, streaming_bulk
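
# This script walks through a complete indexing workflow against a local
# Elasticsearch node, written for the older 1.x-style API (document types,
# string fields, parent/child mappings): create an index with a custom
# `file_path` analyzer and mappings, stream one document per git commit
# through `streaming_bulk`, bulk-index two hand-written repo documents, and
# finish with get, update and count requests.
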
def create_git_index(client, index):
    # create empty index
    client.indices.create(
        index=index,
        body={
            'settings': {
                # just one shard, no replicas for testing
                'number_of_shards': 1,
                'number_of_replicas': 0,
                # custom analyzer for analyzing file paths
                'analysis': {
                    'analyzer': {
                        'file_path': {
                            'type': 'custom',
                            'tokenizer': 'path_hierarchy',
                            'filter': ['lowercase']
                        }
                    }
                }
            }
        },
        # ignore already existing index
        ignore=400
    )
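    # The path_hierarchy tokenizer used above splits a path such as
    # "src/foo/bar.py" into the prefixes "src", "src/foo" and "src/foo/bar.py",
    # so commits can later be matched by any directory in a file's path.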
    # user mapping - we will reuse it in several places
    user_mapping = {
        'properties': {
            'name': {
                'type': 'multi_field',
                'fields': {
                    'raw': {'type': 'string', 'index': 'not_analyzed'},
                    'name': {'type': 'string'}
                }
            }
        }
    }
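    # The legacy `multi_field` type above indexes the same value twice: the
    # analyzed `name` sub-field for full-text search and the `not_analyzed`
    # `raw` sub-field for exact matches, sorting and aggregations.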
    client.indices.put_mapping(
        index=index,
        doc_type='repos',
        body={
            'repos': {
                'properties': {
                    'owner': user_mapping,
                    'created_at': {'type': 'date'},
                    'description': {
                        'type': 'string',
                        'analyzer': 'snowball',
                    },
                    'tags': {
                        'type': 'string',
                        'index': 'not_analyzed'
                    }
                }
            }
        }
    )
    client.indices.put_mapping(
        index=index,
        doc_type='commits',
        body={
            'commits': {
                '_parent': {
                    'type': 'repos'
                },
                'properties': {
                    'author': user_mapping,
                    'authored_date': {'type': 'date'},
                    'committer': user_mapping,
                    'committed_date': {'type': 'date'},
                    'parent_shas': {'type': 'string', 'index': 'not_analyzed'},
                    'description': {'type': 'string', 'analyzer': 'snowball'},
                    'files': {'type': 'string', 'analyzer': 'file_path'}
                }
            }
        }
    )
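
# Because the commits mapping declares `_parent: repos`, every commit has to
# be indexed with the id of its repository document as parent; parse_commits
# below supplies that through the `_parent` field of each generated action.
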
def parse_commits(repo, name):
    """
    Go through the git repository log and generate a document per commit
    containing all the metadata.
    """
    for commit in repo.log():
        yield {
            '_id': commit.id,
            '_parent': name,
            'committed_date': datetime(*commit.committed_date[:6]),
            'committer': {
                'name': commit.committer.name,
                'email': commit.committer.email,
            },
            'authored_date': datetime(*commit.authored_date[:6]),
            'author': {
                'name': commit.author.name,
                'email': commit.author.email,
            },
            'description': commit.message,
            'parent_shas': [p.id for p in commit.parents],
            # we only care about the filenames, not the per-file stats
            'files': list(chain(commit.stats.files)),
            'stats': commit.stats.total,
        }
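
# NOTE: parse_commits targets an older GitPython interface (`repo.log()`,
# `commit.id`, time-tuple dates); newer GitPython releases expose
# `repo.iter_commits()` and `commit.hexsha` instead, so the generator would
# need small adjustments to run against them.
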
def load_repo(client, path=None, index='git'):
    """
    Parse a git repository with all its commits and load it into elasticsearch
    using `client`. If the index doesn't exist it will be created.
    """
    path = dirname(dirname(abspath(__file__))) if path is None else path
    repo_name = basename(path)
    repo = git.Repo(path)
    create_git_index(client, index)
    # create the parent document in case it doesn't exist
    client.create(
        index=index,
        doc_type='repos',
        id=repo_name,
        body={},
        ignore=409  # 409 - conflict - would be returned if the document is already there
    )
    # we let the streaming bulk continuously process the commits as they come
    # in - since the `parse_commits` function is a generator this will avoid
    # loading all the commits into memory
    for ok, result in streaming_bulk(
        client,
        parse_commits(repo, repo_name),
        index=index,
        doc_type='commits',
        chunk_size=50  # keep the batch sizes small for appearances only
    ):
        action, result = result.popitem()
        doc_id = '/%s/commits/%s' % (index, result['_id'])
        # use the response from ES to report whether the document has been
        # successfully indexed
        if not ok:
            print('Failed to %s document %s: %r' % (action, doc_id, result))
        else:
            print(doc_id)
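
# load_repo defaults to indexing the repository that contains this file, but
# it can point at any local clone, e.g. (hypothetical path):
#     load_repo(Elasticsearch(), path='/path/to/some/clone', index='git')
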
# bulk actions: manually create the `elasticsearch` repo document and update
# the `elasticsearch-py` one (created by load_repo above) with its metadata
REPO_ACTIONS = [
    {'_type': 'repos', '_id': 'elasticsearch', '_source': {
        'owner': {'name': 'Shay Banon', 'email': 'kimchy@gmail.com'},
        'created_at': datetime(2010, 2, 8, 15, 22, 27),
        'tags': ['search', 'distributed', 'lucene'],
        'description': 'You know, for search.'}
    },
    {'_type': 'repos', '_id': 'elasticsearch-py', '_op_type': 'update', 'doc': {
        'owner': {'name': 'Honza Král', 'email': 'honza.kral@gmail.com'},
        'created_at': datetime(2013, 5, 1, 16, 37, 32),
        'tags': ['elasticsearch', 'search', 'python', 'client'],
        'description': 'For searching snakes.'}
    },
]
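
# The bulk helpers read per-action metadata from the underscore-prefixed keys:
# the first action uses the default 'index' op type and carries its document
# under '_source', while the second sets '_op_type' to 'update' and supplies a
# partial document under 'doc'.
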
if __name__ == '__main__':
    # get trace logger and set level
    tracer = logging.getLogger('elasticsearch.trace')
    tracer.setLevel(logging.INFO)
    tracer.addHandler(logging.FileHandler('/tmp/es_trace.log'))
    # instantiate es client, connects to localhost:9200 by default
    es = Elasticsearch()
    # we load the repo and all commits
    load_repo(es)
    # run the bulk operations
    success, _ = bulk(es, REPO_ACTIONS, index='git', raise_on_error=True)
    print('Performed %d actions' % success)
    # now we can retrieve the documents
    es_repo = es.get(index='git', doc_type='repos', id='elasticsearch')
    print('%s: %s' % (es_repo['_id'], es_repo['_source']['description']))
    # update - add java to es tags
    es.update(
        index='git',
        doc_type='repos',
        id='elasticsearch',
        body={
            "script": "ctx._source.tags += tag",
            "params": {
                "tag": "java"
            }
        }
    )
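    # NOTE: this scripted update relies on dynamic scripting, which, depending
    # on the Elasticsearch version, may need to be explicitly enabled; with
    # scripting unavailable, the same effect can be achieved by fetching,
    # modifying and re-indexing the document.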
    # refresh to make the documents available for search
    es.indices.refresh(index='git')
    # and now we can count the documents
    print(es.count(index='git')['count'], 'documents in index')
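    # The custom `file_path` analyzer indexes every path prefix of a file, so
    # a follow-up query could, for example, find commits touching anything
    # under a directory (illustrative sketch, values depend on the repository):
    #     es.search(index='git', doc_type='commits',
    #               body={'query': {'match': {'files': 'test'}}})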