/************************************** * Tyler Simon * mmmio_t.c * memory mapped I/O with threads * put a file in memory and copy * to another file concurrently with pthreads * * Don't use this on anything you don't have backed up, this code does not guarantee * data integrity. This is a research prototype use at your own risk, that * being said if you have any problems please let me know. * * * 8/5/09: Original * 4/04/10 Added automatic core detection * 10/14/10 Added thread<->core affinity via core.c * * *************************************/ #include #include #include #include #include #include #include #include #include #include #include #ifndef DEBUG #define DEBUG 0 #endif #ifndef MAP #define MAP 1 #endif #ifndef HALF #define HALF 0 #endif #ifndef SYNC #define SYNC 0 #endif #define MAX_THREADS 255 double getCurrentTime(void); void *tcopy_segment(void*); struct thread_data{ int t_id; int t_fd; int t_fdout; long t_buf_size; }; struct thread_data thread_data_array[MAX_THREADS]; char *buf; char *data; char *out; float residual; ssize_t totbytes=0; int cpufreq, nprocs; int main(int argc, char *argv[]) { int fd, fdout,i,rc,t; double t0,t1,t2,t3; struct stat sbuf; char fname[255]; char fnameout[255]; int nthreads,tid; pthread_attr_t tattr; size_t stacksize=1048576; long int bread; char hname[255]; double elapsedTime; double start_time,end_time; if (argc != 3) { fprintf(stderr, "usage: %s \n", argv[0]); exit(1); } strcpy(fname,argv[1]); strcpy(fnameout,argv[2]); gethostname(hname,255); /* see how many processors we have*/ if(HALF)nprocs=getNumCPU()/2; else nprocs=getNumCPU(); //get processinfo start_time=getCurrentTime(); fd=open(fname,O_RDONLY); if(fd<0){ if(errno) {/*We know there is an error*/ printf("Error %d: %s %s\n", errno, fname,strerror(errno)); exit(1); } else perror("problems opening file"); exit(1); } if(!fchdir(fd)){ puts("is a directory"); exit(1); } if(access(fname, F_OK)) { printf("Error %d: %s %s\n", errno, fname,strerror(errno)); exit(1); } /*if the files exist and are the same*/ if (strcmp(fname,fnameout)==0)exit(1); unlink(fnameout); if (fdout=open(fnameout,O_WRONLY,O_NONBLOCK)<0) { if(errno == ENOENT) { if ((fdout = creat(fnameout, O_RDWR)) < 0) { perror("creat error"); exit(1); } }else {perror("please specify destination file"); exit(1); } } chmod(fnameout,0600); if (stat(fname, &sbuf) == -1) { perror("stat"); exit(1); } residual=sbuf.st_size%nprocs; pthread_t threads[nprocs]; data=malloc(sbuf.st_size); if(!data) { perror("malloc Failed:"); exit(1); } if ((data = mmap((caddr_t)0, sbuf.st_size, PROT_READ, MAP_SHARED,fd, 0)) == (caddr_t)(-1)) { perror("mmap"); exit(1); } close(fd); printf("%s: Copying %s --> %s with %d threads.\n",hname, fname,fnameout,nprocs); /*create threads*/ for (t=0;tt_id; int from=local_data->t_fd; int to=local_data->t_fdout; long len=local_data->t_buf_size; /* Map a thread to a core*/ if(MAP)getCPU(taskid); long int myshare; if(DEBUG==2)printf("id=%d to=%d from=%d len=%ld res=%f offset = %d\n", taskid, to, from, len, residual, (len-residual)*taskid); if(taskid==nprocs-1){ myshare=(len-residual)*taskid; lseek(to, myshare,SEEK_CUR); totbytes+=write(to,data+myshare,len); if(errno){printf("error writing = %d\n", errno);} } else totbytes+=pwrite(to,data+(len*taskid),len,len*taskid); if(DEBUG==1) { char cpucmd[255]; sprintf(cpucmd, "ps -p %d -L -o cmd,thcount,pid,tid,pcpu,pmem,time,size,psr", getpid()); system(cpucmd); } if(DEBUG==2)printf("%d residual = %f blocklength = %d totbytes = %ld \n",taskid, residual ,len, totbytes); //msync(len,totbytes,MS_ASYNC); pthread_exit(NULL); }/*end copy segment */ double getCurrentTime(void) { struct timeval tval; gettimeofday(&tval, NULL); return (tval.tv_sec + tval.tv_usec/1000000.0); }