发布于: Feb 7, 2022

以下示例部分的完整Python代码可参见:https://github.com/Iwillsky/lightHPC

  • Lambda中的处理函数

Lambda服务的控制台中新一个名为“lightScheduler”的处理函数,Runtime 类型选择“Python3.7”,权限配置上分配一个包含 EC2DynamoDBSNS 服务足够操作权限的角色。

处理函数中的事务表写入部分的代码如下。

table = dynamodb.Table('tblTrans2')
    
    #Check ahead to avoid redo again
    try:
        existRec = table.get_item(
            Key={'srcFilename': srcFilename}
        )
        strIndi = 'Item'
        if ('Item' in existRec):
            print('exist')
            existItem = existRec['Item']['bucketname']
            existLog = existRec['Item']['numlog']+'+1'
            response = table.update_item(
                Key={'srcFilename': srcFilename},
                UpdateExpression="set numlog = :a",
                ExpressionAttributeValues={':a': existLog},
                ReturnValues="UPDATED_NEW"
            )
        else:
            #======= append a trans log  =================
            response = table.put_item(
                Item={
                    'srcFilename': srcFilename,
                    'timeBoarding': localtime,
                    'bucketname': bucketstr, 
                    'trigTime': trigTime,
                    'trigName': trigName,
                    'numlog': numrec
                 }
            )

处理函数启动 Spot 实例的代码如下,启动时将对应样本文件名通过实例 Tag 值传递至实例。

#======= spin a spot instance =================
            ec2res = boto3.resource('ec2',region_name = spot_region)
            
            response = ec2res.create_instances(
                InstanceType=wantInstaceType,
                KeyName='myLabNV',
                MinCount=1,
                MaxCount=1,
                InstanceMarketOptions={
                    'MarketType': 'spot',
                    'SpotOptions': {
                        'MaxPrice': bidPrice,
                        'SpotInstanceType': 'one-time',
                        'BlockDurationMinutes': 60,
                        'InstanceInterruptionBehavior': 'terminate'
                    }
                },
                LaunchTemplate={
                    'LaunchTemplateId': tmplID,
                    'Version': tmplVer
                },
                TagSpecifications=[
                    {
                        'ResourceType': 'instance',
                        'Tags': [
                            {
                                'Key': 'Name',
                                'Value': 'testSpot-'+srcFilename 
                            },
                            {
                                'Key': 'srcID',
                                'Value': srcFilename 
                            }
                        ]
                    }
                ]
            )

代码拷贝完成后,在右上角 Actions 菜单内选择 “Publish new version”

  • S3中的触发设置

S3控制台 Bucket 属性页面的  Events  选项中配置事件驱动发送给 “Lambda Function”,并指定  Lambda 处理函数名为 LightScheduler

  • 计算节点的执行过程

先启动一个 Amazon Linux2 的实例并拷贝好 jobrun.py 脚本文件,手动创建好计算节点的AMI镜像。再进行启动模板的配置,EC2 的控制台界面中 Launch Template 的配置参数如下图,设置好启动对应的 AMI 镜像、实例类型和安全组。

在启动模板 Advanced details User Data 中填入加载的启动脚本,如下图。

Demo 测试脚本用随机延迟来模拟处理耗时并更新计算进度,如希望准确模拟工作负载的情况,可将模拟延迟改为对应脚本的调用。代码示例如下所示。

ec2res = boto3.resource('ec2','us-east-1')
curInstance = ec2res.Instance(curInstanceId)
tags = curInstance.tagsfor tag in tags:
    if tag["Key"] == 'srcID':
        transIDstr = tag["Value"]

tmCost = 0for i in range(1,numCircle):
    tl = random.randint(12,20)
    tmCost = tmCost+tl
    tmCoststr = str(tmCost)
    strProgress = str(i*100/numCircle)+'%'
    #===== Simluate time cost block =========# You can add the molecular matching analysis or genetic analysis workload here
time.sleep( tl )print(i, tl)#update progress
    response = table.update_item(
        Key={ 'srcFilename': transIDstr },
        UpdateExpression="set strIndiProgress = :u, execInstanceID = :i, timeCost=:c",
        ExpressionAttributeValues={
            ':u': strProgress,
            ':i': curInstanceId,
            ':c': tmCoststr
        },
        ReturnValues="UPDATED_NEW")
单个计算事务完成写完结果文件后,更新事务状态值并终止本实例,节省资源占有,代码如下图。
#push result file
outputFile = 's3://xxxrsltbucket1/rslt_'+transIDstr
os.system('aws s3 cp /home/ec2-user/test/rslt.dat '+outputFile)
#update rslt info
response = table.update_item(
    Key={
        'srcFilename': transIDstr
    },
    UpdateExpression="set strIndiProgress = :u, rsltFilename = :o",
    ExpressionAttributeValues={
        ':u': '100%',
        ':o': outputFile
    },
    ReturnValues="UPDATED_NEW")
time.sleep(1)
response = curInstance.terminate('false')

处理完通知。

整个任务处理完成后发送 SNS 通知到对应的 SNS topic 目标,代码如下。

if ('Item' in missionRec):
        numTotal = missionRec['Item']['transNumber']
        numSucc = missionRec['Item']['transOK']
        listArr = missionRec['Item']['rsltList']
        noteMail = missionRec['Item']['notifyEmail']
     
        if (rsltFilename not in listArr):
            listArr.append(rsltFilename)
            numSucc = str(int(numSucc)+1)
            if ( int(numSucc)>=int(numTotal) and noteMail!='Yes'):
                #Send noti Mail
                strMsg ='HPC mission'+ idstr +'accomplished at'+trigTime +'.'
                sns = boto3.resource('sns')
                topic = sns.Topic('arn:aws:sns:us-east-1:1xxxxxxx55:NotifyMe')
                mailmess = topic.publish( Message= strMsg)
                print(mailmess)
                noteMail = 'Yes'
                    
            response = table.update_item(
                Key={ 'timePush' : idstr },
                UpdateExpression="set transOK = :succ, rsltList = :rlist, notifyEmail = :m",
                ExpressionAttributeValues={
                    ':succ': numSucc, ':rlist': listArr, ':m': noteMail
                },
                ReturnValues="UPDATED_NEW"
            )
  • 配置接口的开放

任务参数配置的接口在 API Gateway 中的 Method Execution 页面的配置如下图。

  • 运行截图

系统搭建完成后,测试任务运行时 tblMission tblTrans 表的数据记录如下图所示。

如果计算资源要求比较高,记得 Amazon Web Services 账号中实例运行数的 Limit 限制提前申请做提升。

扩展讨论

  • Spot 实例中断的处理

该架构中并未采取轮询式中断通知检测,这是基于发生中断概率非常低的实际,如果一个计算事务在过程中被中断,可通过 tblTrans 表中记录检查的方式再重新触发。如果事务切分颗粒度不够小的情况下实际项目中还可设置检查点的方式来减少重算的工作量。

  • 自定义成本策略

原型实现时仅采用价格符合的策略,实际项目中如果 Spot 资源数不足以满足计算需求时,而计算时限又有要求的情况下,可采取按 Spot 和按需实例混搭配置 EC2 Fleet 的方式来满足,可在 Lambda 处理函数中增加这部分处理逻辑。

  • 扩展日志保存功能

原型中的 DynamoDB 库表主要是当前计算状态的记录与更新,实际项目如果需要日志记录的功能,可在 Lambda 处理函数、计算节点脚本和 Lambda 结果处理函数三处分别增加 Cloud Watch Log 记录的推送。

相关文章