alder_lake_bios/Board/Oem/L05AlderLakePMultiBoardPkg/LfcBpr/Lib/action_email.py

264 lines
12 KiB
Python

from Lib.lcfc_lib import *
from Lib.action_base import ActionBase
import win32com.client as win32
import time
import datetime
import pythoncom
import shutil # for test
from win32com import client
class ActionEmail(ActionBase):
email_send = 1
email_receive = 2
email_get = 3
email_modify = 4
email_exist_send = 5
# file
def __init__(self, sub_function, para_dict):
super().__init__()
self.sub_function = sub_function
if self.sub_function == ActionEmail.email_send:
self.email_receiver = para_dict['email_receiver']
self.email_cc = para_dict['email_cc']
self.email_bcc = para_dict['email_bcc']
self.email_subject = para_dict['email_subject']
self.email_content = para_dict['email_content']
self.email_attachment = para_dict['email_attachment']
elif self.sub_function == ActionEmail.email_receive:
self.email_sender = para_dict['email_sender']
self.email_subject = para_dict['email_subject']
self.email_deadline = para_dict['email_deadline']
self.outlook = None
self.inbox = None
self.messages = None
self.msg = None
self.loop_count = 0
elif self.sub_function == ActionEmail.email_get:
self.email_sender = para_dict['email_sender']
self.email_subject = para_dict['email_subject']
self.email_deadline = para_dict['email_deadline']
elif self.sub_function == ActionEmail.email_modify:
self.src = para_dict['src']
self.replace = para_dict['replace']
self.items = para_dict['items']
self.type = para_dict['type']
elif self.sub_function == ActionEmail.email_exist_send:
self.src = para_dict['src']
def action(self, count, action_str):
super().action(count, action_str)
if self.sub_function == ActionEmail.email_send:
self.send_mail()
elif self.sub_function == ActionEmail.email_receive:
self.recevice_mail()
elif self.sub_function == ActionEmail.email_get:
self.get_mail()
elif self.sub_function == ActionEmail.email_modify:
self.modify_mail()
elif self.sub_function == ActionEmail.email_exist_send:
self.send_exist_mail()
return
def get_mail(self):
# send email
pythoncom.CoInitialize() # This is a must for multi-thread
outlook = client.Dispatch('outlook.application').GetNamespace("MAPI")
mail = outlook.OpenSharedItem(self.dir)
return mail
def modify_mail(self):
pythoncom.CoInitialize() # This is a must for multi-thread
outlook = client.Dispatch('outlook.application').GetNamespace("MAPI")
mail = outlook.OpenSharedItem(self.src)
mail.Subject = self.items['Subject']
if DEBUG_MODE == False:
mail.To = self.items['To']
mail.CC = self.items['CC']
else:
mail.To = 'feng.gu@lcfuturecenter.com'
mail.CC = ''
HTMLBody = mail.HTMLBody
for key, value in self.replace.items():
HTMLBody = HTMLBody.replace(key,value,1)
mail.HTMLBody = HTMLBody
mail.Save()
return
def send_exist_mail(self):
pythoncom.CoInitialize() # This is a must for multi-thread
outlook = client.Dispatch('outlook.application').GetNamespace("MAPI")
mail = outlook.OpenSharedItem(self.src)
mail.Send()
return
def recevice_mail(self):
if DEBUG_MODE and (not DEBUG_MAIL) :
if g.insyde:
if g.formal_release:
self.action_output['file'] = shutil.copy(os.path.join(g.tool_path_abs,
r'Tool\Temp\signed_Insyde.7z'),
os.path.join(self.output_folder, 'Signed.7z'))
else:
self.action_output['file'] = shutil.copy(os.path.join(g.tool_path_abs,
r'Tool\Temp\signed_test_Insyde.7z'),
os.path.join(self.output_folder, 'Signed.7z'))
else:
if g.formal_release:
self.action_output['file'] = shutil.copy(os.path.join(g.tool_path_abs,
r'Tool\Temp\signed_Phoenix.7z'),
os.path.join(self.output_folder, 'Signed.7z'))
else:
self.action_output['file'] = shutil.copy(os.path.join(g.tool_path_abs,
r'Tool\Temp\signed_test_Phoenix.7z'),
os.path.join(self.output_folder, 'Signed.7z'))
return
# receive email
pythoncom.CoInitialize() # This is a must for multi-thread
self.outlook =win32.Dispatch("outlook.Application").GetNamespace("MAPI")
self.inbox = self.outlook.GetDefaultFolder("6")
send_time = self.email_deadline
#start_wait_time = datetime.datetime.now()
self. loop_count = 0
# Get mail one by one from the last
if hasattr(self.inbox, "Items"):
self.messages = self.inbox.Items
DebugPrint('total messages: ' + str(len(self.messages)))
if hasattr(self.messages, "GetLast"):
self.msg = self.messages.GetLast()
while True:
print('expect email_subject = %s' % str(self.email_subject))
DebugPrint('Send mail time is = %s' % str(send_time))
if hasattr(self.msg, 'Subject'):
if self.msg.Class == 43:
if self.msg.SenderEmailType == "EX":
if self.msg.Sender != None and self.msg.Sender.GetExchangeUser() != None:
sender = self.msg.Sender.GetExchangeUser().PrimarySmtpAddress
else:
sender = self.msg.SenderEmailAddress
DebugPrint('-inbox subject is = %s' % str(self.msg.Subject))
DebugPrint('-inbox subject receieve time is =%s' % self.msg.ReceivedTime.replace(
tzinfo=None))
# Receivded Mail is new then the sent mail
if self.msg.ReceivedTime.replace(tzinfo=None) > send_time:
print('self.msg.ReceivedTime.replace(tzinfo=None) big')
if DEBUG_MODE and DEBUG_MAIL:
condition = self.msg.Subject == self.email_subject
else:
condition = self.msg.Subject == self.email_subject and sender == self.email_sender
if condition:
'''
find_line = findline(self.msg.HTMLBody.split('<BR>'), 'Sent:')
previous_sent_time = SplitString(find_line, r': ')
sent_time_in_recieve = mail_time(previous_sent_time)
fomal_receive_time = sent_time_in_recieve.format_time()
'''
#if fomal_receive_time <= datetime.datetime.strptime(start_wait_time.strftime('%Y-%m-%d %H:%M'), "%Y-%m-%d %H:%M") and fomal_receive_time >= datetime.datetime.strptime(send_time.strftime('%Y-%m-%d %H:%M'), "%Y-%m-%d %H:%M"):
DebugPrint('find sign back mail !!!!!')
for _i in self.msg.Attachments:
_i.SaveAsFile(
os.path.join(g.working_path_abs, self.output_folder, 'Signed.7z'))
self.action_output['file'] = \
os.path.join(self.output_folder, 'Signed.7z')
return
if not self.action_output['file']:
error_handle(1, "Email received, can't find attachment")
self.status = STATUS_ERROR
else:
if hasattr(self.messages, "GetPrevious"):
self.msg = self.messages.GetPrevious()
DebugPrint('GetPrevious1')
continue
else:
self.delay()
self.get_last()
else:
if hasattr(self.messages, "GetPrevious"):
self.msg = self.messages.GetPrevious()
DebugPrint('GetPrevious2')
if self.loop_count >= 15:
print(' 30 minutes passed, still not get email, here are some suggestions:')
print(' 1. Check your Outlook Outbox and Inbox manually')
print(' 2. Check your network status')
print(' 3. If the sign email was received successfully but tool still can not get it,')
print(
' please check Outlook setting if you have wrongly set local .pst file as default folder')
print(' File->Account Setting->Data File')
else:
print('inbox is empty')
self.delay()
self.get_last()
DebugPrint('get_last2')
def get_last(self):
self.inbox = self.outlook.GetDefaultFolder("6")
if hasattr(self.inbox, "Items"):
self.messages = self.inbox.Items
if hasattr(self.messages, "GetLast"):
self.msg = self.messages.GetLast()
def get_previous(self):
self.inbox = self.outlook.GetDefaultFolder("6")
if hasattr(self.inbox, "Items"):
self.messages = self.inbox.Items
if hasattr(self.messages, "GetPrevious"):
self.msg = self.messages.GetPrevious()
def delay(self):
# Receivded Mail is old then the sent mail
print(str(datetime.datetime.now().strftime("%H:%M:%S")),
'Email not found, delay 120s to check again')
print('.......................................................................................................')
self.loop_count += 1
time.sleep(120)
def send_mail(self):
# send email
if DEBUG_MODE and (not DEBUG_MAIL):
return
pythoncom.CoInitialize() # This is a must for multi-thread
outlook = win32.Dispatch("outlook.Application")
mail = outlook.CreateItem(0)
mail.To = self.email_receiver
mail.CC = self.email_cc
mail.BCC = self.email_bcc
mail.Subject = self.email_subject
for _i in self.email_attachment:
mail.Attachments.Add(_i)
if self.email_content:
with open(self.email_content) as file:
mail_content = file.read() # 读取html文件中的内容
else:
mail_content = ''
mail.HTMLBody = mail_content
result = mail.Send()
print('result = %s' %result)
return