分治和hash-从海量数据大文件中查出某时间段内登陆超过阈值的ip地址 (二)

2014-11-23 22:19:44 ? 作者: ? 浏览: 19
intln(entry.getKey()); for(Date date : entry.getValue()){ System.out.println(date); } } return true; } /** * 递归执行,将5分钟内访问超过阈值的ip找出来 * * @param parent * @return * @throws IOException */ private void recurseFile(Path parent,Map> resMap) throws IOException{ //Path target = Paths.get(dir); if(!Files.exists(parent) || !Files.isDirectory(parent)){ return; } Iterator targets = parent.iterator(); for(;targets.hasNext();){ Path path = targets.next(); if(Files.isDirectory(parent)){ //如果还是目录,递归 recurseFile(path.toAbsolutePath(),resMap); }else { //将一个文件中所有的行读上来 List lines = Files.readAllLines(path, Charset.forName("UTF-8")); judgeAndcollection(lines,resMap); } } } /** * 根据从较小文件读上来的每行ip accessTimes进行判断符合条件的ip * 并放入resMap * * @param lines * @param resMap */ private void judgeAndcollection(List lines,Map> resMap) { if(lines != null){ //ip->ListaccessTimes Map> judgeMap = new HashMap>(); for(String line : lines){ line = line.trim(); int space = line.indexOf(delimiter); String ip = line.substring(0, space); List accessTimes = judgeMap.get(ip); if(accessTimes == null){ accessTimes = new ArrayList(); } accessTimes.add(line.substring(space + 1).trim()); judgeMap.put(ip, accessTimes); } if(judgeMap.size() == 0){ return; } for(Map.Entry> entry : judgeMap.entrySet()){ List acessTimes = entry.getValue(); //相同ip,先判断整体大于10个 if(acessTimes != null && acessTimes.size() >= MAGIC){ //开始判断在List集合中,5分钟内访问超过MAGIC=10 List attackTimes = DuplicateUtils.attackList(acessTimes, 5 * 60 * 1000, MAGIC); if(attackTimes != null){ resMap.put(entry.getKey(), attackTimes); } } } } } /** * @param args */ public static void main(String[] args) { String filename = "/DuplicateIP/log.txt"; DuplicateIP dip = new DuplicateIP(filename); try { dip.preProcess(); dip.process(); } catch (IOException e) { e.printStackTrace(); } } } public class DuplicateIP { private String delimiter = " "; private String FILE_PRE = "ip_"; private int MAGIC = 10,BATCH_MAGIC = 500; private String root = "/DuplicateIP/"; private String filename = ""; public DuplicateIP(final String filename) { this.filename = filename; } /** * 将大文件拆分成较小的文件,进行预处理 * @throws IOException */ private void preProcess() throws IOException { //Path newfile = FileSystems.getDefault().getPath(filename); BufferedInputStream fis = new BufferedInputStream(new FileInputStream(new File(filename))); // 用5M的缓冲读取文本文件 BufferedReader reader = new BufferedReader(new InputStreamReader(fis,"utf-8"),5*1024*1024); //假设文件是10G,那么先根据hashcode拆成小文件,再进行读写判断 //如果不拆分文件,将ip地址当做key,访问时间当做value存到hashmap时, //当来访的ip地址足够多的情况下,内存开销吃不消 // List entities = new ArrayList(); //存放ip的hashcode->accessTimes集合 Map> hashcodeMap = new HashMap>(); String line = ""; int count = 0; while((line = reader.readLine()) != null){ String split[] = line.split(delimiter); if(split != null && split.length >= 2){ //根据ip的hashcode这样拆分文件,拆分后的文件大小在1G上下波动 //极端情况是整个文件的ip地址全都相同,只有一个,那么拆分后还是只有一个文件 int serial = split[0].trim().hashCode() % MAGIC; String splitFilename = FILE_PRE + serial; List lines = hashcodeMap.get(splitFilename); if(lines == null){ lines = new ArrayList(); hashc
-->

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容: