V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
commoccoom
V2EX  ›  C

请教一个关于线程条件变量的问题

  •  
  •   commoccoom · 2021-08-10 09:06:48 +08:00 · 1368 次点击
    这是一个创建于 980 天前的主题,其中的信息可能已经有所发展或是发生改变。

    两个线程,线程 1(processData)生成数据 p 并写入数据库,另一个线程 2(someSocket)将线程 1 生成的数据 p 通过 socket 发送到客户端。

    当线程 1 中的for循环结束时,如何通知线程 2 while(res == TRUE)应当结束了,线程 2 因为 pthread_cond_wait 一直在阻塞,但是此时线程 1 不会再发出信号了。这是不是就是死锁了。。。

    代码如下

    #include <string.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <sys/time.h>
    #include <unistd.h>
    #include <mysql.h>
    #include <errno.h>
    #include <pthread.h>
    #include <sys/socket.h>
    #include <sys/types.h>
    #include <sys/wait.h>
    #include <netinet/in.h>
    #define TRUE 1
    #define FALSE 0
    #define MAX_STRING 128
    #define PORT 3389
    #define SA struct sockaddr
    
    pthread_cond_t pready = PTHREAD_COND_INITIALIZER;
    pthread_mutex_t plock = PTHREAD_MUTEX_INITIALIZER;
    
    // 存储温湿度结构体
    typedef struct {
        int temp;
        int humd;
    }humiture;
    
    // 可变长消息体
    typedef struct { 
        int nLen; 
        char data[ 0];
    }MyMessage;
    
    //全局变量
    humiture p;
    int res = TRUE;
    
    void error(char *msg)
    {
        fprintf(stderr, "%s: %s\n", msg, strerror(errno));
        exit(1);
    }
    
    void info(char *msg)
    {
        fprintf(stdout,"%s\n",msg);
    }
    
    void finish_with_error(MYSQL *con)
    {
      fprintf(stderr, "%s\n", mysql_error(con));
      mysql_close(con);
      exit(1);
    }
    
    // socket 发送数据
    int sendall(int s, char *buf, int *len)
    {
        int total = 0;
        int bytesleft = *len;
        int n;
    
        while(total < *len) {
            n = send(s, buf+total, bytesleft, 0);
            if (n == -1) { break; }
            total += n;
            bytesleft -= n;
        }
    
        *len = total;
    
        return n==-1?-1:0;
    } 
    
    // 生成数据
    humiture  collectData()
    {
        int temperature,humidity;
        srand((unsigned)time(NULL));    // 根据时间来播种随机数种子
        // 生成数据
        temperature = rand()%40+10;     // 生成 10~50 的随机数 当做温度
        humidity = rand()%70+10;    // 生成 10~80 的随机数当做湿度
        
        humiture p = {humidity, temperature};
        
        return p;
    }
    
    // 启动 MySQL 建立连接
    MYSQL* startMysql()
    {
        MYSQL *con = mysql_init(NULL);
    
        if (con == NULL)
        {        
            fprintf(stderr, "%s\n", mysql_error(con));
            exit(1);
        }
    
        if (mysql_real_connect(con, "localhost", "root", "root#admin","test", 0, NULL, 0) == NULL)
        {
            finish_with_error(con);
        }
    
        return con;
    }
    
    // 生成数据并存入数据库
    void * processData()
    {
        MYSQL * con = startMysql();
    
        for(int i = 0; i <20;i++)
        {
           
            pthread_mutex_lock(&plock);
            p = collectData();
            pthread_cond_signal(&pready);
            pthread_mutex_unlock(&plock);
    
            char query[MAX_STRING] = {0};
    
            snprintf(query,MAX_STRING,"INSERT INTO humiture (temperature,humidity) VALUES (%d,%d)", p.temp, p.humd);
    
            if (mysql_query(con, query)) 
            {
                finish_with_error(con);
            }
            sleep(2);
        } 
    
        // 循环结束给出信号
        res = FALSE;
    
        mysql_close(con); 
        mysql_library_end();
        return NULL;
    }
    
    void * someSocket()
    {
        int sockfd, connfd;
        struct sockaddr_in servaddr, cli;
        socklen_t len;
        char buff[10];
        
        sockfd = socket(AF_INET, SOCK_STREAM, 0);
        if (sockfd == -1) {
            error("socket creation failed...");
        }
        else
            info("Socket successfully created...");
    
        memset(&servaddr, 0, sizeof(servaddr));
        servaddr.sin_family = AF_INET;
        servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
        servaddr.sin_port = htons(PORT);
    
        int reuse = 1;
        if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(int)) == -1)
            error("Can't set the reuse option on the socket...");
    
        if ((bind(sockfd, (SA*)&servaddr, sizeof(servaddr))) != 0) {
            error("socket bind failed...");
        }
        else
            fprintf(stdout,"%s\n","Socket successfully binded...");
    
        if ((listen(sockfd, 5)) != 0) {
            error("Listen failed...");
        }
        else
           info("Server listening...");    
    
        while(TRUE)
        {
            len = sizeof(cli);        
            connfd = accept(sockfd, (SA*)&cli, &len);    
            if (connfd < 0) {
                error("server acccept failed...");
            }
            else
                info("server acccept the client...");
            
            MyMessage * myMessage = (MyMessage*)malloc(sizeof(MyMessage)+sizeof(humiture));
            int needSend = sizeof(MyMessage)+sizeof(humiture);  
            char *buffer =(char*)malloc(needSend);
    
            while(res == TRUE)
            {
                myMessage->nLen = htonl(sizeof(humiture));
                pthread_mutex_lock(&plock);
                pthread_cond_wait(&pready,&plock);
                memcpy(myMessage->data,&p,sizeof(humiture));
                pthread_mutex_unlock(&plock);
                memcpy(buffer,myMessage,needSend);
                sendall(connfd,buffer,&needSend);
                recv(connfd,buff,sizeof(buff),0);
            }
            // 当需要停止的时候发送 0 字节信息让客户端停止循环
            if(res == FALSE) 
            {
                // 将发送消息定义为 0
                myMessage->nLen = htonl(res);
                char *buffer =(char*)malloc(sizeof(int));
                memcpy(buffer,myMessage,sizeof(MyMessage));
                send(connfd,buffer,sizeof(MyMessage),0);
                shutdown(connfd,SHUT_RDWR);
                free(buffer);
            }
    
            free(myMessage);
            free(buffer);
            close(connfd);
            break;
        }
        close(sockfd);    
        return NULL;
    }
    
    int main(void)
    {
        pthread_t t0,t1;
    
        if(pthread_create(&t0, NULL,processData,NULL)==-1)
        {
            error("Can't create thread processData");
        }
        if(pthread_create(&t1,NULL,someSocket,NULL)==-1)
        {
             error("Can't create thread someSocket");
        }
    
        void *reslut;
        if(pthread_join(t0,&reslut)==-1)
        {
            error("Can't reclaim thread t0");
        }
        if(pthread_join(t1,&reslut)==-1)
        {
            error("Can't reclaim thread t1");
        }
    
        return 0;
    }
    
    4 条回复    2021-08-10 13:33:59 +08:00
    wevsty
        1
    wevsty  
       2021-08-10 09:29:11 +08:00
    线程 1 退出的时候再发一个 pthread_cond_signal 这样线程 2 收到信号就不会锁死了。最后线程 2 检查一下退出标志再发数据不就行了么?
    commoccoom
        2
    commoccoom  
    OP
       2021-08-10 10:36:52 +08:00
    @wevsty 确实,我人傻了😂
    FranzKafka95
        3
    FranzKafka95  
       2021-08-10 13:11:31 +08:00 via Android
    或者线程 1 执行完 for 循环以后先别置 res 为 false,再加一个条件变量在这儿等,等到线程 2 发送完以后通过信号量通知线程 1 再置为 false,等到线程 2 再执行 while 循环时 res 已经为 flalse,这样就可以同步了。
    commoccoom
        4
    commoccoom  
    OP
       2021-08-10 13:33:59 +08:00
    @FranzKafka95 后续我的想法是线程 1(processData)会一直循环,线程 2(someSocket)是接收别的信号,然后启动或者停止这样。所以线程 1 不能阻塞,线程 2 可以阻塞。线程 1 现在加了循环次数是因为之前我发现有内存泄漏,所以加个停止条件看看哪里有问题😂。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   3393 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 25ms · UTC 12:28 · PVG 20:28 · LAX 05:28 · JFK 08:28
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.