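I. A few caveats first:
1. A coprocessor can be loaded in three ways: through the configuration file, through the shell, or programmatically. I use the programmatic approach: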
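static {
    EP_TABLE_DISCRIPTOR = new HTableDescriptor("epTest");
    HColumnDescriptor family = new HColumnDescriptor("_tis".getBytes());
    family.setInMemory(true);
    family.setMaxVersions(1);
    EP_TABLE_DISCRIPTOR.addFamily(family);
    try {
        // Attach the endpoint class to this table by its fully qualified name.
        EP_TABLE_DISCRIPTOR.addCoprocessor("ict.wde.test.RowCountServer");
    } catch (IOException ioe) {
        // Fail fast instead of silently swallowing the error.
        throw new ExceptionInInitializerError(ioe);
    }
}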
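The addCoprocessor call in the code above is what enables the coprocessor for this table. The prerequisite is that HBase must be restarted first so that the jar is actually loaded.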
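2. If the client reports the following error after connecting: No matching handler **** for protocol in *** region, the jar has not been loaded into HBase yet. Make sure HBase has been restarted, and check that the class name "RowCountServer" in addCoprocessor("ict.wde.test.RowCountServer"); is spelled correctly.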
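Since a misspelled class name is one cause of this error, a quick sanity check is possible before calling addCoprocessor. The following is only a minimal sketch in plain Java: CoprocessorClassCheck and isLoadable are hypothetical names, not part of HBase, and the check only looks at the local classpath. It cannot tell whether the region servers have loaded the jar, so it catches typos but not a missing restart.

```java
// Hypothetical helper (not part of HBase): checks that a coprocessor class name
// resolves on the local classpath before it is passed to addCoprocessor.
final class CoprocessorClassCheck {

    /** Returns true if className can be found by this class's class loader. */
    static boolean isLoadable(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers.
            Class.forName(className, false, CoprocessorClassCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Not found, or found but not linkable (for example a missing dependency).
            return false;
        }
    }

    public static void main(String[] args) {
        // The default is the class name used in this post; pass another name as the first argument.
        String name = args.length > 0 ? args[0] : "ict.wde.test.RowCountServer";
        System.out.println(name + " loadable: " + isLoadable(name));
    }
}
```

Run it with the coprocessor jar on the classpath; a result of false for the name passed to addCoprocessor suggests a typo or a jar that was not packaged as expected.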
II. The steps
2.1 Writing the server-side code:
1) The interface (fixed format)
package ict.wde.test;

import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;

import java.io.IOException;

/**
 * Created by Michael on 2015/6/22.
 */
public interface RowCountProtocol extends Coprocessor, CoprocessorProtocol {
    public long getRowCount() throws IOException;

    public long getRowCount(Filter filter) throws IOException;

    public String getStr() throws IOException;

    //public long getKeyValue() throws IOException;
}
2) The class that does the actual work
package ict.wde.test;

import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;

import java.io.IOException;

/**
 * Created by Michael on 2015/6/27.
 */
public class RowCountServer implements RowCountProtocol {
    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
    }

    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
    }

    @Override
    public ProtocolSignature getProtocolSignature(String protocol,
            long clientVersion, int clientMethodsHash) throws IOException {
        return new ProtocolSignature(3, null);
    }

    @Override
    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
        return 3;
    }

    @Override
    public long getRowCount() throws IOException {
        // FirstKeyOnlyFilter keeps only the first cell of each row, which is enough for counting rows.
        return this.getRowCount(new FirstKeyOnlyFilter());
    }

    @Override
    public long getRowCount(Filter filter) throws IOException {
        return this.getRowCount(filter, false);
    }

    @Override
    public String getStr() throws IOException {
        String name = "Hello Doctor Michael Zhang, again!";
        return name;
    }

    // @Override
    // public long getKeyValueCount() {
    //     return 0;
    // }

    public long getRowCount(Filter filter, boolean countKeyValue) throws IOException {
        Scan scan = new Scan();
        scan.setMaxVersions(1);
        if (filter != null) {
            scan.setFilter(filter);
        }
        // NOTE: the scan is configured but never executed here; this placeholder
        // always returns 1 and ignores countKeyValue.
        return 1;
    }
}
Package the two classes above into a jar and place it in the lib directory of HBase.
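Note that RowCountServer.getRowCount(Filter, boolean) above builds a Scan but returns a constant 1 without running it. The counting loop that such a method would typically contain can be sketched in plain Java. Everything here is an assumption for illustration: RowCountSketch, RowScanner and over are hypothetical names, cells are modelled as strings, and RowScanner only mimics a scanner whose next call fills a list with one row's cells and reports whether more rows may follow. It is not the HBase region scanner API and not the author's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class RowCountSketch {

    /** Hypothetical stand-in for a region scanner: fills `out` with the next row's cells. */
    interface RowScanner {
        /** @return true if more rows may follow, false once the scan is exhausted */
        boolean next(List<String> out);
    }

    /** Counts rows, or individual cells when countKeyValue is true. */
    static long count(RowScanner scanner, boolean countKeyValue) {
        long result = 0;
        List<String> cells = new ArrayList<>();
        boolean more;
        do {
            cells.clear();
            more = scanner.next(cells);
            // An empty batch means no row was returned, so it is not counted.
            if (!cells.isEmpty()) {
                result += countKeyValue ? cells.size() : 1;
            }
        } while (more);
        return result;
    }

    /** In-memory scanner over a fixed list of rows, for demonstration only. */
    static RowScanner over(List<List<String>> rows) {
        Iterator<List<String>> it = rows.iterator();
        return out -> {
            if (it.hasNext()) {
                out.addAll(it.next());
            }
            return it.hasNext();
        };
    }

    public static void main(String[] args) {
        List<List<String>> rows = Arrays.asList(
                Arrays.asList("r1:a", "r1:b"),
                Arrays.asList("r2:a"),
                Arrays.asList("r3:a", "r3:b", "r3:c"));
        System.out.println("rows  = " + count(over(rows), false));
        System.out.println("cells = " + count(over(rows), true));
    }
}
```

In a real endpoint the scanner would come from the region that hosts the coprocessor, and the result of each region would be summed on the client side.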
2.2 Client-side code
import ict.wde.test.RowCountProtocol;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.clien