4747import org .apache .usergrid .persistence .index .utils .IndexValidationUtils ;
4848import org .apache .usergrid .persistence .model .entity .Id ;
4949import org .apache .usergrid .persistence .model .util .UUIDGenerator ;
50- import org .elasticsearch .action .ActionFuture ;
51- import org .elasticsearch .action .ActionListener ;
52- import org .elasticsearch .action .ListenableActionFuture ;
53- import org .elasticsearch .action .ShardOperationFailedException ;
50+ import org .elasticsearch .ResourceAlreadyExistsException ;
51+ import org .elasticsearch .action .*;
5452import org .elasticsearch .action .admin .cluster .health .ClusterHealthRequest ;
5553import org .elasticsearch .action .admin .cluster .health .ClusterHealthResponse ;
5654import org .elasticsearch .action .admin .indices .alias .IndicesAliasesRequestBuilder ;
5957import org .elasticsearch .action .admin .indices .refresh .RefreshResponse ;
6058import org .elasticsearch .action .admin .indices .stats .CommonStats ;
6159import org .elasticsearch .action .admin .indices .stats .IndicesStatsResponse ;
62- import org .elasticsearch .action .deletebyquery .DeleteByQueryResponse ;
63- import org .elasticsearch .action .deletebyquery .IndexDeleteByQueryResponse ;
60+ import org .elasticsearch .action .bulk .BulkItemResponse ;
6461import org .elasticsearch .action .search .SearchRequestBuilder ;
6562import org .elasticsearch .action .search .SearchResponse ;
6663import org .elasticsearch .client .AdminClient ;
67- import org .elasticsearch .common .settings .ImmutableSettings ;
6864import org .elasticsearch .common .settings .Settings ;
6965import org .elasticsearch .common .unit .TimeValue ;
66+ import org .elasticsearch .index .IndexNotFoundException ;
7067import org .elasticsearch .index .query .*;
7168import org .elasticsearch .index .query .QueryBuilder ;
72- import org .elasticsearch .indices . IndexAlreadyExistsException ;
73- import org .elasticsearch .indices . IndexMissingException ;
69+ import org .elasticsearch .index . reindex . BulkByScrollResponse ;
70+ import org .elasticsearch .index . reindex . DeleteByQueryAction ;
7471import org .elasticsearch .search .SearchHit ;
7572import org .elasticsearch .search .SearchHits ;
7673import org .elasticsearch .search .aggregations .metrics .sum .Sum ;
77- import org .elasticsearch .search .aggregations .metrics .sum .SumBuilder ;
74+ import org .elasticsearch .search .aggregations .metrics .sum .SumAggregationBuilder ;
7875import org .elasticsearch .search .sort .SortOrder ;
7976import org .slf4j .Logger ;
8077import org .slf4j .LoggerFactory ;
@@ -206,13 +203,13 @@ public void addIndex(final String indexName,
206203 //Create index
207204 try {
208205 final AdminClient admin = esProvider .getClient ().admin ();
209- Settings settings = ImmutableSettings . settingsBuilder ()
206+ Settings settings = Settings . builder ()
210207 .put ("index.number_of_shards" , numberOfShards )
211208 .put ("index.number_of_replicas" , numberOfReplicas )
212209 //dont' allow unmapped queries, and don't allow dynamic mapping
213210 .put ("index.query.parse.allow_unmapped_fields" , false )
214211 .put ("index.mapper.dynamic" , false )
215- .put ("action.write_consistency" , writeConsistency )
212+ // .put("action.write_consistency", writeConsistency)
216213 .build ();
217214
218215 //Added For Graphite Metrics
@@ -230,7 +227,7 @@ public void addIndex(final String indexName,
230227
231228
232229 logger .info ("Created new Index Name [{}] ACK=[{}]" , indexName , cir .isAcknowledged ());
233- } catch (IndexAlreadyExistsException e ) {
230+ } catch (ResourceAlreadyExistsException e ) {
234231 logger .info ("Index Name [{}] already exists" , indexName );
235232 }
236233 /**
@@ -244,7 +241,7 @@ public void addIndex(final String indexName,
244241
245242 testNewIndex ();
246243
247- } catch (IndexAlreadyExistsException expected ) {
244+ } catch (ResourceAlreadyExistsException expected ) {
248245 // this is expected to happen if index already exists, it's a no-op and swallow
249246 } catch (IOException e ) {
250247 throw new RuntimeException ("Unable to initialize index" , e );
@@ -452,8 +449,7 @@ public CandidateResults search( final SearchEdge searchEdge, final SearchTypes s
452449 for (SortPredicate sortPredicate : parsedQuery .getSortPredicates () ){
453450 hasGeoSortPredicates = visitor .getGeoSorts ().contains (sortPredicate .getPropertyName ());
454451 }
455-
456-
452+
457453 final String cacheKey = applicationScope .getApplication ().getUuid ().toString ()+"_" +searchEdge .getEdgeName ();
458454 final Object totalEdgeSizeFromCache = sizeCache .getIfPresent (cacheKey );
459455 long totalEdgeSizeInBytes ;
@@ -498,6 +494,9 @@ public CandidateResults search( final SearchEdge searchEdge, final SearchTypes s
498494 final Timer .Context timerContext = searchTimer .time ();
499495
500496 try {
497+ if (logger .isDebugEnabled ()) {
498+ logger .debug ("Query to execute = {}" , srb .toString ());
499+ }
501500
502501 searchResponse = srb .execute ().actionGet ();
503502 }
@@ -594,21 +593,25 @@ public Observable deleteApplication() {
594593 //Added For Graphite Metrics
595594 return Observable .from ( indexes ).flatMap ( index -> {
596595
597- final ListenableActionFuture <DeleteByQueryResponse > response =
598- esProvider .getClient ().prepareDeleteByQuery ( alias .getWriteAlias () ).setQuery ( tqb ).execute ();
596+ ListenableActionFuture <BulkByScrollResponse > response =
597+ DeleteByQueryAction .INSTANCE .newRequestBuilder ( esProvider .getClient ())
598+ .filter (tqb )
599+ .source (indexes )
600+ .execute ();
601+
599602
600- response .addListener ( new ActionListener <DeleteByQueryResponse >() {
603+ response .addListener ( new ActionListener <BulkByScrollResponse >() {
601604
602605 @ Override
603- public void onResponse ( DeleteByQueryResponse response ) {
606+ public void onResponse ( BulkByScrollResponse response ) {
604607 checkDeleteByQueryResponse ( tqb , response );
605608 }
606609
607-
608610 @ Override
609- public void onFailure ( Throwable e ) {
611+ public void onFailure (Exception e ) {
610612 logger .error ( "Failed on delete index" , e .getMessage () );
611613 }
614+
612615 } );
613616 return Observable .from ( response );
614617 } ).doOnError ( t -> logger .error ( "Failed on delete application" , t .getMessage () ) );
@@ -618,17 +621,14 @@ public void onFailure( Throwable e ) {
618621 /**
619622 * Validate the response doesn't contain errors, if it does, fail fast at the first error we encounter
620623 */
621- private void checkDeleteByQueryResponse ( final QueryBuilder query , final DeleteByQueryResponse response ) {
624+ private void checkDeleteByQueryResponse ( final QueryBuilder query , final BulkByScrollResponse response ) {
622625
623- for ( IndexDeleteByQueryResponse indexDeleteByQueryResponse : response ) {
624- final ShardOperationFailedException [] failures = indexDeleteByQueryResponse .getFailures ();
626+ List <BulkItemResponse .Failure > failures = response .getBulkFailures ();
625627
626- for ( ShardOperationFailedException failedException : failures ) {
627- logger .error ("Unable to delete by query {}. Failed with code {} and reason {} on shard {} in index {}" ,
628- query .toString (),
629- failedException .status ().getStatus (), failedException .reason (),
630- failedException .shardId (), failedException .index () );
631- }
628+
629+ for ( BulkItemResponse .Failure failure : failures ) {
630+ logger .error ("Unable to delete by query {}. Failed with code {} and reason {} in index {}" ,
631+ query .toString (), failure .getStatus () , failure .getMessage (), failure .getIndex ());
632632 }
633633 }
634634
@@ -817,7 +817,7 @@ private long getIndexSize(){
817817 .actionGet ();
818818 final CommonStats indexStats = statsResponse .getIndex (indexName ).getTotal ();
819819 indexSize = indexStats .getStore ().getSizeInBytes ();
820- } catch (IndexMissingException e ) {
820+ } catch (IndexNotFoundException e ) {
821821 // if for some reason the index size does not exist,
822822 // log an error and we can assume size is 0 as it doesn't exist
823823 logger .error ("Unable to get size for index {} due to IndexMissingException for app {}" ,
@@ -836,7 +836,7 @@ public long getTotalEntitySizeInBytes(final SearchEdge edge){
836836
837837 private long getEntitySizeAggregation ( final SearchRequestBuilder builder ) {
838838 final String key = "entitySize" ;
839- SumBuilder sumBuilder = new SumBuilder (key );
839+ SumAggregationBuilder sumBuilder = new SumAggregationBuilder (key );
840840 sumBuilder .field ("entitySize" );
841841 builder .addAggregation (sumBuilder );
842842
0 commit comments