分治和hash-从海量数据大文件中查出某时间段内登陆超过阈值的ip地址 (四)

2014-11-23 22:19:44 ? 作者: ? 浏览: 22
odeMap.put(splitFilename, lines); } lines.add(line); } count ++; if(count > 0 && count % BATCH_MAGIC == 0){ for(Map.Entry> entry : hashcodeMap.entrySet()){ //System.out.println(entry.getKey()+"--->"+entry.getValue()); DuplicateUtils.appendFile(root + entry.getKey(), entry.getValue(), Charset.forName("UTF-8")); } //一次操作500之后清空,重新执行 hashcodeMap.clear(); } } reader.close(); fis.close(); } private boolean process() throws IOException{ Path target = Paths.get(root); //ip -> List Map> resMap = new HashMap>(); this.recurseFile(target,resMap); for(Map.Entry> entry : resMap.entrySet()){ System.out.println(entry.getKey()); for(Date date : entry.getValue()){ System.out.println(date); } } return true; } /** * 递归执行,将5分钟内访问超过阈值的ip找出来 * * @param parent * @return * @throws IOException */ private void recurseFile(Path parent,Map> resMap) throws IOException{ //Path target = Paths.get(dir); if(!Files.exists(parent) || !Files.isDirectory(parent)){ return; } Iterator targets = parent.iterator(); for(;targets.hasNext();){ Path path = targets.next(); if(Files.isDirectory(parent)){ //如果还是目录,递归 recurseFile(path.toAbsolutePath(),resMap); }else { //将一个文件中所有的行读上来 List lines = Files.readAllLines(path, Charset.forName("UTF-8")); judgeAndcollection(lines,resMap); } } } /** * 根据从较小文件读上来的每行ip accessTimes进行判断符合条件的ip * 并放入resMap * * @param lines * @param resMap */ private void judgeAndcollection(List lines,Map> resMap) { if(lines != null){ //ip->ListaccessTimes Map> judgeMap = new HashMap>(); for(String line : lines){ line = line.trim(); int space = line.indexOf(delimiter); String ip = line.substring(0, space); List accessTimes = judgeMap.get(ip); if(accessTimes == null){ accessTimes = new ArrayList(); } accessTimes.add(line.substring(space + 1).trim()); judgeMap.put(ip, accessTimes); } if(judgeMap.size() == 0){ return; } for(Map.Entry> entry : judgeMap.entrySet()){ List acessTimes = entry.getValue(); //相同ip,先判断整体大于10个 if(acessTimes != null && acessTimes.size() >= MAGIC){ //开始判断在List集合中,5分钟内访问超过MAGIC=10 List attackTimes = DuplicateUtils.attackList(acessTimes, 5 * 60 * 1000, MAGIC); if(attackTimes != null){ resMap.put(entry.getKey(), attackTimes); } } } } } /** * @param args */ public static void main(String[] args) { String filename = "/DuplicateIP/log.txt"; DuplicateIP dip = new DuplicateIP(filename); try { dip.preProcess(); dip.process(); } catch (IOException e) { e.printStackTrace(); } } }

下面是工具类,提供了一些文件读写及查找的功能


public class DuplicateUtils { 
    /**
     * 根据给出的数据,往给定的文件形参中追加一行或者几行数据
     * 
     * @param file
     * @throws IOException 
     */ 
    public static Path appendFile(String splitFilename, Iterable<  extends CharSequence> accessTimes,Charset cs) throws IOException { 
        if(accessTimes != null){ 
            Path target = Paths.get(splitFilename); 
            if(target == null){ 
                createFile(splitFilename); 
            } 
            return Files.write(target, accessTimes, cs);//, options)  
        } 
         
        return null; 
    } 
     
    /**
     * 创建文件
     * @throws IOException 
     */ 
    public static void createFile(String splitFilename) throws IOException { 
        Path target = Paths.get(splitFilename); 
        Set perms = PosixFilePermissions.fromString("rw-rw-rw-"); 

            
-->

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容: