diff --git a/cmdb-api/api/lib/common_setting/employee.py b/cmdb-api/api/lib/common_setting/employee.py index 6cf27f8f..1f9e3a8e 100644 --- a/cmdb-api/api/lib/common_setting/employee.py +++ b/cmdb-api/api/lib/common_setting/employee.py @@ -212,159 +212,6 @@ def get_employee_count(block_status): *criterion ).count() - @staticmethod - def import_employee(employee_list): - return CreateEmployee().batch_create(employee_list) - - @staticmethod - def get_export_employee_list(block_status): - if block_status >= 0: - employees = Employee.get_by(block=block_status, to_dict=False) - else: - employees = Employee.get_by(to_dict=False) - - all_departments = Department.get_by(to_dict=False) - d_id_map = {d.department_id: d.department_name for d in all_departments} - e_id_map = {e.employee_id: e.nickname for e in employees} - - export_columns_map = { - 'username': "用户名", - 'nickname': '姓名', - 'email': '邮箱', - 'department_name': '部门', - 'sex': '性别', - 'mobile': '手机号', - 'position_name': '岗位', - 'nickname_direct_supervisor': '直属上级', - 'last_login': '上次登录时间', - } - - data_list = [] - for e in employees: - department_name = d_id_map.get(e.department_id, '') - nickname_direct_supervisor = e_id_map.get(e.direct_supervisor_id, '') - try: - last_login = str(e.last_login) if e.last_login else '' - except: - last_login = '' - data = e.to_dict() - data['last_login'] = last_login - data['department_name'] = department_name - data['nickname_direct_supervisor'] = nickname_direct_supervisor - - tmp = {export_columns_map[k]: data[k] for k in export_columns_map.keys()} - - data_list.append(tmp) - - return data_list - - @staticmethod - def batch_employee(column_name, column_value, employee_id_list): - if not column_value: - abort(400, ErrFormat.value_is_required) - if column_name in ['password', 'block']: - return EmployeeCRUD.batch_edit_password_or_block_column(column_name, employee_id_list, column_value, True) - - elif column_name in ['department_id']: - return EmployeeCRUD.batch_edit_employee_department(employee_id_list, column_value) - - elif column_name in [ - 'direct_supervisor_id', 'position_name' - ]: - return EmployeeCRUD.batch_edit_column(column_name, employee_id_list, column_value, False) - - else: - abort(400, ErrFormat.column_name_not_support) - - @staticmethod - def batch_edit_employee_department(employee_id_list, column_value): - err_list = [] - employee_list = [] - for _id in employee_id_list: - try: - existed = EmployeeCRUD.get_employee_by_id(_id) - employee = dict( - e_acl_rid=existed.acl_rid, - department_id=existed.department_id - ) - employee_list.append(employee) - existed.update(department_id=column_value) - - except Exception as e: - err_list.append({ - 'employee_id': _id, - 'err': str(e), - }) - from api.tasks.common_setting import edit_employee_department_in_acl - edit_employee_department_in_acl.apply_async( - args=(employee_list, column_value, current_user.uid), - queue=COMMON_SETTING_QUEUE - ) - return err_list - - @staticmethod - def batch_edit_password_or_block_column(column_name, employee_id_list, column_value, is_acl=False): - if column_name == 'block': - err_list = [] - success_list = [] - for _id in employee_id_list: - try: - employee = EmployeeCRUD.edit_employee_block_column( - _id, is_acl, **{column_name: column_value}) - success_list.append(employee) - except Exception as e: - err_list.append({ - 'employee_id': _id, - 'err': str(e), - }) - return err_list - else: - return EmployeeCRUD.batch_edit_column(column_name, employee_id_list, column_value, is_acl) - - @staticmethod - def batch_edit_column(column_name, employee_id_list, column_value, is_acl=False): - err_list = [] - for _id in employee_id_list: - try: - EmployeeCRUD.edit_employee_single_column( - _id, is_acl, **{column_name: column_value}) - except Exception as e: - err_list.append({ - 'employee_id': _id, - 'err': str(e), - }) - - return err_list - - @staticmethod - def edit_employee_single_column(_id, is_acl=False, **kwargs): - existed = EmployeeCRUD.get_employee_by_id(_id) - - if is_acl: - return edit_acl_user(existed.acl_uid, **kwargs) - - try: - for column in employee_pop_columns: - if kwargs.get(column): - kwargs.pop(column) - - return existed.update(**kwargs) - except Exception as e: - return abort(400, str(e)) - - @staticmethod - def edit_employee_block_column(_id, is_acl=False, **kwargs): - existed = EmployeeCRUD.get_employee_by_id(_id) - value = get_block_value(kwargs.get('block')) - if value is True: - check_department_director_id_or_direct_supervisor_id(_id) - - if is_acl: - kwargs['block'] = value - edit_acl_user(existed.acl_uid, **kwargs) - data = existed.to_dict() - return data - @staticmethod def check_email_unique(email, _id=0): criterion = [ diff --git a/cmdb-api/api/views/common_setting/employee.py b/cmdb-api/api/views/common_setting/employee.py index 804f54bc..56adf8b7 100644 --- a/cmdb-api/api/views/common_setting/employee.py +++ b/cmdb-api/api/views/common_setting/employee.py @@ -145,29 +145,3 @@ def get(self): result = EmployeeCRUD.get_all_position() return self.jsonify(result) - -class EmployeeViewExportExcel(APIView): - url_prefix = (f'{prefix}/export_all',) - - def get(self): - excel_filename = 'all_employee_info.xlsx' - excel_path = current_app.config['UPLOAD_DIRECTORY_FULL'] - excel_path_with_filename = os.path.join(excel_path, excel_filename) - - block_status = int(request.args.get('block_status', -1)) - data_list = EmployeeCRUD.get_export_employee_list(block_status) - - headers = data_list[0].keys() - from openpyxl import Workbook - - wb = Workbook() - ws = wb.active - # insert header - for col_num, col_data in enumerate(headers, start=1): - ws.cell(row=1, column=col_num, value=col_data) - - for row_num, row_data in enumerate(data_list, start=2): - for col_num, col_data in enumerate(row_data.values(), start=1): - ws.cell(row=row_num, column=col_num, value=col_data) - wb.save(excel_path_with_filename) - return send_from_directory(excel_path, excel_filename, as_attachment=True)