Naver D2 Timsort 분석기

Timsort

평소에 자주 즐겨보는 테크블로그 중 재미있는 글이 있어 분석하고 공부한 내용을 공유하려고 합니다
부족한 부분에 대한 지적은 감사히 받겠습니다!


About Timsort

팀소트란 Tim Peters가 고안한 정렬 알고리즘으로 Insertion sort와 Merge sort를 결합한 정렬입니다
기존의 Insertion Sort의 장점을 가져오고 Merge Sort를 최적화 하여 시간 복잡도와 공간 복잡도를 줄였습니다
팀소트의 시간 복잡도와 공간 복잡도는 다음과 같습니다

복잡도 Big O Notation
최선 시간 복잡도 O(n)
평균 시간 복잡도 O(n logn)
최악 시간 복잡도 O(n logn)
공간 복잡도 O(n/2)



Insertion Sort + Merge Sort == Time Sort ?

시간 복잡도가 O(n^2)인 삽입 정렬을 왜 합병 정렬과 결합하였을까
삽입 정렬의 시간 복잡도는 정확히 말하면 C * ( N ^ 2 ) + a입니다
비교적 무시하는 값인 a를 제외하고 C값에 영향을 미치는 요소가 Cache Locality입니다
삽입정렬의 경우는 캐시 지역성을 잘 반영한 정렬이기 때문에 적은 엔티티 요소에 있어서 시간 복잡도가 C * N * logn + a의 합병 정렬보다 빠르다고 합니다

즉 삽입 정렬의 C값을 Ci 라 하고 합병 정렬의 C값을 Cm이라 하였을 때 Ci * ( N ^ 2 ) + a < Cm * N * logn + a이 성립하게 됩니다
팀 소트는 적은 엔티티에 대해 삽입 정렬을 적용해 정렬을 수행합니다
이 때 삽입 정렬에 사용되는 엔티티 수가 2 ^ x개 라고 할 때 팀소트의 수행 시간은 Ct * n * (logn - x) + a이 됩니다

팀 소트는 2 ^ x의 값을 2 ^ 5 ~ 2 ^ 6으로 정의합니다
32 ~ 64개의 엔티티 사이에선 합병 정렬보다 삽입 정렬이 더 우수하다고 해석할 수 있습니다
그렇기 때문에 두 소팅 알고리즘에 대해 비교를 수행했습니다
두 소팅 알고리즘을 비교하기 위해 이전에 만들어둔 소팅 알고리즘을 활용했고 순수 정렬 시간만을 최대한 측정하기 위해 inline 함수와 함수 호출간 파라미터 바인딩에 참조자(&)를 사용했습니다
두 소팅 수행 시간 비교에 대한 코드는 다음과 같습니다

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#include <iostream>
#include <vector>
#include <algorithm>
#include <ctime>
#include <cstdlib>
#include <random>
#include <numeric>

inline void Merge(std::vector<int> &mergeSortVec, int left, int mid, int right){
std::vector<int> sorted(right-left+1);
int L = left, R = mid + 1, Index = 0;
while(L<=mid && R<=right){
if(mergeSortVec[L] < mergeSortVec[R])
sorted[Index++] = mergeSortVec[L++];
else
sorted[Index++] = mergeSortVec[R++];
}
if(L<=mid){
for(;L<=mid;){
sorted[Index++] = mergeSortVec[L++];
}
}
else{
for(;R<=right;){
sorted[Index++] = mergeSortVec[R++];
}
}
for(Index = left; Index <= right; Index++)
mergeSortVec[Index] = sorted[Index - left];
}

void MergeSort(std::vector<int> &mergeSortVec, int left = 0, int right = 32){
if(left < right){
int mid = (left + right) >> 1;
MergeSort(mergeSortVec,left,mid);
MergeSort(mergeSortVec,mid+1,right);
Merge(mergeSortVec, left, mid, right);
}
}
void InsertionSort(std::vector<int> &insertionSortvec){
int MAXSIZE = insertionSortvec.size();
for(int start = 1; start < MAXSIZE; ++start){
for(int compare = start-1; compare >= 0 && insertionSortvec[compare] > insertionSortvec[compare+1]; --compare){
insertionSortvec[compare] ^= insertionSortvec[compare+1] ^= insertionSortvec[compare] ^= insertionSortvec[compare+1];
}
}
}

int main() {
std::srand ( unsigned ( std::time(0) ) );
std::vector<int> vec(32);
iota(begin(vec),end(vec),1);
std::shuffle(std::begin(vec), std::end(vec), std::default_random_engine {});

std::vector<int> mergeVec = vec;
std::vector<int> insertionVec = vec;

std::chrono::steady_clock::time_point merge_begin = std::chrono::steady_clock::now();
MergeSort(mergeVec);
std::chrono::steady_clock::time_point merge_end = std::chrono::steady_clock::now();

std::chrono::steady_clock::time_point insertion_begin = std::chrono::steady_clock::now();
InsertionSort(insertionVec);
std::chrono::steady_clock::time_point insertion_end = std::chrono::steady_clock::now();

std::cout << "Time difference merge sort= " << std::chrono::duration_cast<std::chrono::microseconds>(merge_end - merge_begin).count() << "[µs]" << std::endl;
std::cout << "Time difference merge sort = " << std::chrono::duration_cast<std::chrono::nanoseconds> (merge_end - merge_begin).count() << "[ns]" << std::endl;

std::cout << "Time difference insertion sort = " << std::chrono::duration_cast<std::chrono::microseconds>(insertion_end - insertion_begin).count() << "[µs]" << std::endl;
std::cout << "Time difference insertion sort = " << std::chrono::duration_cast<std::chrono::nanoseconds> (insertion_end - insertion_begin).count() << "[ns]" << std::endl;
}

또한 두 소팅 알고리즘의 수행 시간 결과는 다음과 같습니다

1
2
3
4
Time difference merge sort = 21[µs]
Time difference merge sort = 21601[ns]
Time difference insertion sort = 4[µs]
Time difference insertion sort = 4951[ns]

엔티티의 수가 1000개 일때의 두 소팅 알고리즘의 수행 시간 결과는 다음과 같습니다

1
2
3
4
Time difference merge sort = 696[µs]
Time difference merge sort = 696149[ns]
Time difference insertion sort = 3414[µs]
Time difference insertion sort = 3414441[ns]

실험을 해 본 결과 실제로 적은 엔티티의 데이터에 대해서 삽입 정렬이 합병 정렬보다 빠르게 정렬을 수행함을 확인할 수 있었습니다


TimSort 최적화

Run

팀소트의 a값을 유지하면서 x값을 늘려 전체 수행시간을 줄이는 방법을 통해 팀소트는 최적화를 진행합니다
이 때 x값을 늘리는 방법이 삽입 정렬을 통해 정렬 수행시간의 이익을 많이 볼 수 있는 부분을 최대한 가져가는 방법을 취합니다
삽입 정렬을 통해 만드는 단편적으로 정렬된 배열을 Run이라 표현하며 이 Run의 min값은 32 ~ 64로 정의합니다
이 Run을 생성하는 방법에 있어서도 오름차순과 내림차순중 최적의 방법을 선택하여 Run을 생성하게 됩니다

Binary Insrtion Sort

팀소트에서 사용되는 삽입 정렬은 일반 삽입정렬이 아닌 이분 탐색을 활용한 Binary Insertion Sort를 진행합니다
삽입 하는 위치를 찾는데 있어서 일반 삽입정렬보다 캐시 지역성은 떨어지지만 하나의 원소를 삽입하기 위해 O(n)의 비교 대신 O(logn)의 비교를 수행해 시간을 절약합니다
또한 삽입될 인덱스를 찾은 후엔 시프트 연산을 통해 정렬된 상태를 만들기 때문에 더욱 효율적입니다

Merge

삽입 정렬로 만들어지는 Run들을 어떻게 하면 효율적으로 병합을 수행할 수 있는가에 대한 이야기 입니다
병합에 있어 적은 메모리들을 사용해서 병합을 진행하기 위해 C > A + B의 조건과 B > A을 만족하는 스택을 사용하게 됩니다
해당 조건을 만족하는 스택을 사용하게 될 경우 스택에 있는 run 수를 작게 유지할 수 있고 비슷한 크기의 인접해있는 run끼리의 병합을 진행할 수 있게 됩니다

Run Merge

두 run의 병합에 있어서도 최대한 적은 메모리를 활용하기 위해 최적화를 적용합니다
Run1 : 1 2 3 5 7 9
Run2 : 4 6 8 10 11 12
On memory : 1 2 3 5 7 9 4 6 8 10 11 12
두 개의 Run이 존재 할 때 정렬을 수행해야할 지점을 찾기 위해 Run Merge를 수행합니다
Run1에서 반드시 정렬을 수행해야할 지점은 Run2[0]의 값보다 큰 Run1[3]이 해당됩니다
Run2에서 반드시 정렬을 수행해야할 지점은 Run1[5]의 값보다 작은 Run2[2]가 해당됩니다
이렇게 정렬을 수행해야할 지점을 찾아 필요없는 부분을 제외해가며 최적화된 정렬을 수행합니다
또한 병합을 위해 메모리 복사를 수행해야 하는데 이 두 정렬해야할 부분의 Run중 적은 메모리 공간을 차지하고 있는 곳을 선택해 복사를 진행한 후 정렬을 수행하기 때문에 공간복잡도가 O(n/2)가 나오게 됩니다

Galloping

또한 병합을 수행할 인덱스를 찾기 위해 Galloping이라는 방법을 사용하게 됩니다
하나씩 인덱스를 증가시켜 병합을 수행할 적절한 위치를 찾는 것 보다 합병정렬을 수행하는 시점에서 이미 엔티티들은 어느정도 정렬이 진행된 상태라는 점을 고려하여 인덱스를 찾게 됩니다
인덱스를 찾기 위해 하나씩 비교하는 One pair at a time mode를 3번 수행하다 인덱스를 찾지 못하면 1 2 4 8 … 과 같이 2 ^ k씩 점프해가며 삽입될 인덱스를 탐색합니다


구현

Naver D2에 나와있는 포스팅을 토대로 구현한 Timsort는 다음과 같습니다

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
#include <iostream>
#include <vector>
#include <algorithm>
#include <ctime>
#include <cstdlib>
#include <random>
#include <numeric>

using namespace std;

enum GallopingMode{
Galloping_Mode_Front_Finder = 1,
Galloping_Mode_Back_Finder = -1
};

enum SortingFlag{
INCREMENT = 1,
DECREMENT = 0
};

struct RunStack {
int startPos;
int endPos;
};

//바이너리 삽입 정렬
void binaryInsertionSort(vector<int> &vec, int left, int right, int target, SortingFlag sortingFlag) {
int l = left, r = right, m;
while (l < r) {
m = (l + r) / 2;
if (sortingFlag) { vec[m] <= target ? (l = m + 1) : (r = m); }
else { vec[m] > target ? (l = m + 1) : (r = m); }
}
if (sortingFlag && vec[l] < target) ++l;
if (!sortingFlag && vec[l] >= target) ++l;
int temp = vec[right + 1], changePos = right + 1;
while (l < changePos) {
vec[changePos] = vec[changePos - 1];
--changePos;
}
vec[l] = temp;
}

//RunStack 비교
bool CompareRunStack(RunStack PreBeforeLast, RunStack BeforeLast, RunStack Last) {
return PreBeforeLast.endPos - PreBeforeLast.startPos > Last.endPos - BeforeLast.startPos;
}

//앞에서부터 삽입하는 Run Merge 구현
void MergeFront(vector<int> &vec, RunStack A, RunStack B) {
if (A.startPos == B.startPos) return;
vector<int> temp;
temp.assign(vec.begin() + A.startPos, vec.begin() + A.endPos + 1);
int tempPos = 0, backPos = B.startPos;
int PushPos = A.startPos;
while (tempPos < temp.size() && backPos <= B.endPos) {
if (temp[tempPos] <= vec[backPos]) {
vec[PushPos] = temp[tempPos];
++tempPos;
} else {
vec[PushPos] = vec[backPos];
++backPos;
}
++PushPos;
}
while (tempPos < temp.size()) {
vec[PushPos] = temp[tempPos];
++PushPos, ++tempPos;
}
}

//뒤에서부터 삽입하는 Run Merge 구현
void MergeBack(vector<int> &vec, RunStack A, RunStack B) {
vector<int> temp;
temp.assign(vec.begin() + B.startPos, vec.begin() + B.endPos + 1);
int tempPos = temp.size() - 1, frontPos = A.endPos;
int PushPos = B.endPos;
while (tempPos >= 0 && frontPos >= A.startPos) {
if (temp[tempPos] > vec[frontPos]) {
vec[PushPos] = temp[tempPos];
--tempPos;
} else {
vec[PushPos] = vec[frontPos];
--frontPos;
}
--PushPos;
}
while (tempPos >= 0) {
vec[PushPos] = temp[tempPos];
--tempPos, --PushPos;
}
}

//Galloping 구현
int Galloping(int mode, int times) {
return times>=3 ? mode*(1<<(times-2)) : mode;
}

int getPosUseGalloping(vector<int> &vec, int firstPos, int secondPos, int sizePos, GallopingMode mode){
int index = 0, times = 0, jump = 0;
while (mode * vec[firstPos + index] < mode * vec[secondPos] && jump != mode){
jump = 0; times = 0;
while(mode * vec[firstPos + index + jump] < mode * vec[secondPos] && mode * (firstPos + index + jump) <= (mode * sizePos)) {
index += jump;
jump = Galloping(mode, times++);
}
}
return index;
}

//Merge 구현
RunStack Merge(vector<int> &vec, RunStack A, RunStack B) {
RunStack ret = {.startPos = A.startPos, .endPos = B.endPos};
A.startPos += getPosUseGalloping(vec, A.startPos, B.startPos, A.endPos, Galloping_Mode_Front_Finder);
B.endPos += getPosUseGalloping(vec, B.endPos, A.endPos, B.startPos, Galloping_Mode_Back_Finder);
if (B.endPos - B.startPos < A.endPos - A.startPos) MergeFront(vec, A, B);
else MergeBack(vec, A, B);
return ret;
}

//RunStack의 Merge 구현
void mergeRunStacks(vector<int> &vec, vector<RunStack> &runVec, int &runVecSize) {
while (runVecSize >= 3 &&
!CompareRunStack(runVec[runVecSize - 3], runVec[runVecSize - 2], runVec[runVecSize - 1])) {
int choose = runVec[runVecSize - 1].endPos - runVec[runVecSize - 1].startPos <=
runVec[runVecSize - 3].endPos - runVec[runVecSize - 3].startPos ?
runVecSize - 3 : runVecSize - 1;
runVec[runVecSize - 2] = Merge(vec, runVec[min(runVecSize - 2, choose)],
runVec[max(runVecSize - 2, choose)]);
runVec.erase(runVec.begin() + choose);
--runVecSize;
}
}

void TimSort(vector<int> &vec, int cacheSize = 5) {
vector<RunStack> runStacks;
int Pos = 0, SPos = 0;
bool runFlag = true;
const int runSize = 1 << cacheSize;
while (runFlag) {
SortingFlag sortingFlag = vec[SPos] <= vec[SPos + 1] ? INCREMENT : DECREMENT;
++Pos;
//RunStack을 생성한다
//default는 최소 2^5의 블록사이즈
while (Pos + 1 < runSize + SPos && Pos < vec.size()) {
if (!((sortingFlag && vec[Pos] <= vec[Pos + 1]) || (!sortingFlag && vec[Pos] > vec[Pos + 1])))
binaryInsertionSort(vec, SPos, Pos, vec[Pos + 1], sortingFlag);
++Pos;
}
//default를 넘는 데이터들에 있어서는 정렬하지 않아도 되는 데이터만 RunStack에 포함시킨다
while (Pos < vec.size() &&
!((sortingFlag && vec[Pos - 1] > vec[Pos]) || (!sortingFlag && vec[Pos - 1] <= vec[Pos])))
++Pos;
--Pos;
if (Pos + 1 >= vec.size()) runFlag = false, Pos = vec.size() - 1;
if (!sortingFlag) reverse(vec.begin() + SPos, vec.begin() + Pos + 1);
runStacks.push_back({SPos, Pos});
++Pos;
SPos = Pos;
if (runStacks.size() < 3) continue;
int RunvecSize = runStacks.size();
mergeRunStacks(vec, runStacks, RunvecSize);
}

//완성된 RunStack에 대해 머지를 수행한다
while (runStacks.size() >= 3) {
int RunvecSize = runStacks.size();
runStacks[RunvecSize - 2] = Merge(vec, runStacks[RunvecSize - 2], runStacks[RunvecSize - 1]);
runStacks.pop_back();
mergeRunStacks(vec, runStacks, RunvecSize);
}
if(runStacks.size() != 1) Merge(vec, runStacks[0], runStacks[1]);
}

int main() {
std::srand(unsigned(std::time(0)));
vector<int> vec(1000);
iota(begin(vec), end(vec), 1);
std::shuffle(std::begin(vec), std::end(vec), default_random_engine{});
cout << "Before TimSorting..." << endl;
for (auto it : vec)
cout << it << " ";
cout << endl;

TimSort(vec);

cout << "After TimSorting..." << endl;
for (auto it : vec)
cout << it << " ";
return 0;
}
1
2
3
Time difference tim sort = 488[µs]
Time difference tim sort = 488106[ns]

이미 정렬된 엔티티들에 대한 팀소트 수행 결과

1
2
3
Time difference = 15[µs]
Time difference = 15520[ns]

같이 보기

참고

https://d2.naver.com/helloworld/0315536

Author: Song Hayoung
Link: https://songhayoung.github.io/2020/06/28/Algorithm/Tim_Sort/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.