#include "MergeSpaceCoordinator.h"
#include "Log.h"
#include "ScopedLock.h"
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/statvfs.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <sys/time.h>
#include <errno.h>
#include <utime.h>
#include <signal.h>
#include <stdlib.h>


// Thread entry point handed to pthread_create(); forwards to
// MergeSpaceCoordinator::hold_lock_thread_function().
void *hold_lock_thread_function_trampoline(void *pv);

// Seconds between two touches of a held lock file. A lock file whose mtime is
// older than 2*touch_interval+1 seconds is considered abandoned by acquire().
static const time_t touch_interval = 30;


// Builds the path of lock file number 'lock_number' inside 'lock_dir',
// i.e. "<lock_dir>/lock<lock_number>".
static std::string make_lock_filename(const std::string &lock_dir, int lock_number) {
	char name[16];
	//"lock" + at most 11 characters for an int + NUL fits in 16 bytes; snprintf guards it anyway
	snprintf(name,sizeof(name),"lock%d",lock_number);
	return lock_dir + "/" + name;
}


// Verifies that 'lock_dir' exists, or tries to create it (single level only).
// Problems are logged or ignored; the subsequent open() calls in acquire()
// will fail and report the error if the directory is unusable.
static void ensure_lock_directory(const std::string &lock_dir) {
	struct stat st;
	if(stat(lock_dir.c_str(),&st)==0) {
		if(!S_ISDIR(st.st_mode))
			log(LOG_ERROR,"Lock directory %s is not a directory", lock_dir.c_str());
	} else {
		//lock directory doesn't exist. Try to create it.
		//the equivalent of "mkdir -p" would be nice but not necessary
		(void)mkdir(lock_dir.c_str(),0777);
	}
}


// Inspects the current contents of an open lock file.
// Returns true if the lock appears unheld; in that case the file has been
// truncated if it held anything (junk, the pid of a dead process, or the pid
// of a process that stopped touching the file).
// Returns false if the lock appears held or the file could not be examined.
// Does not close 'fd'.
static bool lock_file_appears_free(int fd, const std::string &filename) {
	char buf[16];
	const ssize_t bytes_read = pread(fd,buf,sizeof(buf)-1,0);
	if(bytes_read<0) {
		log(LOG_ERROR,"read(%s) failed with errno %d (%s)", filename.c_str(), errno, strerror(errno));
		return false;
	}
	if(bytes_read==0)
		return true; //empty - assume it is unheld

	buf[bytes_read] = '\0';
	char *endptr = NULL;
	const long pid = strtol(buf,&endptr,10);
	//A pid <= 0 is never written by acquire(), and kill() would interpret it
	//as a process group / "all processes", so treat it as junk too.
	if((endptr!=NULL && *endptr) || pid<=0) {
		//junk in file. truncate it
		log(LOG_DEBUG,"Junk in %s", filename.c_str());
		(void)ftruncate(fd,0);
		return true;
	}

	//perhaps it is a pid of a running process.
	//EPERM means the process exists but we are not allowed to signal it.
	if(kill((pid_t)pid,0)!=0 && errno!=EPERM) {
		//old pid in file. truncate it
		log(LOG_DEBUG,"Old pid %ld found in %s, or errno=%d", pid, filename.c_str(), errno);
		(void)ftruncate(fd,0);
		return true;
	}

	//it is a pid of a running process. Has the file actually been touched recently?
	struct stat st;
	if(fstat(fd,&st)!=0) {
		log(LOG_ERROR,"fstat(%s) failed with errno %d (%s)", filename.c_str(), errno, strerror(errno));
		return false;
	}
	if(st.st_mtim.tv_sec + touch_interval*2+1 > time(0)) {
		//touched the past 61 seconds, so the process holds the lock
		return false;
	}

	//The pid is alive (possibly reused by an unrelated process) but the file is
	//stale. Truncate so a longer old pid cannot leave trailing digits behind
	//the pid we are about to write.
	log(LOG_DEBUG,"Stale pid %ld found in %s", pid, filename.c_str());
	(void)ftruncate(fd,0);
	return true;
}


// Writes our pid to the start of the lock file, waits one second and then
// verifies that the file still contains exactly our pid.
// Returns true if the verification succeeded. Does not close 'fd'.
static bool write_and_verify_pid(int fd, const std::string &filename) {
	char buf[16];
	snprintf(buf,sizeof(buf),"%d",(int)getpid());
	const size_t pid_length = strlen(buf);

	const ssize_t bytes_written = pwrite(fd,buf,pid_length,0);
	if(bytes_written<0) {
		log(LOG_ERROR,"pwrite(%s) failed with errno %d (%s)", filename.c_str(), errno, strerror(errno));
		return false;
	}
	if((size_t)bytes_written<pid_length) {
		//what?
		log(LOG_ERROR,"pwrite(%s,%zu) returned %zd", filename.c_str(), pid_length, bytes_written);
		return false;
	}

	//we wrote our pid to the file. Wait a bit and then verify that it has not been overwritten
	timespec ts;
	ts.tv_sec = 1;
	ts.tv_nsec = 0;
	nanosleep(&ts,NULL);

	const ssize_t bytes_read = pread(fd,buf,sizeof(buf)-1,0);
	if(bytes_read!=bytes_written) {
		//file size changed or there was a read error. In either case the lock failed
		log(LOG_DEBUG,"%s changed size", filename.c_str());
		return false;
	}
	buf[bytes_read] = '\0';
	char *endptr = NULL;
	const long pid = strtol(buf,&endptr,10);
	if(endptr!=NULL && *endptr) {
		log(LOG_DEBUG,"Junk found in %s", filename.c_str());
		return false;
	}
	if(pid!=getpid()) {
		log(LOG_DEBUG,"New pid (%d) found in %s (our pid = %d)", (int)pid, filename.c_str(), (int)getpid());
		return false;
	}
	return true;
}


// Stores the configuration and starts the background thread that keeps a held
// lock file fresh. If the thread cannot be created the error is logged and
// hold_lock_thread_running stays false.
MergeSpaceCoordinator::MergeSpaceCoordinator(const char *lock_dir_, int min_lock_files_, const char *merge_space_dir_)
  : hold_lock_thread_running(false),
    mtx(),
    please_shutdown(false),
    held_lock_number(-1),
    lock_dir(lock_dir_),
    min_lock_files(min_lock_files_),
    merge_space_dir(merge_space_dir_)
{
	pthread_cond_init(&cond,NULL);
	int rc = pthread_create(&hold_lock_tid,NULL,hold_lock_thread_function_trampoline,this);
	if(rc!=0) {
		log(LOG_ERROR,"pthread_create() returned %d (%s)", rc, strerror(rc));
		return;
	}
	hold_lock_thread_running = true;
}


// Releases any held lock, asks the background thread to stop, joins it and
// destroys the condition variable.
MergeSpaceCoordinator::~MergeSpaceCoordinator() {
	relinquish();
	if(hold_lock_thread_running) {
		pthread_mutex_lock(&mtx.mtx);
		please_shutdown = true;
		pthread_cond_signal(&cond);
		pthread_mutex_unlock(&mtx.mtx);
		pthread_join(hold_lock_tid,NULL);
		hold_lock_thread_running = false;
	}
	//the thread (the only waiter) is gone or was never started, so this is safe
	pthread_cond_destroy(&cond);
}


// Tries to obtain one of the lock files in the lock directory.
//
// how_much: number of bytes that must be available in the merge space
//           directory; if statvfs() reports less, no lock is attempted.
//
// Returns true if a lock is held when the call returns (including the case
// where one was already held), false otherwise. A successful new acquisition
// takes at least one second because of the write-then-verify protocol.
//
// Lock files lock0..lock<min_lock_files-1> are created on demand; higher
// numbered files are only used if they already exist.
bool MergeSpaceCoordinator::acquire(uint64_t how_much) {
	if(please_shutdown)
		return false;
	if(held_lock_number>=0)
		return true;
	if(min_lock_files<=0) {
		log(LOG_ERROR,"MergeSpaceCoordinator::acquire: min_lock_files=%d. Locking will never succeed",min_lock_files);
		return false;
	}

	struct statvfs svfs;
	if(statvfs(merge_space_dir.c_str(),&svfs)==0) {
		//f_bavail is counted in units of f_frsize. Compute in 64 bits so the
		//product cannot overflow where unsigned long is 32 bits wide.
		const uint64_t block_size = svfs.f_frsize!=0 ? svfs.f_frsize : svfs.f_bsize;
		const uint64_t free_bytes = static_cast<uint64_t>(svfs.f_bavail) * block_size;
		if(free_bytes < how_much)
			return false;
	}

	//verify or create lock directory
	ensure_lock_directory(lock_dir);

	for(int lock_number=0; ; lock_number++) {
		const std::string filename(make_lock_filename(lock_dir,lock_number));
		const bool may_create = lock_number<min_lock_files;

		const int fd = open(filename.c_str(), may_create ? (O_RDWR|O_CREAT) : O_RDWR, 0666);
		if(fd<0) {
			if(!may_create)
				break; //ran out of existing lock files
			log(LOG_ERROR,"open(%s|O_CREAT) failed with errno %d (%s)", filename.c_str(), errno, strerror(errno));
			continue;
		}

		const bool got_it = lock_file_appears_free(fd,filename) && write_and_verify_pid(fd,filename);
		close(fd);
		if(!got_it)
			continue;

		// lock is not needed but it shuts up thread sanitizer
		ScopedLock sl(mtx);
		//hurray!
		held_lock_number = lock_number;
		log(LOG_DEBUG,"MergeSpaceCoordinator::acquire: got lock #%d for %" PRIu64" bytes", held_lock_number, how_much);
		return true;
	}

	return false;
}


// Returns true if a lock file is currently held.
bool MergeSpaceCoordinator::acquired() const {
	return held_lock_number>=0;
}


// Gives up the held lock, if any: forgets the lock number and empties the lock
// file. If the file cannot be opened for truncation it is unlinked instead.
void MergeSpaceCoordinator::relinquish() {
	if(held_lock_number>=0) {
		log(LOG_DEBUG,"MergeSpaceCoordinator::relinquish(): held_lock_number=%d", held_lock_number);
		ScopedLock sl(mtx);
		int tmp = held_lock_number;
		held_lock_number = -1;
		sl.unlock();
		//file operations are done after the mutex has been released
		std::string filename(make_lock_filename(lock_dir,tmp));
		int fd = open(filename.c_str(),O_WRONLY|O_TRUNC,0666);
		if(fd>=0)
			close(fd);
		else
			(void)::unlink(filename.c_str());
	}
}


// pthread entry point: 'pv' is the MergeSpaceCoordinator passed to pthread_create().
void *hold_lock_thread_function_trampoline(void *pv) {
	MergeSpaceCoordinator *msc = (MergeSpaceCoordinator*)pv;
	msc->hold_lock_thread_function();
	return NULL;
}


//touch the lock file every 30 seconds
// Runs until please_shutdown is set. The mutex is held except while waiting on
// the condition variable, which the destructor signals to end the wait early.
void MergeSpaceCoordinator::hold_lock_thread_function() {
	ScopedLock sl(mtx);
	while(!please_shutdown) {
		//touch the lock file if we hold the lock
		if(held_lock_number>=0) {
			std::string filename(make_lock_filename(lock_dir,held_lock_number));
			struct timeval tv[2];
			gettimeofday(tv+0,NULL);
			tv[1] = tv[0];
			if(utimes(filename.c_str(),tv)!=0) {
				log(LOG_ERROR,"utimes(%s) failed with errno=%d (%s)", filename.c_str(), errno, strerror(errno));
			}
		}
		//sleep 30 seconds
		timespec ts;
		clock_gettime(CLOCK_REALTIME,&ts);
		ts.tv_sec += touch_interval;
		(void)pthread_cond_timedwait(&cond,&mtx.mtx,&ts);
	}
}