#define _GNU_SOURCE
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <mqueue.h>
#include <errno.h>

#define SIG SIGRTMIN
#define SIG1 (SIGRTMIN+1)

mqd_t mq;
struct message{
    int counter;
};

void *thread1(void *arg) {
    sigset_t set;
    int sig;
    timer_t timer1;
    struct itimerspec its1;
    struct sigevent sev1;
    struct timespec now;

    struct message msg;
    msg.counter = 1;

    sigemptyset(&set);
    sigaddset(&set, SIG);

    sev1.sigev_notify = SIGEV_SIGNAL;
    sev1.sigev_signo = SIG;
    timer_create(CLOCK_REALTIME, &sev1, &timer1);

    clock_gettime(CLOCK_REALTIME, &now);
    its1.it_value.tv_sec = now.tv_sec + 2; // 2 sekundy opóŸnienia
    its1.it_value.tv_nsec = now.tv_nsec;
    its1.it_interval.tv_sec = 1; // 1-dnosekudowy interwa³
    its1.it_interval.tv_nsec = 0;

    timer_settime(timer1, TIMER_ABSTIME, &its1, NULL);

    while (1) {
        sigwait(&set, &sig);
        if(mq_send(mq, (char*)&msg, sizeof(msg), 0) == -1) {
            if(errno == EAGAIN) {
                printf("Kolejka jest przepełniona\n");
            }
        } else {
            struct mq_attr attr;
            mq_getattr(mq,&attr);
            printf("Liczba wiadomości w kolejce: %d\n",(int) attr.mq_curmsgs);
            msg.counter++;
        }
    }
    pthread_exit(NULL);
}

void *thread2(void *arg) {
    sigset_t set;
    int sig;
    timer_t timer2;
    struct itimerspec its2;
    struct sigevent sev2;
    struct timespec now;
    struct message received;

    sigemptyset(&set);
    sigaddset(&set, SIG1);

    sev2.sigev_notify = SIGEV_SIGNAL;
    sev2.sigev_signo = SIG1;
    timer_create(CLOCK_REALTIME, &sev2, &timer2);

    clock_gettime(CLOCK_REALTIME, &now);
    its2.it_value.tv_sec = now.tv_sec + 2; // 2 sekundy opoznienia
    its2.it_value.tv_nsec = now.tv_nsec;
    its2.it_interval.tv_sec = 5; // 5-ciosekundowy interwa³
    its2.it_interval.tv_nsec = 0;

    timer_settime(timer2, TIMER_ABSTIME, &its2, NULL);

    while (1) {
        sigwait(&set, &sig);
            if(mq_receive(mq, (char*)&received, sizeof(received), 0) == -1) {
                if(errno == EAGAIN) {
                    printf("Kolejka jest pusta\n");
                    break;
                }
            } else {
                printf("Odebrana dana: %d\n", received.counter);
            }
    }
    pthread_exit(NULL);
}

int main() {
    pthread_t t1, t2;
    pthread_attr_t attr1, attr2;
    struct sched_param param1, param2, param_main;
    cpu_set_t cpuset;

    struct mq_attr attr;
    sigset_t set;

    attr.mq_maxmsg = 10;
    attr.mq_msgsize = sizeof(struct message);
    attr.mq_curmsgs = 0;
    mq = mq_open("/queue", O_CREAT | O_RDWR | O_NONBLOCK, 0644, &attr);

    if (mq != (mqd_t) -1) {
        mq_close(mq);
        mq_unlink("/queue"); // Usuń istniejącą kolejkę
    } else if (errno != ENOENT) {
        fprintf(stderr, "Nie można otworzyć kolejki komunikatów.\n");
        exit(EXIT_FAILURE);
    }

    mq = mq_open("/queue", O_CREAT | O_RDWR | O_NONBLOCK, 0644, &attr); // Utwórz nową kolejkę

    if (mq == (mqd_t) -1) {
        fprintf(stderr, "Nie można utworzyć kolejki komunikatów.\n");
        exit(EXIT_FAILURE);
    }

    sigemptyset(&set);
    sigaddset(&set, SIG);
    sigaddset(&set, SIG1);
    pthread_sigmask(SIG_BLOCK, &set, NULL);

    CPU_ZERO(&cpuset);
    CPU_SET(1, &cpuset); // Wybie¿ procesor nr 2 w systemie
    printf("Liczba dostepnych CPU: %d\n", (int) sysconf(_SC_NPROCESSORS_ONLN));

    param_main.sched_priority = sched_get_priority_max(SCHED_FIFO) - 3;
    pthread_setschedparam(pthread_self(), SCHED_FIFO, &param_main);
    pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);

    pthread_attr_init(&attr1);
    pthread_attr_setscope(&attr1, PTHREAD_SCOPE_SYSTEM);
    pthread_attr_setinheritsched(&attr1, PTHREAD_EXPLICIT_SCHED);
    pthread_attr_setschedpolicy(&attr1, SCHED_FIFO);
    param1.sched_priority = sched_get_priority_max(SCHED_FIFO) - 1;
    pthread_attr_setschedparam(&attr1, &param1);
    pthread_attr_setaffinity_np(&attr1, sizeof(cpu_set_t), &cpuset);
    pthread_create(&t1, &attr1, thread1, NULL);

    pthread_attr_init(&attr2);
    pthread_attr_setscope(&attr2, PTHREAD_SCOPE_SYSTEM);
    pthread_attr_setinheritsched(&attr2, PTHREAD_EXPLICIT_SCHED);
    pthread_attr_setschedpolicy(&attr2, SCHED_FIFO);
    param2.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2;
    pthread_attr_setschedparam(&attr2, &param2);
    pthread_create(&t2, &attr2, thread2, NULL);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    mq_close(mq);
    mq_unlink("/queue");

    pthread_attr_destroy(&attr1);
    pthread_attr_destroy(&attr2);

    return 0;
}
