Skip to content

Commit

Permalink
improve performance about 5 times when importing
Browse files Browse the repository at this point in the history
  • Loading branch information
xros committed Mar 30, 2016
1 parent 7619811 commit d7b16eb
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 54 deletions.
12 changes: 9 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Alexander Liu

jsonpyes
--------
* ![user interface](https://raw.githubusercontent.com/xros/jsonpyes/master/static/snapshot139.png)
* ![user interface](https://raw.githubusercontent.com/xros/jsonpyes/master/static/snapshot236.png)

#### Instructions:
There are 3 proccesses of importing raw JSON data to ElasticSearch
Expand All @@ -51,11 +51,17 @@ jsonpyes
* ```jsonpyes``` supports multi-threads when importing data to elasticsearch
![muti-threads enabled](https://raw.githubusercontent.com/xros/jsonpyes/master/static/snapshot132.png)

> As you can see these two containers have same docs loaded, if we use **_--thread 8_** it could be slightly faster.
> As you can see these two containers have same docs loaded, if we use **_--thread 8_** it could be several times faster, usually 5 to 10 times faster.
That really depends on your computer/server resources.
This was tested on a 4GB RAM / 2.4Ghz intel i5 Linux x64 laptop system.

* ![benmarks](https://raw.githubusercontent.com/xros/jsonpyes/master/static/snapshot133.png)
* No multi-threads

![benchmarks](https://raw.githubusercontent.com/xros/jsonpyes/master/static/snapshot237.png)

* With 8 threads and `jsonpyes` cuts files into pieces, then destributes to a worker fairly

![use helpers.bulk API with multi-threads](https://raw.githubusercontent.com/xros/jsonpyes/master/static/snapshot235.png)

* And it works.
![it works](https://raw.githubusercontent.com/xros/jsonpyes/master/static/snapshot105.jpg)
Expand Down
24 changes: 17 additions & 7 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,24 @@ Instructions:
elasticsearch |muti-threads enabled|

As you can see these two containers have same docs loaded, if we use
***--thread 8*** it could be slightly faster. That really depends on
your computer/server resources. This was tested on a 4GB RAM /
2.4Ghz intel i5 Linux x64 laptop system.
***--thread 8*** it could be several times faster, usually 5 to 10
times faster. That really depends on your computer/server resources.
This was tested on a 4GB RAM / 2.4Ghz intel i5 Linux x64 laptop
system.

- .. figure:: https://raw.githubusercontent.com/xros/jsonpyes/master/static/snapshot133.png
:alt: benmarks
- No multi-threads

benmarks
.. figure:: https://raw.githubusercontent.com/xros/jsonpyes/master/static/snapshot237.png
:alt: benchmarks

benchmarks
- With 8 threads and ``jsonpyes`` cuts files into pieces, then
destributes to a worker fairly

.. figure:: https://raw.githubusercontent.com/xros/jsonpyes/master/static/snapshot235.png
:alt: use helpers.bulk API with multi-threads

use helpers.bulk API with multi-threads
- And it works. |it works|

3. Both validating and importing
Expand All @@ -89,7 +99,7 @@ Happy hacking!
.. |GitHub license| image:: https://img.shields.io/github/license/xros/jsonpyes.svg
:target: https://github.com/xros/jsonpyes/blob/master/LICENSE
.. |before image| image:: https://raw.githubusercontent.com/xros/jsonpyes/master/static/snapshot106.jpg
.. |user interface| image:: https://raw.githubusercontent.com/xros/jsonpyes/master/static/snapshot139.png
.. |user interface| image:: https://raw.githubusercontent.com/xros/jsonpyes/master/static/snapshot236.png
.. |json valid| image:: https://raw.githubusercontent.com/xros/jsonpyes/master/static/snapshot98.jpg
.. |json invalid| image:: https://raw.githubusercontent.com/xros/jsonpyes/master/static/snapshot99.jpg
.. |no threads| image:: https://raw.githubusercontent.com/xros/jsonpyes/master/static/snapshot102.jpg
Expand Down
28 changes: 19 additions & 9 deletions jsonpyes.egg-info/PKG-INFO
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
Metadata-Version: 1.1
Name: jsonpyes
Version: 1.2.22
Version: 1.2.23
Summary: Import JSON raw data to ElasticSearch using Python in one line of commands
Home-page: https://github.com/xros/jsonpyes
Author: Alexander Liu
Author-email: PGP E6DE7BEA
License: GPL V3
Download-URL: https://github.com/xros/jsonpyes/tarball/1.2.22
Download-URL: https://github.com/xros/jsonpyes/tarball/1.2.23
Description: json-py-es
==========

Expand Down Expand Up @@ -68,14 +68,24 @@ Description: json-py-es
elasticsearch |muti-threads enabled|

As you can see these two containers have same docs loaded, if we use
***--thread 8*** it could be slightly faster. That really depends on
your computer/server resources. This was tested on a 4GB RAM /
2.4Ghz intel i5 Linux x64 laptop system.
***--thread 8*** it could be several times faster, usually 5 to 10
times faster. That really depends on your computer/server resources.
This was tested on a 4GB RAM / 2.4Ghz intel i5 Linux x64 laptop
system.

- .. figure:: https://raw.githubusercontent.com/xros/jsonpyes/master/static/snapshot133.png
:alt: benmarks
- No multi-threads

benmarks
.. figure:: https://raw.githubusercontent.com/xros/jsonpyes/master/static/snapshot237.png
:alt: benchmarks

benchmarks
- With 8 threads and ``jsonpyes`` cuts files into pieces, then
destributes to a worker fairly

.. figure:: https://raw.githubusercontent.com/xros/jsonpyes/master/static/snapshot235.png
:alt: use helpers.bulk API with multi-threads

use helpers.bulk API with multi-threads
- And it works. |it works|

3. Both validating and importing
Expand All @@ -98,7 +108,7 @@ Description: json-py-es
.. |GitHub license| image:: https://img.shields.io/github/license/xros/jsonpyes.svg
:target: https://github.com/xros/jsonpyes/blob/master/LICENSE
.. |before image| image:: https://raw.githubusercontent.com/xros/jsonpyes/master/static/snapshot106.jpg
.. |user interface| image:: https://raw.githubusercontent.com/xros/jsonpyes/master/static/snapshot139.png
.. |user interface| image:: https://raw.githubusercontent.com/xros/jsonpyes/master/static/snapshot236.png
.. |json valid| image:: https://raw.githubusercontent.com/xros/jsonpyes/master/static/snapshot98.jpg
.. |json invalid| image:: https://raw.githubusercontent.com/xros/jsonpyes/master/static/snapshot99.jpg
.. |no threads| image:: https://raw.githubusercontent.com/xros/jsonpyes/master/static/snapshot102.jpg
Expand Down
125 changes: 91 additions & 34 deletions jsonpyes.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#!/usr/bin/env python2
# encoding: utf-8

from elasticsearch import Elasticsearch
from elasticsearch import helpers
import sys
Expand All @@ -7,6 +9,8 @@
import linecache
import jsonpyes_contrib
from jsonpyes_contrib.utils import count_file_lines as c_file_lines
import logging
import time

try:
import simplejson as json
Expand All @@ -27,6 +31,8 @@
3. After validating successfully, then import data to ElasticSearch
"""

es = Elasticsearch(['http://localhost:9200'], verify_certs=True)

def show_version():
print(version)

Expand All @@ -39,7 +45,7 @@ def show_help():
/ /_/ /___/ / /_/ / /| /_____/ ____/ / /_____/ /___ ___/ /
\____//____/\____/_/ |_/ /_/ /_/ /_____//____/
Import JSON to ElasticSearch using Python
Import raw JSON to ElasticSearch in one line of commands
-- Alexander Liu
"""
Expand All @@ -63,6 +69,10 @@ def show_help():
--version : Prints the version number
--help : Display this help
Notice:
It's recommended that you use multi-threads when importing data. Because it's way faster.
Examples:
Expand All @@ -82,11 +92,9 @@ def show_help():
""")



def validate_json_data(json_file=""):
"""
To validate whether the JSON data file is fully a JSON file without any format validation
To validate whether the JSON data file is fully a JSON file without any format invalidation
"""
if str(json_file)=="":
raise ValueError("No JSON file was input\n")
Expand All @@ -111,6 +119,65 @@ def validate_json_data(json_file=""):
return True


def worker_import_to_es_for_threading(data='a_raw_file.json', start_line=0, stop_line=0, es=es, index="", doc_type=""):
# NOTICE: the 'start_line' and 'stop_line' are all included. 'stop_line' can not be omitted.
actions = []
try_times = 0
es = es
# Using linecache to read big data in RAM
for i in range(start_line, stop_line + 1):
# Enhancement: This version of jsonpyes use `elastisearch.helpers.bulk`. Thanks to suggestion from piterjoin
row = linecache.getline(data, i)
try:
action = {
"_index": index,
"_type": doc_type,
"_source": json.loads(row)
}
except Exception, e:
logging.warning(str(e))
continue

actions.append(action)

# https://elasticsearch-py.readthedocs.org/en/master/helpers.html?highlight=helpers#module-elasticsearch.helpers
# In some cases, the size of 1000 docs can be at around 10 MB. And they are stored in RAM

if len(actions) >= 5000:
# try serveral times if ES rejects, is busy or down
while try_times < 5:
try:
# single chunk_bytes max upto 200 MB assumption
helpers.bulk(es, actions)
try_times = 0
break
except Exception, e:
try_times = try_times + 1
logging.warning("Can not send a group of actions(docs) to ElasticSearch using parallel_bulk, with error: " + str(e))
# wait for the ElasticSearch to response
time.sleep(5)
if try_times >= 5:
msg = "After trying " + str(try_times) + \
" times. It still can not send a group of actions(docs) to ElasticSearch using parallel_bulk, with error: " + str(e)
logging.error(msg)
try_times = 0

# delete previous docs
del actions[0:len(actions)]

# if we have leftovers, finish them
if len(actions) > 0:
try:
helpers.bulk(es, actions)
except Exception, e:
logging.warning("Can not send a group of actions(docs) to ElasticSearch using parallel_bulk, with error: " + str(e))
# delete previous docs
del actions[0:len(actions)]

# terminate this job
return


class StoppableThread(threading.Thread):
"""Thread class with a stop() method. The thread itself has to check
regularly for the stopped() condition."""
Expand Down Expand Up @@ -397,36 +464,30 @@ def run():



def worker_import_to_es_for_threading(data=data, start_line=0, stop_line=0):
# NOTICE: the 'start_line' and 'stop_line' are all included. 'stop_line' can not be omitted.
es = Elasticsearch([bulk], verify_certs=True)
# Using linecache to read big data in RAM
for i in range(start_line, stop_line + 1):
es.index(
index=index, doc_type=doc_type,
#id=2,
body=json.loads(linecache.getline(data, i))
)



threads = []
for i in start_stop_line_list:
#t = StoppableThread(target=worker_import_to_es_for_threading, args=(data, i['start'], i['stop']))
t = threading.Thread(target=worker_import_to_es_for_threading, args=(data, i['start'], i['stop']))
t = threading.Thread(target=worker_import_to_es_for_threading,
args=(data, i['start'], i['stop'], Elasticsearch([bulk], verify_certs=True), index, doc_type, )
)
threads.append(t)
t.start()
t.join()


# stop all threads if interrupts
try:
while len(threading.enumerate()):
while len(threading.enumerate()) > 1:
pass
print("Successfully data imported!")
return
except KeyboardInterrupt:
for i in threads:
i.stop()
# for i in threads:
# i.stop()
print("Data importing interrupted!")
exit(0)
return

print("Successfully data imported!")
Expand Down Expand Up @@ -514,36 +575,32 @@ def worker_import_to_es_for_threading(data=data, start_line=0, stop_line=0):



def worker_import_to_es_for_threading(data=data, start_line=0, stop_line=0):
# NOTICE: the 'start_line' and 'stop_line' are all included. 'stop_line' can not be omitted.
es = Elasticsearch([bulk], verify_certs=True)
# Using linecache to read big data in RAM
for i in range(start_line, stop_line + 1):
es.index(
index=index, doc_type=doc_type,
#id=2,
body=json.loads(linecache.getline(data, i))
)



threads = []
for i in start_stop_line_list:
#t = StoppableThread(target=worker_import_to_es_for_threading, args=(data, i['start'], i['stop']))
t = threading.Thread(target=worker_import_to_es_for_threading, args=(data, i['start'], i['stop']))
t = threading.Thread(target=worker_import_to_es_for_threading,
args=(data, i['start'], i['stop'], Elasticsearch([bulk], verify_certs=True), index, doc_type, )
)
threads.append(t)
t.start()
t.join()


# stop all threads if interrupts
try:
while len(threading.enumerate()):
# there is at least one main threading for all threadings
while len(threading.enumerate()) > 1:
pass
print("Successfully data imported!")
return
except KeyboardInterrupt:
for i in threads:
i.stop()
print(len(threading.enumerate()))
# for i in threads:
# i.stop()
print("Data importing interrupted!")
exit(0)
return

print("Successfully data imported!")
Expand Down
2 changes: 1 addition & 1 deletion jsonpyes_contrib/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""
"""
# version number like 1.2.3a0 or 1.2.3, must have at least 2 parts, like 1.2
__version__ = '1.2.22'
__version__ = '1.2.23'
Binary file added static/snapshot235.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added static/snapshot236.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added static/snapshot237.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit d7b16eb

Please sign in to comment.