spark 开发考题!面试题! 根据IP地址查询归属地,统计归属地IP地址数
spark开发考题!面试题! 网络开发式运营人才选拔!
题目:
现有一批IP地址(详见文件ip.txt),需要根据IP地址库信息(详见文件iplib.txt),查询归属地信息,并统计每一个归属地IP地址的总数。
请编写spark任务并在测试环境上提交运行。统计结果请以文本文件格式请保存在xxxx目录下,源代码请保存在xxxx目录下。
运行结果格式示例:
江苏 移动 241
江西 电信 67
河南 联通 89
IPLib_test格式:
1.1.1.0 1.1.1.255 广东 电信
2.0.0.0 2.1.1.255 天津 电信
......
IPs_test 格式:
101.100.24.56
144.0.1.100
......
package com.dt.spark.chinatelecomExam; public class IpUtil { public static boolean ipExistsInRange(String ip, String ipSection) { ipSection = ipSection.trim(); ip = ip.trim(); int idx = ipSection.indexOf('-'); String beginIP = ipSection.substring(0, idx); String endIP = ipSection.substring(idx + 1); return getIp2long(beginIP) <= getIp2long(ip) && getIp2long(ip) <= getIp2long(endIP); } public static long getIp2long(String ip) { ip = ip.trim(); String[] ips = ip.split("\\."); long ip2long = 0L; for (int i = 0; i < 4; ++i) { ip2long = ip2long << 8 | Integer.parseInt(ips[i]); } return ip2long; } }
package com.dt.spark.Exam import scala.collection.immutable.HashSet import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import util.control.Breaks._ import scala.collection.mutable /** * */ object IPQueryForTest { def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.ERROR) var masterUrl = "local[8]" if (args.length > 0) { masterUrl = args(0) } val sparkConf = new SparkConf().setMaster(masterUrl).setAppName("IPQueryForTest") val spark = SparkSession .builder() .config(sparkConf) .getOrCreate() val sc = spark.sparkContext //数据存放的目录; var dataPath = "data/Exam/" /** * 读取数据,用什么方式读取数据呢?在这里是使用RDD! */ val IPLibRDD: RDD[String] = sc.textFile(dataPath + "IPLib_test.txt") val IPsRDD: RDD[String] = sc.textFile(dataPath + "IPs_test.txt") /** * 在Spark中如何实现mapjoin呢,显然是要借助于Broadcast,会把数据广播到Executor级别让该Executor上的所有任务共享 * 该唯一的数据,而不是每次运行Task的时候都要发送一份数据的拷贝,这显著的降低了网络数据的传输和JVM内存的消耗 */ val IPLibSet: HashSet[String] = HashSet() ++ IPLibRDD.collect() val IPLibSetBroadcast: Broadcast[HashSet[String]] = sc.broadcast(IPLibSet) println("纯粹通过RDD的方式实现IP地址统计分析:") val resultIPQuery: RDD[(String, Long)] = IPsRDD.map(line => { var ip = line.trim var exists = false var locationKey: String = "NoMatchIPQuery" var IPValue: Long = 0L val iplibSets: HashSet[String] = IPLibSetBroadcast.value for (iplib <- iplibSets) { val ipSection: String = iplib.split("\t")(0).trim + "-" + iplib.split("\t")(1).trim breakable { exists = IpUtil.ipExistsInRange(ip, ipSection) if (exists == true) { locationKey = iplib.split("\t")(2).trim + "\t" + iplib.split("\t")(3).trim IPValue = 1L break() } } } (locationKey, IPValue) }) val result = resultIPQuery.filter(!_._1.contains("NoMatchIPQuery")).reduceByKey(_ + _) result.map(line => line._1 + "\t" + line._2).saveAsTextFile(dataPath + "IPQueryResult.txt") } }