首页 / 更多教程 / sql 重写ipCount
sql 重写ipCount
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了sql 重写ipCount,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含1601字,纯文字阅读大概需要3分钟。
内容图文
package com.ws.sparksql
import com.ws.spark.IpFromUtils
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
* 统计日志中ip归属地出现次数
*/
object SqlIpFromCount {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder().appName("SqlIpFromCount").master("local[*]").getOrCreate()
import sparkSession.implicits._
//读取规则
val rulesData: Dataset[String] = sparkSession.read.textFile("E:\\bigData\\testdata\\ip.txt")
val rules: Dataset[(Long, Long, String)] = rulesData.map(l => {
val fields = l.split("[|]")
val beginNum = fields(2).toLong
val endNum = fields(3).toLong
val province = fields(6)
(beginNum, endNum, province)
})
val rulesDataFrame = rules.toDF("start_num","end_num","province")
rulesDataFrame.createTempView("t_rules")
//广播变量,只能用sc实例
//val broadCast: Broadcast[Array[(Long, Long, String)]] = sparkSession.sparkContext.broadcast(rules)
//读取数据
val data: Dataset[String] = sparkSession.read.textFile("E:\\bigData\\testdata\\access.log")
val ipNum: Dataset[Long] = data.map(l => {
val fields = l.split("[|]")
val ip = fields(1)
//ip转十进制
val ipNum = IpFromUtils.ipToLong(ip)
ipNum
})
val ipNumDataFrame: DataFrame = ipNum.toDF("ip_num")
ipNumDataFrame.createTempView("t_ips")
val result = sparkSession.sql("select province , count(*) as times from t_rules right join t_ips on (ip_num >= start_num and ip_num <= end_num) group by province order by times desc")
result.show()
sparkSession.stop()
}
}
结果 :
+--------+-----+
|province|times|
+--------+-----+
| 陕西| 1824|
| 北京| 1535|
| 重庆| 868|
| 河北| 383|
| 云南| 126|
+--------+-----+
内容总结
以上是互联网集市为您收集整理的sql 重写ipCount全部内容,希望文章能够帮你解决sql 重写ipCount所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。