/**
 * Builds a {@link BulkProcessor} bound to the given client, with a listener
 * that logs bulk failures.
 *
 * <p>The thresholds below are the "for test" values: a flush is triggered
 * after every single action, after 1 MB of buffered requests, or every
 * 5 seconds, whichever comes first. One bulk request may be in flight while
 * new actions are being accumulated, and rejected bulks are retried with an
 * exponential backoff (100 ms initial delay, 3 retries).
 *
 * <p>Production-sized values that were previously noted next to this code:
 * 10000 actions / 1 GB / 5 s flush interval / 1 concurrent request / same
 * backoff policy.
 *
 * @param client client the bulk requests are executed against
 * @return a configured processor; the caller is responsible for closing it
 * @throws UnknownHostException kept in the signature for existing callers
 * @throws IOException kept in the signature for existing callers
 */
public static BulkProcessor esBulk(Client client) throws UnknownHostException, IOException {
    BulkProcessor.Listener listener = new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            // Nothing to prepare before a bulk request is sent.
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            // The bulk call itself succeeded, but individual items may have
            // been rejected. Log the per-item reasons, not just the context
            // id, so the failure can actually be diagnosed.
            if (response.hasFailures()) {
                log.error("bulk {} (id={}) had item failures: {}",
                        executionId, request.getFromContext("id"), response.buildFailureMessage());
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            // The whole bulk request failed (e.g. transport error).
            log.error("bulk fail", failure);
        }
    };

    /* for test */
    return BulkProcessor.builder(client, listener)
            .setBulkActions(1)
            .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.MB))
            .setFlushInterval(TimeValue.timeValueSeconds(5))
            .setConcurrentRequests(1)
            .setBackoffPolicy(
                    BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
            .build();
}
/**
 * Reads a tab-separated text file and indexes one document per line through
 * a {@link BulkProcessor}.
 *
 * <p>Each line must contain at least four tab-separated columns, which are
 * stored in the fields {@code a}, {@code b}, {@code d} and {@code c} (in
 * that column order). Lines with fewer columns are skipped with a warning
 * instead of aborting the whole import.
 *
 * <p>NOTE(review): columns 3 and 4 are written to {@code d} and {@code c}
 * respectively; this ordering is preserved from the original code — confirm
 * it is intentional.
 *
 * @param txtPath path of the tab-separated input file, read with the
 *                platform default charset
 * @return {@code true} when every queued bulk request finished before the
 *         close timeout; {@code false} if waiting timed out or was interrupted
 * @throws UnknownHostException propagated from creating the bulk processor
 * @throws IOException if the file cannot be opened or read
 */
public boolean bulkIndex(String txtPath) throws UnknownHostException, IOException {
    BulkProcessor bp = EsUtil.esBulk(esConf.client());
    boolean drained;
    try (BufferedReader br = Files.newBufferedReader(Paths.get(txtPath), Charset.defaultCharset())) {
        long lineNo = 0;
        String line;
        while ((line = br.readLine()) != null) {
            lineNo++;
            // limit -1 keeps trailing empty columns, so "a\tb\tc\t" still
            // yields four tokens instead of three.
            String[] tokens = line.split("\\t", -1);
            if (tokens.length < 4) {
                log.warn("skipping line {} of {}: expected 4 tab-separated fields, found {}",
                        lineNo, txtPath, tokens.length);
                continue;
            }
            bp.add(new IndexRequest(IDX_NAME, TERM_EXT).source(
                    XContentFactory.jsonBuilder().startObject()
                            .field("a", tokens[0])
                            .field("b", tokens[1])
                            .field("d", tokens[2])
                            .field("c", tokens[3])
                            .endObject()));
        }
    } finally {
        // Always release the processor, even when reading fails part-way.
        drained = awaitBulkClose(bp);
    }
    if (!drained) {
        log.error("bulk indexing did not finish cleanly for {}", txtPath);
        return false;
    }
    log.info("bulk indexing completed");
    return true;
}

/**
 * Flushes remaining actions and waits for in-flight bulk requests to finish.
 * A plain {@code close()} returns without waiting for concurrent requests.
 */
private boolean awaitBulkClose(BulkProcessor bp) {
    try {
        return bp.awaitClose(10, java.util.concurrent.TimeUnit.MINUTES);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can still observe it.
        Thread.currentThread().interrupt();
        log.error("interrupted while waiting for bulk requests to finish", e);
        return false;
    }
}
댓글