PerfectHadoop: MapReduce 历史服务器

本项目封装了MapReduce 历史服务器 REST API:

  • MapReduceHistory(): 用于访问 MapReduce 历史服务器获取应用程序的 Swift 对象

连接到MapReduce历史服务器

如果希望连接到 Hadoop 的 Map Reduce 历史服务器,请初始化一个MapReduceHistory()对象,并确保参数准确无误:

// 该连接只能保证基本的操作权限
let history = MapReduceHistory(host: "mapReduceHistory.somedomain.com", port: 19888)

或者连接到 Hadoop Map / Reduce 历史服务器时包括一个有效的用户名:

// 如有需要,在此处增加用户名
let history = MapReduceHistory(host: "mapReduceHistory.somedomain.com", port: 19888, user: "your user name")

用户身份验证

如果您的服务器需要 Kerberos 验证,请增加身份验证选项:

// 设置当前验证方式为 Kerberos
let history = MapReduceHistory(host: "mapReduceHistory.somedomain.com", port: 19888, user: "username", auth: .krb5)

MapReduceHistory 对象参数详解

变量 类型 描述
service String Web 请求的类型 - http / https
host String 目标Hadoop Map Reduce 历史服务器主机名或者IP地址
port Int 默认的历史服务器端口,默认为 19888
auth Authorization 身份认证方式,.off 表示无要求(默认),.krb5 则表示采用Kerberos验证
proxyUser String 代理服务器用户信息(可选项)
apibase String 目标历史服务器的接口函数地址。⚠️只有⚠️在您的配置中其目标接口函数地址不是/ws/v1/history的时候才需要填写。
timeout Int 连接超时限制,单位是秒。默认是零,也就是传输过程中始终保持等待。

获取服务器基本配置信息

调用 checkInfo() 能够获得历史服务器的基本信息,返回的数据将保存在一个 MapReduceHistory.Info 结构之内:

guard let inf = try history.checkInfo() else {
    // 获取信息不成功
}
print(inf.startedOn)
print(inf.hadoopVersion)
print(inf.hadoopBuildVersion)
print(inf.hadoopVersionBuiltOn)

MapReduceHistory.Info 数据结构

变量 类型 描述
startedOn Int 历史服务器启动时的时间戳(Unix纪元时间,单位是秒)
hadoopVersion String Hadoop 通用函数库的版本
hadoopBuildVersion String Hadoop 通用函数库编译信息,包括版本号,用户和校验码
hadoopVersionBuiltOn String Hadoop 通用函数库编译时间戳

MapReduce 作业历史档案

调用 checkJobs() 可以返回一个以 Job 数据结构为元素的数组。 该数组列出了所有执行完毕的 MapReduce 作业清单;⚠️注意⚠️,受限于Hadoop的版本,目前暂时不能返回任务有关的所有参数。最简单的形式是调用 checkJobs(),不带任何参数即可。此时将返回所有变量:

let jobs = try history.checkJobs()
// 如果不希望把有用的没用的都抓过来,可以通过设置下列参数筛选结果:
// let jobs = try his.checkJobs(state: .SUCCEEDED, queue: "default", limit: 10)
jobs.forEach { j in
  print(j.id) // 作业编号
  print(j.name) // 作业名称
  print(j.queue) // 作业队列
  print(j.state) // 作业状态
}

checkJobs() 参数说明

变量 类型 描述
user String 用户名
state APP.FinalStatus 作业状态 —— UNDEFINED(未定义), SUCCEEDED(成功), FAILED(失败) 和 KILLED(终止)
queue String 队列名称
limit Int 本次操作最多返回的任务数量
startedTimeBegin Int 从某个时刻之后开始的所有作业,采用Unix时间,单位是毫秒。
startedTimeEnd Int 从某个时间段内运行的所有作业(与startedTimeBegin配合),采用Unix时间,单位是毫秒。
finishedTimeBegin Int 所有在某个时间段内结束的作业,该参数为时间段的开始时间,采用Unix时间,单位是毫秒。
finishedTimeEnd Int 所有在某个时间段内结束的作业,该参数为时间段的结束时间,采用Unix时间,单位是毫秒。

Job (作业)数据结构

变量 类型 描述
id String 作业编码
name String 作业名称
queue String 作业所提交到的目标队列
user String 用户名
state String 作业状态 - 有效值包括 NEW(新建), INITED(完成初始化), RUNNING(正在运行), SUCCEEDED(执行成功), FAILED(执行失败), KILL_WAIT(正在结束处理), KILLED(已被关闭), ERROR(错误)
diagnostics String 诊断信息
submitTime Int 作业提交时刻(毫秒,Unix纪元)
startTime Int 作业启动时刻(毫秒,Unix纪元)
finishTime Int 作业结束时刻(毫秒,Unix纪元)
mapsTotal Int 映射(map)总数
mapsCompleted Int 已完成映射(map)数量
reducesTotal Int 归并(reduce)总数
reducesCompleted Int 已完成归并(reduce)数量
uberized Boolean 该作业是否为uber类型——完全在Application Master应用程序主服务器上完成
avgMapTime Int 每个分项映射作业的平均耗时(毫秒)
avgReduceTime Int 每个分项归并作业的平均耗时(毫秒)
avgShuffleTime Int 每个分项作业并行分派(shuffle)的平均耗时(毫秒)
avgMergeTime Int 每个分享作业结果合并(merge)的平均耗时(毫秒)
failedReduceAttempts Int 归并操作(reduce)失败次数统计
killedReduceAttempts Int 归并操作(reduce)被终止的总数统计
successfulReduceAttempts Int 归并操作(reduce)尝试成功的次数
failedMapAttempts Int 失败的映射(map)尝试次数
killedMapAttempts Int 被终止的映射(map)尝试次数
successfulMapAttempts Int 成功的映射(map)尝试次数
acls [ACL] 访问控制列表(ACL)数组

访问控制列表对象 ACL 数据结构

变量 类型 描述
name String 安全访问列表名称
value String 安全访问列表项的值

检查特定作业(Job)

可以通过作业代码(Job ID)检查历史服务器上的特定作业信息:

let job = try history.checkJob(jobId: "job_1484231633049_0005")

详见 作业(Job)数据结构

作业尝试 JobAttempt

以下命令能够获得关于某个作业的尝试列表信息:

guard let attempts = try history.checkJobAttempts(jobId: "job_1484231633049_0005") else {
    // 出错了
}
attempts.forEach { attempt in
    print(attempt.id)
    print(attempt.containerId)
    print(attempt.nodeHttpAddress)
    print(attempt.nodeId)
    print(attempt.startTime)
}

JobAttempt 数据结构

变量 类型 描述
id String 作业尝试 (Job Attempt) 编码
nodeId String 该尝试执行时所在的节点编号
nodeHttpAddress String 该尝试执行时所在的节点地址
logsLink String 作业尝试的日志超链接
containerId String 该尝试执行时所在的容器编号
startTime Long 该作业尝试启动的时间(Unix纪元,单位是毫秒)

作业指标统计(JobCounter)

使用作业指标函数库,您可以获得关于该作业的所有指标信息。

guard let js = try his.checkJobCounters(jobId: "job_1484231633049_0005") else {
    // 出错了
}//end guard
js.counterGroup.forEach{ group in
  print(group.counterGroupName)
  group.counters.forEach { counter in
    print(counter.name)
    print(counter.mapCounterValue)
    print(counter.reduceCounterValue)
    print(counter.totalCounterValue)
  }//next counter
}//next group

JobCounter 作业指标对象

变量 类型 描述
id String 作业编码
counterGroup [CounterGroup] 指标清单分组集合

CounterGroup 指标分组对象

变量 类型 描述
counterGroupName string 该组指标的名称
counter [Counter] 指标对象构成的数组

Counter 指标对象

变量 类型 描述
name String 指标名称
reduceCounterValue Int 归并任务计数
mapCounterValue Int 映射任务计数
totalCounterValue Int 所有任务总数统计

查看作业配置

函数 checkJobConfig() 可以用来检查作业配置情况:

guard let config = try his.checkJobConfig(jobId: "job_1484231633049_0005") else {
    /// 出错了
}
// 打印配置目录
print(config.path)
// check properties of configuration file
for p in config.property {
  print(p.name)
  print(p.value)
  print(p.source)
}

JobConfig 作业配置对象

作业配置对象包含了该作业的资源使用信息。

变量 类型 描述
path String 作业配置文件的路径
property [Property] 属性数组

Property 属性对象

变量 类型 描述
name String 配置项名称
value String 配置项取值
source String 配置对象所在路径。如果不止一个路径,则意味着占用资源的历史,清单末尾是最新的内容。

作业子任务(Tasks of A Job)

方法 checkJobTasks() 用于获取特定作业的子任务信息:

// 获得特定作业的所有子任务信息
let tasks = try his.checkJobTasks(jobId: "job_1484231633049_0005")

// 打印每个子任务的属性
for t in tasks {
  print(t.progress)
  print(t.elapsedTime)
  print(t.state)
  print(t.startTime)
  print(t.id)
  print(t.type)
  print(t.successfulAttempt)
  print(t.finishTime)
}//next t

checkJobTasks() 函数参数

调用 checkJobTasks() 总是需要一个作业编码 jobId ,但是也可以指定第二个附加参数,即子任务类型用于进一步过滤查询结果:

  • jobId: 作业编码
  • taskType: 可选的子任务类型,即 .MAP 映射任务或者 .REDUCE 归并任务

JobTask 作业子任务对象

函数checkJobTasks()用于返回一个作业子任务数组 JobTask 对象,包括下列属性:

变量 类型 描述
id string 作业子任务编码
state String 作业子任务状态 - 有效值包括: NEW(新建)、SCHEDULED(已列入计划)、RUNNING(运行)、SUCCEEDED(成功)、FAILED(失败)、KILL_WAIT(等待终止/关闭)、KILLED(已经终止/关闭)
type String 作业子任务类型 - MAP(映射)或者 REDUCE(归并)
successfulAttempt String 最后一次成功尝试的作业子任务编码
progress Double 作业子任务执行程度,百分比
startTime Int 作业子任务启动时间(毫秒,Unix纪元)。如果还没有启动,则值为-1。
finishTime Int 作业子任务结束时间(毫秒,Unix纪元)。
elapsedTime Int 应用程序自启动至今所耗时间,单位是毫秒

查询特定的作业子任务 JobTask

如果已经获取了有效的 jobId 作业编码和 jobTaskId 作业子任务编码,那么可以使用函数 checkJobTask() 检查特定的作业子任务:

guard let task = try his.checkJobTask(jobId: "job_1484231633049_0005", taskId: "task_1326381300833_2_2_m_0") else {
    // 出错了
}
print(task.progress)

该函数 checkJobTask 返回值为一个 JobTask 作业子任务 对象,如上所述。

作业子任务统计指标

方法 checkJobTaskCounters() 用于返回特定作业子任务的统计指标信息,如以下范例所示:

guard let js = try his.checkJobTaskCounters(jobId: "job_1484231633049_0005", taskId: "task_1326381300833_2_2_m_0") else {
    // 出错了
}
// 打印作业子任务指标编码
print(js.id)
// 检查每个统计指标编组
js.taskCounterGroup.forEach{ group in
  print(group.counterGroupName)
  // 打印每个分组中的详细指标信息
  group.counters.forEach { counter in
    print(counter.name)
    print(counter.value)
  }
}

JobTaskCounter 作业子任务指标对象

变量 类型 描述
id String 作业编码
taskCounterGroup [CounterGroup] 分组对象集合,详见 CounterGroup 作业指标分组

Task Attempts 作业子任务尝试

函数 checkJobTaskAttempts() 用于检查特定作业子任务的尝试信息。

let jobTaskAttempts = try his.checkJobTaskAttempts(jobId: "job_1484231633049_0005", taskId: "task_1326381300833_2_2_m_0")
for attempt in jobTaskAttempts {
  print(attempt.id)
  print(attempt.diagnostics)
  print(attempt.assignedContainerId)
  print(attempt.rack)
  print(attempt.state)
  print(attempt.progress)
}//next

一旦成功调用 checkJobTaskAttempts() 会返回一个 TaskAttempt 作业子任务尝试对象,如下所述:

TaskAttempt 作业子任务尝试对象

作业子任务尝试对象 TaskAttempt 包括下列属性:

变量 类型 描述
id String 作业子任务编码
rack String 所在机架信息
state TaskAttempt.State 作业子任务尝试状态——有效值包括: NEW(新建)、UNASSIGNED(未指派)、 ASSIGNED(已指派)、RUNNING(运行)、COMMITPENDING(等待提交)、SUCCESSCONTAINERCLEANUP(成功并准备清理容器)、 SUCCEEDED(成功)、FAILCONTAINERCLEANUP(清理容器失败)、FAILTASKCLEANUP(作业子任务清理失败)、FAILED(失败)、KILLCONTAINERCLEANUP(终止清理容器)、KILLTASK_CLEANUP、(终止作业子任务清理)、KILLED(已终止)
type TaskType 作业子任务类型 - MAP 映射 或者 REDUCE 归并
assignedContainerId String 该作业子任务尝试所在的容器编码
nodeHttpAddress String 该作业子任务尝试坐在的http服务器节点地址
diagnostics String 诊断信息
progress Double 该作业子任务尝试的执行进度,百分比
startTime Int 该作业子任务尝试启动时间(毫秒,Unix纪元)
finishTime Int 该作业子任务尝试结束时间(毫秒,Unix纪元)
elapsedTime Int 自该作业自任务尝试开始运行时所消耗时间(毫秒)

另外,对于 reduce (归并)作业子任务尝试来说,还可以包括下列属性:

变量 类型 描述
shuffleFinishTime Int 重新排序结束时间(毫秒,Unix纪元)
mergeFinishTime Int 合并工作结束时间(毫秒,Unix纪元)
elapsedShuffleTime Int 重新排序所耗时间(从归并子任务启动开始后到排序结束所用的时间)
elapsedMergeTime Int 合并操作所用时间(从排序结束到合并完成总共的毫秒数)
elapsedReduceTime Int 整个归并阶段所耗时间(从合并结束到整个归并子任务结束的总时间)

查询特定的 TaskAttempt 作业子任务尝试

如果作业子任务尝试编码有效,则可以执行checkJobTaskAttempt()查询该作业子任务的尝试信息,如下所述:

guard let jobTaskAttempts = try his.checkJobTaskAttempt(jobId: "job_1484231633049_0005", taskId: "task_1326381300833_2_2_m_0", "attempt_1326381300833_2_2_m_0_0") else {
    // 出错了
}//end guard
print(attempt.id)
print(attempt.diagnostics)
print(attempt.assignedContainerId)
print(attempt.rack)
print(attempt.state)
print(attempt.progress)

函数checkJobTaskAttempt() 调用后会返回一个 TaskAttempt 作业子任务尝试对象 如上文所述。

作业子任务尝试指标统计

方法 checkJobTaskAttemptCounters() 可以获得关于特定作业子任务尝试的详细指标统计,如下文所述:

guard let counters = try his.checkJobTaskAttemptCounters(jobId: "job_1484231633049_0005", taskId: "task_1326381300833_2_2_m_0", "attempt_1326381300833_2_2_m_0_0") else {
    // 出错了
}
// 打印指标计数编码
print(counters.id)
// 遍历所有指标分组
for group in counters.taskAttemptCounterGroup {
  // 打印分组名称
  print(group.counterGroupName)
  // 打印该分组下的所有详细指标
  for counter in group.counters {
    print(counter.name)
    print(counter.value)
  }//next counter
}//next group