Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compatible python2.x #8

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 24 additions & 19 deletions pierky/p2es/es.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
# See full license in LICENSE file.

import json
try:
import signal
signal.signal(signal.SIGCHLD, signal.SIG_DFL)
except:
pass
import requests
from requests.auth import HTTPDigestAuth, HTTPBasicAuth

Expand All @@ -16,7 +21,7 @@ def http(CONFIG, url, method="GET", data=None):
auth = HTTPDigestAuth(CONFIG['ES_UserName'], CONFIG['ES_Password'])
else:
raise P2ESError(
'Unexpected authentication type: {}'.format(CONFIG['ES_AuthType'])
'Unexpected authentication type: {0}'.format(CONFIG['ES_AuthType'])
)

headers = {'Content-Type': 'application/x-ndjson'}
Expand All @@ -30,14 +35,14 @@ def http(CONFIG, url, method="GET", data=None):
elif method == "HEAD":
return requests.head(url, auth=auth, headers=headers)
else:
raise Exception("Method unknown: {}".format(method))
raise Exception("Method unknown: {0}".format(method))

# Sends data to ES.
# Raises exceptions: yes.
def send_to_es(CONFIG, index_name, data):
# HTTP bulk insert toward ES

url = '{}/{}/{}/_bulk'.format(
url = '{0}/{1}/{2}/_bulk'.format(
CONFIG['ES_URL'],
index_name,
CONFIG['ES_Type']
Expand All @@ -47,17 +52,17 @@ def send_to_es(CONFIG, index_name, data):
http_res = http(CONFIG, url, method="POST", data=data)
except Exception as e:
raise P2ESError(
'Error while executing HTTP bulk insert on {} - {}'.format(
'Error while executing HTTP bulk insert on {0} - {1}'.format(
index_name, str(e)
)
)

# Interpreting HTTP bulk insert response
if http_res.status_code != 200:
raise P2ESError(
'Bulk insert on {} failed - '
'HTTP status code = {} - '
'Response {}'.format(
'Bulk insert on {0} failed - '
'HTTP status code = {1} - '
'Response {2}'.format(
index_name, http_res.status_code, http_res.text
)
)
Expand All @@ -67,35 +72,35 @@ def send_to_es(CONFIG, index_name, data):
except Exception as e:
raise P2ESError(
'Error while decoding JSON HTTP response - '
'{} - '
'first 100 characters: {}'.format(
'{0} - '
'first 100 characters: {1}'.format(
str(e),
http_res.text[:100],
)
)

if json_res['errors']:
raise P2ESError(
'Bulk insert on {} failed to process '
'Bulk insert on {0} failed to process '
'one or more documents'.format(index_name)
)

# Checks if index_name exists.
# Returns: True | False.
# Raises exceptions: yes.
def does_index_exist(index_name, CONFIG):
url = '{}/{}'.format(CONFIG['ES_URL'], index_name)
url = '{0}/{1}'.format(CONFIG['ES_URL'], index_name)

try:
status_code = http(CONFIG, url, method="HEAD").status_code
if status_code == 200:
return True
if status_code == 404:
return False
raise Exception("Unexpected status code: {}".format(status_code))
raise Exception("Unexpected status code: {0}".format(status_code))
except Exception as err:
raise P2ESError(
'Error while checking if {} index exists: {}'.format(
'Error while checking if {0} index exists: {1}'.format(
index_name, str(err)
)
)
Expand All @@ -109,33 +114,33 @@ def create_index(index_name, CONFIG):
return

# index does not exist, creating it
tpl_path = '{}/{}'.format(CONFIG['CONF_DIR'], CONFIG['ES_IndexTemplateFileName'])
tpl_path = '{0}/{1}'.format(CONFIG['CONF_DIR'], CONFIG['ES_IndexTemplateFileName'])

try:
with open(tpl_path, "r") as f:
tpl = f.read()
except Exception as e:
raise P2ESError(
'Error while reading index template from file {}: {}'.format(
'Error while reading index template from file {0}: {1}'.format(
tpl_path, str(e)
)
)

url = '{}/{}'.format(CONFIG['ES_URL'], index_name)
url = '{0}/{1}'.format(CONFIG['ES_URL'], index_name)

last_err = None
try:
# using PUT
http_res = http(CONFIG, url, method="PUT", data=tpl)
except Exception as e1:
last_err = "Error using PUT method: {}".format(str(e1))
last_err = "Error using PUT method: {0}".format(str(e1))
# trying the old way
try:
http_res = http(CONFIG, url, method="POST", data=tpl)
except Exception as e2:
# something went wrong: does index exist anyway?
last_err += " - "
last_err += "Error using old way: {}".format(str(e2))
last_err += "Error using old way: {0}".format(str(e2))
pass

try:
Expand All @@ -144,7 +149,7 @@ def create_index(index_name, CONFIG):
except:
pass

err = "An error occurred while creating index {} from template {}: "
err = "An error occurred while creating index {0} from template {1}: "
if last_err:
err += last_err
else:
Expand Down
4 changes: 2 additions & 2 deletions pierky/p2es/readers.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def expand_data_macros(s, dic):
if "$" in s:
res = s
for k in dic:
res = res.replace("${}".format(k), str(dic[k]))
res = res.replace("${0}".format(k), str(dic[k]))
return res
return s

Expand Down Expand Up @@ -118,7 +118,7 @@ def __init__(self, CONFIG, input_file, errors_queue, writer_queue):
for thread_idx in range(CONFIG['ReaderThreads']):
self.readers.append(
self.READER_THREAD_CLASS(
"reader{}".format(thread_idx),
"reader{0}".format(thread_idx),
CONFIG,
errors_queue,
writer_queue
Expand Down
42 changes: 21 additions & 21 deletions pierky/p2es/transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def parse_conditions_list(c, d):
else:
raise P2ESError(
'Logical groups must begin with "AND" or "OR" '
'("{}" found)'.format(c[0])
'("{0}" found)'.format(c[0])
)
else:
# default to "AND" if not specified
Expand All @@ -60,7 +60,7 @@ def parse_conditions_dict(c, d, opfield):
op = c[k]

if not op in ('=', '>', '>=', '<', '<=', '!=', 'in', 'notin'):
raise P2ESError('Unexpected operator: "{}"'.format(op))
raise P2ESError('Unexpected operator: "{0}"'.format(op))
else:
if n is None:
n = k
Expand All @@ -69,7 +69,7 @@ def parse_conditions_dict(c, d, opfield):
raise P2ESError('Only one name/value pair allowed')

if op in ('in', 'notin') and not isinstance(v, list):
raise P2ESError('The "{}" operator requires a list'.format(op))
raise P2ESError('The "{0}" operator requires a list'.format(op))

if n is None:
raise P2ESError('Name/value pair expected')
Expand All @@ -94,7 +94,7 @@ def parse_conditions_dict(c, d, opfield):
elif op == 'notin':
return not d[n] in v
else:
raise P2ESError('Operator not implemented: "{}"'.format(op))
raise P2ESError('Operator not implemented: "{0}"'.format(op))

# Parse conditions c against data d.
# Return: True | False (conditions matched / did not match).
Expand All @@ -105,7 +105,7 @@ def parse_conditions(c, d, opfield='__op__'):
elif isinstance(c, dict):
return parse_conditions_dict(c, d, opfield)
else:
raise P2ESError('Unexpected object type {} from {}'.format(
raise P2ESError('Unexpected object type {0} from {1}'.format(
type(c), str(c)
))

Expand All @@ -116,64 +116,64 @@ def test_transformation(tr):
ret = True

try:
tr_det = 'Transformations matrix ({})'.format(transformation)
tr_det = 'Transformations matrix ({0})'.format(transformation)
except:
tr_det = 'Transformations matrix'

if 'Conditions' not in tr:
raise P2ESError('{}, "Conditions" is missing'.format(tr_det))
raise P2ESError('{0}, "Conditions" is missing'.format(tr_det))

if 'Actions' not in tr:
raise P2ESError('{}, "Actions" is missing'.format(tr_det))
raise P2ESError('{0}, "Actions" is missing'.format(tr_det))

try:
parse_conditions(tr['Conditions'], {})
parse_conditions(tr['Conditions'], {0})
except P2ESError as e:
raise P2ESError('{}, invalid "Conditions": {}'.format(tr_det, str(e)))
raise P2ESError('{0}, invalid "Conditions": {1}'.format(tr_det, str(e)))

for action in tr['Actions']:
if 'Type' not in action:
raise P2ESError('{}, "Type" is missing'.format(tr_det))
raise P2ESError('{0}, "Type" is missing'.format(tr_det))

tr_det += ', action type = {}'.format(action['Type'])
tr_det += ', action type = {0}'.format(action['Type'])

if action['Type'] not in ('AddField', 'AddFieldLookup', 'DelField'):
raise P2ESError('{}, "Type" unknown'.format(tr_det))
raise P2ESError('{0}, "Type" unknown'.format(tr_det))

if 'Name' not in action:
raise P2ESError('{}, "Name" is missing'.format(tr_det))
raise P2ESError('{0}, "Name" is missing'.format(tr_det))

if action['Type'] == 'AddField':
if 'Value' not in action:
raise P2ESError(
'{}, "Value" is missing for new field "{}"'.format(
'{0}, "Value" is missing for new field "{1}"'.format(
tr_det, action['Name']
)
)

if action['Type'] == 'AddFieldLookup':
if 'LookupFieldName' not in action:
raise P2ESError(
'{}, "LookupFieldName" is missing for '
'new field "{}"'.format(tr_det, action['Name'])
'{0}, "LookupFieldName" is missing for '
'new field "{1}"'.format(tr_det, action['Name'])
)
if 'LookupTable' in action and 'LookupTableFile' in action:
raise P2ESError(
'{}, only one from "LookupTable" and '
'{0}, only one from "LookupTable" and '
'"LookupTableFile" allowed'.format(tr_det)
)
if 'LookupTable' not in action and 'LookupTableFile' not in action:
raise P2ESError(
'{}, "LookupTable" and "LookupTableFile" missing '
'for new field "{}"'.format(tr_det, action['Name'])
'{0}, "LookupTable" and "LookupTableFile" missing '
'for new field "{1}"'.format(tr_det, action['Name'])
)
if 'LookupTableFile' in action:
try:
with open(action['LookupTableFile'], "r") as f:
action['LookupTable'] = json.load(f)
except Exception as e:
raise P2ESError(
'{}, error loading lookup table from {}: {}'.format(
'{0}, error loading lookup table from {1}: {2}'.format(
tr_det, action['LookupTableFile'], str(e)
)
)
Expand Down
2 changes: 1 addition & 1 deletion pierky/p2es/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def __init__(self, *args, **kwargs):
create_index(self.index_name, self.CONFIG)
except P2ESError as e:
raise P2ESError(
"Error while creating index {}: {}".format(
"Error while creating index {0}: {1}".format(
self.index_name, str(e)
)
)
Expand Down
Loading