1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.commons.numbers.arrays;
19
20 /**
21 * Partition array data.
22 *
23 * <p>Note: Requires that the floating-point data contains no NaN values; sorting does not
24 * respect the order of signed zeros imposed by {@link Double#compare(double, double)};
25 * mixed signed zeros may be destroyed (the mixture updated during partitioning). The
26 * caller is responsible for counting a mixture of signed zeros and restoring them if
27 * required.
28 *
29 * @see Selection
30 * @since 1.2
31 */
32 final class QuickSelect {
33 // Implementation Notes
34 //
35 // Selection is performed using a quickselect variant to recursively divide the range
36 // to select the target index, or indices. Partition sizes or recursion are monitored
37 // will fall-backs on poor convergence of a linearselect (single index) or heapselect.
38 //
39 // Many implementations were tested, each with strengths and weaknesses on different
40 // input data containing random elements, repeat elements, elements with repeat
41 // patterns, and constant elements. The final implementation performs well across data
42 // types for single and multiple indices with no obvious weakness.
43 // See: o.a.c.numbers.examples.jmh.arrays for benchmarking implementations.
44 //
45 // Single indices are selected using a quickselect adaptive method based on Alexandrescu.
46 // The algorithm is a quickselect around a pivot identified using a
47 // sample-of-sample-of-samples created from the entire range data. This pivot will
48 // have known lower and upper margins and ensures elimination of a minimum fraction of
49 // data at each step. To increase speed the pivot can be identified using less of the data
50 // but without margin guarantees (sampling mode). The algorithm monitors partition sizes
51 // against the known margins. If the reduction in the partition size is not large enough
52 // then the algorithm can disable sampling mode and ensure linear performance by removing
53 // a set fraction of the data each iteration.
54 //
55 // Modifications from Alexandrescu are:
56 // 1. Initialise sampling mode using the Floyd-Rivest (FR) SELECT algorithm.
57 // 2. Adaption is adjusted to force use of the lower margin in the far-step method when
58 // sampling is disabled.
59 // 3. Change the far-step method to a min-of-4 then median-of-3 into the 2nd 12th-tile.
60 // The original method uses a lower-median-of-4, min-of-3 into the 4th 12th-tile.
61 // 4. Position the sample around the target k when in sampling mode for the non-far-step
62 // methods.
63 //
64 // The far step method is used when target k is within 1/12 of the end of the data A.
65 // The differences in the far-step method are:
66 // - The upper margin when not sampling is 8/24 vs. 9/24; the lower margin remains at 1/12.
67 // - The position of the sample is closer to the expected location of k < |A|/12.
68 // - Sampling mode uses a median-of-3 with adaptive k, matching the other step methods.
69 // Note the original min-of-3 sample is more likely to create a pivot too small if used
70 // with adaption leaving k in the larger partition and a wasted iteration.
71 //
72 // The Floyd-Rivest (FR) SELECT algorithm is preferred for sampling over using quickselect
73 // adaptive sampling. It uses a smaller sample and has improved heuristics to place the sample
74 // pivot. However the FR sample is a small range of the data and pivot selection can be poor
75 // if the sample is not representative. This can be mitigated by creating a random sample
76 // of the entire range for the pivot selection. This implementation does not use random
77 // sampling for the FR mode. Performance is identical on random data (randomisation is a
78 // negligible overhead) and faster on sorted data. Any data not suitable for the FR algorithm
79 // are immediately switched to the quickselect adaptive algorithm with sampling. Performance
80 // across a range of data shows this strategy is approximately mid-way in performance between
81 // FR with random sampling, and quickselect adaptive in sampling mode. The benefit is that
82 // sorted or partially partitioned data are not intentionally unordered as the method will
83 // only move elements known to be incorrectly placed in the array.
84 //
85 // Multiple indices are selected using a dual-pivot partition method by
86 // Yaroslavskiy to divide the interval containing the indices. Recursion is performed for
87 // regions containing target indices. The method is thus a partial quicksort into regions of
88 // interest. Excess recursion triggers use of a heapselect. When indices are effectively
89 // a single index the method can switch to the single index selection to use the FR algorithm.
90 //
91 // Alternative schemes to partition multiple indices are to repeat call single index select
92 // with cached pivots, or without cached pivots if processing indices in order as the previous
93 // index brackets the range for the next search. Caching pivots is the most effective
94 // alternative. It requires storing all pivots during select, and using the cache to look-up
95 // the search bounds (sub-range) for each target index. This requires 2n searches for n indices.
96 // All pivots must be stored to avoid destroying previously partitioned data on repeat entry
97 // to the array. The current scheme inverts this by requiring at most n-1 divides of the
98 // indices during recursion and has the advantage of tracking recursion depth during selection
99 // for each sub-range. Division of indices is a small overhead for the common case where
100 // the number of indices is far smaller than the size of the data.
101 //
102 // Dual-pivot partitioning adapted from Yaroslavskiy
103 // http://codeblab.com/wp-content/uploads/2009/09/DualPivotQuicksort.pdf
104 //
105 // Modified to allow partial sorting (partitioning):
106 // - Ignore insertion sort for tiny array (handled by calling code).
107 // - Ignore recursive calls for a full sort (handled by calling code).
108 // - Change to fast-forward over initial ascending / descending runs.
109 // - Change to fast-forward great when v > v2 and either break the sorting
110 // loop, or move a[great] direct to the correct location.
111 // - Change to use the 2nd and 4th of 5 elements for the pivots.
112 // - Identify a large central region using ~5/8 of the length to trigger search for
113 // equal values.
114 //
115 // For some indices and data a full sort of the data will be faster; this is impossible to
116 // predict on unknown data input and attempts to analyse the indices and data impact
117 // performance for the majority of use cases where sorting is not a suitable choice.
118 // Use of the sortselect finisher allows the current multiple indices method to degrade
119 // to a (non-optimised) dual-pivot quicksort (see below).
120 //
121 // heapselect vs sortselect
122 //
123 // Quickselect can switch to an alternative when: the range is very small
124 // (e.g. insertion sort); or the target index is close to the end (e.g. heapselect).
125 // Small ranges and a target index close to the end are handled using a hybrid of insertion
126 // sort and selection (sortselect). This is faster than heapselect for small distance from
127 // the edge (m) for a single index and has the advantage of sorting all upstream values from
128 // the target index (heapselect requires push-down of each successive value to sort). This
129 // allows the dual-pivot quickselect on multiple indices that saturate the range to degrade
130 // to a (non-optimised) dual-pivot quicksort. However sortselect is worst case Order(m * (r-l))
131 // for range [l, r] so cannot be used when quickselect fails to converge as m may be very large.
132 // Thus heapselect is used as the stopper algorithm when quickselect progress is slow on
133 // multiple indices. If heapselect is used for small range handling the performance on
134 // saturated indices is significantly slower. Hence the presence of two final selection
135 // methods for different purposes.
136
137 /** Sampling mode using Floyd-Rivest sampling. */
138 static final int MODE_FR_SAMPLING = -1;
139 /** Sampling mode. */
140 static final int MODE_SAMPLING = 0;
141 /** No sampling but use adaption of the target k. */
142 static final int MODE_ADAPTION = 1;
143 /** No sampling and no adaption of target k (strict margins). */
144 static final int MODE_STRICT = 2;
145
146 /** Minimum size for sortselect.
147 * Below this perform a sort rather than selection. This is used to avoid
148 * sort select on tiny data. */
149 private static final int MIN_SORTSELECT_SIZE = 4;
150 /** Single-pivot sortselect size for quickselect adaptive. Note that quickselect adaptive
151 * recursively calls quickselect so very small lengths are included with an initial medium
152 * length. Using lengths of 1023-5 and 2043-53 indicate optimum performance around 20-30.
153 * Note: The expand partition function assumes a sample of at least length 2 as each end
154 * of the sample is used as a sentinel; this imposes a minimum length of 24 on the range
155 * to ensure it contains a 12-th tile of length 2. Thus the absolute minimum for the
156 * distance from the edge is 12. */
157 private static final int LINEAR_SORTSELECT_SIZE = 24;
158 /** Dual-pivot sortselect size for the distance of a single k from the edge of the
159 * range length n. Benchmarking in range [81+81, 243+243] suggests a value of ~20 (or
160 * higher on some hardware). Ranges are chosen based on third interval spacing between
161 * powers of 3.
162 *
163 * <p>Sortselect is faster at this small size than heapselect. A second advantage is
164 * that all indices closer to the edge than the target index are also sorted. This
165 * allows selection of multiple close indices to be performed with effectively the
166 * same speed. High density indices will result in recursion to very short fragments
167 * which also trigger use of sort select. The threshold for sorting short lengths is
168 * configured in {@link #dualPivotSortSelectSize(int, int)}. */
169 private static final int DP_SORTSELECT_SIZE = 20;
170 /** Threshold to use Floyd-Rivest sub-sampling. This partitions a sample of the data to
171 * identify a pivot so that the target element is in the smaller set after partitioning.
172 * The original FR paper used 600 otherwise reverted to the target index as the pivot.
173 * This implementation reverts to quickselect adaptive which increases robustness
174 * at small size on a variety of data and allows raising the original FR threshold. */
175 private static final int FR_SAMPLING_SIZE = 1200;
176
177 /** Increment used for the recursion counter. The counter will overflow to negative when
178 * recursion has exceeded the maximum level. The counter is maintained in the upper bits
179 * of the dual-pivot control flags. */
180 private static final int RECURSION_INCREMENT = 1 << 20;
181 /** Mask to extract the sort select size from the dual-pivot control flags. Currently
182 * the bits below those used for the recursion counter are only used for the sort select size
183 * so this can use a mask with all bits below the increment. */
184 private static final int SORTSELECT_MASK = RECURSION_INCREMENT - 1;
185
186 /** Threshold to use repeated step left: 7 / 16. */
187 private static final double STEP_LEFT = 0.4375;
188 /** Threshold to use repeated step right: 9 / 16. */
189 private static final double STEP_RIGHT = 0.5625;
190 /** Threshold to use repeated step far-left: 1 / 12. */
191 private static final double STEP_FAR_LEFT = 0.08333333333333333;
192 /** Threshold to use repeated step far-right: 11 / 12. */
193 private static final double STEP_FAR_RIGHT = 0.9166666666666666;
194
195 /** No instances. */
196 private QuickSelect() {}
197
198 /**
199 * Partition the elements between {@code ka} and {@code kb} using a heap select
200 * algorithm. It is assumed {@code left <= ka <= kb <= right}.
201 *
202 * @param a Data array to use to find out the K<sup>th</sup> value.
203 * @param left Lower bound (inclusive).
204 * @param right Upper bound (inclusive).
205 * @param ka Lower index to select.
206 * @param kb Upper index to select.
207 */
208 static void heapSelect(double[] a, int left, int right, int ka, int kb) {
209 if (right <= left) {
210 return;
211 }
212 // Use the smallest heap
213 if (kb - left < right - ka) {
214 heapSelectLeft(a, left, right, ka, kb);
215 } else {
216 heapSelectRight(a, left, right, ka, kb);
217 }
218 }
219
220 /**
221 * Partition the elements between {@code ka} and {@code kb} using a heap select
222 * algorithm. It is assumed {@code left <= ka <= kb <= right}.
223 *
224 * <p>For best performance this should be called with {@code k} in the lower
225 * half of the range.
226 *
227 * @param a Data array to use to find out the K<sup>th</sup> value.
228 * @param left Lower bound (inclusive).
229 * @param right Upper bound (inclusive).
230 * @param ka Lower index to select.
231 * @param kb Upper index to select.
232 */
233 static void heapSelectLeft(double[] a, int left, int right, int ka, int kb) {
234 // Create a max heap in-place in [left, k], rooted at a[left] = max
235 // |l|-max-heap-|k|--------------|
236 // Build the heap using Floyd's heap-construction algorithm for heap size n.
237 // Start at parent of the last element in the heap (k),
238 // i.e. start = parent(n-1) : parent(c) = floor((c - 1) / 2) : c = k - left
239 int end = kb + 1;
240 for (int p = left + ((kb - left - 1) >> 1); p >= left; p--) {
241 maxHeapSiftDown(a, a[p], p, left, end);
242 }
243 // Scan the remaining data and insert
244 // Mitigate worst case performance on descending data by backward sweep
245 double max = a[left];
246 for (int i = right; i > kb; i--) {
247 final double v = a[i];
248 if (v < max) {
249 a[i] = max;
250 maxHeapSiftDown(a, v, left, left, end);
251 max = a[left];
252 }
253 }
254 // Partition [ka, kb]
255 // |l|-max-heap-|k|--------------|
256 // | <-swap-> | then sift down reduced size heap
257 // Avoid sifting heap of size 1
258 final int last = Math.max(left, ka - 1);
259 while (--end > last) {
260 maxHeapSiftDown(a, a[end], left, left, end);
261 a[end] = max;
262 max = a[left];
263 }
264 }
265
266 /**
267 * Sift the element down the max heap.
268 *
269 * <p>Assumes {@code root <= p < end}, i.e. the max heap is above root.
270 *
271 * @param a Heap data.
272 * @param v Value to sift.
273 * @param p Start position.
274 * @param root Root of the heap.
275 * @param end End of the heap (exclusive).
276 */
277 private static void maxHeapSiftDown(double[] a, double v, int p, int root, int end) {
278 // child2 = root + 2 * (parent - root) + 2
279 // = 2 * parent - root + 2
280 while (true) {
281 // Right child
282 int c = (p << 1) - root + 2;
283 if (c > end) {
284 // No left child
285 break;
286 }
287 // Use the left child if right doesn't exist, or it is greater
288 if (c == end || a[c] < a[c - 1]) {
289 --c;
290 }
291 if (v >= a[c]) {
292 // Parent greater than largest child - done
293 break;
294 }
295 // Swap and descend
296 a[p] = a[c];
297 p = c;
298 }
299 a[p] = v;
300 }
301
302 /**
303 * Partition the elements between {@code ka} and {@code kb} using a heap select
304 * algorithm. It is assumed {@code left <= ka <= kb <= right}.
305 *
306 * <p>For best performance this should be called with {@code k} in the upper
307 * half of the range.
308 *
309 * @param a Data array to use to find out the K<sup>th</sup> value.
310 * @param left Lower bound (inclusive).
311 * @param right Upper bound (inclusive).
312 * @param ka Lower index to select.
313 * @param kb Upper index to select.
314 */
315 static void heapSelectRight(double[] a, int left, int right, int ka, int kb) {
316 // Create a min heap in-place in [k, right], rooted at a[right] = min
317 // |--------------|k|-min-heap-|r|
318 // Build the heap using Floyd's heap-construction algorithm for heap size n.
319 // Start at parent of the last element in the heap (k),
320 // i.e. start = parent(n-1) : parent(c) = floor((c - 1) / 2) : c = right - k
321 int end = ka - 1;
322 for (int p = right - ((right - ka - 1) >> 1); p <= right; p++) {
323 minHeapSiftDown(a, a[p], p, right, end);
324 }
325 // Scan the remaining data and insert
326 // Mitigate worst case performance on descending data by backward sweep
327 double min = a[right];
328 for (int i = left; i < ka; i++) {
329 final double v = a[i];
330 if (v > min) {
331 a[i] = min;
332 minHeapSiftDown(a, v, right, right, end);
333 min = a[right];
334 }
335 }
336 // Partition [ka, kb]
337 // |--------------|k|-min-heap-|r|
338 // | <-swap-> | then sift down reduced size heap
339 // Avoid sifting heap of size 1
340 final int last = Math.min(right, kb + 1);
341 while (++end < last) {
342 minHeapSiftDown(a, a[end], right, right, end);
343 a[end] = min;
344 min = a[right];
345 }
346 }
347
348 /**
349 * Sift the element down the min heap.
350 *
351 * <p>Assumes {@code root >= p > end}, i.e. the max heap is below root.
352 *
353 * @param a Heap data.
354 * @param v Value to sift.
355 * @param p Start position.
356 * @param root Root of the heap.
357 * @param end End of the heap (exclusive).
358 */
359 private static void minHeapSiftDown(double[] a, double v, int p, int root, int end) {
360 // child2 = root - 2 * (root - parent) - 2
361 // = 2 * parent - root - 2
362 while (true) {
363 // Right child
364 int c = (p << 1) - root - 2;
365 if (c < end) {
366 // No left child
367 break;
368 }
369 // Use the left child if right doesn't exist, or it is less
370 if (c == end || a[c] > a[c + 1]) {
371 ++c;
372 }
373 if (v <= a[c]) {
374 // Parent less than smallest child - done
375 break;
376 }
377 // Swap and descend
378 a[p] = a[c];
379 p = c;
380 }
381 a[p] = v;
382 }
383
384 /**
385 * Partition the elements between {@code ka} and {@code kb} using a sort select
386 * algorithm. It is assumed {@code left <= ka <= kb <= right}.
387 *
388 * @param a Data array to use to find out the K<sup>th</sup> value.
389 * @param left Lower bound (inclusive).
390 * @param right Upper bound (inclusive).
391 * @param ka Lower index to select.
392 * @param kb Upper index to select.
393 */
394 static void sortSelect(double[] a, int left, int right, int ka, int kb) {
395 // Combine the test for right <= left with
396 // avoiding the overhead of sort select on tiny data.
397 if (right - left <= MIN_SORTSELECT_SIZE) {
398 Sorting.sort(a, left, right);
399 return;
400 }
401 // Sort the smallest side
402 if (kb - left < right - ka) {
403 sortSelectLeft(a, left, right, kb);
404 } else {
405 sortSelectRight(a, left, right, ka);
406 }
407 }
408
409 /**
410 * Partition the minimum {@code n} elements below {@code k} where
411 * {@code n = k - left + 1}. Uses an insertion sort algorithm.
412 *
413 * <p>Works with any {@code k} in the range {@code left <= k <= right}
414 * and performs a full sort of the range below {@code k}.
415 *
416 * <p>For best performance this should be called with
417 * {@code k - left < right - k}, i.e.
418 * to partition a value in the lower half of the range.
419 *
420 * @param a Data array to use to find out the K<sup>th</sup> value.
421 * @param left Lower bound (inclusive).
422 * @param right Upper bound (inclusive).
423 * @param k Index to select.
424 */
425 static void sortSelectLeft(double[] a, int left, int right, int k) {
426 // Sort
427 for (int i = left; ++i <= k;) {
428 final double v = a[i];
429 // Move preceding higher elements above (if required)
430 if (v < a[i - 1]) {
431 int j = i;
432 while (--j >= left && v < a[j]) {
433 a[j + 1] = a[j];
434 }
435 a[j + 1] = v;
436 }
437 }
438 // Scan the remaining data and insert
439 // Mitigate worst case performance on descending data by backward sweep
440 double m = a[k];
441 for (int i = right; i > k; i--) {
442 final double v = a[i];
443 if (v < m) {
444 a[i] = m;
445 int j = k;
446 while (--j >= left && v < a[j]) {
447 a[j + 1] = a[j];
448 }
449 a[j + 1] = v;
450 m = a[k];
451 }
452 }
453 }
454
455 /**
456 * Partition the maximum {@code n} elements above {@code k} where
457 * {@code n = right - k + 1}. Uses an insertion sort algorithm.
458 *
459 * <p>Works with any {@code k} in the range {@code left <= k <= right}
460 * and can be used to perform a full sort of the range above {@code k}.
461 *
462 * <p>For best performance this should be called with
463 * {@code k - left > right - k}, i.e.
464 * to partition a value in the upper half of the range.
465 *
466 * @param a Data array to use to find out the K<sup>th</sup> value.
467 * @param left Lower bound (inclusive).
468 * @param right Upper bound (inclusive).
469 * @param k Index to select.
470 */
471 static void sortSelectRight(double[] a, int left, int right, int k) {
472 // Sort
473 for (int i = right; --i >= k;) {
474 final double v = a[i];
475 // Move succeeding lower elements below (if required)
476 if (v > a[i + 1]) {
477 int j = i;
478 while (++j <= right && v > a[j]) {
479 a[j - 1] = a[j];
480 }
481 a[j - 1] = v;
482 }
483 }
484 // Scan the remaining data and insert
485 // Mitigate worst case performance on descending data by backward sweep
486 double m = a[k];
487 for (int i = left; i < k; i++) {
488 final double v = a[i];
489 if (v > m) {
490 a[i] = m;
491 int j = k;
492 while (++j <= right && v > a[j]) {
493 a[j - 1] = a[j];
494 }
495 a[j - 1] = v;
496 m = a[k];
497 }
498 }
499 }
500
501 /**
502 * Partition the array such that index {@code k} corresponds to its correctly
503 * sorted value in the equivalent fully sorted array.
504 *
505 * <p>Assumes {@code k} is a valid index into [left, right].
506 *
507 * @param a Values.
508 * @param left Lower bound of data (inclusive).
509 * @param right Upper bound of data (inclusive).
510 * @param k Index.
511 */
512 static void select(double[] a, int left, int right, int k) {
513 quickSelectAdaptive(a, left, right, k, k, new int[1], MODE_FR_SAMPLING);
514 }
515
516 /**
517 * Partition the array such that indices {@code k} correspond to their correctly
518 * sorted value in the equivalent fully sorted array.
519 *
520 * <p>The count of the number of used indices is returned. If the keys are sorted in-place,
521 * the count is returned as a negative.
522 *
523 * @param a Values.
524 * @param left Lower bound of data (inclusive).
525 * @param right Upper bound of data (inclusive).
526 * @param k Indices (may be destructively modified).
527 * @param n Count of indices.
528 * @return the count of used indices
529 */
530 static int select(double[] a, int left, int right, int[] k, int n) {
531 if (n < 1) {
532 return 0;
533 }
534 if (n == 1) {
535 quickSelectAdaptive(a, left, right, k[0], k[0], new int[1], MODE_FR_SAMPLING);
536 return -1;
537 }
538
539 // Interval creation validates the indices are in [left, right]
540 final UpdatingInterval keys = IndexSupport.createUpdatingInterval(k, n);
541
542 // Save number of used indices
543 final int count = IndexSupport.countIndices(keys, n);
544
545 // Note: If the keys are not separated then they are effectively a single key.
546 // Any split of keys separated by the sort select size
547 // will be finished on the next iteration.
548 final int k1 = keys.left();
549 final int kn = keys.right();
550 if (kn - k1 < DP_SORTSELECT_SIZE) {
551 quickSelectAdaptive(a, left, right, k1, kn, new int[1], MODE_FR_SAMPLING);
552 } else {
553 // Dual-pivot mode with small range sort length configured using index density
554 dualPivotQuickSelect(a, left, right, keys, dualPivotFlags(left, right, k1, kn));
555 }
556 return count;
557 }
558
559 /**
560 * Partition the array such that indices {@code k} correspond to their
561 * correctly sorted value in the equivalent fully sorted array.
562 *
563 * <p>For all indices {@code [ka, kb]} and any index {@code i}:
564 *
565 * <pre>{@code
566 * data[i < ka] <= data[ka] <= data[kb] <= data[kb < i]
567 * }</pre>
568 *
569 * <p>This function accepts indices {@code [ka, kb]} that define the
570 * range of indices to partition. It is expected that the range is small.
571 *
572 * <p>The {@code flags} are used to control the sampling mode and adaption of
573 * the index within the sample.
574 *
575 * <p>Returns the bounds containing {@code [ka, kb]}. These may be lower/higher
576 * than the keys if equal values are present in the data.
577 *
578 * @param a Values.
579 * @param left Lower bound of data (inclusive, assumed to be strictly positive).
580 * @param right Upper bound of data (inclusive, assumed to be strictly positive).
581 * @param ka First key of interest.
582 * @param kb Last key of interest.
583 * @param bounds Upper bound of the range containing {@code [ka, kb]} (inclusive).
584 * @param flags Adaption flags.
585 * @return Lower bound of the range containing {@code [ka, kb]} (inclusive).
586 */
587 static int quickSelectAdaptive(double[] a, int left, int right, int ka, int kb,
588 int[] bounds, int flags) {
589 int l = left;
590 int r = right;
591 int m = flags;
592 while (true) {
593 // Select when ka and kb are close to the same end
594 // |l|-----|ka|kkkkkkkk|kb|------|r|
595 if (Math.min(kb - l, r - ka) < LINEAR_SORTSELECT_SIZE) {
596 sortSelect(a, l, r, ka, kb);
597 bounds[0] = kb;
598 return ka;
599 }
600
601 // Only target ka; kb is assumed to be close
602 int p0;
603 final int n = r - l;
604 // f in [0, 1]
605 final double f = (double) (ka - l) / n;
606 // Record the larger margin (start at 1/4) to create the estimated size.
607 // step L R
608 // far left 1/12 1/3 (use 1/4 + 1/32 + 1/64 ~ 0.328)
609 // left 1/6 1/4
610 // middle 2/9 2/9 (use 1/4 - 1/32 ~ 0.219)
611 int margin = n >> 2;
612 if (m < MODE_SAMPLING && r - l > FR_SAMPLING_SIZE) {
613 // Floyd-Rivest sample step uses the same margins
614 p0 = sampleStep(a, l, r, ka, bounds);
615 if (f <= STEP_FAR_LEFT || f >= STEP_FAR_RIGHT) {
616 margin += (n >> 5) + (n >> 6);
617 } else if (f > STEP_LEFT && f < STEP_RIGHT) {
618 margin -= n >> 5;
619 }
620 } else if (f <= STEP_LEFT) {
621 if (f <= STEP_FAR_LEFT) {
622 margin += (n >> 5) + (n >> 6);
623 p0 = repeatedStepFarLeft(a, l, r, ka, bounds, m);
624 } else {
625 p0 = repeatedStepLeft(a, l, r, ka, bounds, m);
626 }
627 } else if (f >= STEP_RIGHT) {
628 if (f >= STEP_FAR_RIGHT) {
629 margin += (n >> 5) + (n >> 6);
630 p0 = repeatedStepFarRight(a, l, r, ka, bounds, m);
631 } else {
632 p0 = repeatedStepRight(a, l, r, ka, bounds, m);
633 }
634 } else {
635 margin -= n >> 5;
636 p0 = repeatedStep(a, l, r, ka, bounds, m);
637 }
638
639 // Note: Here we expect [ka, kb] to be small and splitting is unlikely.
640 // p0 p1
641 // |l|--|ka|kkkk|kb|--|P|-------------------|r|
642 // |l|----------------|P|--|ka|kkk|kb|------|r|
643 // |l|-----------|ka|k|P|k|kb|--------------|r|
644 final int p1 = bounds[0];
645 if (kb < p0) {
646 // Entirely on left side
647 r = p0 - 1;
648 } else if (ka > p1) {
649 // Entirely on right side
650 l = p1 + 1;
651 } else {
652 // Pivot splits [ka, kb]. Expect ends to be close to the pivot and finish.
653 // Here we set the bounds for use after median-of-medians pivot selection.
654 // In the event there are many equal values this allows collecting those
655 // known to be equal together when moving around the medians sample.
656 if (kb > p1) {
657 sortSelectLeft(a, p1 + 1, r, kb);
658 bounds[0] = kb;
659 }
660 if (ka < p0) {
661 sortSelectRight(a, l, p0 - 1, ka);
662 p0 = ka;
663 }
664 return p0;
665 }
666 // Update mode based on target partition size
667 if (r - l > n - margin) {
668 m++;
669 }
670 }
671 }
672
673 /**
674 * Partition an array slice around a pivot. Partitioning exchanges array elements such
675 * that all elements smaller than pivot are before it and all elements larger than
676 * pivot are after it.
677 *
678 * <p>Partitions a Floyd-Rivest sample around a pivot offset so that the input {@code k} will
679 * fall in the smaller partition when the entire range is partitioned.
680 *
681 * <p>Assumes the range {@code r - l} is large.
682 *
683 * @param a Data array.
684 * @param l Lower bound (inclusive).
685 * @param r Upper bound (inclusive).
686 * @param k Target index.
687 * @param upper Upper bound (inclusive) of the pivot range.
688 * @return Lower bound (inclusive) of the pivot range.
689 */
690 private static int sampleStep(double[] a, int l, int r, int k, int[] upper) {
691 // Floyd-Rivest: use SELECT recursively on a sample of size S to get an estimate
692 // for the (k-l+1)-th smallest element into a[k], biased slightly so that the
693 // (k-l+1)-th element is expected to lie in the smaller set after partitioning.
694 final int n = r - l + 1;
695 final int ith = k - l + 1;
696 final double z = Math.log(n);
697 // sample size = 0.5 * n^(2/3)
698 final double s = 0.5 * Math.exp(0.6666666666666666 * z);
699 final double sd = 0.5 * Math.sqrt(z * s * (n - s) / n) * Integer.signum(ith - (n >> 1));
700 final int ll = Math.max(l, (int) (k - ith * s / n + sd));
701 final int rr = Math.min(r, (int) (k + (n - ith) * s / n + sd));
702 // Sample recursion restarts from [ll, rr]
703 final int p = quickSelectAdaptive(a, ll, rr, k, k, upper, MODE_FR_SAMPLING);
704 return expandPartition(a, l, r, ll, rr, p, upper[0], upper);
705 }
706
707 /**
708 * Partition an array slice around a pivot. Partitioning exchanges array elements such
709 * that all elements smaller than pivot are before it and all elements larger than
710 * pivot are after it.
711 *
712 * <p>Assumes the range {@code r - l >= 8}; the caller is responsible for selection on a smaller
713 * range. If using a 12th-tile for sampling then assumes {@code r - l >= 11}.
714 *
715 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm
716 * with the median of 3 then median of 3; the final sample is placed in the
717 * 5th 9th-tile; the pivot chosen from the sample is adaptive using the input {@code k}.
718 *
719 * <p>Given a pivot in the middle of the sample this has margins of 2/9 and 2/9.
720 *
721 * @param a Data array.
722 * @param l Lower bound (inclusive).
723 * @param r Upper bound (inclusive).
724 * @param k Target index.
725 * @param upper Upper bound (inclusive) of the pivot range.
726 * @param flags Control flags.
727 * @return Lower bound (inclusive) of the pivot range.
728 */
729 private static int repeatedStep(double[] a, int l, int r, int k, int[] upper, int flags) {
730 // Adapted from Alexandrescu (2016), algorithm 8.
731 final int fp;
732 final int s;
733 int p;
734 if (flags <= MODE_SAMPLING) {
735 // Median into a 12th-tile
736 fp = (r - l + 1) / 12;
737 // Position the sample around the target k
738 s = k - mapDistance(k - l, l, r, fp);
739 p = k;
740 } else {
741 // i in tertile [3f':6f')
742 fp = (r - l + 1) / 9;
743 final int f3 = 3 * fp;
744 final int end = l + (f3 << 1);
745 for (int i = l + f3; i < end; i++) {
746 Sorting.sort3(a, i - f3, i, i + f3);
747 }
748 // 5th 9th-tile: [4f':5f')
749 s = l + (fp << 2);
750 // No adaption uses the middle to enforce strict margins
751 p = s + (flags == MODE_ADAPTION ? mapDistance(k - l, l, r, fp) : (fp >>> 1));
752 }
753 final int e = s + fp - 1;
754 for (int i = s; i <= e; i++) {
755 Sorting.sort3(a, i - fp, i, i + fp);
756 }
757 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING);
758 return expandPartition(a, l, r, s, e, p, upper[0], upper);
759 }
760
761 /**
762 * Partition an array slice around a pivot. Partitioning exchanges array elements such
763 * that all elements smaller than pivot are before it and all elements larger than
764 * pivot are after it.
765 *
766 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller
767 * range.
768 *
769 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm
770 * with the lower median of 4 then either median of 3 with the final sample placed in the
771 * 5th 12th-tile, or min of 3 with the final sample in the 4th 12th-tile;
772 * the pivot chosen from the sample is adaptive using the input {@code k}.
773 *
774 * <p>Given a pivot in the middle of the sample this has margins of 1/6 and 1/4.
775 *
776 * @param a Data array.
777 * @param l Lower bound (inclusive).
778 * @param r Upper bound (inclusive).
779 * @param k Target index.
780 * @param upper Upper bound (inclusive) of the pivot range.
781 * @param flags Control flags.
782 * @return Lower bound (inclusive) of the pivot range.
783 */
784 private static int repeatedStepLeft(double[] a, int l, int r, int k, int[] upper, int flags) {
785 // Adapted from Alexandrescu (2016), algorithm 9.
786 final int fp;
787 final int s;
788 int p;
789 if (flags <= MODE_SAMPLING) {
790 // Median into a 12th-tile
791 fp = (r - l + 1) / 12;
792 // Position the sample around the target k
793 // Avoid bounds error due to rounding as (k-l)/(r-l) -> 1/12
794 s = Math.max(k - mapDistance(k - l, l, r, fp), l + fp);
795 p = k;
796 } else {
797 // i in 2nd quartile
798 final int f = (r - l + 1) >> 2;
799 final int f2 = f + f;
800 final int end = l + f2;
801 for (int i = l + f; i < end; i++) {
802 Sorting.lowerMedian4(a, i - f, i, i + f, i + f2);
803 }
804 // i in 5th 12th-tile
805 fp = f / 3;
806 s = l + f + fp;
807 // No adaption uses the middle to enforce strict margins
808 p = s + (flags == MODE_ADAPTION ? mapDistance(k - l, l, r, fp) : (fp >>> 1));
809 }
810 final int e = s + fp - 1;
811 for (int i = s; i <= e; i++) {
812 Sorting.sort3(a, i - fp, i, i + fp);
813 }
814 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING);
815 return expandPartition(a, l, r, s, e, p, upper[0], upper);
816 }
817
818 /**
819 * Partition an array slice around a pivot. Partitioning exchanges array elements such
820 * that all elements smaller than pivot are before it and all elements larger than
821 * pivot are after it.
822 *
823 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller
824 * range.
825 *
826 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm
827 * with the upper median of 4 then either median of 3 with the final sample placed in the
828 * 8th 12th-tile, or max of 3 with the final sample in the 9th 12th-tile;
829 * the pivot chosen from the sample is adaptive using the input {@code k}.
830 *
831 * <p>Given a pivot in the middle of the sample this has margins of 1/4 and 1/6.
832 *
833 * @param a Data array.
834 * @param l Lower bound (inclusive).
835 * @param r Upper bound (inclusive).
836 * @param k Target index.
837 * @param upper Upper bound (inclusive) of the pivot range.
838 * @param flags Control flags.
839 * @return Lower bound (inclusive) of the pivot range.
840 */
841 private static int repeatedStepRight(double[] a, int l, int r, int k, int[] upper, int flags) {
842 // Mirror image repeatedStepLeft using upper median into 3rd quartile
843 final int fp;
844 final int e;
845 int p;
846 if (flags <= MODE_SAMPLING) {
847 // Median into a 12th-tile
848 fp = (r - l + 1) / 12;
849 // Position the sample around the target k
850 // Avoid bounds error due to rounding as (r-k)/(r-l) -> 11/12
851 e = Math.min(k + mapDistance(r - k, l, r, fp), r - fp);
852 p = k;
853 } else {
854 // i in 3rd quartile
855 final int f = (r - l + 1) >> 2;
856 final int f2 = f + f;
857 final int end = r - f2;
858 for (int i = r - f; i > end; i--) {
859 Sorting.upperMedian4(a, i - f2, i - f, i, i + f);
860 }
861 // i in 8th 12th-tile
862 fp = f / 3;
863 e = r - f - fp;
864 // No adaption uses the middle to enforce strict margins
865 p = e - (flags == MODE_ADAPTION ? mapDistance(r - k, l, r, fp) : (fp >>> 1));
866 }
867 final int s = e - fp + 1;
868 for (int i = s; i <= e; i++) {
869 Sorting.sort3(a, i - fp, i, i + fp);
870 }
871 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING);
872 return expandPartition(a, l, r, s, e, p, upper[0], upper);
873 }
874
875 /**
876 * Partition an array slice around a pivot. Partitioning exchanges array elements such
877 * that all elements smaller than pivot are before it and all elements larger than
878 * pivot are after it.
879 *
880 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller
881 * range.
882 *
883 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm
884 * with the minimum of 4 then median of 3; the final sample is placed in the
885 * 2nd 12th-tile; the pivot chosen from the sample is adaptive using the input {@code k}.
886 *
887 * <p>Given a pivot in the middle of the sample this has margins of 1/12 and 1/3.
888 *
889 * @param a Data array.
890 * @param l Lower bound (inclusive).
891 * @param r Upper bound (inclusive).
892 * @param k Target index.
893 * @param upper Upper bound (inclusive) of the pivot range.
894 * @param flags Control flags.
895 * @return Lower bound (inclusive) of the pivot range.
896 */
897 private static int repeatedStepFarLeft(double[] a, int l, int r, int k, int[] upper, int flags) {
898 // Far step has been changed from the Alexandrescu (2016) step of lower-median-of-4, min-of-3
899 // into the 4th 12th-tile to a min-of-4, median-of-3 into the 2nd 12th-tile.
900 // The differences are:
901 // - The upper margin when not sampling is 8/24 vs. 9/24; the lower margin remains at 1/12.
902 // - The position of the sample is closer to the expected location of k < |A| / 12.
903 // - Sampling mode uses a median-of-3 with adaptive k, matching the other step methods.
904 // A min-of-3 sample can create a pivot too small if used with adaption of k leaving
905 // k in the larger parition and a wasted iteration.
906 // - Adaption is adjusted to force use of the lower margin when not sampling.
907 final int fp;
908 final int s;
909 int p;
910 if (flags <= MODE_SAMPLING) {
911 // 2nd 12th-tile
912 fp = (r - l + 1) / 12;
913 s = l + fp;
914 // Use adaption
915 p = s + mapDistance(k - l, l, r, fp);
916 } else {
917 // i in 2nd quartile; min into i-f (1st quartile)
918 final int f = (r - l + 1) >> 2;
919 final int f2 = f + f;
920 final int end = l + f2;
921 for (int i = l + f; i < end; i++) {
922 if (a[i + f] < a[i - f]) {
923 final double u = a[i + f];
924 a[i + f] = a[i - f];
925 a[i - f] = u;
926 }
927 if (a[i + f2] < a[i]) {
928 final double v = a[i + f2];
929 a[i + f2] = a[i];
930 a[i] = v;
931 }
932 if (a[i] < a[i - f]) {
933 final double u = a[i];
934 a[i] = a[i - f];
935 a[i - f] = u;
936 }
937 }
938 // 2nd 12th-tile
939 fp = f / 3;
940 s = l + fp;
941 // Lower margin has 2(d+1) elements; d == (position in sample) - s
942 // Force k into the lower margin. Note p will be within [s, e]
943 // if: (double) (k - l) / (r - l) < 1/12 => (k - l) / 2 < (r - l) / 24
944 p = s + ((k - l) >>> 1);
945 }
946 final int e = s + fp - 1;
947 for (int i = s; i <= e; i++) {
948 Sorting.sort3(a, i - fp, i, i + fp);
949 }
950 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING);
951 return expandPartition(a, l, r, s, e, p, upper[0], upper);
952 }
953
954 /**
955 * Partition an array slice around a pivot. Partitioning exchanges array elements such
956 * that all elements smaller than pivot are before it and all elements larger than
957 * pivot are after it.
958 *
959 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller
960 * range.
961 *
962 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm
963 * with the maximum of 4 then median of 3; the final sample is placed in the
964 * 11th 12th-tile; the pivot chosen from the sample is adaptive using the input {@code k}.
965 *
966 * <p>Given a pivot in the middle of the sample this has margins of 1/3 and 1/12.
967 *
968 * @param a Data array.
969 * @param l Lower bound (inclusive).
970 * @param r Upper bound (inclusive).
971 * @param k Target index.
972 * @param upper Upper bound (inclusive) of the pivot range.
973 * @param flags Control flags.
974 * @return Lower bound (inclusive) of the pivot range.
975 */
976 private static int repeatedStepFarRight(double[] a, int l, int r, int k, int[] upper, int flags) {
977 // Mirror image repeatedStepFarLeft
978 final int fp;
979 final int e;
980 int p;
981 if (flags <= MODE_SAMPLING) {
982 // 11th 12th-tile
983 fp = (r - l + 1) / 12;
984 e = r - fp;
985 // Use adaption
986 p = e - mapDistance(r - k, l, r, fp);
987 } else {
988 // i in 3rd quartile; max into i+f (4th quartile)
989 final int f = (r - l + 1) >> 2;
990 final int f2 = f + f;
991 final int end = r - f2;
992 for (int i = r - f; i > end; i--) {
993 if (a[i - f] > a[i + f]) {
994 final double u = a[i - f];
995 a[i - f] = a[i + f];
996 a[i + f] = u;
997 }
998 if (a[i - f2] > a[i]) {
999 final double v = a[i - f2];
1000 a[i - f2] = a[i];
1001 a[i] = v;
1002 }
1003 if (a[i] > a[i + f]) {
1004 final double u = a[i];
1005 a[i] = a[i + f];
1006 a[i + f] = u;
1007 }
1008 }
1009 // 11th 12th-tile
1010 fp = f / 3;
1011 e = r - fp;
1012 // Upper margin has 2(d+1) elements; d == e - (position in sample)
1013 // Force k into the upper margin. Note p will be within [s, e]
1014 // if: (double) (r - k) / (r - l) < 1/12 => (r - k) / 2 < (r - l) / 24
1015 p = e - ((r - k) >>> 1);
1016 }
1017 final int s = e - fp + 1;
1018 for (int i = s; i <= e; i++) {
1019 Sorting.sort3(a, i - fp, i, i + fp);
1020 }
1021 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING);
1022 return expandPartition(a, l, r, s, e, p, upper[0], upper);
1023 }
1024
1025 /**
1026 * Expand a partition around a single pivot. Partitioning exchanges array
1027 * elements such that all elements smaller than pivot are before it and all
1028 * elements larger than pivot are after it. The central region is already
1029 * partitioned.
1030 *
1031 * <pre>{@code
1032 * |l |s |p0 p1| e| r|
1033 * | ??? | <P | ==P | >P | ??? |
1034 * }</pre>
1035 *
1036 * <p>This requires that {@code start != end}. However it handles
1037 * {@code left == start} and/or {@code end == right}.
1038 *
1039 * @param a Data array.
1040 * @param left Lower bound (inclusive).
1041 * @param right Upper bound (inclusive).
1042 * @param start Start of the partition range (inclusive).
1043 * @param end End of the partitioned range (inclusive).
1044 * @param pivot0 Lower pivot location (inclusive).
1045 * @param pivot1 Upper pivot location (inclusive).
1046 * @param upper Upper bound (inclusive) of the pivot range [k1].
1047 * @return Lower bound (inclusive) of the pivot range [k0].
1048 */
1049 // package-private for testing
1050 static int expandPartition(double[] a, int left, int right, int start, int end,
1051 int pivot0, int pivot1, int[] upper) {
1052 // 3-way partition of the data using a pivot value into
1053 // less-than, equal or greater-than.
1054 // Based on Sedgewick's Bentley-McIroy partitioning: always swap i<->j then
1055 // check for equal to the pivot and move again.
1056 //
1057 // Move sentinels from start and end to left and right. Scan towards the
1058 // sentinels until >=,<=. Swap then move == to the pivot region.
1059 // <-i j->
1060 // |l | | |p0 p1| | | r|
1061 // |>=| ??? | < | == | > | ??? |<=|
1062 //
1063 // When either i or j reach the edge perform finishing loop.
1064 // Finish loop for a[j] <= v replaces j with p1+1, optionally moves value
1065 // to p0 for < and updates the pivot range p1 (and optionally p0):
1066 // j->
1067 // |l |p0 p1| | | r|
1068 // | < | == | > | ??? |<=|
1069
1070 final double v = a[pivot0];
1071 // Use start/end as sentinels (requires start != end)
1072 double vi = a[start];
1073 double vj = a[end];
1074 a[start] = a[left];
1075 a[end] = a[right];
1076 a[left] = vj;
1077 a[right] = vi;
1078
1079 int i = start + 1;
1080 int j = end - 1;
1081
1082 // Positioned for pre-in/decrement to write to pivot region
1083 int p0 = pivot0 == start ? i : pivot0;
1084 int p1 = pivot1 == end ? j : pivot1;
1085
1086 while (true) {
1087 do {
1088 --i;
1089 } while (a[i] < v);
1090 do {
1091 ++j;
1092 } while (a[j] > v);
1093 vj = a[i];
1094 vi = a[j];
1095 a[i] = vi;
1096 a[j] = vj;
1097 // Move the equal values to pivot region
1098 if (vi == v) {
1099 a[i] = a[--p0];
1100 a[p0] = v;
1101 }
1102 if (vj == v) {
1103 a[j] = a[++p1];
1104 a[p1] = v;
1105 }
1106 // Termination check and finishing loops.
1107 // Note: This works even if pivot region is zero length (p1 == p0-1 due to
1108 // length 1 pivot region at either start/end) because we pre-inc/decrement
1109 // one side and post-inc/decrement the other side.
1110 if (i == left) {
1111 while (j < right) {
1112 do {
1113 ++j;
1114 } while (a[j] > v);
1115 final double w = a[j];
1116 // Move upper bound of pivot region
1117 a[j] = a[++p1];
1118 a[p1] = v;
1119 // Move lower bound of pivot region
1120 if (w != v) {
1121 a[p0] = w;
1122 p0++;
1123 }
1124 }
1125 break;
1126 }
1127 if (j == right) {
1128 while (i > left) {
1129 do {
1130 --i;
1131 } while (a[i] < v);
1132 final double w = a[i];
1133 // Move lower bound of pivot region
1134 a[i] = a[--p0];
1135 a[p0] = v;
1136 // Move upper bound of pivot region
1137 if (w != v) {
1138 a[p1] = w;
1139 p1--;
1140 }
1141 }
1142 break;
1143 }
1144 }
1145
1146 upper[0] = p1;
1147 return p0;
1148 }
1149
1150 /**
1151 * Partition the array such that indices {@code k} correspond to their
1152 * correctly sorted value in the equivalent fully sorted array.
1153 *
1154 * <p>For all indices {@code k} and any index {@code i}:
1155 *
1156 * <pre>{@code
1157 * data[i < k] <= data[k] <= data[k < i]
1158 * }</pre>
1159 *
1160 * <p>This function accepts a {@link UpdatingInterval} of indices {@code k} that define the
1161 * range of indices to partition. The {@link UpdatingInterval} can be narrowed or split as
1162 * partitioning divides the range.
1163 *
1164 * <p>Uses an introselect variant. The quickselect is a dual-pivot quicksort
1165 * partition method by Vladimir Yaroslavskiy; the fall-back on poor convergence of
1166 * the quickselect is a heapselect.
1167 *
1168 * <p>The {@code flags} contain the current recursion count and the configured
1169 * length threshold for {@code r - l} to perform sort select. The count is in the upper
1170 * bits and the threshold is in the lower bits.
1171 *
1172 * @param a Values.
1173 * @param left Lower bound of data (inclusive, assumed to be strictly positive).
1174 * @param right Upper bound of data (inclusive, assumed to be strictly positive).
1175 * @param k Interval of indices to partition (ordered).
1176 * @param flags Control flags.
1177 */
1178 // package-private for testing
1179 static void dualPivotQuickSelect(double[] a, int left, int right, UpdatingInterval k, int flags) {
1180 // If partitioning splits the interval then recursion is used for the left-most side(s)
1181 // and the right-most side remains within this function. If partitioning does
1182 // not split the interval then it remains within this function.
1183 int l = left;
1184 int r = right;
1185 int f = flags;
1186 int ka = k.left();
1187 int kb = k.right();
1188 final int[] upper = {0, 0, 0};
1189 while (true) {
1190 // Select when ka and kb are close to the same end,
1191 // or the entire range is small
1192 // |l|-----|ka|--------|kb|------|r|
1193 final int n = r - l;
1194 if (Math.min(kb - l, r - ka) < DP_SORTSELECT_SIZE ||
1195 n < (f & SORTSELECT_MASK)) {
1196 sortSelect(a, l, r, ka, kb);
1197 return;
1198 }
1199 if (kb - ka < DP_SORTSELECT_SIZE) {
1200 // Switch to single-pivot mode with Floyd-Rivest sub-sampling
1201 quickSelectAdaptive(a, l, r, ka, kb, upper, MODE_FR_SAMPLING);
1202 return;
1203 }
1204 if (f < 0) {
1205 // Excess recursion, switch to heap select
1206 heapSelect(a, l, r, ka, kb);
1207 return;
1208 }
1209
1210 // Dual-pivot partitioning
1211 final int p0 = partition(a, l, r, upper);
1212 final int p1 = upper[0];
1213
1214 // Recursion to max depth
1215 // Note: Here we possibly branch left, middle and right with multiple keys.
1216 // It is possible that the partition has split the keys
1217 // and the recursion proceeds with a reduced set in each region.
1218 // p0 p1 p2 p3
1219 // |l|--|ka|--k----k--|P|------k--|kb|----|P|----|r|
1220 // kb | ka
1221 f += RECURSION_INCREMENT;
1222 // Recurse left side if required
1223 if (ka < p0) {
1224 if (kb <= p1) {
1225 // Entirely on left side
1226 r = p0 - 1;
1227 if (r < kb) {
1228 kb = k.updateRight(r);
1229 }
1230 continue;
1231 }
1232 dualPivotQuickSelect(a, l, p0 - 1, k.splitLeft(p0, p1), f);
1233 // Here we must process middle and/or right
1234 ka = k.left();
1235 } else if (kb <= p1) {
1236 // No middle/right side
1237 return;
1238 } else if (ka <= p1) {
1239 // Advance lower bound
1240 ka = k.updateLeft(p1 + 1);
1241 }
1242 // Recurse middle if required
1243 final int p2 = upper[1];
1244 final int p3 = upper[2];
1245 if (ka < p2) {
1246 l = p1 + 1;
1247 if (kb <= p3) {
1248 // Entirely in middle
1249 r = p2 - 1;
1250 if (r < kb) {
1251 kb = k.updateRight(r);
1252 }
1253 continue;
1254 }
1255 dualPivotQuickSelect(a, l, p2 - 1, k.splitLeft(p2, p3), f);
1256 ka = k.left();
1257 } else if (kb <= p3) {
1258 // No right side
1259 return;
1260 } else if (ka <= p3) {
1261 ka = k.updateLeft(p3 + 1);
1262 }
1263 // Continue right
1264 l = p3 + 1;
1265 }
1266 }
1267
1268 /**
1269 * Partition an array slice around 2 pivots. Partitioning exchanges array elements
1270 * such that all elements smaller than pivot are before it and all elements larger
1271 * than pivot are after it.
1272 *
1273 * <p>This method returns 4 points describing the pivot ranges of equal values.
1274 *
1275 * <pre>{@code
1276 * |k0 k1| |k2 k3|
1277 * | <P | ==P1 | <P1 && <P2 | ==P2 | >P |
1278 * }</pre>
1279 *
1280 * <ul>
1281 * <li>k0: lower pivot1 point</li>
1282 * <li>k1: upper pivot1 point (inclusive)</li>
1283 * <li>k2: lower pivot2 point</li>
1284 * <li>k3: upper pivot2 point (inclusive)</li>
1285 * </ul>
1286 *
1287 * <p>Bounds are set so {@code i < k0}, {@code i > k3} and {@code k1 < i < k2} are
1288 * unsorted. When the range {@code [k0, k3]} contains fully sorted elements the result
1289 * is set to {@code k1 = k3; k2 == k0}. This can occur if
1290 * {@code P1 == P2} or there are zero or one value between the pivots
1291 * {@code P1 < v < P2}. Any sort/partition of ranges [left, k0-1], [k1+1, k2-1] and
1292 * [k3+1, right] must check the length is {@code > 1}.
1293 *
1294 * @param a Data array.
1295 * @param left Lower bound (inclusive).
1296 * @param right Upper bound (inclusive).
1297 * @param bounds Points [k1, k2, k3].
1298 * @return Lower bound (inclusive) of the pivot range [k0].
1299 */
1300 private static int partition(double[] a, int left, int right, int[] bounds) {
1301 // Pick 2 pivots from 5 approximately uniform through the range.
1302 // Spacing is ~ 1/7 made using shifts. Other strategies are equal or much
1303 // worse. 1/7 = 5/35 ~ 1/8 + 1/64 : 0.1429 ~ 0.1406
1304 // Ensure the value is above zero to choose different points!
1305 final int n = right - left;
1306 final int step = 1 + (n >>> 3) + (n >>> 6);
1307 final int i3 = left + (n >>> 1);
1308 final int i2 = i3 - step;
1309 final int i1 = i2 - step;
1310 final int i4 = i3 + step;
1311 final int i5 = i4 + step;
1312 Sorting.sort5(a, i1, i2, i3, i4, i5);
1313
1314 // Partition data using pivots P1 and P2 into less-than, greater-than or between.
1315 // Pivot values P1 & P2 are placed at the end. If P1 < P2, P2 acts as a sentinel.
1316 // k traverses the unknown region ??? and values moved if less-than or
1317 // greater-than:
1318 //
1319 // left less k great right
1320 // |P1| <P1 | P1 <= & <= P2 | ??? | >P2 |P2|
1321 //
1322 // <P1 (left, lt)
1323 // P1 <= & <= P2 [lt, k)
1324 // >P2 (gt, right)
1325 //
1326 // At the end pivots are swapped back to behind the less and great pointers.
1327 //
1328 // | <P1 |P1| P1<= & <= P2 |P2| >P2 |
1329
1330 // Swap ends to the pivot locations.
1331 final double v1 = a[i2];
1332 a[i2] = a[left];
1333 a[left] = v1;
1334 final double v2 = a[i4];
1335 a[i4] = a[right];
1336 a[right] = v2;
1337
1338 // pointers
1339 int less = left;
1340 int great = right;
1341
1342 // Fast-forward ascending / descending runs to reduce swaps.
1343 // Cannot overrun as end pivots (v1 <= v2) act as sentinels.
1344 do {
1345 ++less;
1346 } while (a[less] < v1);
1347 do {
1348 --great;
1349 } while (a[great] > v2);
1350
1351 // a[less - 1] < P1 : a[great + 1] > P2
1352 // unvisited in [less, great]
1353 SORTING:
1354 for (int k = less; k <= great; k++) {
1355 final double v = a[k];
1356 if (v < v1) {
1357 // swap(a, k, less++)
1358 a[k] = a[less];
1359 a[less] = v;
1360 less++;
1361 } else if (v > v2) {
1362 // while k < great and a[great] > v2:
1363 // great--
1364 while (a[great] > v2) {
1365 if (great-- == k) {
1366 // Done
1367 break SORTING;
1368 }
1369 }
1370 // swap(a, k, great--)
1371 // if a[k] < v1:
1372 // swap(a, k, less++)
1373 final double w = a[great];
1374 a[great] = v;
1375 great--;
1376 // delay a[k] = w
1377 if (w < v1) {
1378 a[k] = a[less];
1379 a[less] = w;
1380 less++;
1381 } else {
1382 a[k] = w;
1383 }
1384 }
1385 }
1386
1387 // Change to inclusive ends : a[less] < P1 : a[great] > P2
1388 less--;
1389 great++;
1390 // Move the pivots to correct locations
1391 a[left] = a[less];
1392 a[less] = v1;
1393 a[right] = a[great];
1394 a[great] = v2;
1395
1396 // Record the pivot locations
1397 final int lower = less;
1398 bounds[2] = great;
1399
1400 // equal elements
1401 // Original paper: If middle partition is bigger than a threshold
1402 // then check for equal elements.
1403
1404 // Note: This is extra work. When performing partitioning the region of interest
1405 // may be entirely above or below the central region and this can be skipped.
1406
1407 // Here we look for equal elements if the centre is more than 5/8 the length.
1408 // 5/8 = 1/2 + 1/8. Pivots must be different.
1409 if ((great - less) > (n >>> 1) + (n >>> 3) && v1 != v2) {
1410
1411 // Fast-forward to reduce swaps. Changes inclusive ends to exclusive ends.
1412 // Since v1 != v2 these act as sentinels to prevent overrun.
1413 do {
1414 ++less;
1415 } while (a[less] == v1);
1416 do {
1417 --great;
1418 } while (a[great] == v2);
1419
1420 // This copies the logic in the sorting loop using == comparisons
1421 EQUAL:
1422 for (int k = less; k <= great; k++) {
1423 final double v = a[k];
1424 if (v == v1) {
1425 a[k] = a[less];
1426 a[less] = v;
1427 less++;
1428 } else if (v == v2) {
1429 while (a[great] == v2) {
1430 if (great-- == k) {
1431 // Done
1432 break EQUAL;
1433 }
1434 }
1435 final double w = a[great];
1436 a[great] = v;
1437 great--;
1438 if (w == v1) {
1439 a[k] = a[less];
1440 a[less] = w;
1441 less++;
1442 } else {
1443 a[k] = w;
1444 }
1445 }
1446 }
1447
1448 // Change to inclusive ends
1449 less--;
1450 great++;
1451 }
1452
1453 // Between pivots in (less, great)
1454 if (v1 != v2 && less < great - 1) {
1455 // Record the pivot end points
1456 bounds[0] = less;
1457 bounds[1] = great;
1458 } else {
1459 // No unsorted internal region (set k1 = k3; k2 = k0)
1460 bounds[0] = bounds[2];
1461 bounds[1] = lower;
1462 }
1463
1464 return lower;
1465 }
1466
1467 /**
1468 * Partition the elements between {@code ka} and {@code kb} using a heap select
1469 * algorithm. It is assumed {@code left <= ka <= kb <= right}.
1470 *
1471 * @param a Data array to use to find out the K<sup>th</sup> value.
1472 * @param left Lower bound (inclusive).
1473 * @param right Upper bound (inclusive).
1474 * @param ka Lower index to select.
1475 * @param kb Upper index to select.
1476 */
1477 static void heapSelect(int[] a, int left, int right, int ka, int kb) {
1478 if (right <= left) {
1479 return;
1480 }
1481 // Use the smallest heap
1482 if (kb - left < right - ka) {
1483 heapSelectLeft(a, left, right, ka, kb);
1484 } else {
1485 heapSelectRight(a, left, right, ka, kb);
1486 }
1487 }
1488
1489 /**
1490 * Partition the elements between {@code ka} and {@code kb} using a heap select
1491 * algorithm. It is assumed {@code left <= ka <= kb <= right}.
1492 *
1493 * <p>For best performance this should be called with {@code k} in the lower
1494 * half of the range.
1495 *
1496 * @param a Data array to use to find out the K<sup>th</sup> value.
1497 * @param left Lower bound (inclusive).
1498 * @param right Upper bound (inclusive).
1499 * @param ka Lower index to select.
1500 * @param kb Upper index to select.
1501 */
1502 static void heapSelectLeft(int[] a, int left, int right, int ka, int kb) {
1503 // Create a max heap in-place in [left, k], rooted at a[left] = max
1504 // |l|-max-heap-|k|--------------|
1505 // Build the heap using Floyd's heap-construction algorithm for heap size n.
1506 // Start at parent of the last element in the heap (k),
1507 // i.e. start = parent(n-1) : parent(c) = floor((c - 1) / 2) : c = k - left
1508 int end = kb + 1;
1509 for (int p = left + ((kb - left - 1) >> 1); p >= left; p--) {
1510 maxHeapSiftDown(a, a[p], p, left, end);
1511 }
1512 // Scan the remaining data and insert
1513 // Mitigate worst case performance on descending data by backward sweep
1514 int max = a[left];
1515 for (int i = right; i > kb; i--) {
1516 final int v = a[i];
1517 if (v < max) {
1518 a[i] = max;
1519 maxHeapSiftDown(a, v, left, left, end);
1520 max = a[left];
1521 }
1522 }
1523 // Partition [ka, kb]
1524 // |l|-max-heap-|k|--------------|
1525 // | <-swap-> | then sift down reduced size heap
1526 // Avoid sifting heap of size 1
1527 final int last = Math.max(left, ka - 1);
1528 while (--end > last) {
1529 maxHeapSiftDown(a, a[end], left, left, end);
1530 a[end] = max;
1531 max = a[left];
1532 }
1533 }
1534
1535 /**
1536 * Sift the element down the max heap.
1537 *
1538 * <p>Assumes {@code root <= p < end}, i.e. the max heap is above root.
1539 *
1540 * @param a Heap data.
1541 * @param v Value to sift.
1542 * @param p Start position.
1543 * @param root Root of the heap.
1544 * @param end End of the heap (exclusive).
1545 */
1546 private static void maxHeapSiftDown(int[] a, int v, int p, int root, int end) {
1547 // child2 = root + 2 * (parent - root) + 2
1548 // = 2 * parent - root + 2
1549 while (true) {
1550 // Right child
1551 int c = (p << 1) - root + 2;
1552 if (c > end) {
1553 // No left child
1554 break;
1555 }
1556 // Use the left child if right doesn't exist, or it is greater
1557 if (c == end || a[c] < a[c - 1]) {
1558 --c;
1559 }
1560 if (v >= a[c]) {
1561 // Parent greater than largest child - done
1562 break;
1563 }
1564 // Swap and descend
1565 a[p] = a[c];
1566 p = c;
1567 }
1568 a[p] = v;
1569 }
1570
1571 /**
1572 * Partition the elements between {@code ka} and {@code kb} using a heap select
1573 * algorithm. It is assumed {@code left <= ka <= kb <= right}.
1574 *
1575 * <p>For best performance this should be called with {@code k} in the upper
1576 * half of the range.
1577 *
1578 * @param a Data array to use to find out the K<sup>th</sup> value.
1579 * @param left Lower bound (inclusive).
1580 * @param right Upper bound (inclusive).
1581 * @param ka Lower index to select.
1582 * @param kb Upper index to select.
1583 */
1584 static void heapSelectRight(int[] a, int left, int right, int ka, int kb) {
1585 // Create a min heap in-place in [k, right], rooted at a[right] = min
1586 // |--------------|k|-min-heap-|r|
1587 // Build the heap using Floyd's heap-construction algorithm for heap size n.
1588 // Start at parent of the last element in the heap (k),
1589 // i.e. start = parent(n-1) : parent(c) = floor((c - 1) / 2) : c = right - k
1590 int end = ka - 1;
1591 for (int p = right - ((right - ka - 1) >> 1); p <= right; p++) {
1592 minHeapSiftDown(a, a[p], p, right, end);
1593 }
1594 // Scan the remaining data and insert
1595 // Mitigate worst case performance on descending data by backward sweep
1596 int min = a[right];
1597 for (int i = left; i < ka; i++) {
1598 final int v = a[i];
1599 if (v > min) {
1600 a[i] = min;
1601 minHeapSiftDown(a, v, right, right, end);
1602 min = a[right];
1603 }
1604 }
1605 // Partition [ka, kb]
1606 // |--------------|k|-min-heap-|r|
1607 // | <-swap-> | then sift down reduced size heap
1608 // Avoid sifting heap of size 1
1609 final int last = Math.min(right, kb + 1);
1610 while (++end < last) {
1611 minHeapSiftDown(a, a[end], right, right, end);
1612 a[end] = min;
1613 min = a[right];
1614 }
1615 }
1616
1617 /**
1618 * Sift the element down the min heap.
1619 *
1620 * <p>Assumes {@code root >= p > end}, i.e. the max heap is below root.
1621 *
1622 * @param a Heap data.
1623 * @param v Value to sift.
1624 * @param p Start position.
1625 * @param root Root of the heap.
1626 * @param end End of the heap (exclusive).
1627 */
1628 private static void minHeapSiftDown(int[] a, int v, int p, int root, int end) {
1629 // child2 = root - 2 * (root - parent) - 2
1630 // = 2 * parent - root - 2
1631 while (true) {
1632 // Right child
1633 int c = (p << 1) - root - 2;
1634 if (c < end) {
1635 // No left child
1636 break;
1637 }
1638 // Use the left child if right doesn't exist, or it is less
1639 if (c == end || a[c] > a[c + 1]) {
1640 ++c;
1641 }
1642 if (v <= a[c]) {
1643 // Parent less than smallest child - done
1644 break;
1645 }
1646 // Swap and descend
1647 a[p] = a[c];
1648 p = c;
1649 }
1650 a[p] = v;
1651 }
1652
1653 /**
1654 * Partition the elements between {@code ka} and {@code kb} using a sort select
1655 * algorithm. It is assumed {@code left <= ka <= kb <= right}.
1656 *
1657 * @param a Data array to use to find out the K<sup>th</sup> value.
1658 * @param left Lower bound (inclusive).
1659 * @param right Upper bound (inclusive).
1660 * @param ka Lower index to select.
1661 * @param kb Upper index to select.
1662 */
1663 static void sortSelect(int[] a, int left, int right, int ka, int kb) {
1664 // Combine the test for right <= left with
1665 // avoiding the overhead of sort select on tiny data.
1666 if (right - left <= MIN_SORTSELECT_SIZE) {
1667 Sorting.sort(a, left, right);
1668 return;
1669 }
1670 // Sort the smallest side
1671 if (kb - left < right - ka) {
1672 sortSelectLeft(a, left, right, kb);
1673 } else {
1674 sortSelectRight(a, left, right, ka);
1675 }
1676 }
1677
1678 /**
1679 * Partition the minimum {@code n} elements below {@code k} where
1680 * {@code n = k - left + 1}. Uses an insertion sort algorithm.
1681 *
1682 * <p>Works with any {@code k} in the range {@code left <= k <= right}
1683 * and performs a full sort of the range below {@code k}.
1684 *
1685 * <p>For best performance this should be called with
1686 * {@code k - left < right - k}, i.e.
1687 * to partition a value in the lower half of the range.
1688 *
1689 * @param a Data array to use to find out the K<sup>th</sup> value.
1690 * @param left Lower bound (inclusive).
1691 * @param right Upper bound (inclusive).
1692 * @param k Index to select.
1693 */
1694 static void sortSelectLeft(int[] a, int left, int right, int k) {
1695 // Sort
1696 for (int i = left; ++i <= k;) {
1697 final int v = a[i];
1698 // Move preceding higher elements above (if required)
1699 if (v < a[i - 1]) {
1700 int j = i;
1701 while (--j >= left && v < a[j]) {
1702 a[j + 1] = a[j];
1703 }
1704 a[j + 1] = v;
1705 }
1706 }
1707 // Scan the remaining data and insert
1708 // Mitigate worst case performance on descending data by backward sweep
1709 int m = a[k];
1710 for (int i = right; i > k; i--) {
1711 final int v = a[i];
1712 if (v < m) {
1713 a[i] = m;
1714 int j = k;
1715 while (--j >= left && v < a[j]) {
1716 a[j + 1] = a[j];
1717 }
1718 a[j + 1] = v;
1719 m = a[k];
1720 }
1721 }
1722 }
1723
1724 /**
1725 * Partition the maximum {@code n} elements above {@code k} where
1726 * {@code n = right - k + 1}. Uses an insertion sort algorithm.
1727 *
1728 * <p>Works with any {@code k} in the range {@code left <= k <= right}
1729 * and can be used to perform a full sort of the range above {@code k}.
1730 *
1731 * <p>For best performance this should be called with
1732 * {@code k - left > right - k}, i.e.
1733 * to partition a value in the upper half of the range.
1734 *
1735 * @param a Data array to use to find out the K<sup>th</sup> value.
1736 * @param left Lower bound (inclusive).
1737 * @param right Upper bound (inclusive).
1738 * @param k Index to select.
1739 */
1740 static void sortSelectRight(int[] a, int left, int right, int k) {
1741 // Sort
1742 for (int i = right; --i >= k;) {
1743 final int v = a[i];
1744 // Move succeeding lower elements below (if required)
1745 if (v > a[i + 1]) {
1746 int j = i;
1747 while (++j <= right && v > a[j]) {
1748 a[j - 1] = a[j];
1749 }
1750 a[j - 1] = v;
1751 }
1752 }
1753 // Scan the remaining data and insert
1754 // Mitigate worst case performance on descending data by backward sweep
1755 int m = a[k];
1756 for (int i = left; i < k; i++) {
1757 final int v = a[i];
1758 if (v > m) {
1759 a[i] = m;
1760 int j = k;
1761 while (++j <= right && v > a[j]) {
1762 a[j - 1] = a[j];
1763 }
1764 a[j - 1] = v;
1765 m = a[k];
1766 }
1767 }
1768 }
1769
1770 /**
1771 * Partition the array such that index {@code k} corresponds to its correctly
1772 * sorted value in the equivalent fully sorted array.
1773 *
1774 * <p>Assumes {@code k} is a valid index into [left, right].
1775 *
1776 * @param a Values.
1777 * @param left Lower bound of data (inclusive).
1778 * @param right Upper bound of data (inclusive).
1779 * @param k Index.
1780 */
1781 static void select(int[] a, int left, int right, int k) {
1782 quickSelectAdaptive(a, left, right, k, k, new int[1], MODE_FR_SAMPLING);
1783 }
1784
1785 /**
1786 * Partition the array such that indices {@code k} correspond to their correctly
1787 * sorted value in the equivalent fully sorted array.
1788 *
1789 * @param a Values.
1790 * @param left Lower bound of data (inclusive).
1791 * @param right Upper bound of data (inclusive).
1792 * @param k Indices (may be destructively modified).
1793 * @param n Count of indices.
1794 */
1795 static void select(int[] a, int left, int right, int[] k, int n) {
1796 if (n == 1) {
1797 quickSelectAdaptive(a, left, right, k[0], k[0], new int[1], MODE_FR_SAMPLING);
1798 return;
1799 }
1800
1801 // Interval creation validates the indices are in [left, right]
1802 final UpdatingInterval keys = IndexSupport.createUpdatingInterval(k, n);
1803
1804 // Note: If the keys are not separated then they are effectively a single key.
1805 // Any split of keys separated by the sort select size
1806 // will be finished on the next iteration.
1807 final int k1 = keys.left();
1808 final int kn = keys.right();
1809 if (kn - k1 < DP_SORTSELECT_SIZE) {
1810 quickSelectAdaptive(a, left, right, k1, kn, new int[1], MODE_FR_SAMPLING);
1811 } else {
1812 // Dual-pivot mode with small range sort length configured using index density
1813 dualPivotQuickSelect(a, left, right, keys, dualPivotFlags(left, right, k1, kn));
1814 }
1815 }
1816
1817 /**
1818 * Partition the array such that indices {@code k} correspond to their
1819 * correctly sorted value in the equivalent fully sorted array.
1820 *
1821 * <p>For all indices {@code [ka, kb]} and any index {@code i}:
1822 *
1823 * <pre>{@code
1824 * data[i < ka] <= data[ka] <= data[kb] <= data[kb < i]
1825 * }</pre>
1826 *
1827 * <p>This function accepts indices {@code [ka, kb]} that define the
1828 * range of indices to partition. It is expected that the range is small.
1829 *
1830 * <p>The {@code flags} are used to control the sampling mode and adaption of
1831 * the index within the sample.
1832 *
1833 * <p>Returns the bounds containing {@code [ka, kb]}. These may be lower/higher
1834 * than the keys if equal values are present in the data.
1835 *
1836 * @param a Values.
1837 * @param left Lower bound of data (inclusive, assumed to be strictly positive).
1838 * @param right Upper bound of data (inclusive, assumed to be strictly positive).
1839 * @param ka First key of interest.
1840 * @param kb Last key of interest.
1841 * @param bounds Upper bound of the range containing {@code [ka, kb]} (inclusive).
1842 * @param flags Adaption flags.
1843 * @return Lower bound of the range containing {@code [ka, kb]} (inclusive).
1844 */
1845 static int quickSelectAdaptive(int[] a, int left, int right, int ka, int kb,
1846 int[] bounds, int flags) {
1847 int l = left;
1848 int r = right;
1849 int m = flags;
1850 while (true) {
1851 // Select when ka and kb are close to the same end
1852 // |l|-----|ka|kkkkkkkk|kb|------|r|
1853 if (Math.min(kb - l, r - ka) < LINEAR_SORTSELECT_SIZE) {
1854 sortSelect(a, l, r, ka, kb);
1855 bounds[0] = kb;
1856 return ka;
1857 }
1858
1859 // Only target ka; kb is assumed to be close
1860 int p0;
1861 final int n = r - l;
1862 // f in [0, 1]
1863 final double f = (double) (ka - l) / n;
1864 // Record the larger margin (start at 1/4) to create the estimated size.
1865 // step L R
1866 // far left 1/12 1/3 (use 1/4 + 1/32 + 1/64 ~ 0.328)
1867 // left 1/6 1/4
1868 // middle 2/9 2/9 (use 1/4 - 1/32 ~ 0.219)
1869 int margin = n >> 2;
1870 if (m < MODE_SAMPLING && r - l > FR_SAMPLING_SIZE) {
1871 // Floyd-Rivest sample step uses the same margins
1872 p0 = sampleStep(a, l, r, ka, bounds);
1873 if (f <= STEP_FAR_LEFT || f >= STEP_FAR_RIGHT) {
1874 margin += (n >> 5) + (n >> 6);
1875 } else if (f > STEP_LEFT && f < STEP_RIGHT) {
1876 margin -= n >> 5;
1877 }
1878 } else if (f <= STEP_LEFT) {
1879 if (f <= STEP_FAR_LEFT) {
1880 margin += (n >> 5) + (n >> 6);
1881 p0 = repeatedStepFarLeft(a, l, r, ka, bounds, m);
1882 } else {
1883 p0 = repeatedStepLeft(a, l, r, ka, bounds, m);
1884 }
1885 } else if (f >= STEP_RIGHT) {
1886 if (f >= STEP_FAR_RIGHT) {
1887 margin += (n >> 5) + (n >> 6);
1888 p0 = repeatedStepFarRight(a, l, r, ka, bounds, m);
1889 } else {
1890 p0 = repeatedStepRight(a, l, r, ka, bounds, m);
1891 }
1892 } else {
1893 margin -= n >> 5;
1894 p0 = repeatedStep(a, l, r, ka, bounds, m);
1895 }
1896
1897 // Note: Here we expect [ka, kb] to be small and splitting is unlikely.
1898 // p0 p1
1899 // |l|--|ka|kkkk|kb|--|P|-------------------|r|
1900 // |l|----------------|P|--|ka|kkk|kb|------|r|
1901 // |l|-----------|ka|k|P|k|kb|--------------|r|
1902 final int p1 = bounds[0];
1903 if (kb < p0) {
1904 // Entirely on left side
1905 r = p0 - 1;
1906 } else if (ka > p1) {
1907 // Entirely on right side
1908 l = p1 + 1;
1909 } else {
1910 // Pivot splits [ka, kb]. Expect ends to be close to the pivot and finish.
1911 // Here we set the bounds for use after median-of-medians pivot selection.
1912 // In the event there are many equal values this allows collecting those
1913 // known to be equal together when moving around the medians sample.
1914 if (kb > p1) {
1915 sortSelectLeft(a, p1 + 1, r, kb);
1916 bounds[0] = kb;
1917 }
1918 if (ka < p0) {
1919 sortSelectRight(a, l, p0 - 1, ka);
1920 p0 = ka;
1921 }
1922 return p0;
1923 }
1924 // Update mode based on target partition size
1925 if (r - l > n - margin) {
1926 m++;
1927 }
1928 }
1929 }
1930
1931 /**
1932 * Partition an array slice around a pivot. Partitioning exchanges array elements such
1933 * that all elements smaller than pivot are before it and all elements larger than
1934 * pivot are after it.
1935 *
1936 * <p>Partitions a Floyd-Rivest sample around a pivot offset so that the input {@code k} will
1937 * fall in the smaller partition when the entire range is partitioned.
1938 *
1939 * <p>Assumes the range {@code r - l} is large.
1940 *
1941 * @param a Data array.
1942 * @param l Lower bound (inclusive).
1943 * @param r Upper bound (inclusive).
1944 * @param k Target index.
1945 * @param upper Upper bound (inclusive) of the pivot range.
1946 * @return Lower bound (inclusive) of the pivot range.
1947 */
1948 private static int sampleStep(int[] a, int l, int r, int k, int[] upper) {
1949 // Floyd-Rivest: use SELECT recursively on a sample of size S to get an estimate
1950 // for the (k-l+1)-th smallest element into a[k], biased slightly so that the
1951 // (k-l+1)-th element is expected to lie in the smaller set after partitioning.
1952 final int n = r - l + 1;
1953 final int ith = k - l + 1;
1954 final double z = Math.log(n);
1955 // sample size = 0.5 * n^(2/3)
1956 final double s = 0.5 * Math.exp(0.6666666666666666 * z);
1957 final double sd = 0.5 * Math.sqrt(z * s * (n - s) / n) * Integer.signum(ith - (n >> 1));
1958 final int ll = Math.max(l, (int) (k - ith * s / n + sd));
1959 final int rr = Math.min(r, (int) (k + (n - ith) * s / n + sd));
1960 // Sample recursion restarts from [ll, rr]
1961 final int p = quickSelectAdaptive(a, ll, rr, k, k, upper, MODE_FR_SAMPLING);
1962 return expandPartition(a, l, r, ll, rr, p, upper[0], upper);
1963 }
1964
1965 /**
1966 * Partition an array slice around a pivot. Partitioning exchanges array elements such
1967 * that all elements smaller than pivot are before it and all elements larger than
1968 * pivot are after it.
1969 *
1970 * <p>Assumes the range {@code r - l >= 8}; the caller is responsible for selection on a smaller
1971 * range. If using a 12th-tile for sampling then assumes {@code r - l >= 11}.
1972 *
1973 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm
1974 * with the median of 3 then median of 3; the final sample is placed in the
1975 * 5th 9th-tile; the pivot chosen from the sample is adaptive using the input {@code k}.
1976 *
1977 * <p>Given a pivot in the middle of the sample this has margins of 2/9 and 2/9.
1978 *
1979 * @param a Data array.
1980 * @param l Lower bound (inclusive).
1981 * @param r Upper bound (inclusive).
1982 * @param k Target index.
1983 * @param upper Upper bound (inclusive) of the pivot range.
1984 * @param flags Control flags.
1985 * @return Lower bound (inclusive) of the pivot range.
1986 */
1987 private static int repeatedStep(int[] a, int l, int r, int k, int[] upper, int flags) {
1988 // Adapted from Alexandrescu (2016), algorithm 8.
1989 final int fp;
1990 final int s;
1991 int p;
1992 if (flags <= MODE_SAMPLING) {
1993 // Median into a 12th-tile
1994 fp = (r - l + 1) / 12;
1995 // Position the sample around the target k
1996 s = k - mapDistance(k - l, l, r, fp);
1997 p = k;
1998 } else {
1999 // i in tertile [3f':6f')
2000 fp = (r - l + 1) / 9;
2001 final int f3 = 3 * fp;
2002 final int end = l + (f3 << 1);
2003 for (int i = l + f3; i < end; i++) {
2004 Sorting.sort3(a, i - f3, i, i + f3);
2005 }
2006 // 5th 9th-tile: [4f':5f')
2007 s = l + (fp << 2);
2008 // No adaption uses the middle to enforce strict margins
2009 p = s + (flags == MODE_ADAPTION ? mapDistance(k - l, l, r, fp) : (fp >>> 1));
2010 }
2011 final int e = s + fp - 1;
2012 for (int i = s; i <= e; i++) {
2013 Sorting.sort3(a, i - fp, i, i + fp);
2014 }
2015 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING);
2016 return expandPartition(a, l, r, s, e, p, upper[0], upper);
2017 }
2018
2019 /**
2020 * Partition an array slice around a pivot. Partitioning exchanges array elements such
2021 * that all elements smaller than pivot are before it and all elements larger than
2022 * pivot are after it.
2023 *
2024 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller
2025 * range.
2026 *
2027 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm
2028 * with the lower median of 4 then either median of 3 with the final sample placed in the
2029 * 5th 12th-tile, or min of 3 with the final sample in the 4th 12th-tile;
2030 * the pivot chosen from the sample is adaptive using the input {@code k}.
2031 *
2032 * <p>Given a pivot in the middle of the sample this has margins of 1/6 and 1/4.
2033 *
2034 * @param a Data array.
2035 * @param l Lower bound (inclusive).
2036 * @param r Upper bound (inclusive).
2037 * @param k Target index.
2038 * @param upper Upper bound (inclusive) of the pivot range.
2039 * @param flags Control flags.
2040 * @return Lower bound (inclusive) of the pivot range.
2041 */
2042 private static int repeatedStepLeft(int[] a, int l, int r, int k, int[] upper, int flags) {
2043 // Adapted from Alexandrescu (2016), algorithm 9.
2044 final int fp;
2045 final int s;
2046 int p;
2047 if (flags <= MODE_SAMPLING) {
2048 // Median into a 12th-tile
2049 fp = (r - l + 1) / 12;
2050 // Position the sample around the target k
2051 // Avoid bounds error due to rounding as (k-l)/(r-l) -> 1/12
2052 s = Math.max(k - mapDistance(k - l, l, r, fp), l + fp);
2053 p = k;
2054 } else {
2055 // i in 2nd quartile
2056 final int f = (r - l + 1) >> 2;
2057 final int f2 = f + f;
2058 final int end = l + f2;
2059 for (int i = l + f; i < end; i++) {
2060 Sorting.lowerMedian4(a, i - f, i, i + f, i + f2);
2061 }
2062 // i in 5th 12th-tile
2063 fp = f / 3;
2064 s = l + f + fp;
2065 // No adaption uses the middle to enforce strict margins
2066 p = s + (flags == MODE_ADAPTION ? mapDistance(k - l, l, r, fp) : (fp >>> 1));
2067 }
2068 final int e = s + fp - 1;
2069 for (int i = s; i <= e; i++) {
2070 Sorting.sort3(a, i - fp, i, i + fp);
2071 }
2072 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING);
2073 return expandPartition(a, l, r, s, e, p, upper[0], upper);
2074 }
2075
2076 /**
2077 * Partition an array slice around a pivot. Partitioning exchanges array elements such
2078 * that all elements smaller than pivot are before it and all elements larger than
2079 * pivot are after it.
2080 *
2081 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller
2082 * range.
2083 *
2084 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm
2085 * with the upper median of 4 then either median of 3 with the final sample placed in the
2086 * 8th 12th-tile, or max of 3 with the final sample in the 9th 12th-tile;
2087 * the pivot chosen from the sample is adaptive using the input {@code k}.
2088 *
2089 * <p>Given a pivot in the middle of the sample this has margins of 1/4 and 1/6.
2090 *
2091 * @param a Data array.
2092 * @param l Lower bound (inclusive).
2093 * @param r Upper bound (inclusive).
2094 * @param k Target index.
2095 * @param upper Upper bound (inclusive) of the pivot range.
2096 * @param flags Control flags.
2097 * @return Lower bound (inclusive) of the pivot range.
2098 */
2099 private static int repeatedStepRight(int[] a, int l, int r, int k, int[] upper, int flags) {
2100 // Mirror image repeatedStepLeft using upper median into 3rd quartile
2101 final int fp;
2102 final int e;
2103 int p;
2104 if (flags <= MODE_SAMPLING) {
2105 // Median into a 12th-tile
2106 fp = (r - l + 1) / 12;
2107 // Position the sample around the target k
2108 // Avoid bounds error due to rounding as (r-k)/(r-l) -> 11/12
2109 e = Math.min(k + mapDistance(r - k, l, r, fp), r - fp);
2110 p = k;
2111 } else {
2112 // i in 3rd quartile
2113 final int f = (r - l + 1) >> 2;
2114 final int f2 = f + f;
2115 final int end = r - f2;
2116 for (int i = r - f; i > end; i--) {
2117 Sorting.upperMedian4(a, i - f2, i - f, i, i + f);
2118 }
2119 // i in 8th 12th-tile
2120 fp = f / 3;
2121 e = r - f - fp;
2122 // No adaption uses the middle to enforce strict margins
2123 p = e - (flags == MODE_ADAPTION ? mapDistance(r - k, l, r, fp) : (fp >>> 1));
2124 }
2125 final int s = e - fp + 1;
2126 for (int i = s; i <= e; i++) {
2127 Sorting.sort3(a, i - fp, i, i + fp);
2128 }
2129 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING);
2130 return expandPartition(a, l, r, s, e, p, upper[0], upper);
2131 }
2132
2133 /**
2134 * Partition an array slice around a pivot. Partitioning exchanges array elements such
2135 * that all elements smaller than pivot are before it and all elements larger than
2136 * pivot are after it.
2137 *
2138 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller
2139 * range.
2140 *
2141 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm
2142 * with the minimum of 4 then median of 3; the final sample is placed in the
2143 * 2nd 12th-tile; the pivot chosen from the sample is adaptive using the input {@code k}.
2144 *
2145 * <p>Given a pivot in the middle of the sample this has margins of 1/12 and 1/3.
2146 *
2147 * @param a Data array.
2148 * @param l Lower bound (inclusive).
2149 * @param r Upper bound (inclusive).
2150 * @param k Target index.
2151 * @param upper Upper bound (inclusive) of the pivot range.
2152 * @param flags Control flags.
2153 * @return Lower bound (inclusive) of the pivot range.
2154 */
2155 private static int repeatedStepFarLeft(int[] a, int l, int r, int k, int[] upper, int flags) {
2156 // Far step has been changed from the Alexandrescu (2016) step of lower-median-of-4, min-of-3
2157 // into the 4th 12th-tile to a min-of-4, median-of-3 into the 2nd 12th-tile.
2158 // The differences are:
2159 // - The upper margin when not sampling is 8/24 vs. 9/24; the lower margin remains at 1/12.
2160 // - The position of the sample is closer to the expected location of k < |A| / 12.
2161 // - Sampling mode uses a median-of-3 with adaptive k, matching the other step methods.
2162 // A min-of-3 sample can create a pivot too small if used with adaption of k leaving
2163 // k in the larger parition and a wasted iteration.
2164 // - Adaption is adjusted to force use of the lower margin when not sampling.
2165 final int fp;
2166 final int s;
2167 int p;
2168 if (flags <= MODE_SAMPLING) {
2169 // 2nd 12th-tile
2170 fp = (r - l + 1) / 12;
2171 s = l + fp;
2172 // Use adaption
2173 p = s + mapDistance(k - l, l, r, fp);
2174 } else {
2175 // i in 2nd quartile; min into i-f (1st quartile)
2176 final int f = (r - l + 1) >> 2;
2177 final int f2 = f + f;
2178 final int end = l + f2;
2179 for (int i = l + f; i < end; i++) {
2180 if (a[i + f] < a[i - f]) {
2181 final int u = a[i + f];
2182 a[i + f] = a[i - f];
2183 a[i - f] = u;
2184 }
2185 if (a[i + f2] < a[i]) {
2186 final int v = a[i + f2];
2187 a[i + f2] = a[i];
2188 a[i] = v;
2189 }
2190 if (a[i] < a[i - f]) {
2191 final int u = a[i];
2192 a[i] = a[i - f];
2193 a[i - f] = u;
2194 }
2195 }
2196 // 2nd 12th-tile
2197 fp = f / 3;
2198 s = l + fp;
2199 // Lower margin has 2(d+1) elements; d == (position in sample) - s
2200 // Force k into the lower margin. Note p will be within [s, e]
2201 // if: (double) (k - l) / (r - l) < 1/12 => (k - l) / 2 < (r - l) / 24
2202 p = s + ((k - l) >>> 1);
2203 }
2204 final int e = s + fp - 1;
2205 for (int i = s; i <= e; i++) {
2206 Sorting.sort3(a, i - fp, i, i + fp);
2207 }
2208 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING);
2209 return expandPartition(a, l, r, s, e, p, upper[0], upper);
2210 }
2211
2212 /**
2213 * Partition an array slice around a pivot. Partitioning exchanges array elements such
2214 * that all elements smaller than pivot are before it and all elements larger than
2215 * pivot are after it.
2216 *
2217 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller
2218 * range.
2219 *
2220 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm
2221 * with the maximum of 4 then median of 3; the final sample is placed in the
2222 * 11th 12th-tile; the pivot chosen from the sample is adaptive using the input {@code k}.
2223 *
2224 * <p>Given a pivot in the middle of the sample this has margins of 1/3 and 1/12.
2225 *
2226 * @param a Data array.
2227 * @param l Lower bound (inclusive).
2228 * @param r Upper bound (inclusive).
2229 * @param k Target index.
2230 * @param upper Upper bound (inclusive) of the pivot range.
2231 * @param flags Control flags.
2232 * @return Lower bound (inclusive) of the pivot range.
2233 */
2234 private static int repeatedStepFarRight(int[] a, int l, int r, int k, int[] upper, int flags) {
2235 // Mirror image repeatedStepFarLeft
2236 final int fp;
2237 final int e;
2238 int p;
2239 if (flags <= MODE_SAMPLING) {
2240 // 11th 12th-tile
2241 fp = (r - l + 1) / 12;
2242 e = r - fp;
2243 // Use adaption
2244 p = e - mapDistance(r - k, l, r, fp);
2245 } else {
2246 // i in 3rd quartile; max into i+f (4th quartile)
2247 final int f = (r - l + 1) >> 2;
2248 final int f2 = f + f;
2249 final int end = r - f2;
2250 for (int i = r - f; i > end; i--) {
2251 if (a[i - f] > a[i + f]) {
2252 final int u = a[i - f];
2253 a[i - f] = a[i + f];
2254 a[i + f] = u;
2255 }
2256 if (a[i - f2] > a[i]) {
2257 final int v = a[i - f2];
2258 a[i - f2] = a[i];
2259 a[i] = v;
2260 }
2261 if (a[i] > a[i + f]) {
2262 final int u = a[i];
2263 a[i] = a[i + f];
2264 a[i + f] = u;
2265 }
2266 }
2267 // 11th 12th-tile
2268 fp = f / 3;
2269 e = r - fp;
2270 // Upper margin has 2(d+1) elements; d == e - (position in sample)
2271 // Force k into the upper margin. Note p will be within [s, e]
2272 // if: (double) (r - k) / (r - l) < 1/12 => (r - k) / 2 < (r - l) / 24
2273 p = e - ((r - k) >>> 1);
2274 }
2275 final int s = e - fp + 1;
2276 for (int i = s; i <= e; i++) {
2277 Sorting.sort3(a, i - fp, i, i + fp);
2278 }
2279 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING);
2280 return expandPartition(a, l, r, s, e, p, upper[0], upper);
2281 }
2282
2283 /**
2284 * Expand a partition around a single pivot. Partitioning exchanges array
2285 * elements such that all elements smaller than pivot are before it and all
2286 * elements larger than pivot are after it. The central region is already
2287 * partitioned.
2288 *
2289 * <pre>{@code
2290 * |l |s |p0 p1| e| r|
2291 * | ??? | <P | ==P | >P | ??? |
2292 * }</pre>
2293 *
2294 * <p>This requires that {@code start != end}. However it handles
2295 * {@code left == start} and/or {@code end == right}.
2296 *
2297 * @param a Data array.
2298 * @param left Lower bound (inclusive).
2299 * @param right Upper bound (inclusive).
2300 * @param start Start of the partition range (inclusive).
2301 * @param end End of the partitioned range (inclusive).
2302 * @param pivot0 Lower pivot location (inclusive).
2303 * @param pivot1 Upper pivot location (inclusive).
2304 * @param upper Upper bound (inclusive) of the pivot range [k1].
2305 * @return Lower bound (inclusive) of the pivot range [k0].
2306 */
2307 // package-private for testing
2308 static int expandPartition(int[] a, int left, int right, int start, int end,
2309 int pivot0, int pivot1, int[] upper) {
2310 // 3-way partition of the data using a pivot value into
2311 // less-than, equal or greater-than.
2312 // Based on Sedgewick's Bentley-McIroy partitioning: always swap i<->j then
2313 // check for equal to the pivot and move again.
2314 //
2315 // Move sentinels from start and end to left and right. Scan towards the
2316 // sentinels until >=,<=. Swap then move == to the pivot region.
2317 // <-i j->
2318 // |l | | |p0 p1| | | r|
2319 // |>=| ??? | < | == | > | ??? |<=|
2320 //
2321 // When either i or j reach the edge perform finishing loop.
2322 // Finish loop for a[j] <= v replaces j with p1+1, optionally moves value
2323 // to p0 for < and updates the pivot range p1 (and optionally p0):
2324 // j->
2325 // |l |p0 p1| | | r|
2326 // | < | == | > | ??? |<=|
2327
2328 final int v = a[pivot0];
2329 // Use start/end as sentinels (requires start != end)
2330 int vi = a[start];
2331 int vj = a[end];
2332 a[start] = a[left];
2333 a[end] = a[right];
2334 a[left] = vj;
2335 a[right] = vi;
2336
2337 int i = start + 1;
2338 int j = end - 1;
2339
2340 // Positioned for pre-in/decrement to write to pivot region
2341 int p0 = pivot0 == start ? i : pivot0;
2342 int p1 = pivot1 == end ? j : pivot1;
2343
2344 while (true) {
2345 do {
2346 --i;
2347 } while (a[i] < v);
2348 do {
2349 ++j;
2350 } while (a[j] > v);
2351 vj = a[i];
2352 vi = a[j];
2353 a[i] = vi;
2354 a[j] = vj;
2355 // Move the equal values to pivot region
2356 if (vi == v) {
2357 a[i] = a[--p0];
2358 a[p0] = v;
2359 }
2360 if (vj == v) {
2361 a[j] = a[++p1];
2362 a[p1] = v;
2363 }
2364 // Termination check and finishing loops.
2365 // Note: This works even if pivot region is zero length (p1 == p0-1 due to
2366 // length 1 pivot region at either start/end) because we pre-inc/decrement
2367 // one side and post-inc/decrement the other side.
2368 if (i == left) {
2369 while (j < right) {
2370 do {
2371 ++j;
2372 } while (a[j] > v);
2373 final int w = a[j];
2374 // Move upper bound of pivot region
2375 a[j] = a[++p1];
2376 a[p1] = v;
2377 // Move lower bound of pivot region
2378 if (w != v) {
2379 a[p0] = w;
2380 p0++;
2381 }
2382 }
2383 break;
2384 }
2385 if (j == right) {
2386 while (i > left) {
2387 do {
2388 --i;
2389 } while (a[i] < v);
2390 final int w = a[i];
2391 // Move lower bound of pivot region
2392 a[i] = a[--p0];
2393 a[p0] = v;
2394 // Move upper bound of pivot region
2395 if (w != v) {
2396 a[p1] = w;
2397 p1--;
2398 }
2399 }
2400 break;
2401 }
2402 }
2403
2404 upper[0] = p1;
2405 return p0;
2406 }
2407
2408 /**
2409 * Partition the array such that indices {@code k} correspond to their
2410 * correctly sorted value in the equivalent fully sorted array.
2411 *
2412 * <p>For all indices {@code k} and any index {@code i}:
2413 *
2414 * <pre>{@code
2415 * data[i < k] <= data[k] <= data[k < i]
2416 * }</pre>
2417 *
2418 * <p>This function accepts a {@link UpdatingInterval} of indices {@code k} that define the
2419 * range of indices to partition. The {@link UpdatingInterval} can be narrowed or split as
2420 * partitioning divides the range.
2421 *
2422 * <p>Uses an introselect variant. The quickselect is a dual-pivot quicksort
2423 * partition method by Vladimir Yaroslavskiy; the fall-back on poor convergence of
2424 * the quickselect is a heapselect.
2425 *
2426 * <p>The {@code flags} contain the current recursion count and the configured
2427 * length threshold for {@code r - l} to perform sort select. The count is in the upper
2428 * bits and the threshold is in the lower bits.
2429 *
2430 * @param a Values.
2431 * @param left Lower bound of data (inclusive, assumed to be strictly positive).
2432 * @param right Upper bound of data (inclusive, assumed to be strictly positive).
2433 * @param k Interval of indices to partition (ordered).
2434 * @param flags Control flags.
2435 */
2436 // package-private for testing
2437 static void dualPivotQuickSelect(int[] a, int left, int right, UpdatingInterval k, int flags) {
2438 // If partitioning splits the interval then recursion is used for the left-most side(s)
2439 // and the right-most side remains within this function. If partitioning does
2440 // not split the interval then it remains within this function.
2441 int l = left;
2442 int r = right;
2443 int f = flags;
2444 int ka = k.left();
2445 int kb = k.right();
2446 final int[] upper = {0, 0, 0};
2447 while (true) {
2448 // Select when ka and kb are close to the same end,
2449 // or the entire range is small
2450 // |l|-----|ka|--------|kb|------|r|
2451 final int n = r - l;
2452 if (Math.min(kb - l, r - ka) < DP_SORTSELECT_SIZE ||
2453 n < (f & SORTSELECT_MASK)) {
2454 sortSelect(a, l, r, ka, kb);
2455 return;
2456 }
2457 if (kb - ka < DP_SORTSELECT_SIZE) {
2458 // Switch to single-pivot mode with Floyd-Rivest sub-sampling
2459 quickSelectAdaptive(a, l, r, ka, kb, upper, MODE_FR_SAMPLING);
2460 return;
2461 }
2462 if (f < 0) {
2463 // Excess recursion, switch to heap select
2464 heapSelect(a, l, r, ka, kb);
2465 return;
2466 }
2467
2468 // Dual-pivot partitioning
2469 final int p0 = partition(a, l, r, upper);
2470 final int p1 = upper[0];
2471
2472 // Recursion to max depth
2473 // Note: Here we possibly branch left, middle and right with multiple keys.
2474 // It is possible that the partition has split the keys
2475 // and the recursion proceeds with a reduced set in each region.
2476 // p0 p1 p2 p3
2477 // |l|--|ka|--k----k--|P|------k--|kb|----|P|----|r|
2478 // kb | ka
2479 f += RECURSION_INCREMENT;
2480 // Recurse left side if required
2481 if (ka < p0) {
2482 if (kb <= p1) {
2483 // Entirely on left side
2484 r = p0 - 1;
2485 if (r < kb) {
2486 kb = k.updateRight(r);
2487 }
2488 continue;
2489 }
2490 dualPivotQuickSelect(a, l, p0 - 1, k.splitLeft(p0, p1), f);
2491 // Here we must process middle and/or right
2492 ka = k.left();
2493 } else if (kb <= p1) {
2494 // No middle/right side
2495 return;
2496 } else if (ka <= p1) {
2497 // Advance lower bound
2498 ka = k.updateLeft(p1 + 1);
2499 }
2500 // Recurse middle if required
2501 final int p2 = upper[1];
2502 final int p3 = upper[2];
2503 if (ka < p2) {
2504 l = p1 + 1;
2505 if (kb <= p3) {
2506 // Entirely in middle
2507 r = p2 - 1;
2508 if (r < kb) {
2509 kb = k.updateRight(r);
2510 }
2511 continue;
2512 }
2513 dualPivotQuickSelect(a, l, p2 - 1, k.splitLeft(p2, p3), f);
2514 ka = k.left();
2515 } else if (kb <= p3) {
2516 // No right side
2517 return;
2518 } else if (ka <= p3) {
2519 ka = k.updateLeft(p3 + 1);
2520 }
2521 // Continue right
2522 l = p3 + 1;
2523 }
2524 }
2525
2526 /**
2527 * Partition an array slice around 2 pivots. Partitioning exchanges array elements
2528 * such that all elements smaller than pivot are before it and all elements larger
2529 * than pivot are after it.
2530 *
2531 * <p>This method returns 4 points describing the pivot ranges of equal values.
2532 *
2533 * <pre>{@code
2534 * |k0 k1| |k2 k3|
2535 * | <P | ==P1 | <P1 && <P2 | ==P2 | >P |
2536 * }</pre>
2537 *
2538 * <ul>
2539 * <li>k0: lower pivot1 point</li>
2540 * <li>k1: upper pivot1 point (inclusive)</li>
2541 * <li>k2: lower pivot2 point</li>
2542 * <li>k3: upper pivot2 point (inclusive)</li>
2543 * </ul>
2544 *
2545 * <p>Bounds are set so {@code i < k0}, {@code i > k3} and {@code k1 < i < k2} are
2546 * unsorted. When the range {@code [k0, k3]} contains fully sorted elements the result
2547 * is set to {@code k1 = k3; k2 == k0}. This can occur if
2548 * {@code P1 == P2} or there are zero or one value between the pivots
2549 * {@code P1 < v < P2}. Any sort/partition of ranges [left, k0-1], [k1+1, k2-1] and
2550 * [k3+1, right] must check the length is {@code > 1}.
2551 *
2552 * @param a Data array.
2553 * @param left Lower bound (inclusive).
2554 * @param right Upper bound (inclusive).
2555 * @param bounds Points [k1, k2, k3].
2556 * @return Lower bound (inclusive) of the pivot range [k0].
2557 */
2558 private static int partition(int[] a, int left, int right, int[] bounds) {
2559 // Pick 2 pivots from 5 approximately uniform through the range.
2560 // Spacing is ~ 1/7 made using shifts. Other strategies are equal or much
2561 // worse. 1/7 = 5/35 ~ 1/8 + 1/64 : 0.1429 ~ 0.1406
2562 // Ensure the value is above zero to choose different points!
2563 final int n = right - left;
2564 final int step = 1 + (n >>> 3) + (n >>> 6);
2565 final int i3 = left + (n >>> 1);
2566 final int i2 = i3 - step;
2567 final int i1 = i2 - step;
2568 final int i4 = i3 + step;
2569 final int i5 = i4 + step;
2570 Sorting.sort5(a, i1, i2, i3, i4, i5);
2571
2572 // Partition data using pivots P1 and P2 into less-than, greater-than or between.
2573 // Pivot values P1 & P2 are placed at the end. If P1 < P2, P2 acts as a sentinel.
2574 // k traverses the unknown region ??? and values moved if less-than or
2575 // greater-than:
2576 //
2577 // left less k great right
2578 // |P1| <P1 | P1 <= & <= P2 | ??? | >P2 |P2|
2579 //
2580 // <P1 (left, lt)
2581 // P1 <= & <= P2 [lt, k)
2582 // >P2 (gt, right)
2583 //
2584 // At the end pivots are swapped back to behind the less and great pointers.
2585 //
2586 // | <P1 |P1| P1<= & <= P2 |P2| >P2 |
2587
2588 // Swap ends to the pivot locations.
2589 final int v1 = a[i2];
2590 a[i2] = a[left];
2591 a[left] = v1;
2592 final int v2 = a[i4];
2593 a[i4] = a[right];
2594 a[right] = v2;
2595
2596 // pointers
2597 int less = left;
2598 int great = right;
2599
2600 // Fast-forward ascending / descending runs to reduce swaps.
2601 // Cannot overrun as end pivots (v1 <= v2) act as sentinels.
2602 do {
2603 ++less;
2604 } while (a[less] < v1);
2605 do {
2606 --great;
2607 } while (a[great] > v2);
2608
2609 // a[less - 1] < P1 : a[great + 1] > P2
2610 // unvisited in [less, great]
2611 SORTING:
2612 for (int k = less; k <= great; k++) {
2613 final int v = a[k];
2614 if (v < v1) {
2615 // swap(a, k, less++)
2616 a[k] = a[less];
2617 a[less] = v;
2618 less++;
2619 } else if (v > v2) {
2620 // while k < great and a[great] > v2:
2621 // great--
2622 while (a[great] > v2) {
2623 if (great-- == k) {
2624 // Done
2625 break SORTING;
2626 }
2627 }
2628 // swap(a, k, great--)
2629 // if a[k] < v1:
2630 // swap(a, k, less++)
2631 final int w = a[great];
2632 a[great] = v;
2633 great--;
2634 // delay a[k] = w
2635 if (w < v1) {
2636 a[k] = a[less];
2637 a[less] = w;
2638 less++;
2639 } else {
2640 a[k] = w;
2641 }
2642 }
2643 }
2644
2645 // Change to inclusive ends : a[less] < P1 : a[great] > P2
2646 less--;
2647 great++;
2648 // Move the pivots to correct locations
2649 a[left] = a[less];
2650 a[less] = v1;
2651 a[right] = a[great];
2652 a[great] = v2;
2653
2654 // Record the pivot locations
2655 final int lower = less;
2656 bounds[2] = great;
2657
2658 // equal elements
2659 // Original paper: If middle partition is bigger than a threshold
2660 // then check for equal elements.
2661
2662 // Note: This is extra work. When performing partitioning the region of interest
2663 // may be entirely above or below the central region and this can be skipped.
2664
2665 // Here we look for equal elements if the centre is more than 5/8 the length.
2666 // 5/8 = 1/2 + 1/8. Pivots must be different.
2667 if ((great - less) > (n >>> 1) + (n >>> 3) && v1 != v2) {
2668
2669 // Fast-forward to reduce swaps. Changes inclusive ends to exclusive ends.
2670 // Since v1 != v2 these act as sentinels to prevent overrun.
2671 do {
2672 ++less;
2673 } while (a[less] == v1);
2674 do {
2675 --great;
2676 } while (a[great] == v2);
2677
2678 // This copies the logic in the sorting loop using == comparisons
2679 EQUAL:
2680 for (int k = less; k <= great; k++) {
2681 final int v = a[k];
2682 if (v == v1) {
2683 a[k] = a[less];
2684 a[less] = v;
2685 less++;
2686 } else if (v == v2) {
2687 while (a[great] == v2) {
2688 if (great-- == k) {
2689 // Done
2690 break EQUAL;
2691 }
2692 }
2693 final int w = a[great];
2694 a[great] = v;
2695 great--;
2696 if (w == v1) {
2697 a[k] = a[less];
2698 a[less] = w;
2699 less++;
2700 } else {
2701 a[k] = w;
2702 }
2703 }
2704 }
2705
2706 // Change to inclusive ends
2707 less--;
2708 great++;
2709 }
2710
2711 // Between pivots in (less, great)
2712 if (v1 != v2 && less < great - 1) {
2713 // Record the pivot end points
2714 bounds[0] = less;
2715 bounds[1] = great;
2716 } else {
2717 // No unsorted internal region (set k1 = k3; k2 = k0)
2718 bounds[0] = bounds[2];
2719 bounds[1] = lower;
2720 }
2721
2722 return lower;
2723 }
2724
2725 /**
2726 * Partition the elements between {@code ka} and {@code kb} using a heap select
2727 * algorithm. It is assumed {@code left <= ka <= kb <= right}.
2728 *
2729 * @param a Data array to use to find out the K<sup>th</sup> value.
2730 * @param left Lower bound (inclusive).
2731 * @param right Upper bound (inclusive).
2732 * @param ka Lower index to select.
2733 * @param kb Upper index to select.
2734 */
2735 static void heapSelect(long[] a, int left, int right, int ka, int kb) {
2736 if (right <= left) {
2737 return;
2738 }
2739 // Use the smallest heap
2740 if (kb - left < right - ka) {
2741 heapSelectLeft(a, left, right, ka, kb);
2742 } else {
2743 heapSelectRight(a, left, right, ka, kb);
2744 }
2745 }
2746
2747 /**
2748 * Partition the elements between {@code ka} and {@code kb} using a heap select
2749 * algorithm. It is assumed {@code left <= ka <= kb <= right}.
2750 *
2751 * <p>For best performance this should be called with {@code k} in the lower
2752 * half of the range.
2753 *
2754 * @param a Data array to use to find out the K<sup>th</sup> value.
2755 * @param left Lower bound (inclusive).
2756 * @param right Upper bound (inclusive).
2757 * @param ka Lower index to select.
2758 * @param kb Upper index to select.
2759 */
2760 static void heapSelectLeft(long[] a, int left, int right, int ka, int kb) {
2761 // Create a max heap in-place in [left, k], rooted at a[left] = max
2762 // |l|-max-heap-|k|--------------|
2763 // Build the heap using Floyd's heap-construction algorithm for heap size n.
2764 // Start at parent of the last element in the heap (k),
2765 // i.e. start = parent(n-1) : parent(c) = floor((c - 1) / 2) : c = k - left
2766 int end = kb + 1;
2767 for (int p = left + ((kb - left - 1) >> 1); p >= left; p--) {
2768 maxHeapSiftDown(a, a[p], p, left, end);
2769 }
2770 // Scan the remaining data and insert
2771 // Mitigate worst case performance on descending data by backward sweep
2772 long max = a[left];
2773 for (int i = right; i > kb; i--) {
2774 final long v = a[i];
2775 if (v < max) {
2776 a[i] = max;
2777 maxHeapSiftDown(a, v, left, left, end);
2778 max = a[left];
2779 }
2780 }
2781 // Partition [ka, kb]
2782 // |l|-max-heap-|k|--------------|
2783 // | <-swap-> | then sift down reduced size heap
2784 // Avoid sifting heap of size 1
2785 final int last = Math.max(left, ka - 1);
2786 while (--end > last) {
2787 maxHeapSiftDown(a, a[end], left, left, end);
2788 a[end] = max;
2789 max = a[left];
2790 }
2791 }
2792
2793 /**
2794 * Sift the element down the max heap.
2795 *
2796 * <p>Assumes {@code root <= p < end}, i.e. the max heap is above root.
2797 *
2798 * @param a Heap data.
2799 * @param v Value to sift.
2800 * @param p Start position.
2801 * @param root Root of the heap.
2802 * @param end End of the heap (exclusive).
2803 */
2804 private static void maxHeapSiftDown(long[] a, long v, int p, int root, int end) {
2805 // child2 = root + 2 * (parent - root) + 2
2806 // = 2 * parent - root + 2
2807 while (true) {
2808 // Right child
2809 int c = (p << 1) - root + 2;
2810 if (c > end) {
2811 // No left child
2812 break;
2813 }
2814 // Use the left child if right doesn't exist, or it is greater
2815 if (c == end || a[c] < a[c - 1]) {
2816 --c;
2817 }
2818 if (v >= a[c]) {
2819 // Parent greater than largest child - done
2820 break;
2821 }
2822 // Swap and descend
2823 a[p] = a[c];
2824 p = c;
2825 }
2826 a[p] = v;
2827 }
2828
2829 /**
2830 * Partition the elements between {@code ka} and {@code kb} using a heap select
2831 * algorithm. It is assumed {@code left <= ka <= kb <= right}.
2832 *
2833 * <p>For best performance this should be called with {@code k} in the upper
2834 * half of the range.
2835 *
2836 * @param a Data array to use to find out the K<sup>th</sup> value.
2837 * @param left Lower bound (inclusive).
2838 * @param right Upper bound (inclusive).
2839 * @param ka Lower index to select.
2840 * @param kb Upper index to select.
2841 */
2842 static void heapSelectRight(long[] a, int left, int right, int ka, int kb) {
2843 // Create a min heap in-place in [k, right], rooted at a[right] = min
2844 // |--------------|k|-min-heap-|r|
2845 // Build the heap using Floyd's heap-construction algorithm for heap size n.
2846 // Start at parent of the last element in the heap (k),
2847 // i.e. start = parent(n-1) : parent(c) = floor((c - 1) / 2) : c = right - k
2848 int end = ka - 1;
2849 for (int p = right - ((right - ka - 1) >> 1); p <= right; p++) {
2850 minHeapSiftDown(a, a[p], p, right, end);
2851 }
2852 // Scan the remaining data and insert
2853 // Mitigate worst case performance on descending data by backward sweep
2854 long min = a[right];
2855 for (int i = left; i < ka; i++) {
2856 final long v = a[i];
2857 if (v > min) {
2858 a[i] = min;
2859 minHeapSiftDown(a, v, right, right, end);
2860 min = a[right];
2861 }
2862 }
2863 // Partition [ka, kb]
2864 // |--------------|k|-min-heap-|r|
2865 // | <-swap-> | then sift down reduced size heap
2866 // Avoid sifting heap of size 1
2867 final int last = Math.min(right, kb + 1);
2868 while (++end < last) {
2869 minHeapSiftDown(a, a[end], right, right, end);
2870 a[end] = min;
2871 min = a[right];
2872 }
2873 }
2874
2875 /**
2876 * Sift the element down the min heap.
2877 *
2878 * <p>Assumes {@code root >= p > end}, i.e. the max heap is below root.
2879 *
2880 * @param a Heap data.
2881 * @param v Value to sift.
2882 * @param p Start position.
2883 * @param root Root of the heap.
2884 * @param end End of the heap (exclusive).
2885 */
2886 private static void minHeapSiftDown(long[] a, long v, int p, int root, int end) {
2887 // child2 = root - 2 * (root - parent) - 2
2888 // = 2 * parent - root - 2
2889 while (true) {
2890 // Right child
2891 int c = (p << 1) - root - 2;
2892 if (c < end) {
2893 // No left child
2894 break;
2895 }
2896 // Use the left child if right doesn't exist, or it is less
2897 if (c == end || a[c] > a[c + 1]) {
2898 ++c;
2899 }
2900 if (v <= a[c]) {
2901 // Parent less than smallest child - done
2902 break;
2903 }
2904 // Swap and descend
2905 a[p] = a[c];
2906 p = c;
2907 }
2908 a[p] = v;
2909 }
2910
2911 /**
2912 * Partition the elements between {@code ka} and {@code kb} using a sort select
2913 * algorithm. It is assumed {@code left <= ka <= kb <= right}.
2914 *
2915 * @param a Data array to use to find out the K<sup>th</sup> value.
2916 * @param left Lower bound (inclusive).
2917 * @param right Upper bound (inclusive).
2918 * @param ka Lower index to select.
2919 * @param kb Upper index to select.
2920 */
2921 static void sortSelect(long[] a, int left, int right, int ka, int kb) {
2922 // Combine the test for right <= left with
2923 // avoiding the overhead of sort select on tiny data.
2924 if (right - left <= MIN_SORTSELECT_SIZE) {
2925 Sorting.sort(a, left, right);
2926 return;
2927 }
2928 // Sort the smallest side
2929 if (kb - left < right - ka) {
2930 sortSelectLeft(a, left, right, kb);
2931 } else {
2932 sortSelectRight(a, left, right, ka);
2933 }
2934 }
2935
2936 /**
2937 * Partition the minimum {@code n} elements below {@code k} where
2938 * {@code n = k - left + 1}. Uses an insertion sort algorithm.
2939 *
2940 * <p>Works with any {@code k} in the range {@code left <= k <= right}
2941 * and performs a full sort of the range below {@code k}.
2942 *
2943 * <p>For best performance this should be called with
2944 * {@code k - left < right - k}, i.e.
2945 * to partition a value in the lower half of the range.
2946 *
2947 * @param a Data array to use to find out the K<sup>th</sup> value.
2948 * @param left Lower bound (inclusive).
2949 * @param right Upper bound (inclusive).
2950 * @param k Index to select.
2951 */
2952 static void sortSelectLeft(long[] a, int left, int right, int k) {
2953 // Sort
2954 for (int i = left; ++i <= k;) {
2955 final long v = a[i];
2956 // Move preceding higher elements above (if required)
2957 if (v < a[i - 1]) {
2958 int j = i;
2959 while (--j >= left && v < a[j]) {
2960 a[j + 1] = a[j];
2961 }
2962 a[j + 1] = v;
2963 }
2964 }
2965 // Scan the remaining data and insert
2966 // Mitigate worst case performance on descending data by backward sweep
2967 long m = a[k];
2968 for (int i = right; i > k; i--) {
2969 final long v = a[i];
2970 if (v < m) {
2971 a[i] = m;
2972 int j = k;
2973 while (--j >= left && v < a[j]) {
2974 a[j + 1] = a[j];
2975 }
2976 a[j + 1] = v;
2977 m = a[k];
2978 }
2979 }
2980 }
2981
2982 /**
2983 * Partition the maximum {@code n} elements above {@code k} where
2984 * {@code n = right - k + 1}. Uses an insertion sort algorithm.
2985 *
2986 * <p>Works with any {@code k} in the range {@code left <= k <= right}
2987 * and can be used to perform a full sort of the range above {@code k}.
2988 *
2989 * <p>For best performance this should be called with
2990 * {@code k - left > right - k}, i.e.
2991 * to partition a value in the upper half of the range.
2992 *
2993 * @param a Data array to use to find out the K<sup>th</sup> value.
2994 * @param left Lower bound (inclusive).
2995 * @param right Upper bound (inclusive).
2996 * @param k Index to select.
2997 */
2998 static void sortSelectRight(long[] a, int left, int right, int k) {
2999 // Sort
3000 for (int i = right; --i >= k;) {
3001 final long v = a[i];
3002 // Move succeeding lower elements below (if required)
3003 if (v > a[i + 1]) {
3004 int j = i;
3005 while (++j <= right && v > a[j]) {
3006 a[j - 1] = a[j];
3007 }
3008 a[j - 1] = v;
3009 }
3010 }
3011 // Scan the remaining data and insert
3012 // Mitigate worst case performance on descending data by backward sweep
3013 long m = a[k];
3014 for (int i = left; i < k; i++) {
3015 final long v = a[i];
3016 if (v > m) {
3017 a[i] = m;
3018 int j = k;
3019 while (++j <= right && v > a[j]) {
3020 a[j - 1] = a[j];
3021 }
3022 a[j - 1] = v;
3023 m = a[k];
3024 }
3025 }
3026 }
3027
3028 /**
3029 * Partition the array such that index {@code k} corresponds to its correctly
3030 * sorted value in the equivalent fully sorted array.
3031 *
3032 * <p>Assumes {@code k} is a valid index into [left, right].
3033 *
3034 * @param a Values.
3035 * @param left Lower bound of data (inclusive).
3036 * @param right Upper bound of data (inclusive).
3037 * @param k Index.
3038 */
3039 static void select(long[] a, int left, int right, int k) {
3040 quickSelectAdaptive(a, left, right, k, k, new int[1], MODE_FR_SAMPLING);
3041 }
3042
3043 /**
3044 * Partition the array such that indices {@code k} correspond to their correctly
3045 * sorted value in the equivalent fully sorted array.
3046 *
3047 * @param a Values.
3048 * @param left Lower bound of data (inclusive).
3049 * @param right Upper bound of data (inclusive).
3050 * @param k Indices (may be destructively modified).
3051 * @param n Count of indices.
3052 */
3053 static void select(long[] a, int left, int right, int[] k, int n) {
3054 if (n == 1) {
3055 quickSelectAdaptive(a, left, right, k[0], k[0], new int[1], MODE_FR_SAMPLING);
3056 return;
3057 }
3058
3059 // Interval creation validates the indices are in [left, right]
3060 final UpdatingInterval keys = IndexSupport.createUpdatingInterval(k, n);
3061
3062 // Note: If the keys are not separated then they are effectively a single key.
3063 // Any split of keys separated by the sort select size
3064 // will be finished on the next iteration.
3065 final int k1 = keys.left();
3066 final int kn = keys.right();
3067 if (kn - k1 < DP_SORTSELECT_SIZE) {
3068 quickSelectAdaptive(a, left, right, k1, kn, new int[1], MODE_FR_SAMPLING);
3069 } else {
3070 // Dual-pivot mode with small range sort length configured using index density
3071 dualPivotQuickSelect(a, left, right, keys, dualPivotFlags(left, right, k1, kn));
3072 }
3073 }
3074
3075 /**
3076 * Partition the array such that indices {@code k} correspond to their
3077 * correctly sorted value in the equivalent fully sorted array.
3078 *
3079 * <p>For all indices {@code [ka, kb]} and any index {@code i}:
3080 *
3081 * <pre>{@code
3082 * data[i < ka] <= data[ka] <= data[kb] <= data[kb < i]
3083 * }</pre>
3084 *
3085 * <p>This function accepts indices {@code [ka, kb]} that define the
3086 * range of indices to partition. It is expected that the range is small.
3087 *
3088 * <p>The {@code flags} are used to control the sampling mode and adaption of
3089 * the index within the sample.
3090 *
3091 * <p>Returns the bounds containing {@code [ka, kb]}. These may be lower/higher
3092 * than the keys if equal values are present in the data.
3093 *
3094 * @param a Values.
3095 * @param left Lower bound of data (inclusive, assumed to be strictly positive).
3096 * @param right Upper bound of data (inclusive, assumed to be strictly positive).
3097 * @param ka First key of interest.
3098 * @param kb Last key of interest.
3099 * @param bounds Upper bound of the range containing {@code [ka, kb]} (inclusive).
3100 * @param flags Adaption flags.
3101 * @return Lower bound of the range containing {@code [ka, kb]} (inclusive).
3102 */
3103 static int quickSelectAdaptive(long[] a, int left, int right, int ka, int kb,
3104 int[] bounds, int flags) {
3105 int l = left;
3106 int r = right;
3107 int m = flags;
3108 while (true) {
3109 // Select when ka and kb are close to the same end
3110 // |l|-----|ka|kkkkkkkk|kb|------|r|
3111 if (Math.min(kb - l, r - ka) < LINEAR_SORTSELECT_SIZE) {
3112 sortSelect(a, l, r, ka, kb);
3113 bounds[0] = kb;
3114 return ka;
3115 }
3116
3117 // Only target ka; kb is assumed to be close
3118 int p0;
3119 final int n = r - l;
3120 // f in [0, 1]
3121 final double f = (double) (ka - l) / n;
3122 // Record the larger margin (start at 1/4) to create the estimated size.
3123 // step L R
3124 // far left 1/12 1/3 (use 1/4 + 1/32 + 1/64 ~ 0.328)
3125 // left 1/6 1/4
3126 // middle 2/9 2/9 (use 1/4 - 1/32 ~ 0.219)
3127 int margin = n >> 2;
3128 if (m < MODE_SAMPLING && r - l > FR_SAMPLING_SIZE) {
3129 // Floyd-Rivest sample step uses the same margins
3130 p0 = sampleStep(a, l, r, ka, bounds);
3131 if (f <= STEP_FAR_LEFT || f >= STEP_FAR_RIGHT) {
3132 margin += (n >> 5) + (n >> 6);
3133 } else if (f > STEP_LEFT && f < STEP_RIGHT) {
3134 margin -= n >> 5;
3135 }
3136 } else if (f <= STEP_LEFT) {
3137 if (f <= STEP_FAR_LEFT) {
3138 margin += (n >> 5) + (n >> 6);
3139 p0 = repeatedStepFarLeft(a, l, r, ka, bounds, m);
3140 } else {
3141 p0 = repeatedStepLeft(a, l, r, ka, bounds, m);
3142 }
3143 } else if (f >= STEP_RIGHT) {
3144 if (f >= STEP_FAR_RIGHT) {
3145 margin += (n >> 5) + (n >> 6);
3146 p0 = repeatedStepFarRight(a, l, r, ka, bounds, m);
3147 } else {
3148 p0 = repeatedStepRight(a, l, r, ka, bounds, m);
3149 }
3150 } else {
3151 margin -= n >> 5;
3152 p0 = repeatedStep(a, l, r, ka, bounds, m);
3153 }
3154
3155 // Note: Here we expect [ka, kb] to be small and splitting is unlikely.
3156 // p0 p1
3157 // |l|--|ka|kkkk|kb|--|P|-------------------|r|
3158 // |l|----------------|P|--|ka|kkk|kb|------|r|
3159 // |l|-----------|ka|k|P|k|kb|--------------|r|
3160 final int p1 = bounds[0];
3161 if (kb < p0) {
3162 // Entirely on left side
3163 r = p0 - 1;
3164 } else if (ka > p1) {
3165 // Entirely on right side
3166 l = p1 + 1;
3167 } else {
3168 // Pivot splits [ka, kb]. Expect ends to be close to the pivot and finish.
3169 // Here we set the bounds for use after median-of-medians pivot selection.
3170 // In the event there are many equal values this allows collecting those
3171 // known to be equal together when moving around the medians sample.
3172 if (kb > p1) {
3173 sortSelectLeft(a, p1 + 1, r, kb);
3174 bounds[0] = kb;
3175 }
3176 if (ka < p0) {
3177 sortSelectRight(a, l, p0 - 1, ka);
3178 p0 = ka;
3179 }
3180 return p0;
3181 }
3182 // Update mode based on target partition size
3183 if (r - l > n - margin) {
3184 m++;
3185 }
3186 }
3187 }
3188
3189 /**
3190 * Partition an array slice around a pivot. Partitioning exchanges array elements such
3191 * that all elements smaller than pivot are before it and all elements larger than
3192 * pivot are after it.
3193 *
3194 * <p>Partitions a Floyd-Rivest sample around a pivot offset so that the input {@code k} will
3195 * fall in the smaller partition when the entire range is partitioned.
3196 *
3197 * <p>Assumes the range {@code r - l} is large.
3198 *
3199 * @param a Data array.
3200 * @param l Lower bound (inclusive).
3201 * @param r Upper bound (inclusive).
3202 * @param k Target index.
3203 * @param upper Upper bound (inclusive) of the pivot range.
3204 * @return Lower bound (inclusive) of the pivot range.
3205 */
3206 private static int sampleStep(long[] a, int l, int r, int k, int[] upper) {
3207 // Floyd-Rivest: use SELECT recursively on a sample of size S to get an estimate
3208 // for the (k-l+1)-th smallest element into a[k], biased slightly so that the
3209 // (k-l+1)-th element is expected to lie in the smaller set after partitioning.
3210 final int n = r - l + 1;
3211 final int ith = k - l + 1;
3212 final double z = Math.log(n);
3213 // sample size = 0.5 * n^(2/3)
3214 final double s = 0.5 * Math.exp(0.6666666666666666 * z);
3215 final double sd = 0.5 * Math.sqrt(z * s * (n - s) / n) * Integer.signum(ith - (n >> 1));
3216 final int ll = Math.max(l, (int) (k - ith * s / n + sd));
3217 final int rr = Math.min(r, (int) (k + (n - ith) * s / n + sd));
3218 // Sample recursion restarts from [ll, rr]
3219 final int p = quickSelectAdaptive(a, ll, rr, k, k, upper, MODE_FR_SAMPLING);
3220 return expandPartition(a, l, r, ll, rr, p, upper[0], upper);
3221 }
3222
3223 /**
3224 * Partition an array slice around a pivot. Partitioning exchanges array elements such
3225 * that all elements smaller than pivot are before it and all elements larger than
3226 * pivot are after it.
3227 *
3228 * <p>Assumes the range {@code r - l >= 8}; the caller is responsible for selection on a smaller
3229 * range. If using a 12th-tile for sampling then assumes {@code r - l >= 11}.
3230 *
3231 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm
3232 * with the median of 3 then median of 3; the final sample is placed in the
3233 * 5th 9th-tile; the pivot chosen from the sample is adaptive using the input {@code k}.
3234 *
3235 * <p>Given a pivot in the middle of the sample this has margins of 2/9 and 2/9.
3236 *
3237 * @param a Data array.
3238 * @param l Lower bound (inclusive).
3239 * @param r Upper bound (inclusive).
3240 * @param k Target index.
3241 * @param upper Upper bound (inclusive) of the pivot range.
3242 * @param flags Control flags.
3243 * @return Lower bound (inclusive) of the pivot range.
3244 */
3245 private static int repeatedStep(long[] a, int l, int r, int k, int[] upper, int flags) {
3246 // Adapted from Alexandrescu (2016), algorithm 8.
3247 final int fp;
3248 final int s;
3249 int p;
3250 if (flags <= MODE_SAMPLING) {
3251 // Median into a 12th-tile
3252 fp = (r - l + 1) / 12;
3253 // Position the sample around the target k
3254 s = k - mapDistance(k - l, l, r, fp);
3255 p = k;
3256 } else {
3257 // i in tertile [3f':6f')
3258 fp = (r - l + 1) / 9;
3259 final int f3 = 3 * fp;
3260 final int end = l + (f3 << 1);
3261 for (int i = l + f3; i < end; i++) {
3262 Sorting.sort3(a, i - f3, i, i + f3);
3263 }
3264 // 5th 9th-tile: [4f':5f')
3265 s = l + (fp << 2);
3266 // No adaption uses the middle to enforce strict margins
3267 p = s + (flags == MODE_ADAPTION ? mapDistance(k - l, l, r, fp) : (fp >>> 1));
3268 }
3269 final int e = s + fp - 1;
3270 for (int i = s; i <= e; i++) {
3271 Sorting.sort3(a, i - fp, i, i + fp);
3272 }
3273 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING);
3274 return expandPartition(a, l, r, s, e, p, upper[0], upper);
3275 }
3276
3277 /**
3278 * Partition an array slice around a pivot. Partitioning exchanges array elements such
3279 * that all elements smaller than pivot are before it and all elements larger than
3280 * pivot are after it.
3281 *
3282 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller
3283 * range.
3284 *
3285 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm
3286 * with the lower median of 4 then either median of 3 with the final sample placed in the
3287 * 5th 12th-tile, or min of 3 with the final sample in the 4th 12th-tile;
3288 * the pivot chosen from the sample is adaptive using the input {@code k}.
3289 *
3290 * <p>Given a pivot in the middle of the sample this has margins of 1/6 and 1/4.
3291 *
3292 * @param a Data array.
3293 * @param l Lower bound (inclusive).
3294 * @param r Upper bound (inclusive).
3295 * @param k Target index.
3296 * @param upper Upper bound (inclusive) of the pivot range.
3297 * @param flags Control flags.
3298 * @return Lower bound (inclusive) of the pivot range.
3299 */
3300 private static int repeatedStepLeft(long[] a, int l, int r, int k, int[] upper, int flags) {
3301 // Adapted from Alexandrescu (2016), algorithm 9.
3302 final int fp;
3303 final int s;
3304 int p;
3305 if (flags <= MODE_SAMPLING) {
3306 // Median into a 12th-tile
3307 fp = (r - l + 1) / 12;
3308 // Position the sample around the target k
3309 // Avoid bounds error due to rounding as (k-l)/(r-l) -> 1/12
3310 s = Math.max(k - mapDistance(k - l, l, r, fp), l + fp);
3311 p = k;
3312 } else {
3313 // i in 2nd quartile
3314 final int f = (r - l + 1) >> 2;
3315 final int f2 = f + f;
3316 final int end = l + f2;
3317 for (int i = l + f; i < end; i++) {
3318 Sorting.lowerMedian4(a, i - f, i, i + f, i + f2);
3319 }
3320 // i in 5th 12th-tile
3321 fp = f / 3;
3322 s = l + f + fp;
3323 // No adaption uses the middle to enforce strict margins
3324 p = s + (flags == MODE_ADAPTION ? mapDistance(k - l, l, r, fp) : (fp >>> 1));
3325 }
3326 final int e = s + fp - 1;
3327 for (int i = s; i <= e; i++) {
3328 Sorting.sort3(a, i - fp, i, i + fp);
3329 }
3330 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING);
3331 return expandPartition(a, l, r, s, e, p, upper[0], upper);
3332 }
3333
3334 /**
3335 * Partition an array slice around a pivot. Partitioning exchanges array elements such
3336 * that all elements smaller than pivot are before it and all elements larger than
3337 * pivot are after it.
3338 *
3339 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller
3340 * range.
3341 *
3342 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm
3343 * with the upper median of 4 then either median of 3 with the final sample placed in the
3344 * 8th 12th-tile, or max of 3 with the final sample in the 9th 12th-tile;
3345 * the pivot chosen from the sample is adaptive using the input {@code k}.
3346 *
3347 * <p>Given a pivot in the middle of the sample this has margins of 1/4 and 1/6.
3348 *
3349 * @param a Data array.
3350 * @param l Lower bound (inclusive).
3351 * @param r Upper bound (inclusive).
3352 * @param k Target index.
3353 * @param upper Upper bound (inclusive) of the pivot range.
3354 * @param flags Control flags.
3355 * @return Lower bound (inclusive) of the pivot range.
3356 */
3357 private static int repeatedStepRight(long[] a, int l, int r, int k, int[] upper, int flags) {
3358 // Mirror image repeatedStepLeft using upper median into 3rd quartile
3359 final int fp;
3360 final int e;
3361 int p;
3362 if (flags <= MODE_SAMPLING) {
3363 // Median into a 12th-tile
3364 fp = (r - l + 1) / 12;
3365 // Position the sample around the target k
3366 // Avoid bounds error due to rounding as (r-k)/(r-l) -> 11/12
3367 e = Math.min(k + mapDistance(r - k, l, r, fp), r - fp);
3368 p = k;
3369 } else {
3370 // i in 3rd quartile
3371 final int f = (r - l + 1) >> 2;
3372 final int f2 = f + f;
3373 final int end = r - f2;
3374 for (int i = r - f; i > end; i--) {
3375 Sorting.upperMedian4(a, i - f2, i - f, i, i + f);
3376 }
3377 // i in 8th 12th-tile
3378 fp = f / 3;
3379 e = r - f - fp;
3380 // No adaption uses the middle to enforce strict margins
3381 p = e - (flags == MODE_ADAPTION ? mapDistance(r - k, l, r, fp) : (fp >>> 1));
3382 }
3383 final int s = e - fp + 1;
3384 for (int i = s; i <= e; i++) {
3385 Sorting.sort3(a, i - fp, i, i + fp);
3386 }
3387 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING);
3388 return expandPartition(a, l, r, s, e, p, upper[0], upper);
3389 }
3390
3391 /**
3392 * Partition an array slice around a pivot. Partitioning exchanges array elements such
3393 * that all elements smaller than pivot are before it and all elements larger than
3394 * pivot are after it.
3395 *
3396 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller
3397 * range.
3398 *
3399 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm
3400 * with the minimum of 4 then median of 3; the final sample is placed in the
3401 * 2nd 12th-tile; the pivot chosen from the sample is adaptive using the input {@code k}.
3402 *
3403 * <p>Given a pivot in the middle of the sample this has margins of 1/12 and 1/3.
3404 *
3405 * @param a Data array.
3406 * @param l Lower bound (inclusive).
3407 * @param r Upper bound (inclusive).
3408 * @param k Target index.
3409 * @param upper Upper bound (inclusive) of the pivot range.
3410 * @param flags Control flags.
3411 * @return Lower bound (inclusive) of the pivot range.
3412 */
3413 private static int repeatedStepFarLeft(long[] a, int l, int r, int k, int[] upper, int flags) {
3414 // Far step has been changed from the Alexandrescu (2016) step of lower-median-of-4, min-of-3
3415 // into the 4th 12th-tile to a min-of-4, median-of-3 into the 2nd 12th-tile.
3416 // The differences are:
3417 // - The upper margin when not sampling is 8/24 vs. 9/24; the lower margin remains at 1/12.
3418 // - The position of the sample is closer to the expected location of k < |A| / 12.
3419 // - Sampling mode uses a median-of-3 with adaptive k, matching the other step methods.
3420 // A min-of-3 sample can create a pivot too small if used with adaption of k leaving
3421 // k in the larger parition and a wasted iteration.
3422 // - Adaption is adjusted to force use of the lower margin when not sampling.
3423 final int fp;
3424 final int s;
3425 int p;
3426 if (flags <= MODE_SAMPLING) {
3427 // 2nd 12th-tile
3428 fp = (r - l + 1) / 12;
3429 s = l + fp;
3430 // Use adaption
3431 p = s + mapDistance(k - l, l, r, fp);
3432 } else {
3433 // i in 2nd quartile; min into i-f (1st quartile)
3434 final int f = (r - l + 1) >> 2;
3435 final int f2 = f + f;
3436 final int end = l + f2;
3437 for (int i = l + f; i < end; i++) {
3438 if (a[i + f] < a[i - f]) {
3439 final long u = a[i + f];
3440 a[i + f] = a[i - f];
3441 a[i - f] = u;
3442 }
3443 if (a[i + f2] < a[i]) {
3444 final long v = a[i + f2];
3445 a[i + f2] = a[i];
3446 a[i] = v;
3447 }
3448 if (a[i] < a[i - f]) {
3449 final long u = a[i];
3450 a[i] = a[i - f];
3451 a[i - f] = u;
3452 }
3453 }
3454 // 2nd 12th-tile
3455 fp = f / 3;
3456 s = l + fp;
3457 // Lower margin has 2(d+1) elements; d == (position in sample) - s
3458 // Force k into the lower margin. Note p will be within [s, e]
3459 // if: (double) (k - l) / (r - l) < 1/12 => (k - l) / 2 < (r - l) / 24
3460 p = s + ((k - l) >>> 1);
3461 }
3462 final int e = s + fp - 1;
3463 for (int i = s; i <= e; i++) {
3464 Sorting.sort3(a, i - fp, i, i + fp);
3465 }
3466 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING);
3467 return expandPartition(a, l, r, s, e, p, upper[0], upper);
3468 }
3469
3470 /**
3471 * Partition an array slice around a pivot. Partitioning exchanges array elements such
3472 * that all elements smaller than pivot are before it and all elements larger than
3473 * pivot are after it.
3474 *
3475 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller
3476 * range.
3477 *
3478 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm
3479 * with the maximum of 4 then median of 3; the final sample is placed in the
3480 * 11th 12th-tile; the pivot chosen from the sample is adaptive using the input {@code k}.
3481 *
3482 * <p>Given a pivot in the middle of the sample this has margins of 1/3 and 1/12.
3483 *
3484 * @param a Data array.
3485 * @param l Lower bound (inclusive).
3486 * @param r Upper bound (inclusive).
3487 * @param k Target index.
3488 * @param upper Upper bound (inclusive) of the pivot range.
3489 * @param flags Control flags.
3490 * @return Lower bound (inclusive) of the pivot range.
3491 */
3492 private static int repeatedStepFarRight(long[] a, int l, int r, int k, int[] upper, int flags) {
3493 // Mirror image repeatedStepFarLeft
3494 final int fp;
3495 final int e;
3496 int p;
3497 if (flags <= MODE_SAMPLING) {
3498 // 11th 12th-tile
3499 fp = (r - l + 1) / 12;
3500 e = r - fp;
3501 // Use adaption
3502 p = e - mapDistance(r - k, l, r, fp);
3503 } else {
3504 // i in 3rd quartile; max into i+f (4th quartile)
3505 final int f = (r - l + 1) >> 2;
3506 final int f2 = f + f;
3507 final int end = r - f2;
3508 for (int i = r - f; i > end; i--) {
3509 if (a[i - f] > a[i + f]) {
3510 final long u = a[i - f];
3511 a[i - f] = a[i + f];
3512 a[i + f] = u;
3513 }
3514 if (a[i - f2] > a[i]) {
3515 final long v = a[i - f2];
3516 a[i - f2] = a[i];
3517 a[i] = v;
3518 }
3519 if (a[i] > a[i + f]) {
3520 final long u = a[i];
3521 a[i] = a[i + f];
3522 a[i + f] = u;
3523 }
3524 }
3525 // 11th 12th-tile
3526 fp = f / 3;
3527 e = r - fp;
3528 // Upper margin has 2(d+1) elements; d == e - (position in sample)
3529 // Force k into the upper margin. Note p will be within [s, e]
3530 // if: (double) (r - k) / (r - l) < 1/12 => (r - k) / 2 < (r - l) / 24
3531 p = e - ((r - k) >>> 1);
3532 }
3533 final int s = e - fp + 1;
3534 for (int i = s; i <= e; i++) {
3535 Sorting.sort3(a, i - fp, i, i + fp);
3536 }
3537 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING);
3538 return expandPartition(a, l, r, s, e, p, upper[0], upper);
3539 }
3540
3541 /**
3542 * Expand a partition around a single pivot. Partitioning exchanges array
3543 * elements such that all elements smaller than pivot are before it and all
3544 * elements larger than pivot are after it. The central region is already
3545 * partitioned.
3546 *
3547 * <pre>{@code
3548 * |l |s |p0 p1| e| r|
3549 * | ??? | <P | ==P | >P | ??? |
3550 * }</pre>
3551 *
3552 * <p>This requires that {@code start != end}. However it handles
3553 * {@code left == start} and/or {@code end == right}.
3554 *
3555 * @param a Data array.
3556 * @param left Lower bound (inclusive).
3557 * @param right Upper bound (inclusive).
3558 * @param start Start of the partition range (inclusive).
3559 * @param end End of the partitioned range (inclusive).
3560 * @param pivot0 Lower pivot location (inclusive).
3561 * @param pivot1 Upper pivot location (inclusive).
3562 * @param upper Upper bound (inclusive) of the pivot range [k1].
3563 * @return Lower bound (inclusive) of the pivot range [k0].
3564 */
3565 // package-private for testing
3566 static int expandPartition(long[] a, int left, int right, int start, int end,
3567 int pivot0, int pivot1, int[] upper) {
3568 // 3-way partition of the data using a pivot value into
3569 // less-than, equal or greater-than.
3570 // Based on Sedgewick's Bentley-McIroy partitioning: always swap i<->j then
3571 // check for equal to the pivot and move again.
3572 //
3573 // Move sentinels from start and end to left and right. Scan towards the
3574 // sentinels until >=,<=. Swap then move == to the pivot region.
3575 // <-i j->
3576 // |l | | |p0 p1| | | r|
3577 // |>=| ??? | < | == | > | ??? |<=|
3578 //
3579 // When either i or j reach the edge perform finishing loop.
3580 // Finish loop for a[j] <= v replaces j with p1+1, optionally moves value
3581 // to p0 for < and updates the pivot range p1 (and optionally p0):
3582 // j->
3583 // |l |p0 p1| | | r|
3584 // | < | == | > | ??? |<=|
3585
3586 final long v = a[pivot0];
3587 // Use start/end as sentinels (requires start != end)
3588 long vi = a[start];
3589 long vj = a[end];
3590 a[start] = a[left];
3591 a[end] = a[right];
3592 a[left] = vj;
3593 a[right] = vi;
3594
3595 int i = start + 1;
3596 int j = end - 1;
3597
3598 // Positioned for pre-in/decrement to write to pivot region
3599 int p0 = pivot0 == start ? i : pivot0;
3600 int p1 = pivot1 == end ? j : pivot1;
3601
3602 while (true) {
3603 do {
3604 --i;
3605 } while (a[i] < v);
3606 do {
3607 ++j;
3608 } while (a[j] > v);
3609 vj = a[i];
3610 vi = a[j];
3611 a[i] = vi;
3612 a[j] = vj;
3613 // Move the equal values to pivot region
3614 if (vi == v) {
3615 a[i] = a[--p0];
3616 a[p0] = v;
3617 }
3618 if (vj == v) {
3619 a[j] = a[++p1];
3620 a[p1] = v;
3621 }
3622 // Termination check and finishing loops.
3623 // Note: This works even if pivot region is zero length (p1 == p0-1 due to
3624 // length 1 pivot region at either start/end) because we pre-inc/decrement
3625 // one side and post-inc/decrement the other side.
3626 if (i == left) {
3627 while (j < right) {
3628 do {
3629 ++j;
3630 } while (a[j] > v);
3631 final long w = a[j];
3632 // Move upper bound of pivot region
3633 a[j] = a[++p1];
3634 a[p1] = v;
3635 // Move lower bound of pivot region
3636 if (w != v) {
3637 a[p0] = w;
3638 p0++;
3639 }
3640 }
3641 break;
3642 }
3643 if (j == right) {
3644 while (i > left) {
3645 do {
3646 --i;
3647 } while (a[i] < v);
3648 final long w = a[i];
3649 // Move lower bound of pivot region
3650 a[i] = a[--p0];
3651 a[p0] = v;
3652 // Move upper bound of pivot region
3653 if (w != v) {
3654 a[p1] = w;
3655 p1--;
3656 }
3657 }
3658 break;
3659 }
3660 }
3661
3662 upper[0] = p1;
3663 return p0;
3664 }
3665
3666 /**
3667 * Partition the array such that indices {@code k} correspond to their
3668 * correctly sorted value in the equivalent fully sorted array.
3669 *
3670 * <p>For all indices {@code k} and any index {@code i}:
3671 *
3672 * <pre>{@code
3673 * data[i < k] <= data[k] <= data[k < i]
3674 * }</pre>
3675 *
3676 * <p>This function accepts a {@link UpdatingInterval} of indices {@code k} that define the
3677 * range of indices to partition. The {@link UpdatingInterval} can be narrowed or split as
3678 * partitioning divides the range.
3679 *
3680 * <p>Uses an introselect variant. The quickselect is a dual-pivot quicksort
3681 * partition method by Vladimir Yaroslavskiy; the fall-back on poor convergence of
3682 * the quickselect is a heapselect.
3683 *
3684 * <p>The {@code flags} contain the current recursion count and the configured
3685 * length threshold for {@code r - l} to perform sort select. The count is in the upper
3686 * bits and the threshold is in the lower bits.
3687 *
3688 * @param a Values.
3689 * @param left Lower bound of data (inclusive, assumed to be strictly positive).
3690 * @param right Upper bound of data (inclusive, assumed to be strictly positive).
3691 * @param k Interval of indices to partition (ordered).
3692 * @param flags Control flags.
3693 */
3694 // package-private for testing
3695 static void dualPivotQuickSelect(long[] a, int left, int right, UpdatingInterval k, int flags) {
3696 // If partitioning splits the interval then recursion is used for the left-most side(s)
3697 // and the right-most side remains within this function. If partitioning does
3698 // not split the interval then it remains within this function.
3699 int l = left;
3700 int r = right;
3701 int f = flags;
3702 int ka = k.left();
3703 int kb = k.right();
3704 final int[] upper = {0, 0, 0};
3705 while (true) {
3706 // Select when ka and kb are close to the same end,
3707 // or the entire range is small
3708 // |l|-----|ka|--------|kb|------|r|
3709 final int n = r - l;
3710 if (Math.min(kb - l, r - ka) < DP_SORTSELECT_SIZE ||
3711 n < (f & SORTSELECT_MASK)) {
3712 sortSelect(a, l, r, ka, kb);
3713 return;
3714 }
3715 if (kb - ka < DP_SORTSELECT_SIZE) {
3716 // Switch to single-pivot mode with Floyd-Rivest sub-sampling
3717 quickSelectAdaptive(a, l, r, ka, kb, upper, MODE_FR_SAMPLING);
3718 return;
3719 }
3720 if (f < 0) {
3721 // Excess recursion, switch to heap select
3722 heapSelect(a, l, r, ka, kb);
3723 return;
3724 }
3725
3726 // Dual-pivot partitioning
3727 final int p0 = partition(a, l, r, upper);
3728 final int p1 = upper[0];
3729
3730 // Recursion to max depth
3731 // Note: Here we possibly branch left, middle and right with multiple keys.
3732 // It is possible that the partition has split the keys
3733 // and the recursion proceeds with a reduced set in each region.
3734 // p0 p1 p2 p3
3735 // |l|--|ka|--k----k--|P|------k--|kb|----|P|----|r|
3736 // kb | ka
3737 f += RECURSION_INCREMENT;
3738 // Recurse left side if required
3739 if (ka < p0) {
3740 if (kb <= p1) {
3741 // Entirely on left side
3742 r = p0 - 1;
3743 if (r < kb) {
3744 kb = k.updateRight(r);
3745 }
3746 continue;
3747 }
3748 dualPivotQuickSelect(a, l, p0 - 1, k.splitLeft(p0, p1), f);
3749 // Here we must process middle and/or right
3750 ka = k.left();
3751 } else if (kb <= p1) {
3752 // No middle/right side
3753 return;
3754 } else if (ka <= p1) {
3755 // Advance lower bound
3756 ka = k.updateLeft(p1 + 1);
3757 }
3758 // Recurse middle if required
3759 final int p2 = upper[1];
3760 final int p3 = upper[2];
3761 if (ka < p2) {
3762 l = p1 + 1;
3763 if (kb <= p3) {
3764 // Entirely in middle
3765 r = p2 - 1;
3766 if (r < kb) {
3767 kb = k.updateRight(r);
3768 }
3769 continue;
3770 }
3771 dualPivotQuickSelect(a, l, p2 - 1, k.splitLeft(p2, p3), f);
3772 ka = k.left();
3773 } else if (kb <= p3) {
3774 // No right side
3775 return;
3776 } else if (ka <= p3) {
3777 ka = k.updateLeft(p3 + 1);
3778 }
3779 // Continue right
3780 l = p3 + 1;
3781 }
3782 }
3783
3784 /**
3785 * Partition an array slice around 2 pivots. Partitioning exchanges array elements
3786 * such that all elements smaller than pivot are before it and all elements larger
3787 * than pivot are after it.
3788 *
3789 * <p>This method returns 4 points describing the pivot ranges of equal values.
3790 *
3791 * <pre>{@code
3792 * |k0 k1| |k2 k3|
3793 * | <P | ==P1 | <P1 && <P2 | ==P2 | >P |
3794 * }</pre>
3795 *
3796 * <ul>
3797 * <li>k0: lower pivot1 point</li>
3798 * <li>k1: upper pivot1 point (inclusive)</li>
3799 * <li>k2: lower pivot2 point</li>
3800 * <li>k3: upper pivot2 point (inclusive)</li>
3801 * </ul>
3802 *
3803 * <p>Bounds are set so {@code i < k0}, {@code i > k3} and {@code k1 < i < k2} are
3804 * unsorted. When the range {@code [k0, k3]} contains fully sorted elements the result
3805 * is set to {@code k1 = k3; k2 == k0}. This can occur if
3806 * {@code P1 == P2} or there are zero or one value between the pivots
3807 * {@code P1 < v < P2}. Any sort/partition of ranges [left, k0-1], [k1+1, k2-1] and
3808 * [k3+1, right] must check the length is {@code > 1}.
3809 *
3810 * @param a Data array.
3811 * @param left Lower bound (inclusive).
3812 * @param right Upper bound (inclusive).
3813 * @param bounds Points [k1, k2, k3].
3814 * @return Lower bound (inclusive) of the pivot range [k0].
3815 */
3816 private static int partition(long[] a, int left, int right, int[] bounds) {
3817 // Pick 2 pivots from 5 approximately uniform through the range.
3818 // Spacing is ~ 1/7 made using shifts. Other strategies are equal or much
3819 // worse. 1/7 = 5/35 ~ 1/8 + 1/64 : 0.1429 ~ 0.1406
3820 // Ensure the value is above zero to choose different points!
3821 final int n = right - left;
3822 final int step = 1 + (n >>> 3) + (n >>> 6);
3823 final int i3 = left + (n >>> 1);
3824 final int i2 = i3 - step;
3825 final int i1 = i2 - step;
3826 final int i4 = i3 + step;
3827 final int i5 = i4 + step;
3828 Sorting.sort5(a, i1, i2, i3, i4, i5);
3829
3830 // Partition data using pivots P1 and P2 into less-than, greater-than or between.
3831 // Pivot values P1 & P2 are placed at the end. If P1 < P2, P2 acts as a sentinel.
3832 // k traverses the unknown region ??? and values moved if less-than or
3833 // greater-than:
3834 //
3835 // left less k great right
3836 // |P1| <P1 | P1 <= & <= P2 | ??? | >P2 |P2|
3837 //
3838 // <P1 (left, lt)
3839 // P1 <= & <= P2 [lt, k)
3840 // >P2 (gt, right)
3841 //
3842 // At the end pivots are swapped back to behind the less and great pointers.
3843 //
3844 // | <P1 |P1| P1<= & <= P2 |P2| >P2 |
3845
3846 // Swap ends to the pivot locations.
3847 final long v1 = a[i2];
3848 a[i2] = a[left];
3849 a[left] = v1;
3850 final long v2 = a[i4];
3851 a[i4] = a[right];
3852 a[right] = v2;
3853
3854 // pointers
3855 int less = left;
3856 int great = right;
3857
3858 // Fast-forward ascending / descending runs to reduce swaps.
3859 // Cannot overrun as end pivots (v1 <= v2) act as sentinels.
3860 do {
3861 ++less;
3862 } while (a[less] < v1);
3863 do {
3864 --great;
3865 } while (a[great] > v2);
3866
3867 // a[less - 1] < P1 : a[great + 1] > P2
3868 // unvisited in [less, great]
3869 SORTING:
3870 for (int k = less; k <= great; k++) {
3871 final long v = a[k];
3872 if (v < v1) {
3873 // swap(a, k, less++)
3874 a[k] = a[less];
3875 a[less] = v;
3876 less++;
3877 } else if (v > v2) {
3878 // while k < great and a[great] > v2:
3879 // great--
3880 while (a[great] > v2) {
3881 if (great-- == k) {
3882 // Done
3883 break SORTING;
3884 }
3885 }
3886 // swap(a, k, great--)
3887 // if a[k] < v1:
3888 // swap(a, k, less++)
3889 final long w = a[great];
3890 a[great] = v;
3891 great--;
3892 // delay a[k] = w
3893 if (w < v1) {
3894 a[k] = a[less];
3895 a[less] = w;
3896 less++;
3897 } else {
3898 a[k] = w;
3899 }
3900 }
3901 }
3902
3903 // Change to inclusive ends : a[less] < P1 : a[great] > P2
3904 less--;
3905 great++;
3906 // Move the pivots to correct locations
3907 a[left] = a[less];
3908 a[less] = v1;
3909 a[right] = a[great];
3910 a[great] = v2;
3911
3912 // Record the pivot locations
3913 final int lower = less;
3914 bounds[2] = great;
3915
3916 // equal elements
3917 // Original paper: If middle partition is bigger than a threshold
3918 // then check for equal elements.
3919
3920 // Note: This is extra work. When performing partitioning the region of interest
3921 // may be entirely above or below the central region and this can be skipped.
3922
3923 // Here we look for equal elements if the centre is more than 5/8 the length.
3924 // 5/8 = 1/2 + 1/8. Pivots must be different.
3925 if ((great - less) > (n >>> 1) + (n >>> 3) && v1 != v2) {
3926
3927 // Fast-forward to reduce swaps. Changes inclusive ends to exclusive ends.
3928 // Since v1 != v2 these act as sentinels to prevent overrun.
3929 do {
3930 ++less;
3931 } while (a[less] == v1);
3932 do {
3933 --great;
3934 } while (a[great] == v2);
3935
3936 // This copies the logic in the sorting loop using == comparisons
3937 EQUAL:
3938 for (int k = less; k <= great; k++) {
3939 final long v = a[k];
3940 if (v == v1) {
3941 a[k] = a[less];
3942 a[less] = v;
3943 less++;
3944 } else if (v == v2) {
3945 while (a[great] == v2) {
3946 if (great-- == k) {
3947 // Done
3948 break EQUAL;
3949 }
3950 }
3951 final long w = a[great];
3952 a[great] = v;
3953 great--;
3954 if (w == v1) {
3955 a[k] = a[less];
3956 a[less] = w;
3957 less++;
3958 } else {
3959 a[k] = w;
3960 }
3961 }
3962 }
3963
3964 // Change to inclusive ends
3965 less--;
3966 great++;
3967 }
3968
3969 // Between pivots in (less, great)
3970 if (v1 != v2 && less < great - 1) {
3971 // Record the pivot end points
3972 bounds[0] = less;
3973 bounds[1] = great;
3974 } else {
3975 // No unsorted internal region (set k1 = k3; k2 = k0)
3976 bounds[0] = bounds[2];
3977 bounds[1] = lower;
3978 }
3979
3980 return lower;
3981 }
3982
3983 /**
3984 * Map the distance from the edge of {@code [l, r]} to a new distance in {@code [0, n)}.
3985 *
3986 * <p>The provides the adaption {@code kf'/|A|} from Alexandrescu (2016) where
3987 * {@code k == d}, {@code f' == n} and {@code |A| == r-l+1}.
3988 *
3989 * <p>For convenience this accepts the input range {@code [l, r]}.
3990 *
3991 * @param d Distance from the edge in {@code [0, r - l]}.
3992 * @param l Lower bound (inclusive).
3993 * @param r Upper bound (inclusive).
3994 * @param n Size of the new range.
3995 * @return the mapped distance in [0, n)
3996 */
3997 private static int mapDistance(int d, int l, int r, int n) {
3998 return (int) (d * (n - 1.0) / (r - l));
3999 }
4000
4001 /**
4002 * Configure the dual-pivot control flags. This packs the maximum recursion depth and
4003 * sort select size into a single integer.
4004 *
4005 * @param left Lower bound (inclusive).
4006 * @param right Upper bound (inclusive).
4007 * @param k1 First key of interest.
4008 * @param kn Last key of interest.
4009 * @return the flags
4010 */
4011 private static int dualPivotFlags(int left, int right, int k1, int kn) {
4012 final int maxDepth = dualPivotMaxDepth(right - left);
4013 final int ss = dualPivotSortSelectSize(k1, kn);
4014 return dualPivotFlags(maxDepth, ss);
4015 }
4016
4017 /**
4018 * Configure the dual-pivot control flags. This packs the maximum recursion depth and
4019 * sort select size into a single integer.
4020 *
4021 * @param maxDepth Maximum recursion depth.
4022 * @param ss Sort select size.
4023 * @return the flags
4024 */
4025 static int dualPivotFlags(int maxDepth, int ss) {
4026 // The flags are packed using the upper bits to count back from -1 in
4027 // step sizes. The lower bits pack the sort select size.
4028 int flags = Integer.MIN_VALUE - maxDepth * RECURSION_INCREMENT;
4029 flags &= ~SORTSELECT_MASK;
4030 return flags | ss;
4031 }
4032
4033 /**
4034 * Compute the maximum recursion depth for dual pivot recursion.
4035 * This is an approximation to {@code 2 * log3 (x)}.
4036 *
4037 * <p>The result is between {@code 2*floor(log3(x))} and {@code 2*ceil(log3(x))}.
4038 * The result is correctly rounded when {@code x +/- 1} is a power of 3.
4039 *
4040 * @param x Value.
4041 * @return maximum recursion depth
4042 */
4043 static int dualPivotMaxDepth(int x) {
4044 // log3(2) ~ 1.5849625
4045 // log3(x) ~ log2(x) * 0.630929753... ~ log2(x) * 323 / 512 (0.630859375)
4046 // Use (floor(log2(x))+1) * 323 / 256
4047 return ((32 - Integer.numberOfLeadingZeros(x)) * 323) >>> 8;
4048 }
4049
4050 /**
4051 * Configure the sort select size for dual pivot partitioning.
4052 *
4053 * @param k1 First key of interest.
4054 * @param kn Last key of interest.
4055 * @return the sort select size.
4056 */
4057 private static int dualPivotSortSelectSize(int k1, int kn) {
4058 // Configure the sort select size based on the index density
4059 // l---k1---k---k-----k--k------kn----r
4060 //
4061 // For a full sort the dual-pivot quicksort can switch to insertion sort
4062 // when the length is small. The optimum value is dependent on the
4063 // hardware and the insertion sort implementation. Benchmarks show that
4064 // insertion sort can be used at length 80-120.
4065 //
4066 // During selection the SORTSELECT_SIZE specifies the distance from the edge
4067 // to use sort select. When keys are not dense there may be a small length
4068 // that is ignored by sort select due to the presence of another key.
4069 // Diagram of k-l = SORTSELECT_SIZE and r-k < SORTSELECT_SIZE where a second
4070 // key b is blocking the use of sort select. The key b is closest it can be to the right
4071 // key to enable blocking; it could be further away (up to k = left).
4072 //
4073 // |--SORTSELECT_SIZE--|
4074 // |--SORTSELECT_SIZE--|
4075 // l--b----------------k--r
4076 // l----b--------------k----r
4077 // l------b------------k------r
4078 // l--------b----------k--------r
4079 // l----------b--------k----------r
4080 // l------------b------k------------r
4081 // l--------------b----k--------------r
4082 // l----------------b--k----------------r
4083 // l------------------bk------------------r
4084 // |--SORTSELECT_SIZE--|
4085 //
4086 // For all these cases the partitioning method would have to run. Assuming ideal
4087 // dual-pivot partitioning into thirds, and that the left key is randomly positioned
4088 // in [left, k) it is more likely that after partitioning 2 partitions will have to
4089 // be processed rather than 1 partition. In this case the options are:
4090 // - split the range using partitioning; sort select next iteration
4091 // - use sort select with a edge distance above the optimum length for single k selection
4092 //
4093 // Contrast with a longer length:
4094 // |--SORTSELECT_SIZE--|
4095 // l-------------------k-----k-------k-------------------r
4096 // |--SORTSELECT_SIZE--|
4097 // Here partitioning has to run and 1, 2, or 3 partitions processed. But all k can
4098 // be found with a sort. In this case sort select could be used with a much higher
4099 // length (e.g. 80 - 120).
4100 //
4101 // When keys are extremely sparse (never within SORTSELECT_SIZE) then no switch
4102 // to sort select based on length is *required*. It may still be beneficial to avoid
4103 // partitioning if the length is very small due to the overhead of partitioning.
4104 //
4105 // Benchmarking with different lengths for a switch to sort select show inconsistent
4106 // behaviour across platforms due to the variable speed of insertion sort at longer
4107 // lengths. Attempts to transition the length based on various ramps schemes can
4108 // be incorrect and result is a slowdown rather than speed-up (if the correct threshold
4109 // is not chosen).
4110 //
4111 // Here we use a much simpler scheme based on these observations:
4112 // - If the average separation is very high then no length will collect extra indices
4113 // from a sort select over the current trigger of using the distance from the end. But
4114 // using a length dependence will not effect the work done by sort select as it only
4115 // performs the minimum sorting required.
4116 // - If the average separation is within the SORTSELECT_SIZE then a round of
4117 // partitioning will create multiple regions that all require a sort selection.
4118 // Thus a partitioning round can be avoided if the length is small.
4119 // - If the keys are at the end with nothing in between then partitioning will be able
4120 // to split them but a sort will have to sort the entire range:
4121 // lk-------------------------------kr
4122 // After partitioning starts the chance of keys being at the ends is low as keys
4123 // should be random within the divided range.
4124 // - Extremely high density keys is rare. It is only expected to saturate the range
4125 // with short lengths, e.g. 100 quantiles for length 1000 = separation 10 (high density)
4126 // but for length 10000 = separation 100 (low density).
4127 // - The density of (non-uniform) keys is hard to predict without complex analysis.
4128 //
4129 // Benchmarking using random keys at various density show no performance loss from
4130 // using a fixed size for the length dependence of sort select, if the size is small.
4131 // A large length can impact performance with low density keys, and on machines
4132 // where insertion sort is slower. Extreme performance gains occur when the average
4133 // separation of random keys is below 8-16, or of uniform keys around 32, by using a
4134 // sort at lengths up to 90. But this threshold shows performance loss greater than
4135 // the gains with separation of 64-128 on random keys, and on machines with slow
4136 // insertion sort. The transition to using an insertion sort of a longer length
4137 // is difficult to predict for all situations.
4138
4139 // Let partitioning run if the initial length is small.
4140 // Use kn - k1 as a proxy for the length. If length is actually very large then
4141 // the final selection is insignificant. This avoids slowdown for small lengths
4142 // where the keys may only be at the ends. Note ideal dual-pivot partitioning
4143 // creates thirds so 1 iteration on SORTSELECT_SIZE * 3 should create
4144 // SORTSELECT_SIZE partitions.
4145 if (kn - k1 < DP_SORTSELECT_SIZE * 3) {
4146 return 0;
4147 }
4148 // Here partitioning will run at least once.
4149 // Stable performance across platforms using a modest length dependence.
4150 return DP_SORTSELECT_SIZE * 2;
4151 }
4152 }