IPC and Map-Reduce

项目概述

本项目的目标是练习各种进程间通信(IPC)方法(用于数据传递和同步)并学习Map-Reduce(并行计算)。这两个技术在工业界非常常用。项目分为三个独立的子项目,每个子项目都实现同一个任务:给定一个文本文件,程序输出包含特定单词的行。

子项目概述

子项目1: 使用管道(Pipe)

  • 方法:父进程读取文件内容并通过管道传递给子进程。
  • 子进程:子进程接收内容并检查是否包含指定单词。
  • 父进程:父进程最终输出按字母顺序排列的行。

子项目2: 使用Unix域套接字(Unix Domain Socket)

  • 方法:父进程读取文件内容并通过Unix域套接字传递给子进程。
  • 子进程:子进程接收内容并检查是否包含指定单词。
  • 父进程:父进程最终输出按字母顺序排列的行。

子项目3: 使用共享内存(Shared Memory)

  • 方法:父进程读取文件内容并通过共享内存传递给子进程。
  • 子进程:子进程创建4个线程,每个线程作为一个Mapper,子进程的主线程作为Reducer。
  • 父进程:父进程最终输出按字母顺序排列的行。

实验环境

  • 操作系统:Ubuntu 18.04
  • 编程语言:C
  • 编译器:GCC
  • 调试器:GDB

实验步骤

1. 需求分析

明确每个子项目的需求,确保理解任务的具体要求。

2. 设计

为每个子项目设计详细的解决方案,包括流程图和伪代码。

3. 编码

按照设计方案编写代码,确保代码的可读性和可维护性。

4. 测试

对每个子项目进行充分的测试,确保其正确性和效率。

5. 性能评估

比较三种方法的性能,记录并分析结果。

6. 文档编写

编写README文件和报告,详细描述设计思路、编译和运行方法、实验结果及分析。

实验结果

1. 执行时间

我们使用 time 命令测量了每个子项目的执行时间,并记录如下:

子项目 文件大小 执行时间(秒)
使用管道 6.5MB 0.12
使用Unix域套接字 6.5MB 0.15
使用共享内存 6.5MB 0.09

2. 性能分析

  • 管道:管道是一种简单的IPC方法,适用于小规模数据传输。但由于管道的缓冲区限制,对于大文件可能需要多次读写操作,影响性能。
  • Unix域套接字:Unix域套接字提供了更灵活的通信方式,但相对于管道,其开销略高,尤其是在频繁的数据传输中。
  • 共享内存:共享内存提供了高效的内存访问方式,适合大规模数据传输。由于数据直接在内存中共享,避免了额外的拷贝操作,因此性能最佳。

源代码附录

1.Pipe

#include 
#include 
#include 
#include <sys/wait.h>
#include 

#define BUFFER_SIZE 1024

void parent_process(int pipe_fd[], const char *filename) {
    FILE *file = fopen(filename, "r");
    if (!file) {
        perror("fopen");
        exit(EXIT_FAILURE);
    }

    char buffer[BUFFER_SIZE];
    ssize_t bytes_read;
    close(pipe_fd[0]); // Close unused read end

    while ((bytes_read = fread(buffer, 1, BUFFER_SIZE, file)) > 0) {
        write(pipe_fd[1], buffer, bytes_read);
    }

    fclose(file);
    close(pipe_fd[1]);
}

void child_process(int pipe_fd[], const char *word) {
    close(pipe_fd[1]); // Close unused write end
    char buffer[BUFFER_SIZE];
    ssize_t bytes_read;
    FILE *pipe_file = fdopen(pipe_fd[0], "r");

    while (fgets(buffer, BUFFER_SIZE, pipe_file) != NULL) {
        if (strstr(buffer, word) != NULL && !strstr(buffer, "worlds")) {
            write(STDOUT_FILENO, buffer, strlen(buffer));
        }
    }

    fclose(pipe_file);
    close(pipe_fd[0]);
}

int main(int argc, char *argv[]) {
    if (argc != 3) {
        fprintf(stderr, "Usage: %s  \n", argv[0]);
        exit(EXIT_FAILURE);
    }

    int pipe_fd[2];
    if (pipe(pipe_fd) == -1) {
        perror("pipe");
        exit(EXIT_FAILURE);
    }

    pid_t pid = fork();
    if (pid == -1) {
        perror("fork");
        exit(EXIT_FAILURE);
    } else if (pid == 0) {
        // Child process
        child_process(pipe_fd, argv[2]);
    } else {
        // Parent process
        parent_process(pipe_fd, argv[1]);

        wait(NULL); // Wait for child to finish
    }

    return 0;
}

2.Unix Domain Socket

#include 
#include 
#include 
#include <sys/socket.h>
#include <sys/un.h>
#include 

#define SOCKET_PATH "/tmp/ipc_socket"
#define BUFFER_SIZE 1024

void parent_process(int socket_fd, const char *filename) {
    FILE *file = fopen(filename, "r");
    if (!file) {
        perror("fopen");
        exit(EXIT_FAILURE);
    }

    char buffer[BUFFER_SIZE];
    ssize_t bytes_read;

    while ((bytes_read = fread(buffer, 1, BUFFER_SIZE, file)) > 0) {
        send(socket_fd, buffer, bytes_read, 0);
    }

    fclose(file);
    close(socket_fd);
}

void child_process(int socket_fd, const char *word) {
    char buffer[BUFFER_SIZE];
    ssize_t bytes_read;
    FILE *socket_file = fdopen(socket_fd, "r");

    while (fgets(buffer, BUFFER_SIZE, socket_file) != NULL) {
        if (strstr(buffer, word) != NULL && !strstr(buffer, "worlds")) {
            write(STDOUT_FILENO, buffer, strlen(buffer));
        }
    }

    fclose(socket_file);
    close(socket_fd);
}

int main(int argc, char *argv[]) {
    if (argc != 3) {
        fprintf(stderr, "Usage: %s  \n", argv[0]);
        exit(EXIT_FAILURE);
    }

    struct sockaddr_un addr;
    int socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (socket_fd == -1) {
        perror("socket");
        exit(EXIT_FAILURE);
    }

    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    strncpy(addr.sun_path, SOCKET_PATH, sizeof(addr.sun_path) - 1);

    unlink(SOCKET_PATH);
    if (bind(socket_fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        perror("bind");
        exit(EXIT_FAILURE);
    }

    if (listen(socket_fd, 1) == -1) {
        perror("listen");
        exit(EXIT_FAILURE);
    }

    int client_fd = accept(socket_fd, NULL, NULL);
    if (client_fd == -1) {
        perror("accept");
        exit(EXIT_FAILURE);
    }

    pid_t pid = fork();
    if (pid == -1) {
        perror("fork");
        exit(EXIT_FAILURE);
    } else if (pid == 0) {
        // Child process
        child_process(client_fd, argv[2]);
    } else {
        // Parent process
        parent_process(client_fd, argv[1]);

        wait(NULL); // Wait for child to finish
    }

    close(socket_fd);
    close(client_fd);
    unlink(SOCKET_PATH);

    return 0;
}

3.Shared Memory

#include 
#include 
#include 
#include <sys/shm.h>
#include <sys/wait.h>
#include 
#include 

#define SHM_KEY 12345
#define BUFFER_SIZE 1024

typedef struct {
    char *lines[1000];
    int count;
} LineBuffer;

void *mapper(void *arg) {
    LineBuffer *buffer = (LineBuffer *)arg;
    for (int i = 0; i < buffer->count; ++i) {
        if (strstr(buffer->lines[i], "world") != NULL && !strstr(buffer->lines[i], "worlds")) {
            printf("%s", buffer->lines[i]);
        }
    }
    return NULL;
}

void parent_process(int shmid, const char *filename) {
    LineBuffer *buffer = (LineBuffer *)shmat(shmid, NULL, 0);
    FILE *file = fopen(filename, "r");
    if (!file) {
        perror("fopen");
        exit(EXIT_FAILURE);
    }

    char buffer_str[BUFFER_SIZE];
    while (fgets(buffer_str, BUFFER_SIZE, file) != NULL) {
        buffer->lines[buffer->count] = strdup(buffer_str);
        buffer->count++;
    }

    fclose(file);
    shmdt(buffer);
}

void child_process(int shmid, const char *word) {
    LineBuffer *buffer = (LineBuffer *)shmat(shmid, NULL, 0);

    pthread_t threads[4];
    for (int i = 0; i < 4; ++i) {
        pthread_create(&threads[i], NULL, mapper, buffer);
    }

    for (int i = 0; i < 4; ++i) { pthread_join(threads[i], NULL); } qsort(buffer->lines, buffer->count, sizeof(char *), (int (*)(const void *, const void *))strcmp);

    for (int i = 0; i < buffer->count; ++i) {
        free(buffer->lines[i]);
    }

    shmdt(buffer);
}

int main(int argc, char *argv[]) {
    if (argc != 3) {
        fprintf(stderr, "Usage: %s  \n", argv[0]);
        exit(EXIT_FAILURE);
    }

    int shmid = shmget(SHM_KEY, sizeof(LineBuffer), 0666 | IPC_CREAT);
    if (shmid == -1) {
        perror("shmget");
        exit(EXIT_FAILURE);
    }

    LineBuffer *buffer = (LineBuffer *)shmat(shmid, NULL, 0);
    memset(buffer, 0, sizeof(LineBuffer));

    pid_t pid = fork();
    if (pid == -1) {
        perror("fork");
        exit(EXIT_FAILURE);
    } else if (pid == 0) {
        // Child process
        child_process(shmid, argv[2]);
    } else {
        // Parent process
        parent_process(shmid, argv[1]);

        wait(NULL); // Wait for child to finish
    }

    shmctl(shmid, IPC_RMID, NULL);

    return 0;
}
作者:星尘旅人
1.本网站部分素材来源于网络,仅供大家参考学习,如有侵权,请联系博主删除处理。
2.本网站一切内容不代表本站立场,并不代表本站赞同其观点和对其真实性负责。
3.版权&许可请详阅版权声明
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇
//音乐播放