/*
FALCON - The Falcon Programming Language.
FILE: threading_ext.cpp
Threading module binding extensions.
-------------------------------------------------------------------
Author: Giancarlo Niccolai
Begin: Thu, 10 Apr 2008 00:44:09 +0200
-------------------------------------------------------------------
(C) Copyright 2008: the FALCON developers (see list in AUTHORS file)
See LICENSE file for licensing details.
*/
/** \file
Threading module binding extensions.
*/
#include <falcon/vm.h>
#include <falcon/autocstring.h>
#include <falcon/stringstream.h>
#include <falcon/rosstream.h>
#include <falcon/garbagepointer.h>
#include "threading_ext.h"
#include "threading_mod.h"
#include "threading_st.h"
/*#
@beginmodule feathers.threading
*/
namespace Falcon {
namespace Ext {
static void onMainOver( VMachine* vm )
{
ThreadImpl* impl = getRunningThread();
if ( impl != 0 )
{
impl->disengage();
setRunningThread(0);
}
}
static ThreadImpl* checkMainThread( VMachine* vm )
{
ThreadImpl* self_th = getRunningThread();
if( self_th == 0 )
{
self_th = new ThreadImpl( vm );
self_th->name( "__main__" );
setRunningThread( self_th );
vm->setFinalizeCallback( &onMainOver );
self_th->decref();
}
return self_th;
}
/*#
@group waiting_funcs Waitings
@brief Waiting functions and methods.
All the functions and methods meant to wait for any Synchronization
Structure (or, more generally, for any waitable object) share common
semantics with the following characteristics:
- Waiting functions can wait on one or more waitable objects to become
available for acquisition.
- Waiting functions have an optional timeout value. If it's not specified,
or if it's less than zero, the wait functions will wait indefinitely until
one of the waited items can be acquired. If it's zero, they will check whether
one of the waited items is currently available and return immediately
(acquiring the item if it is). If it's greater than zero, they will wait
for the specified number of seconds (fractions allowed), and return nil if the
waited items are not ready to be acquired by the expiration time.
- A successful wait consists of the acquisition of the waitable item. Acquisition
may not be exclusive; some waitable items may be acquired by more than one thread.
- In case of a failed wait, nil is returned.
- All the waiting functions in this group respect the VM Interruption protocol;
they may raise an InterruptedError if their thread receives a @a Thread.stop
request or a VM interruption request generated by other modules or embedding
applications.
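
The three timeout regimes listed above can be sketched in standalone C++.
This is only an illustration under stated assumptions: SketchWaitable and its
methods are hypothetical names, not part of the Falcon API, and the sketch
models an exclusive acquisition without the VM interruption protocol.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical waitable resource; it is not a Falcon class.
class SketchWaitable {
public:
    // Makes the resource available and wakes one waiter.
    void release() {
        {
            std::lock_guard<std::mutex> lock(m_mtx);
            m_available = true;
        }
        m_cond.notify_one();
    }

    // seconds < 0: wait until acquired; == 0: check once; > 0: bounded wait.
    // Returns true on acquisition, false on a failed wait.
    bool acquire(double seconds) {
        std::unique_lock<std::mutex> lock(m_mtx);
        if (seconds < 0.0) {
            m_cond.wait(lock, [this] { return m_available; });
        } else if (seconds > 0.0) {
            const std::chrono::duration<double> timeout(seconds);
            if (!m_cond.wait_for(lock, timeout, [this] { return m_available; }))
                return false;  // timed out
        } else if (!m_available) {
            return false;      // zero timeout: report current state only
        }
        m_available = false;   // exclusive acquisition in this sketch
        return true;
    }

private:
    std::mutex m_mtx;
    std::condition_variable m_cond;
    bool m_available = false;
};
```

A caller would treat a false result the way Falcon scripts treat a nil
return from a waiting function.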
*/
/*#
@class Thread
@from Waitable
@brief Parallel agent main class.
The Thread class controls the execution of parallel agents in the
Falcon threading model.
An application that wants to launch parallel agents should either
derive the logical agent from the Thread class, or assign
the run method of a Thread instance to a function of choice.
For example, both the following methods are valid:
@code
class MyThread from Thread
function run()
> "Hello from the thread"
end
end
function parallel( param )
> "Parallel called from another thread with ", param
end
// launching a thread using OOP inheritance...
mt = MyThread()
mt.start()
// ... or using functional overload
th = Thread()
th.run = [parallel, "A parameter" ]
th.start()
@endcode
The static method @a Threading.start can be used to call an
arbitrary function without encapsulating it in a Thread instance.
The @a Threading class provides a set of static methods that can
be used both by Thread instances and plain parallel code which
has not been encapsulated in an instance of a class derived from
Thread.
The @a Thread.start method will create a new system thread and
a new Virtual Machine, configured as the machine in which it
was called, but completely empty. The instance that is going
to be started will be copied (via serialization) to the new
Virtual Machine, and if the operation can be completed, the
new Virtual Machine will execute the @a Thread.run method
of the transferred instance.
As the new copy of the VM is clean, global objects and
results of earlier processing in the calling VM are not
available to the newborn thread, unless they are passed
as members of the derived class or as parameters of the
deferred call to run.
This means that any synchronization structure needed for the
thread must be set in the thread subclass instance (or in the
parameters of the deferred call to @b run) before the thread
is started. If provided with suitable structures that can
transfer data to different threads, more data can be exchanged
at a later time.
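
The copy-by-serialization transfer described above can be illustrated with a
small standalone C++ sketch. It assumes a hypothetical SketchAgent type and a
trivial text format; neither is the serialization actually used by the Falcon
VM. The point it shows is that the started copy is independent of the original.

```cpp
#include <cstddef>
#include <sstream>
#include <string>

// Hypothetical agent state; stands in for a Thread subclass instance.
struct SketchAgent {
    std::string name;
    int counter = 0;
};

// Writes the agent as "<name length> <name> <counter>".
void serialize(const SketchAgent& agent, std::ostream& out) {
    out << agent.name.size() << ' ' << agent.name << ' ' << agent.counter;
}

// Reads back what serialize() wrote; returns false on malformed input.
bool deserialize(std::istream& in, SketchAgent& agent) {
    std::size_t len = 0;
    if (!(in >> len)) return false;
    in.get();  // skip the separator after the length
    agent.name.resize(len);
    if (len > 0 && !in.read(&agent.name[0], static_cast<std::streamsize>(len)))
        return false;
    return static_cast<bool>(in >> agent.counter);
}

// Copies an agent by writing it to a stream and reading it back.
SketchAgent transferByCopy(const SketchAgent& original) {
    std::stringstream stream;
    serialize(original, stream);
    SketchAgent copy;
    deserialize(stream, copy);
    return copy;
}
```

After transferByCopy returns, changes to the original are not visible in the
copy, which is why shared state must be set up before the thread is started.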
*/
FALCON_FUNC Thread_init( VMachine *vm )
{
CoreObject *self = vm->self().asObject();
Item* i_name = vm->param(0);
ThreadImpl *th;
// setup the thread instance
if( i_name == 0 )
{
th = new ThreadImpl;
}
else {
if( ! i_name->isString() )
{
throw new ParamError( ErrorParam( e_inv_params, __LINE__ ).
extra( "[S]" ) );
}
else {
th = new ThreadImpl( *i_name->asString() );
}
}
self->setUserData( new ThreadCarrier( th ) );
}
/*#
@method start Thread
@brief Launches the thread in a parallel environment.
@raise ThreadError if the thread cannot be started.
This method checks that this instance has a runnable @a Thread.run
method, and that this object is fully serializable. Once
the instance is ready to be transferred to a parallel environment,
the method creates a new Virtual Machine, linking all the modules that
are linked to the VM where the start method is called.
Finally, the instance is transferred to the other VM, de-serialized there
and readied for execution. When everything is ready, the new thread is
finally started and the run method is executed in the parallel environment.
This method represents a single point of discontinuity in the calling
application. On failure, an error is raised, reporting the details of
the problem. Problems preventing parallel execution may be due to system
resources (e.g. limited thread handles or memory limits) or to programming
errors (e.g. one of the properties of this instance being a complex
object that cannot be serialized).
On success, the method returns and the other thread is started immediately,
or as soon as the system is able to start it.
If this object is connected to an already running thread, trying to start it
will raise a ThreadError.
It is possible to start the thread again after it has terminated and been joined,
provided it is not a detached thread.
@see Thread.join
@see Thread.detach
*/
FALCON_FUNC Thread_start( VMachine *vm )
{
// get the thread implementation starting the new thread.
// we want to setup the main thread before it starts anything else.
checkMainThread( vm );
CoreObject *self = vm->self().asObject();
ThreadImpl *thread = static_cast<ThreadCarrier *>( self->getUserData() )->thread();
thread->vm().appSearchPath( vm->appSearchPath() );
// Require this instance to provide a runnable method
Item i_run;
if( ! self->getMethod( "run", i_run ) )
{
throw new ThreadError( ErrorParam( FALTH_ERR_NOTRUN, __LINE__ ).
desc( FAL_STR( th_msg_notrunnable ) ) );
}
// refuse to run if running, and atomically change to running.
if( ! thread->startable() )
{
throw new ThreadError( ErrorParam( FALTH_ERR_RUNNING, __LINE__ ).
desc( FAL_STR( th_msg_running ) ) );
}
// First link in falcon.core module.
Runtime rt;
LiveModule *fc = vm->findModule( "falcon.core" );
if ( 0 != fc )
rt.addModule( const_cast<Module *>(fc->module()) );
// The main module goes after.
LiveModule* mainMod = vm->mainModule();
// Prelink the modules into the new VM
const LiveModuleMap &mods = vm->liveModules();
MapIterator iter = mods.begin();
while( iter.hasCurrent() )
{
LiveModule *lmod = *(LiveModule **) iter.currentValue();
if( lmod != fc && lmod != mainMod )
{
Module *mod = const_cast<Module*>(lmod->module());
rt.addModule( mod, lmod->isPrivate() );
}
iter.next();
}
// finally, insert the main module
if ( mainMod != 0 )
rt.addModule( const_cast<Module*>(mainMod->module()), mainMod->isPrivate() );
// Do not set error handler; errors will emerge in the module.
if ( ! thread->vm().link( &rt ) )
{
throw new ThreadError( ErrorParam( FALTH_ERR_PREPARE, __LINE__ )
.desc( FAL_STR( th_msg_errlink ) ) );
}
// Save the item.
StringStream sstream(512); // a good prealloc size
vm->self().serialize( &sstream, true );
// restore it in the new vm
sstream.seekBegin(0);
Item i_remoteThread;
#ifndef NDEBUG
Item::e_sercode result =
#endif
i_remoteThread.deserialize( &sstream, &thread->vm() );
fassert( result == Item::sc_ok );
// Setup the thread into the thread data.
i_remoteThread.asObject()->getMethod( "run", i_run );
thread->prepareThreadInstance( i_remoteThread, i_run );
// our machine is ready to go.
if ( ! thread->start() )
{
throw new ThreadError( ErrorParam( FALTH_ERR_START, __LINE__ ).
desc( FAL_STR(th_msg_errstart) ) );
}
}
/*#
@method stop Thread
@brief Interrupts the target thread.
This method sends a cooperative wait-interruption request to the VM running the
target thread. The VM interruption request will stop waiting calls and
raise an InterruptedError in the target thread. The thread may either honor
the request and terminate as soon as it can, or discard the signal and
resume normal execution.
*/
FALCON_FUNC Thread_stop( VMachine *vm )
{
CoreObject *self = vm->self().asObject();
ThreadImpl *thread = static_cast<ThreadCarrier *>( self->getUserData() )->thread();
thread->interruptWaits();
thread->vm().interrupt();
}
/*#
@method detach Thread
@brief Disengages this thread from waits and garbage collection.
This method puts the thread for which it has been called into "detached" status.
This means that the thread will just run in the background for the rest of the
application life, or for as long as it needs, and the main application doesn't
want to be notified about the thread termination, nor to control its behavior
anymore.
The net effect is that the thread is free to run as long as the application
is alive. The VM running the thread is not bound anymore to the calling VM,
and stopping and destroying the calling VM will have no effect on the thread.
Normally, when the thread object is garbage collected, or when the calling
VM is destroyed and its garbage freed, the collector joins the system thread
before destroying the Falcon thread instance. In other words, the destructor
will wait for the target thread to terminate naturally.
By detaching the thread, the caller signals that it doesn't want to use this thread object
anymore, and that the system thread associated with this instance is free to
run beyond the lifetime of this Falcon item.
As a side effect, it is no longer possible to join this thread; threads already
waiting for this thread to end will receive a JoinError, as will threads that try to
wait for its termination after @b detach has been called.
The detached state is not reversible. Once detached, a thread is left to
its own destiny. However, it's still possible to communicate with it through
synchronization structures and through methods in this instance, such as @a Thread.stop.
*/
FALCON_FUNC Thread_detach( VMachine *vm )
{
CoreObject *self = vm->self().asObject();
ThreadImpl *th = static_cast<ThreadCarrier *>( self->getUserData() )->thread();
// declare this thread data is free
if( ! th->detach() )
{
// then, also the th must be zero by definition.
// so we can ignore it.
throw new ThreadError( ErrorParam( FALTH_ERR_NOTRUNNING, __LINE__ ).
desc( FAL_STR( th_msg_notrunning ) ) );
}
}
static void internal_thread_wait_array( VMachine *vm, ThreadImpl *thread )
{
// threadWait applied to arrays
Waitable *waited[ MAX_WAITER_OBJECTS ];
Item *i_array = vm->param(0);
Item *i_timeout = vm->param(1);
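// the first parameter must be an array; the optional timeout must be numeric
if ( i_array == 0 || ! i_array->isArray()
|| ( i_timeout != 0 && ! i_timeout->isOrdinal() ) )
{
throw new ParamError( ErrorParam( e_inv_params, __LINE__ ).
extra( ".. Waitable ..|A, [N]" ) );
}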
int64 microsecs = i_timeout == 0 ? -1 : (int64)(i_timeout->forceNumeric() * 1000000.0);
CoreArray &items = *i_array->asArray();
if ( items.length() > MAX_WAITER_OBJECTS )
{
throw new ParamError( ErrorParam( e_inv_params, __LINE__ ).
extra( ">32" ) );
}
// parameter check
uint32 objId;
for( objId = 0; objId < items.length(); objId++ )
{
if ( items[objId].dereference()->isObject() )
{
CoreObject* obj = items[objId].dereference()->asObjectSafe();
if ( obj->derivedFrom( "Thread" ) )
{
waited[ objId ] = &static_cast< ThreadCarrier *>( obj->getUserData() )->thread()->status();
continue;
}
if ( obj->derivedFrom( "Waitable" ) )
{
waited[ objId ] = static_cast< WaitableCarrier *>( obj->getUserData() )->waitable();
continue;
}
}
throw new ParamError( ErrorParam( e_inv_params, __LINE__ ).
extra( ".. Waitable ..|A, [N]" ) );
}
int64 res = thread->waitForObjects( objId, waited, microsecs );
// if the res value is -2, then we have been interrupted.
if ( res == -2 )
vm->interrupted( true, true, true );
else
vm->retval( res );
}
static void internal_thread_wait( VMachine *vm, ThreadImpl *thread )
{
int pcount = vm->paramCount();
if( pcount == 0 )
{
// yield?
vm->interrupted( true, true );
vm->retnil();
return;
}
else if ( pcount > MAX_WAITER_OBJECTS )
{
throw new ParamError( ErrorParam( e_inv_params, __LINE__ ).
extra( ">32" ) );
}
Waitable *waited[ MAX_WAITER_OBJECTS ];
// parameter check
int objId;
for( objId = 0; objId < pcount - 1; objId++ )
{
Item* param = vm->param(objId);
if ( param->isObject() )
{
CoreObject* obj = param->asObjectSafe();
if ( obj->derivedFrom( "Thread" ) )
{
waited[ objId ] = &static_cast< ThreadCarrier *>( obj->getUserData() )
->thread()->status();
continue;
}
if ( obj->derivedFrom( "Waitable" ) )
{
waited[ objId ] = static_cast< WaitableCarrier *>( obj->getUserData() )->
waitable();
continue;
}
}
throw new ParamError( ErrorParam( e_inv_params, __LINE__ ).
extra( ".. Waitable ..|A, [N]" ) );
}
int64 microsecs;
// the last element may be a numeric...
if( vm->param( objId )->isOrdinal() )
{
microsecs = (int64)(vm->param( objId )->forceNumeric() * 1000000.0);
}
else {
microsecs = -1; // infinite wait
bool bDone = false;
Item* param = vm->param( objId );
if ( param->isObject() )
{
CoreObject* obj = param->asObjectSafe();
if ( obj->derivedFrom( "Thread" ) )
{
waited[ objId ] = &static_cast< ThreadCarrier *>( obj->getUserData() )
->thread()->status();
bDone = true;
}
else if ( obj->derivedFrom( "Waitable" ) )
{
waited[ objId ] = static_cast< WaitableCarrier *>( obj->getUserData() )->
waitable();
bDone = true;
}
}
if ( ! bDone )
throw new ParamError( ErrorParam( e_inv_params, __LINE__ ).
extra( ".. Waitable ..|A, [N]" ) );
objId++;
}
int res = thread->waitForObjects( objId, waited, microsecs );
if ( res == -1 )
vm->retnil();
else if ( res == -2 )
vm->interrupted( true, true, true );
else
vm->retval( *vm->param( res ) );
}
/*#
@method vwait Thread
@brief Wait for one or more synchronization structures to become available.
@param structArray Array of structures to wait for
@optparam waitTime Maximum timeout in seconds and fractions.
@return nil if the timeout expires; otherwise, the ID in @b structArray of
the acquired structure.
@raise InterruptedError in case the thread receives a stop request.
@ingroup waiting_funcs
This method waits for one of the structures in the given @b structArray to
become acquirable, and acquires it before returning.
This works exactly as @a Thread.wait, but, on success, the method returns
the ID of the acquired item in @b structArray rather than the object itself.
In this way, it is possible to rotate or change the list of items to
wait on at each call.
@see Thread.wait
*/
FALCON_FUNC Thread_vwait( VMachine *vm )
{
CoreObject *self = vm->self().asObject();
ThreadImpl *thread = static_cast<ThreadCarrier *>( self->getUserData() )->thread();
internal_thread_wait_array( vm, thread );
}
/*#
@method wait Thread
@brief Wait for one or more synchronization structures to become available.
@param ... One or more synchronization structures to wait for.
@optparam waitTime Maximum timeout in seconds and fractions.
@return nil if timeout expires, or the acquired item on success.
@raise InterruptedError in case the thread receives a stop request.
@ingroup waiting_funcs
This method waits for one of the structures in the given parameters to
become acquirable, and acquires it before returning.
The acquired structure must be released manually after the thread has used the
shared resource.
The typical usage pattern is to acquire the needed structures in the thread
main loop, work with the acquired structure and release it. It is also useful
to test for interruption and honor any interruption request
as soon as possible:
@code
class MyThread from Thread
...
function run()
loop
try
// wait at max 1 second.
res = self.wait( resA, resB, 1.0 )
catch InterruptedError
// honor the request
return
end
// what are we up to?
switch res
case nil
// timed out; perform some periodic operations
case resA
// do things with resA
resA.release()
case resB
// do things with resB
resB.release()
end
// do extra work with signaled data (*)
end
end
end
@endcode
The method tries to acquire the resources in the same order in which they are passed
as parameters. If the first resource is always available when the thread
enters the wait, this will actually prevent the thread from acquiring other
resources. As some resources can be acquired relatively often, it is necessary
to be careful about this aspect. Repeated acquisition and release may cause
starvation of other threads and of other resources in need of handling.
It is advisable to release the resources as soon as possible and perform work on
the shared data after they have been released, in the code section marked with (*).
Also, if the structures waited on may become available at the same time, it is
advisable to use @a Thread.vwait instead, to easily rotate the order in which
the call tries to acquire the resources.
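
The effect of rotating the acquisition order can be sketched in standalone
C++. The helper names firstAvailable and rotateAfter are hypothetical, and the
availability flags only simulate waitable objects; the sketch shows the
ordering idea, not how the module implements waits.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Returns the first id in `order` whose resource is available, or -1.
// This mirrors a wait call that tries resources in the order given.
int firstAvailable(const std::vector<bool>& available,
                   const std::vector<int>& order) {
    for (int id : order)
        if (available[static_cast<std::size_t>(id)]) return id;
    return -1;
}

// Moves the acquired id to the back so other ids get priority next time.
void rotateAfter(std::vector<int>& order, int acquired) {
    auto it = std::find(order.begin(), order.end(), acquired);
    if (it != order.end())
        std::rotate(it, it + 1, order.end());
}
```

With all resources always available, a fixed order would pick the same id
forever, while calling rotateAfter after each acquisition visits every id in turn.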
@see Threading.wait
*/
FALCON_FUNC Thread_wait( VMachine *vm )
{
CoreObject *self = vm->self().asObject();
ThreadImpl *thread = static_cast<ThreadCarrier *>( self->getUserData() )->thread();
internal_thread_wait( vm, thread );
}
/*#
@method getError Thread
@brief Get the error that caused a thread to terminate, if any.
@return nil if the thread terminated correctly, the error that caused thread
termination otherwise.
@raise JoinError if the thread is not terminated or detached.
This method returns the item that was raised by a thread and that wasn't caught
at thread toplevel. If a thread terminated because of an unhandled error, and
not because of a clean exit from the run method, this method will return the
raised item, which is usually an item of class Error.
*/
FALCON_FUNC Thread_getError( VMachine *vm )
{
CoreObject *self = vm->self().asObject();
ThreadImpl *thread = static_cast<ThreadCarrier *>( self->getUserData() )->thread();
if ( ! thread->isTerminated() )
{
throw new JoinError( ErrorParam( FALTH_ERR_NOTTERM, __LINE__ ).
desc( FAL_STR( th_msg_threadnotterm ) ) );
}
if ( thread->hadError() )
{
vm->retval( thread->exitError()->scriptize( vm ) );
}
else
vm->retnil();
}
/*#
@method getReturn Thread
@brief Get the return value that was returned by the thread main function.
@return The value returned by the thread main function.
@raise JoinError if the thread is not terminated or detached.
This method returns the item that was returned by the main thread function,
which is the @a Thread.run method. If the thread terminated without
returning any value, nil will be returned.
@note The caller should ascertain that the thread wasn't terminated by
an uncaught error with @a Thread.hadError before calling this method.
@note The value retrieved is actually a local copy of the value returned by
the terminated thread. Changing it won't affect other threads willing to
read the original value. Also, if the returned value is not serializable,
this method will raise a CodeError.
*/
FALCON_FUNC Thread_getReturn( VMachine *vm )
{
CoreObject *self = vm->self().asObject();
ThreadImpl *thread = static_cast<ThreadCarrier *>( self->getUserData() )->thread();
if ( ! thread->isTerminated() )
{
throw new JoinError( ErrorParam( FALTH_ERR_NOTTERM, __LINE__ ).
desc( FAL_STR( th_msg_threadnotterm ) ) );
}
StringStream sstream(512); // a good prealloc size
thread->vm().regA().serialize( &sstream, true );
// restore it in the calling vm
sstream.seekBegin(0);
vm->regA().deserialize( &sstream, vm );
}
/*#
@method hadError Thread
@brief Returns true if the target thread was terminated because of an uncaught raised item.
@return True if the thread was terminated because of an uncaught raise, false if it
terminated correctly.
@raise JoinError if the thread is not terminated or detached.
*/
FALCON_FUNC Thread_hadError( VMachine *vm )
{
CoreObject *self = vm->self().asObject();
ThreadImpl *thread = static_cast<ThreadCarrier *>( self->getUserData() )->thread();
if ( ! thread->isTerminated() )
{
throw new JoinError( ErrorParam( FALTH_ERR_NOTTERM, __LINE__ ).
desc( FAL_STR( th_msg_threadnotterm ) ) );
}
vm->regA().setBoolean( thread->hadError() );
}
/*#
@method terminated Thread
@brief Returns true if the target thread is terminated.
@return True if the thread is terminated, false otherwise.
The method will return true if the target thread is not running anymore,
either because of a correct termination or because of an error.
*/
FALCON_FUNC Thread_terminated( VMachine *vm )
{
CoreObject *self = vm->self().asObject();
ThreadImpl *thread = static_cast<ThreadCarrier *>( self->getUserData() )->thread();
vm->retval ( thread->isTerminated() );
}
/*#
@method detached Thread
@brief Returns true if the target thread is detached.
@return True if the thread has been detached, false otherwise.
*/
FALCON_FUNC Thread_detached( VMachine *vm )
{
CoreObject *self = vm->self().asObject();
ThreadImpl *thread = static_cast<ThreadCarrier *>( self->getUserData() )->thread();
vm->retval ( thread->isDetached() );
}
/*#
@method getThreadID Thread
@brief Gets a unique numeric ID for this thread.
@return A numeric thread ID.
This is a unique counter assigned to each thread as it
is created.
*/
FALCON_FUNC Thread_getThreadID( VMachine *vm )
{
// if we have no VM Thread object for this thread yet...
CoreObject *self = vm->self().asObject();
ThreadImpl *thread = static_cast<ThreadCarrier *>( self->getUserData() )->thread();
vm->retval( (int64) thread->getID() );
}
/*#
@method getName Thread
@brief Gets the symbolic name of this thread.
@return A string containing the name of this thread (may be empty if not set).
@see Thread.setName
@see Thread.toString
*/
FALCON_FUNC Thread_getName( VMachine *vm )
{
// if we have no VM Thread object for this thread yet...
CoreObject *self = vm->self().asObject();
ThreadImpl *thread = static_cast<ThreadCarrier *>( self->getUserData() )->thread();
CoreString* cs = new CoreString( thread->name() );
cs->bufferize();
vm->retval( cs );
}
/*#
@method setName Thread
@brief Sets the symbolic name of this thread.
@param name The new name for this thread.
@see Thread.getName
*/
FALCON_FUNC Thread_setName( VMachine *vm )
{
Item* i_name = vm->param(0);
if ( i_name == 0 || ! i_name->isString() )
{
// a missing or non-string name is a parameter error, not a join error
throw new ParamError( ErrorParam( e_inv_params, __LINE__ ).
extra( "S" ) );
}
CoreObject *self = vm->self().asObject();
ThreadImpl *thread = static_cast<ThreadCarrier *>( self->getUserData() )->thread();
thread->name( *i_name->asString() );
}
/*#
@method toString Thread
@brief Returns a string representation of this thread.
@return A string containing identifying data (name and IDs) for this thread.
@see Thread.setName
*/
FALCON_FUNC Thread_toString( VMachine *vm )
{
CoreObject *self = vm->self().asObject();
ThreadImpl *thread = static_cast<ThreadCarrier *>( self->getUserData() )->thread();
// if we have no VM Thread object for this thread yet...
CoreString* cs = new CoreString( "Thread \"" );
*cs += thread->name();
*cs += "\" ";
cs->writeNumber( (int64) thread->getID() );
if ( thread->getSystemID() != 0 )
{
*cs += " [0x";
cs->writeNumberHex( thread->getSystemID() );
*cs += "]";
}
else {
*cs += " [not started]";
}
vm->retval( cs );
}
/*#
@method getSystemID Thread
@brief Gets the system low level thread ID for this thread.
@return A numeric thread ID.
This is the system ID for the thread that is being run in the
target Falcon object; for those systems that don't provide a numeric
thread ID, this method returns a pointer to the system resources
identifying the thread (as an integer value). It is always guaranteed that
two different living threads have different identifiers, but a thread
ID may be re-assigned to newly started threads after previous ones have
terminated.
If the thread isn't started, the method returns a meaningless number.
*/
FALCON_FUNC Thread_getSystemID( VMachine *vm )
{
// Retrieve the thread implementation bound to this object.
CoreObject *self = vm->self().asObject();
ThreadImpl *thread = static_cast<ThreadCarrier *>( self->getUserData() )->thread();
vm->retval( (int64) thread->getSystemID() );
}
/*#
@method sameThread Thread
@brief Returns true if the given thread object refers to the same system thread as this object.
@param otherThread Another thread to be checked against.
@return True if this object and @b otherThread refer to the same system thread.
*/
FALCON_FUNC Thread_sameThread( VMachine *vm )
{
Item *pth = vm->param( 0 );
if ( pth == 0 || ! pth->isObject() || ! pth->asObject()->derivedFrom( "Thread" ) )
{
throw new ParamError( ErrorParam( e_inv_params, __LINE__ ).
extra( "Thread" ) );
}
ThreadImpl *th = static_cast<ThreadCarrier *>( vm->self().asObject()->getUserData() )->thread();
ThreadImpl *self_th = static_cast<ThreadCarrier *>( pth->asObject()->getUserData())->thread();
vm->retval( self_th->equal( *th ) );
}
/*#
@method join Thread
@brief Waits for a thread to terminate and returns its return value.
@return The exit value of the target thread.
@raise JoinError if the thread is detached.
@raise ThreadError if the thread didn't terminate correctly (i.e. raised an error).
This method is a shortcut for waiting until the thread becomes acquirable
and then checking its termination value. Unless it can be proven that the caller
is the only thread interested in the exit value of the thread to be joined,
it is preferable to use the @a Thread.wait method and then check the
return value of the target thread with @a Thread.getReturn.
If the thread cannot be acquired after termination because it has been
detached, this method raises a @a JoinError. If the target thread
exits with an error (raises an item at top level), this method raises a @a ThreadError,
and the item raised by the target thread is set as the "next" error in the raised
ThreadError.
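
A minimal usage sketch, assuming @b worker is a hypothetical, already started
instance of a @a Thread subclass; the variable names and the handling in each
branch are illustrative only:
@code
try
   result = worker.join()
   // result holds a copy of the value returned by the run() method of worker
catch JoinError in err
   // worker was detached: no exit value can be read
catch ThreadError in err
   // worker raised an error at top level
end
@endcode
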
@note This method doesn't use the system-level "join" call or an equivalent. Falcon
calls join(), or equivalent means to dispose of the system-level thread resources,
only at garbage collection, or when a thread instance is reused to start another
thread after termination. In other words, it is NOT necessary to call this @b join
method on threads started through this API.
*/
FALCON_FUNC Thread_join( VMachine *vm )
{
ThreadImpl *th = static_cast<ThreadCarrier *>( vm->self().asObject()->getUserData() )->thread();
// if we have no VM Thread object for this thread yet...
ThreadImpl *self_th = checkMainThread( vm );
// join may be used right after a wait; it is sensible to perform some fast checks before
// re-waiting.
th->acquire();
if ( ! th->isTerminated() )
{
th->release(); // allow re-acquisition after wait.
// ^^^^^ this is safe. The worst thing that can happen is the thread being
// terminated and re-launched while we start to wait for it.
Waitable *waited[ 1 ];
waited[0] = &th->status();
int64 res = self_th->waitForObjects( 1, waited, -1 );
// if the res value is -2, then we have been interrupted.
if ( res == -2 )
{
vm->interrupted( true, true, true );
// if we're interrupted, we didn't acquire
return;
}
// is the thread detached? -- then we must raise a join error.
if ( th->isDetached() )
{
JoinError *therr = new JoinError( ErrorParam( FALTH_ERR_JOIN, __LINE__ ).
desc( FAL_STR( th_msg_ejoin ) ) );
// we didn't acquire the thread.
throw therr;
}
}
else {
// perform just a check for pending interruption
if ( vm->interrupted( true, true ) )
{
th->release();
return;
}
}
// Read its output values.
if ( th->hadError() )
{
th->release();
// we got to raise a threading error containing the output error of the other vm.
ThreadError *therr = new ThreadError( ErrorParam( FALTH_ERR_JOINE, __LINE__ ).
desc( FAL_STR( th_msg_joinwitherr ) ) );
therr->appendSubError( th->exitError() );
throw therr;
}
else {
// return the item in the output value
StringStream sstream(512); // a good prealloc size
th->vm().regA().serialize( &sstream, true );
// restore it in the new vm
sstream.seekBegin(0);
vm->regA().deserialize( &sstream, vm );
th->release();
}
}
/*#
@method run Thread
@brief Overloading hook that will hold the thread main function.
@return A value that will be available for inspection in other threads.
This method is called by the new thread when the @a Thread.start method is
called. The new thread executes the function in run(), and when the function
returns, the thread is terminated.
Other threads may wait for this thread to terminate through one of the @a waiting_funcs.
The value returned by this method is made available to inspecting threads
through serialization.
*/
//=================================================================
// Waitable class.
//
/*#
@class Waitable
@brief Base abstract class for synchronization structures.
This class represents the interface exposed by synchronization structures
to scripts. Objects derived from this class can be used in functions and methods in
the group @a waiting_funcs. Usually, the Waitable class is implemented by
structures, but any object providing a means to be acquired, released and waited on for
a change in its acquirability state can be derived from this class.
Currently, only the @a Waitable.release method is exposed to scripts; this means
that scripts cannot create their own waitable objects, but can only use those provided
by this or other modules.
*/
/*#
@method release Waitable
@brief Releases a structure acquired by a waiting function.
Unless the nature of the acquired object is known, and that object is
known not to require release, every acquired object must be explicitly
released after a successful wait.
@see Thread.wait
*/
FALCON_FUNC Waitable_release( VMachine *vm )
{
WaitableCarrier *wc = static_cast< WaitableCarrier *>( vm->self().asObject()->getUserData() );
wc->waitable()->release();
}
//=================================================================
// Grant class.
//
/*#
@class Grant
@from Waitable
@brief Grant for exclusive access to shared resources.
This class can be successfully waited on by only one thread at a time.
When a thread acquires it, other threads have to wait for
the acquirer to release it with @a Waitable.release.
If the grant is currently available, waiting functions return
immediately with the Grant acquired.
This structure can be seen as a sort of a "reverse heavy weight mutex".
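
A minimal sketch of guarding a shared resource, assuming @b guard is a
hypothetical Grant shared by the cooperating threads:
@code
guard = Grant()
// ... in each thread sharing guard:
Threading.wait( guard )      // blocks until the grant is acquired
// ... exclusive access to the shared resource
guard.release()              // lets another waiting thread proceed
@endcode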
*/
FALCON_FUNC Grant_init( VMachine *vm )
{
Grant *grant = new Grant;
WaitableCarrier *wc = new WaitableCarrier( grant );
vm->self().asObject()->setUserData( wc );
grant->decref();
}
//=================================================================
// Barrier class.
//
/*#
@class Barrier
@from Waitable
@optparam mode Set to true to initialize the barrier to open.
@brief Gate controlling the transit of threads for certain operations.
The Barrier is a synchronization structure that can be acquired
either by all or by none of the threads willing to acquire it.
If the barrier is open, any wait on it causes the calling
thread to acquire it and proceed immediately; if it is closed,
the waiting threads stay blocked until the barrier
is opened from the outside.
The methods @b open and @b close control the behavior of the barrier.
One use for barriers is to communicate a clean termination request
to a pool of threads. By sharing the barrier with all the threads
in the pool, a controlling thread can control their behavior: if they
wait on the barrier and on other resources, when the barrier is opened
they acquire it, and in this way they know that it is time for
a clean termination:
@code
class AThread( struct, bar ) from Thread
bar = bar
struct = struct
...
function run()
loop
acquired = self.wait( self.bar, self.struct )
if acquired == self.bar
// end...
return nil
end
//... work on self.struct
end
end
end
@endcode
Release is a no-op for a barrier.
@note By default, the barrier is created in closed status. To create it in
open status, pass the @b mode parameter as a true value.
*/
FALCON_FUNC Barrier_init( VMachine *vm )
{
bool bMode = vm->paramCount() > 0 ? vm->param(0)->isTrue() : false;
Barrier *barrier = new Barrier( bMode );
WaitableCarrier *wc = new WaitableCarrier( barrier );
vm->self().asObject()->setUserData( wc );
barrier->decref();
}
/*#
@method open Barrier
@brief Opens the barrier.
Allows all the waiting threads to pass through the barrier.
*/
FALCON_FUNC Barrier_open( VMachine *vm )
{
WaitableCarrier *wc = static_cast< WaitableCarrier *>( vm->self().asObject()->getUserData() );
static_cast< Barrier *>( wc->waitable() )->open();
}
/*#
@method close Barrier
@brief Closes the barrier.
Prevents any thread from acquiring the barrier. From this moment on,
all the threads trying to wait on this barrier will block.
*/
FALCON_FUNC Barrier_close( VMachine *vm )
{
WaitableCarrier *wc = static_cast< WaitableCarrier *>( vm->self().asObject()->getUserData() );
static_cast< Barrier *>( wc->waitable() )->close();
}
//=====================================================
// Event
//
/*#
@class Event
@from Waitable
@optparam mode Set to true to create a manual reset event.
@brief Signaler of relevant processing conditions.
Falcon events can be used to signal that a certain condition
is met, that a certain resource has become ready to be processed,
or that shared data is now available for exclusive access.
Events start their activity in the "reset" state. While they are
reset, a wait on them blocks any thread. When they are in the "set"
state, they become acquirable. Only one thread at a time can
acquire a set event, and the acquiring thread holds the
event structure exclusively until it releases it.
When the event is acquired, if the event has been created as automatic,
it is automatically reset. New set requests can then be issued both by
the acquiring thread and by other threads while the event is being
held and the data associated with the event is processed. If the event is
created in manual mode, the event is not reset at acquisition; if the
condition that the event has signaled is no longer valid, the acquiring
thread must reset the event before releasing it.
When the acquiring thread releases the event, if the event is still set (or
if it has been set after the automatic or manual reset), another waiting
thread can be immediately selected to proceed and acquire the event.
Events support late signaling. If the event is set when no
thread is waiting for it, the first thread trying to wait on it proceeds immediately,
acquiring the event and being free to reset it.
@note The semantics of this structure are slightly different from those
of the well known "Event Variable" in the MS-Windows SDK. Mainly,
Falcon events allow only one thread at a time to proceed, and guarantee
atomicity of access to the data associated with the event.
By default, this constructor creates an automatic event,
whose set status is automatically reset as soon as a thread
acquires it. To create a manual reset event, which must
be reset by the acquiring thread when it finds that other
threads could not progress, call this constructor with
@b mode set to true.
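
A minimal sketch of automatic-reset signaling, assuming @b dataReady is a
hypothetical event shared between a producer and a consumer thread:
@code
dataReady = Event()           // automatic event (the default)
// producer thread:
dataReady.set()               // signal that data is available
// consumer thread:
Threading.wait( dataReady )   // acquires the event, which is reset automatically
// ... process the data associated with the event
dataReady.release()
@endcode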
*/
FALCON_FUNC Event_init( VMachine *vm )
{
// defaults to true (autoreset)
bool bMode = vm->paramCount() > 0 ? vm->param(0)->isTrue() : true;
Event *event = new Event( bMode );
WaitableCarrier *wc = new WaitableCarrier( event );
vm->self().asObject()->setUserData( wc );
event->decref();
}
/*#
@method set Event
@brief Sets the Event and allows a waiting thread to proceed.
*/
FALCON_FUNC Event_set( VMachine *vm )
{
WaitableCarrier *wc = static_cast< WaitableCarrier *>( vm->self().asObject()->getUserData() );
static_cast< Event *>( wc->waitable() )->set();
}
/*#
@method reset Event
@brief Resets the event, preventing threads from acquiring it.
*/
FALCON_FUNC Event_reset( VMachine *vm )
{
WaitableCarrier *wc = static_cast< WaitableCarrier *>( vm->self().asObject()->getUserData() );
static_cast< Event *>( wc->waitable() )->reset();
}
//=====================================================
// Counter
//
/*#
@class SyncCounter
@brief Implements a synchronization counter (semaphore).
@optparam count Initial counter value (defaults to 0).
This class implements a synchronization counter, commonly
known as "semaphore", which provides the following behavior:
- If the counter is greater than zero, the item can be acquired, and
the counter is atomically decremented.
- If the counter is zero, the acquiring thread must wait for the counter
to become greater than zero.
- The release operation increments the counter and possibly wakes up
waiting threads.
- The counter also provides a @a SyncCounter.post method, which may increase
the counter by more than one unit (allowing the structure to be acquired
by more than one thread).
We have adopted the "counter" name rather than the more common "semaphore" to
avoid confusion with the Semaphore class used for coroutines, and also because
the @b post semantic is merged with the @b release method.
The counter is created with an initial count that defaults to zero; this means that
the first thread trying to acquire this structure will block until a @b post
or @b release is issued.
If a positive integer is given as @b count, then that many threads
will be able to acquire the semaphore before a thread is blocked.
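
A minimal sketch, assuming @b slots is a hypothetical counter that limits
access to a resource to two threads at a time:
@code
slots = SyncCounter( 2 )
// in each worker thread:
Threading.wait( slots )   // decrements the counter, or blocks while it is zero
// ... use the limited resource
slots.release()           // increments the counter again
// elsewhere, to admit three more threads at once:
slots.post( 3 )
@endcode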
*/
FALCON_FUNC SyncCounter_init( VMachine *vm )
{
Item *i_initCount = vm->param(0);
if ( i_initCount != 0 && ! i_initCount->isOrdinal() )
{
throw new ParamError( ErrorParam( e_inv_params, __LINE__ ).
extra( "[N]" ) );
}
// the initial count defaults to 0
int iCount = (int) (i_initCount == 0 ? 0 : i_initCount->forceInteger());
SyncCounter *counter = new SyncCounter( iCount );
WaitableCarrier *wc = new WaitableCarrier( counter );
vm->self().asObject()->setUserData( wc );
counter->decref();
}
/*#
@method post SyncCounter
@brief Releases the counter or increases the counter by more than one.
@optparam count The number of signals to be posted to this semaphore.
This method acts as release(), but it can be provided an optional parameter
to give more than one thread the ability to acquire this structure.
It is not possible to use this method to reduce the internal count.
*/
FALCON_FUNC SyncCounter_post( VMachine *vm )
{
Item *i_initCount = vm->param(0);
if ( i_initCount != 0 && ! i_initCount->isOrdinal() )
{
throw new ParamError( ErrorParam( e_inv_params, __LINE__ ).
extra( "[N]" ) );
}
WaitableCarrier *wc = static_cast< WaitableCarrier *>( vm->self().asObject()->getUserData() );
SyncCounter *syncc = static_cast< SyncCounter *>( wc->waitable() );
syncc->post( (int) (i_initCount == 0 ? 1 : i_initCount->forceInteger()) );
}
//=====================================================
// SyncQueue
//
/*#
@class SyncQueue
@from Waitable
@brief Synchronized queue of items that can be waited on.
This class implements a synchronized FIFO or LIFO queue of Falcon items that can
be waited on for non-empty status.
A single waiting thread will acquire the queue when it is not empty;
at that point, it can dequeue one or more previously
pushed items, release the queue and process the dequeued items.
Holding the queue in acquired status prevents concurrent insertion
of new items, as well as removal, so it's advisable to release the
queue as soon as possible, that is, as soon as the items that must be
processed are retrieved.
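
A minimal sketch of one consumer step, assuming @b jobs is a hypothetical
queue that other threads fill through @b push:
@code
jobs = SyncQueue()
// consumer thread:
Threading.wait( jobs )     // returns when the queue is not empty
job = jobs.popFront()      // dequeue while the queue is held
jobs.release()             // release before the (possibly slow) processing
// ... process job
@endcode
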
@note Always remember that items in the queue are serialized copies coming
from the pushing VMs. Serialization is a relatively expensive operation for
non-simple types, and may cause error raising if the pushed items are not
serializable.
*/
FALCON_FUNC SyncQueue_init( VMachine *vm )
{
SyncQueue *synq = new SyncQueue( );
WaitableCarrier *wc = new WaitableCarrier( synq );
vm->self().asObject()->setUserData( wc );
synq->decref();
}
static void internal_SyncQueue_push( VMachine *vm, bool front )
{
if( vm->paramCount() != 1 )
{
throw new ParamError( ErrorParam( e_inv_params, __LINE__ ).
extra( "X" ) );
}
StringStream ss;
// reserve a bit of space
uint32 written = 0;
ss.write( &written, sizeof( written ) );
if ( vm->param(0)->serialize( &ss, true ) != Item::sc_ok )
{
throw new CodeError( ErrorParam( e_inv_params, __LINE__ ).
extra( "not serializable" ) );
}
// and now write the real size
ss.seekBegin( 0 );
written = ss.length() - sizeof( written );
ss.write( &written, sizeof( written ) );
WaitableCarrier *wc = static_cast< WaitableCarrier *>( vm->self().asObject()->getUserData() );
SyncQueue *synq = static_cast< SyncQueue *>( wc->waitable() );
if ( front )
synq->pushFront( ss.closeToBuffer() );
else
synq->pushBack( ss.closeToBuffer() );
}
static void internal_SyncQueue_pop( VMachine *vm, bool front )
{
WaitableCarrier *wc = static_cast< WaitableCarrier *>( vm->self().asObject()->getUserData() );
SyncQueue *synq = static_cast< SyncQueue *>( wc->waitable() );
void *data;
bool bSuccess = front ? synq->popFront( data ) : synq->popBack( data );
if ( ! bSuccess )
{
throw new ThreadError( ErrorParam( FALTH_ERR_QEMPTY, __LINE__ ).
desc( FAL_STR( th_msg_qempty ) ) );
}
uint32 *written = (uint32 *) data;
ROStringStream ss( ((char*)data)+sizeof( uint32 ), *written );
Item retrieved;
if ( retrieved.deserialize( &ss, vm ) != Item::sc_ok )
{
memFree( data );
throw new ThreadError( ErrorParam( FALTH_ERR_DESERIAL, __LINE__ ).
desc( FAL_STR( th_msg_errdes ) ) );
}
memFree( data );
vm->retval( retrieved );
}
/*#
@method push SyncQueue
@param item The item to be pushed
@brief Pushes an item at the end of the queue.
@raise CodeError if the @b item is not serializable.
This method adds an item at the end of the queue. If the
queue was empty, waiting threads may be signaled to receive the
added item.
The @b item parameter must be a serializable item, or a CodeError
will be raised.
*/
FALCON_FUNC SyncQueue_push( VMachine *vm )
{
internal_SyncQueue_push( vm, false );
}
/*#
@method pushFront SyncQueue
@param item The item to be pushed
@brief Pushes an item in front of the queue.
@raise CodeError if the @b item is not serializable.
This method adds an item in front of the queue. If the
queue was empty, waiting threads may be signaled to receive the
added item.
The @b item parameter must be a serializable item, or a CodeError
will be raised.
*/
FALCON_FUNC SyncQueue_pushFront( VMachine *vm )
{
internal_SyncQueue_push( vm, true );
}
/*#
@method pop SyncQueue
@brief Pops an item from the back of the queue.
@return The item that was at the end of the queue.
@raise ThreadError if the queue is empty.
This method removes an item from the end of the queue and
returns it to the caller.
*/
FALCON_FUNC SyncQueue_pop( VMachine *vm )
{
internal_SyncQueue_pop( vm, false );
}
/*#
@method popFront SyncQueue
@brief Pops an item from the front of the queue.
@return The item that was in front of the queue.
@raise ThreadError if the queue is empty.
This method removes an item from the front of the queue and
returns it to the caller.
*/
FALCON_FUNC SyncQueue_popFront( VMachine *vm )
{
internal_SyncQueue_pop( vm, true );
}
/*#
@method empty SyncQueue
@brief Returns true if the queue is empty.
@return True if the queue is empty.
Although this method can be called at any moment,
its result is reliable only when the queue has been
acquired through a successful wait.
*/
FALCON_FUNC SyncQueue_empty( VMachine *vm )
{
WaitableCarrier *wc = static_cast< WaitableCarrier *>( vm->self().asObject()->getUserData() );
SyncQueue *synq = static_cast< SyncQueue *>( wc->waitable() );
vm->retval( synq->empty() );
}
/*#
@method size SyncQueue
@brief Returns the count of elements currently stored in the queue.
@return Number of elements currently held in the queue.
Although this method can be called at any moment,
its result is reliable only when the queue has been
acquired through a successful wait.
*/
FALCON_FUNC SyncQueue_size( VMachine *vm )
{
WaitableCarrier *wc = static_cast< WaitableCarrier *>( vm->self().asObject()->getUserData() );
SyncQueue *synq = static_cast< SyncQueue *>( wc->waitable() );
vm->retval( (int64) synq->size() );
}
//=====================================================
// Generic threading class
//
/*#
@class Threading
@brief Access to static methods exposing threading functionalities.
This class offers a namespace for generic methods provided by the Threading module.
The methods in this class are all static and can be directly called by items
not derived from the @a Thread class to gain access to multithreading functionalities.
*/
/*#
@method wait Threading
@brief Waits for one or more synchronization structures to become available.
@param ... One or more synchronization structures to wait for.
@optparam waitTime Maximum timeout in seconds and fractions.
@return nil if timeout expires, or the acquired item on success.
@raise InterruptedError in case the thread receives a stop request.
@ingroup waiting_funcs
This method waits for one of the structures in the given parameters to
become acquireable, and acquires it before returning.
@see Thread.wait
*/
FALCON_FUNC Threading_wait( VMachine *vm )
{
// if we have no VM Thread object for this thread yet...
ThreadImpl *th = checkMainThread( vm );
internal_thread_wait( vm, th );
}
/*#
@method vwait Threading
@brief Waits for one or more synchronization structures to become available.
@param structArray Array of structures to wait for.
@optparam waitTime Maximum timeout in seconds and fractions.
@return nil if timeout expires, or the ID in @b structArray of the acquired
structure.
@raise InterruptedError in case the thread receives a stop request.
@ingroup waiting_funcs
This method waits for one of the structures in the given @b structArray to
become acquireable, and acquires it before returning.
This works exactly as @a Threading.wait, but, on success, the method returns
the ID of the acquired item in @b structArray rather than the object itself.
In this way, it is possible to rotate or change the list of items on which
to wait for at each call.
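
A minimal sketch, assuming @b quit and @b jobs are hypothetical structures
shared with other threads, and assuming that the returned ID is the position
of the acquired structure in the array; the 2.5 second timeout is arbitrary:
@code
structs = [ quit, jobs ]
id = Threading.vwait( structs, 2.5 )
if id == nil
   // timeout expired, nothing was acquired
else
   acquired = structs[ id ]
   // ... use acquired
   acquired.release()
end
@endcode
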
@see Thread.wait
*/
FALCON_FUNC Threading_vwait( VMachine *vm )
{
// if we have no VM Thread object for this thread yet...
ThreadImpl *th = checkMainThread( vm );
internal_thread_wait_array( vm, th );
}
/*#
@method getCurrentID Threading
@brief Returns the current thread ID.
@return A numeric ID uniquely identifying the current thread.
@see Thread.getThreadID
*/
FALCON_FUNC Threading_getCurrentID( VMachine *vm )
{
// No Thread object is needed here: the ID is read from the system thread.
vm->retval( (int64) SysThread::getCurrentID() );
}
/*#
@method getCurrent Threading
@brief Returns a Thread object built on the current system thread.
@return A new instance of @a Thread.
This method creates an instance of the @a Thread class referencing the
current system thread. The returned object can then be shared, sent to other
threads or used directly to control thread execution.
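
A minimal sketch; going by the descriptions of @a Threading.getCurrent and
@a Threading.sameThread, the check below is expected to succeed (the variable
name @b me is illustrative):
@code
me = Threading.getCurrent()
if Threading.sameThread( me )
   // me refers to the thread executing this code
end
@endcode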
*/
FALCON_FUNC Threading_getCurrent( VMachine *vm )
{
ThreadImpl *th = checkMainThread( vm );
Item *th_class = vm->findWKI( "Thread" );
fassert( th_class != 0 && th_class->isClass() );
CoreObject *thread = th_class->asClass()->createInstance();
ThreadCarrier *carrier = new ThreadCarrier( th );
thread->setUserData( carrier );
vm->retval( thread );
}
/*#
@method sameThread Threading
@brief Returns true if the given thread refers to the running system thread.
@param thread Instance of the @a Thread class to be compared.
@return True if @b thread is referencing the currently running thread.
*/
FALCON_FUNC Threading_sameThread( VMachine *vm )
{
Item *pth = vm->param( 0 );
if ( pth == 0 || ! pth->isObject() || ! pth->asObject()->derivedFrom( "Thread" ) )
{
throw new ParamError( ErrorParam( e_inv_params, __LINE__ ).
extra( "Thread" ) );
}
ThreadImpl *th = checkMainThread( vm );
ThreadImpl *self_th = static_cast<ThreadCarrier *>( pth->asObject()->getUserData())->thread();
vm->retval( self_th->equal( *th ) );
}
/*#
@method start Threading
@brief Starts a new thread and instructs it to execute the given callable item.
@param callable A Falcon callable item.
@raise ParamError if the given item is not callable.
@raise ThreadError if the new thread cannot be prepared or started.
This method works like @a Thread.start, but instead of requiring a @a Thread class
instance (or an instance of a derived class), it can start execution of an arbitrary
symbol, including a method of a totally unrelated class.
For example:
@code
function thread_a( sync, data )
...
end
class MyClass
function thread_b( sync )
...
end
end
...
// sync and data are shareable structures
Threading.start( .[thread_a sync data] )
myInst = MyClass()
Threading.start( .[ myInst.thread_b sync ] )
@endcode
*/
FALCON_FUNC Threading_start( VMachine *vm )
{
Item *i_routine = vm->param( 0 );
if ( i_routine == 0 || ! i_routine->isCallable() )
{
throw new ParamError( ErrorParam( e_inv_params, __LINE__ ).
extra( "C" ) );
}
// Create the runtime that will hold all the modules
ThreadImpl *thread = new ThreadImpl;
// refuse to run if running, and atomically change to running.
if( ! thread->startable() )
{
throw new ThreadError( ErrorParam( FALTH_ERR_RUNNING, __LINE__ ).
desc( FAL_STR( th_msg_running ) ) );
}
// First link in falcon.core module.
Runtime rt;
LiveModule *fc = vm->findModule( "falcon.core" );
if ( 0 != fc )
rt.addModule( const_cast<Module *>(fc->module()) );
// The main module goes after.
LiveModule* mainMod = vm->mainModule();
// Prelink the modules into the new VM
const LiveModuleMap &mods = vm->liveModules();
MapIterator iter = mods.begin();
while( iter.hasCurrent() )
{
LiveModule *lmod = *(LiveModule **) iter.currentValue();
if( lmod != fc && lmod != mainMod )
{
Module *mod = const_cast<Module*>(lmod->module());
rt.addModule( mod, lmod->isPrivate() );
}
iter.next();
}
// finally, insert the main module
if ( mainMod != 0 )
rt.addModule( const_cast<Module*>(mainMod->module()), mainMod->isPrivate() );
// Do not set error handler; errors will emerge in the module.
if ( ! thread->vm().link( &rt ) )
{
throw new ThreadError( ErrorParam( FALTH_ERR_PREPARE, __LINE__ )
.desc( FAL_STR( th_msg_errlink ) ) );
}
// Save the item.
StringStream sstream(512); // a good prealloc size
i_routine->serialize( &sstream, true );
// restore it in the new vm
sstream.seekBegin(0);
Item i_remoteRoutine, i_nil;
i_remoteRoutine.deserialize( &sstream, &thread->vm() );
// Setup the thread into the thread data.
thread->prepareThreadInstance( i_nil, i_remoteRoutine );
// our machine is ready to go.
if ( thread->start() )
{
// return the thread to our caller.
Item *th_class = vm->findWKI( "Thread" );
fassert( th_class != 0 && th_class->isClass() );
CoreObject *objThread = th_class->asClass()->createInstance();
ThreadCarrier *carrier = new ThreadCarrier( thread );
objThread->setUserData( carrier );
vm->retval( objThread );
}
else {
throw new ThreadError( ErrorParam( FALTH_ERR_START, __LINE__ ).
desc( FAL_STR(th_msg_errstart) ) );
}
}
//=====================================================
// ThreadError class
//
/*#
@class ThreadError
@brief Error generated by thread related problems.
@optparam code A numeric error code.
@optparam description A textual description of the error code.
@optparam extra A descriptive message explaining the error conditions.
@from Error code, description, extra
See the Error class in the core module.
*/
FALCON_FUNC ThreadError_init ( ::Falcon::VMachine *vm )
{
CoreObject *einst = vm->self().asObject();
if( einst->getUserData() == 0 )
einst->setUserData( new ThreadError );
::Falcon::core::Error_init( vm );
}
//=====================================================
// JoinError class
//
/*#
@class JoinError
@brief Error generated when trying to wait for an unwaitable thread.
@optparam code A numeric error code.
@optparam description A textual description of the error code.
@optparam extra A descriptive message explaining the error conditions.
@from Error code, description, extra
This error is created when a wait operation is performed on a thread
object representing an unjoinable thread.
See the Error class in the core module.
*/
FALCON_FUNC JoinError_init ( ::Falcon::VMachine *vm )
{
CoreObject *einst = vm->self().asObject();
if( einst->getUserData() == 0 )
einst->setUserData( new JoinError );
::Falcon::core::Error_init( vm );
}
}
}
/* end of threading_ext.cpp */