一个很大的文件,例如10G,仅包含ip地址和访问时间二列,格式如下:
127.0.0.1 2013-07-22 14:00 127.0.0.1 2013-07-22 14:02 127.0.0.1 2013-07-22 14:03 127.0.0.3 2013-07-22 14:03 127.0.0.1 2013-07-22 14:04 127.0.0.1 2013-07-22 14:05 127.0.0.1 2013-07-22 14:06 127.0.0.1 2013-07-22 14:07 127.0.0.1 2013-07-22 14:08 127.0.0.1 2013-07-22 14:09 127.0.0.1 2013-07-22 14:10 127.0.0.1 2013-07-22 14:11 127.0.0.1 2013-07-22 14:12 127.0.0.1 2013-07-22 14:13 127.0.0.4 2013-07-22 14:13 127.0.0.1 2013-07-22 14:15 127.0.0.1 2013-07-22 14:16 127.0.0.4 2013-07-22 14:17 ... ... 127.0.0.1 2013-07-22 14:00 127.0.0.1 2013-07-22 14:02 127.0.0.1 2013-07-22 14:03 127.0.0.3 2013-07-22 14:03 127.0.0.1 2013-07-22 14:04 127.0.0.1 2013-07-22 14:05 127.0.0.1 2013-07-22 14:06 127.0.0.1 2013-07-22 14:07 127.0.0.1 2013-07-22 14:08 127.0.0.1 2013-07-22 14:09 127.0.0.1 2013-07-22 14:10 127.0.0.1 2013-07-22 14:11 127.0.0.1 2013-07-22 14:12 127.0.0.1 2013-07-22 14:13 127.0.0.4 2013-07-22 14:13 127.0.0.1 2013-07-22 14:15 127.0.0.1 2013-07-22 14:16 127.0.0.4 2013-07-22 14:17 ... ...
从文件里查出在5分钟内连续登陆10次以上的ip地址集合并输出。这类问题是一个很常见的应用,通常都是从大的log日志文件中找出有攻击嫌疑的ip。
这类应用因为要处理分析的文件非常大,显然不能将整个文件全部读入内存,然后进行下一步工作。常见的比较成熟的解决方案有:分治+Hash,Bloom filter,2-Bitmap等。 这里就使用第一种方式来解决。
下面是分治与hash的代码
public class DuplicateIP {
private String delimiter = " ";
private String FILE_PRE = "ip_";
private int MAGIC = 10,BATCH_MAGIC = 500;
private String root = "/DuplicateIP/";
private String filename = "";
public DuplicateIP(final String filename) {
this.filename = filename;
}
/**
* 将大文件拆分成较小的文件,进行预处理
* @throws IOException
*/
private void preProcess() throws IOException {
//Path newfile = FileSystems.getDefault().getPath(filename);
BufferedInputStream fis = new BufferedInputStream(new FileInputStream(new File(filename)));
// 用5M的缓冲读取文本文件
BufferedReader reader = new BufferedReader(new InputStreamReader(fis,"utf-8"),5*1024*1024);
//假设文件是10G,那么先根据hashcode拆成小文件,再进行读写判断
//如果不拆分文件,将ip地址当做key,访问时间当做value存到hashmap时,
//当来访的ip地址足够多的情况下,内存开销吃不消
// List entities = new ArrayList();
//存放ip的hashcode->accessTimes集合
Map> hashcodeMap = new HashMap>();
String line = "";
int count = 0;
while((line = reader.readLine()) != null){
String split[] = line.split(delimiter);
if(split != null && split.length >= 2){
//根据ip的hashcode这样拆分文件,拆分后的文件大小在1G上下波动
//极端情况是整个文件的ip地址全都相同,只有一个,那么拆分后还是只有一个文件
int serial = split[0].trim().hashCode() % MAGIC;
String splitFilename = FILE_PRE + serial;
List lines = hashcodeMap.get(splitFilename);
if(lines == null){
lines = new ArrayList();
hashcodeMap.put(splitFilename, lines);
}
lines.add(line);
}
count ++;
if(count > 0 && count % BATCH_MAGIC == 0){
for(Map.Entry> entry : hashcodeMap.entrySet()){
//System.out.println(entry.getKey()+"--->"+entry.getValue());
DuplicateUtils.appendFile(root + entry.getKey(), entry.getValue(), Charset.forName("UTF-8"));
}
//一次操作500之后清空,重新执行
hashcodeMap.clear();
}
}
reader.close();
fis.close();
}
private boolean process() throws IOException{
Path target = Paths.get(root);
//ip -> List
Map> resMap = new HashMap>();
this.recurseFile(target,resMap);
for(Map.Entry> entry : resMap.entrySet()){
System.out.pr