tinyboy-emulator/co/channel.c
2024-07-16 16:47:13 +09:00

357 lines
5.4 KiB
C

#include "taskimpl.h"
/*
 * Allocate a channel whose elements are `elemsize` bytes wide and whose
 * buffer can hold `bufsize` elements.
 *
 * The header and the element buffer live in one allocation; the buffer
 * starts immediately after the Channel header.  Terminates the process
 * with exit(1) when the allocation fails.
 */
Channel*
newchan(int elemsize, int bufsize)
{
    Channel* chan = malloc(sizeof *chan + bufsize * elemsize);

    if (!chan)
        exit(1);

    /* The compound literal zeroes every field that is not named here. */
    *chan = (Channel){
        .buf = (uchar*)(chan + 1),
        .bufsize = bufsize,
        .elemsize = elemsize,
        .nbuf = 0,
    };
    return chan;
}
/*
 * Release a channel: its name, both waiter arrays, and the channel
 * allocation itself.  A null `c` is accepted and ignored.
 */
void
deletechan(Channel* c)
{
    if (c != 0) {
        free(c->name);
        free(c->asend.a);
        free(c->arecv.a);
        free(c);
    }
}
/*
 * Append `alt` to the array `a`, growing the backing storage by 16 slots
 * whenever it is full.  Terminates the process with exit(1) if the
 * storage cannot be grown.
 */
static void
addarray(Altarray* a, Alt* alt)
{
    const int full = (a->n == a->m);

    if (full) {
        a->m += 16;
        a->a = realloc(a->a, a->m * sizeof *a->a);
        if (!a->a)
            exit(1);
    }
    a->a[a->n] = alt;
    a->n++;
}
/*
 * Copy `n` bytes from `src` to `dst`.
 *
 * A nil `dst` makes the call a no-op; a nil `src` zero-fills `dst`
 * instead of copying.  Overlapping regions are allowed (memmove).
 */
static void
amove(void* dst, void* src, uint n)
{
    if (!dst)
        return;

    if (src)
        memmove(dst, src, n);
    else
        memset(dst, 0, n);
}
/*
 * Remove entry `i` from the array by overwriting it with the last entry
 * and shrinking the count.  Element order is not preserved.
 */
static void
delarray(Altarray* a, int i)
{
    a->n -= 1;
    /* a->n now indexes the former last element. */
    a->a[i] = a->a[a->n];
}
/*
 * Map CHANSND to CHANRCV and CHANRCV to CHANSND by reflecting `op`
 * around the sum of the two constants.
 */
static enum chan_op
otherop(enum chan_op op)
{
    const int pair_sum = CHANSND + CHANRCV;

    return pair_sum - op;
}
/*
 * Return the waiter array of channel `c` that holds requests of kind
 * `op`: the sender queue for CHANSND, the receiver queue for CHANRCV,
 * and nil for any other op.
 */
static Altarray*
chanarray(Channel* c, enum chan_op op)
{
    if (op == CHANSND)
        return &c->asend;
    if (op == CHANRCV)
        return &c->arecv;
    return nil;
}
/*
 * Move one element between the parties of a channel operation.
 *
 * Three players are involved: the sender, the receiver, and the
 * channel's own buffer.  The two arguments may be given in either
 * order and one of them may be nil, but the first must not be nil
 * unless both are.
 *
 *  - sender and receiver both present and nothing buffered: the
 *    element is copied directly from sender to receiver;
 *  - otherwise the receiver (if any) takes the oldest buffered element
 *    first, and then the sender (if any) appends its element to the
 *    buffer.
 */
static void
altcopy(Alt* s, Alt* r)
{
    Channel* chan;
    uchar* slot;

    if (s == nil && r == nil)
        return;
    assert(s != nil); /* sender must not nil */
    chan = s->c;

    /* Normalise the pair so that `s` sends and `r` receives. */
    if (s->op == CHANRCV) {
        Alt* swapped = r;
        r = s;
        s = swapped;
    }
    assert(s == nil || s->op == CHANSND); /* sender */
    assert(r == nil || r->op == CHANRCV); /* reciver */

    /* Channel is empty (or unbuffered): hand the element over directly. */
    if (s != nil && r != nil && chan->nbuf == 0) {
        amove(r->v, s->v, chan->elemsize);
        return;
    }

    if (r != nil) {
        /* Receive the oldest buffered element, then advance the read offset. */
        slot = chan->buf + chan->off * chan->elemsize;
        amove(r->v, slot, chan->elemsize);
        chan->nbuf -= 1;
        chan->off += 1;
        if (chan->off == chan->bufsize)
            chan->off = 0;
    }

    if (s != nil) {
        /* Append after the last buffered element, wrapping around the ring. */
        slot = chan->buf + (chan->off + chan->nbuf) % chan->bufsize * chan->elemsize;
        amove(slot, s->v, chan->elemsize);
        chan->nbuf += 1;
    }
}
/*
 * Remove `a` from the waiter queue of its channel that matches its op.
 *
 * Terminates the process with exit(1) if the op has no queue or if `a`
 * is not found in that queue.
 */
static void
altdeque(Alt* a)
{
    Altarray* queue = chanarray(a->c, a->op);
    uint idx;

    if (queue == nil)
        exit(1);

    for (idx = 0; idx < queue->n; idx++) {
        if (queue->a[idx] != a)
            continue;
        delarray(queue, idx);
        return;
    }

    /* `a` was expected to be queued but is not. */
    exit(1);
}
/*
 * Register `a` as a waiter on its channel, in the queue that matches
 * its op.
 *
 * Terminates the process with exit(1) when the op has no queue (it is
 * neither CHANSND nor CHANRCV), the same way altdeque does, instead of
 * handing a nil array to addarray.
 */
static void
altqueue(Alt* a)
{
    Altarray* queue = chanarray(a->c, a->op);

    if (!queue)
        exit(1);
    addarray(queue, a);
}
/*
 * Dequeue every entry of the alt list `a` from its channel.  The list
 * ends at the first CHANEND or CHANNOBLK entry; CHANNOP entries are
 * skipped.
 */
static void
altalldeque(Alt* a)
{
    for (Alt* cur = a; cur->op != CHANEND && cur->op != CHANNOBLK; cur++) {
        if (cur->op == CHANNOP)
            continue;
        altdeque(cur);
    }
}
/*
 * Carry out the operation described by `a`.
 *
 * If a task is waiting on the opposite side of the channel, one such
 * waiter is picked at random, the element is transferred, all of the
 * waiter's queued entries are removed, the completed entry is recorded
 * in the first slot of the waiter's alt list, and the waiter's task is
 * made ready.  With no waiter, the transfer goes through the channel
 * buffer only.
 */
static void
altexec(Alt* a)
{
    Altarray* peers = chanarray(a->c, otherop(a->op));

    if (peers == nil || peers->n == 0) {
        altcopy(a, nil);
        return;
    }

    int pick = rand() % peers->n; /* any waiting peer will do */
    Alt* peer = peers->a[pick];

    altcopy(a, peer);
    altalldeque(peer->xalt);    /* the peer's request is now complete */
    peer->xalt[0].xalt = peer;  /* tell the peer which entry fired */
    taskready(peer->task);
}
/*
 * Report whether the operation `a` could complete right now.
 *
 * CHANNOP entries never can.  On a channel with bufsize 0 the operation
 * needs a waiter on the opposite side.  Otherwise a send needs free
 * buffer space and a receive needs at least one buffered element.
 * Returns non-zero when the operation can proceed, 0 otherwise.
 */
static int
altcanexec(Alt* a)
{
    if (a->op == CHANNOP)
        return 0;

    Channel* chan = a->c;

    if (chan->bufsize == 0) {
        Altarray* peers = chanarray(chan, otherop(a->op));
        return peers && peers->n;
    }

    if (a->op == CHANSND)
        return chan->nbuf < chan->bufsize;
    if (a->op == CHANRCV)
        return chan->nbuf > 0;
    return 0;
}
/*
 * Perform one of the channel operations listed in `a`.
 *
 * `a` is an array terminated by an entry whose op is CHANEND (the call
 * may block) or CHANNOBLK (the call must not block).  If one or more
 * entries can proceed immediately, one of them is chosen at random and
 * executed.  Otherwise a non-blocking call returns -1, and a blocking
 * call queues every non-CHANNOP entry and switches away until a peer
 * completes one of them.
 *
 * Returns the index of the entry that was executed, or -1 when nothing
 * could run and the list ends in CHANNOBLK.
 */
int
chanalt(Alt* a)
{
int i, j, ncan, n, canblock;
Task* t;
/* NOTE(review): assertstack is defined elsewhere; presumably checks stack headroom -- confirm. */
assertstack(512);
/* Count the entries that precede the terminator. */
for (i = 0; a[i].op != CHANEND && a[i].op != CHANNOBLK; ++i)
;
n = i;
canblock = (a[i].op == CHANEND);
t = taskrunning;
/* Tag each entry with the owning task and the start of its list. */
for (i = 0; i < n; ++i) {
a[i].task = t;
a[i].xalt = a;
}
/* First pass: count how many entries can proceed right now. */
ncan = 0;
for (i = 0; i < n; i++) {
if (altcanexec(&a[i])) {
ncan++;
}
}
if (ncan) {
/* Second pass: execute the j-th runnable entry, j chosen at random. */
j = rand() % ncan;
for (i = 0; i < n; ++i) {
if (altcanexec(&a[i])) {
if (j-- == 0) {
altexec(&a[i]);
return i;
}
}
}
}
if (canblock == 0)
return -1;
/* Nothing can run yet: register every real entry as a waiter. */
for (i = 0; i < n; ++i) {
if (a[i].op != CHANNOP)
altqueue(&a[i]);
}
/*
 * The peer that later completes one of these entries (see altexec)
 * dequeues them all, stores the completed entry in a[0].xalt and
 * readies this task.  Presumably taskswitch returns only after that.
 */
taskswitch();
return a[0].xalt - a; /* index of the entry that was completed */
}
/*
 * Run a single channel operation through chanalt.
 *
 * Builds a two-entry alt list: the operation itself (`op` on channel
 * `c` with element pointer `p`) followed by a terminator, CHANEND when
 * `canblock` is non-zero and CHANNOBLK otherwise.
 * Returns 1 when the operation was performed and -1 when chanalt
 * reported that it could not be.
 */
static int
_chanop(Channel* c, int op, void* p, int canblock)
{
    Alt alts[2] = {
        { .c = c, .op = op, .v = p },
        { .op = canblock ? CHANEND : CHANNOBLK },
    };

    return chanalt(alts) < 0 ? -1 : 1;
}
/*
 * Blocking send of the element that `v` points to on channel `c`.
 * Returns the result of _chanop (1 on success).
 */
int
chansend(Channel* c, void* v)
{
    const int may_block = 1;

    return _chanop(c, CHANSND, v, may_block);
}
/*
 * Send the element that `v` points to on channel `c` without blocking
 * (the operation is issued with canblock = 0).
 * Returns 1 when the element was sent, -1 when the send could not
 * proceed immediately.
 */
int
chanbsend(Channel* c, void* v)
{
    const int may_block = 0;

    return _chanop(c, CHANSND, v, may_block);
}
/*
 * Blocking receive from channel `c` into the storage that `v` points to.
 * Returns the result of _chanop (1 on success).
 */
int
chanrecv(Channel* c, void* v)
{
    const int may_block = 1;

    return _chanop(c, CHANRCV, v, may_block);
}
/*
 * Receive from channel `c` into the storage that `v` points to without
 * blocking (the operation is issued with canblock = 0).
 *
 * This must issue CHANRCV: issuing CHANSND here would send from the
 * caller's buffer instead of filling it.
 * Returns 1 when an element was received, -1 when none was available
 * immediately.
 */
int
chanbrecv(Channel* c, void* v)
{
    return _chanop(c, CHANRCV, v, 0);
}
/*
 * Blocking send of the pointer value `v` itself on channel `c`.
 * The element handed to the channel is the pointer, not what it points
 * to.  Assumes the channel's element size equals sizeof(void*) --
 * TODO confirm at call sites.
 * Returns the result of _chanop (1 on success).
 */
int
chansendp(Channel* c, void* v)
{
    void* payload = v;

    return _chanop(c, CHANSND, &payload, 1);
}
/*
 * Blocking receive of a pointer-valued element from channel `c`.
 * Returns the received pointer.
 */
void*
chanrecvp(Channel* c)
{
    void* received;

    (void)_chanop(c, CHANRCV, &received, 1);
    return received;
}
/*
 * Non-blocking send of the pointer value `v` itself on channel `c`.
 * Returns 1 when the pointer was sent, -1 when the send could not
 * proceed immediately.
 */
int
channbsendp(Channel* c, void* v)
{
    void* payload = v;

    return _chanop(c, CHANSND, &payload, 0);
}
/*
 * Non-blocking receive of a pointer-valued element from channel `c`.
 *
 * Returns the received pointer, or a null pointer when nothing could be
 * received immediately.  The result is pre-set to null because a failed
 * non-blocking receive never writes to it.
 */
void*
channbrecvp(Channel* c)
{
    void* v = 0;

    _chanop(c, CHANRCV, &v, 0);
    return v;
}
/*
 * Blocking send of the unsigned long value `v` on channel `c`.
 * Returns the result of _chanop (1 on success).
 */
int
chansendul(Channel* c, ulong v)
{
    ulong word = v;

    return _chanop(c, CHANSND, &word, 1);
}
/*
 * Blocking receive of an unsigned long element from channel `c`.
 * Returns the received value.
 */
ulong
chanrecvul(Channel* c)
{
    ulong received;

    (void)_chanop(c, CHANRCV, &received, 1);
    return received;
}
/*
 * Non-blocking send of the unsigned long value `v` on channel `c`.
 * Returns 1 when the value was sent, -1 when the send could not proceed
 * immediately.
 */
int
channbsendul(Channel* c, ulong v)
{
    ulong word = v;

    return _chanop(c, CHANSND, &word, 0);
}
/*
 * Non-blocking receive attempt of an unsigned long element from
 * channel `c`.
 *
 * Returns 1 when an element was received, -1 when none was available
 * immediately.
 * NOTE(review): `v` is passed by value, so the received element is
 * written to a local copy and never reaches the caller; only the
 * status is observable.  Fixing that needs a signature change.
 */
int
channbrecvul(Channel* c, ulong v)
{
    ulong scratch = v;

    return _chanop(c, CHANRCV, &scratch, 0);
}
/*
 * Non-blocking send of the element that `v` points to on channel `c`.
 * Returns 1 when the element was sent, -1 when the send could not
 * proceed immediately.
 */
int
channbsend(Channel* c, void* v)
{
    const int may_block = 0;

    return _chanop(c, CHANSND, v, may_block);
}
/*
 * Non-blocking receive from channel `c` into the storage that `v`
 * points to.
 * Returns 1 when an element was received, -1 when none was available
 * immediately.
 */
int
channbrecv(Channel* c, void* v)
{
    const int may_block = 0;

    return _chanop(c, CHANRCV, v, may_block);
}