Learning TransportClient in ES: a complete record of the Java code

ES offers several ways to use a Java client to run the standard index, get, delete, and search operations against an existing cluster, and to perform administrative tasks on a running cluster.
Getting a client is very simple. The most common way is to create a TransportClient.
The TransportClient must match the cluster's major version, e.g. both 2.x or both 5.x; differing minor versions can cause small issues. Ideally the client version matches the ES version exactly.
TransportClient is declared deprecated in 7.0 and removed entirely in 8.0.
The REST client is the recommended replacement, and there is a dedicated migration guide, but you still need to understand what TransportClient can do first, which is the point of this article.

TransportClient: the client over a TCP connection

Creating a TransportClient:

TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
        .addTransportAddress(new TransportAddress(InetAddress.getByName("host1"), 9300))
        .addTransportAddress(new TransportAddress(InetAddress.getByName("host2"), 9300));

The PreBuiltTransportClient constructor accepts a Settings object, so the usual pattern is the two steps below: first build a Settings, then pass it to the PreBuiltTransportClient constructor. This could be turned into a singleton, but that is not the point here:

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;

public class ElasticsearchConfig {
    private static TransportClient client;

    public static TransportClient getElasticsearchClient() {
        try {
            Settings settings = Settings.builder()
                    .put("cluster.name", "my-esLearn")  // name of the cluster to connect to
                    .put("client.transport.ignore_cluster_name", true)  // still connect even if the cluster name does not match
                    .put("client.transport.sniff", true)  // enable sniffing of the other cluster nodes
                    .build();
            // create the client
            client = new PreBuiltTransportClient(settings)
                    .addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300))
                    .addTransportAddress(new TransportAddress(InetAddress.getByName("host2"), 9300));
            return client;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

Single Document APIs: operating on one document at a time

Document operations boil down to CRUD: create, retrieve, update, and delete. They come in two flavors, single-document and multi-document. The single-document APIs are the foundation; in practice you mostly use the multi-document ones.

Index API

This API puts a JSON document you have built into an index so that it becomes searchable there; you place the document via its index and type.

Four ways to build the JSON document

1. Build the JSON string by hand
String json = "{" +
    "\"user\":\"kimchy\"," +
    "\"postDate\":\"2013-01-30\"," +
    "\"message\":\"trying out Elasticsearch\"" +
"}";
2. Use a Map; ES converts it automatically
Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");
3. Use a third-party JSON library, which is convenient

For example Jackson, which the official docs mention, or Gson, fastjson, and the like, which I often use. A small example of each:

Friend friend = new Friend("yitian", 25);
ObjectMapper mapper = new ObjectMapper();
byte[] jackson = mapper.writeValueAsBytes(friend);
String fastJson = JSON.toJSONString(friend);
String gson = new Gson().toJson(friend);
4. XContentBuilder, officially recommended by ES
XContentBuilder builder = null;
try {
    builder = XContentFactory.jsonBuilder()
            .startObject()
            .field("user", "kimchy")
            .field("postDate", new Date())
            .field("message", "trying out Elasticsearch")
            .endObject();
} catch (IOException e) {
    e.printStackTrace();
}

Indexing the JSON document to get an IndexResponse

IndexResponse responseXcontent = client.prepareIndex("twitter", "tweet", "4")
            .setSource(builder)
            .get();

Get API

GetResponse response = client.prepareGet("twitter", "tweet", "1").get();

Delete API

DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();

Update API

You can build an UpdateRequest and execute it directly with the client:

UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("twitter");
updateRequest.type("tweet");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
        .startObject()
        .field("user", "aaaa")
        .endObject());
UpdateResponse response = client.update(updateRequest).get();

That said, the prepareUpdate method is usually more convenient:

UpdateResponse response = client.prepareUpdate("twitter", "tweet", "1")
            .setDoc(jsonBuilder()
                    .startObject()
                    .field("user", "asdadsaf")
                    .endObject())
            .get();

When building an UpdateRequest, the key method to know is the very practical upsert, which covers both update and insert:

IndexRequest indexRequest = new IndexRequest("index", "type", "4")
            .source(json, XContentType.JSON);  // indexed only if document "4" does not exist yet
UpdateRequest updateRequestUpsert = new UpdateRequest("index", "type", "4")
            .doc(json, XContentType.JSON)
            .upsert(indexRequest);

Understanding the upsert method

doc is for updating and upsert is for inserting; they are mutually exclusive, and only one of them takes effect.
If a document with the UpdateRequest's id already exists in the index, the doc content is applied as a partial update and the upsert content is ignored.
If the document does not exist, doc is ignored and the upsert document is indexed as-is.
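To make that rule concrete, here is a minimal plain-Java sketch that models the index as a Map. The class and method names are made up for illustration; this is not ES code:

```java
import java.util.HashMap;
import java.util.Map;

public class UpsertModel {
    // the upsert rule: if the id already exists, apply doc; otherwise insert upsertDoc
    static String update(Map<String, String> index, String id, String doc, String upsertDoc) {
        if (index.containsKey(id)) {
            index.put(id, doc);        // document exists: doc wins, upsert is ignored
            return "updated";
        }
        index.put(id, upsertDoc);      // document missing: the upsert document is indexed as-is
        return "inserted";
    }

    public static void main(String[] args) {
        Map<String, String> index = new HashMap<>();
        System.out.println(update(index, "1", "docContent", "upsertContent")); // inserted
        System.out.println(update(index, "1", "docContent", "upsertContent")); // updated
    }
}
```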

Multi-document APIs: operating on many documents at once

Multi Get API

MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
            .add("twitter", "tweet", "1")
            .add("twitter", "tweet", "2", "3", "4")
            .add("another", "type", "foo")
            .get();

Bulk API

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
    bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
            .setSource(XContentFactory.jsonBuilder()
                    .startObject()
                    .field("user", "kimchy")
                    .field("postDate", new Date())
                    .field("message", "trying out Elasticsearch")
                    .endObject()
            )
    );

    bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
            .setSource(jsonBuilder()
                    .startObject()
                    .field("user", "kimchy")
                    .field("postDate", new Date())
                    .field("message", "another post")
                    .endObject()
            )
    );

    BulkResponse bulkResponse = bulkRequest.get();

BulkProcessor: the important one

An upgraded take on the Bulk API. Building bulk requests by hand is fixed, repetitive boilerplate, so BulkProcessor lets you configure the whole bulk pipeline in one place.

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long l, BulkRequest bulkRequest) {
                System.out.println(bulkRequest.numberOfActions());
            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                System.out.println("success " + bulkResponse.hasFailures());
            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
                System.out.println("failed " + throwable.getMessage());
            }
        })
                .setBulkActions(10000)
                .setBulkSize(new ByteSizeValue(4, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                .setConcurrentRequests(1)
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();

BulkProcessor parameters

1. The first argument to BulkProcessor.builder is the client.
2. The second argument is the new BulkProcessor.Listener().
3. beforeBulk usually calls request.numberOfActions() to see how many actions are about to be executed.
4. afterBulk has two overloads: one receives the response on success, usually checked with response.hasFailures(); the other fires when the request itself failed, usually printing throwable.getMessage().
5. setBulkActions(10000): execute once 10000 requests have accumulated.
6. setBulkSize(new ByteSizeValue(4, ByteSizeUnit.MB)): flush once the buffered requests occupy 4 MB.
7. setFlushInterval(TimeValue.timeValueSeconds(5)): flush every 5 seconds, regardless of how much has accumulated.
8. setConcurrentRequests(1): 1 allows a bulk to execute in the background while new requests accumulate; 0 executes bulks synchronously, one at a time.
9. setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)): the backoff strategy applied before retrying a failed request, here an exponential one; see org.elasticsearch.action.bulk.BackoffPolicy for the available policies.
Three things to note here:
First, flush vs execute: inside the ES API, flush calls execute, so the effect is the same; flush generally means flushing the buffer, writing the buffered data out immediately.
Second, the three triggers above are independent: whichever of the action count, the byte size, or the time interval is reached first causes an execution.
Third, backoff strategies come in two styles: (1) retry at a fixed interval, say every 100 ms, which on an unstable network can make many retries fire at the same instant, congesting the network or sending pointless requests; (2) retry at exponentially growing intervals, say 100 ms, then 200 ms, and so on; third-party payment integrations such as Alipay's callbacks use a similar scheme.
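To see why the exponential style spreads retries out, here is a plain-Java sketch of a doubling delay schedule. This is purely illustrative: the real delay curve is defined inside the BackoffPolicy class and is not exactly a doubling.

```java
public class BackoffSketch {
    // a doubling schedule: initial, 2x, 4x, ... for maxRetries retries
    static long[] delays(long initialMillis, int maxRetries) {
        long[] out = new long[maxRetries];
        long d = initialMillis;
        for (int i = 0; i < maxRetries; i++) {
            out[i] = d;   // wait this long before retry i
            d *= 2;
        }
        return out;
    }

    public static void main(String[] args) {
        for (long d : delays(100, 3)) {
            System.out.println("wait " + d + "ms before the next retry");
        }
    }
}
```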

bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
bulkProcessor.close();
close() returns as soon as it has shut things down, while awaitClose() waits, up to the given timeout, for all pending requests to finish, returning true if they all completed.
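The semantics are analogous to java.util.concurrent's ExecutorService: a shutdown() followed by a bounded awaitTermination(). A small sketch of that analogy (not the BulkProcessor implementation):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AwaitCloseAnalogy {
    // like bulkProcessor.awaitClose(timeout, unit): stop accepting new work,
    // then wait up to the timeout for in-flight work to finish
    static boolean closeAndWait(ExecutorService pool, long timeoutSeconds) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(() -> System.out.println("flushing pending bulk requests"));
        System.out.println("all requests completed: " + closeAndWait(pool, 10));
    }
}
```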

BulkProcessor while testing

When testing, it is best to setConcurrentRequests(0) so that everything runs synchronously, letting you drive it step by step with flush().

Reindex API

BulkByScrollResponse response = ReindexAction.INSTANCE.newRequestBuilder(client)
            .source("index")
            .destination("new_index")
            .filter(QueryBuilders.matchQuery("gender", "male"))
            .get();

Update By Query API

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
    updateByQuery
            .source("index")
            .size(3)
            .abortOnVersionConflict(false);
    BulkByScrollResponse response = updateByQuery.get();

Delete By Query API

There are two ways to run it, get() (blocking) and execute() (asynchronous); note the difference.

BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
            .filter(QueryBuilders.matchQuery("age", 25))
            .source("twitter")
            .get();
//the asynchronous version returns immediately; give it time to complete before the JVM exits
            DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
            .filter(QueryBuilders.matchQuery("age", 25))
            .source("twitter")
            .execute(new ActionListener<BulkByScrollResponse>() {
                @Override
                public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
                    System.err.println(bulkByScrollResponse.getDeleted());
                }

                @Override
                public void onFailure(Exception e) {
                    System.err.println("delete failed");
                }
            });

Search API

Basic search

SearchResponse response = client.prepareSearch("index")
            .setTypes("type")
            .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
            .setQuery(QueryBuilders.termQuery("gender", "male"))                 // Query
            .setPostFilter(QueryBuilders.rangeQuery("age").from(12).to(18))     // Filter
            .setFrom(0)
            .setSize(60)
            .setExplain(true)
            .get();
SearchResponse responseAll = client.prepareSearch().get();

Scroll Search

ES returns at most 10,000 hits for a normal search, so when there are many results you need Scroll. A scroll requires a keep-alive time, which is mandatory (typically new TimeValue(60000)). You first run a normal search with setScroll(new TimeValue(60000)) added, as below; it returns a scroll id, and you then loop, passing that scroll id to prepareSearchScroll to read the next batch each time.

SearchResponse scrollResp = client.prepareSearch("index")
            .addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC)
            .setScroll(new TimeValue(60000))
            .setQuery(qb)
            .setSize(2).get();
    //Scroll until no hits are returned
    do {
        System.out.println("next scroll batch " + new Date());
        for (SearchHit hit : scrollResp.getHits().getHits()) {
            System.out.println(hit.getSourceAsMap());
        }

        scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
    }
    // Zero hits mark the end of the scroll and the while loop.
    while (scrollResp.getHits().getHits().length != 0); 
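The loop's control flow, an initial search followed by repeated scroll fetches until a batch comes back empty, can be modeled with an in-memory page source; this is plain Java with hypothetical names, not ES code:

```java
import java.util.ArrayList;
import java.util.List;

public class ScrollModel {
    // one "page" of up to size hits starting at from, like one scroll batch
    static List<Integer> page(List<Integer> all, int from, int size) {
        return all.subList(Math.min(from, all.size()), Math.min(from + size, all.size()));
    }

    // visits every hit in batches; an empty batch ends the loop, like the scroll above
    static int drain(List<Integer> all, int size) {
        int seen = 0, from = 0;
        List<Integer> batch = page(all, from, size);   // the initial search
        while (!batch.isEmpty()) {
            seen += batch.size();
            from += size;
            batch = page(all, from, size);             // the prepareSearchScroll equivalent
        }
        return seen;
    }

    public static void main(String[] args) {
        List<Integer> hits = new ArrayList<>();
        for (int i = 0; i < 7; i++) hits.add(i);
        System.out.println(drain(hits, 2)); // 7: every hit visited, in batches of 2
    }
}
```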

MultiSearch

SearchRequestBuilder srb1 = client
            .prepareSearch().setQuery(QueryBuilders.queryStringQuery("smith")).setSize(1);
    SearchRequestBuilder srb2 = client
            .prepareSearch().setQuery(QueryBuilders.matchQuery("name", "kimchy")).setSize(1);

MultiSearchResponse sr = client.prepareMultiSearch()
            .add(srb1)
            .add(srb2)
            .get();

Using Aggregations inside a Search

A search can also carry aggregations; aggregations get their own detailed section below:

SearchResponse sr = client.prepareSearch().setIndices("index")
            .setQuery(QueryBuilders.matchAllQuery())
            .addAggregation(
                    AggregationBuilders.terms("agg1").field("gender.keyword")
            )
            .addAggregation(
                    AggregationBuilders.dateHistogram("agg2")
                            .field("age")
                            .dateHistogramInterval(DateHistogramInterval.HOUR)
            )
            .get();

Aggregations: the aggregation APIs

Aggregations come in two main kinds: metrics and bucketing.
Buckets: sets of documents that meet a criterion, comparable to SQL's GROUP BY. For example, in a users table, aggregating by region puts each person into a Beijing bucket, a Shanghai bucket, or some other bucket; aggregating by gender puts each person into the male or female bucket.
Metrics: statistics computed on top of buckets, comparable to SQL's COUNT, AVG, SUM, and so on. For example, after bucketing by region, you can compute each region's head count, average age, etc.
SELECT COUNT(color) FROM table GROUP BY color
GROUP BY does the bucketing; COUNT is the metric.
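The SQL example maps onto plain Java streams as well: groupingBy plays the bucket role (GROUP BY) and counting() plays the metric role (COUNT). A local sketch, not ES code:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupByCount {
    // SELECT COUNT(color) FROM table GROUP BY color
    static Map<String, Long> countByColor(List<String> colors) {
        return colors.stream()
                .collect(Collectors.groupingBy(c -> c, Collectors.counting()));
    }

    public static void main(String[] args) {
        // each distinct color is a bucket; the count is the metric inside it
        System.out.println(countByColor(List.of("red", "blue", "red")));
    }
}
```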

Structuring Aggregations with sub-aggregations

SearchResponse sr = client.prepareSearch()
            .addAggregation(
                    AggregationBuilders.terms("by_country").field("country")
                            .subAggregation(AggregationBuilders.dateHistogram("by_year")
                                    .field("dateOfBirth")
                                    .dateHistogramInterval(DateHistogramInterval.YEAR)
                                    .subAggregation(AggregationBuilders.avg("avg_children").field("children"))
                            )
            )
            .execute().actionGet();

Metrics: the metric aggregations

import elastic.learn.transport.ElasticsearchConfig;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.cardinality.CardinalityAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.geobounds.GeoBoundsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.min.MinAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.percentiles.PercentileRanksAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.percentiles.PercentilesAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.stats.StatsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.stats.extended.ExtendedStatsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountAggregationBuilder;
import org.elasticsearch.search.sort.SortOrder;

/**
 * @Author unclewang
 * @Date 2018/10/13 23:31
 */
public class Metrics {
    public static void main(String[] args) {
        TransportClient client = ElasticsearchConfig.getElasticsearchClient();
        SearchResponse sr = client.prepareSearch().setIndices("index")
                .addAggregation(min())
                .addAggregation(max())
                .addAggregation(sum())
                .addAggregation(avg())
                .addAggregation(stats())
                .addAggregation(extendedStats())
                .addAggregation(value())
                .addAggregation(percentiles())
                .addAggregation(percentilesValue())
                .addAggregation(percentileRanks())
                .addAggregation(cardinality())
                .addAggregation(top())
                .get();
//        Min min = sr.getAggregations().get("min");
//        Sum sum = sr.getAggregations().get("sum");
//
//        System.out.println(min);
//        System.out.println(sum);
        System.out.println(sr.getAggregations().getAsMap());
    }

    /**
     * Min: the smallest value of the field
     */
    public static MinAggregationBuilder min() {
        MinAggregationBuilder aggregation =
                AggregationBuilders
                        .min("min")
                        .field("age");
        return aggregation;
    }

    /**
     * Max: the largest value of the field
     */
    public static MaxAggregationBuilder max() {
        MaxAggregationBuilder aggregation =
                AggregationBuilders
                        .max("max")
                        .field("age");
        return aggregation;
    }

    /**
     * Sum: the total of the field's values
     */
    public static SumAggregationBuilder sum() {
        SumAggregationBuilder aggregation =
                AggregationBuilders
                        .sum("sum")
                        .field("age");
        return aggregation;
    }

    /**
     * Avg: the average of the field's values
     */
    public static AvgAggregationBuilder avg() {
        AvgAggregationBuilder aggregation =
                AggregationBuilders
                        .avg("avg")
                        .field("age");
        return aggregation;
    }

    /**
     * Stats: the basic statistics (count, min, max, sum, avg)
     */
    public static StatsAggregationBuilder stats() {
        StatsAggregationBuilder aggregation =
                AggregationBuilders
                        .stats("stats")
                        .field("age");
        return aggregation;
    }

    /**
     * ExtendedStats: additional statistics, such as variance and standard deviation
     */
    public static ExtendedStatsAggregationBuilder extendedStats() {
        ExtendedStatsAggregationBuilder aggregation =
                AggregationBuilders
                        .extendedStats("extendedStats")
                        .field("age");
        return aggregation;
    }

    /**
     * ValueCount: how many documents have a value for this field
     */
    public static ValueCountAggregationBuilder value() {
        ValueCountAggregationBuilder aggregation =
                AggregationBuilders
                        .count("valueCount")
                        .field("age");
        return aggregation;
    }

    /**
     * Percentiles: the default percentile breakdown of the values
     */
    public static PercentilesAggregationBuilder percentiles() {
        PercentilesAggregationBuilder aggregation =
                AggregationBuilders
                        .percentiles("percentiles")
                        .field("age");
        return aggregation;
    }

    /**
     * Percentiles at custom percentage points
     */
    public static PercentilesAggregationBuilder percentilesValue() {
        PercentilesAggregationBuilder aggregation =
                AggregationBuilders
                        .percentiles("percentilesValue")
                        .field("age")
                        .percentiles(1.0, 5.0, 10.0, 20.0, 30.0, 75.0, 95.0, 99.0);
        return aggregation;
    }

    /**
     * PercentileRanks: for the given values, the percentile each one falls at
     */
    public static PercentileRanksAggregationBuilder percentileRanks() {
        PercentileRanksAggregationBuilder aggregation =
                AggregationBuilders
                        .percentileRanks("percentileRanks", new double[]{20, 30, 40})
                        .field("age");
        return aggregation;
    }

    /**
     * Cardinality: the number of distinct values
     */
    public static CardinalityAggregationBuilder cardinality() {
        CardinalityAggregationBuilder aggregation =
                AggregationBuilders
                        .cardinality("cardinality")
                        .field("age");
        return aggregation;
    }

    /**
     * GeoBounds: the bounding box containing all the geo points
     */
    public static GeoBoundsAggregationBuilder geoBounds() {
        GeoBoundsAggregationBuilder aggregation =
                AggregationBuilders
                        .geoBounds("geoBounds")
                        .field("address.location")
                        .wrapLongitude(true);
        return aggregation;
    }

    /**
     * TopHits: within each bucket of the outer aggregation, return the top documents by the given sort
     */
    public static AggregationBuilder top() {
        AggregationBuilder aggregation =
                AggregationBuilders
                        .terms("top").field("age")
                        .subAggregation(
                                AggregationBuilders.topHits("toptop")
                                        .sort("age", SortOrder.DESC)
//                                        .explain(true)
                                        .size(1)
//                                        .from(1)
                        );
        return aggregation;
    }
}
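As a local illustration of what the stats aggregation reports, the JDK's IntSummaryStatistics computes the same five numbers (count, min, max, sum, avg) over an in-memory array:

```java
import java.util.IntSummaryStatistics;
import java.util.stream.IntStream;

public class StatsModel {
    // the same five values the stats agg would return for an "age" field
    static IntSummaryStatistics ageStats(int[] ages) {
        return IntStream.of(ages).summaryStatistics();
    }

    public static void main(String[] args) {
        IntSummaryStatistics s = ageStats(new int[]{20, 25, 30});
        System.out.println("count=" + s.getCount() + " min=" + s.getMin()
                + " max=" + s.getMax() + " sum=" + s.getSum() + " avg=" + s.getAverage());
    }
}
```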

Bucket: the bucketing aggregations

import elastic.learn.transport.ElasticsearchConfig;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.join.aggregations.ChildrenAggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.filter.Filters;
import org.elasticsearch.search.aggregations.bucket.filter.FiltersAggregator;
import org.elasticsearch.search.aggregations.bucket.global.Global;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;

public class Bucket {
    public static void main(String[] args) {
        TransportClient client = ElasticsearchConfig.getElasticsearchClient();
        SearchResponse sr = client.prepareSearch().setIndices("index")
//                .setQuery(QueryBuilders.termQuery("gender", "male"))
                .addAggregation(global())
                .addAggregation(filter())
                .addAggregation(filters())
                .addAggregation(missing())
                .addAggregation(terms())
                .addAggregation(range())
                .addAggregation(histogram())
//                .addAggregation(significantTerms())
                .get();
        Global global = sr.getAggregations().get("global");
        Filters agg = sr.getAggregations().get("filters");

        for (Filters.Bucket entry : agg.getBuckets()) {
            String key = entry.getKeyAsString();
            long docCount = entry.getDocCount();
            System.out.println(key + " " + docCount);
        }
        System.out.println(sr.getAggregations().getAsMap());

        SearchResponse srNested = client.prepareSearch().setIndices("nestedindex")
                .addAggregation(nested())
                .addAggregation(reverseNested())
                .get();
        System.out.println(srNested.getAggregations().getAsMap());
        SearchResponse srChildren = client.prepareSearch().setIndices("childrenindex")
                .addAggregation(children())
                .get();
        System.out.println(srChildren.getAggregations().getAsMap());
    }

    /**
     * Global: counts over all documents, ignoring the search query
     */
    public static AggregationBuilder global() {
        AggregationBuilder aggregation =
                AggregationBuilders
                        .global("global")
                        .subAggregation(AggregationBuilders.terms("genders").field("gender.keyword"));
        return aggregation;
    }

    /**
     * Filter: the count of documents matching a filter
     */
    public static AggregationBuilder filter() {
        AggregationBuilder aggregation =
                AggregationBuilders
                        .filter("filter", QueryBuilders.termQuery("gender", "male"));
        return aggregation;
    }

    /**
     * PUT /logss/_doc/_bulk?refresh
     * {"index":{"_id":1}}
     * {"body":"warning: page could not be rendered"}
     * {"index":{"_id":2}}
     * {"body":"authentication error"}
     * {"index":{"_id":3}}
     * {"body":"warning: connection timed out"}
     * <p>
     * GET logss/_search
     * {
     *   "size": 0,
     *   "aggs": {
     *     "messages": {
     *       "filters": {
     *         "filters": {
     *           "errors":   { "match": { "body": "error"   }},
     *           "warnings": { "match": { "body": "warning" }}
     *         }
     *       }
     *     }
     *   }
     * }
     * Filters: several named filters combined
     */
    public static AggregationBuilder filters() {
        AggregationBuilder aggregation =
                AggregationBuilders
                        .filters("filters",
                                new FiltersAggregator.KeyedFilter("men", QueryBuilders.termQuery("gender", "male")),
                                new FiltersAggregator.KeyedFilter("women", QueryBuilders.termQuery("gender", "female")));
        return aggregation;
    }

    /**
     * Missing: the count of documents missing the field
     */
    public static AggregationBuilder missing() {
        AggregationBuilder aggregation =
                AggregationBuilders.missing("missing").field("age");
        return aggregation;
    }

    /**
     * Nested: the count of nested documents, i.e. the summed sizes of the nested arrays
     */
    public static AggregationBuilder nested() {
        AggregationBuilder aggregation =
                AggregationBuilders
                        .nested("nested", "user");
        return aggregation;
    }


    /**
     * ReverseNested: from a condition inside the nested objects, count the parent documents outside
     */
    public static AggregationBuilder reverseNested() {
        AggregationBuilder aggregation =
                AggregationBuilders
                        .nested("reverseNested", "user")
                        .subAggregation(
                                AggregationBuilders
                                        .terms("first").field("user.first.keyword")
                                        .subAggregation(
                                                AggregationBuilders
                                                        .reverseNested("firstuser")
                                        )
                        );
        return aggregation;
    }

    /**
     * Children: the count of documents of a given child type
     */
    public static ChildrenAggregationBuilder children() {
        ChildrenAggregationBuilder aggregation = new ChildrenAggregationBuilder("children", "answer");
        return aggregation;
    }

    /**
     * Terms: the document count for each distinct term
     */
    public static AggregationBuilder terms() {
        AggregationBuilder aggregation = AggregationBuilders
                .terms("terms")
                .field("gender.keyword")
                .order(BucketOrder.count(true))
                .order(BucketOrder.key(true));
        return aggregation;
    }


    /**
     * Only works when connected to the "53" server
     * GET mag/mag/_search
     * {
     *   "size": 0,
     *   "query": {
     *     "terms": { "lang": [ "zh_chs" ] }
     *   },
     *   "aggregations": {
     *     "pagestart": {
     *       "significant_terms": { "field": "page_start.keyword" }
     *     }
     *   }
     * }
     *
     * The interesting part: take the documents matched by the query, then for the given
     * field surface the terms that show up unusually often within that result set.
     */
    public static AggregationBuilder significantTerms() {
        AggregationBuilder aggregation =
                AggregationBuilders
                        .significantTerms("pagestart")
                        .field("page_start.keyword");
        return aggregation;
    }

    /**
     * Range: set the bounds, then count the documents falling in each range
     */
    public static AggregationBuilder range() {
        AggregationBuilder aggregation =
                AggregationBuilders
                        .range("range")
                        .field("age")
                        .addUnboundedTo(20)
                        .addRange(20, 35)
                        .addRange(35,56)
                        .addUnboundedFrom(56);
        return aggregation;
    }

    /**
     * DateRange: same as range, for dates
     */
    public static AggregationBuilder dateRange() {
        AggregationBuilder aggregation =
                AggregationBuilders
                        .dateRange("dateRange")
                        .field("dateOfBirth")
                        .format("yyyy")
                        .addUnboundedTo("1950")    // from -infinity to 1950 (excluded)
                        .addRange("1950", "1960")  // from 1950 to 1960 (excluded)
                        .addUnboundedFrom("1960"); // from 1960 to +infinity
        return aggregation;
    }

    /**
     * IpRange: same as range, for IP addresses
     */
    public static AggregationBuilder ipRange() {
        AggregationBuilder aggregation =
                AggregationBuilders
                        .ipRange("ipRange")
                        .field("ip")
                        .addUnboundedTo("192.168.1.0")             // from -infinity to 192.168.1.0 (excluded)
                        .addRange("192.168.1.0", "192.168.2.0")    // from 192.168.1.0 to 192.168.2.0 (excluded)
                        .addUnboundedFrom("192.168.2.0");          // from 192.168.2.0 to +infinity

        return aggregation;
    }


    /**
     * bucket_key = Math.floor((value - offset) / interval) * interval + offset
     * Histogram: interval is the bucket width; counts the documents falling in each interval
     */
    public static AggregationBuilder histogram() {
        AggregationBuilder aggregation =
                AggregationBuilders
                        .histogram("histogram")
                        .field("age")
                        .interval(2)
                        .minDocCount(1);
        return aggregation;
    }

    /**
     * DateHistogram: same as histogram, for dates
     */
    public static AggregationBuilder dateHistogram() {
        AggregationBuilder aggregation =
                AggregationBuilders
                        .dateHistogram("dateHistogram")
                        .field("dateOfBirth")
                        .dateHistogramInterval(DateHistogramInterval.days(10));
        return aggregation;
    }

    /**
     * GeoDistance: distance rings around a point
     */
    public static AggregationBuilder geoDistance() {
        AggregationBuilder aggregation =
                AggregationBuilders
                        .geoDistance("geoDistance", new GeoPoint(48.84237171118314, 2.33320027692004))
                        .field("address.location")
                        .unit(DistanceUnit.KILOMETERS)
                        .addUnboundedTo(3.0)
                        .addRange(3.0, 10.0)
                        .addRange(10.0, 500.0);
        return aggregation;
    }

    /**
     * GeohashGrid: precision controls the geohash cell size
     */
    public static AggregationBuilder geohashGrid() {
        AggregationBuilder aggregation =
                AggregationBuilders
                        .geohashGrid("geohashGrid")
                        .field("address.location")
                        .precision(4);
        return aggregation;
    }
}
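The bucket_key formula quoted in the histogram comment above can be checked directly in plain Java:

```java
public class HistogramKey {
    // bucket_key = Math.floor((value - offset) / interval) * interval + offset
    static double bucketKey(double value, double interval, double offset) {
        return Math.floor((value - offset) / interval) * interval + offset;
    }

    public static void main(String[] args) {
        // with interval(2) and no offset, age 23 falls into the [22, 24) bucket
        System.out.println(bucketKey(23, 2, 0)); // 22.0
        System.out.println(bucketKey(24, 2, 0)); // 24.0
    }
}
```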

Query DSL: ES's language for building every kind of query

import elastic.learn.transport.ElasticsearchConfig;
import org.apache.lucene.search.join.ScoreMode;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.geo.ShapeRelation;
import org.elasticsearch.common.geo.builders.CoordinatesBuilder;
import org.elasticsearch.common.geo.builders.ShapeBuilders;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.*;
import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder;
import org.elasticsearch.join.query.HasChildQueryBuilder;
import org.elasticsearch.join.query.HasParentQueryBuilder;
import org.elasticsearch.join.query.JoinQueryBuilders;
import org.elasticsearch.percolator.PercolateQueryBuilder;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static java.util.Collections.singletonMap;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders.exponentialDecayFunction;
import static org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders.randomFunction;

public class UseQueryDSL {
    public static void main(String[] args) throws IOException {
        TransportClient client = ElasticsearchConfig.getElasticsearchClient();
        QueryBuilder matchAllQuery = QueryBuilders.matchAllQuery();
        HasParentQueryBuilder hasParentQuery = JoinQueryBuilders.hasParentQuery(
                "question",
                matchQuery("text", "another"),
                false);
        SearchResponse sr = client.prepareSearch("childrenindex")
                .setQuery(hasParentQuery)  // swap in any of the QueryBuilders below to try it out
                .get();
        System.out.println(sr);

        //Build a document to check against the percolator
        XContentBuilder docBuilder = XContentFactory.jsonBuilder().startObject();
        docBuilder.field("message", "A new bonsai tree in the office");
        docBuilder.endObject(); //End of the JSON root object
        PercolateQueryBuilder percolateQuery = new PercolateQueryBuilder("query", BytesReference.bytes(docBuilder), XContentType.JSON);

        SearchResponse sr1 = client.prepareSearch("percolatorindex")
                .setQuery(percolateQuery)
                .get();
        System.out.println(sr1);
    }

    public static void fulltext() {
        QueryBuilder matchQuery = QueryBuilders.matchQuery(
                "name",
                "kimchy elasticsearch");
        QueryBuilder multiMatchQuery = QueryBuilders.multiMatchQuery(
                "kimchy elasticsearch",
                "user", "message");
        // common (stop) words are turned into optional "should" clauses
        QueryBuilder commonTermsQuery = QueryBuilders.commonTermsQuery("name",
                "the kimchy");
        // full Lucene query string syntax
        QueryBuilder queryStringQuery = QueryBuilders.queryStringQuery("+kimchy -elasticsearch");
        // simple_query_string is a variant of query_string better suited for a
        // user-facing search box: it uses +/|/- instead of AND/OR/NOT, and it
        // silently ignores invalid syntax instead of throwing an exception
        QueryBuilder simpleQueryStringQuery = QueryBuilders.simpleQueryStringQuery("+kimchy -elasticsearch");
    }

    public static void term() {
        QueryBuilder termQuery = QueryBuilders.termQuery(
                "name",
                "kimchy");
        QueryBuilder termsQuery = QueryBuilders.termsQuery("tags",
                "blue", "pill");
        QueryBuilder rangeQuery1 = QueryBuilders.rangeQuery("price")
                .from(5)
                .to(10)
                .includeLower(true)
                .includeUpper(false);
        QueryBuilder rangeQuery2 = QueryBuilders.rangeQuery("price")
                .gte("10")
                .lt("20");
        QueryBuilder existsQuery = QueryBuilders.existsQuery("name");
        QueryBuilder prefixQuery = QueryBuilders.prefixQuery(
                "brand",
                "heine");
        QueryBuilder wildcardQuery = QueryBuilders.wildcardQuery(
                "user",
                "k?mch*");
        QueryBuilder regexpQuery = QueryBuilders.regexpQuery(
                "name.first",
                "s.*y");
        QueryBuilder fuzzyQuery = QueryBuilders.fuzzyQuery(
                "name",
                "kimchy");
        QueryBuilder typeQuery = QueryBuilders.typeQuery("my_type");
        QueryBuilder idsQuery = QueryBuilders.idsQuery("my_type", "type2")
                .addIds("1", "4", "100");
    }

    public static void compound() {
        // every matching document gets the same constant score
        ConstantScoreQueryBuilder constantScoreQuery = QueryBuilders.constantScoreQuery
                (QueryBuilders.termQuery("name", "kimchy"))
                .boost(2);
        BoolQueryBuilder boolQuery = boolQuery()
                .must(termQuery("content", "test1"))
                .must(termQuery("content", "test4"))
                .mustNot(termQuery("content", "test2"))
                .should(termQuery("content", "test3"))
                .filter(termQuery("content", "test5"));
        // dis_max scores a document by its best-matching clause;
        // tieBreaker adds a fraction of the other clauses' scores
        DisMaxQueryBuilder disMaxQuery = disMaxQuery()
                .add(termQuery("name", "kimchy"))
                .add(termQuery("name", "elasticsearch"))
                .boost(1.2f)
                .tieBreaker(0.7f);
        // function_score lets you replace or adjust scoring, e.g. when the
        // default relevance scoring is expensive or not what you want
        FunctionScoreQueryBuilder.FilterFunctionBuilder[] functions = {
                new FunctionScoreQueryBuilder.FilterFunctionBuilder(
                        matchQuery("name", "kimchy"),
                        randomFunction()),
                new FunctionScoreQueryBuilder.FilterFunctionBuilder(
                        exponentialDecayFunction("age", 0L, 1L))
        };
        functionScoreQuery(functions);
        BoostingQueryBuilder boostingQuery = boostingQuery(
                termQuery("name", "kimchy"),
                termQuery("name", "dadoonet"))
                .negativeBoost(0.2f);
    }

    public static void join() {
        NestedQueryBuilder nestedQuery = nestedQuery(
                "obj1",
                boolQuery()
                        .must(matchQuery("obj1.name", "blue"))
                        .must(rangeQuery("obj1.count").gt(5)),
                ScoreMode.Avg);
        HasChildQueryBuilder hasChildQuery = JoinQueryBuilders.hasChildQuery(
                "blog_tag",
                termQuery("tag", "something"),
                ScoreMode.None);
        HasParentQueryBuilder hasParentQuery = JoinQueryBuilders.hasParentQuery(
                "question",
                matchQuery("text", "another"),
                false);
    }

    //https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-geo-queries.html
    public static void geo() {
        GeoShapeQueryBuilder qb;
        try {
            qb = geoShapeQuery(
                    "pin.location",
                    ShapeBuilders.newMultiPoint(
                            new CoordinatesBuilder()
                                    .coordinate(0, 0)
                                    .coordinate(0, 10)
                                    .coordinate(10, 10)
                                    .coordinate(10, 0)
                                    .coordinate(0, 0)
                                    .build()));
            qb.relation(ShapeRelation.WITHIN); // set inside try to avoid an NPE if building fails
        } catch (IOException e) {
            e.printStackTrace();
        }
        GeoShapeQueryBuilder qb1 = geoShapeQuery(
                "pin.location",
                "DEU",
                "countries");
        qb1.relation(ShapeRelation.WITHIN)
                .indexedShapeIndex("shapes")
                .indexedShapePath("location");
        geoBoundingBoxQuery("pin.location")
                .setCorners(40.73, -74.1,
                        40.717, -73.99);
    }

    public static void specialized() throws IOException {
        String[] fields = {"name.first", "name.last"};
        String[] texts = {"text like this one"};

        moreLikeThisQuery(fields, texts, null)
                .minTermFreq(1)
                .maxQueryTerms(12);
        Map<String, Object> parameters = new HashMap<>();
        parameters.put("param1", 5);
        scriptQuery(new Script(
                ScriptType.STORED,
                null,
                "myscript",
                singletonMap("param1", 5)));

        //Build a document to check against the percolator
        XContentBuilder docBuilder = XContentFactory.jsonBuilder().startObject();
        docBuilder.field("message", "A new bonsai tree in the office");
        docBuilder.endObject(); //End of the JSON root object
        PercolateQueryBuilder percolateQuery = new PercolateQueryBuilder("query", BytesReference.bytes(docBuilder), XContentType.JSON);


        // wrap a raw JSON string so it can be used as a query
        String query = "{\"term\": {\"user\": \"kimchy\"}}";
        wrapperQuery(query);
    }

    public static void span() {
        spanTermQuery(
                "user",
                "kimchy");
        spanMultiTermQueryBuilder(
                prefixQuery("user", "ki"));
        spanFirstQuery(
                spanTermQuery("user", "kimchy"),
                3
        );
        spanNearQuery(
                spanTermQuery("field", "value1"),
                12)
                .addClause(spanTermQuery("field", "value2"))
                .addClause(spanTermQuery("field", "value3"))
                .inOrder(false);
        spanOrQuery(spanTermQuery("field","value1"))
                .addClause(spanTermQuery("field","value2"))
                .addClause(spanTermQuery("field","value3"));
        spanNotQuery(
                spanTermQuery("field","value1"),
                spanTermQuery("field","value2"));
        spanContainingQuery(
                spanNearQuery(spanTermQuery("field1","bar"), 5)
                        .addClause(spanTermQuery("field1","baz"))
                        .inOrder(true),
                spanTermQuery("field1","foo"));
        spanWithinQuery(
                spanNearQuery(spanTermQuery("field1", "bar"), 5)
                        .addClause(spanTermQuery("field1", "baz"))
                        .inOrder(true),
                spanTermQuery("field1", "foo"));
    }

}
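
One detail the listing above glosses over: a TransportClient keeps thread pools and node connections open, so it should be closed once the application is done with it. A minimal sketch, assuming the `client` obtained from the ElasticsearchConfig helper earlier in this post:

```java
// Release the transport client's thread pools and node connections.
// Call this once on application shutdown, after all requests complete.
client.close();
```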

Java API Administration

Administration is split into managing indices and managing the cluster. Honestly, I find Kibana combined with the REST API more convenient for these tasks, but the Java calls are shown below.

Indices

    TransportClient client = new ElasticsearchConfig().getElasticsearchClient();
    IndicesAdminClient indicesAdminClient = client.admin().indices();
    //create an index
    client.admin().indices().prepareCreate("twitter").get();
    //Index Settings
    client.admin().indices().prepareCreate("twitter")
            .setSettings(Settings.builder()
                    .put("index.number_of_shards", 3)
                    .put("index.number_of_replicas", 2)
            )
            .get();
    //Put Mapping
    client.admin().indices().prepareCreate("twitter")
            .addMapping("tweet", "message", "type=text")
            .get();
    //get settings
    GetSettingsResponse response = client.admin().indices()
            .prepareGetSettings("company", "employee").get();
    for (ObjectObjectCursor<String, Settings> cursor : response.getIndexToSettings()) {
        String index = cursor.key;
        Settings settings = cursor.value;
        Integer shards = settings.getAsInt("index.number_of_shards", null);
        Integer replicas = settings.getAsInt("index.number_of_replicas", null);
    }
    //Update Indices Settings
    client.admin().indices().prepareUpdateSettings("twitter")
            .setSettings(Settings.builder()
                    .put("index.number_of_replicas", 0)
            )
            .get();
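
The create/settings examples above have natural counterparts for checking and removing an index. A hedged sketch under the same assumptions (the `client` from the ElasticsearchConfig helper, and the "twitter" index created above):

```java
// Check whether the "twitter" index exists before deleting it,
// to avoid an IndexNotFoundException on the delete call.
boolean exists = client.admin().indices()
        .prepareExists("twitter")
        .get()
        .isExists();
if (exists) {
    client.admin().indices().prepareDelete("twitter").get();
}
```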

Cluster

    TransportClient client = new ElasticsearchConfig().getElasticsearchClient();
    //health
    ClusterHealthResponse healths = client.admin().cluster().prepareHealth().get();
    String clusterName = healths.getClusterName();
    int numberOfDataNodes = healths.getNumberOfDataNodes();
    int numberOfNodes = healths.getNumberOfNodes();

    for (ClusterIndexHealth health : healths.getIndices().values()) {
        String index = health.getIndex();
        int numberOfShards = health.getNumberOfShards();
        int numberOfReplicas = health.getNumberOfReplicas();
        ClusterHealthStatus status = health.getStatus();
        System.out.println(status);
    }
    PutStoredScriptResponse response = client.admin().cluster().preparePutStoredScript()
            .setId("script1")
            .setContent(new BytesArray("{\"script\": {\"lang\": \"painless\", \"source\": \"_score * doc['my_numeric_field'].value\"} }"), XContentType.JSON)
            .get();

    GetStoredScriptResponse response1 = client.admin().cluster().prepareGetStoredScript()
            .setId("script1")
            .get();

    DeleteStoredScriptResponse response2 = client.admin().cluster().prepareDeleteStoredScript()
            .setId("script1")
            .get();
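
Besides health and stored scripts, the cluster admin client can also fetch the full cluster state, which is handy for listing nodes and inspecting index metadata. A minimal sketch under the same assumptions as the health example above:

```java
// Fetch the cluster state and print the name of every node in it.
ClusterState state = client.admin().cluster()
        .prepareState()
        .get()
        .getState();
for (DiscoveryNode node : state.getNodes()) {
    System.out.println(node.getName());
}
```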
