/*
   Copyright 2013 Willem Vermin, SURFsara

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/
#include "stopos_pool.h"
#include "stopos.h"
#include "wutils.h"

// NOTE(review): the three system header names were stripped from this file
// (only "#include" survived).  The headers below are chosen from what the
// code uses: std::cerr, free() and strdup().  <string> and <vector> are added
// because std::string and std::vector are used directly.
#include <stdlib.h>
#include <string.h>

#include <iostream>
#include <string>
#include <vector>

// ---------------------------------------------------------------------------
// Backend hooks.  These are placeholders: a derived class has to supply the
// real database access.
//
// NOTE(review): in all three stubs the text between the first message and
// "db_name" was lost when the file was extracted.  Only the surviving tokens
// are reproduced; whatever string literal stood before db_name must be
// restored from version control.
// ---------------------------------------------------------------------------

// non-sense example implementation, you have to overwrite this in a derived class
//
// Prints a diagnostic to std::cerr and returns 0.  r, k and l are not used
// by the surviving code.
int stopos_pool::get_record(std::string &r, const std::string &k, const std::string &l)
{
  std::cerr << __FILE__<<":"<<__LINE__<<"stopos_pool::get_record must be overwritten in a derived class"<<std::endl;
  std::cerr << this->db_name<< std::endl;
  return 0;
}

// non-sense example implementation, you have to overwrite this in a derived class
//
// Prints a diagnostic to std::cerr, marks the database as open and returns 0.
int stopos_pool::open_db(void)
{
  std::cerr << __FILE__<<":"<<__LINE__<<"stopos_pool::open_db must be overwritten in a derived class"<<std::endl;
  std::cerr << this->db_name<<" and assign the result to an handle in your class" << std::endl;
  this->db_open = 1;
  return 0;
}

// non-sense example implementation, you have to overwrite this in a derived class
//
// Prints a diagnostic to std::cerr and returns 0.
// NOTE(review): the end of this function and the header of the constructor
// below were lost in extraction.  "return 0; }" and
// "stopos_pool::stopos_pool() {" are reconstructed; anything else that was
// defined between the two must be restored from version control.
int stopos_pool::close_db(void)
{
  std::cerr << __FILE__<<":"<<__LINE__<<"stopos_pool::close_db must be overwritten in a derived class"<<std::endl;
  std::cerr << this->db_name<<std::endl;
  return 0;
}

// constructor
//
// statuskey is a default-constructed stopos_key.  Throughout this file a key
// equal to statuskey is also used as the "points to nothing" marker in the
// prev/next/first/last links.
stopos_pool::stopos_pool()
{
  this->kvp       = 0;
  this->statuskey = stopos_key(); // full key
  this->db_name   = 0;
  this->db_open   = 0;
}

// destructor
//
// db_name is allocated with strdup() in set_db_name(), so it is released
// with free().  free() accepts a null pointer.
stopos_pool::~stopos_pool()
{
  free(this->db_name);
}

// Creates the database named by sdb_name, writes a freshly initialised
// status record into it and closes it again.
// Returns 0 on success, otherwise the first non-zero code reported by
// create_db(name), open_db() or put_status().
int stopos_pool::create_db(void)
{
  int rc = this->create_db(this->sdb_name);
  if (rc != 0)
    return rc;

  this->status.init();

  rc = this->open_db();
  if (rc != 0)
    return rc;

  rc = this->put_status();
  this->close_db();   // closed regardless of the put_status() result
  return rc;
}

// Fetches the record stored under key k and parses it into r.
// Returns 0 on success, otherwise the code of the string-level get_record().
// r is left untouched on failure.
int stopos_pool::get_record(datarecord &r, const stopos_key &k)
{
  std::string s;
  int rc = this->get_record(s, k.get_key(), k.get_slot());
  if (rc != 0)
    return rc;

  r = datarecord(s, dbsep);
  return 0;
}

// Serialises r (fields joined with dbsep) and stores it under key k.
// Returns the code of the string-level put_record().
int stopos_pool::put_record(const datarecord &r, const stopos_key &k)
{
  std::string b = r.str(dbsep);
  return this->put_record(b, k.get_key(), k.get_slot());
}

// Removes the record stored under key k.
// Returns the code of the string-level remove_record().
int stopos_pool::remove_record(const stopos_key &k)
{
  return this->remove_record(k.get_key(), k.get_slot());
}

// Remembers the database name, both as std::string (sdb_name) and as a
// heap-allocated C string (db_name).
// The previous C string, if any, is released first so that calling this
// more than once does not leak.
void stopos_pool::set_db_name(std::string filename)
{
  this->sdb_name = filename;

  free(this->db_name);
  this->db_name = strdup(this->sdb_name.c_str());
}

// Returns the database name as set by set_db_name().
std::string stopos_pool::get_db_name(void)
{
  return this->sdb_name;
}

// Writes the in-memory status record to the database under statuskey.
// Returns the code of the string-level put_record().
int stopos_pool::put_status(void)
{
  std::string buf = this->status.str(dbsep);
  return this->put_record(buf, this->statuskey.get_key(), this->statuskey.get_slot());
}

// Reads the status record stored under statuskey into this->status.
// Returns 0 on success, otherwise the code of the string-level get_record();
// this->status is left untouched on failure.
int stopos_pool::get_status(void)
{
  std::string buf;
  int rc = this->get_record(buf, this->statuskey.get_key(), this->statuskey.get_slot());
  if (rc != 0)
    return rc;

  this->status = statusrecord(buf, dbsep);
  return 0;
}

// Appends a new record holding 'line' to the end of the linked list.
//
//   line : payload; stored hex-encoded via zstrtohex()
//   key  : out, receives the string form of the new record's key
//
// Returns 0 on success, DBNOTOPEN if the database is not open, FETCHERROR or
// STOREERROR, or the code of a failing get_status()/get_record()/put_record().
int stopos_pool::add_line(const std::string line, std::string &key)
{
  if (!this->db_open)
    return DBNOTOPEN;

  datarecord d;
  stopos_key dkey;
  int rc;

  rc = get_status();
  if (rc != 0)
    return rc;

  // dkey is the key of the inserted record
  std::string slot = this->key_to_slot(stopos_key::NumberToKey(status.count + 1));
  dkey.set(status.count + 1, slot);

  // if status.last points to a record, modify 'next' of that last record
  if (status.last != this->statuskey)
  {
    datarecord l;
    rc = this->get_record(l, this->status.last);
    if (rc != 0)
      return rc;
    l.next = dkey;
    rc = this->put_record(l, this->status.last);
    if (rc != 0)
      return rc;
  }

  // if status.next points to nothing
  // or
  // if status.next points to a record that has been committed,
  // then
  // status.next should point to the record just added
  if (this->status.next == this->statuskey)   // status.next points to nothing
    this->status.next = dkey;
  else  // look if status.next points to a record that has already been committed
  {
    datarecord r;
    rc = this->get_record(r, this->status.next);
    if (rc != 0)
      return FETCHERROR;
    if (r.retrieved != 0)
      this->status.next = dkey;
  }

  if (this->status.first == this->statuskey)
    this->status.first = dkey;

  d.prev = status.last;   // old tail becomes predecessor of the new record
  this->status.last = dkey;
  this->status.count++;
  this->status.present++;
  this->status.present0++;

  rc = this->put_status();
  if (rc != 0)
    return this->STOREERROR;

  d.value     = zstrtohex(line);
  d.next      = this->statuskey;   // new record is the tail: no follower
  d.retrieved = 0;

  rc = put_record(d, dkey);
  if (rc != 0)
    return this->STOREERROR;

  key = dkey.str();
  return 0;
}

// Hands out the record status.next points to and advances status.next.
//
//   line      : out, decoded payload
//   retrieved : out, number of times the record had been handed out BEFORE
//               this call
//   key       : out, string form of the record's key
//
// Without kvp a record is handed out only once: NOTFOUND is returned when
// status.next points to nothing or to a record with retrieved != 0.
// With kvp the cursor wraps around to status.first and already retrieved
// records are handed out again.
//
// Returns 0, DBNOTOPEN, FETCHERROR, STOREERROR or NOTFOUND.  A failure to
// write the status record back is reported as STOREERROR.
int stopos_pool::get_line(std::string &line, longuint &retrieved, std::string &key)
{
  if (!this->db_open)
    return DBNOTOPEN;

  int rc = this->get_status();
  if (rc != 0)
    return FETCHERROR;

  if (this->status.next == this->statuskey)
  {
    if (!this->kvp)
      return NOTFOUND;
    // wrapping around ...
    status.next = status.first;
    if (this->status.next == this->statuskey)
      return NOTFOUND;
  }

  datarecord r;
  rc = this->get_record(r, this->status.next);
  if (rc != 0)
    return FETCHERROR;

  retrieved = r.retrieved;

  if (!this->kvp)
    if (r.retrieved != 0)
      return NOTFOUND;

  key  = this->status.next.str();
  line = zhextostr(r.value);

  // adjust the 'retrieved' field of r, and write it
  r.retrieved++;
  rc = this->put_record(r, status.next);
  if (rc != 0)
    return STOREERROR;

  // adjust the 'next' field of status. this must be equal
  // to the 'next' field of r
  this->status.next = r.next;

  // first retrieval: one record less that has never been handed out
  if (r.retrieved == 1)
    this->status.present0--;

  rc = this->put_status();
  if (rc != 0)
    return STOREERROR;

  return 0;
}

// Removes the record with key k and unlinks it from the doubly linked list.
//
// With r the record to remove, p its predecessor and n its follower
// (a link equal to statuskey means "none"):
//
//   r has no follower, has predecessor : p.next = r.next
//                                        status.next = r.next if it was r
//                                        status.last = r.prev
//   r is the only record               : status.next  = r.next
//                                        status.first = r.prev
//                                        status.last  = r.prev
//   r has predecessor and follower     : p.next = r.next, n.prev = r.prev
//                                        status.next = r.next if it was r
//   r is first and has a follower      : n.prev = r.prev
//                                        status.next = r.next if it was r
//                                        status.first = r.next
//
// Returns 0 on success, DBNOTOPEN, or the code of the first failing
// database operation.  Failures while reading or rewriting a neighbour are
// reported instead of being ignored, so that a default-constructed
// neighbour is never written back.
int stopos_pool::remove_line(const std::string &k)
{
  if (!this->db_open)
    return DBNOTOPEN;

  int rc;
  datarecord r;
  stopos_key key(k);

  rc = this->get_record(r, key);
  if (rc != 0)
    return rc;

  // remove this record from the linked lists.
  rc = this->get_status();
  if (rc != 0)
    return rc;

  datarecord p, n;

  if (r.prev != this->statuskey)
  {
    rc = this->get_record(p, r.prev);
    if (rc != 0)
      return rc;
  }

  if (r.next != this->statuskey)
  {
    rc = this->get_record(n, r.next);
    if (rc != 0)
      return rc;
  }

  //
  // and what to do if the next record to be dumped is
  // exactly this record?
  //
  // status.nextd becomes r.next, whatever r.next is pointing to.
  // However, if r.next is pointing to nothing,
  // status.nextd gets an impossible value
  //
  if (this->status.nextd == key)
  {
    if (r.next == this->statuskey)
      this->status.nextd.impossible();
    else
      this->status.nextd = r.next;
  }

  if (r.next == this->statuskey)  // r is last record
  {
    if (r.prev != this->statuskey)  // r is last record and has a predecessor
    {
      p.next = r.next;
      rc = this->put_record(p, r.prev);
      if (rc != 0)
        return rc;

      if (key == status.next)
        status.next = r.next;
      // key == status.last: always the case here
      status.last = r.prev;
      // key == status.first: impossible, because there is a previous record
    }
    else  // r is the only record
    {
      status.next  = r.next;
      status.first = r.prev;
      status.last  = r.prev;
    }
  }
  else  // r has a follower
  {
    if (r.prev != this->statuskey)  // r has a predecessor and a follower
    {
      p.next = r.next;
      rc = this->put_record(p, r.prev);
      if (rc != 0)
        return rc;

      n.prev = r.prev;
      rc = this->put_record(n, r.next);
      if (rc != 0)
        return rc;

      if (key == status.next)
        status.next = r.next;
      // key == status.last : impossible, r has a follower
      // key == status.first: impossible, r has a predecessor
    }
    else  // r is first record and has a follower
    {
      n.prev = r.prev;
      rc = this->put_record(n, r.next);
      if (rc != 0)
        return rc;

      if (key == status.next)
        status.next = r.next;
      // key == status.last : impossible, r has a follower
      // key == status.first: always true here
      status.first = r.next;
    }
  }

  // finally, remove the record
  rc = this->remove_record(key);
  if (rc != 0)
    return rc;

  // and rewrite status
  this->status.present--;
  if (r.retrieved == 0)
    this->status.present0--;

  return this->put_status();
}

// True when the numeric value of key string k is 0.
bool stopos_pool::is_statusrecord(const std::string &k)
{
  return stopos_key::KeyToNumber(k) == 0;
}

// Switches kvp mode on or off (see get_line() for its effect).
void stopos_pool::set_kvp(bool v)
{
  this->kvp = v;
}

// Returns the current kvp setting.
bool stopos_pool::get_kvp(void)
{
  return this->kvp;
}

// Re-reads the status record and reports its three counters.
//
//   count    : out, status.count    (incremented by add_line, never decremented here)
//   present  : out, status.present  (add_line ++, remove_line --)
//   present0 : out, status.present0 (as present, but also -- on first retrieval)
//
// Returns 0 on success, otherwise the code of get_status(); the outputs are
// left untouched on failure.
int stopos_pool::get_counts(longuint &count, longuint &present, longuint &present0)
{
  int rc = get_status();
  if (rc != 0)
    return rc;

  count    = status.count;
  present  = status.present;
  present0 = status.present0;
  return 0;
}

// Returns the record the dump cursor (status.nextd) points to and advances
// the cursor, without changing the record itself.
//
//   line      : out, decoded payload
//   retrieved : out, the record's retrieved counter
//   key       : out, string form of the record's key
//
// Returns 0, DBNOTOPEN, FETCHERROR, NOTFOUND, or STOREERROR when the advanced
// cursor could not be written back (otherwise the caller would be handed the
// same record again on the next call).
int stopos_pool::dump_line(std::string &line, longuint &retrieved, std::string &key)
{
  if (!this->db_open)
    return DBNOTOPEN;

  int rc = this->get_status();
  if (rc != 0)
    return FETCHERROR;

  //
  // if status.nextd is pointing to nothing,
  // we assume that one is starting the dump,
  // and let nextd point to the first record
  //
  if (this->status.nextd == this->statuskey)
    this->status.nextd = this->status.first;

  // if nextd is still pointing to nothing,
  // there is nothing
  if (this->status.nextd == this->statuskey)
    return NOTFOUND;

  datarecord r;
  rc = this->get_record(r, this->status.nextd);
  if (rc != 0)
  {
    // Record not available: rewind the cursor for the next dump.
    // The write is best effort, NOTFOUND is reported either way.
    this->status.nextd = this->status.first;
    this->put_status();
    return NOTFOUND;
  }

  key       = this->status.nextd.str();
  retrieved = r.retrieved;
  line      = zhextostr(r.value);

  this->status.nextd = r.next;
  if (this->status.nextd == this->statuskey)  // we signal that we reached eoi
    this->status.nextd.impossible();          // at the next call

  rc = this->put_status();
  if (rc != 0)
    return STOREERROR;

  return 0;
}

// Creates an empty data record with retrieved == 0.
datarecord::datarecord()
{
  this->retrieved = 0;
}

// Parses a data record from its stored form:
//   prev <sep> next <sep> retrieved <sep> value
//
// If split() yields fewer than four fields, the missing ones are treated as
// empty strings instead of indexing past the end of the vector.
// NOTE(review): the template arguments of std::vector and StringToNumber
// were stripped from this file; <std::string> and <longuint> are restored
// from how the values are used.  Confirm against wutils.h.
datarecord::datarecord(const std::string &s, char sep)
{
  std::vector<std::string> v;
  split(v, s, sep);
  if (v.size() < 4)
    v.resize(4);

  this->prev      = stopos_key(v[0]);
  this->next      = stopos_key(v[1]);
  this->retrieved = StringToNumber<longuint>(v[2]);
  this->value     = v[3];
}

// Returns the stored form of the record:
//   prev <sep> next <sep> retrieved <sep> value
std::string datarecord::str(char sep) const
{
  std::string r;
  r = this->prev.str()                  + sep
    + this->next.str()                  + sep
    + NumberToString(this->retrieved)   + sep
    + this->value
    ;
  return r;
}

// Creates a status record in its initial state (see init()).
statusrecord::statusrecord(void)
{
  this->init();
}

// Parses a status record from its stored form:
//   next <sep> first <sep> last <sep> nextd <sep>
//   count <sep> present <sep> present0 <sep> extra
//
// If split() yields fewer than eight fields, the missing ones are treated as
// empty strings instead of indexing past the end of the vector.
// NOTE(review): this matters if split() drops a trailing empty field, as
// 'extra' is not set by init() - verify against split() in wutils.
// NOTE(review): template arguments restored as in datarecord's constructor.
statusrecord::statusrecord(const std::string &s, char sep)
{
  std::vector<std::string> v;
  split(v, s, sep);
  if (v.size() < 8)
    v.resize(8);

  this->next     = stopos_key(v[0]);
  this->first    = stopos_key(v[1]);
  this->last     = stopos_key(v[2]);
  this->nextd    = stopos_key(v[3]);
  this->count    = StringToNumber<longuint>(v[4]);
  this->present  = StringToNumber<longuint>(v[5]);
  this->present0 = StringToNumber<longuint>(v[6]);
  this->extra    = v[7];
}

// Resets all links to a default-constructed key and all counters to 0.
void statusrecord::init(void)
{
  this->next     = stopos_key();
  this->first    = stopos_key();
  this->last     = stopos_key();
  this->nextd    = stopos_key();
  this->count    = 0;
  this->present  = 0;
  this->present0 = 0;
  // note: extra is not initialized here, that is
  // something for a derived class
}

// Returns the stored form of the status record:
//   next <sep> first <sep> last <sep> nextd <sep>
//   count <sep> present <sep> present0 <sep> extra
std::string statusrecord::str(char sep)
{
  std::string r;
  r = this->next.str()                  + sep
    + this->first.str()                 + sep
    + this->last.str()                  + sep
    + this->nextd.str()                 + sep
    + NumberToString(this->count)       + sep
    + NumberToString(this->present)     + sep
    + NumberToString(this->present0)    + sep
    + this->extra
    ;
  return r;
}