哈希表与散列技术?散列技术是在记录的存储位置和它的关键字之间建立一个确定的对应关系f,使得每个关键字key对应一个存储位置f(key),在查找时,根据这个确定的对应关系找到给定key的映射f(key),如果待查找集合中存在这个记录,则必定在f(key)的位置上,简单来讲:元素的值通过一个映射关系找到元素在表中存储的位置。我们把这种对应关系f称为散列函数,又称为哈希函数。采用散列技术将记录存储在一块连续的存储空间中,这块连续的存储空间称为散列表或者哈希表。
优势:适用于快速的查找,时间复杂度O(1)。缺点:占用内存空间比较大。哈希表的空间效率还是不够高。如果用哈希表存储一亿个垃圾邮件地址,每个email地址对应 8bytes, 而哈希表的存储效率一般只有50%,因此一个email地址需要占用16bytes. 因此一亿个email地址占用1.6GB,如果存储几十亿个email address则需要上百GB的内存。除非是超级计算机,一般的服务器是无法存储的。是典型的空间换时间。
对于散列函数设计特点:计算简单(复杂会降低查找的时间),散列地址分布均匀(减少哈希冲突)散列结构越离散越好。例如:直接定址法,直接用(或者做个简单的线性变换后)作为数组的下标,数据直接存在对应下标的位置,存取的时候一步算出位置就能拿到,不需要额外挨个查找(适合关键字范围小、分布集中连续,有时会导致内存浪费)。数字分析法(身份证后四位不同概率高):当你要存储的关键字都是很长的数字,能明显看出这些数字里:有些位置的数字重复率高、大家都差不多,另一些位置的数字随机性强、每个人差别很大;那就直接把(变化大的那几位)抽出来,直接当这个关键字在哈希表里的存储地址,这样就能大幅减少不同关键字撞到同一个地址的概率(也就是哈希冲突)。平方取中法,取关键字key的平方后的中间部分数字作为散列地址。折叠法&随机数法
除留余数法,不仅可以对关键字直接取模,也可以折叠、平方取中法等运算之后取模。p的取值很重要,一般取素数,否则冲突的概率会比较大。md5、sha加密hash算法等
散列冲突处理:线性探测(开放定址法),当哈希函数计算出的地址已被占用时,会顺序向后找最近空闲单元存储。二次探测:二次探测是对线性探测法的改进,在散列冲突位置的前后都可以查找空位置。链地址法:用链表存储组织产生哈希冲突的key。将具有相同哈希地址的元素组织成一个链表(或其他线性结构),哈希表的每个槽位存储该链表的头指针。
哈希表删除操作:比如删除18,哈希函数为除留余数法,得到18的余数为4,在访问第4号位元素,如果不为18,那么发生哈希冲突,用线性探测法解决哈希冲突,顺序查找下一个空闲位置存储数据。最终找到18在删除。取模操作为O(1),但产生了哈希冲突,慢慢变成O(n)。哈希表的时间复杂度趋近于O(1),我们只能减少哈希冲突的概率,当哈希表放的元素越来越多的时候,发生冲突的概率增加。所以我们要对哈希函数进行优化,对于除留余数法,让哈希表(桶)的长度取素数。让哈希函数从原始数据映射到哈希表存储位置的时候,能够减少哈希冲突的概率。引入哈希表的装载因子(已占用的桶的个数/桶的总个数)>0.75,哈希表就需要扩容了,相当于数组的扩容(线性探测哈希表),原来哈希表中的元素,需要在新的哈希表中重新哈希O(n),以及线性探测哈希表均摊时间复杂度为O(1)。
线性探测哈希表实现:增加:通过哈希函数计算数据存放位置,该位置空闲直接存储元素完成。该位置被占用,从当前位置向后找空闲位置,存放该元素。查询:通过哈希函数计算数据存放的位置,从该位置取值,如果该值等于要查询元素值找到了,如果不等于说明之前往这个位置放元素时,发生哈希冲突了,继续遍历往后找该元素,如果遇到这个位置一直是空的,没放过元素,不需要继续往后搜索了。如果这个位置是空,是因为以前放过元素,后来被删除了,需要继续往后搜索。所以用一个Node结构体表示元素的内容,元素值加状态。删除:通过哈希函数计算数据存放的位置,从该位置取值,判断状态是否为正在使用,该值等于要删除的值,直接修改当前位置的状态就可以,如果该值不等于发生哈希冲突,从当前位置向后遍历找到该元素,如果遇到从未使用过的状态结束。
#include <iostream> using namespace std; //桶的状态 enum State { STATE_UNUSE, //从未使用的桶 STATE_USING, //正在使用的桶 STATE_DEL, //元素被删除的桶 }; //桶的类型 struct Bucket { Bucket(int key = 0, State state = STATE_UNUSE) //指针指向堆上动态开辟内存 初始化 : key_(key) , state_(state) {} int key_; //存储的数据 State state_;// 桶的当前状态 }; // 线性探测哈希表类型 class HashTable { public: HashTable(int size = primes_[0], double loadFactor = 0.75)//primes_[0] 3 : useBucketNum_(0) , loadFactor_(loadFactor) , primeIdx_(0) { // 把用户传入的size调整到最近的比较大的素数上 if (size != primes_[0]) { for ( ; primeIdx_ < PRIME_SIZE; primeIdx_++) { if (primes_[primeIdx_] > size) break; } //用户传入的size值过大,已经超过最后一个素数,调整到最后一个素数 if (primeIdx_ == PRIME_SIZE) { primeIdx_--; } } tableSize_ = primes_[primeIdx_]; table_ = new Bucket[tableSize_]; } ~HashTable() { delete[]table_; table_ = nullptr; } public: //插入元素 bool insert(int key) { // 考虑扩容 double factor = useBucketNum_*1.0 / tableSize_; cout << "factor:" << factor << endl; if (factor > loadFactor_) { //哈希表扩容 expand(); } int idx = key % tableSize_; int i = idx; do { if (table_[i].state_ != STATE_USING) { table_[i].state_ = STATE_USING; table_[i].key_ = key; useBucketNum_++; return true; //O(1) } i = (i + 1) % tableSize_; } while (i != idx); // O(n) return false; } //删除元素 bool erase(int key) { int idx = key % tableSize_; int i = idx; do { if (table_[i].state_ == STATE_USING && table_[i].key_ == key) { table_[i].state_ = STATE_DEL; useBucketNum_--; } i = (i + 1) % tableSize_; } while(table_[i].state_ != STATE_UNUSE && i != idx); return true; } //查询 count(key) bool find(int key) { int idx = key % tableSize_; int i = idx; do { if (table_[i].state_ == STATE_USING && table_[i].key_ == key) { return true; } i = (i + 1) % tableSize_; } while(table_[i].state_ != STATE_UNUSE && i != idx); return false; } private: //扩容操作 void expand() { ++primeIdx_; //如果指向末尾元素 if (primeIdx_ == PRIME_SIZE) { throw "HashTable is too large, can not expand antmore!"; } Bucket* newTable = new Bucket[primes_[primeIdx_]]; for (int i = 0; i < tableSize_; i++) { if (table_[i].state_ == STATE_USING) { //旧表有效的数据,重新哈希放到扩容后的新表 int idx = table_[i].key_ % primes_[primeIdx_]; int k = idx; do { if (newTable[k].state_ != STATE_USING) { newTable[k].state_ = STATE_USING; newTable[k].key_ = table_[i].key_; break; } k = (k + 1) % primes_[primeIdx_]; } while (k != idx); } } delete[]table_; table_ = newTable; tableSize_ = primes_[primeIdx_]; } private: Bucket* table_; //指向动态开辟的哈希表 int tableSize_; //哈希表当前的长度 int useBucketNum_;//已经使用的桶的个数 double loadFactor_;//哈希表的装载因子 static const int PRIME_SIZE = 10; //素数表大小 const静态成员可能在类内初始化(静态常量的整型类型) static int primes_[PRIME_SIZE]; //素数表 需要在类外单独初始化一下 int primeIdx_; //当前使用的素数下标 每个哈希表不一样 }; int HashTable ::primes_[PRIME_SIZE] = { 3, 7, 23, 47, 97, 251, 443, 911, 1471, 42773}; int main() { /* int data = 3; //素数表 for (int i = data; i < 10000; i++) { int j = 2; for ( ; j < i; j++) { if (i % j == 0) break; } if (j == i) { cout << i << " "; } }*/ HashTable htable; htable.insert(21); htable.insert(32); htable.insert(14); htable.insert(15); htable.insert(22); cout << htable.find(14) << endl; htable.erase(14); cout << htable.find(14) << endl; }线性探测哈希表的缺点:发生哈希冲突时,会逐渐趋于O(n)的时间复杂度,存储变慢了(不能优化)。多线程同时在一个数组中进行查询修改不是线程安全的,在多线程环境中,线性探测所用到的基于数组实现的哈希表,只能给全局的表用互斥锁来保证哈希表的原子操作,保证线程安全,映射后不在同一位置都不能并发执行。链式哈希表在多线程环境下,可采用分段的锁,既保证了线程安全,又有一定的并发量,提高了效率。解决哈希冲突的方式采用链地址法,把所有哈希索引相同的冲突元素,全部存储在同一个额外的动态结构(通常是链表或红黑树)中,哈希表主数组的每个位置(桶)不直接存元素,只存一个链表头指针:所有哈希到同一个桶的冲突元素,都会作为节点挂在对应桶的链表下(vector<list<int>> table)。哈希表因哈希冲突的存在,增删查无限趋近于O(1)。每个桶的链表比较长,链表搜索花费的时间就大,优化一:当链表长度大于某个值,把桶里面的这个链表转化成红黑树O(logn)。优化二:链式哈希表每个桶都可以创建自己的互斥锁,不同桶中的链表操作,可以并发执行起来。
#include <iostream> #include <vector> #include <list> #include <algorithm> //泛型算法 using namespace std; //链式哈希表 class HashTable { public: HashTable(int size = primes_[0], double loadFactor = 0.75) : useBucketNum_(0) , loadFactor_(loadFactor) , primeIdx_(0) { if (size != primes_[0]) { for (; primeIdx_ < PRIME_SIZE; primeIdx_++) { if (primes_[primeIdx_] >= size) break; } if (primeIdx_ == PRIME_SIZE) { primeIdx_--; } } table_.resize(primes_[primeIdx_]);//开辟空间创建元素 } public: //增加元素 不能重复插入key void insert(int key) { //判断扩容 double factor = useBucketNum_ * 1.0 / table_.size(); cout << "factor:" << factor << endl; if (factor > loadFactor_) { expand(); } int idx = key % table_.size(); //O(1) if (table_[idx].empty()) { useBucketNum_++; //只有第一次桶为空的时候,添加第一个元素时桶要加加 table_[idx].emplace_front(key); //链表元素的插入 }else{ //泛型搜索 通过迭代器 判断是否重复 //使用全局的:: find泛型算法,而不是调用自己的成员方法find auto it = ::find(table_[idx].begin(), table_[idx].end(), key); //去重O(n) 优化可转为红黑树 if (it == table_[idx].end()) { //等于链表的迭代器没找到了 // key不存在 table_[idx].emplace_front(key); } } } //删除元素 void erase(int key) { int idx = key % table_.size(); //O(1) //如果链表节点过长 如果散列结果比较集中 散列函数有问题 //如果散列结果比较离散 链表长度一般不会过长 因为有装载因子 auto it = ::find(table_[idx].begin(), table_[idx].end(), key); //O(n) if (it != table_[idx].end()) { //找着 table_[idx].erase(it); if (table_[idx].empty()) { //判断是否空桶 useBucketNum_--; } } } //搜索元素 bool find(int key) { int idx = key % table_.size(); //O(1) auto it = ::find(table_[idx].begin(), table_[idx].end(), key); //O(n) if (it != table_[idx].end()) { //找着 return true; } //return it != table_[idx].end(); return false; } private: //扩容函数 void expand() { if (primeIdx_ + 1 == PRIME_SIZE) { //已经到头了 throw"hashtable can not expand anymore!"; } primeIdx_++; useBucketNum_ = 0;//扩容后桶的个数会改变 vector<list<int>> oldTable; //容器元素太多swap的效率很低?? 交换了两个容器的成员变量 没有交换堆上的内存 table_.swap(oldTable);//调用容器的swap函数交换一下 table_.resize(primes_[primeIdx_]); //table为空重新resize 加加后的 for (auto list : oldTable) { for (auto key : list) { int idx = key % table_.size(); if (table_[idx].empty()) { useBucketNum_++; } table_[idx].emplace_front(key); } } } private: vector<list<int>> table_; //哈希表的数据结构 int useBucketNum_; //记录桶的个数 double loadFactor_; //记录哈希表装载因子 static const int PRIME_SIZE = 10; //素数表大小 const静态成员可能在类内初始化(静态常量的整型类型) static int primes_[PRIME_SIZE]; //素数表 需要在类外单独初始化一下 int primeIdx_; //当前使用的素数下标 每个哈希表不一样 }; int HashTable ::primes_[PRIME_SIZE] = { 3, 7, 23, 47, 97, 251, 443, 911, 1471, 42773}; int main() { HashTable htable; htable.insert(21); htable.insert(32); htable.insert(14); htable.insert(15); htable.insert(22); htable.insert(67); cout << htable.find(14) << endl; htable.erase(14); cout << htable.find(14) << endl; return 0; } //vector<int> nums = {1, 2, 3, 4, 5}; // 定义一个vector容器 // 1. 获取迭代器:begin()返回指向第一个元素的迭代器,end()返回指向末尾元素的下一个位置的迭代器(哨兵位置) //vector<int>::iterator it = nums.begin(); // 等价于 auto it = nums.begin();(C++11及以上) //vector<int>::iterator end = nums.end();哈希表解决大数据查重或者统计重复的次数。查询效率高但是占用内存空间较大。
#include <iostream> #include <vector> #include <unordered_set>//只存key #include <unordered_map>//存键值对 #include <stdlib.h> #include <time.h> #include <string> using namespace std; int main() { //模拟问题,vector中放原始的数据 vector<int> vec; srand(time(NULL)); for (int i = 0; i < 10000; i++) { vec.push_back(rand() % 10000); } //找第一个出现重复的数字;用一个set,只存数字,每遍历一个放到哈希表里面 //找所有重复出现的数字(去掉break即可) unordered_set<int> s1; for (auto key : vec) { auto it = s1.find(key); //O(1) 哈希表的查询 if (it == s1.end()) { //it已经到末尾了 没有重复 第一次出现 s1.insert(key); }else { cout << "key:" << key << endl; //第一个重复的数字 break; } } //统计重复数字出现的次数 数据加统计的结果 用键值对 unordered_map<int, int> m1; for (int key : vec) { //映射表里面用pair类型 键first 值用second存 打包键值对 auto it = m1.find(key); //O(1) if (it == m1.end()) { m1.emplace(key, 1); //O(1) }else { it->second += 1;//给他的值加个一 } } //m1[key]++; for (auto pair : m1) { //遍历键值对 if (pair.second > 1) { //等于1就是没有重复的数字 cout << "key:" << pair.first << "cnt:" << pair.second << endl; } } //一组数据有些数字是重复的,把重复数字过滤掉,每个数字只出现一次 unordered_multiset允许k重复 unordered_set<int> s1; for (auto key : vec) { s1.emplace(key); } } int main() { string src = "fhusufhjsjjfijioewfw"; //找出来第一个没有重复出现过的字符 unordered_map<int, int> m; for (char ch : src) { m[ch]++; } for (char ch : src) { if (m[ch] == 1) { cout << "第一个没有重复出现过的字符是:" << ch << endl; return 0; } } cout << "所有字符都有重复出现过" << endl; return 0; } //有两个文件分别是a和b,里面放了很多ip地址(url地址 email地址) 让你找出两个文件夹重复的ip 输出出来 //把a文件中所有的ip存放在一个哈希表中,然后依次遍历文件b里面的ip,每遍历一个ip,在哈希表中搜索一下,搜到的,既是两个文件重复的ip并输出 //用到了哈希表的查询O(1) //有两个文件分别是a和b,各自存放在约一亿条ip地址,每个ip地址是4个字节,限制内存使用100M,让找出来两个文件中重复的ip地址并输出 800M分治算法 //先把文件a和文件b分解成11个小文件,把a文件里的每一个ip散列的放在11个小文件当中,依次读取出来% //11得到桶的下标,写到不同的小文件当中,文件b也如此,然后对应的a1与b1查重,2与2查重即可(因为用的相同的哈希函数)。建一个哈希表,另一个文件依次读ip在哈希表中搜索即可。分文件的数量保持一致。位图法解决大数据查重,位图法就是用一个位(0或者1)来存储数据的状态,比较适合状态简单,数据量比较大,要求内存使用率低的问题场景。位图法解决问题,首先需要知道待处理数据中的最大值,然后按照size =(maxNumber / 32)+1的大小来开辟一个char类型的数组,当需要在位图中查找某个元素是否存在的时候,首先需要计算该数字对应的数组中的比特位,然后读取值,0表示不存在,1表示已存在。位图法有一个很大的缺点,就是数据没有多少,但是最大值却很大,比如有10个整数,最大值是10亿,那么就得按10亿这个数字计算开辟位图数组的大小,太浪费内存空间。数字是否出现过的状态,存储到一个位图数组当中。需要知道元素序列的最大值,根据最大值来定义bitmap位图数组,如果是char类型的bitmap,大小就是最大值/8在加1,根据/%两个操作,映射到元素对应的位,/%几号位元素的第几位,比如7 int index= 7/8=>0,int offset = 7%8=>7,零号位元素的第7位,在读取该位的值01出现过的次数。判断某个特定位置的比特位是否为1(按位与):bitmap[index] & (1<<offset),如何把这个位置置成1(按位或):bitmap[index] | (1<<offset)。
#include <iostream> #include <vector> #include <stdlib.h> #include <time.h> #include <memory>//智能指针 using namespace std; /* 有一亿个整数,最大值不超过一亿,问都有哪些元素重复了 谁是第一个重复的? 谁是第一个不重复的(1个位保存数据的状态,2个位保存数据的状态012) 内存限制100M 1亿 = 100M 100M * 4 * 2 = 800M 位图算法 int bitmap[100000000/32+1] 3.2M 缺陷; int 20亿 unsigned int 40亿 1 3 100000000 int bitmap[10000000/32+1] 30M 推荐的数据序列:数据的个数 >=和序列里面数字最大值相当 */ int main() { vector<int> vec{12, 78, 90, 123, 8, 9}; //定义位图数组 int max = vec[0];//找最大值 for (int i = 1; i < vec.size(); i++) { //O(n) if (vec[i] > max) max = vec[i]; } int* bitmap = new int[max / 32 + 1](); //变量动态定义到堆上 unique_ptr<int[]> ptr(bitmap); //内存自动释放 //找第一个重复出现的数字 for (auto key : vec) { int index = key / 32; int offset = key % 32; //取key对应的位的值 if (0 == (bitmap[index] & (1 << offset))) { //表示key没有出现过 bitmap[index] |= (1 << offset); }else { //cout << key << "是第一个重复出现的数字" << endl; //return 0; cout << key << "重复出现过" << endl; } } }搜索查找问题经常会考察到,除了考察二分查找,哈希表,还有BST,AVL,红黑树等等。在内存有所限制的情况下,快速判断一个元素是否在一个集合(容器)当中,还可以使用布隆过滤器。布隆过滤器到底是个什么东西呢?通俗来讲,在使用哈希表比较占内存的情况下,它是一种更高级的“位图法解决方案(省内存速度快)。Bloom Filter是通过一个位数组+k个哈希函数构成的。Bloom Filter的空间和时间利用率都很高,但是它有一定的错误率,虽然错误率很低,BloomFilter判断某个元素不在一个集合中,那该元素肯定不在集合里面;Bloom Filter判断某个元素在一个集合中,那该元素有可能在,有可能不在集合当中。Bloom Filter的查找错误率,当然和位数组的大小,以及哈希函数的个数有关系。Bloom Filter默认只支持add增加和query查询操作,不支持delete删除操作(因为存储的状态位有可能也是其它数据的状态位,删除后导致其它元素查找判断出错)。Bloom Filter增加元素的过程:把元素的值通过k个哈希函数进行计算,得到k个值,然后把k当作位数组的下标,在位数组中把相应k个值修改成1。Bloom Filter查询元素的过程:把元素的值通过k个哈希函数进行计算,得到k个值,然后把k当作位数组的下标,看看相应位数组下标标识的值是否全部是1,如果有一个为0,表示元素不存在(判断不存在绝对正确);如果都为1,表示元素存在(判断存在有错误率)。很显然,过小的布隆过滤器很快所有的bit位均为1,那么查询任何值都会返回“可能存在”,起不到过滤的目的。布隆过滤器的长度会直接影响误报率,布隆过滤器越长其误报率越小。另外,哈希函数的个数也需要权衡,个数越多则布隆过滤器bit位置为1的速度越快,且布隆过滤器的效率越低;但是如果太少的话,那误报率就会变高。场景一:提示过滤一些非法或者钓鱼网站。布隆过滤器把所有可能怀疑有问题的网站的URL(域名)添加到布隆过滤器当中。在用浏览器访问网址的时候,查找当前访问的网址URL是否在黑名单中,如果网址URL不存在,那肯定是白名单上的合法网站直接访问。如果存在(有误判率),会进行提示当前网站有风险,禁止访问。场景二:redis缓存中的应用(快速判断key存在与否),客户端请求输URL到IO模块网络层接收用户请求,做分发到不同工作线程(服务层)涉及数据的增删改查,在redis缓存层查key到底在不在,而且效率要求高,最好还省内存。所有的key通过redis提供的setBit(key)方法直接操作底层实现的布隆过滤器,把所有的key放进去在查询的时候可以根据getBit(Key)去判断,如果key不存在不用去redis里面去找了,直接去数据库里面查数据,在缓存redis,在给用户返回。如果key存在去redis中找key(有误判但概率低)。
#include <iostream> #include <vector> #include "stringhash.h" #include <string> using namespace std; //布隆过滤器实现 没有自己开辟过资源 不用析构函数 class BloomFilter { public: BloomFilter(int bitSize = 1471) //位的大小 : bitSize_(bitSize) { bitMap_.resize(bitSize_ / 32 + 1);//数组大小 } public: //添加元素 O(1) void setBit(const char* str) { //计算k组哈希函数的值 int idx1 = BKDRHash(str) % bitSize_; //字符串转整数 int idx2 = RSHash(str) % bitSize_; int idx3 = APHash(str) % bitSize_; //把相应的idx1 idx2 idx3 这几个位全部置一 int index = 0; int offset = 0; index = idx1 / 32; offset = idx1 % 32; bitMap_[index] |= (1 << offset); index = idx2 / 32; offset = idx2 % 32; bitMap_[index] |= (1 << offset); index = idx3 / 32; offset = idx3 % 32; bitMap_[index] |= (1 << offset); } //查询元素 O(1) bool getBit(const char* str) { //计算k组哈希函数的值 int idx1 = BKDRHash(str) % bitSize_; int idx2 = RSHash(str) % bitSize_; int idx3 = APHash(str) % bitSize_; int index = 0; int offset = 0; index = idx1 / 32; offset = idx1 % 32; if (0 == (bitMap_[index] & (1 << offset))) { return false; } index = idx2 / 32; offset = idx2 % 32; if (0 == (bitMap_[index] & (1 << offset))) { return false; } index = idx3 / 32; offset = idx3 % 32; if (0 == (bitMap_[index] & (1 << offset))) { return false; } return true; } private: int bitSize_; //位图的长度 vector<int> bitMap_; //位图数组 }; //URL黑名单 class BlackList { public: void add(string url) { //给黑名单添加非法的URL blockList_.setBit(url.c_str()); //给布隆过滤器添加 用于兼容需要C风格字符串的场景 } bool query(string url) { return blockList_.getBit(url.c_str()); } private: BloomFilter blockList_; }; int main() { BlackList list; list.add("http://www.baidu.com"); list.add("http://www.360buy.com"); list.add("http://www.tmall.com"); list.add("http://www.tencent.com"); string url = "http://www.alibaba.com"; cout << list.query(url) << endl; }求top k问题:例如求值最大的或最小的前k个。如果用排序算法那么时间复杂度O(n^2基础)O(nlogn高级),怎样可以在O(n)线性时间内找到topk的元素呢?利用大/小根堆,利用大根堆过滤前top k小的数据;小根堆过滤前top k大的数据。利于求序列值最小的三个元素,需要用一个大根堆堆顶元素的值是最大的。把大根堆堆顶的大值不断淘汰,放入小值。先把前三个元素构建成一个大根堆,再用第四个元素跟堆顶元素比较,如果大于(说明比大根堆里所有元素都大)不管比较下一个,如果小于堆顶直接出堆,调整把小值放入最后。只把原始序列遍历了一遍O(n)。大根堆留下的就是序列中值最小的三个元素。调整大根堆为常数级可去掉。求最大的前3个元素,利用小根堆,用k个元素构建小根堆,把堆顶小元素不断淘汰掉,把大元素放进来。
#include <iostream> #include <time.h> #include <vector> #include <stdlib.h> #include <queue> //队列及优先级队列(堆) #include <functional> #include <unordered_map>//哈希表 键值对 无序映射表 using namespace std; //查重和top k问题综合起来了 int main() { vector<int> vec; srand(time(NULL)); for (int i = 0; i < 10000; i++) { vec.push_back(rand() % 1000); } // 统计重复次数最小的前三个数字 引入哈希表 int k = 3; unordered_map<int, int> map; for (auto key : vec) { map[key]++; //直接统计 } //放入大根堆的时候,需要放key-value键值对 用value重复次数来排 最终打印出来是key数字本身 以pari存放 using Type = pair<int, int>; using Comp = function<bool(Type&, Type&)>; priority_queue<Type, vector<Type>, Comp> maxheap([](Type& a, Type& b)->bool {return a.second < b.second;}); //把map的前三个对象添加到大根堆来 auto it = map.begin(); for (int i = 0; i < k; i++, ++it) { maxheap.push(*it); } //把堆顶大的淘汰出去 for (; it != map.end(); ++it) { if (maxheap.top().second > it->second) { maxheap.pop(); maxheap.push(*it); } } while (!maxheap.empty()) { cout << "key:" << maxheap.top().first << " cnt:" << maxheap.top().second << endl; maxheap.pop(); } // 统计重复次数最大的前3个数字 int k = 3; unordered_map<int, int> map; for (auto key : vec) { map[key]++; } // 放入大根堆的时候,需要放key-value键值对 using Type = pair<int, int>; using Comp = function<bool(Type&, Type&)>; priority_queue<Type, vector<Type>, Comp> minheap( [](Type& a, Type& b)->bool { return a.second > b.second; }); auto it = map.begin(); for (int i = 0; i < k; i++, ++it) { minheap.push(*it); } for (; it != map.end(); ++it) { if (minheap.top().second < it->second) { minheap.pop(); minheap.push(*it); } } while (!minheap.empty()) { cout << "key:" << minheap.top().first << " cnt:" << minheap.top().second << endl; minheap.pop(); } } /* int main() { vector<int> vec; srand(time(NULL)); for (int i = 0; i < 1000; i++) { vec.push_back(rand() % 10000); } //求vec中值最小的前5个元素 priority_queue<int> maxheap;//默认大根堆 int k = 5; //由前k个元素构建一个大根堆 for (int i = 0; i < 5; i++) { maxheap.push(vec[i]); } //遍历剩余的元素直到最后 for (int i = 5; i < vec.size(); i++) { if (maxheap.top() > vec[i]) { maxheap.pop(); maxheap.push(vec[i]); } } //输出结果 while (!maxheap.empty()) { cout << maxheap.top() << " "; maxheap.pop(); } //for (auto key : vec) { // cout << key << " "; //} cout << endl;--------------- //求vec中值最大的前五个元素 定制一下比较方式 priority_queue<int, vector<int>, greater<int>> minheap;//传一个函数对象 int k = 5; //由前k个元素构建一个小根堆 for (int i = 0; i < 5; i++) { minheap.push(vec[i]); } //遍历剩余的元素直到最后 for (int i = 5; i < vec.size(); i++) { if (minheap.top() < vec[i]) { minheap.pop(); minheap.push(vec[i]); } } //输出结果 while (!minheap.empty()) { cout << minheap.top() << " "; minheap.pop(); } cout << endl; }*/利用快排分割函数每次返回的基准数的位置,找出前top k大的或者前top k小的数据。定义两个下标最左边为i最右边为j,选最左边的为基准数,从右往左找小于基准数的,找到放在基准树的位置上,i++在从左往右找大于基准树,找到以后放到j的位置,j--再找第一个小于基准树的放到i。直到ij到同一个位置,把基准树放到ij相等的位置即可,基准树左边都比基准树小右边都比他大。平均时间复杂度:如果一直去分成为一个等比数列求和平均下来就是O(n),最差是序列已经有序了O(n^2)每次分只少了一个元素,基准树选取采用三数取中。
#include <iostream> using namespace std; //快排分割函数 int Partation(int arr[], int begin, int end) { int val = arr[begin]; int i = begin; int j = end; while (i < j) { while(i < j && arr[j] < val) j--; if (i < j) { arr[i] = arr[j]; i++; } while (i < j && arr[i] > val) i++; if (i < j) { arr[j] = arr[i]; j--; } } arr[i] = val; return i; } //求top的函数 void SelectTopK(int arr[], int begin, int end, int k) { int pos = Partation(arr, begin, end); if (pos == k - 1) { return; }else if (pos > k -1) { SelectTopK(arr, begin, pos - 1, k);//左边继续进行快排分割 }else { //落在了右边 SelectTopK(arr, pos + 1, end, k); } } int main() { int arr[] = {64, 45, 52, 80, 66, 68, 0, 2, 18, 75}; int size = sizeof arr / sizeof arr[0]; //求最小的前三个元素 返回下标二 访问前三个元素 int k = 3; SelectTopK(arr, 0, size-1, k);//起始跟末尾元素的下标 在数组中使用递归必须标识出一段范围出来 for (int i = 0; i < k; i++) { cout << arr[i] << " "; } cout << endl; }一致性哈希算法(分布式系统负载均衡的首选算法),服务器负载均衡环境下,可以配置的负载均衡算法很多,如轮询算法,哈希算法,权重比算法,最少连接算法等,且一个良好的分布式哈希方案应该具有良好的单调性,即服务节点的增减不会造成大量哈希的重新定位。负载场景,接入机负载均衡后端带N台服务器,接入机接收到请求,需要将请求平均分发到每台服务器上,每台服务器负责1/N的服务处理。普通的哈希算法根据客户端的ip+port经过md5哈希函数处理完%服务器的个数,得到数字意味着一直分发到对应的服务器上,所以同一个客户端永远被映射到一台指定的server上(这个可以有效解决会话共享的问题,不用额外引入redis缓存)。但如果1号机器挂掉了,nginx会动态的识别到故障,重新进行哈希,会影响之前服务器上登录成功的用户。场景二:key-value缓存,服务器要CRUD数据,可以直接操作DB但做不到高并发磁盘IO较慢(即使对索引做一些优化,索引大了读索引也要花费磁盘IO),所以会把热点数据储存到缓存上,缓存也要集群部署。用用户的id当做key在缓存负载均衡器上用md5处理,映射到某台缓存服务器上。如果某台缓存服务器掉了,会映射到别的服务器上,缓存中找不到相应的信息去DB上找,可能会瞬间释放大量的用户信息查询到DB上,导致缓存血崩MySQL挂掉业务瘫痪。总结:业务服务器缓存服务器通过普通的哈希算法,如果有server宕机/增加,会有大量的请求变更到不同的服务器上很不稳定。理想上谁挂了不影响其他server上的客户,只会把后期新的请求映射到新的server上减少改动。
一致性哈希算法:一致性哈希算法将整个哈希值空间理解成一个环,取值范围是0-2^32-1共4G的整数空间。2.将所有服务器进行哈希,传入服务器的ip地址通过md5算法得到uint的整数,最终落在这个一致性哈希环上。3.进行负载时,先哈希输入值得到一致性哈希环上的一个哈希值,例如A的请求来了带着ip+port作为输入值md5进行哈希得到整数位置,然后沿着顺时针,遇到的第一台服务器,就是最终负载到的服务器。4.容错性(减少服务器节点)和可扩展性(添加新节点),减少一个节点不会影响在别的服务器上的客户端,只影响原来映射到这台机器上的客户端,增加一个节点只影响增加的服务位置,按逆时针遇到的上一台服务中间的这些请求(做到了最小的改动),不会大量哈希定位。一致性哈希算法底层利用了一个set红黑树,储存散列码从小到大的关系只存key(哈希环),哈希得到的值以后添加到里面。服务器经过一致性哈希处理后,在哈希环上应该分散一些(每个server负载能力均衡)。采用一致性哈希环的虚拟节点,就是把一台机器虚拟的看成一百台机器,使用ip#...的方式,ip相同#后面的数字不同,得到不同位置的散列值。且md5算法哪怕输入参数值比较近似,md5算法处理后结果也是非常离散的。给真实的主机放100~200个虚拟节点,且虚拟节点肯定保存对应的物理节点主机的信息,最终负载落在对应真实主机上。虚拟节点的目的:防止物理节点过少,导致哈希处理后,在一致性哈希环上挤在一起,导致某一台server负载太多,其他的server一直很空闲。
//一致性哈希 #include <iostream> #include <string> //ip地址 #include <set> #include <list> //虚拟节点存储 #include <map>//每一台主机 承载了几台客户端的映射 using namespace std; #include "md5.h" using uint = unsigned int; //一致性哈希环的取值类型 //虚拟节点要访问物理节点 做一个前置声明 class PhscialHost; //虚拟节点 class VirtualHost { public: VirtualHost(string ip, PhscialHost* p) : ip_(ip) , phscialHost_(p) { md5_ = ::getMD5(ip_.c_str()); //调用全局不是自己 } //提供小于运算符的重载函数 给相应的类对象提供重载函数 添加 bool operator<(const VirtualHost& host) const { return md5_ < host.md5_;//当前节点的md5要小于传进来的md5 } //比较两个自定义如何相等 删除中的查找 相等运算符的重载函数 bool operator==(const VirtualHost& host) const { return ip_ == host.ip_; } uint getMD5() const { return md5_; } const PhscialHost* getPhscialHost() const { return phscialHost_; } private: string ip_; //虚拟节点的ip信息 uint md5_; //虚拟节点在一致性哈希环上的位置 PhscialHost* phscialHost_;//虚拟节点隶属的物理节点 }; // 物理节点 class PhscialHost { public: PhscialHost(string ip, int vnumber) : ip_(ip) { for (int i = 0; i < vnumber; i++) { // 虚拟节点需要一个ip,还需要记录它属于哪个物理节点 virtualHosts_.emplace_back( ip + "#" + ::to_string(i), this ); } } string getIP() const //一致性哈希要访问ip 和 虚拟链表 { return ip_; } const list<VirtualHost>& getVirtualHosts() const { return virtualHosts_; } private: string ip_;//物理机器的ip地址 list<VirtualHost> virtualHosts_;//存储虚拟节点列表 }; //一致性哈希 class ConsistentHash { public: // 在一致性哈希环上添加物理主机的虚拟节点 void addHost(PhscialHost& host) { //获取物理主机所有虚拟节点列表 auto list = host.getVirtualHosts(); for (auto host : list) { //遍历这个列表 hashCircle_.insert(host); } } //在一致性哈希环上删除物理主机的虚拟节点 void delHost(PhscialHost& host) { //获取物理主机所有虚拟节点列表 auto list = host.getVirtualHosts(); for (auto host : list) { //遍历这个列表 auto it = hashCircle_.find(host); if (it != hashCircle_.end()) { //在一致性哈希环上删除所有物理主机对应的虚拟节点 hashCircle_.erase(it); } } } //返回负载的真实物理主机的ip信息 string getHost(string clientip) const { uint md5 = getMD5(clientip.c_str());//计算客户端的md5 for (auto vhost : hashCircle_) { if (vhost.getMD5() > md5) { return vhost.getPhscialHost()->getIP(); } } //映射从0开始遇见的第一个虚拟节点 return hashCircle_.begin()->getPhscialHost()->getIP(); } private: set<VirtualHost> hashCircle_; //一致性哈希环 }; void ShowConsistentHash(ConsistentHash& chash);//函数声明 //测试一致性哈希算法的功能 int main() { PhscialHost host1("10.117.124.10", 150); PhscialHost host2("10.117.124.20", 150); PhscialHost host3("10.117.124.30", 150); ConsistentHash chash; chash.addHost(host1); chash.addHost(host2); chash.addHost(host3); ShowConsistentHash(chash); //模拟host1有故障 chash.delHost(host1); //封装一个接口完成上述同样的操作 ShowConsistentHash(chash); } void ShowConsistentHash(ConsistentHash& chash) { list<string> iplists{ "192.168.1.123", "192.168.1.12", "192.168.1.13", "192.168.1.23", "192.168.1.54", "192.168.1.89", "192.168.1.21", "192.168.1.27", "192.168.1.49", "192.168.1.145", "192.168.2.34", "192.168.6.78", "192.168.2.90", "192.168.4.5" }; map<string, list<string>> logMap; for (auto clientip : iplists) { string host = chash.getHost(clientip); logMap[host].emplace_back(clientip); } for (auto pair : logMap) { cout << "物理主机:" << pair.first << endl; cout << "客户端映射的数量:" << pair.second.size() << endl; for (auto ip : pair.second) { cout << ip << endl; } cout << "----------------------------" << endl; } cout << endl; } //每个虚拟节点通过phscialHost_指针指向所属的物理节点,因此当客户端映射到某个虚拟节点时,能快速找到对应的物理节点(getPhscialHost()->getIP())。