#include #include #include #include #include #include // from daq.h #define DAQ_EVTYPE_START_RUN 1 #define DAQ_EVTYPE_STOP_RUN 2 #define PORT 31337 #define OUT_FILE "junk" //#define HOST 192. //#define flen 53851 #define BUFLEN 65536 //65536-20 is the maximum udp packet size, looks like a nice maximum for server. //#define DEBUG /** * TODO: * Make sure data is fetched from same client. **/ int main(void){ int sockHandle; //int port; struct sockaddr_in *sock_server, *sock_client; char buf[BUFLEN]; unsigned char *bufp,evtype; unsigned int downloaded=0; unsigned int sequence=0; unsigned long fileSize=0; unsigned long transSize=0,evsize=0; int packetCount=-1; int received=0; FILE *fileHandle; time_t timv; struct tm *ltm; char filename[40]; struct timeval tv_start, tv_finish; double diff_msec; socklen_t socklen=sizeof(struct sockaddr_in); printf("datasink\n"); sock_server = malloc(socklen); sock_client = malloc(socklen); if(!sock_server || !sock_client){ printf("Cannot allocate memory for socket.\n"); } sockHandle=socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); if(sockHandle<0){ printf("Cannot get socket\n"); return 0; } sock_server->sin_family = AF_INET; sock_server->sin_addr.s_addr=htonl(INADDR_ANY); sock_server->sin_port = htons(PORT); if(bind(sockHandle, (struct sockaddr *)sock_server, socklen)<0){ printf("Error, cannot bind socket\n"); return 0; } printf("ready for data\n"); while(1){ received=recvfrom(sockHandle, buf, BUFLEN, 0, (struct sockaddr *)sock_client, &socklen); if(received<0){ printf("Error, cannot receive data\n"); return 0; } if(packetCount==-1){ sequence = buf[0]<<8; sequence |= buf[1]; sequence &= 0x0000ffff; if(sequence!=0){ printf("Caught packet in middle of stream, ignoring\n"); continue; }else{ #ifdef DEBUG printf("Caught first packet, starting capture.\n"); #endif packetCount=0; transSize = (buf[2] & 0x000000ff)<<8; transSize |= buf[3] & 0x000000ff; transSize &= 0x0000ffff; #ifdef DEBUG printf("s%u, p%u packet/ransfer size =0x%x/%x\n", sequence, packetCount, received,transSize); #endif //check for correct n3c format bufp = (unsigned char*)buf+4; evtype = *bufp; evsize = (bufp[1] & 0xff)<<16; evsize |= (bufp[2] & 0xff)<<8 ; evsize |= (bufp[3] & 0xff); if(sequence==0 && (evsize != received - 4)) { bufp = (unsigned char*)buf+4; printf("Not n3c event. Size=0x%x, evSize=0x%x, evtype=0x%x, body=%02x%02x%02x%02x\n", transSize, evsize, evtype, bufp[0],bufp[1],bufp[2],bufp[3]); } else packetCount=-1; // all packets of this transfer have been received switch (evtype) { case DAQ_EVTYPE_START_RUN: timv = time(0); ltm = localtime(&timv); sprintf(filename,"%02i%02i%02i%02i%02i%02i.dq0", ltm->tm_year%100,ltm->tm_mon+1,ltm->tm_mday, ltm->tm_hour,ltm->tm_min,ltm->tm_sec); printf("Start Run %s\n",filename); fileHandle=fopen(filename, "wb"); if(fileHandle==NULL) { printf("Cannot open file for writing\n"); return 0; } downloaded = 0; packetCount=-1; gettimeofday(&tv_start,0); break; case DAQ_EVTYPE_STOP_RUN: gettimeofday(&tv_finish,0); printf("Stop Run. Downloaded %i bytes. ",downloaded); diff_msec = (tv_finish.tv_sec - tv_start.tv_sec)*1000. + (double)(tv_finish.tv_usec - tv_start.tv_usec)/1000.; printf("@ %g KBytes/s\n",downloaded/diff_msec); if(fileHandle) fclose(fileHandle); fileHandle=NULL; printf("continue..\n"); break; default: fwrite(buf+4, 1, received-4, fileHandle); downloaded += received-4; } } }else{ sequence = buf[0]<<8; sequence |= buf[1]; sequence &= 0x0000ffff; if(sequence!=packetCount+1){ printf("Missed a packet, bumping sequence. Expecting %u, got %u. Waiting for new sequence.\n", packetCount+1, sequence); fseek(fileHandle, 0, SEEK_SET); packetCount=0; downloaded=0; }else{ fwrite(buf+2, 1, received-2, fileHandle); packetCount++; downloaded += received-2; printf("Downloaded: %i\n", downloaded); } } //if(downloaded>=fileSize){ //printf("Download complete. %i out of %i bytes.\n", // downloaded, fileSize); // break; } if(fileHandle) fclose(fileHandle); //listen(sockHandle) }