dslinux/user/perl/lib/Thread Queue.pm Queue.t Semaphore.pm Semaphore.t

cayenne dslinux_cayenne at user.in-berlin.de
Tue Dec 5 05:27:19 CET 2006


Update of /cvsroot/dslinux/dslinux/user/perl/lib/Thread
In directory antilope:/tmp/cvs-serv7729/lib/Thread

Added Files:
	Queue.pm Queue.t Semaphore.pm Semaphore.t 
Log Message:
Adding fresh perl source to HEAD to branch from

--- NEW FILE: Queue.t ---
use warnings;

BEGIN {
    # If a t/ directory exists below the current directory, move into it
    # (presumably so that the relative '../lib' path below resolves).
    if (-d 't') {
        chdir 't';
    }
    push @INC, '../lib';

    # Load %Config so the build configuration of this perl can be inspected.
    require Config;
    Config->import;

    # Nothing in this test can run without ithreads: emit a skip plan and
    # leave successfully.
    if (!$Config{'useithreads'}) {
        print "1..0 # Skip: no ithreads\n";
        exit 0;
    }
}

use strict;
use threads;
use Thread::Queue;

# The queue shared by the writer (main thread) and the reader threads.
my $q = Thread::Queue->new;

# Turn on autoflush for the currently selected output handle so that
# test lines are written as soon as they are printed.
$| = 1;

# Plan: 20 ordinary elements + 5 end markers + 1 final ok after the joins.
print "1..26\n";

# Number of the next test to report; shared so every thread sees one counter.
my $test : shared = 1;

# Report the next test as passed.
# The shared counter is locked for the duration of the call so that the
# print and the increment are not interleaved with another thread's.
# Returns the test number that was just reported.
sub ok {
    lock $test;
    my $line = "ok $test\n";
    print $line;
    return $test++;
}

# Thread body: take elements off the shared queue one at a time, reporting
# one passing test per element, and return once the -1 end marker is seen.
sub reader {
    my $tid  = threads->self->tid;    # only referenced by the debug traces
    my $seen = 0;                     # number of elements requested so far

    for (;;) {
        $seen++;
        # print "reader (tid $tid): waiting for element $seen...\n";
        my $el = $q->dequeue;
        ok();
        # print "ok $test\n"; $test++;
        # print "reader (tid $tid): dequeued element $seen: value $el\n";

        # Pause for a random fraction of a second before the next round.
        select(undef, undef, undef, rand(1));

        # -1 is the end marker.
        # print "reader (tid $tid) returning\n";
        return if $el == -1;
    }
}

# Start the reader threads; each one is handed its index as an argument.
my $nthreads = 5;
my @threads;

foreach my $n (0 .. $nthreads - 1) {
    push @threads, threads->new(\&reader, $n);
}

# Writer: enqueue twenty random values, pausing a random fraction of a
# second before each one.
foreach my $round (1 .. 20) {
    my $el = int(rand(100));
    select(undef, undef, undef, rand(1));
    # print "writer: enqueuing value $el\n";
    $q->enqueue($el);
}

# One end marker for each thread, so that every reader terminates.
$q->enqueue((-1) x $nthreads);

# Wait for all readers to finish, then report the final test.
# print "waiting for join\n";
$_->join() for @threads;
ok();
#print "ok $test\n";



--- NEW FILE: Semaphore.t ---
use warnings;

BEGIN {
    # If a t/ directory exists below the current directory, move into it
    # (presumably so that the relative '../lib' path below resolves).
    if (-d 't') {
        chdir 't';
    }
    push @INC, '../lib';

    # Load %Config so the build configuration of this perl can be inspected.
    require Config;
    Config->import;

    # Thread::Semaphore cannot be exercised without ithreads: emit a skip
    # plan and leave successfully.
    if (!$Config{'useithreads'}) {
        print "1..0 # Skip: no ithreads\n";
        exit 0;
    }
}

# Single-test plan: the only thing checked is that the modules load.
# Both `use` statements are processed at compile time, i.e. before either
# print below runs; if loading fails, compilation aborts and neither the
# plan nor "ok 1" is printed.
print "1..1\n";
use threads;
use Thread::Semaphore;
print "ok 1\n";


--- NEW FILE: Queue.pm ---
package Thread::Queue;

use threads::shared;
use strict;

our $VERSION = '2.00';

=head1 NAME

Thread::Queue - thread-safe queues

=head1 SYNOPSIS

    use Thread::Queue;
    my $q = new Thread::Queue;
    $q->enqueue("foo", "bar");
    my $foo = $q->dequeue;    # The "bar" is still in the queue.
    my $bar = $q->dequeue_nb; # returns "bar", or undef if the queue was empty
    my $left = $q->pending;   # returns the number of items still in the queue

=head1 DESCRIPTION

A queue, as implemented by C<Thread::Queue>, is a thread-safe
data structure much like a list.  Any number of threads can safely
add elements to the end of the list, or remove elements from the head
of the list.  (Queues don't permit adding or removing elements from
the middle of the list.)

=head1 FUNCTIONS AND METHODS

=over 8

=item new

The C<new> function creates a new empty queue.

=item enqueue LIST

The C<enqueue> method adds a list of scalars on to the end of the queue.
The queue will grow as needed to accommodate the list.

=item dequeue

The C<dequeue> method removes a scalar from the head of the queue and
returns it. If the queue is currently empty, C<dequeue> will block the
thread until another thread C<enqueue>s a scalar.

=item dequeue_nb

The C<dequeue_nb> method, like the C<dequeue> method, removes a scalar from
the head of the queue and returns it. Unlike C<dequeue>, though,
C<dequeue_nb> won't block if the queue is empty, instead returning
C<undef>.

=item pending

The C<pending> method returns the number of items still in the queue.

=back

=head1 SEE ALSO

L<threads>, L<threads::shared>

=cut

# Constructor.  Builds a shared array seeded with any arguments after the
# class name and returns a reference to it blessed into that class.
sub new {
    my ($class, @items) = @_;
    my @q : shared = @items;
    return bless \@q, $class;
}

# Remove and return the element at the head of the queue.
# While the queue is empty the caller waits on the queue's condition
# variable.  If more than one element is available, another waiter is
# signalled before the head element is taken.
sub dequeue {
    my ($q) = @_;
    lock(@$q);
    while (!@$q) {
        cond_wait(@$q);
    }
    if (@$q > 1) {
        cond_signal(@$q);
    }
    return shift @$q;
}

# Non-blocking dequeue: remove and return the head element, or return
# undef straight away when the queue holds nothing.
sub dequeue_nb {
    my $self = $_[0];
    lock @$self;
    my $head = shift @$self;
    return $head;
}

# Append the given list of scalars to the tail of the queue and, if the
# queue is non-empty afterwards, signal one thread waiting on it.
sub enqueue {
    my ($q, @items) = @_;
    lock(@$q);
    return push(@$q, @items) && cond_signal(@$q);
}

# Return the number of elements currently held in the queue.
sub pending {
    my ($q) = @_;
    lock(@$q);
    my $count = @$q;
    return $count;
}

1;



--- NEW FILE: Semaphore.pm ---
package Thread::Semaphore;

use threads::shared;

our $VERSION = '2.01';

=head1 NAME

Thread::Semaphore - thread-safe semaphores

=head1 SYNOPSIS

    use Thread::Semaphore;
    my $s = new Thread::Semaphore;
    $s->down;	# Also known as the semaphore P operation.
    # The guarded section is here
    $s->up;	# Also known as the semaphore V operation.

    # The default semaphore value is 1.
    my $s = new Thread::Semaphore($initial_value);
    $s->down($down_value);
    $s->up($up_value);

=head1 DESCRIPTION

Semaphores provide a mechanism to regulate access to resources. Semaphores,
unlike locks, aren't tied to particular scalars, and so may be used to
control access to anything you care to use them for.

Semaphores don't limit their values to zero or one, so they can be used to
control access to some resource that there may be more than one of. (For
example, filehandles.) Increment and decrement amounts aren't fixed at one
either, so threads can reserve or return multiple resources at once.

=head1 FUNCTIONS AND METHODS

=over 8

=item new

=item new NUMBER

C<new> creates a new semaphore, and initializes its count to the passed
number. If no number is passed, the semaphore's count is set to one.

=item down

=item down NUMBER

The C<down> method decreases the semaphore's count by the specified number,
or by one if no number has been specified. If the semaphore's count would drop
below zero, this method will block until the semaphore's count is equal to
or larger than the amount you're C<down>ing the semaphore's count by.

This is the semaphore "P operation" (the name derives from the Dutch
word "pak", which means "capture" -- the semaphore operations were
named by the late Dijkstra, who was Dutch).

=item up

=item up NUMBER

The C<up> method increases the semaphore's count by the number specified,
or by one if no number has been specified. This will unblock any thread blocked
trying to C<down> the semaphore if the C<up> raises the semaphore count
to at least the amount that the C<down> is trying to decrement it by.

This is the semaphore "V operation" (the name derives from the Dutch
word "vrij", which means "release").

=back

=cut

# Constructor.  The semaphore is a shared scalar holding the count; it
# starts at the first argument after the class name, or at 1 when no
# argument is supplied.  Returns a reference to it blessed into the class.
sub new {
    my ($class, @args) = @_;
    my $val : shared = @args ? $args[0] : 1;
    return bless \$val, $class;
}

# P operation: decrease the count by the given amount (1 when omitted).
# The caller waits on the semaphore's condition variable for as long as
# the count is not at least that amount.  Returns the new count.
sub down {
    my ($s, @args) = @_;
    lock($$s);
    my $dec = @args ? $args[0] : 1;
    while (!($$s >= $dec)) {
        cond_wait($$s);
    }
    return $$s -= $dec;
}

# V operation: increase the count by the given amount (1 when omitted)
# and, if the resulting count is positive, wake every thread waiting on
# the semaphore so each can re-check its own condition.
sub up {
    my ($s, @args) = @_;
    lock($$s);
    my $inc = @args ? $args[0] : 1;
    $$s += $inc;
    return $$s > 0 && cond_broadcast($$s);
}

1;




More information about the dslinux-commit mailing list