首页 / 邮件 / Mapreduce任务实现邮件监控
Mapreduce任务实现邮件监控
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了Mapreduce任务实现邮件监控,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含6427字,纯文字阅读大概需要10分钟。
内容图文
Mapreduce任务实现邮件监控
这里主要使用Java自带邮件类实现Mapreduce任务的监控,如果Mapreduce任务报错则发送报错邮件。Mapreduce的报错信息通过hdfs中的日志获取,里面的报错日志是json格式,这里讲json转换成xml格式发送到邮件。具体代码如下
import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintStream; import java.net.URI; import java.util.Properties; import java.util.StringTokenizer; import javax.mail.Authenticator; import javax.mail.Message; import javax.mail.MessagingException; import javax.mail.PasswordAuthentication; import javax.mail.Session; import javax.mail.Transport; import javax.mail.internet.InternetAddress; import javax.mail.internet.MimeMessage; import net.sf.json.JSONArray; import net.sf.json.JSONObject; import net.sf.json.xml.XMLSerializer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; public class Email { private static final String USERNAME = "123456@qq.com";//发送邮件的用户名 private static final String PASSWORD = "123456789";//发送邮件的用户名对应的密码 private static final String EMAIL_HOST = "smtp.qq.com";//邮件服务器host public static void main(String args[]) { try { sendEmail("测试邮件", "测试邮件内容!", "test@qq.com"); System.out.println("email ok !"); } catch (MessagingException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * @category 发送邮件方法,该方法实现发送Mapreduce任务报错信息,具体的报错信息通过hdfs的报错日志获取 * @param to 目标邮箱(可以多个邮箱,用,号隔开) * @param job 通过mapreduce的job获取jobID * @param time 通过时间戳访问错误日志路径 * @throws Exception */ public static void sendErrMail(String to, Job job, String time) throws Exception { String subject = job.getJobName(); String message = getErr(job, time); LoginMail lm = new LoginMail(USERNAME, PASSWORD); // 创建session Properties props = new Properties(); props.put("mail.smtp.auth", "true"); props.put("mail.smtp.host", EMAIL_HOST); Session session = Session.getDefaultInstance(props, lm); // 创建 message Message msg = new MimeMessage(session); // 设置发送源地址 msg.setFrom(new InternetAddress(USERNAME)); // 多用户分解 StringTokenizer st = new StringTokenizer(to, ","); String[] recipients = new String[st.countTokens()]; int rc = 0; while (st.hasMoreTokens()) recipients[rc++] = st.nextToken(); InternetAddress[] addressTo = new InternetAddress[recipients.length]; for (int i = 0; i < recipients.length; i++) { addressTo[i] = new InternetAddress(recipients[i]); } msg.setRecipients(Message.RecipientType.TO, addressTo); // 设置邮件主题并发送邮件 msg.setSubject(subject); msg.setContent(message, "text/html;charset=utf-8"); Transport.send(msg); } /** * @category 自定义主题内容发送,这里的邮件内容不一定是Mapreduce的,可以任意填写 * @param subject 主题 * @param body 内容 * @param to 目标邮箱 * @throws MessagingException */ public static void sendEmail(String subject, String body, String to) throws MessagingException { LoginMail lm = new LoginMail(USERNAME, PASSWORD); // 创建session Properties props = new Properties(); props.put("mail.smtp.auth", "true"); props.put("mail.smtp.host", EMAIL_HOST); Session session = Session.getDefaultInstance(props, lm); // 创建 message Message msg = new MimeMessage(session); // 设置发送源地址 msg.setFrom(new InternetAddress(USERNAME)); // 多用户分解 StringTokenizer st = new StringTokenizer(to, ","); String[] recipients = new String[st.countTokens()]; int rc = 0; while (st.hasMoreTokens()) recipients[rc++] = st.nextToken(); InternetAddress[] addressTo = new InternetAddress[recipients.length]; for (int i = 0; i < recipients.length; i++) { addressTo[i] = new InternetAddress(recipients[i]); } msg.setRecipients(Message.RecipientType.TO, addressTo); // 设置邮件主题并发送邮件 msg.setSubject(subject); msg.setContent(body, "text/html;charset=utf-8"); Transport.send(msg); } /** * @category 获取日志文件 * @param job * @param time * @return FSDataInputStream * @throws IOException */ public static FSDataInputStream getFile(Job job, String time) throws IOException { String year = time.substring(0, 4); String month = time.substring(4, 6); String day = time.substring(6, 8); String dst = "hdfs://192.168.1.100:9000/tmp/hadoop-yarn/staging/history/done/" + year + "/" + month + "/" + day + "/000000"; FileSystem fs = FileSystem.get(URI.create(dst), new Configuration()); FileStatus[] status = fs.listStatus(new Path(dst)); FSDataInputStream in = null; for (int i = 0; i < status.length; i++) { if (status[i].getPath().getName() .contains(job.getJobID().toString()) && status[i].getPath().getName().endsWith("jhist")) { in = new FSDataInputStream(fs.open(status[i].getPath())); } } return in; } /** * @category 解析文件类容为xml * @param job * @param time * @return xml * @throws IOException * @throws InterruptedException */ public static String getErr(Job job, String time) throws IOException, InterruptedException { FSDataInputStream in = getFile(job, time); Thread t1 = new Thread(); while (in == null) { t1.sleep(20000);//由于hdfs每个job的日志不是实时生成,所以需要每隔20秒检查一次hdfs该job日志是否已生成 t1.join(); in = getFile(job, time); } BufferedReader br = new BufferedReader(new InputStreamReader(in)); String line = ""; JSONObject jo; JSONArray jsa = new JSONArray(); String xml = ""; XMLSerializer xmlSerializer = new XMLSerializer(); while ((line = br.readLine()) != null) { if (line.toUpperCase().indexOf("error".toUpperCase()) > -1) { jo = JSONObject.fromObject(line); jsa.add(jo); } } xml = xmlSerializer.write(jsa); in.close(); br.close(); return xml; } /** * @category 获取try-catch中的异常内容 * @param e Exception * @return 异常内容 */ public static String getException(Exception e) { ByteArrayOutputStream out = new ByteArrayOutputStream(); PrintStream pout = new PrintStream(out); e.printStackTrace(pout); String ret = new String(out.toByteArray()); pout.close(); try { out.close(); } catch (Exception ex) { } return ret; } } class LoginMail extends Authenticator { private String username; private String password; public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } @Override protected PasswordAuthentication getPasswordAuthentication() { return new PasswordAuthentication(username, password); } public LoginMail(String username, String password) { this.username = username; this.password = password; } }
原文:http://my.oschina.net/mkh/blog/493885
内容总结
以上是互联网集市为您收集整理的Mapreduce任务实现邮件监控全部内容,希望文章能够帮你解决Mapreduce任务实现邮件监控所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。