1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
|
# -*- coding:utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from collections import defaultdict
from datetime import datetime, time
from dateutil.relativedelta import relativedelta
from odoo import api, fields, models, _
from odoo.exceptions import ValidationError
from odoo.osv.expression import AND
from odoo.tools import format_date
class HrLeaveType(models.Model):
_inherit = 'hr.leave.type'
work_entry_type_id = fields.Many2one('hr.work.entry.type', string='Work Entry Type')
class HrLeave(models.Model):
_inherit = 'hr.leave'
def _prepare_resource_leave_vals(self):
vals = super(HrLeave, self)._prepare_resource_leave_vals()
vals['work_entry_type_id'] = self.holiday_status_id.work_entry_type_id.id
return vals
def _cancel_work_entry_conflict(self):
"""
Creates a leave work entry for each hr.leave in self.
Check overlapping work entries with self.
Work entries completely included in a leave are archived.
e.g.:
|----- work entry ----|---- work entry ----|
|------------------- hr.leave ---------------|
||
vv
|----* work entry ****|
|************ work entry leave --------------|
"""
if not self:
return
# 1. Create a work entry for each leave
work_entries_vals_list = []
for leave in self:
contracts = leave.employee_id.sudo()._get_contracts(leave.date_from, leave.date_to, states=['open', 'close'])
for contract in contracts:
# Generate only if it has aleady been generated
if leave.date_to >= contract.date_generated_from and leave.date_from <= contract.date_generated_to:
work_entries_vals_list += contracts._get_work_entries_values(leave.date_from, leave.date_to)
new_leave_work_entries = self.env['hr.work.entry'].create(work_entries_vals_list)
if new_leave_work_entries:
# 2. Fetch overlapping work entries, grouped by employees
start = min(self.mapped('date_from'), default=False)
stop = max(self.mapped('date_to'), default=False)
work_entry_groups = self.env['hr.work.entry']._read_group([
('date_start', '<', stop),
('date_stop', '>', start),
('employee_id', 'in', self.employee_id.ids),
], ['employee_id'], ['id:recordset'])
work_entries_by_employee = {
employee.id: work_entries
for employee, work_entries in work_entry_groups
}
# 3. Archive work entries included in leaves
included = self.env['hr.work.entry']
overlappping = self.env['hr.work.entry']
for work_entries in work_entries_by_employee.values():
# Work entries for this employee
new_employee_work_entries = work_entries & new_leave_work_entries
previous_employee_work_entries = work_entries - new_leave_work_entries
# Build intervals from work entries
leave_intervals = new_employee_work_entries._to_intervals()
conflicts_intervals = previous_employee_work_entries._to_intervals()
# Compute intervals completely outside any leave
# Intervals are outside, but associated records are overlapping.
outside_intervals = conflicts_intervals - leave_intervals
overlappping |= self.env['hr.work.entry']._from_intervals(outside_intervals)
included |= previous_employee_work_entries - overlappping
overlappping.write({'leave_id': False})
included.write({'active': False})
def write(self, vals):
if not self:
return True
skip_check = not bool({'employee_id', 'state', 'request_date_from', 'request_date_to'} & vals.keys())
employee_ids = self.employee_id.ids
if 'employee_id' in vals and vals['employee_id']:
employee_ids += [vals['employee_id']]
# We check a whole day before and after the interval of the earliest
# request_date_from and latest request_date_end because date_{from,to}
# can lie in this range due to time zone reasons.
# (We can't use date_from and date_to as they are not yet computed at
# this point.)
start_dates = self.filtered('request_date_from').mapped('request_date_from') + [fields.Date.to_date(vals.get('request_date_from', False)) or datetime.max.date()]
stop_dates = self.filtered('request_date_to').mapped('request_date_to') + [fields.Date.to_date(vals.get('request_date_to', False)) or datetime.min.date()]
start = datetime.combine(min(start_dates) - relativedelta(days=1), time.min)
stop = datetime.combine(max(stop_dates) + relativedelta(days=1), time.max)
with self.env['hr.work.entry']._error_checking(start=start, stop=stop, skip=skip_check, employee_ids=employee_ids):
return super().write(vals)
@api.model_create_multi
def create(self, vals_list):
employee_ids = {v['employee_id'] for v in vals_list if v.get('employee_id')}
# We check a whole day before and after the interval of the earliest
# request_date_from and latest request_date_end because date_{from,to}
# can lie in this range due to time zone reasons.
# (We can't use date_from and date_to as they are not yet computed at
# this point.)
start_dates = [fields.Date.to_date(v.get('request_date_from')) for v in vals_list if v.get('request_date_from')]
stop_dates = [fields.Date.to_date(v.get('request_date_to')) for v in vals_list if v.get('request_date_to')]
start = datetime.combine(min(start_dates, default=datetime.max.date()) - relativedelta(days=1), time.min)
stop = datetime.combine(max(stop_dates, default=datetime.min.date()) + relativedelta(days=1), time.max)
with self.env['hr.work.entry']._error_checking(start=start, stop=stop, employee_ids=employee_ids):
return super().create(vals_list)
def action_reset_confirm(self):
start = min(self.mapped('date_from'), default=False)
stop = max(self.mapped('date_to'), default=False)
with self.env['hr.work.entry']._error_checking(start=start, stop=stop, employee_ids=self.employee_id.ids):
return super().action_reset_confirm()
def _get_leaves_on_public_holiday(self):
return super()._get_leaves_on_public_holiday().filtered(
lambda l: l.holiday_status_id.work_entry_type_id.code not in ['LEAVE110', 'LEAVE210', 'LEAVE280'])
def _validate_leave_request(self):
super(HrLeave, self)._validate_leave_request()
self.sudo()._cancel_work_entry_conflict() # delete preexisting conflicting work_entries
return True
def action_refuse(self):
"""
Override to archive linked work entries and recreate attendance work entries
where the refused leave was.
"""
res = super(HrLeave, self).action_refuse()
self._regen_work_entries()
return res
def _action_user_cancel(self, reason):
res = super()._action_user_cancel(reason)
self.sudo()._regen_work_entries()
return res
def _regen_work_entries(self):
"""
Called when the leave is refused or cancelled to regenerate the work entries properly for that period.
"""
work_entries = self.env['hr.work.entry'].sudo().search([('leave_id', 'in', self.ids)])
work_entries.write({'active': False})
# Re-create attendance work entries
vals_list = []
for work_entry in work_entries:
vals_list += work_entry.contract_id._get_work_entries_values(work_entry.date_start, work_entry.date_stop)
self.env['hr.work.entry'].create(vals_list)
def _compute_can_cancel(self):
super()._compute_can_cancel()
cancellable_leaves = self.filtered('can_cancel')
work_entries = self.env['hr.work.entry'].sudo().search([('state', '=', 'validated'), ('leave_id', 'in', cancellable_leaves.ids)])
leave_ids = work_entries.mapped('leave_id').ids
for leave in cancellable_leaves:
leave.can_cancel = leave.id not in leave_ids
|